diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index d66e0f82..0fccc492 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -1,380 +1,388 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "commandprocessor.h" #include #include "commands.h" #include "messagequeue.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" #include "definitions.h" #include "storage.h" #include "queuedcommand_generated.h" #include "revisionreplayed_generated.h" #include "synchronize_generated.h" static int sBatchSize = 100; // This interval directly affects the roundtrip time of single commands static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) : QObject(), mLogCtx(ctx.subContext("commandprocessor")), mPipeline(pipeline), mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), mCommandQueues({&mUserQueue, &mSynchronizerQueue}), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { /* * This is a queued connection because otherwise we would execute CommandProcessor::process in the middle of * Synchronizer::commit, which is not what we want. */ const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process, Qt::QueuedConnection); Q_UNUSED(ret); } mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) { + // qWarning() << "Enquing command " << mq.name(); flatbuffers::FlatBufferBuilder fbb; auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); Sink::FinishQueuedCommandBuffer(fbb, buffer); mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); } void CommandProcessor::processCommand(int commandId, const QByteArray &data) { switch (commandId) { case Commands::FlushCommand: processFlushCommand(data); break; case Commands::SynchronizeCommand: processSynchronizeCommand(data); break; case Commands::AbortSynchronizationCommand: mSynchronizer->abort(); break; // case Commands::RevisionReplayedCommand: // processRevisionReplayedCommand(data); // break; default: { static int modifications = 0; mUserQueue.startTransaction(); - SinkTraceCtx(mLogCtx) << "Received a command" << commandId; + SinkWarningCtx(mLogCtx) << "Received a command" << commandId; enqueueCommand(mUserQueue, commandId, data); modifications++; if (modifications >= sBatchSize) { mUserQueue.commit(); modifications = 0; mCommitQueueTimer.stop(); } else { mCommitQueueTimer.start(); } } }; } void CommandProcessor::processFlushCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(data.constData()); const auto flushType = buffer->type(); const auto flushId = BufferUtils::extractBufferCopy(buffer->id()); SinkTraceCtx(mLogCtx) << "Received flush command " << flushId; if (flushType == Sink::Flush::FlushSynchronization) { mSynchronizer->flush(flushType, flushId); } else { mUserQueue.startTransaction(); enqueueCommand(mUserQueue, Commands::FlushCommand, data); mUserQueue.commit(); } } } void CommandProcessor::processSynchronizeCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { auto buffer = Sink::Commands::GetSynchronize(data.constData()); Sink::QueryBase query; if (buffer->query()) { auto data = QByteArray::fromStdString(buffer->query()->str()); QDataStream stream(&data, QIODevice::ReadOnly); stream >> query; } - mSynchronizer->synchronize(query); + //Avoid blocking this call + QMetaObject::invokeMethod(mSynchronizer.data(), [=] { + mSynchronizer->synchronize(query); + }, Qt::QueuedConnection); } else { SinkWarningCtx(mLogCtx) << "received invalid command"; } } // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) // { // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); // client.currentRevision = buffer->revision(); // } else { // SinkWarningCtx(mLogCtx) << "received invalid command"; // } // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } void CommandProcessor::setOldestUsedRevision(qint64 revision) { mLowerBoundRevision = revision; } bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { return true; } } return false; } void CommandProcessor::process() { if (mProcessingLock) { return; } mProcessingLock = true; - auto job = processPipeline() - .then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }) - .exec(); + processPipeline() + .then([this]() { + mProcessingLock = false; + //TODO process some events? + if (messagesToProcessAvailable()) { + process(); + } + }) + .exec(); } KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand &queuedCommand) { SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand.commandId()); const auto data = queuedCommand.command()->Data(); const auto size = queuedCommand.command()->size(); switch (queuedCommand.commandId()) { case Sink::Commands::DeleteEntityCommand: return mPipeline->deletedEntity(data, size); case Sink::Commands::ModifyEntityCommand: return mPipeline->modifiedEntity(data, size); case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: Q_ASSERT(mInspector); return mInspector->processCommand(data, size) .then(KAsync::value(-1)); case Sink::Commands::FlushCommand: return flush(data, size) .then(KAsync::value(-1)); default: return KAsync::error(-1, "Unhandled command"); } } KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { SinkWarningCtx(mLogCtx) << "invalid buffer"; - // return KAsync::error(1, "Invalid Buffer"); + return KAsync::error(-1, "Invalid Buffer"); } - auto queuedCommand = Sink::GetQueuedCommand(data.constData()); + const auto queuedCommand = Sink::GetQueuedCommand(data.constData()); const auto commandId = queuedCommand->commandId(); return processQueuedCommand(*queuedCommand) .then( [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { if (error) { SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; return KAsync::error(error); } SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); return KAsync::value(createdRevision); }); } // Process one batch of messages from this queue KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); return KAsync::start([=] { mPipeline->startTransaction(); }) .then([=] { return queue->dequeueBatch(sBatchSize, [=](const QByteArray &data) { time->start(); return processQueuedCommand(data) .then([=](qint64 createdRevision) { SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); }); }) .then([=](const KAsync::Error &error) { if (error) { if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; } } }); }) .then([=](const KAsync::Error &) { mPipeline->commit(); //The flushed content has been persistet, we can notify the world for (const auto &flushId : mCompleteFlushes) { SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; mSynchronizer->flushComplete(flushId); Sink::Notification n; n.type = Sink::Notification::FlushCompletion; n.id = flushId; emit notify(n); } mCompleteFlushes.clear(); }); } KAsync::Job CommandProcessor::processPipeline() { auto time = QSharedPointer::create(); time->start(); mPipeline->cleanupRevisions(mLowerBoundRevision); SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); // Go through all message queues if (mCommandQueues.isEmpty()) { return KAsync::null(); } return KAsync::doWhile([this]() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { + // SinkLogCtx(mLogCtx) << "Processing queue" << queue->name(); auto time = QSharedPointer::create(); time->start(); return processQueue(queue) .then([=] { - SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); + SinkLogCtx(mLogCtx) << "Queue processed." << queue->name() << Log::TraceTime(time->elapsed()); return KAsync::Continue; }); + } else { + // SinkLogCtx(mLogCtx) << "Nothing in queue" << queue->name(); } } return KAsync::value(KAsync::Break); }); } void CommandProcessor::setInspector(const QSharedPointer &inspector) { mInspector = inspector; QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); } void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } KAsync::Job CommandProcessor::flush(void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(command); const auto flushType = buffer->type(); const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); Q_ASSERT(!flushId.isEmpty()); if (flushType == Sink::Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Flushing synchronizer " << flushId; Q_ASSERT(mSynchronizer); mSynchronizer->flush(flushType, flushId); } else { //Defer notification until the results have been comitted mCompleteFlushes << flushId; } return KAsync::null(); } return KAsync::error(-1, "Invalid flush command."); } static void waitForDrained(KAsync::Future &f, MessageQueue &queue) { if (queue.isEmpty()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(&queue, &MessageQueue::drained, context, [&f, context]() { delete context; f.setFinished(); }); } }; KAsync::Job CommandProcessor::processAllMessages() { // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. // TODO: report errors while processing sync? // TODO JOBAPI: A helper that waits for n events and then continues? return KAsync::start([this](KAsync::Future &f) { if (mCommitQueueTimer.isActive()) { auto context = new QObject; QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { delete context; f.setFinished(); }); } else { f.setFinished(); } }) .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { if (mSynchronizer->ChangeReplay::allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); mSynchronizer->replayNextRevision().exec(); } }); } diff --git a/common/listener.cpp b/common/listener.cpp index 7fbfca9e..690900ea 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -1,481 +1,484 @@ /* * Copyright (C) 2014 Aaron Seigo * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "listener.h" #include "common/commands.h" #include "common/resource.h" #include "common/log.h" #include "common/definitions.h" #include "common/resourcecontext.h" #include "common/adaptorfactoryregistry.h" #include "common/bufferutils.h" // commands #include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" #include "common/notification_generated.h" #include "common/revisionreplayed_generated.h" #include "common/secret_generated.h" #include #include #include #include Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), m_resourceName(resourceType), m_resourceInstanceIdentifier(resourceInstanceIdentifier), m_clientBufferProcessesTimer(new QTimer), m_checkConnectionsTimer(new QTimer), m_messageId(0), m_exiting(false) { connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier; m_checkConnectionsTimer->setSingleShot(true); connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { if (m_connections.isEmpty()) { SinkTrace() << QString("No connections, shutting down."); quit(); } }); //Give plenty of time during the first start. m_checkConnectionsTimer->start(std::chrono::milliseconds{60000}); // TODO: experiment with different timeouts // or even just drop down to invoking the method queued? => invoke queued unless we need throttling m_clientBufferProcessesTimer->setInterval(0); m_clientBufferProcessesTimer->setSingleShot(true); connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers); if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { m_server->removeServer(m_resourceInstanceIdentifier); if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { SinkWarning() << "Utter failure to start server"; exit(-1); } } if (m_server->isListening()) { SinkTrace() << QString("Listening on %1").arg(m_server->serverName()); } } Listener::~Listener() { SinkTrace() << "Shutting down " << m_resourceInstanceIdentifier; closeAllConnections(); } void Listener::checkForUpgrade() { if (loadResource().checkForUpgrade()) { //Close the resource to ensure no transactions are open m_resource.reset(nullptr); } } void Listener::emergencyAbortAllConnections() { for (Client &client : m_connections) { if (client.socket) { SinkWarning() << "Sending panic"; client.socket->write("PANIC"); client.socket->waitForBytesWritten(); disconnect(client.socket, nullptr, this, nullptr); client.socket->abort(); delete client.socket; client.socket = nullptr; } } m_connections.clear(); } void Listener::closeAllConnections() { for (Client &client : m_connections) { if (client.socket) { disconnect(client.socket, nullptr, this, nullptr); client.socket->flush(); client.socket->close(); delete client.socket; client.socket = nullptr; } } m_connections.clear(); } void Listener::acceptConnection() { SinkTrace() << "Accepting connection"; QLocalSocket *socket = m_server->nextPendingConnection(); if (!socket) { SinkWarning() << "Accepted connection but didn't get a socket for it"; return; } m_connections << Client("Unknown Client", socket); connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable); connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped); m_checkConnectionsTimer->stop(); // If this is the first client, set the lower limit for revision cleanup if (m_connections.size() == 1) { loadResource().setLowerBoundRevision(0); } if (socket->bytesAvailable()) { readFromSocket(socket); } } void Listener::clientDropped() { QLocalSocket *socket = qobject_cast(sender()); if (!socket) { return; } bool dropped = false; QMutableVectorIterator it(m_connections); while (it.hasNext()) { const Client &client = it.next(); if (client.socket == socket) { dropped = true; SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket; it.remove(); break; } } if (!dropped) { SinkWarning() << "Failed to find connection for disconnected socket: " << socket; } checkConnections(); } void Listener::checkConnections() { // If this was the last client, disengage the lower limit for revision cleanup if (m_connections.isEmpty()) { loadResource().setLowerBoundRevision(std::numeric_limits::max()); } m_checkConnectionsTimer->start(std::chrono::milliseconds{1000}); } +//FIXME looks like this does not even get invoked under heavy load, because readFromSocket is never called void Listener::onDataAvailable() { QLocalSocket *socket = qobject_cast(sender()); if (!socket || m_exiting) { return; } readFromSocket(socket); } void Listener::readFromSocket(QLocalSocket *socket) { - SinkTrace() << "Reading from socket..."; + SinkLog() << "Reading from socket..."; for (Client &client : m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); if (!m_clientBufferProcessesTimer->isActive()) { m_clientBufferProcessesTimer->start(); } break; } } + SinkLog() << "Finished reading from socket..."; } void Listener::processClientBuffers() { // TODO: we should not process all clients, but iterate async over them and process // one command from each in turn to ensure all clients get fair handling of // commands? bool again = false; for (Client &client : m_connections) { if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { continue; } if (processClientBuffer(client)) { again = true; } } if (again) { m_clientBufferProcessesTimer->start(); } } void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function &callback) { bool success = true; switch (commandId) { case Sink::Commands::HandshakeCommand: { flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifyHandshakeBuffer(verifier)) { auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); client.name = buffer->name()->c_str(); } else { SinkWarning() << "received invalid command"; } break; } case Sink::Commands::SecretCommand: { flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifySecretBuffer(verifier)) { auto buffer = Sink::Commands::GetSecret(commandBuffer.constData()); loadResource().setSecret(QString{buffer->secret()->c_str()}); } else { SinkWarning() << "received invalid command"; } break; } case Sink::Commands::SynchronizeCommand: case Sink::Commands::InspectionCommand: case Sink::Commands::DeleteEntityCommand: case Sink::Commands::ModifyEntityCommand: case Sink::Commands::CreateEntityCommand: case Sink::Commands::FlushCommand: case Sink::Commands::AbortSynchronizationCommand: - SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; + SinkLog() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; loadResource().processCommand(commandId, commandBuffer); + SinkLog() << "Command processed " << messageId; break; case Sink::Commands::ShutdownCommand: SinkLog() << QString("Received shutdown command from %1").arg(client.name); m_exiting = true; break; case Sink::Commands::PingCommand: SinkTrace() << QString("Received ping command from %1").arg(client.name); break; case Sink::Commands::RevisionReplayedCommand: { SinkTrace() << QString("Received revision replayed command from %1").arg(client.name); flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); client.currentRevision = buffer->revision(); } else { SinkWarning() << "received invalid command"; } loadResource().setLowerBoundRevision(lowerBoundRevision()); } break; case Sink::Commands::RemoveFromDiskCommand: { SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); //Close the resource to ensure no transactions are open m_resource.reset(nullptr); if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { resourceFactory->removeDataFromDisk(m_resourceInstanceIdentifier); } m_exiting = true; } break; case Sink::Commands::UpgradeCommand: //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes. break; default: if (commandId > Sink::Commands::CustomCommand) { SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; loadResource().processCommand(commandId, commandBuffer); } else { success = false; SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; } break; } callback(success); } qint64 Listener::lowerBoundRevision() { qint64 lowerBound = 0; for (const Client &c : m_connections) { if (c.currentRevision > 0) { if (lowerBound == 0) { lowerBound = c.currentRevision; } else { lowerBound = qMin(c.currentRevision, lowerBound); } } } return lowerBound; } void Listener::sendShutdownNotification() { // Broadcast shutdown notifications to open clients, so they don't try to restart the resource auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { if (client.socket && client.socket->isOpen()) { Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); } } } void Listener::quit() { SinkTrace() << "Quitting " << m_resourceInstanceIdentifier; m_clientBufferProcessesTimer->stop(); m_server->close(); sendShutdownNotification(); closeAllConnections(); m_fbb.Clear(); QTimer::singleShot(0, this, [this]() { // This will destroy this object emit noClients(); }); } bool Listener::processClientBuffer(Client &client) { static const int headerSize = Sink::Commands::headerSize(); if (client.commandBuffer.size() < headerSize) { return false; } const uint messageId = *(const uint *)client.commandBuffer.constData(); const int commandId = *(const int *)(client.commandBuffer.constData() + sizeof(uint)); const uint size = *(const uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); - SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; + SinkLog() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; // TODO: reject messages above a certain size? const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); if (commandComplete) { client.commandBuffer.remove(0, headerSize); auto socket = QPointer(client.socket); auto clientName = client.name; const QByteArray commandBuffer = client.commandBuffer.left(size); client.commandBuffer.remove(0, size); processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { - SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); + SinkLog() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); if (socket) { sendCommandCompleted(socket.data(), messageId, success); } else { SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); } }); if (m_exiting) { quit(); return false; } return client.commandBuffer.size() >= headerSize; } return false; } void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success) { if (!socket || !socket->isValid()) { return; } auto command = Sink::Commands::CreateCommandCompletion(m_fbb, messageId, success); Sink::Commands::FinishCommandCompletionBuffer(m_fbb, command); Sink::Commands::write(socket, ++m_messageId, Sink::Commands::CommandCompletionCommand, m_fbb); if (m_exiting) { socket->waitForBytesWritten(); } m_fbb.Clear(); } void Listener::refreshRevision(qint64 revision) { updateClientsWithRevision(revision); } void Listener::updateClientsWithRevision(qint64 revision) { auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); for (const Client &client : m_connections) { if (!client.socket || !client.socket->isValid()) { continue; } SinkTrace() << "Sending revision update for " << client.name << revision; Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); client.socket->flush(); } m_fbb.Clear(); } void Listener::notify(const Sink::Notification ¬ification) { auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); auto entities = Sink::BufferUtils::toVector(m_fbb, notification.entities); Sink::Commands::NotificationBuilder builder(m_fbb); builder.add_type(notification.type); builder.add_code(notification.code); builder.add_identifier(idString); builder.add_message(messageString); builder.add_entities(entities); builder.add_progress(notification.progress); builder.add_total(notification.total); auto command = builder.Finish(); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { if (client.socket && client.socket->isOpen()) { Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); } } m_fbb.Clear(); } Sink::Resource &Listener::loadResource() { if (!m_resource) { if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { m_resource = std::unique_ptr(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)})); if (!m_resource) { SinkError() << "Failed to instantiate the resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); } else { SinkError() << "Failed to load resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } } Q_ASSERT(m_resource); return *m_resource; } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_listener.cpp" #pragma clang diagnostic pop diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index 2bd5d746..6857a362 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp @@ -1,172 +1,172 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "mailpreprocessor.h" #include #include #include #include #include #include #include "pipeline.h" #include "fulltextindex.h" #include "definitions.h" #include "applicationdomaintype.h" using namespace Sink; static Sink::ApplicationDomain::Mail::Contact getContact(const KMime::Headers::Generics::MailboxList *header) { const auto name = header->displayNames().isEmpty() ? QString() : header->displayNames().first(); const auto address = header->addresses().isEmpty() ? QString() : header->addresses().first(); return Sink::ApplicationDomain::Mail::Contact{name, address}; } static QList getContactList(const KMime::Headers::Generics::AddressList *header) { QList list; for (const auto &mb : header->mailboxes()) { list << Sink::ApplicationDomain::Mail::Contact{mb.name(), mb.address()}; } return list; } static QList> processPart(KMime::Content* content) { if (KMime::Headers::ContentType* type = content->contentType(false)) { if (type->isMultipart() && !type->isSubtype("encrypted")) { QList> list; for (const auto c : content->contents()) { list << processPart(c); } return list; } else if (type->isHTMLText()) { //QTextDocument has an implicit runtime dependency on QGuiApplication via the color palette. //If the QGuiApplication is not available we will crash (if the html contains colors). Q_ASSERT(QGuiApplication::instance()); // Only get HTML content, if no plain text content QTextDocument doc; doc.setHtml(content->decodedText()); return {{{}, {doc.toPlainText()}}}; } else if (type->isEmpty()) { return {{{}, {content->decodedText()}}}; } } return {}; } QByteArray normalizeMessageId(const QByteArray &id) { return id; } void MailPropertyExtractor::updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, const QByteArray &data) { if (data.isEmpty()) { return; } auto msg = KMime::Message::Ptr(new KMime::Message); msg->setContent(KMime::CRLFtoLF(data)); msg->parse(); if (!msg) { return; } mail.setExtractedSubject(msg->subject(true)->asUnicodeString()); mail.setExtractedSender(getContact(msg->from(true))); mail.setExtractedTo(getContactList(msg->to(true))); mail.setExtractedCc(getContactList(msg->cc(true))); mail.setExtractedBcc(getContactList(msg->bcc(true))); mail.setExtractedDate(msg->date(true)->dateTime()); const auto parentMessageIds = [&] { //The last is the parent auto references = msg->references(true)->identifiers(); if (!references.isEmpty()) { QByteArrayList list; std::transform(references.constBegin(), references.constEnd(), std::back_inserter(list), [] (const QByteArray &id) { return normalizeMessageId(id); }); return list; } else { auto inReplyTo = msg->inReplyTo(true)->identifiers(); if (!inReplyTo.isEmpty()) { //According to RFC5256 we should ignore all but the first return QByteArrayList{normalizeMessageId(inReplyTo.first())}; } } return QByteArrayList{}; }(); //The rest should never change, unless we didn't have the headers available initially. auto messageId = normalizeMessageId(msg->messageID(true)->identifier()); if (messageId.isEmpty()) { //reuse an existing messageid (on modification) const auto existing = mail.getMessageId(); if (existing.isEmpty()) { auto tmp = KMime::Message::Ptr::create(); //Genereate a globally unique messageid that doesn't leak the local hostname messageId = QString{"<" + QUuid::createUuid().toString().mid(1, 36).remove('-') + "@sink>"}.toLatin1(); tmp->messageID(true)->fromUnicodeString(messageId, "utf-8"); - SinkWarning() << "Message id is empty, generating one: " << messageId; + // SinkWarning() << "Message id is empty, generating one: " << messageId; } else { messageId = existing; } } mail.setExtractedMessageId(messageId); if (!parentMessageIds.isEmpty()) { mail.setExtractedParentMessageIds(parentMessageIds); } QList> contentToIndex; contentToIndex.append({{}, msg->subject()->asUnicodeString()}); if (KMime::Content* mainBody = msg->mainBodyPart("text/plain")) { contentToIndex.append({{}, mainBody->decodedText()}); } else { contentToIndex << processPart(msg.data()); } const auto sender = mail.getSender(); contentToIndex.append({{}, sender.name}); contentToIndex.append({{}, sender.emailAddress}); for (const auto &c : mail.getTo()) { contentToIndex.append({{}, c.name}); contentToIndex.append({{}, c.emailAddress}); } for (const auto &c : mail.getCc()) { contentToIndex.append({{}, c.name}); contentToIndex.append({{}, c.emailAddress}); } for (const auto &c : mail.getBcc()) { contentToIndex.append({{}, c.name}); contentToIndex.append({{}, c.emailAddress}); } //Prepare content for indexing; mail.setProperty("index", QVariant::fromValue(contentToIndex)); } void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail) { updatedIndexedProperties(mail, mail.getMimeMessage()); } void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) { updatedIndexedProperties(newMail, newMail.getMimeMessage()); } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 469ae6c3..1460672e 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -1,506 +1,506 @@ /* * Copyright (C) 2014 Aaron Seigo * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "pipeline.h" #include #include #include #include #include "entity_generated.h" #include "metadata_generated.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "entitybuffer.h" #include "log.h" #include "domain/applicationdomaintype.h" #include "adaptorfactoryregistry.h" #include "definitions.h" #include "bufferutils.h" #include "storage/entitystore.h" #include "store.h" using namespace Sink; using namespace Sink::Storage; class Pipeline::Private { public: Private(const ResourceContext &context, const Sink::Log::Context &ctx) : logCtx{ctx.subContext("pipeline")}, resourceContext(context), entityStore(context, ctx), revisionChanged(false) { } Sink::Log::Context logCtx; ResourceContext resourceContext; Storage::EntityStore entityStore; QHash>> processors; bool revisionChanged; QTime transactionTime; int transactionItemCount; }; Pipeline::Pipeline(const ResourceContext &context, const Sink::Log::Context &ctx) : QObject(nullptr), d(new Private(context, ctx)) { //Create main store immediately on first start d->entityStore.initialize(); } Pipeline::~Pipeline() { } void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) { auto &list = d->processors[entityType]; list.clear(); for (auto p : processors) { p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this, &d->entityStore); list.append(QSharedPointer(p)); } } void Pipeline::startTransaction() { // TODO call for all types // But avoid doing it during cleanup // for (auto processor : d->processors[bufferType]) { // processor->startBatch(); // } SinkTraceCtx(d->logCtx) << "Starting transaction."; d->transactionTime.start(); d->transactionItemCount = 0; d->entityStore.startTransaction(DataStore::ReadWrite); } void Pipeline::commit() { // TODO call for all types // But avoid doing it during cleanup // for (auto processor : d->processors[bufferType]) { // processor->finalize(); // } if (!d->revisionChanged) { d->entityStore.abortTransaction(); return; } const auto revision = d->entityStore.maxRevision(); const auto elapsed = d->transactionTime.elapsed(); - SinkTraceCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " + SinkLogCtx(d->logCtx) << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; d->entityStore.commitTransaction(); if (d->revisionChanged) { d->revisionChanged = false; emit revisionUpdated(revision); } } KAsync::Job Pipeline::newEntity(void const *command, size_t size) { d->transactionItemCount++; { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Commands::VerifyCreateEntityBuffer(verifyer)) { SinkWarningCtx(d->logCtx) << "invalid buffer, not a create entity buffer"; return KAsync::error(); } } auto createEntity = Commands::GetCreateEntity(command); const bool replayToSource = createEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); QByteArray key; if (createEntity->entityId()) { key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); if (!key.isEmpty() && d->entityStore.contains(bufferType, key)) { SinkErrorCtx(d->logCtx) << "An entity with this id already exists: " << key; return KAsync::error(); } } if (key.isEmpty()) { key = DataStore::generateUid(); } SinkTraceCtx(d->logCtx) << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; Q_ASSERT(!key.isEmpty()); { flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!VerifyEntityBuffer(verifyer)) { SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; return KAsync::error(); } } auto entity = GetEntity(createEntity->delta()->Data()); if (!entity->resource()->size() && !entity->local()->size()) { SinkWarningCtx(d->logCtx) << "No local and no resource buffer while trying to create entity."; return KAsync::error(); } auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType << d->resourceContext.resourceType; return KAsync::error(); } auto adaptor = adaptorFactory->createAdaptor(*entity); auto memoryAdaptor = QSharedPointer::create(); Sink::ApplicationDomain::copyBuffer(*adaptor, *memoryAdaptor); d->revisionChanged = true; auto revision = d->entityStore.maxRevision(); auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; o.setChangedProperties(o.availableProperties().toSet()); auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(o, o.availableProperties()); newEntity.setChangedProperties(newEntity.availableProperties().toSet()); foreach (const auto &processor, d->processors[bufferType]) { processor->newEntity(newEntity); } if (!d->entityStore.add(bufferType, newEntity, replayToSource)) { return KAsync::error(); } return KAsync::value(d->entityStore.maxRevision()); } template struct CreateHelper { KAsync::Job operator()(const ApplicationDomain::ApplicationDomainType &arg) const { return Sink::Store::create(T{arg}); } }; static KAsync::Job create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) { return TypeHelper{type}.operator(), const ApplicationDomain::ApplicationDomainType&>(newEntity); } KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) { d->transactionItemCount++; { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Commands::VerifyModifyEntityBuffer(verifyer)) { SinkWarningCtx(d->logCtx) << "invalid buffer, not a modify entity buffer"; return KAsync::error(); } } auto modifyEntity = Commands::GetModifyEntity(command); Q_ASSERT(modifyEntity); QList changeset; if (modifyEntity->modifiedProperties()) { changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); } else { SinkWarningCtx(d->logCtx) << "No changeset available"; } const qint64 baseRevision = modifyEntity->revision(); const bool replayToSource = modifyEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); SinkTraceCtx(d->logCtx) << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; if (bufferType.isEmpty() || key.isEmpty()) { SinkWarningCtx(d->logCtx) << "entity type or key " << bufferType << key; return KAsync::error(); } { flatbuffers::Verifier verifyer(reinterpret_cast(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); if (!VerifyEntityBuffer(verifyer)) { SinkWarningCtx(d->logCtx) << "invalid buffer, not an entity buffer"; return KAsync::error(); } } auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarningCtx(d->logCtx) << "no adaptor factory for type " << bufferType; return KAsync::error(); } auto diffEntity = GetEntity(modifyEntity->delta()->Data()); Q_ASSERT(diffEntity); Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)}; diff.setChangedProperties(changeset.toSet()); QByteArrayList deletions; if (modifyEntity->deletions()) { deletions = BufferUtils::fromVector(*modifyEntity->deletions()); } Sink::ApplicationDomain::ApplicationDomainType current; bool alreadyRemoved = false; d->entityStore.readLatest(bufferType, diff.identifier(), [&](const QByteArray &uid, const EntityBuffer &buffer) { if (buffer.operation() == Sink::Operation_Removal) { alreadyRemoved = true; } else { auto entity = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(buffer.entity())}; current = *Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); } }); if (alreadyRemoved) { SinkWarningCtx(d->logCtx) << "Tried to modify a removed entity: " << diff.identifier(); return KAsync::error(); } if (current.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); return KAsync::error(); } //We avoid overwriting local changes that haven't been played back yet with remote modifications QSet excludeProperties; if (!replayToSource) { //We assume this means the change is coming from the source already d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) { if (entity.metadataBuffer()) { if (auto metadata = GetMetadata(entity.metadataBuffer())) { if (metadata->operation() == Operation_Modification && metadata->modifiedProperties()) { excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet(); } } } }); } auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties); bool isMove = false; if (modifyEntity->targetResource()) { isMove = modifyEntity->removeEntity(); newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); } foreach (const auto &processor, d->processors[bufferType]) { bool exitLoop = false; const auto result = processor->process(Preprocessor::Modification, current, newEntity); switch (result.action) { case Preprocessor::MoveToResource: isMove = true; exitLoop = true; break; case Preprocessor::CopyToResource: isMove = true; exitLoop = true; break; case Preprocessor::DropModification: SinkTraceCtx(d->logCtx) << "Dropping modification"; return KAsync::error(); case Preprocessor::NoAction: case Preprocessor::DeleteEntity: default: break; } if (exitLoop) { break; } } //The entity is either being copied or moved if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { auto copy = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy(newEntity, newEntity.availableProperties()); copy.setResource(newEntity.resourceInstanceIdentifier()); copy.setChangedProperties(copy.availableProperties().toSet()); SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << copy.identifier() << copy.resourceInstanceIdentifier(); return create(bufferType, copy) .then([=](const KAsync::Error &error) { if (!error) { SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; if (isMove) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(current.identifier().toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); const auto data = BufferUtils::extractBuffer(fbb); deletedEntity(data, data.size()).exec(); } } else { SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); } }) .then([this] { return d->entityStore.maxRevision(); }); } d->revisionChanged = true; if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { return KAsync::error(); } return KAsync::value(d->entityStore.maxRevision()); } KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) { d->transactionItemCount++; { flatbuffers::Verifier verifyer(reinterpret_cast(command), size); if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { SinkWarningCtx(d->logCtx) << "invalid buffer, not a delete entity buffer"; return KAsync::error(); } } auto deleteEntity = Commands::GetDeleteEntity(command); const bool replayToSource = deleteEntity->replayToSource(); const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; const auto current = d->entityStore.readLatest(bufferType, key); foreach (const auto &processor, d->processors[bufferType]) { processor->deletedEntity(current); } d->revisionChanged = true; if (!d->entityStore.remove(bufferType, current, replayToSource)) { return KAsync::error(); } return KAsync::value(d->entityStore.maxRevision()); } void Pipeline::cleanupRevisions(qint64 revision) { //We have to set revisionChanged, otherwise a call to commit might abort //the transaction when not using the implicit internal transaction d->revisionChanged = d->entityStore.cleanupRevisions(revision); } class Preprocessor::Private { public: QByteArray resourceType; QByteArray resourceInstanceIdentifier; Pipeline *pipeline; Storage::EntityStore *entityStore; }; Preprocessor::Preprocessor() : d(new Preprocessor::Private) { } Preprocessor::~Preprocessor() { } void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline, Storage::EntityStore *entityStore) { d->resourceType = resourceType; d->resourceInstanceIdentifier = resourceInstanceIdentifier; d->pipeline = pipeline; d->entityStore = entityStore; } void Preprocessor::startBatch() { } void Preprocessor::finalizeBatch() { } void Preprocessor::newEntity(ApplicationDomain::ApplicationDomainType &newEntity) { } void Preprocessor::modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { } void Preprocessor::deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) { } Preprocessor::Result Preprocessor::process(Type type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType &diff) { switch(type) { case Creation: newEntity(diff); break; case Modification: modifiedEntity(current, diff); break; case Deletion: deletedEntity(current); break; default: break; } return {NoAction}; } QByteArray Preprocessor::resourceInstanceIdentifier() const { return d->resourceInstanceIdentifier; } Storage::EntityStore &Preprocessor::entityStore() const { return *d->entityStore; } void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource) { flatbuffers::FlatBufferBuilder entityFbb; auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, typeName); adaptorFactory->createBuffer(entity, entityFbb); const auto entityBuffer = BufferUtils::extractBuffer(entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(entity.identifier().toStdString()); auto type = fbb.CreateString(typeName.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); const auto data = BufferUtils::extractBuffer(fbb); d->pipeline->newEntity(data, data.size()).exec(); } void Preprocessor::deleteEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &typeName, bool replayToSource) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(entity.identifier().toStdString()); auto type = fbb.CreateString(typeName.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, entity.revision(), entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); const auto data = BufferUtils::extractBuffer(fbb); d->pipeline->deletedEntity(data, data.size()).exec(); } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_pipeline.cpp" #pragma clang diagnostic pop diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index e8cf24ba..739a0168 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -1,763 +1,763 @@ /* * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "resourceaccess.h" #include "common/commands.h" #include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" #include "common/synchronize_generated.h" #include "common/notification_generated.h" #include "common/createentity_generated.h" #include "common/modifyentity_generated.h" #include "common/deleteentity_generated.h" #include "common/revisionreplayed_generated.h" #include "common/inspection_generated.h" #include "common/flush_generated.h" #include "common/secret_generated.h" #include "common/entitybuffer.h" #include "common/bufferutils.h" #include "common/test.h" #include "common/secretstore.h" #include "log.h" #include #include #include #include #include #include #include #include static void queuedInvoke(const std::function &f, QObject *context = nullptr) { auto timer = QSharedPointer::create(); timer->setSingleShot(true); QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { f(); }); timer->start(0); } namespace Sink { struct QueuedCommand { public: QueuedCommand(int commandId, const std::function &callback) : commandId(commandId), callback(callback) { } QueuedCommand(int commandId, const QByteArray &b, const std::function &callback) : commandId(commandId), buffer(b), callback(callback) { } private: QueuedCommand(const QueuedCommand &other); QueuedCommand &operator=(const QueuedCommand &rhs); public: const int commandId; QByteArray buffer; std::function callback; }; class ResourceAccess::Private { public: Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *ra); ~Private(); KAsync::Job tryToConnect(); KAsync::Job initializeSocket(); void abortPendingOperations(); void callCallbacks(); QByteArray resourceName; QByteArray resourceInstanceIdentifier; QSharedPointer socket; QByteArray partialMessageBuffer; QVector> commandQueue; QMap> pendingCommands; QMultiMap> resultHandler; QHash completeCommands; uint messageId; bool openingSocket; SINK_DEBUG_COMPONENT(resourceInstanceIdentifier) }; ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) { } ResourceAccess::Private::~Private() { } void ResourceAccess::Private::abortPendingOperations() { callCallbacks(); if (!resultHandler.isEmpty()) { SinkWarning() << "Aborting pending operations " << resultHandler.keys(); } auto handlers = resultHandler.values(); resultHandler.clear(); for (auto handler : handlers) { handler(1, "The resource closed unexpectedly"); } for (auto queuedCommand : commandQueue) { queuedCommand->callback(1, "The resource closed unexpectedly"); } commandQueue.clear(); } void ResourceAccess::Private::callCallbacks() { const auto commandIds = completeCommands.keys(); for (auto id : commandIds) { const bool success = completeCommands.take(id); // We remove the callbacks first because the handler can kill resourceaccess directly const auto callbacks = resultHandler.values(id); resultHandler.remove(id); for (auto handler : callbacks) { if (success) { handler(0, QString()); } else { handler(1, "Command failed."); } } } } // Connects to server and returns connected socket on success KAsync::Job> ResourceAccess::connectToServer(const QByteArray &identifier) { auto s = QSharedPointer{new QLocalSocket, &QObject::deleteLater}; return KAsync::start>([identifier, s](KAsync::Future> &future) { SinkTrace() << "Connecting to server " << identifier; auto context = new QObject; QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context, identifier]() { SinkTrace() << "Connected to server " << identifier; Q_ASSERT(s); delete context; future.setValue(s); future.setFinished(); }); QObject::connect(s.data(), static_cast(&QLocalSocket::error), context, [&future, &s, context, identifier](QLocalSocket::LocalSocketError localSocketError) { SinkTrace() << "Failed to connect to server " << localSocketError << identifier; const auto errorString = s->errorString(); const auto name = s->fullServerName(); delete context; //We don't set the localSocketError as errorcode, because ConnectionRefused is 0 (no error) future.setError({1, QString("Failed to connect to socket %1: %2 %3").arg(name).arg(localSocketError).arg(errorString)}); }); s->connectToServer(identifier); }); } KAsync::Job ResourceAccess::Private::tryToConnect() { // We may have a socket from the last connection leftover socket.reset(); auto counter = QSharedPointer::create(0); return KAsync::doWhile( [this, counter]() { SinkTrace() << "Try to connect " << resourceInstanceIdentifier; return connectToServer(resourceInstanceIdentifier) .then>( [this, counter](const KAsync::Error &error, const QSharedPointer &s) { if (error) { static int waitTime = 10; static int timeout = 20000; static int maxRetries = timeout / waitTime; if (*counter >= maxRetries) { SinkTrace() << "Giving up after " << *counter << "tries"; return KAsync::error(error); } else { *counter = *counter + 1; return KAsync::wait(waitTime).then(KAsync::value(KAsync::Continue)); } } else { Q_ASSERT(s); socket = s; return KAsync::value(KAsync::Break); } }); }); } KAsync::Job ResourceAccess::Private::initializeSocket() { return KAsync::start([this] { SinkTrace() << "Trying to connect"; return connectToServer(resourceInstanceIdentifier) .then>( [this](const KAsync::Error &error, const QSharedPointer &s) { if (error) { // We failed to connect, so let's start the resource QStringList args; if (Sink::Test::testModeEnabled()) { args << "--test"; } if (resourceName.isEmpty()) { SinkWarning() << "No resource type given"; return KAsync::error(); } args << resourceInstanceIdentifier << resourceName; //Prefer a binary next to this binary, otherwise fall-back to PATH. Necessary for MacOS bundles because the bundle is not in the PATH. auto executable = QStandardPaths::findExecutable("sink_synchronizer", {QCoreApplication::applicationDirPath()}); if (executable.isEmpty()) { executable = QStandardPaths::findExecutable("sink_synchronizer"); } if (executable.isEmpty()) { SinkError() << "Failed to find the sink_synchronizer binary in the paths: " << QCoreApplication::applicationDirPath(); return KAsync::error("Failed to find the sink_synchronizer binary."); } qint64 pid = 0; SinkLog() << "Starting resource " << executable << args.join(" ") << "Home path: " << QDir::homePath(); if (QProcess::startDetached(executable, args, QDir::homePath(), &pid)) { SinkTrace() << "Started resource " << resourceInstanceIdentifier << pid; return tryToConnect() .onError([this, args](const KAsync::Error &error) { SinkError() << "Failed to connect to started resource: sink_synchronizer " << args; }); } else { SinkError() << "Failed to start resource " << resourceInstanceIdentifier; return KAsync::error("Failed to start resource."); } } else { SinkTrace() << "Connected to resource, without having to start it."; Q_ASSERT(s); socket = s; return KAsync::null(); } }); }); } ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) { mResourceStatus = Sink::ApplicationDomain::NoStatus; SinkTrace() << "Starting access"; QObject::connect(&SecretStore::instance(), &SecretStore::secretAvailable, this, [this] (const QByteArray &resourceId) { if (resourceId == d->resourceInstanceIdentifier) { sendSecret(SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier)).exec(); } }); } ResourceAccess::~ResourceAccess() { SinkLog() << "Closing access"; if (!d->resultHandler.isEmpty()) { SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); } delete d; } QByteArray ResourceAccess::resourceName() const { return d->resourceName; } bool ResourceAccess::isReady() const { return (d->socket && d->socket->isValid()); } void ResourceAccess::registerCallback(uint messageId, const std::function &callback) { d->resultHandler.insert(messageId, callback); } void ResourceAccess::enqueueCommand(const QSharedPointer &command) { d->commandQueue << command; if (isReady()) { processCommandQueue(); } else { open(); } } KAsync::Job ResourceAccess::sendCommand(int commandId) { return KAsync::start([this, commandId](KAsync::Future &f) { auto continuation = [&f](int error, const QString &errorMessage) { if (error) { f.setError(error, errorMessage); } f.setFinished(); }; enqueueCommand(QSharedPointer::create(commandId, continuation)); }); } KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { // The flatbuffer is transient, but we want to store it until the job is executed QByteArray buffer(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); return KAsync::start([commandId, buffer, this](KAsync::Future &f) { auto callback = [&f](int error, const QString &errorMessage) { if (error) { f.setError(error, errorMessage); } else { f.setFinished(); } }; enqueueCommand(QSharedPointer::create(commandId, buffer, callback)); }); } KAsync::Job ResourceAccess::synchronizeResource(const Sink::QueryBase &query) { flatbuffers::FlatBufferBuilder fbb; QByteArray queryString; { QDataStream stream(&queryString, QIODevice::WriteOnly); stream << query; } auto q = fbb.CreateString(queryString.toStdString()); auto builder = Sink::Commands::SynchronizeBuilder(fbb); builder.add_query(q); Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); return sendCommand(Commands::SynchronizeCommand, fbb); } KAsync::Job ResourceAccess::sendCreateCommand(const QByteArray &uid, const QByteArray &resourceBufferType, const QByteArray &buffer) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(uid.constData()); // This is the resource buffer type and not the domain type auto type = fbb.CreateString(resourceBufferType.constData()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); Sink::Commands::FinishCreateEntityBuffer(fbb, location); return sendCommand(Sink::Commands::CreateEntityCommand, fbb); } KAsync::Job ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer, const QByteArrayList &changedProperties, const QByteArray &newResource, bool remove) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(uid.constData()); auto type = fbb.CreateString(resourceBufferType.constData()); auto modifiedProperties = BufferUtils::toVector(fbb, changedProperties); auto deletions = BufferUtils::toVector(fbb, deletedProperties); auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, true, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); return sendCommand(Sink::Commands::ModifyEntityCommand, fbb); } KAsync::Job ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(uid.constData()); auto type = fbb.CreateString(resourceBufferType.constData()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); return sendCommand(Sink::Commands::DeleteEntityCommand, fbb); } KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) { flatbuffers::FlatBufferBuilder fbb; auto location = Sink::Commands::CreateRevisionReplayed(fbb, revision); Sink::Commands::FinishRevisionReplayedBuffer(fbb, location); return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); } KAsync::Job ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { flatbuffers::FlatBufferBuilder fbb; auto id = fbb.CreateString(inspectionId.toStdString()); auto domain = fbb.CreateString(domainType.toStdString()); auto entity = fbb.CreateString(entityId.toStdString()); auto prop = fbb.CreateString(property.toStdString()); QByteArray array; QDataStream s(&array, QIODevice::WriteOnly); s << expectedValue; auto expected = fbb.CreateString(array.toStdString()); auto location = Sink::Commands::CreateInspection(fbb, id, inspectionType, entity, domain, prop, expected); Sink::Commands::FinishInspectionBuffer(fbb, location); return sendCommand(Sink::Commands::InspectionCommand, fbb); } KAsync::Job ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId) { flatbuffers::FlatBufferBuilder fbb; auto id = fbb.CreateString(flushId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, id, flushType); Sink::Commands::FinishFlushBuffer(fbb, location); return sendCommand(Sink::Commands::FlushCommand, fbb); } KAsync::Job ResourceAccess::sendSecret(const QString &secret) { flatbuffers::FlatBufferBuilder fbb; auto s = fbb.CreateString(secret.toStdString()); auto location = Sink::Commands::CreateSecret(fbb, s); Sink::Commands::FinishSecretBuffer(fbb, location); return sendCommand(Sink::Commands::SecretCommand, fbb); } KAsync::Job ResourceAccess::shutdown() { return sendCommand(Sink::Commands::ShutdownCommand); } void ResourceAccess::open() { if (d->socket && d->socket->isValid()) { // SinkTrace() << "Socket valid, so not opening again"; return; } if (d->openingSocket) { return; } auto time = QSharedPointer::create(); time->start(); d->openingSocket = true; d->initializeSocket() .then( [this, time](const KAsync::Error &error) { d->openingSocket = false; if (error) { SinkError() << "Failed to initialize socket " << error; d->abortPendingOperations(); } else { SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); Q_ASSERT(d->socket); QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); connected(); } return KAsync::null(); }) .exec(); } void ResourceAccess::close() { SinkLog() << QString("Closing %1").arg(d->socket->fullServerName()); SinkTrace() << "Pending commands: " << d->pendingCommands.size(); SinkTrace() << "Queued commands: " << d->commandQueue.size(); d->socket->close(); } void ResourceAccess::sendCommand(const QSharedPointer &command) { Q_ASSERT(isReady()); // TODO: we should have a timeout for commands d->messageId++; const auto messageId = d->messageId; - SinkTrace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); + SinkLog() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); Q_ASSERT(command->callback); registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { - SinkTrace() << "Command complete " << messageId; + SinkLog() << "Command complete " << messageId; d->pendingCommands.remove(messageId); command->callback(errorCode, errorMessage); }); // Keep track of the command until we're sure it arrived d->pendingCommands.insert(d->messageId, command); Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); } void ResourceAccess::processCommandQueue() { // TODO: serialize instead of blast them all through the socket? SinkTrace() << "We have " << d->commandQueue.size() << " queued commands"; SinkTrace() << "Pending commands: " << d->pendingCommands.size(); for (auto command : d->commandQueue) { sendCommand(command); } d->commandQueue.clear(); } void ResourceAccess::processPendingCommandQueue() { SinkTrace() << "We have " << d->pendingCommands.size() << " pending commands"; for (auto command : d->pendingCommands) { SinkTrace() << "Reenquing command " << command->commandId; d->commandQueue << command; } d->pendingCommands.clear(); processCommandQueue(); } void ResourceAccess::connected() { if (!isReady()) { SinkTrace() << "Connected but not ready?"; return; } SinkTrace() << QString("Connected: %1").arg(d->socket->fullServerName()); { flatbuffers::FlatBufferBuilder fbb; auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast(this)).toLatin1().toStdString()); auto command = Sink::Commands::CreateHandshake(fbb, name); Sink::Commands::FinishHandshakeBuffer(fbb, command); Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); } // Reenqueue pending commands, we failed to send them processPendingCommandQueue(); auto secret = SecretStore::instance().resourceSecret(d->resourceInstanceIdentifier); if (!secret.isEmpty()) { sendSecret(secret).exec(); } emit ready(true); } void ResourceAccess::disconnected() { SinkLog() << QString("Disconnected from %1").arg(d->socket->fullServerName()); //Ensure we read all remaining data before closing the socket. //This is required on windows at least. readResourceMessage(); d->socket->close(); emit ready(false); } void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) { const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); if (resourceCrashed) { SinkError() << "The resource crashed!"; mResourceStatus = Sink::ApplicationDomain::ErrorStatus; Sink::Notification n; n.type = Sink::Notification::Status; emit notification(n); Sink::Notification crashNotification; crashNotification.type = Sink::Notification::Error; crashNotification.code = Sink::ApplicationDomain::ResourceCrashedError; emit notification(crashNotification); d->abortPendingOperations(); } else if (error == QLocalSocket::PeerClosedError) { SinkLog() << "The resource closed the connection."; d->abortPendingOperations(); } else { SinkWarning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); if (d->pendingCommands.size()) { SinkTrace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); open(); } } } void ResourceAccess::readResourceMessage() { if (!d->socket) { SinkWarning() << "No socket available"; return; } if (d->socket->bytesAvailable()) { d->partialMessageBuffer += d->socket->readAll(); // should be scheduled rather than processed all at once while (processMessageBuffer()) { } } } static Sink::Notification getNotification(const Sink::Commands::Notification *buffer) { Sink::Notification n; if (buffer->identifier()) { // Don't use fromRawData, the buffer is gone once we invoke emit notification n.id = BufferUtils::extractBufferCopy(buffer->identifier()); } if (buffer->message()) { // Don't use fromRawData, the buffer is gone once we invoke emit notification n.message = BufferUtils::extractBufferCopy(buffer->message()); } n.type = buffer->type(); n.code = buffer->code(); n.progress = buffer->progress(); n.total = buffer->total(); n.entities = BufferUtils::fromVector(*buffer->entities()); return n; } bool ResourceAccess::processMessageBuffer() { static const int headerSize = Commands::headerSize(); if (d->partialMessageBuffer.size() < headerSize) { //This is not an error SinkTrace() << "command too small, smaller than headerSize: " << d->partialMessageBuffer.size() << headerSize; return false; } // const uint messageId = *(int*)(d->partialMessageBuffer.constData()); const int commandId = *(const int *)(d->partialMessageBuffer.constData() + sizeof(uint)); const uint size = *(const int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); const uint availableMessageSize = d->partialMessageBuffer.size() - headerSize; if (size > availableMessageSize) { //This is not an error SinkTrace() << "command too small, message smaller than advertised: " << availableMessageSize << headerSize; return false; } switch (commandId) { case Commands::RevisionUpdateCommand: { auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); SinkTrace() << QString("Revision updated to: %1").arg(buffer->revision()); Notification n; n.type = Sink::Notification::RevisionUpdate; emit notification(n); emit revisionChanged(buffer->revision()); break; } case Commands::CommandCompletionCommand: { auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); SinkTrace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); d->completeCommands.insert(buffer->id(), buffer->success()); // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first queuedInvoke([=]() { d->callCallbacks(); }, this); break; } case Commands::NotificationCommand: { auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); switch (buffer->type()) { case Sink::Notification::Shutdown: SinkLog() << "Received shutdown notification."; close(); break; case Sink::Notification::Inspection: { SinkTrace() << "Received inspection notification."; auto n = getNotification(buffer); // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first queuedInvoke([=]() { emit notification(n); }, this); } break; case Sink::Notification::Status: if (mResourceStatus != buffer->code()) { mResourceStatus = buffer->code(); SinkTrace() << "Updated status: " << mResourceStatus; } [[clang::fallthrough]]; case Sink::Notification::Info: [[clang::fallthrough]]; case Sink::Notification::Warning: [[clang::fallthrough]]; case Sink::Notification::Error: [[clang::fallthrough]]; case Sink::Notification::FlushCompletion: [[clang::fallthrough]]; case Sink::Notification::Progress: { auto n = getNotification(buffer); SinkTrace() << "Received notification: " << n; n.resource = d->resourceInstanceIdentifier; emit notification(n); } break; case Sink::Notification::RevisionUpdate: default: SinkWarning() << "Received unknown notification: " << buffer->type(); break; } break; } default: break; } d->partialMessageBuffer.remove(0, headerSize + size); return d->partialMessageBuffer.size() >= headerSize; } ResourceAccessFactory::ResourceAccessFactory() { } ResourceAccessFactory &ResourceAccessFactory::instance() { static ResourceAccessFactory *instance = nullptr; if (!instance) { instance = new ResourceAccessFactory; } return *instance; } Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier, const QByteArray resourceType) { if (!mCache.contains(instanceIdentifier)) { // Reuse the pointer if something else kept the resourceaccess alive if (mWeakCache.contains(instanceIdentifier)) { if (auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef()) { mCache.insert(instanceIdentifier, sharedPointer); } } if (!mCache.contains(instanceIdentifier)) { // Create a new instance if necessary auto sharedPointer = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(instanceIdentifier, resourceType), &QObject::deleteLater}; QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { if (!ready) { //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps. auto ptr = mCache.take(instanceIdentifier); if (auto timer = mTimer.take(instanceIdentifier)) { timer->stop(); } Q_UNUSED(ptr); } }); mCache.insert(instanceIdentifier, sharedPointer); mWeakCache.insert(instanceIdentifier, sharedPointer); } } if (!mTimer.contains(instanceIdentifier)) { auto timer = QSharedPointer::create(); timer->setSingleShot(true); // Drop connection after 3 seconds (which is a random value) QObject::connect(timer.data(), &QTimer::timeout, timer.data(), [this, instanceIdentifier]() { //We want to remove, but we don't want shared pointer to be destroyed until end of the function as this might trigger further steps. auto ptr = mCache.take(instanceIdentifier); Q_UNUSED(ptr); }); timer->setInterval(3000); mTimer.insert(instanceIdentifier, timer); } mTimer.value(instanceIdentifier)->start(); return mCache.value(instanceIdentifier); } } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_resourceaccess.cpp" #pragma clang diagnostic pop diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 040b893c..1d0318a6 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -1,888 +1,896 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "synchronizer.h" +#include + #include "definitions.h" #include "commands.h" #include "bufferutils.h" #include "synchronizerstore.h" #include "datastorequery.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "flush_generated.h" #include "notification_generated.h" #include "utils.h" using namespace Sink; bool operator==(const Synchronizer::SyncRequest &left, const Synchronizer::SyncRequest &right) { return left.flushType == right.flushType && left.requestId == right.requestId && left.requestType == right.requestType && left.options == right.options && left.query == right.query && left.applicableEntities == right.applicableEntities; } Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context, {"synchronizer"}), mLogCtx{"synchronizer"}, mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), mSyncInProgress(false), mAbort(false) { mCurrentState.push(ApplicationDomain::Status::NoStatus); SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() { } void Synchronizer::setSecret(const QString &s) { mSecret = s; if (!mSyncRequestQueue.isEmpty()) { processSyncQueue().exec(); } } QString Synchronizer::secret() const { return mSecret; } void Synchronizer::setup(const std::function &enqueueCommandCallback, MessageQueue &mq) { mEnqueue = enqueueCommandCallback; mMessageQueue = &mq; } void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) { Q_ASSERT(mEnqueue); mEnqueue(commandId, data); } Storage::EntityStore &Synchronizer::store() { Q_ASSERT(mEntityStore->hasTransaction()); return *mEntityStore; } SynchronizerStore &Synchronizer::syncStore() { if (!mSyncStore) { mSyncStore = QSharedPointer::create(syncTransaction()); } return *mSyncStore; } void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); auto deletions = BufferUtils::toVector(fbb, deletedProperties); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { scanForRemovals(bufferType, [this, &bufferType](const std::function &callback) { store().readAllUids(bufferType, [callback](const QByteArray &uid) { callback(uid); }); }, exists ); } void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { const bool changed = [&] { for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current.getProperty(property)) { SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; return true; } } return false; }(); if (changed) { SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } else { SinkTraceCtx(mLogCtx) << "Entity was not modified: " << sinkId; } }); } void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to find the local id for " << remoteId; return; } Storage::EntityStore store(mResourceContext, mLogCtx); modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; Q_ASSERT(false); return; } Storage::EntityStore store(mResourceContext, mLogCtx); if (!store.contains(bufferType, sinkId)) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; Q_ASSERT(false); return; } Storage::EntityStore store(mResourceContext, mLogCtx); if (!store.contains(bufferType, sinkId)) { if (!mergeCriteria.isEmpty()) { Sink::Query query; for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) { query.filter(it.key(), it.value()); } bool merge = false; DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { merge = true; SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); }); if (!merge) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } QByteArrayList Synchronizer::resolveQuery(const QueryBase &query) { if (query.type().isEmpty()) { SinkWarningCtx(mLogCtx) << "Can't resolve a query without a type" << query; return {}; } QByteArrayList result; Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, query.type(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 0, [&](const ResultSet::Result &r) { result << r.entity.identifier(); }); return result; } QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) { if (filter.value.canConvert()) { const auto value = filter.value.value(); if (value.isEmpty()) { SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter; } else { return {filter.value.value()}; } } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else { SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; Q_ASSERT(false); } return {}; } template void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity, newResource, remove); } QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { return {Synchronizer::SyncRequest{query, "sync"}}; } void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue) { queue << request; } void Synchronizer::addToQueue(const Synchronizer::SyncRequest &request) { mergeIntoQueue(request, mSyncRequestQueue); } void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTraceCtx(mLogCtx) << "Synchronizing" << query; auto newRequests = getSyncRequests(query); for (const auto &request: newRequests) { auto shouldSkip = [&] { for (auto &r : mSyncRequestQueue) { if (r == request) { //Merge SinkTraceCtx(mLogCtx) << "Merging equal request " << request.query << "\n to" << r.query; return true; } } return false; }; if (shouldSkip()) { continue; } mergeIntoQueue(request, mSyncRequestQueue); } processSyncQueue().exec(); } void Synchronizer::clearQueue() { //Complete all pending flushes. Without this pending flushes would get stuck indefinitely when we clear the queue on failure. //TODO we should probably fail them instead for (const auto &request : mSyncRequestQueue) { if (request.requestType == Synchronizer::SyncRequest::Flush) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } } mSyncRequestQueue.clear(); } void Synchronizer::abort() { SinkLogCtx(mLogCtx) << "Aborting all running synchronization requests"; clearQueue(); mAbort = true; } void Synchronizer::flush(int commandId, const QByteArray &flushId) { Q_ASSERT(!flushId.isEmpty()); SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; processSyncQueue().exec(); } void Synchronizer::flushComplete(const QByteArray &flushId) { SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; if (mPendingSyncRequests.contains(flushId)) { const auto requests = mPendingSyncRequests.values(flushId); for (const auto &r : requests) { //We want to process the pending request before any others in the queue mSyncRequestQueue.prepend(r); } mPendingSyncRequests.remove(flushId); processSyncQueue().exec(); } } void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.message = message; n.code = code; n.entities = entities; emit notify(n); } void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.progress = progress; n.total = total; n.entities = entities; emit notify(n); } void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) { if (progress > 0 && total > 0) { //Limit progress updates for large amounts if (total >= 100 && progress % 10 != 0) { return; } - SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; + // SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; const auto applicableEntities = [&] { if (entities.isEmpty()) { return mCurrentRequest.applicableEntities; } return entities; }(); emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities); } } void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) { if (error) { if (error.errorCode == ApplicationDomain::ConnectionError) { //Couldn't connect, so we assume we don't have a network connection. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::NoServerError) { //Failed to contact the server. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConfigurationError) { //There is an error with the configuration. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::LoginError) { //If we failed to login altough we could connect that indicates a problem with our setup. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConnectionLostError) { //We've lost the connection so we assume the connection to the server broke. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } //We don't know what kind of error this was, so we assume it's transient and don't change our status. } else { //An operation against the server worked, so we're probably online. setStatus(ApplicationDomain::ConnectedStatus, s, requestId); } } KAsync::Job Synchronizer::processRequest(const SyncRequest &request) { if (request.options & SyncRequest::RequestFlush) { return KAsync::start([=] { //Trigger a flush and record original request without flush option auto modifiedRequest = request; modifiedRequest.options = SyncRequest::NoOptions; //Normally we won't have a requestId here if (modifiedRequest.requestId.isEmpty()) { modifiedRequest.requestId = createUuid(); } SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId; //The sync request will be executed once the flush has completed mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); }); } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Synchronizing:" << request.query; setBusy(true, "Synchronization has started.", request.requestId); emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Synchronization has ended.", request.requestId); if (error) { //Emit notification with error SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done Synchronizing"; emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); return KAsync::null(); } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { return KAsync::start([=] { Q_ASSERT(!request.requestId.isEmpty()); //FIXME it looks like this is emitted before the replay actually finishes if (request.flushType == Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } else { flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(request.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); } }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { if (ChangeReplay::allChangesReplayed()) { return KAsync::null(); } else { return KAsync::start([this, request] { setBusy(true, "ChangeReplay has started.", request.requestId); SinkLogCtx(mLogCtx) << "Replaying changes."; }) .then(replayNextRevision()) .then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Changereplay has ended.", request.requestId); if (error) { SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error; return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done replaying changes"; return KAsync::null(); } }); } } else { SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; return KAsync::error(KAsync::Error{"Unknown request type."}); } } /* * We're using a stack so we can go back to whatever we had after the temporary busy status. * Whenever we do change the status we emit a status notification. */ void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) { //We won't be able to execute any of the coming requests, so clear them if (state == ApplicationDomain::OfflineStatus || state == ApplicationDomain::ErrorStatus) { clearQueue(); } if (state != mCurrentState.top()) { //The busy state is transient and we want to override it. if (mCurrentState.top() == ApplicationDomain::BusyStatus) { mCurrentState.pop(); } if (state != mCurrentState.top()) { //Always leave the first state intact if (mCurrentState.count() > 1 && state != ApplicationDomain::BusyStatus) { mCurrentState.pop(); } mCurrentState.push(state); } //We should never have more than: (NoStatus, $SOMESTATUS, BusyStatus) if (mCurrentState.count() > 3) { qWarning() << mCurrentState; Q_ASSERT(false); } emitNotification(Notification::Status, state, reason, requestId); } } void Synchronizer::resetStatus(const QByteArray requestId) { mCurrentState.pop(); emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); } void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) { if (busy) { setStatus(ApplicationDomain::BusyStatus, reason, requestId); } else { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { resetStatus(requestId); } } } KAsync::Job Synchronizer::processSyncQueue() { if (secret().isEmpty()) { SinkLogCtx(mLogCtx) << "Secret not available but required."; emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {}); return KAsync::null(); } if (mSyncRequestQueue.isEmpty()) { SinkLogCtx(mLogCtx) << "All requests processed."; return KAsync::null(); } if (mSyncInProgress) { SinkTraceCtx(mLogCtx) << "Sync still in progress."; return KAsync::null(); } //Don't process any new requests until we're done with the pending ones. //Otherwise we might process a flush before the previous request actually completed. if (!mPendingSyncRequests.isEmpty()) { SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; return KAsync::null(); } const auto request = mSyncRequestQueue.takeFirst(); return KAsync::start([=] { + SinkLogCtx(mLogCtx) << "Start processing request " << request.requestType; + mTime.start(); mMessageQueue->startTransaction(); mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); mSyncInProgress = true; mCurrentRequest = request; }) .then(processRequest(request)) .then([this, request](const KAsync::Error &error) { - SinkTraceCtx(mLogCtx) << "Sync request processed"; + SinkLogCtx(mLogCtx) << "Sync request processed " << Sink::Log::TraceTime(mTime.elapsed()); setBusy(false, {}, request.requestId); mCurrentRequest = {}; mEntityStore->abortTransaction(); mSyncTransaction.abort(); mMessageQueue->commit(); mSyncStore.clear(); mSyncInProgress = false; mAbort = false; if (allChangesReplayed()) { emit changesReplayed(); } if (error) { SinkWarningCtx(mLogCtx) << "Error during sync: " << error; emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); } //In case we got more requests meanwhile. return processSyncQueue(); }); } bool Synchronizer::aborting() const { return mAbort; } void Synchronizer::commit() { + SinkLogCtx(mLogCtx) << "Commit." << Sink::Log::TraceTime(mTime.elapsed()); mMessageQueue->commit(); mSyncTransaction.commit(); mSyncStore.clear(); + + QCoreApplication::processEvents(QEventLoop::AllEvents, 10); + if (mSyncInProgress) { mMessageQueue->startTransaction(); } } Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } void Synchronizer::revisionChanged() { //One replay request is enough for (const auto &r : mSyncRequestQueue) { if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) { return; } } mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; processSyncQueue().exec(); } bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); if (!metadataBuffer->replayToSource()) { SinkTraceCtx(mLogCtx) << "Change is coming from the source"; } return metadataBuffer->replayToSource(); } KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { SinkTraceCtx(mLogCtx) << "Replaying" << type << key; Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); if (!metadataBuffer) { SinkErrorCtx(mLogCtx) << "No metadata buffer available."; return KAsync::error("No metadata buffer"); } if (mSyncTransaction) { SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; mSyncTransaction.abort(); } if (mSyncStore) { SinkErrorCtx(mLogCtx) << "Leftover sync store."; mSyncStore.clear(); } Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); Q_ASSERT(!mSyncTransaction); //The entitystore transaction is handled by processSyncQueue Q_ASSERT(mEntityStore->hasTransaction()); const auto operation = metadataBuffer->operation(); // TODO: should not use internal representations const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); QByteArray oldRemoteId; if (operation != Sink::Operation_Creation) { oldRemoteId = syncStore().resolveLocalId(type, uid); //oldRemoteId can be empty if the resource implementation didn't return a remoteid } SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision(); //If the entity has been removed already and this is not the removal, skip over. //This is important so we can unblock changereplay by removing entities. bool skipOver = false; store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) { if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) { skipOver = true; } }); if (skipOver) { SinkLogCtx(mLogCtx) << "Skipping over already removed entity"; return KAsync::null(); } KAsync::Job job = KAsync::null(); //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else { SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; } return job.then([=](const KAsync::Error &error, const QByteArray &remoteId) { //Returning an error here means we stop replaying, so we only to that for known-to-be-transient errors. if (error) { switch (error.errorCode) { case ApplicationDomain::ConnectionError: case ApplicationDomain::NoServerError: case ApplicationDomain::ConfigurationError: case ApplicationDomain::LoginError: case ApplicationDomain::ConnectionLostError: SinkTraceCtx(mLogCtx) << "Error during changereplay (aborting):" << error; return KAsync::error(error); default: SinkErrorCtx(mLogCtx) << "Error during changereplay (continuing):" << error; break; } } switch (operation) { case Sink::Operation_Creation: { SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().recordRemoteId(type, uid, remoteId); } } break; case Sink::Operation_Modification: { SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().updateRemoteId(type, uid, remoteId); } } break; case Sink::Operation_Removal: { SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; if (!oldRemoteId.isEmpty()) { syncStore().removeRemoteId(type, uid, oldRemoteId); } } break; default: SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; } //We need to commit here otherwise the next change-replay step will abort the transaction mSyncStore.clear(); mSyncTransaction.commit(); //Ignore errors if not caught above return KAsync::null(); }); } void Synchronizer::notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); if (!metadataBuffer) { SinkErrorCtx(mLogCtx) << "No metadata buffer available."; Q_ASSERT(false); return; } if (metadataBuffer->operation() == Sink::Operation_Removal) { const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); const auto oldRemoteId = syncStore().resolveLocalId(type, uid); SinkLogCtx(mLogCtx) << "Cleaning up removal with remote id: " << oldRemoteId; if (!oldRemoteId.isEmpty()) { syncStore().removeRemoteId(type, uid, oldRemoteId); } } mSyncStore.clear(); mSyncTransaction.commit(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } bool Synchronizer::allChangesReplayed() { if (!mSyncRequestQueue.isEmpty()) { SinkTraceCtx(mLogCtx) << "Queue is not empty"; return false; } return ChangeReplay::allChangesReplayed(); } #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove); SINK_REGISTER_TYPES() diff --git a/common/synchronizer.h b/common/synchronizer.h index efcc76d6..d4e941e9 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -1,256 +1,258 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include +#include #include #include #include #include #include #include #include "changereplay.h" #include "synchronizerstore.h" namespace Sink { class SynchronizerStore; /** * Synchronize and add what we don't already have to local queue */ class SINK_EXPORT Synchronizer : public ChangeReplay { Q_OBJECT public: Synchronizer(const Sink::ResourceContext &resourceContext); virtual ~Synchronizer() Q_DECL_OVERRIDE; void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); void synchronize(const Sink::QueryBase &query); void flush(int commandId, const QByteArray &flushId); //Read only access to main storage Storage::EntityStore &store(); //Read/Write access to sync storage SynchronizerStore &syncStore(); void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); bool allChangesReplayed() Q_DECL_OVERRIDE; void flushComplete(const QByteArray &flushId); void setSecret(const QString &s); //Abort all running synchronization requests void abort(); KAsync::Job processSyncQueue(); signals: void notify(Notification); public slots: virtual void revisionChanged() Q_DECL_OVERRIDE; protected: ///Base implementation calls the replay$Type calls KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; virtual void notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; protected: ///Implement to write back changes to the server virtual KAsync::Job replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Event &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Todo &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); protected: QString secret() const; ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false); void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType); /** * A synchronous algorithm to remove entities that are no longer existing. * * A list of entities is generated by @param entryGenerator. * The entiry Generator typically iterates over an index to produce all existing entries. * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, * an entity delete command is enqueued. * * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ void scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); void scanForRemovals(const QByteArray &bufferType, std::function exists); /** * An algorithm to create or modify the entity. * * Depending on whether the entity is locally available, or has changed. */ void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); template void SINK_EXPORT createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria); void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); // template // void create(const DomainType &entity); template void SINK_EXPORT modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false); // template // void remove(const DomainType &entity); QByteArrayList resolveQuery(const QueryBase &query); QByteArrayList resolveFilter(const QueryBase::Comparator &filter); virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; public: struct SyncRequest { enum RequestType { Synchronization, ChangeReplay, Flush }; enum RequestOptions { NoOptions, RequestFlush }; SyncRequest() = default; SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) : requestId(requestId_), requestType(Synchronization), options(o), query(q), applicableEntities(q.ids()) { } SyncRequest(RequestType type) : requestType(type) { } SyncRequest(RequestType type, const QByteArray &requestId_) : requestId(requestId_), requestType(type) { } SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) : flushType(flushType_), requestId(requestId_), requestType(type) { } int flushType = 0; QByteArray requestId; RequestType requestType; RequestOptions options = NoOptions; Sink::QueryBase query; QByteArrayList applicableEntities; }; protected: /** * This allows the synchronizer to turn a single query into multiple synchronization requests. * * The idea is the following; * The input query is a specification by the application of what data needs to be made available. * Requests could be: * * Give me everything (signified by the default constructed/empty query) * * Give me all mails of folder X * * Give me all mails of folders matching some constraints * * getSyncRequests allows the resource implementation to apply it's own defaults to that request; * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. * * This will allow synchronizeWithSource to focus on just getting to the content. */ virtual QList getSyncRequests(const Sink::QueryBase &query); /** * This allows the synchronizer to merge new requests with existing requests in the queue. */ virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue); void addToQueue(const Synchronizer::SyncRequest &request); void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = QByteArray{}, const QByteArrayList &entiteis = QByteArrayList{}); void emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities); /** * Report progress for current task */ virtual void reportProgress(int progress, int total, const QByteArrayList &entities = {}) Q_DECL_OVERRIDE; Sink::Log::Context mLogCtx; /** * True while aborting. * * Stop the synchronization as soon as possible. */ bool aborting() const; private: QStack mCurrentState; void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId); void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId); void resetStatus(const QByteArray requestId); void setBusy(bool busy, const QString &reason, const QByteArray requestId); void clearQueue(); void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); KAsync::Job processRequest(const SyncRequest &request); Sink::ResourceContext mResourceContext; Sink::Storage::EntityStore::Ptr mEntityStore; QSharedPointer mSyncStore; Sink::Storage::DataStore mSyncStorage; Sink::Storage::DataStore::Transaction mSyncTransaction; std::function mEnqueue; QList mSyncRequestQueue; SyncRequest mCurrentRequest; MessageQueue *mMessageQueue; bool mSyncInProgress; bool mAbort; QMultiHash mPendingSyncRequests; QString mSecret; + QTime mTime; }; } diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 1c89aaee..b49ba4d6 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -1,1188 +1,1188 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "imapresource.h" #include "facade.h" #include "resourceconfig.h" #include "commands.h" #include "index.h" #include "log.h" #include "definitions.h" #include "inspection.h" #include "synchronizer.h" #include "inspector.h" #include "query.h" #include #include #include #include #include #include "facadefactory.h" #include "adaptorfactoryregistry.h" #include "imapserverproxy.h" #include "mailpreprocessor.h" #include "specialpurposepreprocessor.h" //This is the resources entity type, and not the domain type #define ENTITY_TYPE_MAIL "mail" #define ENTITY_TYPE_FOLDER "folder" Q_DECLARE_METATYPE(QSharedPointer) using namespace Imap; using namespace Sink; static qint64 uidFromMailRid(const QByteArray &remoteId) { auto ridParts = remoteId.split(':'); Q_ASSERT(ridParts.size() == 2); return ridParts.last().toLongLong(); } static QByteArray folderIdFromMailRid(const QByteArray &remoteId) { auto ridParts = remoteId.split(':'); Q_ASSERT(ridParts.size() == 2); return ridParts.first(); } static QByteArray assembleMailRid(const QByteArray &folderLocalId, qint64 imapUid) { return folderLocalId + ':' + QByteArray::number(imapUid); } static QByteArray assembleMailRid(const ApplicationDomain::Mail &mail, qint64 imapUid) { return assembleMailRid(mail.getFolder(), imapUid); } static QByteArray folderRid(const Imap::Folder &folder) { return folder.path().toUtf8(); } static QByteArray parentRid(const Imap::Folder &folder) { return folder.parentPath().toUtf8(); } static QByteArray getSpecialPurposeType(const QByteArrayList &flags) { if (Imap::flagsContain(Imap::FolderFlags::Trash, flags)) { return ApplicationDomain::SpecialPurpose::Mail::trash; } if (Imap::flagsContain(Imap::FolderFlags::Drafts, flags)) { return ApplicationDomain::SpecialPurpose::Mail::drafts; } if (Imap::flagsContain(Imap::FolderFlags::Sent, flags)) { return ApplicationDomain::SpecialPurpose::Mail::sent; } return {}; } static bool hasSpecialPurposeFlag(const QByteArrayList &flags) { return !getSpecialPurposeType(flags).isEmpty(); } class ImapSynchronizer : public Sink::Synchronizer { Q_OBJECT public: ImapSynchronizer(const ResourceContext &resourceContext) : Sink::Synchronizer(resourceContext) { } QByteArray createFolder(const Imap::Folder &f) { const auto parentFolderRid = parentRid(f); bool isToplevel = parentFolderRid.isEmpty(); SinkTraceCtx(mLogCtx) << "Creating folder: " << f.name() << parentFolderRid << f.flags; const auto remoteId = folderRid(f); Sink::ApplicationDomain::Folder folder; folder.setName(f.name()); folder.setIcon("folder"); folder.setEnabled(f.subscribed); const auto specialPurpose = [&] { if (hasSpecialPurposeFlag(f.flags)) { return getSpecialPurposeType(f.flags); } else if (SpecialPurpose::isSpecialPurposeFolderName(f.name()) && isToplevel) { return SpecialPurpose::getSpecialPurposeType(f.name()); } return QByteArray{}; }(); if (!specialPurpose.isEmpty()) { folder.setSpecialPurpose(QByteArrayList() << specialPurpose); } //Always show the inbox if (specialPurpose == ApplicationDomain::SpecialPurpose::Mail::inbox) { folder.setEnabled(true); } if (!isToplevel) { folder.setParent(syncStore().resolveRemoteId(ApplicationDomain::Folder::name, parentFolderRid)); } createOrModify(ApplicationDomain::getTypeName(), remoteId, folder); return remoteId; } static bool contains(const QVector &folderList, const QByteArray &remoteId) { for (const auto &folder : folderList) { if (folderRid(folder) == remoteId) { return true; } } return false; } void synchronizeFolders(const QVector &folderList) { SinkTraceCtx(mLogCtx) << "Found folders " << folderList.size(); scanForRemovals(ENTITY_TYPE_FOLDER, [&folderList](const QByteArray &remoteId) -> bool { return contains(folderList, remoteId); } ); for (const auto &f : folderList) { createFolder(f); } } static void setFlags(Sink::ApplicationDomain::Mail &mail, const KIMAP2::MessageFlags &flags) { mail.setUnread(!flags.contains(Imap::Flags::Seen)); mail.setImportant(flags.contains(Imap::Flags::Flagged)); } KIMAP2::MessageFlags getFlags(const Sink::ApplicationDomain::Mail &mail) { KIMAP2::MessageFlags flags; if (!mail.getUnread()) { flags << Imap::Flags::Seen; } if (mail.getImportant()) { flags << Imap::Flags::Flagged; } return flags; } void synchronizeMails(const QByteArray &folderRid, const QByteArray &folderLocalId, const Message &message) { auto time = QSharedPointer::create(); time->start(); SinkTraceCtx(mLogCtx) << "Importing new mail." << folderRid; const auto remoteId = assembleMailRid(folderLocalId, message.uid); Q_ASSERT(message.msg); SinkTraceCtx(mLogCtx) << "Found a mail " << remoteId << message.flags; auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); mail.setFolder(folderLocalId); mail.setMimeMessage(message.msg->encodedContent(true)); mail.setExtractedFullPayloadAvailable(message.fullPayload); setFlags(mail, message.flags); createOrModify(ENTITY_TYPE_MAIL, remoteId, mail); // const auto elapsed = time->elapsed(); // SinkTraceCtx(mLogCtx) << "Synchronized " << count << " mails in " << folderRid << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; } void synchronizeRemovals(const QByteArray &folderRid, const QSet &messages) { auto time = QSharedPointer::create(); time->start(); const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRid); if (folderLocalId.isEmpty()) { SinkWarning() << "Failed to lookup local id of: " << folderRid; return; } SinkTraceCtx(mLogCtx) << "Finding removed mail: " << folderLocalId << " remoteId: " << folderRid; int count = 0; scanForRemovals(ENTITY_TYPE_MAIL, [&](const std::function &callback) { store().indexLookup(folderLocalId, callback); }, [&](const QByteArray &remoteId) -> bool { if (messages.contains(uidFromMailRid(remoteId))) { return true; } count++; return false; } ); const auto elapsed = time->elapsed(); SinkLog() << "Removed " << count << " mails in " << folderRid << Sink::Log::TraceTime(elapsed) << " " << elapsed/qMax(count, 1) << " [ms/mail]"; } KAsync::Job synchronizeFolder(QSharedPointer imap, const Imap::Folder &folder, const QDate &dateFilter, bool fetchHeaderAlso = false) { SinkLogCtx(mLogCtx) << "Synchronizing mails in folder: " << folderRid(folder); SinkLogCtx(mLogCtx) << " fetching headers also: " << fetchHeaderAlso; const auto folderRemoteId = folderRid(folder); if (folder.path().isEmpty() || folderRemoteId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Invalid folder " << folderRemoteId << folder.path(); return KAsync::error("Invalid folder"); } //Start by checking if UIDVALIDITY is still correct return KAsync::start([=] { bool ok = false; const auto uidvalidity = syncStore().readValue(folderRemoteId, "uidvalidity").toLongLong(&ok); return imap->select(folder) .then([=](const SelectResult &selectResult) { SinkLogCtx(mLogCtx) << "Checking UIDVALIDITY. Local" << uidvalidity << "remote " << selectResult.uidValidity; if (ok && selectResult.uidValidity != uidvalidity) { SinkWarningCtx(mLogCtx) << "UIDVALIDITY changed " << selectResult.uidValidity << uidvalidity; syncStore().removePrefix(folderRemoteId); } syncStore().writeValue(folderRemoteId, "uidvalidity", QByteArray::number(selectResult.uidValidity)); }); }) // //First we fetch flag changes for all messages. Since we don't know which messages are locally available we just get everything and only apply to what we have. .then([=] { const auto lastSeenUid = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); bool ok = false; const auto changedsince = syncStore().readValue(folderRemoteId, "changedsince").toLongLong(&ok); SinkLogCtx(mLogCtx) << "About to update flags" << folder.path() << "changedsince: " << changedsince; //If we have any mails so far we start off by updating any changed flags using changedsince, unless we don't have any mails at all. if (ok && lastSeenUid >= 1) { return imap->fetchFlags(folder, KIMAP2::ImapSet(1, lastSeenUid), changedsince, [=](const Message &message) { const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); const auto remoteId = assembleMailRid(folderLocalId, message.uid); SinkLogCtx(mLogCtx) << "Updating mail flags " << remoteId << message.flags; auto mail = Sink::ApplicationDomain::Mail::create(mResourceInstanceIdentifier); setFlags(mail, message.flags); modify(ENTITY_TYPE_MAIL, remoteId, mail); }) .then([=](const SelectResult &selectResult) { SinkLogCtx(mLogCtx) << "Flags updated. New changedsince value: " << selectResult.highestModSequence; syncStore().writeValue(folderRemoteId, "changedsince", QByteArray::number(selectResult.highestModSequence)); return selectResult.uidNext; }); } else { //We hit this path on initial sync and simply record the current changedsince value return imap->select(imap->mailboxFromFolder(folder)) .then([=](const SelectResult &selectResult) { SinkLogCtx(mLogCtx) << "No flags to update. New changedsince value: " << selectResult.highestModSequence; syncStore().writeValue(folderRemoteId, "changedsince", QByteArray::number(selectResult.highestModSequence)); return selectResult.uidNext; }); } }) //Next we synchronize the full set that is given by the date limit. //We fetch all data for this set. //This will also pull in any new messages in subsequent runs. .then([=] (qint64 serverUidNext){ const auto lastSeenUid = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); auto job = [=] { if (dateFilter.isValid()) { SinkLogCtx(mLogCtx) << "Fetching messages since: " << dateFilter << lastSeenUid; //Avoid creating a gap if we didn't fetch messages older than dateFilter, but aren't in the initial fetch either if (lastSeenUid > 0) { return imap->fetchUidsSince(imap->mailboxFromFolder(folder), dateFilter, lastSeenUid); } else { return imap->fetchUidsSince(imap->mailboxFromFolder(folder), dateFilter); } } else { SinkLogCtx(mLogCtx) << "Fetching messages."; return imap->fetchUids(imap->mailboxFromFolder(folder)); } }(); return job.then([=](const QVector &uidsToFetch) { SinkTraceCtx(mLogCtx) << "Received result set " << uidsToFetch; SinkTraceCtx(mLogCtx) << "About to fetch mail" << folder.path(); const auto lastSeenUid = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); //Make sure the uids are sorted in reverse order and drop everything below lastSeenUid (so we don't refetch what we already have QVector filteredAndSorted = uidsToFetch; qSort(filteredAndSorted.begin(), filteredAndSorted.end(), qGreater()); const auto lowerBound = qLowerBound(filteredAndSorted.begin(), filteredAndSorted.end(), lastSeenUid, qGreater()); if (lowerBound != filteredAndSorted.end()) { filteredAndSorted.erase(lowerBound, filteredAndSorted.end()); } const qint64 lowerBoundUid = filteredAndSorted.isEmpty() ? 0 : filteredAndSorted.last(); auto maxUid = QSharedPointer::create(0); if (!filteredAndSorted.isEmpty()) { *maxUid = filteredAndSorted.first(); } SinkTraceCtx(mLogCtx) << "Uids to fetch: " << filteredAndSorted; bool headersOnly = false; const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); return imap->fetchMessages(folder, filteredAndSorted, headersOnly, [=](const Message &m) { if (*maxUid < m.uid) { *maxUid = m.uid; } synchronizeMails(folderRemoteId, folderLocalId, m); }, [=](int progress, int total) { reportProgress(progress, total, QByteArrayList{} << folderLocalId); //commit every 10 messages - if ((progress % 10) == 0) { + if ((progress % 100) == 0) { commit(); } }) .then([=] { SinkLogCtx(mLogCtx) << "Highest found uid: " << *maxUid << folder.path(); if (*maxUid > 0) { syncStore().writeValue(folderRemoteId, "uidnext", QByteArray::number(*maxUid)); } else { if (serverUidNext) { SinkLogCtx(mLogCtx) << "Storing the server side uidnext: " << serverUidNext << folder.path(); //If we don't receive a mail we should still record the updated uidnext value. syncStore().writeValue(folderRemoteId, "uidnext", QByteArray::number(serverUidNext - 1)); } } syncStore().writeValue(folderRemoteId, "fullsetLowerbound", QByteArray::number(lowerBoundUid)); commit(); }); }); }) .then([=] { bool ok = false; const auto latestHeaderFetched = syncStore().readValue(folderRemoteId, "latestHeaderFetched").toLongLong(); const auto fullsetLowerbound = syncStore().readValue(folderRemoteId, "fullsetLowerbound").toLongLong(&ok); if (ok && latestHeaderFetched <= fullsetLowerbound) { SinkLogCtx(mLogCtx) << "Fetching headers until: " << fullsetLowerbound; return imap->fetchUids(imap->mailboxFromFolder(folder)) .then([=] (const QVector &uids) { //sort in reverse order and remove everything greater than fullsetLowerbound QVector toFetch = uids; qSort(toFetch.begin(), toFetch.end(), qGreater()); if (fullsetLowerbound) { auto upperBound = qUpperBound(toFetch.begin(), toFetch.end(), fullsetLowerbound, qGreater()); if (upperBound != toFetch.begin()) { toFetch.erase(toFetch.begin(), upperBound); } } SinkLogCtx(mLogCtx) << "Fetching headers for: " << toFetch; bool headersOnly = true; const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); return imap->fetchMessages(folder, toFetch, headersOnly, [=](const Message &m) { synchronizeMails(folderRemoteId, folderLocalId, m); }, [=](int progress, int total) { reportProgress(progress, total, QByteArrayList{} << folderLocalId); //commit every 100 messages if ((progress % 100) == 0) { commit(); } }); }) .then([=] { SinkLogCtx(mLogCtx) << "Headers fetched: " << folder.path(); syncStore().writeValue(folderRemoteId, "latestHeaderFetched", QByteArray::number(fullsetLowerbound)); commit(); }); } else { SinkLogCtx(mLogCtx) << "No additional headers to fetch."; } return KAsync::null(); }) //Finally remove messages that are no longer existing on the server. .then([=] { //TODO do an examine with QRESYNC and remove VANISHED messages if supported instead return imap->fetchUids(folder).then([=](const QVector &uids) { SinkTraceCtx(mLogCtx) << "Syncing removals: " << folder.path(); synchronizeRemovals(folderRemoteId, uids.toList().toSet()); commit(); }); }); } Sink::QueryBase applyMailDefaults(const Sink::QueryBase &query) { if (mDaysToSync > 0) { auto defaultDateFilter = QDate::currentDate().addDays(0 - mDaysToSync); auto queryWithDefaults = query; if (!queryWithDefaults.hasFilter()) { queryWithDefaults.filter(ApplicationDomain::Mail::Date::name, QVariant::fromValue(defaultDateFilter)); } return queryWithDefaults; } return query; } QList getSyncRequests(const Sink::QueryBase &query) Q_DECL_OVERRIDE { QList list; if (query.type() == ApplicationDomain::getTypeName()) { auto request = Synchronizer::SyncRequest{applyMailDefaults(query)}; if (query.hasFilter(ApplicationDomain::Mail::Folder::name)) { request.applicableEntities << query.getFilter(ApplicationDomain::Mail::Folder::name).value.toByteArray(); } list << request; } else if (query.type() == ApplicationDomain::getTypeName()) { list << Synchronizer::SyncRequest{query}; } else { list << Synchronizer::SyncRequest{Sink::QueryBase(ApplicationDomain::getTypeName())}; //This request depends on the previous one so we flush first. list << Synchronizer::SyncRequest{applyMailDefaults(Sink::QueryBase(ApplicationDomain::getTypeName())), QByteArray{}, Synchronizer::SyncRequest::RequestFlush}; } return list; } QByteArray getFolderFromLocalId(const QByteArray &id) { auto mailRemoteId = syncStore().resolveLocalId(ApplicationDomain::getTypeName(), id); if (mailRemoteId.isEmpty()) { return {}; } return folderIdFromMailRid(mailRemoteId); } void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue) Q_DECL_OVERRIDE { auto isIndividualMailSync = [](const Synchronizer::SyncRequest &request) { if (request.requestType == SyncRequest::Synchronization) { const auto query = request.query; if (query.type() == ApplicationDomain::getTypeName()) { return !query.ids().isEmpty(); } } return false; }; if (isIndividualMailSync(request)) { auto newId = request.query.ids().first(); auto requestFolder = getFolderFromLocalId(newId); if (requestFolder.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to find folder for local id. Ignoring request: " << request.query; return; } for (auto &r : queue) { if (isIndividualMailSync(r)) { auto queueFolder = getFolderFromLocalId(r.query.ids().first()); if (requestFolder == queueFolder) { //Merge r.query.filter(newId); SinkTrace() << "Merging request " << request.query; SinkTrace() << " to " << r.query; return; } } } } queue << request; } KAsync::Job login(QSharedPointer imap) { SinkTrace() << "Connecting to:" << mServer << mPort; SinkTrace() << "as:" << mUser; return imap->login(mUser, secret()) .addToContext(imap); } KAsync::Job> getFolderList(QSharedPointer imap, const Sink::QueryBase &query) { if (query.hasFilter()) { //If we have a folder filter fetch full payload of date-range & all headers QVector folders; auto folderFilter = query.getFilter(); auto localIds = resolveFilter(folderFilter); auto folderRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName(), localIds); for (const auto &r : folderRemoteIds) { Q_ASSERT(!r.isEmpty()); folders << Folder{r}; } return KAsync::value(folders); } else { //Otherwise fetch full payload for daterange auto folderList = QSharedPointer>::create(); return imap->fetchFolders([folderList](const Folder &folder) { if (!folder.noselect && folder.subscribed) { *folderList << folder; } }) .onError([](const KAsync::Error &error) { SinkWarning() << "Folder list sync failed."; }) .then([folderList] { return *folderList; } ); } } KAsync::Error getError(const KAsync::Error &error) { if (error) { switch(error.errorCode) { case Imap::CouldNotConnectError: return {ApplicationDomain::ConnectionError, error.errorMessage}; case Imap::SslHandshakeError: return {ApplicationDomain::LoginError, error.errorMessage}; case Imap::LoginFailed: return {ApplicationDomain::LoginError, error.errorMessage}; case Imap::HostNotFoundError: return {ApplicationDomain::NoServerError, error.errorMessage}; case Imap::ConnectionLost: return {ApplicationDomain::ConnectionLostError, error.errorMessage}; case Imap::MissingCredentialsError: return {ApplicationDomain::MissingCredentialsError, error.errorMessage}; default: return {ApplicationDomain::UnknownError, error.errorMessage}; } } return {}; } KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE { if (!QUrl{mServer}.isValid()) { return KAsync::error(ApplicationDomain::ConfigurationError, "Invalid server url: " + mServer); } auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode, &mSessionCache); if (query.type() == ApplicationDomain::getTypeName()) { return login(imap) .then([=] { auto folderList = QSharedPointer>::create(); return imap->fetchFolders([folderList](const Folder &folder) { *folderList << folder; }) .then([=]() { synchronizeFolders(*folderList); return *folderList; }) //The rest is only to check for new messages. .each([=](const Imap::Folder &folder) { if (!folder.noselect && folder.subscribed) { return imap->examine(folder) .then([=](const SelectResult &result) { const auto folderRemoteId = folderRid(folder); auto lastSeenUid = syncStore().readValue(folderRemoteId, "uidnext").toLongLong(); SinkTraceCtx(mLogCtx) << "Checking for new messages." << folderRemoteId << " Last seen uid: " << lastSeenUid << " Uidnext: " << result.uidNext; if (result.uidNext > (lastSeenUid + 1)) { const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); emitNotification(Notification::Info, ApplicationDomain::NewContentAvailable, {}, {}, {{folderLocalId}}); } }).then([=] (const KAsync::Error &error) { if (error) { SinkWarningCtx(mLogCtx) << "Examine failed: " << error; if (error.errorCode == Imap::CommandFailed) { //Ignore the error because we don't want to fail the synchronization for all folders return KAsync::null(); } return KAsync::error(error); } return KAsync::null(); }); } return KAsync::null(); }); }) .then([=] (const KAsync::Error &error) { return imap->logout() .then(KAsync::error(getError(error))); }); } else if (query.type() == ApplicationDomain::getTypeName()) { //TODO //if we have a folder filter: //* execute the folder query and resolve the results to the remote identifier //* query only those folders //if we have a date filter: //* apply the date filter to the fetch //if we have no folder filter: //* fetch list of folders from server directly and sync (because we have no guarantee that the folder sync was already processed by the pipeline). return login(imap) .then([=] { if (!query.ids().isEmpty()) { //If we have mail id's simply fetch the full payload of those mails QVector toFetch; auto mailRemoteIds = syncStore().resolveLocalIds(ApplicationDomain::getTypeName(), query.ids()); QByteArray folderRemoteId; for (const auto &r : mailRemoteIds) { const auto folderLocalId = folderIdFromMailRid(r); auto f = syncStore().resolveLocalId(ApplicationDomain::getTypeName(), folderLocalId); if (folderRemoteId.isEmpty()) { folderRemoteId = f; } else { if (folderRemoteId != f) { SinkWarningCtx(mLogCtx) << "Not all messages come from the same folder " << r << folderRemoteId << ". Skipping message."; continue; } } toFetch << uidFromMailRid(r); } SinkLog() << "Fetching messages: " << toFetch << folderRemoteId; bool headersOnly = false; const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, folderRemoteId); return imap->fetchMessages(Folder{folderRemoteId}, toFetch, headersOnly, [=](const Message &m) { synchronizeMails(folderRemoteId, folderLocalId, m); }, [=](int progress, int total) { reportProgress(progress, total, QByteArrayList{} << folderLocalId); //commit every 100 messages if ((progress % 100) == 0) { commit(); } }); } else { //Otherwise we sync the folder(s) bool syncHeaders = query.hasFilter(); const QDate dateFilter = [&] { auto filter = query.getFilter(); if (filter.value.canConvert()) { SinkLog() << " with date-range " << filter.value.value(); return filter.value.value(); } return QDate{}; }(); //FIXME If we were able to to flush in between we could just query the local store for the folder list. return getFolderList(imap, query) .then([=](const QVector &folders) { auto job = KAsync::null(); for (const auto &folder : folders) { job = job.then([=] { if (aborting()) { return KAsync::null(); } return synchronizeFolder(imap, folder, dateFilter, syncHeaders) .then([=](const KAsync::Error &error) { if (error) { if (error.errorCode == Imap::CommandFailed) { SinkWarning() << "Continuing after protocol error: " << folder.path() << "Error: " << error; //Ignore protocol-level errors and continue return KAsync::null(); } SinkWarning() << "Aborting on error: " << folder.path() << "Error: " << error; //Abort otherwise, e.g. if we disconnected return KAsync::error(error); } return KAsync::null(); }); }); } return job; }); } }) .then([=] (const KAsync::Error &error) { return imap->logout() .then(KAsync::error(getError(error))); }); } return KAsync::error("Nothing to do"); } static QByteArray ensureCRLF(const QByteArray &data) { auto index = data.indexOf('\n'); if (index > 0 && data.at(index - 1) == '\r') { //First line is LF-only terminated //Convert back and forth in case there's a mix. We don't want to expand CRLF into CRCRLF. return KMime::LFtoCRLF(KMime::CRLFtoLF(data)); } else { return data; } } static bool validateContent(const QByteArray &data) { if (data.isEmpty()) { SinkError() << "No data available."; return false; } if (data.contains('\0')) { SinkError() << "Data contains NUL, this will fail with IMAP."; return false; } return true; } KAsync::Job replay(const ApplicationDomain::Mail &mail, Sink::Operation operation, const QByteArray &oldRemoteId, const QList &changedProperties) Q_DECL_OVERRIDE { if (operation != Sink::Operation_Creation) { if(oldRemoteId.isEmpty()) { SinkWarning() << "Tried to replay modification without old remoteId."; // Since we can't recover from the situation we just skip over the revision. // This can for instance happen if creation failed, and we then process a removal or modification. return KAsync::null(); } } auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode, &mSessionCache); auto login = imap->login(mUser, secret()); KAsync::Job job = KAsync::null(); if (operation == Sink::Operation_Creation) { const QString mailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); const auto content = ensureCRLF(mail.getMimeMessage()); if (!validateContent(content)) { SinkError() << "Validation failed during creation replay " << mail.identifier() << "\n Content:" << content; //We can't recover from this other than deleting the mail, so we skip it. return KAsync::null(); } const auto flags = getFlags(mail); const QDateTime internalDate = mail.getDate(); job = login.then(imap->append(mailbox, content, flags, internalDate)) .addToContext(imap) .then([mail](qint64 uid) { const auto remoteId = assembleMailRid(mail, uid); SinkTrace() << "Finished creating a new mail: " << remoteId; return remoteId; }); } else if (operation == Sink::Operation_Removal) { const auto folderId = folderIdFromMailRid(oldRemoteId); const QString mailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); const auto uid = uidFromMailRid(oldRemoteId); SinkTrace() << "Removing a mail: " << oldRemoteId << "in the mailbox: " << mailbox; KIMAP2::ImapSet set; set.add(uid); job = login.then(imap->remove(mailbox, set)) .then([imap, oldRemoteId] { SinkTrace() << "Finished removing a mail: " << oldRemoteId; return QByteArray(); }); } else if (operation == Sink::Operation_Modification) { const QString mailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); const auto uid = uidFromMailRid(oldRemoteId); SinkTrace() << "Modifying a mail: " << oldRemoteId << " in the mailbox: " << mailbox << changedProperties; auto flags = getFlags(mail); const bool messageMoved = changedProperties.contains(ApplicationDomain::Mail::Folder::name); const bool messageChanged = changedProperties.contains(ApplicationDomain::Mail::MimeMessage::name); if (messageChanged || messageMoved) { const auto folderId = folderIdFromMailRid(oldRemoteId); const QString oldMailbox = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folderId); const auto content = ensureCRLF(mail.getMimeMessage()); if (!validateContent(content)) { SinkError() << "Validation failed during modification replay " << mail.identifier() << "\n Content:" << content; //We can't recover from this other than deleting the mail, so we skip it. return KAsync::null(); } const QDateTime internalDate = mail.getDate(); SinkTrace() << "Replacing message. Old mailbox: " << oldMailbox << "New mailbox: " << mailbox << "Flags: " << flags << "Content: " << content; KIMAP2::ImapSet set; set.add(uid); job = login.then(imap->append(mailbox, content, flags, internalDate)) .addToContext(imap) .then([=](qint64 uid) { const auto remoteId = assembleMailRid(mail, uid); SinkTrace() << "Finished creating a modified mail: " << remoteId; return imap->remove(oldMailbox, set).then(KAsync::value(remoteId)); }); } else { SinkTrace() << "Updating flags only."; KIMAP2::ImapSet set; set.add(uid); job = login.then(imap->select(mailbox)) .addToContext(imap) .then(imap->storeFlags(set, flags)) .then([=] { SinkTrace() << "Finished modifying mail"; return oldRemoteId; }); } } return job .then([=] (const KAsync::Error &error, const QByteArray &remoteId) { if (error) { SinkWarning() << "Error during changereplay: " << error.errorMessage; return imap->logout() .then(KAsync::error(getError(error))); } return imap->logout() .then(KAsync::value(remoteId)); }); } KAsync::Job replay(const ApplicationDomain::Folder &folder, Sink::Operation operation, const QByteArray &oldRemoteId, const QList &changedProperties) Q_DECL_OVERRIDE { if (operation != Sink::Operation_Creation) { if(oldRemoteId.isEmpty()) { Q_ASSERT(false); return KAsync::error("Tried to replay modification without old remoteId."); } } auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode, &mSessionCache); auto login = imap->login(mUser, secret()); if (operation == Sink::Operation_Creation) { QString parentFolder; if (!folder.getParent().isEmpty()) { parentFolder = syncStore().resolveLocalId(ENTITY_TYPE_FOLDER, folder.getParent()); } SinkTraceCtx(mLogCtx) << "Creating a new folder: " << parentFolder << folder.getName(); auto rid = QSharedPointer::create(); auto createFolder = login.then(imap->createSubfolder(parentFolder, folder.getName())) .then([this, imap, rid](const QString &createdFolder) { SinkTraceCtx(mLogCtx) << "Finished creating a new folder: " << createdFolder; *rid = createdFolder.toUtf8(); }); if (folder.getSpecialPurpose().isEmpty()) { return createFolder .then([rid](){ return *rid; }); } else { //We try to merge special purpose folders first auto specialPurposeFolders = QSharedPointer>::create(); auto mergeJob = imap->login(mUser, secret()) .then(imap->fetchFolders([=](const Imap::Folder &folder) { if (SpecialPurpose::isSpecialPurposeFolderName(folder.name())) { specialPurposeFolders->insert(SpecialPurpose::getSpecialPurposeType(folder.name()), folder.path()); }; })) .then([this, specialPurposeFolders, folder, imap, parentFolder, rid]() -> KAsync::Job { for (const auto &purpose : folder.getSpecialPurpose()) { if (specialPurposeFolders->contains(purpose)) { auto f = specialPurposeFolders->value(purpose); SinkTraceCtx(mLogCtx) << "Merging specialpurpose folder with: " << f << " with purpose: " << purpose; *rid = f.toUtf8(); return KAsync::null(); } } SinkTraceCtx(mLogCtx) << "No match found for merging, creating a new folder"; return imap->createSubfolder(parentFolder, folder.getName()) .then([this, imap, rid](const QString &createdFolder) { SinkTraceCtx(mLogCtx) << "Finished creating a new folder: " << createdFolder; *rid = createdFolder.toUtf8(); }); }) .then([rid](){ return *rid; }); return mergeJob; } } else if (operation == Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Removing a folder: " << oldRemoteId; return login.then(imap->remove(oldRemoteId)) .then([this, oldRemoteId, imap] { SinkTraceCtx(mLogCtx) << "Finished removing a folder: " << oldRemoteId; return QByteArray(); }); } else if (operation == Sink::Operation_Modification) { SinkTraceCtx(mLogCtx) << "Renaming a folder: " << oldRemoteId << folder.getName(); auto rid = QSharedPointer::create(); return login.then(imap->renameSubfolder(oldRemoteId, folder.getName())) .then([this, imap, rid](const QString &createdFolder) { SinkTraceCtx(mLogCtx) << "Finished renaming a folder: " << createdFolder; *rid = createdFolder.toUtf8(); }) .then([rid] { return *rid; }); } return KAsync::null(); } public: QString mServer; int mPort; Imap::EncryptionMode mEncryptionMode = Imap::NoEncryption; QString mUser; int mDaysToSync = 0; QByteArray mResourceInstanceIdentifier; Imap::SessionCache mSessionCache; }; class ImapInspector : public Sink::Inspector { public: ImapInspector(const Sink::ResourceContext &resourceContext) : Sink::Inspector(resourceContext) { } protected: KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE { auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); Sink::Storage::EntityStore entityStore(mResourceContext, {"imapresource"}); auto syncStore = QSharedPointer::create(synchronizationTransaction); SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; if (domainType == ENTITY_TYPE_MAIL) { const auto mail = entityStore.readLatest(entityId); const auto folder = entityStore.readLatest(mail.getFolder()); const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier()); if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) { //There is no remote id to find if we expect the message to not exist if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType && !expectedValue.toBool()) { return KAsync::null(); } SinkWarning() << "Missing remote id for folder or mail. " << mailRemoteId << folderRemoteId; return KAsync::error(); } const auto uid = uidFromMailRid(mailRemoteId); SinkTrace() << "Mail remote id: " << folderRemoteId << mailRemoteId << mail.identifier() << folder.identifier(); KIMAP2::ImapSet set; set.add(uid); if (set.isEmpty()) { return KAsync::error(1, "Couldn't determine uid of mail."); } KIMAP2::FetchJob::FetchScope scope; scope.mode = KIMAP2::FetchJob::FetchScope::Full; auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode); auto messageByUid = QSharedPointer>::create(); SinkTrace() << "Connecting to:" << mServer << mPort; SinkTrace() << "as:" << mUser; auto inspectionJob = imap->login(mUser, secret()) .then(imap->select(folderRemoteId)) .then([](Imap::SelectResult){}) .then(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) { //We avoid parsing normally, so we have to do it explicitly here if (message.msg) { message.msg->parse(); } messageByUid->insert(message.uid, message); })); if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { if (property == "unread") { return inspectionJob.then([=] { auto msg = messageByUid->value(uid); if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { return KAsync::error(1, "Expected unread but couldn't find it."); } if (!expectedValue.toBool() && !msg.flags.contains(Imap::Flags::Seen)) { return KAsync::error(1, "Expected read but couldn't find it."); } return KAsync::null(); }); } if (property == "subject") { return inspectionJob.then([=] { auto msg = messageByUid->value(uid); if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { return KAsync::error(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); } return KAsync::null(); }); } } if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { return inspectionJob.then([=] { if (!messageByUid->contains(uid)) { SinkWarning() << "Existing messages are: " << messageByUid->keys(); SinkWarning() << "We're looking for: " << uid; return KAsync::error(1, "Couldn't find message: " + mailRemoteId); } return KAsync::null(); }); } } if (domainType == ENTITY_TYPE_FOLDER) { const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); const auto folder = entityStore.readLatest(entityId); if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { SinkLog() << "Inspecting cache integrity" << remoteId; int expectedCount = 0; Index index("mail.index.folder", transaction); index.lookup(entityId, [&](const QByteArray &sinkId) { expectedCount++; }, [&](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); auto set = KIMAP2::ImapSet::fromImapSequenceSet("1:*"); KIMAP2::FetchJob::FetchScope scope; scope.mode = KIMAP2::FetchJob::FetchScope::Headers; auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode); auto messageByUid = QSharedPointer>::create(); return imap->login(mUser, secret()) .then(imap->select(remoteId)) .then(imap->fetch(set, scope, [=](const Imap::Message message) { messageByUid->insert(message.uid, message); })) .then([imap, messageByUid, expectedCount] { if (messageByUid->size() != expectedCount) { return KAsync::error(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); } return KAsync::null(); }); } if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { auto folderByPath = QSharedPointer>::create(); auto folderByName = QSharedPointer>::create(); auto imap = QSharedPointer::create(mServer, mPort, mEncryptionMode); auto inspectionJob = imap->login(mUser, secret()) .then(imap->fetchFolders([=](const Imap::Folder &f) { *folderByPath << f.path(); *folderByName << f.name(); })) .then([folderByName, folderByPath, folder, remoteId, imap] { if (!folderByName->contains(folder.getName())) { SinkWarning() << "Existing folders are: " << *folderByPath; SinkWarning() << "We're looking for: " << folder.getName(); return KAsync::error(1, "Wrong folder name: " + remoteId); } return KAsync::null(); }); return inspectionJob; } } return KAsync::null(); } public: QString mServer; int mPort; Imap::EncryptionMode mEncryptionMode = Imap::NoEncryption; QString mUser; }; class FolderCleanupPreprocessor : public Sink::Preprocessor { public: virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE { //Remove all mails of a folder when removing the folder. const auto revision = entityStore().maxRevision(); entityStore().indexLookup(oldEntity.identifier(), [&] (const QByteArray &identifier) { deleteEntity(ApplicationDomain::ApplicationDomainType{{}, identifier, revision, {}}, ApplicationDomain::getTypeName(), false); }); } }; ImapResource::ImapResource(const ResourceContext &resourceContext) : Sink::GenericResource(resourceContext) { auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); auto server = config.value("server").toString(); auto port = config.value("port").toInt(); auto user = config.value("username").toString(); auto daysToSync = config.value("daysToSync", 14).toInt(); auto starttls = config.value("starttls", false).toBool(); auto encryption = Imap::NoEncryption; if (server.startsWith("imaps")) { encryption = Imap::Tls; } if (starttls) { encryption = Imap::Starttls; } if (server.startsWith("imap")) { server.remove("imap://"); server.remove("imaps://"); } if (server.contains(':')) { auto list = server.split(':'); server = list.at(0); port = list.at(1).toInt(); } //Backwards compatibilty //For kolabnow we assumed that port 143 means starttls if (encryption == Imap::Tls && port == 143) { encryption = Imap::Starttls; } if (!QSslSocket::supportsSsl()) { SinkWarning() << "Qt doesn't support ssl. This is likely a distribution/packaging problem."; //On windows this means that the required ssl dll's are missing SinkWarning() << "Ssl Library Build Version Number: " << QSslSocket::sslLibraryBuildVersionString(); SinkWarning() << "Ssl Library Runtime Version Number: " << QSslSocket::sslLibraryVersionString(); } else { SinkTrace() << "Ssl support available"; SinkTrace() << "Ssl Library Build Version Number: " << QSslSocket::sslLibraryBuildVersionString(); SinkTrace() << "Ssl Library Runtime Version Number: " << QSslSocket::sslLibraryVersionString(); } auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mServer = server; synchronizer->mPort = port; synchronizer->mEncryptionMode = encryption; synchronizer->mUser = user; synchronizer->mDaysToSync = daysToSync; setupSynchronizer(synchronizer); auto inspector = QSharedPointer::create(resourceContext); inspector->mServer = server; inspector->mPort = port; inspector->mEncryptionMode = encryption; inspector->mUser = user; setupInspector(inspector); setupPreprocessors(ENTITY_TYPE_MAIL, {new SpecialPurposeProcessor, new MailPropertyExtractor}); setupPreprocessors(ENTITY_TYPE_FOLDER, {new FolderCleanupPreprocessor}); } ImapResourceFactory::ImapResourceFactory(QObject *parent) : Sink::ResourceFactory(parent, {Sink::ApplicationDomain::ResourceCapabilities::Mail::mail, Sink::ApplicationDomain::ResourceCapabilities::Mail::folder, Sink::ApplicationDomain::ResourceCapabilities::Mail::storage, Sink::ApplicationDomain::ResourceCapabilities::Mail::drafts, Sink::ApplicationDomain::ResourceCapabilities::Mail::folderhierarchy, Sink::ApplicationDomain::ResourceCapabilities::Mail::trash, Sink::ApplicationDomain::ResourceCapabilities::Mail::sent} ) { } Sink::Resource *ImapResourceFactory::createResource(const ResourceContext &context) { return new ImapResource(context); } void ImapResourceFactory::registerFacades(const QByteArray &name, Sink::FacadeFactory &factory) { factory.registerFacade>(name); factory.registerFacade>(name); } void ImapResourceFactory::registerAdaptorFactories(const QByteArray &name, Sink::AdaptorFactoryRegistry ®istry) { registry.registerFactory>(name); registry.registerFactory>(name); } void ImapResourceFactory::removeDataFromDisk(const QByteArray &instanceIdentifier) { ImapResource::removeFromDisk(instanceIdentifier); } #include "imapresource.moc" diff --git a/examples/imapresource/tests/CMakeLists.txt b/examples/imapresource/tests/CMakeLists.txt index 4d26a9f5..81dfdcf1 100644 --- a/examples/imapresource/tests/CMakeLists.txt +++ b/examples/imapresource/tests/CMakeLists.txt @@ -1,25 +1,27 @@ set(CMAKE_AUTOMOC ON) include_directories( ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../../tests/hawd ${CMAKE_CURRENT_BINARY_DIR}/../../../tests/hawd ) include(SinkTest) auto_tests ( imapserverproxytest imapmailtest imapmailsynctest ) target_link_libraries(imapserverproxytest sink_resource_imap) target_link_libraries(imapmailtest sink_resource_imap) target_link_libraries(imapmailsynctest sink_resource_imap) manual_tests ( imapmailsyncbenchmark + imapmailsyncresponsivenesstest ) target_link_libraries(imapmailsyncbenchmark sink_resource_imap) +target_link_libraries(imapmailsyncresponsivenesstest sink_resource_imap) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/resetmailbox.sh DESTINATION bin PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/populatemailbox.sh DESTINATION bin PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ) diff --git a/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain:2,S b/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain:2,S index 91231b5d..c9c7db9c 100644 --- a/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain:2,S +++ b/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain:2,S @@ -1,72 +1,58 @@ Return-Path: Received: from compute4.internal (compute4.nyi.mail.srv.osa [10.202.2.44]) by slots3a1p1 (Cyrus git2.5+0-git-fastmail-8998) with LMTPA; Mon, 11 Mar 2013 14:28:42 -0400 X-Sieve: CMU Sieve 2.4 X-Spam-score: 0.0 X-Spam-hits: BAYES_00 -1.9, RCVD_IN_DNSWL_MED -2.3, RP_MATCHES_RCVD -0.704, LANGUAGES unknown, BAYES_USED global, SA_VERSION 3.3.1 X-Spam-source: IP='46.4.96.248', Host='postbox.kde.org', Country='unk', FromHeader='org', MailFrom='org' X-Spam-charsets: plain='us-ascii' X-Resolved-to: chrigi_1@fastmail.fm X-Delivered-to: chrigi_1@fastmail.fm X-Mail-from: nepomuk-bounces@kde.org Received: from mx4.nyi.mail.srv.osa ([10.202.2.203]) by compute4.internal (LMTPProxy); Mon, 11 Mar 2013 14:28:42 -0400 Received: from postbox.kde.org (postbox.kde.org [46.4.96.248]) by mx4.messagingengine.com (Postfix) with ESMTP id 1C9D2440F88 for ; Mon, 11 Mar 2013 14:28:42 -0400 (EDT) Received: from postbox.kde.org (localhost [IPv6:::1]) by postbox.kde.org (Postfix) with ESMTP id 00FFEB3732B; Mon, 11 Mar 2013 18:28:40 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=simple/simple; d=kde.org; s=default; t=1363026520; bh=cOdvyBAJJ8ho64q0H7rxkl+cB2y6TiyVOX0fO3yZ64U=; h=Date:From:To:Message-ID:In-Reply-To:References:MIME-Version: Subject:List-Id:List-Unsubscribe:List-Archive:List-Post:List-Help: List-Subscribe:Content-Type:Content-Transfer-Encoding:Sender; b=dv dJAFu+6JCuNun5WIuP4ysfKpLh0DeuhEEfy2cQavUGMICJ27k7tI73x6gN37V5Q/evJ NDFna3/IhNBsAQeLiXs28HKxzcVhbnq5jdFR6fbyo6k1fOKt5vTT1GTDZ+3zIGPD1CU ioDBGxPb/Ds6gee90tjadOj6o+Oc+2ZSq94= X-Original-To: nepomuk@kde.org X-Remote-Delivered-To: nepomuk@localhost.kde.org Received: from build.kde.org (build.kde.org [IPv6:2a01:4f8:160:9363::5]) by postbox.kde.org (Postfix) with ESMTP id 4491CB3732B for ; Mon, 11 Mar 2013 18:28:27 +0000 (UTC) Received: from localhost ([127.0.0.1]) by build.kde.org with esmtp (Exim 4.72) (envelope-from ) id 1UF7SV-0000gs-11 for nepomuk@kde.org; Mon, 11 Mar 2013 18:28:27 +0000 Date: Mon, 11 Mar 2013 18:28:27 +0000 (UTC) From: KDE CI System To: nepomuk@kde.org -Message-ID: <1977027405.27.1363026507008.JavaMail.jenkins@build> -In-Reply-To: <880663748.26.1363026023717.JavaMail.jenkins@build> -References: <880663748.26.1363026023717.JavaMail.jenkins@build> MIME-Version: 1.0 -X-Jenkins-Job: nepomuk-core_stable -X-Jenkins-Result: UNSTABLE -X-Scanned-By: MIMEDefang 2.71 on 46.4.96.248 Subject: [Nepomuk] Jenkins build is still unstable: nepomuk-core_stable #158 X-BeenThere: nepomuk@kde.org X-Mailman-Version: 2.1.14 Precedence: list -List-Id: The Semantic KDE -List-Unsubscribe: , - -List-Archive: -List-Post: -List-Help: -List-Subscribe: , - Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: nepomuk-bounces@kde.org Sender: nepomuk-bounces@kde.org X-Truedomain: NotChecked See _______________________________________________ Nepomuk mailing list Nepomuk@kde.org https://mail.kde.org/mailman/listinfo/nepomuk diff --git a/examples/imapresource/tests/imapmailsyncresponsivenesstest.cpp b/examples/imapresource/tests/imapmailsyncresponsivenesstest.cpp new file mode 100644 index 00000000..937e1c33 --- /dev/null +++ b/examples/imapresource/tests/imapmailsyncresponsivenesstest.cpp @@ -0,0 +1,153 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include +#include + +#include "../imapresource.h" +#include "../imapserverproxy.h" +#include "tests/testutils.h" + +#include "common/test.h" +#include "common/domain/applicationdomaintype.h" +#include "common/store.h" +#include "common/resourcecontrol.h" +#include "common/secretstore.h" +#include "common/commands.h" +#include "common/notification.h" + +#include +#include + +using namespace Sink; +using namespace Sink::ApplicationDomain; + +/** + * Test if the system remains somewhat responsive under load. + */ +class ImapMailSyncResponsivenessTest : public QObject +{ + Q_OBJECT + + bool isBackendAvailable() + { + QTcpSocket socket; + socket.connectToHost("localhost", 143); + return socket.waitForConnected(200); + } + + void resetTestEnvironment() + { + system("populatemailbox.sh"); + } + + Sink::ApplicationDomain::SinkResource createResource() + { + auto resource = ApplicationDomain::ImapResource::create("account1"); + resource.setProperty("server", "localhost"); + resource.setProperty("port", 143); + resource.setProperty("username", "doe"); + Sink::SecretStore::instance().insert(resource.identifier(), "doe"); + return resource; + } + + void removeResourceFromDisk(const QByteArray &identifier) + { + ::ImapResource::removeFromDisk(identifier); + } + + QByteArray mResourceInstanceIdentifier; + +private slots: + + void initTestCase() + { + Test::initTest(); + QVERIFY(isBackendAvailable()); + resetTestEnvironment(); + auto resource = createResource(); + QVERIFY(!resource.identifier().isEmpty()); + + VERIFYEXEC(Store::create(resource)); + + mResourceInstanceIdentifier = resource.identifier(); + } + + void cleanup() + { + VERIFYEXEC(ResourceControl::shutdown(mResourceInstanceIdentifier)); + removeResourceFromDisk(mResourceInstanceIdentifier); + } + + void init() + { + VERIFYEXEC(ResourceControl::start(mResourceInstanceIdentifier)); + } + + void testSync() + { + Sink::Query query; + query.resourceFilter(mResourceInstanceIdentifier); + + QTime time; + time.start(); + + // Trigger the sync + Store::synchronize(query).exec(); + + //Repeatedly ping the resource and check if a response arrives within an acceptable timeframe + //We could improve this check by actually modifying something (it should get priority over the sync) + Sink::ResourceAccess resourceAccess(mResourceInstanceIdentifier, ""); + resourceAccess.open(); + + bool syncComplete = false; + + QObject::connect(&resourceAccess, &ResourceAccess::notification, [&syncComplete](Sink::Notification notification) { + qWarning() << "Notification" << notification; + //Sync complete + if (notification.type == Sink::Notification::Status && notification.code == Sink::ApplicationDomain::ConnectedStatus) { + syncComplete = true; + } + }); + + QTime pingTime; + for (int i = 0; i < 500; i++) { + pingTime.start(); + VERIFYEXEC(resourceAccess.sendCommand(Sink::Commands::PingCommand)); + if (pingTime.elapsed() > 500) { + if (pingTime.elapsed() > 2000) { + SinkWarning() << "Ping took: " << Sink::Log::TraceTime(pingTime.elapsed()); + // QVERIFY(pingTime.elapsed() < 2000); + } else { + SinkLog() << "Ping took: " << Sink::Log::TraceTime(pingTime.elapsed()); + } + } + if (syncComplete) { + break; + } + QTest::qWait(500); + } + + auto total = time.elapsed(); + SinkLog() << "Total took: " << Sink::Log::TraceTime(total); + } +}; + +QTEST_MAIN(ImapMailSyncResponsivenessTest) + +#include "imapmailsyncresponsivenesstest.moc" diff --git a/examples/imapresource/tests/populatemailbox.sh b/examples/imapresource/tests/populatemailbox.sh index 38a3857e..cbdb72fc 100644 --- a/examples/imapresource/tests/populatemailbox.sh +++ b/examples/imapresource/tests/populatemailbox.sh @@ -1,37 +1,39 @@ #!/bin/bash +echo "cm user.doe" | cyradm --auth PLAIN -u cyrus -w admin localhost + sudo echo "sam user.doe.* cyrus c; dm user.doe.*; cm user.doe.test; cm user.doe.Drafts; cm user.doe.Trash; sam user.doe cyrus c; " | cyradm --auth PLAIN -u cyrus -w admin localhost sudo echo "sam user.doe.* cyrus c; subscribe INBOX.test; subscribe INBOX.Drafts; subscribe INBOX.Trash; " | cyradm --auth PLAIN -u doe -w doe localhost #Create a bunch of test messages in the test folder # for i in `seq 1 5000`; # do # # sudo cp /src/sink/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain\:2\,S /var/spool/imap/d/user/doe/test/$i. # done # Because this is way faster than a loop FOLDERPATH=/var/spool/imap/d/user/doe/test SRCMESSAGE=/src/sink/examples/imapresource/tests/data/1365777830.R28.localhost.localdomain\:2\,S sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{1..1000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{1001..2000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{2001..3000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{3001..4000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{4001..5000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{5001..6000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{6001..7000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{7001..8000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{8001..9000}. sudo tee <$SRCMESSAGE >/dev/null $FOLDERPATH/{9001..10000}. sudo chown -R cyrus:mail $FOLDERPATH sudo reconstruct "user.doe.test" diff --git a/synchronizer/backtrace.cpp b/synchronizer/backtrace.cpp index bd993ccb..1321427a 100644 --- a/synchronizer/backtrace.cpp +++ b/synchronizer/backtrace.cpp @@ -1,309 +1,312 @@ /* * Copyright (C) 2016 The Qt Company Ltd. * Copyright (C) 2016 Intel Corporation. * Copyright (C) 2019 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "backtrace.h" #include //For the OS ifdefs #include #include #include #include #include #include #include #include #ifndef Q_OS_WIN #include #include #include #include #else #include #include #include # if !defined(Q_CC_MINGW) || (defined(Q_CC_MINGW) && defined(__MINGW64_VERSION_MAJOR)) #include # endif #endif #include "listener.h" -using namespace Sink; - #if defined(Q_OS_WIN) && !defined(Q_OS_WINRT) // Helper class for resolving symbol names by dynamically loading "dbghelp.dll". class DebugSymbolResolver { Q_DISABLE_COPY(DebugSymbolResolver) public: struct Symbol { Symbol() : name(Q_NULLPTR), address(0) {} const char *name; // Must be freed by caller. DWORD64 address; }; explicit DebugSymbolResolver(HANDLE process) : m_process(process), m_dbgHelpLib(0), m_symFromAddr(Q_NULLPTR) { bool success = false; m_dbgHelpLib = LoadLibraryW(L"dbghelp.dll"); if (m_dbgHelpLib) { SymInitializeType symInitialize = (SymInitializeType)(GetProcAddress(m_dbgHelpLib, "SymInitialize")); m_symFromAddr = (SymFromAddrType)(GetProcAddress(m_dbgHelpLib, "SymFromAddr")); success = symInitialize && m_symFromAddr && symInitialize(process, NULL, TRUE); } if (!success) { cleanup(); } } ~DebugSymbolResolver() { cleanup(); } bool isValid() const { return m_symFromAddr; } Symbol resolveSymbol(DWORD64 address) const { // reserve additional buffer where SymFromAddr() will store the name struct NamedSymbolInfo : public DBGHELP_SYMBOL_INFO { enum { symbolNameLength = 255 }; char name[symbolNameLength + 1]; }; Symbol result; if (!isValid()) return result; NamedSymbolInfo symbolBuffer; memset(&symbolBuffer, 0, sizeof(NamedSymbolInfo)); symbolBuffer.MaxNameLen = NamedSymbolInfo::symbolNameLength; symbolBuffer.SizeOfStruct = sizeof(DBGHELP_SYMBOL_INFO); if (!m_symFromAddr(m_process, address, 0, &symbolBuffer)) return result; result.name = qstrdup(symbolBuffer.Name); result.address = symbolBuffer.Address; return result; } private: // typedefs from DbgHelp.h/.dll struct DBGHELP_SYMBOL_INFO { // SYMBOL_INFO ULONG SizeOfStruct; ULONG TypeIndex; // Type Index of symbol ULONG64 Reserved[2]; ULONG Index; ULONG Size; ULONG64 ModBase; // Base Address of module comtaining this symbol ULONG Flags; ULONG64 Value; // Value of symbol, ValuePresent should be 1 ULONG64 Address; // Address of symbol including base address of module ULONG Register; // register holding value or pointer to value ULONG Scope; // scope of the symbol ULONG Tag; // pdb classification ULONG NameLen; // Actual length of name ULONG MaxNameLen; CHAR Name[1]; // Name of symbol }; typedef BOOL (__stdcall *SymInitializeType)(HANDLE, PCSTR, BOOL); typedef BOOL (__stdcall *SymFromAddrType)(HANDLE, DWORD64, PDWORD64, DBGHELP_SYMBOL_INFO *); void cleanup() { if (m_dbgHelpLib) { FreeLibrary(m_dbgHelpLib); } m_dbgHelpLib = 0; m_symFromAddr = Q_NULLPTR; } const HANDLE m_process; HMODULE m_dbgHelpLib; SymFromAddrType m_symFromAddr; }; #endif //Print a demangled stacktrace static void printStacktrace() { #ifndef Q_OS_WIN int skip = 1; void *callstack[128]; const int nMaxFrames = sizeof(callstack) / sizeof(callstack[0]); char buf[1024]; int nFrames = backtrace(callstack, nMaxFrames); char **symbols = backtrace_symbols(callstack, nFrames); std::ostringstream trace_buf; for (int i = skip; i < nFrames; i++) { // printf("%s\n", symbols[i]); Dl_info info; if (dladdr(callstack[i], &info) && info.dli_sname) { char *demangled = NULL; int status = -1; if (info.dli_sname[0] == '_') { demangled = abi::__cxa_demangle(info.dli_sname, NULL, 0, &status); } snprintf(buf, sizeof(buf), "%-3d %*p %s + %zd\n", i, int(2 + sizeof(void*) * 2), callstack[i], status == 0 ? demangled : info.dli_sname == 0 ? symbols[i] : info.dli_sname, (char *)callstack[i] - (char *)info.dli_saddr); free(demangled); } else { snprintf(buf, sizeof(buf), "%-3d %*p %s\n", i, int(2 + sizeof(void*) * 2), callstack[i], symbols[i]); } trace_buf << buf; } free(symbols); if (nFrames == nMaxFrames) { trace_buf << "[truncated]\n"; } std::cerr << trace_buf.str(); #else enum { maxStackFrames = 100 }; DebugSymbolResolver resolver(GetCurrentProcess()); if (resolver.isValid()) { void *stack[maxStackFrames]; fputs("\nStack:\n", stdout); const unsigned frameCount = CaptureStackBackTrace(0, DWORD(maxStackFrames), stack, NULL); for (unsigned f = 0; f < frameCount; ++f) { DebugSymbolResolver::Symbol symbol = resolver.resolveSymbol(DWORD64(stack[f])); if (symbol.name) { printf("#%3u: %s() - 0x%p\n", f + 1, symbol.name, (const void *)symbol.address); delete [] symbol.name; } else { printf("#%3u: Unable to obtain symbol\n", f + 1); } } } fputc('\n', stdout); fflush(stdout); #endif } #if defined(Q_OS_WIN) && !defined(Q_OS_WINRT) static LONG WINAPI windowsFaultHandler(struct _EXCEPTION_POINTERS *exInfo) { char appName[MAX_PATH]; if (!GetModuleFileNameA(NULL, appName, MAX_PATH)) { appName[0] = 0; } const void *exceptionAddress = exInfo->ExceptionRecord->ExceptionAddress; printf("A crash occurred in %s.\n" "Exception address: 0x%p\n" "Exception code : 0x%lx\n", appName, exceptionAddress, exInfo->ExceptionRecord->ExceptionCode); DebugSymbolResolver resolver(GetCurrentProcess()); if (resolver.isValid()) { DebugSymbolResolver::Symbol exceptionSymbol = resolver.resolveSymbol(DWORD64(exceptionAddress)); if (exceptionSymbol.name) { printf("Nearby symbol : %s\n", exceptionSymbol.name); delete [] exceptionSymbol.name; } } printStacktrace(); return EXCEPTION_EXECUTE_HANDLER; } #endif // Q_OS_WIN) && !Q_OS_WINRT static int sCounter = 0; static Listener *sListener = nullptr; static void crashHandler(int signal) { //Guard against crashing in here if (sCounter > 1) { std::_Exit(EXIT_FAILURE); } sCounter++; if (signal == SIGABRT) { std::cerr << "SIGABRT received\n"; } else if (signal == SIGSEGV) { std::cerr << "SIGSEV received\n"; } else { std::cerr << "Unexpected signal " << signal << " received\n"; } printStacktrace(); //Get the word out that we're going down if (sListener) { sListener->emergencyAbortAllConnections(); } std::fprintf(stdout, "Sleeping for 10s to attach a debugger: gdb attach %i\n", getpid()); std::this_thread::sleep_for(std::chrono::seconds(10)); // std::system("exec gdb -p \"$PPID\" -ex \"thread apply all bt\""); // This only works if we actually have xterm and X11 available // std::system("exec xterm -e gdb -p \"$PPID\""); std::_Exit(EXIT_FAILURE); } static void terminateHandler() { std::exception_ptr exptr = std::current_exception(); if (exptr != 0) { // the only useful feature of std::exception_ptr is that it can be rethrown... try { std::rethrow_exception(exptr); } catch (std::exception &ex) { std::fprintf(stderr, "Terminated due to exception: %s\n", ex.what()); } catch (...) { std::fprintf(stderr, "Terminated due to unknown exception\n"); } } else { std::fprintf(stderr, "Terminated due to unknown reason :(\n"); } std::abort(); } +void Sink::printStackTrace() +{ + printStacktrace(); +} + void Sink::setListener(Listener *listener) { sListener = listener; } void Sink::installCrashHandler() { #ifndef Q_OS_WIN std::signal(SIGSEGV, crashHandler); std::signal(SIGABRT, crashHandler); std::set_terminate(terminateHandler); #else # ifndef Q_CC_MINGW _CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_DEBUG); # endif # ifndef Q_OS_WINRT SetErrorMode(SetErrorMode(0) | SEM_NOGPFAULTERRORBOX); SetUnhandledExceptionFilter(windowsFaultHandler); # endif #endif } diff --git a/synchronizer/backtrace.h b/synchronizer/backtrace.h index 8e83c014..59d660d1 100644 --- a/synchronizer/backtrace.h +++ b/synchronizer/backtrace.h @@ -1,25 +1,26 @@ /* * Copyright (C) 2019 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #pragma once class Listener; namespace Sink { void setListener(Listener*); void installCrashHandler(); + void printStackTrace(); } diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp index 0b954f0d..a3dc761a 100644 --- a/synchronizer/main.cpp +++ b/synchronizer/main.cpp @@ -1,201 +1,227 @@ /* * Copyright (C) 2014 Aaron Seigo * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include +#include #include #ifndef Q_OS_WIN #include #else #include #include #endif #include "listener.h" #include "log.h" #include "test.h" #include "definitions.h" #include "backtrace.h" #ifdef Q_OS_OSX #include #endif /* * We capture all qt debug messages in the same process and feed it into the sink debug system. * This way we get e.g. kimap debug messages as well together with the rest. */ -void qtMessageHandler(QtMsgType type, const QMessageLogContext &context, const QString &msg) +static void qtMessageHandler(QtMsgType type, const QMessageLogContext &context, const QString &msg) { QByteArray localMsg = msg.toLocal8Bit(); switch (type) { case QtDebugMsg: Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, context.line, context.file, context.function, context.category) << msg; break; case QtInfoMsg: Sink::Log::debugStream(Sink::Log::DebugLevel::Log, context.line, context.file, context.function, context.category) << msg; break; case QtWarningMsg: Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, context.line, context.file, context.function, context.category) << msg; break; case QtCriticalMsg: Sink::Log::debugStream(Sink::Log::DebugLevel::Error, context.line, context.file, context.function, context.category) << msg; break; case QtFatalMsg: Sink::Log::debugStream(Sink::Log::DebugLevel::Error, context.line, context.file, context.function, context.category) << msg; abort(); } } -QString read(const QString &filename) +static QString read(const QString &filename) { QFile file{filename}; file.open(QIODevice::ReadOnly); return file.readAll(); } void printStats() { #if defined(Q_OS_LINUX) /* * See 'man proc' for details */ { auto statm = read("/proc/self/statm").split(' '); SinkLog() << "Program size:" << statm.value(0).toInt() << "pages"; SinkLog() << "RSS:"<< statm.value(1).toInt() << "pages"; SinkLog() << "Resident Shared:" << statm.value(2).toInt() << "pages"; SinkLog() << "Text (code):" << statm.value(3).toInt() << "pages"; SinkLog() << "Data (data + stack):" << statm.value(5).toInt() << "pages"; } { auto stat = read("/proc/self/stat").split(' '); SinkLog() << "Minor page faults: " << stat.value(10).toInt(); SinkLog() << "Children minor page faults: " << stat.value(11).toInt(); SinkLog() << "Major page faults: " << stat.value(12).toInt(); SinkLog() << "Children major page faults: " << stat.value(13).toInt(); } //Dump the complete memory map for the process // std::cout << "smaps: " << read("/proc/self/smaps").toStdString(); //Dump all sorts of stats for the process // std::cout << read("/proc/self/status").toStdString(); { auto io = read("/proc/self/io").split('\n'); QHash hash; for (const auto &s : io) { const auto parts = s.split(": "); hash.insert(parts.value(0), parts.value(1)); } SinkLog() << "Read syscalls: " << hash.value("syscr").toInt(); SinkLog() << "Write syscalls: " << hash.value("syscw").toInt(); SinkLog() << "Read from disk: " << hash.value("read_bytes").toInt() / 1024 << "kb"; SinkLog() << "Written to disk: " << hash.value("write_bytes").toInt() / 1024 << "kb"; SinkLog() << "Cancelled write bytes: " << hash.value("cancelled_write_bytes").toInt(); } #endif } +class SynchronizerApplication : public QGuiApplication +{ + Q_OBJECT +protected: + using QGuiApplication::QGuiApplication; + + QTime time; + + /* + * If we block the event loop for too long the system becomes unresponsive to user inputs, + * so we monitor it and attempt to avoid blocking behaviour + */ + bool notify(QObject *receiver, QEvent *event) override + { + time.start(); + const auto ret = QGuiApplication::notify(receiver, event); + if (time.elapsed() > 1000) { + SinkWarning() << "Blocked the eventloop for " << Sink::Log::TraceTime(time.elapsed()) << " with event " << event->type(); + } + return ret; + } +}; + int main(int argc, char *argv[]) { if (qEnvironmentVariableIsSet("SINK_GDB_DEBUG")) { #ifndef Q_OS_WIN SinkWarning() << "Running resource in debug mode and waiting for gdb to attach: gdb attach " << getpid(); raise(SIGSTOP); #endif } else { Sink::installCrashHandler(); } qInstallMessageHandler(qtMessageHandler); #ifdef Q_OS_OSX //Necessary to hide this QGuiApplication from the dock and application switcher on mac os. if (CFBundleRef mainBundle = CFBundleGetMainBundle()) { // get the application's Info Dictionary. For app bundles this would live in the bundle's Info.plist, if (CFMutableDictionaryRef infoDict = (CFMutableDictionaryRef) CFBundleGetInfoDictionary(mainBundle)) { // Add or set the "LSUIElement" key with/to value "1". This can simply be a CFString. CFDictionarySetValue(infoDict, CFSTR("LSUIElement"), CFSTR("1")); // That's it. We're now considered as an "agent" by the window server, and thus will have // neither menubar nor presence in the Dock or App Switcher. } } #endif - QGuiApplication app(argc, argv); + SynchronizerApplication app(argc, argv); app.setQuitLockEnabled(false); QByteArrayList arguments; for (int i = 0; i < argc; i++) { arguments << argv[i]; } if (arguments.contains("--test")) { SinkLog() << "Running in test-mode"; arguments.removeAll("--test"); Sink::Test::setTestModeEnabled(true); } if (arguments.count() < 3) { SinkWarning() << "Not enough args passed, no resource loaded."; return app.exec(); } const QByteArray instanceIdentifier = arguments.at(1); const QByteArray resourceType = arguments.at(2); app.setApplicationName(instanceIdentifier); Sink::Log::setPrimaryComponent(instanceIdentifier); SinkLog() << "Starting: " << instanceIdentifier << resourceType; QDir{}.mkpath(Sink::resourceStorageLocation(instanceIdentifier)); QLockFile lockfile(Sink::storageLocation() + QString("/%1.lock").arg(QString(instanceIdentifier))); lockfile.setStaleLockTime(500); if (!lockfile.tryLock(0)) { const auto error = lockfile.error(); if (error == QLockFile::LockFailedError) { qint64 pid; QString hostname, appname; lockfile.getLockInfo(&pid, &hostname, &appname); SinkWarning() << "Failed to acquire exclusive resource lock."; SinkLog() << "Pid:" << pid << "Host:" << hostname << "App:" << appname; } else { SinkError() << "Error while trying to acquire exclusive resource lock: " << error; } return -1; } auto listener = new Listener(instanceIdentifier, resourceType, &app); Sink::setListener(listener); listener->checkForUpgrade(); QObject::connect(&app, &QCoreApplication::aboutToQuit, listener, &Listener::closeAllConnections); QObject::connect(listener, &Listener::noClients, &app, &QCoreApplication::quit); auto ret = app.exec(); SinkLog() << "Exiting: " << instanceIdentifier; printStats(); return ret; } + +#include "main.moc"