• Skip to content
  • Skip to link menu
KDE 4.5 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

session.cpp

00001 /*
00002     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00003 
00004     This library is free software; you can redistribute it and/or modify it
00005     under the terms of the GNU Library General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or (at your
00007     option) any later version.
00008 
00009     This library is distributed in the hope that it will be useful, but WITHOUT
00010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00012     License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to the
00016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00017     02110-1301, USA.
00018 */
00019 
00020 #include "session.h"
00021 #include "session_p.h"
00022 
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "servermanager.h"
00027 #include "servermanager_p.h"
00028 #include "xdgbasedirs_p.h"
00029 
00030 #include <kdebug.h>
00031 #include <klocale.h>
00032 
00033 #include <QCoreApplication>
00034 #include <QtCore/QDir>
00035 #include <QtCore/QQueue>
00036 #include <QtCore/QThreadStorage>
00037 #include <QtCore/QTimer>
00038 #include <QSettings>
00039 
00040 #include <QtNetwork/QLocalSocket>
00041 #include <QtNetwork/QTcpSocket>
00042 
00043 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
00044 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
00045 // sends responses for the next one to the already finished one
00046 #define PIPELINE_LENGTH 0
00047 //#define PIPELINE_LENGTH 2
00048 
00049 using namespace Akonadi;
00050 
00051 
00052 //@cond PRIVATE
00053 
00054 void SessionPrivate::startNext()
00055 {
00056   QTimer::singleShot( 0, mParent, SLOT( doStartNext() ) );
00057 }
00058 
00059 void SessionPrivate::reconnect()
00060 {
00061   QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
00062   if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
00063                        || localSocket->state() == QLocalSocket::ConnectingState ) ) {
00064     // nothing to do, we are still/already connected
00065     return;
00066   }
00067 
00068   QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
00069   if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
00070                      || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
00071     // same here, but for TCP
00072     return;
00073   }
00074 
00075   // try to figure out where to connect to
00076   QString serverAddress;
00077   quint16 port = 0;
00078   bool useTcp = false;
00079 
00080   // env var has precedence
00081   const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
00082   if ( !serverAddressEnvVar.isEmpty() ) {
00083     const int pos = serverAddressEnvVar.indexOf( ':' );
00084     const QByteArray protocol = serverAddressEnvVar.left( pos  );
00085     QMap<QString, QString> options;
00086     foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
00087       const QStringList pair = entry.split( QLatin1Char('=') );
00088       if ( pair.size() != 2 )
00089         continue;
00090       options.insert( pair.first(), pair.last() );
00091     }
00092     kDebug() << protocol << options;
00093 
00094     if ( protocol == "tcp" ) {
00095       serverAddress = options.value( QLatin1String( "host" ) );
00096       port = options.value( QLatin1String( "port" ) ).toUInt();
00097       useTcp = true;
00098     } else if ( protocol == "unix" ) {
00099       serverAddress = options.value( QLatin1String( "path" ) );
00100     } else if ( protocol == "pipe" ) {
00101       serverAddress = options.value( QLatin1String( "name" ) );
00102     }
00103   }
00104 
00105   // try config file next, fall back to defaults if that fails as well
00106   if ( serverAddress.isEmpty() ) {
00107     const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00108     const QFileInfo fileInfo( connectionConfigFile );
00109     if ( !fileInfo.exists() ) {
00110       kDebug() << "Akonadi Client Session: connection config file '"
00111                   "akonadi/akonadiconnectionrc' can not be found in"
00112                << XdgBaseDirs::homePath( "config" ) << "nor in any of"
00113                << XdgBaseDirs::systemPathList( "config" );
00114     }
00115     const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
00116 
00117 #ifdef Q_OS_WIN  //krazy:exclude=cpp
00118     serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00119 #else
00120     const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00121     serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString();
00122 #endif
00123   }
00124 
00125   // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
00126   // but that's probably not something we need to support anyway
00127   if ( !socket ) {
00128     if ( !useTcp ) {
00129       socket = localSocket = new QLocalSocket( mParent );
00130       mParent->connect( localSocket, SIGNAL( error( QLocalSocket::LocalSocketError ) ), SLOT( socketError( QLocalSocket::LocalSocketError ) ) );
00131     } else {
00132       socket = tcpSocket = new QTcpSocket( mParent );
00133       mParent->connect( tcpSocket, SIGNAL( error( QAbstractSocket::SocketError ) ), SLOT( socketError( QAbstractSocket::SocketError ) ) );
00134     }
00135     mParent->connect( socket, SIGNAL( disconnected() ), SLOT( socketDisconnected() ) );
00136     mParent->connect( socket, SIGNAL( readyRead() ), SLOT( dataReceived() ) );
00137   }
00138 
00139   // actually do connect
00140   kDebug() << "connectToServer" << serverAddress;
00141   if ( !useTcp ) {
00142     localSocket->connectToServer( serverAddress );
00143   } else {
00144     tcpSocket->connectToHost( serverAddress, port );
00145   }
00146 }
00147 
00148 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
00149 {
00150   Q_ASSERT( mParent->sender() == socket );
00151   kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
00152   socketDisconnected();
00153 }
00154 
00155 void SessionPrivate::socketError( QAbstractSocket::SocketError )
00156 {
00157   Q_ASSERT( mParent->sender() == socket );
00158   kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
00159   socketDisconnected();
00160 }
00161 
00162 void SessionPrivate::socketDisconnected()
00163 {
00164   if ( currentJob )
00165     currentJob->d_ptr->lostConnection();
00166   connected = false;
00167   QTimer::singleShot( 30000, mParent, SLOT( reconnect() ) );
00168 }
00169 
00170 void SessionPrivate::dataReceived()
00171 {
00172   while ( socket->bytesAvailable() > 0 ) {
00173     if ( parser->continuationSize() > 1 ) {
00174       const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00175       parser->parseBlock( data );
00176     } else if ( socket->canReadLine() ) {
00177       if ( !parser->parseNextLine( socket->readLine() ) )
00178         continue; // response not yet completed
00179 
00180       // handle login response
00181       if ( parser->tag() == QByteArray( "0" ) ) {
00182         if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
00183           connected = true;
00184           startNext();
00185         } else {
00186           kWarning() << "Unable to login to Akonadi server:" << parser->data();
00187           socket->close();
00188           QTimer::singleShot( 1000, mParent, SLOT( reconnect() ) );
00189         }
00190       }
00191 
00192       // send login command
00193       if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00194         const int pos = parser->data().indexOf( "[PROTOCOL" );
00195         if ( pos > 0 ) {
00196           qint64 tmp = 0;
00197           ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00198           protocolVersion = tmp;
00199           Internal::setServerProtocolVersion( tmp );
00200         }
00201         kDebug() << "Server protocol version is:" << protocolVersion;
00202 
00203         writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
00204 
00205       // work for the current job
00206       } else {
00207         if ( currentJob )
00208           currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00209       }
00210 
00211       // reset parser stuff
00212       parser->reset();
00213     } else {
00214       break; // nothing we can do for now
00215     }
00216   }
00217 }
00218 
00219 bool SessionPrivate::canPipelineNext()
00220 {
00221   if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00222     return false;
00223   if ( pipeline.isEmpty() && currentJob )
00224     return currentJob->d_ptr->mWriteFinished;
00225   if ( !pipeline.isEmpty() )
00226     return pipeline.last()->d_ptr->mWriteFinished;
00227   return false;
00228 }
00229 
00230 void SessionPrivate::doStartNext()
00231 {
00232   if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00233     return;
00234   if ( canPipelineNext() ) {
00235     Akonadi::Job *nextJob = queue.dequeue();
00236     pipeline.enqueue( nextJob );
00237     startJob( nextJob );
00238   }
00239   if ( jobRunning )
00240     return;
00241   jobRunning = true;
00242   if ( !pipeline.isEmpty() ) {
00243     currentJob = pipeline.dequeue();
00244   } else {
00245     currentJob = queue.dequeue();
00246     startJob( currentJob );
00247   }
00248 }
00249 
00250 void SessionPrivate::startJob( Job *job )
00251 {
00252   if ( protocolVersion < minimumProtocolVersion() ) {
00253     job->setError( Job::ProtocolVersionMismatch );
00254     job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
00255     job->emitResult();
00256   } else {
00257     job->d_ptr->startQueued();
00258   }
00259 }
00260 
00261 void SessionPrivate::endJob( Job *job )
00262 {
00263   job->emitResult();
00264 }
00265 
00266 void SessionPrivate::jobDone(KJob * job)
00267 {
00268   // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
00269   // so don't call any methods on job itself
00270   if ( job == currentJob ) {
00271     if ( pipeline.isEmpty() ) {
00272       jobRunning = false;
00273       currentJob = 0;
00274     } else {
00275       currentJob = pipeline.dequeue();
00276     }
00277     startNext();
00278   } else {
00279     // non-current job finished, likely canceled while still in the queue
00280     queue.removeAll( static_cast<Akonadi::Job*>( job ) );
00281     // ### likely not enough to really cancel already running jobs
00282     pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
00283   }
00284 }
00285 
00286 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00287 {
00288   Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00289   Q_UNUSED( job );
00290 
00291   startNext();
00292 }
00293 
00294 void SessionPrivate::jobDestroyed(QObject * job)
00295 {
00296   // careful, accessing non-QObject methods of job will fail here already
00297   jobDone( static_cast<KJob*>( job ) );
00298 }
00299 
00300 void SessionPrivate::addJob(Job * job)
00301 {
00302   queue.append( job );
00303   QObject::connect( job, SIGNAL( result( KJob* ) ), mParent, SLOT( jobDone( KJob* ) ) );
00304   QObject::connect( job, SIGNAL( writeFinished( Akonadi::Job* ) ), mParent, SLOT( jobWriteFinished( Akonadi::Job* ) ) );
00305   QObject::connect( job, SIGNAL( destroyed( QObject* ) ), mParent, SLOT( jobDestroyed( QObject* ) ) );
00306   startNext();
00307 }
00308 
00309 int SessionPrivate::nextTag()
00310 {
00311   return theNextTag++;
00312 }
00313 
00314 void SessionPrivate::writeData(const QByteArray & data)
00315 {
00316   if ( socket )
00317     socket->write( data );
00318   else
00319     kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
00320 }
00321 
00322 void SessionPrivate::serverStateChanged( ServerManager::State state )
00323 {
00324   if ( state == ServerManager::Running && !connected )
00325     reconnect();
00326 }
00327 
00328 //@endcond
00329 
00330 
00331 SessionPrivate::SessionPrivate( Session *parent )
00332     : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
00333 {
00334 }
00335 
00336 void SessionPrivate::init( const QByteArray &id )
00337 {
00338   kDebug() << id;
00339   parser = new ImapParser();
00340 
00341   if ( !id.isEmpty() ) {
00342     sessionId = id;
00343   } else {
00344     sessionId = QCoreApplication::instance()->applicationName().toUtf8()
00345         + '-' + QByteArray::number( qrand() );
00346   }
00347 
00348   connected = false;
00349   theNextTag = 1;
00350   jobRunning = false;
00351 
00352   if ( ServerManager::state() == ServerManager::NotRunning )
00353     ServerManager::start();
00354   mParent->connect( ServerManager::self(), SIGNAL( stateChanged( Akonadi::ServerManager::State ) ),
00355                     SLOT( serverStateChanged( Akonadi::ServerManager::State ) ) );
00356 
00357   reconnect();
00358 }
00359 
00360 Session::Session(const QByteArray & sessionId, QObject * parent) :
00361     QObject( parent ),
00362     d( new SessionPrivate( this ) )
00363 {
00364   d->init( sessionId );
00365 }
00366 
00367 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
00368     : QObject( parent ),
00369     d( dd )
00370 {
00371   d->init( sessionId );
00372 }
00373 
00374 Session::~Session()
00375 {
00376   clear();
00377   delete d;
00378 }
00379 
00380 QByteArray Session::sessionId() const
00381 {
00382   return d->sessionId;
00383 }
00384 
00385 QThreadStorage<Session*> instances;
00386 
00387 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00388 {
00389   Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00390               "You tried to create a default session with empty session id!" );
00391   Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00392               "You tried to create a default session twice!" );
00393 
00394   instances.setLocalData( new Session( sessionId ) );
00395 }
00396 
00397 Session* Session::defaultSession()
00398 {
00399   if ( !instances.hasLocalData() )
00400     instances.setLocalData( new Session() );
00401   return instances.localData();
00402 }
00403 
00404 void Session::clear()
00405 {
00406   foreach ( Job* job, d->queue )
00407     job->kill( KJob::EmitResult );
00408   d->queue.clear();
00409   foreach ( Job* job, d->pipeline )
00410     job->kill( KJob::EmitResult );
00411   d->pipeline.clear();
00412   if ( d->currentJob )
00413     d->currentJob->kill( KJob::EmitResult );
00414   d->jobRunning = false;
00415   d->connected = false;
00416   if ( d->socket )
00417     d->socket->disconnect( this ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
00418   delete d->socket;
00419   d->socket = 0;
00420   QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
00421 }
00422 
00423 #include "session.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.1
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal