diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 8b6e685b..c57ded80 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -1,366 +1,369 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "commandprocessor.h" #include #include "commands.h" #include "messagequeue.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" #include "definitions.h" #include "storage.h" #include "queuedcommand_generated.h" #include "revisionreplayed_generated.h" #include "synchronize_generated.h" static int sBatchSize = 100; // This interval directly affects the roundtrip time of single commands static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) : QObject(), mLogCtx(ctx.subContext("commandprocessor")), mPipeline(pipeline), mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), mCommandQueues(QList() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); Q_UNUSED(ret); } mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) { flatbuffers::FlatBufferBuilder fbb; auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); Sink::FinishQueuedCommandBuffer(fbb, buffer); mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); } void CommandProcessor::processCommand(int commandId, const QByteArray &data) { switch (commandId) { case Commands::FlushCommand: processFlushCommand(data); break; case Commands::SynchronizeCommand: processSynchronizeCommand(data); break; + case Commands::AbortSynchronizationCommand: + mSynchronizer->abort(); + break; // case Commands::RevisionReplayedCommand: // processRevisionReplayedCommand(data); // break; default: { static int modifications = 0; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); modifications++; if (modifications >= sBatchSize) { mUserQueue.commit(); modifications = 0; mCommitQueueTimer.stop(); } else { mCommitQueueTimer.start(); } } }; } void CommandProcessor::processFlushCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(data.constData()); const auto flushType = buffer->type(); const auto flushId = BufferUtils::extractBufferCopy(buffer->id()); if (flushType == Sink::Flush::FlushSynchronization) { mSynchronizer->flush(flushType, flushId); } else { mUserQueue.startTransaction(); enqueueCommand(mUserQueue, Commands::FlushCommand, data); mUserQueue.commit(); } } } void CommandProcessor::processSynchronizeCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { auto buffer = Sink::Commands::GetSynchronize(data.constData()); auto timer = QSharedPointer::create(); timer->start(); Sink::QueryBase query; if (buffer->query()) { auto data = QByteArray::fromStdString(buffer->query()->str()); QDataStream stream(&data, QIODevice::ReadOnly); stream >> query; } mSynchronizer->synchronize(query); } else { SinkWarningCtx(mLogCtx) << "received invalid command"; } } // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) // { // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); // client.currentRevision = buffer->revision(); // } else { // SinkWarningCtx(mLogCtx) << "received invalid command"; // } // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } void CommandProcessor::setOldestUsedRevision(qint64 revision) { mLowerBoundRevision = revision; } bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { return true; } } return false; } void CommandProcessor::process() { if (mProcessingLock) { return; } mProcessingLock = true; auto job = processPipeline() .then([this]() { mProcessingLock = false; if (messagesToProcessAvailable()) { process(); } }) .exec(); } KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) { SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); const auto data = queuedCommand->command()->Data(); const auto size = queuedCommand->command()->size(); switch (queuedCommand->commandId()) { case Sink::Commands::DeleteEntityCommand: return mPipeline->deletedEntity(data, size); case Sink::Commands::ModifyEntityCommand: return mPipeline->modifiedEntity(data, size); case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: Q_ASSERT(mInspector); return mInspector->processCommand(data, size) .then(KAsync::value(-1)); case Sink::Commands::FlushCommand: return flush(data, size) .then(KAsync::value(-1)); default: return KAsync::error(-1, "Unhandled command"); } } KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { SinkWarningCtx(mLogCtx) << "invalid buffer"; // return KAsync::error(1, "Invalid Buffer"); } auto queuedCommand = Sink::GetQueuedCommand(data.constData()); const auto commandId = queuedCommand->commandId(); return processQueuedCommand(queuedCommand) .then( [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { if (error) { SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; return KAsync::error(error); } SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); return KAsync::value(createdRevision); }); } // Process all messages of this queue KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); return KAsync::start([this]() { mPipeline->startTransaction(); }) .then(KAsync::doWhile( [this, queue, time]() -> KAsync::Job { return queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) -> KAsync::Job { time->start(); return processQueuedCommand(data) .then([this, time](qint64 createdRevision) { SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); }); }) .then([queue, this](const KAsync::Error &error) { if (error) { if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; } } if (queue->isEmpty()) { return KAsync::Break; } else { return KAsync::Continue; } }); })) .then([this](const KAsync::Error &) { mPipeline->commit(); }); } KAsync::Job CommandProcessor::processPipeline() { auto time = QSharedPointer::create(); time->start(); mPipeline->cleanupRevisions(mLowerBoundRevision); SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); // Go through all message queues if (mCommandQueues.isEmpty()) { return KAsync::null(); } auto it = QSharedPointer>::create(mCommandQueues); return KAsync::doWhile( [it, this]() { auto time = QSharedPointer::create(); time->start(); auto queue = it->next(); return processQueue(queue) .then([this, time, it]() { SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); if (it->hasNext()) { return KAsync::Continue; } return KAsync::Break; }); }); } void CommandProcessor::setInspector(const QSharedPointer &inspector) { mInspector = inspector; QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); } void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } KAsync::Job CommandProcessor::flush(void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(command); const auto flushType = buffer->type(); const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); Q_ASSERT(!flushId.isEmpty()); if (flushType == Sink::Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; Q_ASSERT(mSynchronizer); mSynchronizer->flush(flushType, flushId); } else { SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; mSynchronizer->flushComplete(flushId); Sink::Notification n; n.type = Sink::Notification::FlushCompletion; n.id = flushId; emit notify(n); } return KAsync::null(); } return KAsync::error(-1, "Invalid flush command."); } static void waitForDrained(KAsync::Future &f, MessageQueue &queue) { if (queue.isEmpty()) { f.setFinished(); } else { QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); } }; KAsync::Job CommandProcessor::processAllMessages() { // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. // TODO: report errors while processing sync? // TODO JOBAPI: A helper that waits for n events and then continues? return KAsync::start([this](KAsync::Future &f) { if (mCommitQueueTimer.isActive()) { auto context = new QObject; QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { delete context; f.setFinished(); }); } else { f.setFinished(); } }) .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { if (mSynchronizer->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); } }); } diff --git a/common/commands.cpp b/common/commands.cpp index 0ec2c7be..18508da3 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -1,119 +1,121 @@ /* * Copyright (C) 2014 Aaron Seigo * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "commands.h" #include #include namespace Sink { namespace Commands { QByteArray name(int commandId) { switch (commandId) { case UnknownCommand: return "Unknown"; case CommandCompletionCommand: return "Completion"; case HandshakeCommand: return "Handshake"; case RevisionUpdateCommand: return "RevisionUpdate"; case SynchronizeCommand: return "Synchronize"; case DeleteEntityCommand: return "DeleteEntity"; case ModifyEntityCommand: return "ModifyEntity"; case CreateEntityCommand: return "CreateEntity"; case SearchSourceCommand: return "SearchSource"; case ShutdownCommand: return "Shutdown"; case NotificationCommand: return "Notification"; case PingCommand: return "Ping"; case RevisionReplayedCommand: return "RevisionReplayed"; case InspectionCommand: return "Inspection"; case RemoveFromDiskCommand: return "RemoveFromDisk"; case FlushCommand: return "Flush"; case SecretCommand: return "Secret"; case UpgradeCommand: return "Upgrade"; + case AbortSynchronizationCommand: + return "AbortSynchronization"; case CustomCommand: return "Custom"; }; return QByteArray("Invalid commandId"); } int headerSize() { return sizeof(int) + (sizeof(uint) * 2); } void write(QLocalSocket *device, int messageId, int commandId) { write(device, messageId, commandId, nullptr, 0); } static void write(QLocalSocket *device, const char *buffer, uint size) { if (device->write(buffer, size) < 0) { SinkWarningCtx(Sink::Log::Context{"commands"}) << "Error while writing " << device->errorString(); } } void write(QLocalSocket *device, int messageId, int commandId, const char *buffer, uint size) { if (size > 0 && !buffer) { size = 0; } write(device, (const char *)&messageId, sizeof(int)); write(device, (const char *)&commandId, sizeof(int)); write(device, (const char *)&size, sizeof(uint)); if (buffer) { write(device, buffer, size); } //The default implementation will happily buffer 200k bytes before sending it out which doesn't make the sytem exactly responsive. //1k is arbitrary, but fits a bunch of messages at least. if (device->bytesToWrite() > 1000) { device->flush(); } } void write(QLocalSocket *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) { write(device, messageId, commandId, (const char *)fbb.GetBufferPointer(), fbb.GetSize()); } } // namespace Commands } // namespace Sink diff --git a/common/commands.h b/common/commands.h index 9ca92a37..2eb5ec58 100644 --- a/common/commands.h +++ b/common/commands.h @@ -1,65 +1,66 @@ /* * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include class QLocalSocket; namespace Sink { namespace Commands { enum CommandIds { UnknownCommand = 0, CommandCompletionCommand, HandshakeCommand, RevisionUpdateCommand, SynchronizeCommand, DeleteEntityCommand, ModifyEntityCommand, CreateEntityCommand, SearchSourceCommand, // need a buffer definition for this, but relies on Query API ShutdownCommand, NotificationCommand, PingCommand, RevisionReplayedCommand, InspectionCommand, RemoveFromDiskCommand, FlushCommand, SecretCommand, UpgradeCommand, + AbortSynchronizationCommand, CustomCommand = 0xffff }; QByteArray name(int commandId); int SINK_EXPORT headerSize(); void SINK_EXPORT write(QLocalSocket *device, int messageId, int commandId); void SINK_EXPORT write(QLocalSocket *device, int messageId, int commandId, const char *buffer, uint size); void SINK_EXPORT write(QLocalSocket *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); } } // namespace Sink diff --git a/common/listener.cpp b/common/listener.cpp index ffc25c86..7b8fb8a1 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -1,486 +1,487 @@ /* * Copyright (C) 2014 Aaron Seigo * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "listener.h" #include "common/commands.h" #include "common/resource.h" #include "common/log.h" #include "common/definitions.h" #include "common/resourcecontext.h" #include "common/adaptorfactoryregistry.h" #include "common/bufferutils.h" // commands #include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" #include "common/notification_generated.h" #include "common/revisionreplayed_generated.h" #include "common/secret_generated.h" #include #include #include #include Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), m_resourceName(resourceType), m_resourceInstanceIdentifier(resourceInstanceIdentifier), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0), m_exiting(false) { connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection); SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier; if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { m_server->removeServer(m_resourceInstanceIdentifier); if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { SinkWarning() << "Utter failure to start server"; exit(-1); } } if (m_server->isListening()) { SinkTrace() << QString("Listening on %1").arg(m_server->serverName()); } m_checkConnectionsTimer = std::unique_ptr(new QTimer); m_checkConnectionsTimer->setSingleShot(true); connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() { if (m_connections.isEmpty()) { SinkTrace() << QString("No connections, shutting down."); quit(); } }); //Give plenty of time during the first start. m_checkConnectionsTimer->start(std::chrono::milliseconds{60000}); // TODO: experiment with different timeouts // or even just drop down to invoking the method queued? => invoke queued unless we need throttling m_clientBufferProcessesTimer->setInterval(0); m_clientBufferProcessesTimer->setSingleShot(true); connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers); } Listener::~Listener() { SinkTrace() << "Shutting down " << m_resourceInstanceIdentifier; closeAllConnections(); } void Listener::checkForUpgrade() { if (loadResource().checkForUpgrade()) { //Close the resource to ensure no transactions are open m_resource.reset(nullptr); } } void Listener::emergencyAbortAllConnections() { Sink::Notification n; n.type = Sink::Notification::Status; n.message = "The resource crashed."; n.code = Sink::ApplicationDomain::ErrorStatus; notify(n); for (Client &client : m_connections) { if (client.socket) { SinkWarning() << "Sending panic"; client.socket->write("PANIC"); client.socket->waitForBytesWritten(); disconnect(client.socket, nullptr, this, nullptr); client.socket->abort(); delete client.socket; client.socket = nullptr; } } m_connections.clear(); } void Listener::closeAllConnections() { for (Client &client : m_connections) { if (client.socket) { disconnect(client.socket, nullptr, this, nullptr); client.socket->flush(); client.socket->close(); delete client.socket; client.socket = nullptr; } } m_connections.clear(); } void Listener::acceptConnection() { SinkTrace() << "Accepting connection"; QLocalSocket *socket = m_server->nextPendingConnection(); if (!socket) { SinkWarning() << "Accepted connection but didn't get a socket for it"; return; } m_connections << Client("Unknown Client", socket); connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable); connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped); m_checkConnectionsTimer->stop(); // If this is the first client, set the lower limit for revision cleanup if (m_connections.size() == 1) { loadResource().setLowerBoundRevision(0); } if (socket->bytesAvailable()) { readFromSocket(socket); } } void Listener::clientDropped() { QLocalSocket *socket = qobject_cast(sender()); if (!socket) { return; } bool dropped = false; QMutableVectorIterator it(m_connections); while (it.hasNext()) { const Client &client = it.next(); if (client.socket == socket) { dropped = true; SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket; it.remove(); break; } } if (!dropped) { SinkWarning() << "Failed to find connection for disconnected socket: " << socket; } checkConnections(); } void Listener::checkConnections() { // If this was the last client, disengage the lower limit for revision cleanup if (m_connections.isEmpty()) { loadResource().setLowerBoundRevision(std::numeric_limits::max()); } m_checkConnectionsTimer->start(std::chrono::milliseconds{1000}); } void Listener::onDataAvailable() { QLocalSocket *socket = qobject_cast(sender()); if (!socket || m_exiting) { return; } readFromSocket(socket); } void Listener::readFromSocket(QLocalSocket *socket) { SinkTrace() << "Reading from socket..."; for (Client &client : m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); if (!m_clientBufferProcessesTimer->isActive()) { m_clientBufferProcessesTimer->start(); } break; } } } void Listener::processClientBuffers() { // TODO: we should not process all clients, but iterate async over them and process // one command from each in turn to ensure all clients get fair handling of // commands? bool again = false; for (Client &client : m_connections) { if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { continue; } if (processClientBuffer(client)) { again = true; } } if (again) { m_clientBufferProcessesTimer->start(); } } void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function &callback) { bool success = true; switch (commandId) { case Sink::Commands::HandshakeCommand: { flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifyHandshakeBuffer(verifier)) { auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); client.name = buffer->name()->c_str(); } else { SinkWarning() << "received invalid command"; } break; } case Sink::Commands::SecretCommand: { flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifySecretBuffer(verifier)) { auto buffer = Sink::Commands::GetSecret(commandBuffer.constData()); loadResource().setSecret(QString{buffer->secret()->c_str()}); } else { SinkWarning() << "received invalid command"; } break; } case Sink::Commands::SynchronizeCommand: case Sink::Commands::InspectionCommand: case Sink::Commands::DeleteEntityCommand: case Sink::Commands::ModifyEntityCommand: case Sink::Commands::CreateEntityCommand: case Sink::Commands::FlushCommand: + case Sink::Commands::AbortSynchronizationCommand: SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; loadResource().processCommand(commandId, commandBuffer); break; case Sink::Commands::ShutdownCommand: SinkLog() << QString("Received shutdown command from %1").arg(client.name); m_exiting = true; break; case Sink::Commands::PingCommand: SinkTrace() << QString("Received ping command from %1").arg(client.name); break; case Sink::Commands::RevisionReplayedCommand: { SinkTrace() << QString("Received revision replayed command from %1").arg(client.name); flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); client.currentRevision = buffer->revision(); } else { SinkWarning() << "received invalid command"; } loadResource().setLowerBoundRevision(lowerBoundRevision()); } break; case Sink::Commands::RemoveFromDiskCommand: { SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); //Close the resource to ensure no transactions are open m_resource.reset(nullptr); if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { resourceFactory->removeDataFromDisk(m_resourceInstanceIdentifier); } m_exiting = true; } break; case Sink::Commands::UpgradeCommand: //Because we synchronously run the update directly on resource start, we know that the upgrade is complete once this message completes. break; default: if (commandId > Sink::Commands::CustomCommand) { SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId; loadResource().processCommand(commandId, commandBuffer); } else { success = false; SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; } break; } callback(success); } qint64 Listener::lowerBoundRevision() { qint64 lowerBound = 0; for (const Client &c : m_connections) { if (c.currentRevision > 0) { if (lowerBound == 0) { lowerBound = c.currentRevision; } else { lowerBound = qMin(c.currentRevision, lowerBound); } } } return lowerBound; } void Listener::sendShutdownNotification() { // Broadcast shutdown notifications to open clients, so they don't try to restart the resource auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { if (client.socket && client.socket->isOpen()) { Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); } } } void Listener::quit() { SinkTrace() << "Quitting " << m_resourceInstanceIdentifier; m_clientBufferProcessesTimer->stop(); m_server->close(); sendShutdownNotification(); closeAllConnections(); m_fbb.Clear(); QTimer::singleShot(0, this, [this]() { // This will destroy this object emit noClients(); }); } bool Listener::processClientBuffer(Client &client) { static const int headerSize = Sink::Commands::headerSize(); if (client.commandBuffer.size() < headerSize) { return false; } const uint messageId = *(const uint *)client.commandBuffer.constData(); const int commandId = *(const int *)(client.commandBuffer.constData() + sizeof(uint)); const uint size = *(const uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; // TODO: reject messages above a certain size? const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); if (commandComplete) { client.commandBuffer.remove(0, headerSize); auto socket = QPointer(client.socket); auto clientName = client.name; const QByteArray commandBuffer = client.commandBuffer.left(size); client.commandBuffer.remove(0, size); processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); if (socket) { sendCommandCompleted(socket.data(), messageId, success); } else { SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); } }); if (m_exiting) { quit(); return false; } return client.commandBuffer.size() >= headerSize; } return false; } void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success) { if (!socket || !socket->isValid()) { return; } auto command = Sink::Commands::CreateCommandCompletion(m_fbb, messageId, success); Sink::Commands::FinishCommandCompletionBuffer(m_fbb, command); Sink::Commands::write(socket, ++m_messageId, Sink::Commands::CommandCompletionCommand, m_fbb); if (m_exiting) { socket->waitForBytesWritten(); } m_fbb.Clear(); } void Listener::refreshRevision(qint64 revision) { updateClientsWithRevision(revision); } void Listener::updateClientsWithRevision(qint64 revision) { auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); for (const Client &client : m_connections) { if (!client.socket || !client.socket->isValid()) { continue; } SinkTrace() << "Sending revision update for " << client.name << revision; Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); client.socket->flush(); } m_fbb.Clear(); } void Listener::notify(const Sink::Notification ¬ification) { auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); auto entities = Sink::BufferUtils::toVector(m_fbb, notification.entities); Sink::Commands::NotificationBuilder builder(m_fbb); builder.add_type(notification.type); builder.add_code(notification.code); builder.add_identifier(idString); builder.add_message(messageString); builder.add_entities(entities); builder.add_progress(notification.progress); builder.add_total(notification.total); auto command = builder.Finish(); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { if (client.socket && client.socket->isOpen()) { Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::NotificationCommand, m_fbb); } } m_fbb.Clear(); } Sink::Resource &Listener::loadResource() { if (!m_resource) { if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { m_resource = std::unique_ptr(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)})); if (!m_resource) { SinkError() << "Failed to instantiate the resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get()); connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify); } else { SinkError() << "Failed to load resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); } } Q_ASSERT(m_resource); return *m_resource; } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_listener.cpp" #pragma clang diagnostic pop diff --git a/common/store.cpp b/common/store.cpp index b68cb682..38d8a7af 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -1,532 +1,546 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "store.h" #include #include #include #include #include "resourceaccess.h" #include "commands.h" #include "resourcefacade.h" #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" #include "modelresult.h" #include "storage.h" #include "log.h" #include "utils.h" #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast(A) == static_cast(B), "The enum values must match"); //Ensure the copied enum matches typedef ModelResult MailModelResult; ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole) ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole) ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole) ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole) ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole) ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole) Q_DECLARE_METATYPE(QSharedPointer>) Q_DECLARE_METATYPE(QSharedPointer); Q_DECLARE_METATYPE(std::shared_ptr); static bool sanityCheckQuery(const Sink::Query &query) { for (const auto &id : query.ids()) { if (id.isEmpty()) { SinkError() << "Empty id in query."; return false; } } return true; } namespace Sink { QString Store::storageLocation() { return Sink::storageLocation(); } template KAsync::Job queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) { auto ctx = ctx_.subContext(resourceInstanceIdentifier); auto facade = FacadeFactory::instance().getFacade(resourceType, resourceInstanceIdentifier); if (facade) { SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; auto result = facade->load(query, ctx); if (result.second) { aggregatingEmitter->addEmitter(result.second); } else { SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; } return result.first; } else { SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier; // Ignore the error and carry on return KAsync::null(); } } template QPair::Ptr, typename ResultEmitter::Ptr> getEmitter(Query query, const Log::Context &ctx) { query.setType(ApplicationDomain::getTypeName()); SinkTraceCtx(ctx) << "Query: " << query; // Query all resources and aggregate results auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { //For global types we don't need to query for the resources first. queryResource("", "", query, aggregatingEmitter, ctx).exec(); } else { auto resourceCtx = ctx.subContext("resourceQuery"); auto facade = FacadeFactory::instance().getFacade(); Q_ASSERT(facade); Sink::Query resourceQuery; resourceQuery.request(); if (query.liveQuery()) { SinkTraceCtx(ctx) << "Listening for new resources."; resourceQuery.setFlags(Query::LiveQuery); } //Filter resources by available content types (unless the query already specifies a capability filter) auto resourceFilter = query.getResourceFilter(); if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) { resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName(), Query::Comparator::Contains}); } resourceQuery.setFilter(resourceFilter); for (auto const &properties : resourceFilter.propertyFilter.keys()) { resourceQuery.requestedProperties << properties; } auto result = facade->load(resourceQuery, resourceCtx); auto emitter = result.second; emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier(); const auto resourceType = ResourceConfig::getResourceType(resource->identifier()); Q_ASSERT(!resourceType.isEmpty()); queryResource(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec(); }); emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { SinkTraceCtx(resourceCtx) << "Resource query complete"; }); return qMakePair(aggregatingEmitter, emitter); } return qMakePair(aggregatingEmitter, ResultEmitter::Ptr{}); } static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type) { if (!query.id().isEmpty()) { return Log::Context{"query." + type + "." + query.id()}; } return Log::Context{"query." + type}; } template QSharedPointer Store::loadModel(const Query &query) { Q_ASSERT(sanityCheckQuery(query)); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); auto model = QSharedPointer>::create(query, query.requestedProperties, ctx); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks //* The emitter needs to live or the duration of query (respectively, the model) //* The result provider needs to live for as long as results are provided (until the last thread exits). auto result = getEmitter(query, ctx); model->setEmitter(result.first); //Keep the emitter alive if (auto resourceEmitter = result.second) { model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries resourceEmitter->fetch(); } //Automatically populate the top-level model->fetchMore(QModelIndex()); return model; } template static std::shared_ptr> getFacade(const QByteArray &resourceInstanceIdentifier) { if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { if (auto facade = FacadeFactory::instance().getFacade()) { return facade; } } if (auto facade = FacadeFactory::instance().getFacade(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) { return facade; } return std::make_shared>(); } template KAsync::Job Store::create(const DomainType &domainObject) { SinkLog() << "Create: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; }); } template KAsync::Job Store::modify(const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; }); }); } return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template KAsync::Job Store::modify(const Query &query, const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << query << domainObject; return fetchAll(query) .each([=] (const typename DomainType::Ptr &entity) { auto copy = *entity; for (const auto &p : domainObject.changedProperties()) { copy.setProperty(p, domainObject.getProperty(p)); } return modify(copy); }); } template KAsync::Job Store::move(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Move: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); }); } return facade->move(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); } template KAsync::Job Store::copy(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Copy: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); }); } return facade->copy(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); } template KAsync::Job Store::remove(const DomainType &domainObject) { SinkLog() << "Remove: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); }); } return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); } template KAsync::Job Store::remove(const Sink::Query &query) { SinkLog() << "Remove: " << query; return fetchAll(query) .each([] (const typename DomainType::Ptr &entity) { return remove(*entity); }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { // All databases are going to become invalid, nuke the environments // TODO: all clients should react to a notification from the resource Sink::Storage::DataStore::clearEnv(); SinkTrace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown auto guard = new QObject; QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) { if (!ready) { //We don't disconnect if ResourceAccess get's recycled, so ready can fire multiple times, which can result in a crash if the future is no longer valid. delete guard; future.setFinished(); } }); } else { future.setFinished(); } }) .then([time]() { SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } static KAsync::Job upgrade(const QByteArray &resource) { auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) { return KAsync::value(Store::UpgradeResult{false}); } SinkLog() << "Upgrading " << resource; //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater}; return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during upgrade."; return KAsync::error(error); } SinkTrace() << "Upgrade of resource " << resource << " complete."; return KAsync::null(); }) .then(KAsync::value(Store::UpgradeResult{true})); } KAsync::Job Store::upgrade() { SinkLog() << "Upgrading..."; //Migrate from sink.dav to sink.carddav const auto resources = ResourceConfig::getResources(); for (auto it = resources.constBegin(); it != resources.constEnd(); it++) { if (it.value() == "sink.dav") { ResourceConfig::setResourceType(it.key(), "sink.carddav"); } } auto ret = QSharedPointer::create(false); return fetchAll({}) .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return Sink::upgrade(resource->identifier()) .then([ret](UpgradeResult returnValue) { if (returnValue.upgradeExecuted) { SinkLog() << "Upgrade executed."; *ret = true; } }); }) .then([ret] { if (*ret) { SinkLog() << "Upgrade complete."; } return Store::UpgradeResult{*ret}; }); } static KAsync::Job synchronize(const QByteArray &resource, const Sink::SyncScope &scope) { SinkLog() << "Synchronizing " << resource << scope; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); return resourceAccess->synchronizeResource(scope) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during sync."; return KAsync::error(error); } SinkTrace() << "Synchronization of resource " << resource << " complete."; return KAsync::null(); }); } KAsync::Job Store::synchronize(const Sink::Query &query) { return synchronize(Sink::SyncScope{query}); } KAsync::Job Store::synchronize(const Sink::SyncScope &scope) { auto resourceFilter = scope.getResourceFilter(); //Filter resources by type by default if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); } Sink::Query query; query.setFilter(resourceFilter); SinkLog() << "Synchronizing all resource matching: " << query; return fetchAll(query) .template each([scope](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return synchronize(resource->identifier(), scope); }); } +KAsync::Job Store::abortSynchronization(const QByteArray &identifier) +{ + auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); + return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand) + .addToContext(resourceAccess) + .then([=](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Error aborting synchronization."; + return KAsync::error(error); + } + return KAsync::null(); + }); +} + template KAsync::Job Store::fetchOne(const Sink::Query &query) { return fetch(query, 1).template then>([](const QList &list) { return KAsync::value(*list.first()); }); } template KAsync::Job> Store::fetchAll(const Sink::Query &query) { return fetch(query); } template KAsync::Job> Store::fetch(const Sink::Query &query, int minimumAmount) { Q_ASSERT(sanityCheckQuery(query)); auto model = loadModel(query); auto list = QSharedPointer>::create(); auto context = QSharedPointer::create(); return KAsync::start>([model, list, context, minimumAmount](KAsync::Future> &future) { if (model->rowCount() >= 1) { for (int i = 0; i < model->rowCount(); i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } } else { QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) { for (int i = start; i <= end; i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } }); QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { if (roles.contains(ModelResult::ChildrenFetchedRole)) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); future.setFinished(); } } }); } if (model->data(QModelIndex(), ModelResult::ChildrenFetchedRole).toBool()) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); } future.setFinished(); } }); } template DomainType Store::readOne(const Sink::Query &query) { const auto list = read(query); if (!list.isEmpty()) { return list.first(); } SinkWarning() << "Tried to read value but no values are available."; return DomainType(); } template QList Store::read(const Sink::Query &query_) { Q_ASSERT(sanityCheckQuery(query_)); auto query = query_; query.setFlags(Query::SynchronousQuery); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); QList list; auto result = getEmitter(query, ctx); auto aggregatingEmitter = result.first; aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ SinkTraceCtx(ctx) << "Found value: " << value->identifier(); list << *value; }); if (auto resourceEmitter = result.second) { resourceEmitter->fetch(); } aggregatingEmitter->fetch(); return list; } #define REGISTER_TYPE(T) \ template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::remove(const Query &); \ template KAsync::Job Store::create(const T &domainObject); \ template KAsync::Job Store::modify(const T &domainObject); \ template KAsync::Job Store::modify(const Query &, const T &); \ template KAsync::Job Store::move(const T &domainObject, const QByteArray &newResource); \ template KAsync::Job Store::copy(const T &domainObject, const QByteArray &newResource); \ template QSharedPointer Store::loadModel(const Query &query); \ template KAsync::Job Store::fetchOne(const Query &); \ template KAsync::Job> Store::fetchAll(const Query &); \ template KAsync::Job> Store::fetch(const Query &, int); \ template T Store::readOne(const Query &); \ template QList Store::read(const Query &); SINK_REGISTER_TYPES() } // namespace Sink diff --git a/common/store.h b/common/store.h index fb9c3fe8..f2001323 100644 --- a/common/store.h +++ b/common/store.h @@ -1,154 +1,159 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include "query.h" #include "applicationdomaintype.h" class QAbstractItemModel; namespace Sink { /** * The unified Sink Store. * * This is the primary interface for clients to interact with Sink. * It provides a unified store where all data provided by various resources can be accessed and modified. */ namespace Store { QString SINK_EXPORT storageLocation(); // Must be the same as in ModelResult enum Roles { DomainObjectRole = Qt::UserRole + 1, ChildrenFetchedRole, DomainObjectBaseRole, StatusRole, //ApplicationDomain::SyncStatus WarningRole, //ApplicationDomain::Warning, only if status == warning || status == error ProgressRole //ApplicationDomain::Progress }; /** * Asynchronusly load a dataset with tree structure information */ template QSharedPointer SINK_EXPORT loadModel(const Query &query); /** * Create a new entity. */ template KAsync::Job SINK_EXPORT create(const DomainType &domainObject); /** * Modify an entity. * * This includes moving etc. since these are also simple settings on a property. * Note that the modification will be dropped if there is no changedProperty on the domain object. */ template KAsync::Job SINK_EXPORT modify(const DomainType &domainObject); /** * Modify a set of entities identified by @param query. * * Note that the modification will be dropped if there is no changedProperty on the domain object. */ template KAsync::Job SINK_EXPORT modify(const Query &query, const DomainType &domainObject); /** * Remove an entity. */ template KAsync::Job SINK_EXPORT remove(const DomainType &domainObject); /** * Remove a set of entities identified by @param query. */ template KAsync::Job SINK_EXPORT remove(const Query &query); /** * Move an entity to a new resource. */ template KAsync::Job SINK_EXPORT move(const DomainType &domainObject, const QByteArray &newResource); /** * Copy an entity to a new resource. */ template KAsync::Job SINK_EXPORT copy(const DomainType &domainObject, const QByteArray &newResource); /** * Synchronize data to local cache. */ KAsync::Job SINK_EXPORT synchronize(const Sink::Query &query); KAsync::Job SINK_EXPORT synchronize(const Sink::SyncScope &query); +/** + * Abort all running synchronization commands. + */ +KAsync::Job SINK_EXPORT abortSynchronization(const QByteArray &resourceIdentifier); + /** * Removes all resource data from disk. * * This will not touch the configuration. All commands that that arrived at the resource before this command will be dropped. All commands that arrived later will be executed. */ KAsync::Job SINK_EXPORT removeDataFromDisk(const QByteArray &resourceIdentifier); struct UpgradeResult { bool upgradeExecuted; }; /** * Run upgrade jobs. * * Run this to upgrade your local database to a new version. * Note that this may: * * take a while * * remove some/all of your local caches * * Note: The initial implementation simply calls removeDataFromDisk for all resources. */ KAsync::Job SINK_EXPORT upgrade(); template KAsync::Job SINK_EXPORT fetchOne(const Sink::Query &query); template KAsync::Job> SINK_EXPORT fetchAll(const Sink::Query &query); template KAsync::Job> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); template DomainType SINK_EXPORT readOne(const Sink::Query &query); template QList SINK_EXPORT read(const Sink::Query &query); } } diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 7ae22d0e..7f382ed8 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -1,781 +1,787 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "synchronizer.h" #include "definitions.h" #include "commands.h" #include "bufferutils.h" #include "synchronizerstore.h" #include "datastorequery.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "flush_generated.h" #include "notification_generated.h" #include "utils.h" using namespace Sink; bool operator==(const Synchronizer::SyncRequest &left, const Synchronizer::SyncRequest &right) { return left.flushType == right.flushType && left.requestId == right.requestId && left.requestType == right.requestType && left.options == right.options && left.query == right.query && left.applicableEntities == right.applicableEntities; } Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context, {"synchronizer"}), mLogCtx{"synchronizer"}, mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), mSyncInProgress(false) { mCurrentState.push(ApplicationDomain::Status::NoStatus); SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() { } void Synchronizer::setSecret(const QString &s) { mSecret = s; if (!mSyncRequestQueue.isEmpty()) { processSyncQueue().exec(); } } QString Synchronizer::secret() const { return mSecret; } void Synchronizer::setup(const std::function &enqueueCommandCallback, MessageQueue &mq) { mEnqueue = enqueueCommandCallback; mMessageQueue = &mq; } void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) { Q_ASSERT(mEnqueue); mEnqueue(commandId, data); } Storage::EntityStore &Synchronizer::store() { Q_ASSERT(mEntityStore->hasTransaction()); return *mEntityStore; } SynchronizerStore &Synchronizer::syncStore() { if (!mSyncStore) { mSyncStore = QSharedPointer::create(syncTransaction()); } return *mSyncStore; } void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); auto deletions = BufferUtils::toVector(fbb, deletedProperties); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { scanForRemovals(bufferType, [this, &bufferType](const std::function &callback) { store().readAllUids(bufferType, [callback](const QByteArray &uid) { callback(uid); }); }, exists ); } void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current.getProperty(property)) { SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; changed = true; } } if (changed) { SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } }); } void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Can't modify entity that is not locally existing " << remoteId; return; } Storage::EntityStore store(mResourceContext, mLogCtx); modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; Storage::EntityStore store(mResourceContext, mLogCtx); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); const auto found = store.contains(bufferType, sinkId); if (!found) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } else { // modification modify(bufferType, remoteId, entity); } } template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); Storage::EntityStore store(mResourceContext, mLogCtx); const auto found = store.contains(bufferType, sinkId); if (!found) { if (!mergeCriteria.isEmpty()) { Sink::Query query; for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) { query.filter(it.key(), it.value()); } bool merge = false; Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { merge = true; SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); }); if (!merge) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } QByteArrayList Synchronizer::resolveQuery(const QueryBase &query) { if (query.type().isEmpty()) { SinkWarningCtx(mLogCtx) << "Can't resolve a query without a type" << query; return {}; } QByteArrayList result; Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, query.type(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 0, [&](const ResultSet::Result &r) { result << r.entity.identifier(); }); return result; } QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) { if (filter.value.canConvert()) { const auto value = filter.value.value(); if (value.isEmpty()) { SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter; } else { return {filter.value.value()}; } } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else { SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; Q_ASSERT(false); } return {}; } template void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity, newResource, remove); } QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { return QList() << Synchronizer::SyncRequest{query, "sync"}; } void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue) { mSyncRequestQueue << request; } void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTraceCtx(mLogCtx) << "Synchronizing" << query; auto newRequests = getSyncRequests(query); for (const auto &request: newRequests) { auto shouldSkip = [&] { for (auto &r : mSyncRequestQueue) { if (r == request) { //Merge SinkTraceCtx(mLogCtx) << "Merging equal request " << request.query << "\n to" << r.query; return true; } } return false; }; if (shouldSkip()) { continue; } mergeIntoQueue(request, mSyncRequestQueue); } processSyncQueue().exec(); } +void Synchronizer::abort() +{ + SinkTraceCtx(mLogCtx) << "Aborting all running synchronization requests"; + mSyncRequestQueue.clear(); +} + void Synchronizer::flush(int commandId, const QByteArray &flushId) { Q_ASSERT(!flushId.isEmpty()); SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; processSyncQueue().exec(); } void Synchronizer::flushComplete(const QByteArray &flushId) { SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; if (mPendingSyncRequests.contains(flushId)) { const auto requests = mPendingSyncRequests.values(flushId); for (const auto &r : requests) { //We want to process the pending request before any others in the queue mSyncRequestQueue.prepend(r); } mPendingSyncRequests.remove(flushId); processSyncQueue().exec(); } } void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.message = message; n.code = code; n.entities = entities; emit notify(n); } void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.progress = progress; n.total = total; n.entities = entities; emit notify(n); } void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) { if (progress > 0 && total > 0) { //Limit progress updates for large amounts if (total >= 100 && progress % 10 != 0) { return; } SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; const auto applicableEntities = [&] { if (entities.isEmpty()) { return mCurrentRequest.applicableEntities; } return entities; }(); emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities); } } void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) { if (error) { if (error.errorCode == ApplicationDomain::ConnectionError) { //Couldn't connect, so we assume we don't have a network connection. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::NoServerError) { //Failed to contact the server. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConfigurationError) { //There is an error with the configuration. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::LoginError) { //If we failed to login altough we could connect that indicates a problem with our setup. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConnectionLostError) { //We've lost the connection so we assume the connection to the server broke. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } //We don't know what kind of error this was, so we assume it's transient and don't change our status. } else { //An operation against the server worked, so we're probably online. setStatus(ApplicationDomain::ConnectedStatus, s, requestId); } } KAsync::Job Synchronizer::processRequest(const SyncRequest &request) { if (request.options & SyncRequest::RequestFlush) { return KAsync::start([=] { //Trigger a flush and record original request without flush option auto modifiedRequest = request; modifiedRequest.options = SyncRequest::NoOptions; //Normally we won't have a requestId here if (modifiedRequest.requestId.isEmpty()) { modifiedRequest.requestId = createUuid(); } SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId; //The sync request will be executed once the flush has completed mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); }); } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Synchronizing:" << request.query; setBusy(true, "Synchronization has started.", request.requestId); emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Synchronization has ended.", request.requestId); if (error) { //Emit notification with error SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done Synchronizing"; emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); return KAsync::null(); } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { return KAsync::start([=] { Q_ASSERT(!request.requestId.isEmpty()); //FIXME it looks like this is emitted before the replay actually finishes if (request.flushType == Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } else { flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(request.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); } }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { if (ChangeReplay::allChangesReplayed()) { return KAsync::null(); } else { return KAsync::start([this, request] { setBusy(true, "ChangeReplay has started.", request.requestId); SinkLogCtx(mLogCtx) << "Replaying changes."; }) .then(replayNextRevision()) .then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Changereplay has ended.", request.requestId); if (error) { SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done replaying changes"; return KAsync::null(); } }); } } else { SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; return KAsync::error(KAsync::Error{"Unknown request type."}); } } void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) { if (state != mCurrentState.top()) { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { mCurrentState.pop(); } mCurrentState.push(state); emitNotification(Notification::Status, state, reason, requestId); } } void Synchronizer::resetStatus(const QByteArray requestId) { mCurrentState.pop(); emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); } void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) { if (busy) { setStatus(ApplicationDomain::BusyStatus, reason, requestId); } else { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { resetStatus(requestId); } } } KAsync::Job Synchronizer::processSyncQueue() { if (secret().isEmpty()) { SinkLogCtx(mLogCtx) << "Secret not available but required."; emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {}); return KAsync::null(); } if (mSyncRequestQueue.isEmpty()) { SinkLogCtx(mLogCtx) << "All requests processed."; return KAsync::null(); } if (mSyncInProgress) { SinkTraceCtx(mLogCtx) << "Sync still in progress."; return KAsync::null(); } //Don't process any new requests until we're done with the pending ones. //Otherwise we might process a flush before the previous request actually completed. if (!mPendingSyncRequests.isEmpty()) { SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; return KAsync::null(); } const auto request = mSyncRequestQueue.takeFirst(); return KAsync::start([=] { mMessageQueue->startTransaction(); mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); mSyncInProgress = true; mCurrentRequest = request; }) .then(processRequest(request)) .then([this, request](const KAsync::Error &error) { SinkTraceCtx(mLogCtx) << "Sync request processed"; setBusy(false, {}, request.requestId); mCurrentRequest = {}; mEntityStore->abortTransaction(); mSyncTransaction.abort(); mMessageQueue->commit(); mSyncStore.clear(); mSyncInProgress = false; if (allChangesReplayed()) { emit changesReplayed(); } if (error) { SinkWarningCtx(mLogCtx) << "Error during sync: " << error; emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); } //In case we got more requests meanwhile. return processSyncQueue(); }); } void Synchronizer::commit() { mMessageQueue->commit(); mSyncTransaction.commit(); mSyncStore.clear(); if (mSyncInProgress) { mMessageQueue->startTransaction(); } } Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } void Synchronizer::revisionChanged() { //One replay request is enough for (const auto &r : mSyncRequestQueue) { if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) { return; } } mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; processSyncQueue().exec(); } bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); if (!metadataBuffer->replayToSource()) { SinkTraceCtx(mLogCtx) << "Change is coming from the source"; } return metadataBuffer->replayToSource(); } KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { SinkTraceCtx(mLogCtx) << "Replaying" << type << key; Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); if (!metadataBuffer) { SinkErrorCtx(mLogCtx) << "No metadata buffer available."; return KAsync::error("No metadata buffer"); } if (mSyncTransaction) { SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; mSyncTransaction.abort(); } if (mSyncStore) { SinkErrorCtx(mLogCtx) << "Leftover sync store."; mSyncStore.clear(); } Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); Q_ASSERT(!mSyncTransaction); //The entitystore transaction is handled by processSyncQueue Q_ASSERT(mEntityStore->hasTransaction()); const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; // TODO: should not use internal representations const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); QByteArray oldRemoteId; if (operation != Sink::Operation_Creation) { oldRemoteId = syncStore().resolveLocalId(type, uid); //oldRemoteId can be empty if the resource implementation didn't return a remoteid } SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision(); //If the entity has been removed already and this is not the removal, skip over. //This is important so we can unblock changereplay by removing entities. bool skipOver = false; store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) { if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) { skipOver = true; } }); if (skipOver) { SinkLogCtx(mLogCtx) << "Skipping over already removed entity"; return KAsync::null(); } KAsync::Job job = KAsync::null(); //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else { SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; } return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().recordRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Modification) { SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().updateRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; if (!oldRemoteId.isEmpty()) { syncStore().removeRemoteId(type, uid, oldRemoteId); } } else { SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; } }) .then([this](const KAsync::Error &error) { //We need to commit here otherwise the next change-replay step will abort the transaction mSyncStore.clear(); mSyncTransaction.commit(); if (error) { SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; return KAsync::error(error); } return KAsync::null(); }); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } bool Synchronizer::allChangesReplayed() { if (!mSyncRequestQueue.isEmpty()) { SinkTraceCtx(mLogCtx) << "Queue is not empty"; return false; } return ChangeReplay::allChangesReplayed(); } #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove); SINK_REGISTER_TYPES() diff --git a/common/synchronizer.h b/common/synchronizer.h index d1420e60..7b5c141e 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -1,241 +1,244 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include #include #include #include #include #include "changereplay.h" #include "synchronizerstore.h" namespace Sink { class SynchronizerStore; /** * Synchronize and add what we don't already have to local queue */ class SINK_EXPORT Synchronizer : public ChangeReplay { Q_OBJECT public: Synchronizer(const Sink::ResourceContext &resourceContext); virtual ~Synchronizer() Q_DECL_OVERRIDE; void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); void synchronize(const Sink::QueryBase &query); void flush(int commandId, const QByteArray &flushId); //Read only access to main storage Storage::EntityStore &store(); //Read/Write access to sync storage SynchronizerStore &syncStore(); void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); bool allChangesReplayed() Q_DECL_OVERRIDE; void flushComplete(const QByteArray &flushId); void setSecret(const QString &s); + //Abort all running synchronization requests + void abort(); + signals: void notify(Notification); public slots: virtual void revisionChanged() Q_DECL_OVERRIDE; protected: ///Base implementation calls the replay$Type calls KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; protected: ///Implement to write back changes to the server virtual KAsync::Job replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Event &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Todo &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); protected: QString secret() const; ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false); void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType); /** * A synchronous algorithm to remove entities that are no longer existing. * * A list of entities is generated by @param entryGenerator. * The entiry Generator typically iterates over an index to produce all existing entries. * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, * an entity delete command is enqueued. * * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ void scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); void scanForRemovals(const QByteArray &bufferType, std::function exists); /** * An algorithm to create or modify the entity. * * Depending on whether the entity is locally available, or has changed. */ void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); template void SINK_EXPORT createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria); void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); // template // void create(const DomainType &entity); template void SINK_EXPORT modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false); // template // void remove(const DomainType &entity); QByteArrayList resolveQuery(const QueryBase &query); QByteArrayList resolveFilter(const QueryBase::Comparator &filter); virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; public: struct SyncRequest { enum RequestType { Synchronization, ChangeReplay, Flush }; enum RequestOptions { NoOptions, RequestFlush }; SyncRequest() = default; SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) : requestId(requestId_), requestType(Synchronization), options(o), query(q), applicableEntities(q.ids()) { } SyncRequest(RequestType type) : requestType(type) { } SyncRequest(RequestType type, const QByteArray &requestId_) : requestId(requestId_), requestType(type) { } SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) : flushType(flushType_), requestId(requestId_), requestType(type) { } int flushType = 0; QByteArray requestId; RequestType requestType; RequestOptions options = NoOptions; Sink::QueryBase query; QByteArrayList applicableEntities; }; protected: /** * This allows the synchronizer to turn a single query into multiple synchronization requests. * * The idea is the following; * The input query is a specification by the application of what data needs to be made available. * Requests could be: * * Give me everything (signified by the default constructed/empty query) * * Give me all mails of folder X * * Give me all mails of folders matching some constraints * * getSyncRequests allows the resource implementation to apply it's own defaults to that request; * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. * * This will allow synchronizeWithSource to focus on just getting to the content. */ virtual QList getSyncRequests(const Sink::QueryBase &query); /** * This allows the synchronizer to merge new requests with existing requests in the queue. */ virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue); void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = QByteArray{}, const QByteArrayList &entiteis = QByteArrayList{}); void emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities); /** * Report progress for current task */ virtual void reportProgress(int progress, int total, const QByteArrayList &entities = {}) Q_DECL_OVERRIDE; Sink::Log::Context mLogCtx; private: QStack mCurrentState; void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId); void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId); void resetStatus(const QByteArray requestId); void setBusy(bool busy, const QString &reason, const QByteArray requestId); void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); KAsync::Job processRequest(const SyncRequest &request); KAsync::Job processSyncQueue(); Sink::ResourceContext mResourceContext; Sink::Storage::EntityStore::Ptr mEntityStore; QSharedPointer mSyncStore; Sink::Storage::DataStore mSyncStorage; Sink::Storage::DataStore::Transaction mSyncTransaction; std::function mEnqueue; QList mSyncRequestQueue; SyncRequest mCurrentRequest; MessageQueue *mMessageQueue; bool mSyncInProgress; QMultiHash mPendingSyncRequests; QString mSecret; }; }