00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "itemsync.h"
00022
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031
00032 #include <kdebug.h>
00033
00034 #include <QtCore/QStringList>
00035
00036 using namespace Akonadi;
00037
00041 class ItemSync::Private
00042 {
00043 public:
00044 Private( ItemSync *parent ) :
00045 q( parent ),
00046 mTransactionMode( Single ),
00047 mCurrentTransaction( 0 ),
00048 mTransactionJobs( 0 ),
00049 mPendingJobs( 0 ),
00050 mProgress( 0 ),
00051 mTotalItems( -1 ),
00052 mTotalItemsProcessed( 0 ),
00053 mStreaming( false ),
00054 mIncremental( false ),
00055 mLocalListDone( false ),
00056 mDeliveryDone( false )
00057 {
00058
00059 mFetchScope.fetchFullPayload();
00060 mFetchScope.fetchAllAttributes();
00061 }
00062
00063 void createLocalItem( const Item &item );
00064 void checkDone();
00065 void slotLocalListDone( KJob* );
00066 void slotLocalDeleteDone( KJob* );
00067 void slotLocalChangeDone( KJob* );
00068 void execute();
00069 void processItems();
00070 void deleteItems( const Item::List &items );
00071 void slotTransactionResult( KJob *job );
00072 Job* subjobParent() const;
00073
00074 ItemSync *q;
00075 Collection mSyncCollection;
00076 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00077 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00078 QSet<Akonadi::Item> mUnprocessedLocalItems;
00079
00080
00081 enum TransactionMode {
00082 Single,
00083 Chunkwise,
00084 None
00085 };
00086 TransactionMode mTransactionMode;
00087 TransactionSequence *mCurrentTransaction;
00088 int mTransactionJobs;
00089
00090
00091 ItemFetchScope mFetchScope;
00092
00093
00094 Akonadi::Item::List mRemoteItems;
00095
00096
00097 Item::List mRemovedRemoteItems;
00098
00099
00100 int mPendingJobs;
00101 int mProgress;
00102 int mTotalItems;
00103 int mTotalItemsProcessed;
00104
00105 bool mStreaming;
00106 bool mIncremental;
00107 bool mLocalListDone;
00108 bool mDeliveryDone;
00109 };
00110
00111 void ItemSync::Private::createLocalItem( const Item & item )
00112 {
00113
00114 if ( q->error() )
00115 return;
00116 mPendingJobs++;
00117 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00118 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00119 }
00120
00121 void ItemSync::Private::checkDone()
00122 {
00123 q->setProcessedAmount( KJob::Bytes, mProgress );
00124 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00125 return;
00126
00127 q->emitResult();
00128 }
00129
00130 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00131 Job( parent ),
00132 d( new Private( this ) )
00133 {
00134 d->mSyncCollection = collection;
00135 }
00136
00137 ItemSync::~ItemSync()
00138 {
00139 delete d;
00140 }
00141
00142 void ItemSync::setFullSyncItems( const Item::List &items )
00143 {
00144 Q_ASSERT( !d->mIncremental );
00145 if ( !d->mStreaming )
00146 d->mDeliveryDone = true;
00147 d->mRemoteItems += items;
00148 d->mTotalItemsProcessed += items.count();
00149 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00150 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00151 if ( d->mTotalItemsProcessed == d->mTotalItems )
00152 d->mDeliveryDone = true;
00153 d->execute();
00154 }
00155
00156 void ItemSync::setTotalItems( int amount )
00157 {
00158 Q_ASSERT( !d->mIncremental );
00159 Q_ASSERT( amount >= 0 );
00160 setStreamingEnabled( true );
00161 kDebug() << amount;
00162 d->mTotalItems = amount;
00163 setTotalAmount( KJob::Bytes, amount );
00164 if ( d->mTotalItems == 0 ) {
00165 d->mDeliveryDone = true;
00166 d->execute();
00167 }
00168 }
00169
00170 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00171 {
00172 d->mIncremental = true;
00173 if ( !d->mStreaming )
00174 d->mDeliveryDone = true;
00175 d->mRemoteItems += changedItems;
00176 d->mRemovedRemoteItems += removedItems;
00177 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00178 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00179 if ( d->mTotalItemsProcessed == d->mTotalItems )
00180 d->mDeliveryDone = true;
00181 d->execute();
00182 }
00183
00184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00185 {
00186 d->mFetchScope = fetchScope;
00187 }
00188
00189 ItemFetchScope &ItemSync::fetchScope()
00190 {
00191 return d->mFetchScope;
00192 }
00193
00194 void ItemSync::doStart()
00195 {
00196 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00197 job->setFetchScope( d->mFetchScope );
00198
00199
00200 job->fetchScope().setCacheOnly( true );
00201
00202 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00203 }
00204
00205 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00206 {
00207
00208 if ( error() )
00209 return false;
00210
00211
00212
00213
00214
00215
00216 if ( d->mIncremental )
00217 return true;
00218
00219
00220 if ( storedItem.flags() != newItem.flags() ) {
00221 kDebug() << "Stored flags " << storedItem.flags()
00222 << "new flags " << newItem.flags();
00223 return true;
00224 }
00225
00226
00227 QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00228 missingParts.subtract( storedItem.loadedPayloadParts() );
00229 if ( !missingParts.isEmpty() )
00230 return true;
00231
00232
00233
00234
00235 if ( newItem.hasPayload()
00236 && storedItem.payloadData() != newItem.payloadData() )
00237 return true;
00238
00239
00240 foreach ( Attribute* attr, newItem.attributes() ) {
00241 if ( !storedItem.hasAttribute( attr->type() ) )
00242 return true;
00243 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00244 return true;
00245 }
00246
00247 return false;
00248 }
00249
00250 void ItemSync::Private::slotLocalListDone( KJob * job )
00251 {
00252 if ( !job->error() ) {
00253 const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00254 foreach ( const Item &item, list ) {
00255 if ( item.remoteId().isEmpty() )
00256 continue;
00257 mLocalItemsById.insert( item.id(), item );
00258 mLocalItemsByRemoteId.insert( item.remoteId(), item );
00259 mUnprocessedLocalItems.insert( item );
00260 }
00261 }
00262
00263 mLocalListDone = true;
00264 execute();
00265 }
00266
00267 void ItemSync::Private::execute()
00268 {
00269 if ( !mLocalListDone )
00270 return;
00271
00272 if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00273 ++mTransactionJobs;
00274 mCurrentTransaction = new TransactionSequence( q );
00275 mCurrentTransaction->setAutomaticCommittingEnabled( false );
00276 connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00277 }
00278
00279 processItems();
00280 if ( !mDeliveryDone ) {
00281 if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00282 mCurrentTransaction->commit();
00283 mCurrentTransaction = 0;
00284 }
00285 return;
00286 }
00287
00288
00289 if ( !mIncremental ) {
00290 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00291 mUnprocessedLocalItems.clear();
00292 }
00293
00294 deleteItems( mRemovedRemoteItems );
00295 mLocalItemsById.clear();
00296 mLocalItemsByRemoteId.clear();
00297 mRemovedRemoteItems.clear();
00298
00299 if ( mCurrentTransaction ) {
00300 mCurrentTransaction->commit();
00301 mCurrentTransaction = 0;
00302 }
00303
00304 checkDone();
00305 }
00306
00307 void ItemSync::Private::processItems()
00308 {
00309
00310 foreach ( Item remoteItem, mRemoteItems ) {
00311 #ifndef NDEBUG
00312 if ( remoteItem.remoteId().isEmpty() ) {
00313 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00314 }
00315 #endif
00316
00317 Item localItem = mLocalItemsById.value( remoteItem.id() );
00318 if ( !localItem.isValid() )
00319 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00320 mUnprocessedLocalItems.remove( localItem );
00321
00322 if ( !localItem.isValid() ) {
00323 createLocalItem( remoteItem );
00324 continue;
00325 }
00326
00327 if ( q->updateItem( localItem, remoteItem ) ) {
00328 mPendingJobs++;
00329
00330 remoteItem.setId( localItem.id() );
00331 remoteItem.setRevision( localItem.revision() );
00332 remoteItem.setSize( localItem.size() );
00333 remoteItem.setRemoteId( localItem.remoteId() );
00334 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00335 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00336 } else {
00337 mProgress++;
00338 }
00339 }
00340 mRemoteItems.clear();
00341 }
00342
00343 void ItemSync::Private::deleteItems( const Item::List &items )
00344 {
00345
00346 if ( q->error() )
00347 return;
00348
00349 Item::List itemsToDelete;
00350 foreach ( const Item &item, items ) {
00351 Item delItem( item );
00352 if ( !item.isValid() ) {
00353 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00354 }
00355
00356 if ( !delItem.isValid() ) {
00357 #ifndef NDEBUG
00358 kWarning() << "Delete item (remoteeId=" << delItem.remoteId()
00359 << "mimeType=" << delItem.mimeType()
00360 << ") does not have a valid UID and no item with that remote ID exists either";
00361 #endif
00362 continue;
00363 }
00364
00365 if ( delItem.remoteId().isEmpty() ) {
00366
00367 continue;
00368 }
00369
00370 itemsToDelete.append ( delItem );
00371 }
00372
00373 if ( !itemsToDelete.isEmpty() ) {
00374 mPendingJobs++;
00375 ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00376 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00377
00378
00379
00380
00381
00382 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00383 if ( transaction )
00384 transaction->setIgnoreJobFailure( job );
00385 }
00386 }
00387
00388 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00389 {
00390 mPendingJobs--;
00391 mProgress++;
00392
00393 checkDone();
00394 }
00395
00396 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00397 {
00398 Q_UNUSED( job );
00399 mPendingJobs--;
00400 mProgress++;
00401
00402 checkDone();
00403 }
00404
00405 void ItemSync::Private::slotTransactionResult( KJob *job )
00406 {
00407 --mTransactionJobs;
00408 if ( mCurrentTransaction == job )
00409 mCurrentTransaction = 0;
00410
00411 checkDone();
00412 }
00413
00414 Job * ItemSync::Private::subjobParent() const
00415 {
00416 if ( mCurrentTransaction && mTransactionMode != None )
00417 return mCurrentTransaction;
00418 return q;
00419 }
00420
00421 void ItemSync::setStreamingEnabled(bool enable)
00422 {
00423 d->mStreaming = enable;
00424 }
00425
00426 void ItemSync::deliveryDone()
00427 {
00428 Q_ASSERT( d->mStreaming );
00429 d->mDeliveryDone = true;
00430 d->execute();
00431 }
00432
00433 void ItemSync::slotResult(KJob* job)
00434 {
00435 if ( job->error() ) {
00436
00437 Akonadi::Job::removeSubjob( job );
00438
00439 if ( !error() ) {
00440 setError( job->error() );
00441 setErrorText( job->errorText() );
00442 }
00443 } else {
00444 Akonadi::Job::slotResult( job );
00445 }
00446 }
00447
00448 void ItemSync::rollback()
00449 {
00450 setError( UserCanceled );
00451 if ( d->mCurrentTransaction )
00452 d->mCurrentTransaction->rollback();
00453 d->mDeliveryDone = true;
00454 d->execute();
00455 }
00456
00457
00458 #include "itemsync.moc"