00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "resourcescheduler_p.h"
00021
00022 #include <kdebug.h>
00023 #include <klocale.h>
00024
00025 #include <QtCore/QTimer>
00026 #include <QtDBus/QDBusInterface>
00027 #include <QtDBus/QDBusConnectionInterface>
00028 #include <boost/graph/graph_concepts.hpp>
00029
00030 using namespace Akonadi;
00031
// Out-of-class definition of the static member Task::latestSerial, initialised to 0.
// NOTE(review): presumably the source of the per-task `serial` values reported to
// the job tracker -- the code that advances it is not in this file; confirm in the header.
00032 qint64 ResourceScheduler::Task::latestSerial = 0;
// File-local D-Bus proxy used for job tracking. It stays null until
// signalTaskToTracker() finds the "org.kde.akonadiconsole" service and creates
// it; every user in this file checks it for null before calling it.
00033 static QDBusAbstractInterface *s_resourcetracker = 0;
00034
00035
00036
00037 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00038 QObject( parent ),
00039 mOnline( false )
00040 {
00041 }
00042
00043 void ResourceScheduler::scheduleFullSync()
00044 {
00045 Task t;
00046 t.type = SyncAll;
00047 TaskList& queue = queueForTaskType( t.type );
00048 if ( queue.contains( t ) || mCurrentTask == t )
00049 return;
00050 queue << t;
00051 signalTaskToTracker( t, "SyncAll" );
00052 scheduleNext();
00053 }
00054
00055 void ResourceScheduler::scheduleCollectionTreeSync()
00056 {
00057 Task t;
00058 t.type = SyncCollectionTree;
00059 TaskList& queue = queueForTaskType( t.type );
00060 if ( queue.contains( t ) || mCurrentTask == t )
00061 return;
00062 queue << t;
00063 signalTaskToTracker( t, "SyncCollectionTree" );
00064 scheduleNext();
00065 }
00066
00067 void ResourceScheduler::scheduleSync(const Collection & col)
00068 {
00069 Task t;
00070 t.type = SyncCollection;
00071 t.collection = col;
00072 TaskList& queue = queueForTaskType( t.type );
00073 if ( queue.contains( t ) || mCurrentTask == t )
00074 return;
00075 queue << t;
00076 signalTaskToTracker( t, "SyncCollection" );
00077 scheduleNext();
00078 }
00079
00080 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00081 {
00082 Task t;
00083 t.type = FetchItem;
00084 t.item = item;
00085 t.itemParts = parts;
00086
00087
00088
00089 if ( mCurrentTask == t ) {
00090 mCurrentTask.dbusMsgs << msg;
00091 return;
00092 }
00093
00094
00095 TaskList& queue = queueForTaskType( t.type );
00096 const int idx = queue.indexOf( t );
00097 if ( idx != -1 ) {
00098 queue[ idx ].dbusMsgs << msg;
00099 return;
00100 }
00101
00102 t.dbusMsgs << msg;
00103 queue << t;
00104 signalTaskToTracker( t, "FetchItem" );
00105 scheduleNext();
00106 }
00107
00108 void ResourceScheduler::scheduleResourceCollectionDeletion()
00109 {
00110 Task t;
00111 t.type = DeleteResourceCollection;
00112 TaskList& queue = queueForTaskType( t.type );
00113 if ( queue.contains( t ) || mCurrentTask == t )
00114 return;
00115 queue << t;
00116 signalTaskToTracker( t, "DeleteResourceCollection" );
00117 scheduleNext();
00118 }
00119
00120 void ResourceScheduler::scheduleChangeReplay()
00121 {
00122 Task t;
00123 t.type = ChangeReplay;
00124 TaskList& queue = queueForTaskType( t.type );
00125
00126 if ( queue.contains( t ) )
00127 return;
00128 queue << t;
00129 signalTaskToTracker( t, "ChangeReplay" );
00130 scheduleNext();
00131 }
00132
00133 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00134 {
00135 Task t;
00136 t.type = SyncAllDone;
00137 TaskList& queue = queueForTaskType( t.type );
00138
00139 queue << t;
00140 signalTaskToTracker( t, "SyncAllDone" );
00141 scheduleNext();
00142 }
00143
00144 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00145 {
00146 Task t;
00147 t.type = Custom;
00148 t.receiver = receiver;
00149 t.methodName = methodName;
00150 t.argument = argument;
00151 QueueType queueType = GenericTaskQueue;
00152 if ( priority == ResourceBase::AfterChangeReplay )
00153 queueType = AfterChangeReplayQueue;
00154 TaskList& queue = mTaskList[ queueType ];
00155
00156 if ( queue.contains( t ) )
00157 return;
00158
00159 switch (priority) {
00160 case ResourceBase::Prepend:
00161 queue.prepend( t );
00162 break;
00163 default:
00164 queue.append(t);
00165 break;
00166 }
00167
00168 signalTaskToTracker( t, "Custom-" + t.methodName );
00169 scheduleNext();
00170 }
00171
00172 void ResourceScheduler::taskDone()
00173 {
00174 if ( isEmpty() )
00175 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00176
00177 if ( s_resourcetracker ) {
00178 QList<QVariant> argumentList;
00179 argumentList << QString::number( mCurrentTask.serial )
00180 << QString();
00181 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00182 }
00183
00184 mCurrentTask = Task();
00185 scheduleNext();
00186 }
00187
00188 void ResourceScheduler::deferTask()
00189 {
00190 if ( s_resourcetracker ) {
00191 QList<QVariant> argumentList;
00192 argumentList << QString::number( mCurrentTask.serial )
00193 << QString();
00194 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00195 }
00196
00197 Task t = mCurrentTask;
00198 mCurrentTask = Task();
00199 mTaskList[GenericTaskQueue] << t;
00200 signalTaskToTracker( t, "DeferedTask" );
00201
00202 scheduleNext();
00203 }
00204
00205 bool ResourceScheduler::isEmpty()
00206 {
00207 for ( int i = 0; i < NQueueCount; ++i ) {
00208 if ( !mTaskList[i].isEmpty() )
00209 return false;
00210 }
00211 return true;
00212 }
00213
00214 void ResourceScheduler::scheduleNext()
00215 {
00216 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00217 return;
00218 QTimer::singleShot( 0, this, SLOT( executeNext() ) );
00219 }
00220
00221 void ResourceScheduler::executeNext()
00222 {
00223 if ( mCurrentTask.type != Invalid || isEmpty() )
00224 return;
00225
00226 for ( int i = 0; i < NQueueCount; ++i ) {
00227 if ( !mTaskList[ i ].isEmpty() ) {
00228 mCurrentTask = mTaskList[ i ].takeFirst();
00229 break;
00230 }
00231 }
00232
00233 if ( s_resourcetracker ) {
00234 QList<QVariant> argumentList;
00235 argumentList << QString::number( mCurrentTask.serial );
00236 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00237 }
00238
00239 switch ( mCurrentTask.type ) {
00240 case SyncAll:
00241 emit executeFullSync();
00242 break;
00243 case SyncCollectionTree:
00244 emit executeCollectionTreeSync();
00245 break;
00246 case SyncCollection:
00247 emit executeCollectionSync( mCurrentTask.collection );
00248 break;
00249 case FetchItem:
00250 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00251 break;
00252 case DeleteResourceCollection:
00253 emit executeResourceCollectionDeletion();
00254 break;
00255 case ChangeReplay:
00256 emit executeChangeReplay();
00257 break;
00258 case SyncAllDone:
00259 emit fullSyncComplete();
00260 break;
00261 case Custom:
00262 {
00263 bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00264 if ( !success )
00265 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00266
00267 if ( !success )
00268 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00269 break;
00270 }
00271 default: {
00272 kError() << "Unhandled task type" << mCurrentTask.type;
00273 dump();
00274 Q_ASSERT( false );
00275 }
00276 }
00277 }
00278
00279 ResourceScheduler::Task ResourceScheduler::currentTask() const
00280 {
00281 return mCurrentTask;
00282 }
00283
00284 void ResourceScheduler::setOnline(bool state)
00285 {
00286 if ( mOnline == state )
00287 return;
00288 mOnline = state;
00289 if ( mOnline ) {
00290 scheduleNext();
00291 } else {
00292 if ( mCurrentTask.type != Invalid ) {
00293
00294 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00295 mCurrentTask = Task();
00296 }
00297
00298 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00299 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00300 if ( (*it).type == FetchItem ) {
00301 (*it).sendDBusReplies( false );
00302 it = itemFetchQueue.erase( it );
00303 if ( s_resourcetracker ) {
00304 QList<QVariant> argumentList;
00305 argumentList << QString::number( mCurrentTask.serial )
00306 << QLatin1String( "Job canceled." );
00307 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00308 }
00309 } else {
00310 ++it;
00311 }
00312 }
00313 }
00314 }
00315
00316 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00317 {
00318
00319 if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00320 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00321 QLatin1String( "/resourcesJobtracker" ),
00322 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00323 QDBusConnection::sessionBus(), 0 );
00324 }
00325
00326 if ( s_resourcetracker ) {
00327 QList<QVariant> argumentList;
00328 argumentList << static_cast<AgentBase*>( parent() )->identifier()
00329 << QString::number( task.serial )
00330 << QString()
00331 << QString::fromLatin1( taskType );
00332 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00333 }
00334 }
00335
00336 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00337 {
00338 if ( !collection.isValid() )
00339 return;
00340 TaskList& queue = queueForTaskType( SyncCollection );
00341 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00342 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00343 it = queue.erase( it );
00344 kDebug() << " erasing";
00345 } else
00346 ++it;
00347 }
00348 }
00349
00350 void ResourceScheduler::Task::sendDBusReplies( bool success )
00351 {
00352 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00353 QDBusMessage reply( msg );
00354 reply << success;
00355 QDBusConnection::sessionBus().send( reply );
00356 }
00357 }
00358
00359 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00360 {
00361 switch( type ) {
00362 case ChangeReplay:
00363 return ChangeReplayQueue;
00364 case FetchItem:
00365 return ItemFetchQueue;
00366 default:
00367 return GenericTaskQueue;
00368 }
00369 }
00370
00371 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00372 {
00373 const QueueType qt = queueTypeForTaskType( type );
00374 return mTaskList[ qt ];
00375 }
00376
00377 void ResourceScheduler::dump()
00378 {
00379 kDebug() << "ResourceScheduler: Online:" << mOnline;
00380 kDebug() << " current task:" << mCurrentTask;
00381 for ( int i = 0; i < NQueueCount; ++i ) {
00382 const TaskList& queue = mTaskList[i];
00383 kDebug() << " queue" << i << queue.size() << "tasks:";
00384 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00385 kDebug() << " " << (*it);
00386 }
00387 }
00388 }
00389
00390 void ResourceScheduler::clear()
00391 {
00392 kDebug() << "Clearing ResourceScheduler queues:";
00393 for ( int i = 0; i < NQueueCount; ++i ) {
00394 TaskList& queue = mTaskList[i];
00395 queue.clear();
00396 }
00397 mCurrentTask = Task();
00398 }
00399
// Human-readable names for task types, used by the QDebug stream operator in
// this file, which indexes the table with Task::type.
// NOTE(review): the entry order must match the TaskType enum declared in the
// header -- confirm when adding or reordering values.
static const char *const s_taskTypes[] = {
  "Invalid",
  "SyncAll",
  "SyncCollectionTree",
  "SyncCollection",
  "FetchItem",
  "ChangeReplay",
  "DeleteResourceCollection",
  "SyncAllDone",
  "Custom"
};
00411
00412 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00413 {
00414 d << task.serial << s_taskTypes[task.type];
00415 if ( task.type != ResourceScheduler::Invalid ) {
00416 if ( task.collection.id() != -1 )
00417 d << "collection" << task.collection.id();
00418 if ( task.item.id() != -1 )
00419 d << "item" << task.item.id();
00420 if ( !task.methodName.isEmpty() )
00421 d << task.methodName << task.argument;
00422 }
00423 return d;
00424 }
00425
00426
00427
00428 #include "resourcescheduler_p.moc"