diff --git a/src/core/session.cpp b/src/core/session.cpp index 5fe32f2e7..073671e99 100644 --- a/src/core/session.cpp +++ b/src/core/session.cpp @@ -1,432 +1,432 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "session.h" #include "session_p.h" #include "job.h" #include "job_p.h" #include "servermanager.h" #include "servermanager_p.h" #include "protocolhelper_p.h" #include "sessionthread_p.h" #include "private/standarddirs_p.h" #include "private/protocol_p.h" #include "akonadicore_debug.h" #include #include #include #include #include #include #include // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still // sends responses for the next one to the already finished one #define PIPELINE_LENGTH 0 //#define PIPELINE_LENGTH 2 using namespace Akonadi; //@cond PRIVATE void SessionPrivate::startNext() { QTimer::singleShot(0, mParent, [this]() { doStartNext(); }); } void SessionPrivate::reconnect() { if (!connection) { connection = sessionThread()->createConnection(Connection::CommandConnection, sessionId, &mCommandBuffer); mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection); mParent->connect(connection, SIGNAL(socketDisconnected()), mParent, SLOT(socketDisconnected()), Qt::QueuedConnection); mParent->connect(connection, SIGNAL(socketError(QString)), mParent, SLOT(socketError(QString)), Qt::QueuedConnection); } connection->reconnect(); } void SessionPrivate::socketError(const QString &error) { qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error; socketDisconnected(); } void SessionPrivate::socketDisconnected() { if (currentJob) { currentJob->d_ptr->lostConnection(); } connected = false; } bool SessionPrivate::handleCommands() { CommandBufferLocker lock(&mCommandBuffer); CommandBufferNotifyBlocker notify(&mCommandBuffer); while (!mCommandBuffer.isEmpty()) { const auto command = mCommandBuffer.dequeue(); lock.unlock(); const auto cmd = command.command; const auto tag = command.tag; // Handle Hello response -> send Login if (cmd->type() == Protocol::Command::Hello) { const auto &hello = Protocol::cmdCast(cmd); if (hello.isError()) { qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage(); connection->closeConnection(); QTimer::singleShot(1000, connection, &Connection::reconnect); return false; } qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion(); qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation(); qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message(); // Version mismatch is handled in SessionPrivate::startJob() so that // we can report the error out via KJob API protocolVersion = hello.protocolVersion(); Internal::setServerProtocolVersion(protocolVersion); Internal::setGeneration(hello.generation()); sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId)); } else if (cmd->type() == Protocol::Command::Login) { const auto &login = Protocol::cmdCast(cmd); if (login.isError()) { qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage(); connection->closeConnection(); QTimer::singleShot(1000, mParent, SLOT(reconnect())); return false; } connected = true; startNext(); } else if (currentJob) { currentJob->d_ptr->handleResponse(tag, cmd); } lock.relock(); } return true; } bool SessionPrivate::canPipelineNext() { if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) { return false; } if (pipeline.isEmpty() && currentJob) { return currentJob->d_ptr->mWriteFinished; } if (!pipeline.isEmpty()) { return pipeline.last()->d_ptr->mWriteFinished; } return false; } void SessionPrivate::doStartNext() { if (!connected || (queue.isEmpty() && pipeline.isEmpty())) { return; } if (canPipelineNext()) { Akonadi::Job *nextJob = queue.dequeue(); pipeline.enqueue(nextJob); startJob(nextJob); } if (jobRunning) { return; } jobRunning = true; if (!pipeline.isEmpty()) { currentJob = pipeline.dequeue(); } else { currentJob = queue.dequeue(); startJob(currentJob); } } void SessionPrivate::startJob(Job *job) { if (protocolVersion != Protocol::version()) { job->setError(Job::ProtocolVersionMismatch); if (protocolVersion < Protocol::version()) { job->setErrorText(i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). " "If you updated your system recently please restart the Akonadi server.", protocolVersion, Protocol::version())); qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version() << "). " "If you updated your system recently please restart the Akonadi server."; } else { job->setErrorText(i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). " "If you updated your system recently please restart all KDE PIM applications.", protocolVersion, Protocol::version())); qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version() << "). " "If you updated your system recently please restart all KDE PIM applications."; } job->emitResult(); } else { job->d_ptr->startQueued(); } } void SessionPrivate::endJob(Job *job) { job->emitResult(); } void SessionPrivate::jobDone(KJob *job) { // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below) // so don't call any methods on job itself if (job == currentJob) { if (pipeline.isEmpty()) { jobRunning = false; currentJob = nullptr; } else { currentJob = pipeline.dequeue(); } startNext(); } else { // non-current job finished, likely canceled while still in the queue queue.removeAll(static_cast(job)); // ### likely not enough to really cancel already running jobs pipeline.removeAll(static_cast(job)); } } void SessionPrivate::jobWriteFinished(Akonadi::Job *job) { Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last())); Q_UNUSED(job); startNext(); } void SessionPrivate::jobDestroyed(QObject *job) { // careful, accessing non-QObject methods of job will fail here already jobDone(static_cast(job)); } void SessionPrivate::addJob(Job *job) { queue.append(job); QObject::connect(job, SIGNAL(finished(KJob*)), mParent, SLOT(jobDone(KJob*))); QObject::connect(job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*))); QObject::connect(job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*))); startNext(); } qint64 SessionPrivate::nextTag() { return theNextTag++; } void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command) { connection->sendCommand(tag, command); } void SessionPrivate::serverStateChanged(ServerManager::State state) { if (state == ServerManager::Running && !connected) { reconnect(); } else if (!connected && state == ServerManager::Broken) { // If the server is broken, cancel all pending jobs, otherwise they will be // blocked forever and applications waiting for them to finish would be stuck for (Job *job : qAsConst(queue)) { job->setError(Job::ConnectionFailed); job->kill(KJob::EmitResult); } } else if (state == ServerManager::Stopping) { delete connection; connection = nullptr; } } void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision) { // only deal with the queue, for the guys in the pipeline it's too late already anyway // and they shouldn't have gotten there if they depend on a preceding job anyway. for (Job *job : qAsConst(queue)) { job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision); } } //@endcond SessionPrivate::SessionPrivate(Session *parent) : mParent(parent) , mSessionThread(new SessionThread) , connection(nullptr) , protocolVersion(0) , mCommandBuffer(parent, "handleCommands") , currentJob(nullptr) { // Shutdown the thread before QApplication event loop quits - the // thread()->wait() mechanism in Connection dtor crashes sometimes // when called from QApplication destructor connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, [this]() { delete mSessionThread; mSessionThread = nullptr; }); } SessionPrivate::~SessionPrivate() { QObject::disconnect(connThreadCleanUp); delete mSessionThread; } void SessionPrivate::init(const QByteArray &id) { qCDebug(AKONADICORE_LOG) << id; if (!id.isEmpty()) { sessionId = id; } else { sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number(qrand()); } connected = false; theNextTag = 2; jobRunning = false; if (ServerManager::state() == ServerManager::NotRunning) { ServerManager::start(); } mParent->connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)), SLOT(serverStateChanged(Akonadi::ServerManager::State))); reconnect(); } void SessionPrivate::forceReconnect() { jobRunning = false; connected = false; if (connection) { connection->forceReconnect(); } #if QT_VERSION >= QT_VERSION_CHECK(5, 10, 0) - QMetaObject::invokeMethod(mParent, &Session::reconnected, Qt::QueuedConnection); + QMetaObject::invokeMethod(mParent, [this]() { reconnect(); }, Qt::QueuedConnection); #else QMetaObject::invokeMethod(mParent, "reconnect", Qt::QueuedConnection); #endif } Session::Session(const QByteArray &sessionId, QObject *parent) : QObject(parent) , d(new SessionPrivate(this)) { d->init(sessionId); } Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent) : QObject(parent) , d(dd) { d->mParent = this; d->init(sessionId); } Session::~Session() { d->clear(false); delete d; } QByteArray Session::sessionId() const { return d->sessionId; } Q_GLOBAL_STATIC(QThreadStorage, instances) static void cleanupDefaultSession() { instances()->setLocalData(nullptr); } void SessionPrivate::createDefaultSession(const QByteArray &sessionId) { Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!"); Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!"); Session *session = new Session(sessionId); instances()->setLocalData(session); qAddPostRoutine(cleanupDefaultSession); } void SessionPrivate::setDefaultSession(Session *session) { instances()->setLocalData(session); } Session *Session::defaultSession() { if (!instances()->hasLocalData()) { Session *session = new Session(); instances()->setLocalData(session); qAddPostRoutine(cleanupDefaultSession); } return instances()->localData(); } void Session::clear() { d->clear(true); } void SessionPrivate::clear(bool forceReconnect) { for (Job *job : qAsConst(queue)) { job->kill(KJob::EmitResult); // safe, not started yet } queue.clear(); for (Job *job : qAsConst(pipeline)) { job->d_ptr->mStarted = false; // avoid killing/reconnect loops job->kill(KJob::EmitResult); } pipeline.clear(); if (currentJob) { currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops currentJob->kill(KJob::EmitResult); } if (forceReconnect) { this->forceReconnect(); } } #include "moc_session.cpp"