00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "resourcebase.h"
00022 #include "agentbase_p.h"
00023
00024 #include "resourceadaptor.h"
00025 #include "collectiondeletejob.h"
00026 #include "collectionsync_p.h"
00027 #include "itemsync.h"
00028 #include "resourcescheduler_p.h"
00029 #include "tracerinterface.h"
00030 #include "xdgbasedirs_p.h"
00031
00032 #include "changerecorder.h"
00033 #include "collectionfetchjob.h"
00034 #include "collectionmodifyjob.h"
00035 #include "itemfetchjob.h"
00036 #include "itemfetchscope.h"
00037 #include "itemmodifyjob.h"
00038 #include "itemmodifyjob_p.h"
00039 #include "session.h"
00040 #include "resourceselectjob_p.h"
00041
00042 #include <kaboutdata.h>
00043 #include <kcmdlineargs.h>
00044 #include <kdebug.h>
00045 #include <klocale.h>
00046
00047 #include <QtCore/QDebug>
00048 #include <QtCore/QDir>
00049 #include <QtCore/QHash>
00050 #include <QtCore/QSettings>
00051 #include <QtCore/QTimer>
00052 #include <QtGui/QApplication>
00053 #include <QtDBus/QtDBus>
00054
00055 using namespace Akonadi;
00056
00057 class Akonadi::ResourceBasePrivate : public AgentBasePrivate
00058 {
00059 public:
00060 ResourceBasePrivate( ResourceBase *parent )
00061 : AgentBasePrivate( parent ),
00062 scheduler( 0 ),
00063 mItemSyncer( 0 ),
00064 mCollectionSyncer( 0 )
00065 {
00066 mStatusMessage = defaultReadyMessage();
00067 }
00068
00069 Q_DECLARE_PUBLIC( ResourceBase )
00070
00071 void delayedInit()
00072 {
00073 if ( !QDBusConnection::sessionBus().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) )
00074 kFatal() << "Unable to register service at D-Bus: " << QDBusConnection::sessionBus().lastError().message();
00075 AgentBasePrivate::delayedInit();
00076 }
00077
00078 virtual void changeProcessed()
00079 {
00080 mMonitor->changeProcessed();
00081 if ( !mMonitor->isEmpty() )
00082 scheduler->scheduleChangeReplay();
00083 scheduler->taskDone();
00084 }
00085
00086 void slotDeliveryDone( KJob* job );
00087 void slotCollectionSyncDone( KJob *job );
00088 void slotLocalListDone( KJob *job );
00089 void slotSynchronizeCollection( const Collection &col );
00090 void slotCollectionListDone( KJob *job );
00091
00092 void slotItemSyncDone( KJob *job );
00093
00094 void slotPercent( KJob* job, unsigned long percent );
00095 void slotDeleteResourceCollection();
00096 void slotDeleteResourceCollectionDone( KJob *job );
00097 void slotCollectionDeletionDone( KJob *job );
00098
00099
00100 Collection currentCollection;
00101
00102 ResourceScheduler *scheduler;
00103 ItemSync *mItemSyncer;
00104 CollectionSync *mCollectionSyncer;
00105 };
00106
00107 ResourceBase::ResourceBase( const QString & id )
00108 : AgentBase( new ResourceBasePrivate( this ), id )
00109 {
00110 Q_D( ResourceBase );
00111
00112 new ResourceAdaptor( this );
00113
00114 d->scheduler = new ResourceScheduler( this );
00115
00116 d->mMonitor->setChangeRecordingEnabled( true );
00117 connect( d->mMonitor, SIGNAL( changesAdded() ),
00118 d->scheduler, SLOT( scheduleChangeReplay() ) );
00119
00120 d->mMonitor->setResourceMonitored( d->mId.toLatin1() );
00121
00122 connect( d->scheduler, SIGNAL( executeFullSync() ),
00123 SLOT( retrieveCollections() ) );
00124 connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ),
00125 SLOT( retrieveCollections() ) );
00126 connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ),
00127 SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) );
00128 connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ),
00129 SLOT( retrieveItem( const Akonadi::Item&, const QSet<QByteArray>& ) ) );
00130 connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ),
00131 SLOT( slotDeleteResourceCollection() ) );
00132 connect( d->scheduler, SIGNAL( status( int, const QString& ) ),
00133 SIGNAL( status( int, const QString& ) ) );
00134 connect( d->scheduler, SIGNAL( executeChangeReplay() ),
00135 d->mMonitor, SLOT( replayNext() ) );
00136 connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) );
00137 connect( d->mMonitor, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) );
00138 connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) );
00139 connect( this, SIGNAL( agentNameChanged( const QString& ) ),
00140 this, SIGNAL( nameChanged( const QString& ) ) );
00141
00142 d->scheduler->setOnline( d->mOnline );
00143 if ( !d->mMonitor->isEmpty() )
00144 d->scheduler->scheduleChangeReplay();
00145
00146 new ResourceSelectJob( identifier() );
00147 }
00148
00149 ResourceBase::~ResourceBase()
00150 {
00151 }
00152
00153 void ResourceBase::synchronize()
00154 {
00155 d_func()->scheduler->scheduleFullSync();
00156 }
00157
00158 void ResourceBase::setName( const QString &name )
00159 {
00160 AgentBase::setAgentName( name );
00161 }
00162
00163 QString ResourceBase::name() const
00164 {
00165 return AgentBase::agentName();
00166 }
00167
00168 QString ResourceBase::parseArguments( int argc, char **argv )
00169 {
00170 QString identifier;
00171 if ( argc < 3 ) {
00172 kDebug( 5250 ) << "Not enough arguments passed...";
00173 exit( 1 );
00174 }
00175
00176 for ( int i = 1; i < argc - 1; ++i ) {
00177 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) )
00178 identifier = QLatin1String( argv[ i + 1 ] );
00179 }
00180
00181 if ( identifier.isEmpty() ) {
00182 kDebug( 5250 ) << "Identifier argument missing";
00183 exit( 1 );
00184 }
00185
00186 QByteArray catalog;
00187 char *p = strrchr( argv[0], '/' );
00188 if ( p )
00189 catalog = QByteArray( p + 1 );
00190 else
00191 catalog = QByteArray( argv[0] );
00192
00193 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog,
00194 ki18nc("@title, application name", "Akonadi Resource"), "0.1",
00195 ki18nc("@title, application description", "Akonadi Resource") );
00196
00197 KCmdLineOptions options;
00198 options.add( "identifier <argument>",
00199 ki18nc("@label, commandline option", "Resource identifier") );
00200 KCmdLineArgs::addCmdLineOptions( options );
00201
00202 return identifier;
00203 }
00204
00205 int ResourceBase::init( ResourceBase *r )
00206 {
00207 QApplication::setQuitOnLastWindowClosed( false );
00208 int rv = kapp->exec();
00209 delete r;
00210 return rv;
00211 }
00212
00213 void ResourceBase::itemRetrieved( const Item &item )
00214 {
00215 Q_D( ResourceBase );
00216 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem );
00217 if ( !item.isValid() ) {
00218 QDBusMessage reply( d->scheduler->currentTask().dbusMsg );
00219 reply << false;
00220 QDBusConnection::sessionBus().send( reply );
00221 d->scheduler->taskDone();
00222 return;
00223 }
00224
00225 Item i( item );
00226 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
00227 foreach ( const QByteArray &part, requestedParts ) {
00228 if ( !item.loadedPayloadParts().contains( part ) ) {
00229 kWarning( 5250 ) << "Item does not provide part" << part;
00230 }
00231 }
00232
00233 ItemModifyJob *job = new ItemModifyJob( i );
00234
00235 job->disableRevisionCheck();
00236 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) );
00237 }
00238
00239 void ResourceBasePrivate::slotDeliveryDone(KJob * job)
00240 {
00241 Q_Q( ResourceBase );
00242 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem );
00243 QDBusMessage reply( scheduler->currentTask().dbusMsg );
00244 if ( job->error() ) {
00245 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() );
00246 reply << false;
00247 } else {
00248 reply << true;
00249 }
00250 QDBusConnection::sessionBus().send( reply );
00251 scheduler->taskDone();
00252 }
00253
00254 void ResourceBasePrivate::slotDeleteResourceCollection()
00255 {
00256 Q_Q( ResourceBase );
00257
00258 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel );
00259 job->setResource( q->identifier() );
00260 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) );
00261 }
00262
00263 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job )
00264 {
00265 Q_Q( ResourceBase );
00266 if ( job->error() ) {
00267 emit q->error( job->errorString() );
00268 scheduler->taskDone();
00269 } else {
00270 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job );
00271
00272 if ( !fetchJob->collections().isEmpty() ) {
00273 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() );
00274 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) );
00275 } else {
00276
00277 scheduler->taskDone();
00278 }
00279 }
00280 }
00281
00282 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job )
00283 {
00284 Q_Q( ResourceBase );
00285 if ( job->error() ) {
00286 emit q->error( job->errorString() );
00287 }
00288
00289 scheduler->taskDone();
00290 }
00291
00292 void ResourceBase::changeCommitted( const Item& item )
00293 {
00294 Q_D( ResourceBase );
00295 ItemModifyJob *job = new ItemModifyJob( item );
00296 job->d_func()->setClean();
00297 job->disableRevisionCheck();
00298 job->ignorePayload();
00299 d->changeProcessed();
00300 }
00301
00302 void ResourceBase::changeCommitted( const Collection &collection )
00303 {
00304 Q_D( ResourceBase );
00305 CollectionModifyJob *job = new CollectionModifyJob( collection );
00306 Q_UNUSED( job );
00307
00308 d->changeProcessed();
00309 }
00310
00311 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId,
00312 const QString &mimeType, const QStringList &_parts )
00313 {
00314 Q_D( ResourceBase );
00315 if ( !isOnline() ) {
00316 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) );
00317 return false;
00318 }
00319
00320 setDelayedReply( true );
00321
00322 Item item( uid );
00323 item.setMimeType( mimeType );
00324 item.setRemoteId( remoteId );
00325
00326 QSet<QByteArray> parts;
00327 Q_FOREACH( const QString &str, _parts )
00328 parts.insert( str.toLatin1() );
00329
00330 d->scheduler->scheduleItemFetch( item, parts, message().createReply() );
00331
00332 return true;
00333 }
00334
00335 void ResourceBase::collectionsRetrieved( const Collection::List & collections )
00336 {
00337 Q_D( ResourceBase );
00338 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00339 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00340 "ResourceBase::collectionsRetrieved()",
00341 "Calling collectionsRetrieved() although no collection retrieval is in progress" );
00342 if ( !d->mCollectionSyncer ) {
00343 d->mCollectionSyncer = new CollectionSync( identifier() );
00344 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00345 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00346 }
00347 d->mCollectionSyncer->setRemoteCollections( collections );
00348 }
00349
00350 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections,
00351 const Collection::List & removedCollections )
00352 {
00353 Q_D( ResourceBase );
00354 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00355 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00356 "ResourceBase::collectionsRetrievedIncremental()",
00357 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" );
00358 if ( !d->mCollectionSyncer ) {
00359 d->mCollectionSyncer = new CollectionSync( identifier() );
00360 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00361 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00362 }
00363 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections );
00364 }
00365
00366 void ResourceBase::setCollectionStreamingEnabled( bool enable )
00367 {
00368 Q_D( ResourceBase );
00369 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00370 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00371 "ResourceBase::setCollectionStreamingEnabled()",
00372 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" );
00373 if ( !d->mCollectionSyncer ) {
00374 d->mCollectionSyncer = new CollectionSync( identifier() );
00375 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00376 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00377 }
00378 d->mCollectionSyncer->setStreamingEnabled( enable );
00379 }
00380
00381 void ResourceBase::collectionsRetrievalDone()
00382 {
00383 Q_D( ResourceBase );
00384 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00385 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00386 "ResourceBase::collectionsRetrievalDone()",
00387 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" );
00388
00389 if ( d->mCollectionSyncer ) {
00390 d->mCollectionSyncer->retrievalDone();
00391 }
00392
00393 else {
00394 d->scheduler->taskDone();
00395 }
00396 }
00397
00398 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job )
00399 {
00400 Q_Q( ResourceBase );
00401 mCollectionSyncer = 0;
00402 if ( job->error() ) {
00403 emit q->error( job->errorString() );
00404 } else {
00405 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) {
00406 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive );
00407 list->setResource( mId );
00408 q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) );
00409 return;
00410 }
00411 }
00412 scheduler->taskDone();
00413 }
00414
00415 void ResourceBasePrivate::slotLocalListDone( KJob * job )
00416 {
00417 Q_Q( ResourceBase );
00418 if ( job->error() ) {
00419 emit q->error( job->errorString() );
00420 } else {
00421 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections();
00422 foreach ( const Collection &col, cols ) {
00423 scheduler->scheduleSync( col );
00424 }
00425 scheduler->scheduleFullSyncCompletion();
00426 }
00427 scheduler->taskDone();
00428 }
00429
00430 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col )
00431 {
00432 Q_Q( ResourceBase );
00433 currentCollection = col;
00434
00435 QStringList contentTypes = currentCollection.contentMimeTypes();
00436 contentTypes.removeAll( Collection::mimeType() );
00437 if ( !contentTypes.isEmpty() ) {
00438 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) );
00439 q->retrieveItems( currentCollection );
00440 return;
00441 }
00442 scheduler->taskDone();
00443 }
00444
00445 void ResourceBase::itemsRetrievalDone()
00446 {
00447 Q_D( ResourceBase );
00448
00449 if ( d->mItemSyncer ) {
00450 d->mItemSyncer->deliveryDone();
00451 }
00452
00453 else {
00454 d->scheduler->taskDone();
00455 }
00456 }
00457
00458 void ResourceBase::clearCache()
00459 {
00460 Q_D( ResourceBase );
00461 d->scheduler->scheduleResourceCollectionDeletion();
00462 }
00463
00464 Collection ResourceBase::currentCollection() const
00465 {
00466 Q_D( const ResourceBase );
00467 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
00468 "ResourceBase::currentCollection()",
00469 "Trying to access current collection although no item retrieval is in progress" );
00470 return d->currentCollection;
00471 }
00472
00473 Item ResourceBase::currentItem() const
00474 {
00475 Q_D( const ResourceBase );
00476 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
00477 "ResourceBase::currentItem()",
00478 "Trying to access current item although no item retrieval is in progress" );
00479 return d->scheduler->currentTask().item;
00480 }
00481
00482 void ResourceBase::synchronizeCollectionTree()
00483 {
00484 d_func()->scheduler->scheduleCollectionTreeSync();
00485 }
00486
00487 void ResourceBase::cancelTask()
00488 {
00489 Q_D( ResourceBase );
00490 switch ( d->scheduler->currentTask().type ) {
00491 case ResourceScheduler::FetchItem:
00492 itemRetrieved( Item() );
00493 break;
00494 case ResourceScheduler::ChangeReplay:
00495 d->changeProcessed();
00496 break;
00497 default:
00498 d->scheduler->taskDone();
00499 }
00500 }
00501
00502 void ResourceBase::cancelTask( const QString &msg )
00503 {
00504 cancelTask();
00505
00506 emit error( msg );
00507 }
00508
00509 void ResourceBase::deferTask()
00510 {
00511 Q_D( ResourceBase );
00512 d->scheduler->deferTask();
00513 }
00514
00515 void ResourceBase::doSetOnline( bool state )
00516 {
00517 d_func()->scheduler->setOnline( state );
00518 }
00519
00520 void ResourceBase::synchronizeCollection( qint64 collectionId )
00521 {
00522 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base );
00523 job->setResource( identifier() );
00524 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) );
00525 }
00526
00527 void ResourceBasePrivate::slotCollectionListDone( KJob *job )
00528 {
00529 if ( !job->error() ) {
00530 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00531 if ( !list.isEmpty() ) {
00532 Collection col = list.first();
00533 scheduler->scheduleSync( col );
00534 }
00535 }
00536
00537 }
00538
00539 void ResourceBase::setTotalItems( int amount )
00540 {
00541 kDebug() << amount;
00542 Q_D( ResourceBase );
00543 setItemStreamingEnabled( true );
00544 d->mItemSyncer->setTotalItems( amount );
00545 }
00546
00547 void ResourceBase::setItemStreamingEnabled( bool enable )
00548 {
00549 Q_D( ResourceBase );
00550 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00551 "ResourceBase::setItemStreamingEnabled()",
00552 "Calling setItemStreamingEnabled() although no item retrieval is in progress" );
00553 if ( !d->mItemSyncer ) {
00554 d->mItemSyncer = new ItemSync( currentCollection() );
00555 connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00556 connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) );
00557 }
00558 d->mItemSyncer->setStreamingEnabled( enable );
00559 }
00560
00561 void ResourceBase::itemsRetrieved( const Item::List &items )
00562 {
00563 Q_D( ResourceBase );
00564 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00565 "ResourceBase::itemsRetrieved()",
00566 "Calling itemsRetrieved() although no item retrieval is in progress" );
00567 if ( !d->mItemSyncer ) {
00568 d->mItemSyncer = new ItemSync( currentCollection() );
00569 connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00570 connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) );
00571 }
00572 d->mItemSyncer->setFullSyncItems( items );
00573 }
00574
00575 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems )
00576 {
00577 Q_D( ResourceBase );
00578 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00579 "ResourceBase::itemsRetrievedIncremental()",
00580 "Calling itemsRetrievedIncremental() although no item retrieval is in progress" );
00581 if ( !d->mItemSyncer ) {
00582 d->mItemSyncer = new ItemSync( currentCollection() );
00583 connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00584 connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) );
00585 }
00586 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems );
00587 }
00588
00589 void ResourceBasePrivate::slotItemSyncDone( KJob *job )
00590 {
00591 mItemSyncer = 0;
00592 Q_Q( ResourceBase );
00593 if ( job->error() ) {
00594 emit q->error( job->errorString() );
00595 }
00596 scheduler->taskDone();
00597 }
00598
00599 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent )
00600 {
00601 Q_Q( ResourceBase );
00602 Q_UNUSED( job );
00603 emit q->percent( percent );
00604 }
00605
00606 #include "resourcebase.moc"