diff --git a/akonadi/session.cpp b/akonadi/session.cpp index 976a18bdf..8f8495efc 100644 --- a/akonadi/session.cpp +++ b/akonadi/session.cpp @@ -1,421 +1,423 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "session.h" #include "session_p.h" #include "imapparser_p.h" #include "job.h" #include "job_p.h" #include "servermanager.h" #include "servermanager_p.h" #include "xdgbasedirs_p.h" #include #include #include #include #include #include #include #include #include #include // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still // sends responses for the next one to the already finished one #define PIPELINE_LENGTH 0 //#define PIPELINE_LENGTH 2 using namespace Akonadi; //@cond PRIVATE void SessionPrivate::startNext() { QTimer::singleShot( 0, mParent, SLOT( doStartNext() ) ); } void SessionPrivate::reconnect() { QLocalSocket *localSocket = qobject_cast( socket ); if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState || localSocket->state() == QLocalSocket::ConnectingState ) ) { // nothing to do, we are still/already connected return; } QTcpSocket *tcpSocket = qobject_cast( socket ); if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState || tcpSocket->state() == QTcpSocket::ConnectingState ) ) { // same here, but for TCP return; } // try to figure out where to connect to QString serverAddress; quint16 port = 0; bool useTcp = false; // env var has precedence const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" ); if ( !serverAddressEnvVar.isEmpty() ) { const int pos = serverAddressEnvVar.indexOf( ':' ); const QByteArray protocol = serverAddressEnvVar.left( pos ); QMap options; foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) { const QStringList pair = entry.split( QLatin1Char('=') ); if ( pair.size() != 2 ) continue; options.insert( pair.first(), pair.last() ); } kDebug() << protocol << options; if ( protocol == "tcp" ) { serverAddress = options.value( QLatin1String( "host" ) ); port = options.value( QLatin1String( "port" ) ).toUInt(); useTcp = true; } else if ( protocol == "unix" ) { serverAddress = options.value( QLatin1String( "path" ) ); } else if ( protocol == "pipe" ) { serverAddress = options.value( QLatin1String( "name" ) ); } } // try config file next, fall back to defaults if that fails as well if ( serverAddress.isEmpty() ) { const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile(); const QFileInfo fileInfo( connectionConfigFile ); if ( !fileInfo.exists() ) { kDebug() << "Akonadi Client Session: connection config file '" "akonadi/akonadiconnectionrc' can not be found in" << XdgBaseDirs::homePath( "config" ) << "nor in any of" << XdgBaseDirs::systemPathList( "config" ); } const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat ); #ifdef Q_OS_WIN //krazy:exclude=cpp serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString(); #else const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) ); serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString(); #endif } // create sockets if not yet done, note that this does not yet allow changing socket types on the fly // but that's probably not something we need to support anyway if ( !socket ) { if ( !useTcp ) { socket = localSocket = new QLocalSocket( mParent ); mParent->connect( localSocket, SIGNAL( error( QLocalSocket::LocalSocketError ) ), SLOT( socketError( QLocalSocket::LocalSocketError ) ) ); } else { socket = tcpSocket = new QTcpSocket( mParent ); mParent->connect( tcpSocket, SIGNAL( error( QAbstractSocket::SocketError ) ), SLOT( socketError( QAbstractSocket::SocketError ) ) ); } mParent->connect( socket, SIGNAL( disconnected() ), SLOT( socketDisconnected() ) ); mParent->connect( socket, SIGNAL( readyRead() ), SLOT( dataReceived() ) ); } // actually do connect kDebug() << "connectToServer" << serverAddress; if ( !useTcp ) { localSocket->connectToServer( serverAddress ); } else { tcpSocket->connectToHost( serverAddress, port ); } } void SessionPrivate::socketError( QLocalSocket::LocalSocketError ) { Q_ASSERT( mParent->sender() == socket ); kWarning() << "Socket error occurred:" << qobject_cast( socket )->errorString(); socketDisconnected(); } void SessionPrivate::socketError( QAbstractSocket::SocketError ) { Q_ASSERT( mParent->sender() == socket ); kWarning() << "Socket error occurred:" << qobject_cast( socket )->errorString(); socketDisconnected(); } void SessionPrivate::socketDisconnected() { if ( currentJob ) currentJob->d_ptr->lostConnection(); connected = false; QTimer::singleShot( 30000, mParent, SLOT( reconnect() ) ); } void SessionPrivate::dataReceived() { while ( socket->bytesAvailable() > 0 ) { if ( parser->continuationSize() > 1 ) { const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) ); parser->parseBlock( data ); } else if ( socket->canReadLine() ) { if ( !parser->parseNextLine( socket->readLine() ) ) continue; // response not yet completed // handle login response if ( parser->tag() == QByteArray( "0" ) ) { if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings connected = true; startNext(); } else { kWarning() << "Unable to login to Akonadi server:" << parser->data(); socket->close(); QTimer::singleShot( 1000, mParent, SLOT( reconnect() ) ); } } // send login command if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) { const int pos = parser->data().indexOf( "[PROTOCOL" ); if ( pos > 0 ) { qint64 tmp = 0; ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 ); protocolVersion = tmp; Internal::setServerProtocolVersion( tmp ); } kDebug() << "Server protocol version is:" << protocolVersion; writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' ); // work for the current job } else { if ( currentJob ) currentJob->d_ptr->handleResponse( parser->tag(), parser->data() ); } // reset parser stuff parser->reset(); } else { break; // nothing we can do for now } } } bool SessionPrivate::canPipelineNext() { if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH ) return false; if ( pipeline.isEmpty() && currentJob ) return currentJob->d_ptr->mWriteFinished; if ( !pipeline.isEmpty() ) return pipeline.last()->d_ptr->mWriteFinished; return false; } void SessionPrivate::doStartNext() { if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) ) return; if ( canPipelineNext() ) { Akonadi::Job *nextJob = queue.dequeue(); pipeline.enqueue( nextJob ); startJob( nextJob ); } if ( jobRunning ) return; jobRunning = true; if ( !pipeline.isEmpty() ) { currentJob = pipeline.dequeue(); } else { currentJob = queue.dequeue(); startJob( currentJob ); } } void SessionPrivate::startJob( Job *job ) { if ( protocolVersion < minimumProtocolVersion() ) { job->setError( Job::ProtocolVersionMismatch ); job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) ); job->emitResult(); } else { job->d_ptr->startQueued(); } } void SessionPrivate::endJob( Job *job ) { job->emitResult(); } void SessionPrivate::jobDone(KJob * job) { // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below) // so don't call any methods on job itself if ( job == currentJob ) { if ( pipeline.isEmpty() ) { jobRunning = false; currentJob = 0; } else { currentJob = pipeline.dequeue(); } startNext(); } else { // non-current job finished, likely canceled while still in the queue queue.removeAll( static_cast( job ) ); // ### likely not enough to really cancel already running jobs pipeline.removeAll( static_cast( job ) ); } } void SessionPrivate::jobWriteFinished( Akonadi::Job* job ) { Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) ); Q_UNUSED( job ); startNext(); } void SessionPrivate::jobDestroyed(QObject * job) { // careful, accessing non-QObject methods of job will fail here already jobDone( static_cast( job ) ); } void SessionPrivate::addJob(Job * job) { queue.append( job ); QObject::connect( job, SIGNAL( result( KJob* ) ), mParent, SLOT( jobDone( KJob* ) ) ); QObject::connect( job, SIGNAL( writeFinished( Akonadi::Job* ) ), mParent, SLOT( jobWriteFinished( Akonadi::Job* ) ) ); QObject::connect( job, SIGNAL( destroyed( QObject* ) ), mParent, SLOT( jobDestroyed( QObject* ) ) ); startNext(); } int SessionPrivate::nextTag() { return theNextTag++; } void SessionPrivate::writeData(const QByteArray & data) { if ( socket ) socket->write( data ); else kWarning() << "Trying to write while session is disconnected!" << kBacktrace(); } void SessionPrivate::serverStateChanged( ServerManager::State state ) { if ( state == ServerManager::Running && !connected ) reconnect(); } //@endcond SessionPrivate::SessionPrivate( Session *parent ) : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 ) { } void SessionPrivate::init( const QByteArray &id ) { kDebug() << id; parser = new ImapParser(); if ( !id.isEmpty() ) { sessionId = id; } else { sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number( qrand() ); } connected = false; theNextTag = 1; jobRunning = false; if ( ServerManager::state() == ServerManager::NotRunning ) ServerManager::start(); mParent->connect( ServerManager::self(), SIGNAL( stateChanged( Akonadi::ServerManager::State ) ), SLOT( serverStateChanged( Akonadi::ServerManager::State ) ) ); reconnect(); } Session::Session(const QByteArray & sessionId, QObject * parent) : QObject( parent ), d( new SessionPrivate( this ) ) { d->init( sessionId ); } Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent) : QObject( parent ), d( dd ) { d->init( sessionId ); } Session::~Session() { clear(); delete d; } QByteArray Session::sessionId() const { return d->sessionId; } QThreadStorage instances; void SessionPrivate::createDefaultSession( const QByteArray &sessionId ) { Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!" ); Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!" ); instances.setLocalData( new Session( sessionId ) ); } Session* Session::defaultSession() { if ( !instances.hasLocalData() ) instances.setLocalData( new Session() ); return instances.localData(); } void Session::clear() { foreach ( Job* job, d->queue ) job->kill( KJob::EmitResult ); d->queue.clear(); foreach ( Job* job, d->pipeline ) job->kill( KJob::EmitResult ); d->pipeline.clear(); if ( d->currentJob ) d->currentJob->kill( KJob::EmitResult ); d->jobRunning = false; d->connected = false; + if ( d->socket ) + d->socket->disconnect( this ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage! delete d->socket; d->socket = 0; QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor } #include "session.moc"