• Skip to content
  • Skip to link menu
KDE 4.5 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

itemsync.cpp

00001 /*
00002     Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "itemsync.h"
00022 
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031 
00032 #include <kdebug.h>
00033 
00034 #include <QtCore/QStringList>
00035 
00036 using namespace Akonadi;
00037 
00041 class ItemSync::Private
00042 {
00043   public:
00044     Private( ItemSync *parent ) :
00045       q( parent ),
00046       mTransactionMode( Single ),
00047       mCurrentTransaction( 0 ),
00048       mTransactionJobs( 0 ),
00049       mPendingJobs( 0 ),
00050       mProgress( 0 ),
00051       mTotalItems( -1 ),
00052       mTotalItemsProcessed( 0 ),
00053       mStreaming( false ),
00054       mIncremental( false ),
00055       mLocalListDone( false ),
00056       mDeliveryDone( false )
00057     {
00058       // we want to fetch all data by default
00059       mFetchScope.fetchFullPayload();
00060       mFetchScope.fetchAllAttributes();
00061     }
00062 
00063     void createLocalItem( const Item &item );
00064     void checkDone();
00065     void slotLocalListDone( KJob* );
00066     void slotLocalDeleteDone( KJob* );
00067     void slotLocalChangeDone( KJob* );
00068     void execute();
00069     void processItems();
00070     void deleteItems( const Item::List &items );
00071     void slotTransactionResult( KJob *job );
00072     Job* subjobParent() const;
00073 
00074     ItemSync *q;
00075     Collection mSyncCollection;
00076     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00077     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00078     QSet<Akonadi::Item> mUnprocessedLocalItems;
00079 
00080     // transaction mode, TODO: make this public API?
00081     enum TransactionMode {
00082       Single,
00083       Chunkwise,
00084       None
00085     };
00086     TransactionMode mTransactionMode;
00087     TransactionSequence *mCurrentTransaction;
00088     int mTransactionJobs;
00089 
00090     // fetch scope for initial item listing
00091     ItemFetchScope mFetchScope;
00092 
00093     // remote items
00094     Akonadi::Item::List mRemoteItems;
00095 
00096     // removed remote items
00097     Item::List mRemovedRemoteItems;
00098 
00099     // create counter
00100     int mPendingJobs;
00101     int mProgress;
00102     int mTotalItems;
00103     int mTotalItemsProcessed;
00104 
00105     bool mStreaming;
00106     bool mIncremental;
00107     bool mLocalListDone;
00108     bool mDeliveryDone;
00109 };
00110 
00111 void ItemSync::Private::createLocalItem( const Item & item )
00112 {
00113   // don't try to do anything in error state
00114   if ( q->error() )
00115     return;
00116   mPendingJobs++;
00117   ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00118   q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00119 }
00120 
00121 void ItemSync::Private::checkDone()
00122 {
00123   q->setProcessedAmount( KJob::Bytes, mProgress );
00124   if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00125     return;
00126 
00127   q->emitResult();
00128 }
00129 
00130 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00131     Job( parent ),
00132     d( new Private( this ) )
00133 {
00134   d->mSyncCollection = collection;
00135 }
00136 
00137 ItemSync::~ItemSync()
00138 {
00139   delete d;
00140 }
00141 
00142 void ItemSync::setFullSyncItems( const Item::List &items )
00143 {
00144   Q_ASSERT( !d->mIncremental );
00145   if ( !d->mStreaming )
00146     d->mDeliveryDone = true;
00147   d->mRemoteItems += items;
00148   d->mTotalItemsProcessed += items.count();
00149   kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00150   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00151   if ( d->mTotalItemsProcessed == d->mTotalItems )
00152     d->mDeliveryDone = true;
00153   d->execute();
00154 }
00155 
00156 void ItemSync::setTotalItems( int amount )
00157 {
00158   Q_ASSERT( !d->mIncremental );
00159   Q_ASSERT( amount >= 0 );
00160   setStreamingEnabled( true );
00161   kDebug() << amount;
00162   d->mTotalItems = amount;
00163   setTotalAmount( KJob::Bytes, amount );
00164   if ( d->mTotalItems == 0 ) {
00165     d->mDeliveryDone = true;
00166     d->execute();
00167   }
00168 }
00169 
00170 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00171 {
00172   d->mIncremental = true;
00173   if ( !d->mStreaming )
00174     d->mDeliveryDone = true;
00175   d->mRemoteItems += changedItems;
00176   d->mRemovedRemoteItems += removedItems;
00177   d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00178   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00179   if ( d->mTotalItemsProcessed == d->mTotalItems )
00180     d->mDeliveryDone = true;
00181   d->execute();
00182 }
00183 
00184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00185 {
00186   d->mFetchScope = fetchScope;
00187 }
00188 
00189 ItemFetchScope &ItemSync::fetchScope()
00190 {
00191   return d->mFetchScope;
00192 }
00193 
00194 void ItemSync::doStart()
00195 {
00196   ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00197   job->setFetchScope( d->mFetchScope );
00198 
00199   // we only can fetch parts already in the cache, otherwise this will deadlock
00200   job->fetchScope().setCacheOnly( true );
00201 
00202   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00203 }
00204 
00205 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00206 {
00207   // we are in error state, better not change anything at all anymore
00208   if ( error() )
00209     return false;
00210 
00211   /*
00212    * We know that this item has changed (as it is part of the
00213    * incremental changed list), so we just put it into the
00214    * storage.
00215    */
00216   if ( d->mIncremental )
00217     return true;
00218 
00219   // Check whether the flags differ
00220   if ( storedItem.flags() != newItem.flags() ) {
00221     kDebug() << "Stored flags "  << storedItem.flags()
00222              << "new flags " << newItem.flags();
00223     return true;
00224   }
00225 
00226   // Check whether the new item contains unknown parts
00227   QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00228   missingParts.subtract( storedItem.loadedPayloadParts() );
00229   if ( !missingParts.isEmpty() )
00230     return true;
00231 
00232   // ### FIXME SLOW!!!
00233   // If the available part identifiers don't differ, check
00234   // whether the content of the payload differs
00235   if ( newItem.hasPayload()
00236     && storedItem.payloadData() != newItem.payloadData() )
00237     return true;
00238 
00239   // check if remote attributes have been changed
00240   foreach ( Attribute* attr, newItem.attributes() ) {
00241     if ( !storedItem.hasAttribute( attr->type() ) )
00242       return true;
00243     if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00244       return true;
00245   }
00246 
00247   return false;
00248 }
00249 
00250 void ItemSync::Private::slotLocalListDone( KJob * job )
00251 {
00252   if ( !job->error() ) {
00253     const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00254     foreach ( const Item &item, list ) {
00255       if ( item.remoteId().isEmpty() )
00256         continue;
00257       mLocalItemsById.insert( item.id(), item );
00258       mLocalItemsByRemoteId.insert( item.remoteId(), item );
00259       mUnprocessedLocalItems.insert( item );
00260     }
00261   }
00262 
00263   mLocalListDone = true;
00264   execute();
00265 }
00266 
00267 void ItemSync::Private::execute()
00268 {
00269   if ( !mLocalListDone )
00270     return;
00271 
00272   if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00273     ++mTransactionJobs;
00274     mCurrentTransaction = new TransactionSequence( q );
00275     mCurrentTransaction->setAutomaticCommittingEnabled( false );
00276     connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00277   }
00278 
00279   processItems();
00280   if ( !mDeliveryDone ) {
00281     if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00282       mCurrentTransaction->commit();
00283       mCurrentTransaction = 0;
00284     }
00285     return;
00286   }
00287 
00288   // removed
00289   if ( !mIncremental ) {
00290     mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00291     mUnprocessedLocalItems.clear();
00292   }
00293 
00294   deleteItems( mRemovedRemoteItems );
00295   mLocalItemsById.clear();
00296   mLocalItemsByRemoteId.clear();
00297   mRemovedRemoteItems.clear();
00298 
00299   if ( mCurrentTransaction ) {
00300     mCurrentTransaction->commit();
00301     mCurrentTransaction = 0;
00302   }
00303 
00304   checkDone();
00305 }
00306 
00307 void ItemSync::Private::processItems()
00308 {
00309   // added / updated
00310   foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
00311 #ifndef NDEBUG
00312     if ( remoteItem.remoteId().isEmpty() ) {
00313       kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00314     }
00315 #endif
00316 
00317     Item localItem = mLocalItemsById.value( remoteItem.id() );
00318     if ( !localItem.isValid() )
00319       localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00320     mUnprocessedLocalItems.remove( localItem );
00321     // missing locally
00322     if ( !localItem.isValid() ) {
00323       createLocalItem( remoteItem );
00324       continue;
00325     }
00326 
00327     if ( q->updateItem( localItem, remoteItem ) ) {
00328       mPendingJobs++;
00329 
00330       remoteItem.setId( localItem.id() );
00331       remoteItem.setRevision( localItem.revision() );
00332       remoteItem.setSize( localItem.size() );
00333       remoteItem.setRemoteId( localItem.remoteId() );  // in case someone clears remoteId by accident
00334       ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00335       q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00336     } else {
00337       mProgress++;
00338     }
00339   }
00340   mRemoteItems.clear();
00341 }
00342 
00343 void ItemSync::Private::deleteItems( const Item::List &items )
00344 {
00345   // if in error state, better not change anything anymore
00346   if ( q->error() )
00347     return;
00348 
00349   Item::List itemsToDelete;
00350   foreach ( const Item &item, items ) {
00351     Item delItem( item );
00352     if ( !item.isValid() ) {
00353       delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00354     }
00355 
00356     if ( !delItem.isValid() ) {
00357 #ifndef NDEBUG
00358       kWarning() << "Delete item (remoteeId=" << delItem.remoteId()
00359                  << "mimeType=" << delItem.mimeType()
00360                  << ") does not have a valid UID and no item with that remote ID exists either";
00361 #endif
00362       continue;
00363     }
00364 
00365     if ( delItem.remoteId().isEmpty() ) {
00366       // don't attempt to remove items that never were written to the backend
00367       continue;
00368     }
00369 
00370     itemsToDelete.append ( delItem );
00371   }
00372 
00373   if ( !itemsToDelete.isEmpty() ) {
00374     mPendingJobs++;
00375     ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00376     q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00377 
00378     // It can happen that the groupware servers report us deleted items
00379     // twice, in this case this item delete job will fail on the second try.
00380     // To avoid a rollback of the complete transaction we gracefully allow the job
00381     // to fail :)
00382     TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00383     if ( transaction )
00384       transaction->setIgnoreJobFailure( job );
00385   }
00386 }
00387 
00388 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00389 {
00390   mPendingJobs--;
00391   mProgress++;
00392 
00393   checkDone();
00394 }
00395 
00396 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00397 {
00398   Q_UNUSED( job );
00399   mPendingJobs--;
00400   mProgress++;
00401 
00402   checkDone();
00403 }
00404 
00405 void ItemSync::Private::slotTransactionResult( KJob *job )
00406 {
00407   --mTransactionJobs;
00408   if ( mCurrentTransaction == job )
00409     mCurrentTransaction = 0;
00410 
00411   checkDone();
00412 }
00413 
00414 Job * ItemSync::Private::subjobParent() const
00415 {
00416   if ( mCurrentTransaction && mTransactionMode != None )
00417     return mCurrentTransaction;
00418   return q;
00419 }
00420 
00421 void ItemSync::setStreamingEnabled(bool enable)
00422 {
00423   d->mStreaming = enable;
00424 }
00425 
00426 void ItemSync::deliveryDone()
00427 {
00428   Q_ASSERT( d->mStreaming );
00429   d->mDeliveryDone = true;
00430   d->execute();
00431 }
00432 
00433 void ItemSync::slotResult(KJob* job)
00434 {
00435   if ( job->error() ) {
00436     // pretent there were no errors
00437     Akonadi::Job::removeSubjob( job );
00438     // propagate the first error we got but continue, we might still be fed with stuff from a resource
00439     if ( !error() ) {
00440       setError( job->error() );
00441       setErrorText( job->errorText() );
00442     }
00443   } else {
00444     Akonadi::Job::slotResult( job );
00445   }
00446 }
00447 
00448 void ItemSync::rollback()
00449 {
00450   setError( UserCanceled );
00451   if ( d->mCurrentTransaction )
00452     d->mCurrentTransaction->rollback();
00453   d->mDeliveryDone = true; // user wont deliver more data
00454   d->execute(); // end this in an ordered way, since we have an error set no real change will be done
00455 }
00456 
00457 
00458 #include "itemsync.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.1
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal