diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 3507ef1d..8b6e685b 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -1,371 +1,366 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "commandprocessor.h" #include #include "commands.h" #include "messagequeue.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" #include "definitions.h" #include "storage.h" #include "queuedcommand_generated.h" #include "revisionreplayed_generated.h" #include "synchronize_generated.h" static int sBatchSize = 100; // This interval directly affects the roundtrip time of single commands static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) : QObject(), mLogCtx(ctx.subContext("commandprocessor")), mPipeline(pipeline), mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), mCommandQueues(QList() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); Q_UNUSED(ret); } mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) { flatbuffers::FlatBufferBuilder fbb; auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); Sink::FinishQueuedCommandBuffer(fbb, buffer); mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); } void CommandProcessor::processCommand(int commandId, const QByteArray &data) { switch (commandId) { case Commands::FlushCommand: processFlushCommand(data); break; case Commands::SynchronizeCommand: processSynchronizeCommand(data); break; // case Commands::RevisionReplayedCommand: // processRevisionReplayedCommand(data); // break; default: { static int modifications = 0; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); modifications++; if (modifications >= sBatchSize) { mUserQueue.commit(); modifications = 0; mCommitQueueTimer.stop(); } else { mCommitQueueTimer.start(); } } }; } void CommandProcessor::processFlushCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(data.constData()); const auto flushType = buffer->type(); const auto flushId = BufferUtils::extractBufferCopy(buffer->id()); if (flushType == Sink::Flush::FlushSynchronization) { mSynchronizer->flush(flushType, flushId); } else { mUserQueue.startTransaction(); enqueueCommand(mUserQueue, Commands::FlushCommand, data); mUserQueue.commit(); } } } void CommandProcessor::processSynchronizeCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { auto buffer = Sink::Commands::GetSynchronize(data.constData()); auto timer = QSharedPointer::create(); timer->start(); Sink::QueryBase query; if (buffer->query()) { auto data = QByteArray::fromStdString(buffer->query()->str()); QDataStream stream(&data, QIODevice::ReadOnly); stream >> query; } mSynchronizer->synchronize(query); } else { SinkWarningCtx(mLogCtx) << "received invalid command"; } } // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) // { // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); // client.currentRevision = buffer->revision(); // } else { // SinkWarningCtx(mLogCtx) << "received invalid command"; // } // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } void CommandProcessor::setOldestUsedRevision(qint64 revision) { mLowerBoundRevision = revision; } bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { return true; } } return false; } void CommandProcessor::process() { if (mProcessingLock) { return; } mProcessingLock = true; auto job = processPipeline() .then([this]() { mProcessingLock = false; if (messagesToProcessAvailable()) { process(); } }) .exec(); } KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) { SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); const auto data = queuedCommand->command()->Data(); const auto size = queuedCommand->command()->size(); switch (queuedCommand->commandId()) { case Sink::Commands::DeleteEntityCommand: return mPipeline->deletedEntity(data, size); case Sink::Commands::ModifyEntityCommand: return mPipeline->modifiedEntity(data, size); case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: Q_ASSERT(mInspector); return mInspector->processCommand(data, size) .then(KAsync::value(-1)); case Sink::Commands::FlushCommand: return flush(data, size) .then(KAsync::value(-1)); default: return KAsync::error(-1, "Unhandled command"); } } KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { SinkWarningCtx(mLogCtx) << "invalid buffer"; // return KAsync::error(1, "Invalid Buffer"); } auto queuedCommand = Sink::GetQueuedCommand(data.constData()); const auto commandId = queuedCommand->commandId(); return processQueuedCommand(queuedCommand) .then( [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { if (error) { SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; return KAsync::error(error); } SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); return KAsync::value(createdRevision); }); } // Process all messages of this queue KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); return KAsync::start([this]() { mPipeline->startTransaction(); }) .then(KAsync::doWhile( [this, queue, time]() -> KAsync::Job { return queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) -> KAsync::Job { time->start(); return processQueuedCommand(data) .then([this, time](qint64 createdRevision) { SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); }); }) .then([queue, this](const KAsync::Error &error) { if (error) { if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; } } if (queue->isEmpty()) { return KAsync::Break; } else { return KAsync::Continue; } }); })) .then([this](const KAsync::Error &) { mPipeline->commit(); }); } KAsync::Job CommandProcessor::processPipeline() { auto time = QSharedPointer::create(); time->start(); mPipeline->cleanupRevisions(mLowerBoundRevision); SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); // Go through all message queues if (mCommandQueues.isEmpty()) { return KAsync::null(); } auto it = QSharedPointer>::create(mCommandQueues); return KAsync::doWhile( [it, this]() { auto time = QSharedPointer::create(); time->start(); auto queue = it->next(); return processQueue(queue) .then([this, time, it]() { SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); if (it->hasNext()) { return KAsync::Continue; } return KAsync::Break; }); }); } void CommandProcessor::setInspector(const QSharedPointer &inspector) { mInspector = inspector; QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); } void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } KAsync::Job CommandProcessor::flush(void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(command); const auto flushType = buffer->type(); const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); Q_ASSERT(!flushId.isEmpty()); if (flushType == Sink::Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; Q_ASSERT(mSynchronizer); mSynchronizer->flush(flushType, flushId); } else { SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; mSynchronizer->flushComplete(flushId); Sink::Notification n; n.type = Sink::Notification::FlushCompletion; n.id = flushId; emit notify(n); } return KAsync::null(); } return KAsync::error(-1, "Invalid flush command."); } static void waitForDrained(KAsync::Future &f, MessageQueue &queue) { if (queue.isEmpty()) { f.setFinished(); } else { QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); } }; KAsync::Job CommandProcessor::processAllMessages() { // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. // TODO: report errors while processing sync? // TODO JOBAPI: A helper that waits for n events and then continues? return KAsync::start([this](KAsync::Future &f) { if (mCommitQueueTimer.isActive()) { auto context = new QObject; QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { delete context; f.setFinished(); }); } else { f.setFinished(); } }) .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { if (mSynchronizer->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); } }); } -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" -#include "commandprocessor.moc" -#include "moc_commandprocessor.cpp" -#pragma clang diagnostic pop diff --git a/common/facade.cpp b/common/facade.cpp index 9c14a233..92eeaca6 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -1,119 +1,117 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "facade.h" #include "commands.h" #include "log.h" #include "storage.h" #include "definitions.h" #include "domainadaptor.h" #include "queryrunner.h" #include "bufferutils.h" #include "resourceconfig.h" using namespace Sink; template GenericFacade::GenericFacade(const ResourceContext &context) : Sink::StoreFacade(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()) { } template GenericFacade::~GenericFacade() { } template QByteArray GenericFacade::bufferTypeForDomainType() { // We happen to have a one to one mapping return Sink::ApplicationDomain::getTypeName(); } template KAsync::Job GenericFacade::create(const DomainType &domainObject) { flatbuffers::FlatBufferBuilder entityFbb; if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } return mResourceAccess->sendCreateCommand(domainObject.identifier(), bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb)); } template KAsync::Job GenericFacade::modify(const DomainType &domainObject) { SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties(); flatbuffers::FlatBufferBuilder entityFbb; if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties(), QByteArray(), false); } template KAsync::Job GenericFacade::move(const DomainType &domainObject, const QByteArray &newResource) { SinkTrace() << "Moving entity: " << domainObject.identifier() << domainObject.changedProperties() << newResource; flatbuffers::FlatBufferBuilder entityFbb; if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties(), newResource, true); } template KAsync::Job GenericFacade::copy(const DomainType &domainObject, const QByteArray &newResource) { SinkTrace() << "Copying entity: " << domainObject.identifier() << domainObject.changedProperties() << newResource; flatbuffers::FlatBufferBuilder entityFbb; if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties(), newResource, false); } template KAsync::Job GenericFacade::remove(const DomainType &domainObject) { return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); } template QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Sink::Query &query, const Log::Context &ctx) { Q_ASSERT(DomainType::name == query.type() || query.type().isEmpty()); // The runner lives for the lifetime of the query auto runner = new QueryRunner(query, mResourceContext, bufferTypeForDomainType(), ctx); runner->setResultTransformation(mResultTransformation); return qMakePair(KAsync::null(), runner->emitter()); } #define REGISTER_TYPE(T) \ template class Sink::GenericFacade; \ SINK_REGISTER_TYPES() - -#include "facade.moc" diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 9cf91da7..bd970367 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -1,172 +1,168 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "genericresource.h" #include "pipeline.h" #include "synchronizer.h" #include "inspector.h" #include "commandprocessor.h" #include "definitions.h" #include "storage.h" using namespace Sink; using namespace Sink::Storage; GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) : Sink::Resource(), mResourceContext(resourceContext), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceContext, Log::Context{})), mProcessor(QSharedPointer::create(mPipeline.data(), resourceContext.instanceId(), Log::Context{})), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } GenericResource::~GenericResource() { } void GenericResource::setSecret(const QString &s) { if (mSynchronizer) { mSynchronizer->setSecret(s); } if (mInspector) { mInspector->setSecret(s); } } bool GenericResource::checkForUpgrade() { auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); //We rely on the store already having been created in the pipeline constructor before this get's called. Q_ASSERT(store.exists()); const auto currentDatabaseVersion = Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)); if (currentDatabaseVersion != Sink::latestDatabaseVersion()) { SinkLog() << "Starting database upgrade from " << currentDatabaseVersion << " to " << Sink::latestDatabaseVersion(); bool nukeDatabases = false; //Only apply the necessary updates. for (int i = currentDatabaseVersion; i < Sink::latestDatabaseVersion(); i++) { //TODO implement specific upgrade paths where applicable, and only nuke otherwise nukeDatabases = true; } if (nukeDatabases) { SinkLog() << "Wiping all databases during upgrade, you will have to resync."; //Right now upgrading just means removing all local storage so we will resync Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } auto store = Sink::Storage::DataStore(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadWrite); auto t = store.createTransaction(Storage::DataStore::ReadWrite); Storage::DataStore::setDatabaseVersion(t, Sink::latestDatabaseVersion()); SinkLog() << "Finished database upgrade to " << Sink::latestDatabaseVersion(); return true; } return false; } void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); } void GenericResource::setupSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; mProcessor->setSynchronizer(synchronizer); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } void GenericResource::setupInspector(const QSharedPointer &inspector) { mInspector = inspector; mProcessor->setInspector(inspector); } void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) { Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) { auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage(); size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage(); size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadOnly).diskUsage(); return size; } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) { SinkWarning() << "Received error from Processor: " << errorCode << errorMessage; mError = errorCode; } int GenericResource::error() const { return mError; } void GenericResource::processCommand(int commandId, const QByteArray &data) { mProcessor->processCommand(commandId, data); } KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { mSynchronizer->synchronize(query); return KAsync::null(); } KAsync::Job GenericResource::processAllMessages() { return mProcessor->processAllMessages(); } void GenericResource::updateLowerBoundRevision() { mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSynchronizer->getLastReplayedRevision())); } void GenericResource::setLowerBoundRevision(qint64 revision) { mClientLowerBoundRevision = revision; updateLowerBoundRevision(); } -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" -#include "genericresource.moc" -#pragma clang diagnostic pop diff --git a/common/inspector.cpp b/common/inspector.cpp index 0e7a61fe..0fa0f3db 100644 --- a/common/inspector.cpp +++ b/common/inspector.cpp @@ -1,97 +1,92 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "inspector.h" #include "resourcecontext.h" #include "inspection_generated.h" #include "bufferutils.h" #include using namespace Sink; Inspector::Inspector(const ResourceContext &context) : QObject(), mResourceContext(context) { } Inspector::~Inspector() { } void Inspector::setSecret(const QString &s) { mSecret = s; } QString Inspector::secret() const { return mSecret; } KAsync::Job Inspector::processCommand(void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyInspectionBuffer(verifier)) { auto buffer = Sink::Commands::GetInspection(command); int inspectionType = buffer->type(); QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); QByteArray property = BufferUtils::extractBuffer(buffer->property()); QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); QDataStream s(expectedValueString); QVariant expectedValue; s >> expectedValue; inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) .then( [=](const KAsync::Error &error) { Sink::Notification n; n.type = Sink::Notification::Inspection; n.id = inspectionId; if (error) { Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; n.code = Sink::Notification::Failure; } else { Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; n.code = Sink::Notification::Success; } emit notify(n); return KAsync::null(); }) .exec(); return KAsync::null(); } return KAsync::error(-1, "Invalid inspection command."); } KAsync::Job Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { return KAsync::error(-1, "Inspection not implemented."); } -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" -#include "inspector.moc" -#include "moc_inspector.cpp" -#pragma clang diagnostic pop diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 2f6e7e1b..ddad69d4 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -1,1141 +1,1138 @@ /* * Copyright (C) 2014 Christian Mollekopf * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "storage.h" #include #include #include #include #include #include #include #include #include #include #include "log.h" #ifdef Q_OS_WIN #include typedef SSIZE_T ssize_t; #endif namespace Sink { namespace Storage { static QReadWriteLock sDbisLock; static QReadWriteLock sEnvironmentsLock; static QMutex sCreateDbiLock; static QHash sEnvironments; static QHash sDbis; int getErrorCode(int e) { switch (e) { case MDB_NOTFOUND: return DataStore::ErrorCodes::NotFound; default: break; } return -1; } static QList getDatabaseNames(MDB_txn *transaction) { if (!transaction) { SinkWarning() << "Invalid transaction"; return QList(); } int rc; QList list; MDB_dbi dbi; if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) { MDB_val key; MDB_val data; MDB_cursor *cursor; mdb_cursor_open(transaction, dbi, &cursor); if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); } } else { //Normal if we don't have any databases yet if (rc == MDB_NOTFOUND) { rc = 0; } if (rc) { SinkWarning() << "Failed to get a value" << rc; } } mdb_cursor_close(cursor); } else { SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); } return list; } /* * To create a dbi we always need a write transaction, * and we always need to commit the transaction ASAP * We can only ever enter from one point per process. */ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, bool allowDuplicates, MDB_dbi &dbi) { unsigned int flags = 0; if (allowDuplicates) { flags |= MDB_DUPSORT; } MDB_dbi flagtableDbi; if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { if (!readOnly) { SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); } } else { MDB_val key, value; key.mv_data = const_cast(static_cast(db.constData())); key.mv_size = db.size(); if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { //We expect this to fail for new databases if (rc != MDB_NOTFOUND) { SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); } } else { //Found the flags const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); flags = ba.toInt(); } } if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { //Create the db if it is not existing already if (rc == MDB_NOTFOUND && !readOnly) { //Sanity check db name { auto parts = db.split('.'); for (const auto &p : parts) { auto containsSpecialCharacter = [] (const QByteArray &p) { for (int i = 0; i < p.size(); i++) { const auto c = p.at(i); //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars if (c < 0x30 || c > 0x7A) { return true; } } return false; }; if (p.isEmpty() || containsSpecialCharacter(p)) { SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db; Q_ASSERT(false); throw std::runtime_error("Fatal error while creating db."); } } } if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); return false; } //Record the db flags MDB_val key, value; key.mv_data = const_cast(static_cast(db.constData())); key.mv_size = db.size(); //Store the flags without the create option const auto ba = QByteArray::number(flags); value.mv_data = const_cast(static_cast(db.constData())); value.mv_size = db.size(); if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { //We expect this to fail if we're only creating the dbi but not the db if (rc != MDB_KEYEXIST) { SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); } } } else { //It's not an error if we only want to read if (!readOnly) { SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); return true; } return false; } } return true; } class DataStore::NamedDatabase::Private { public: Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } ~Private() { } QByteArray db; MDB_txn *transaction; MDB_dbi dbi; bool allowDuplicates; std::function defaultErrorHandler; QString name; bool createdNewDbi = false; QString createdNewDbiName; bool dbiValidForTransaction(MDB_dbi dbi, MDB_txn *transaction) { //sDbis can contain dbi's that are not available to this transaction. //We use mdb_dbi_flags to check if the dbi is valid for this transaction. uint f; if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { return false; } return true; } bool openDatabase(bool readOnly, std::function errorHandler) { const auto dbiName = name + db; QReadLocker dbiLocker{&sDbisLock}; if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); Q_ASSERT(dbiValidForTransaction(dbi, transaction)); } else { /* * Dynamic creation of databases. * If all databases were defined via the database layout we wouldn't ever end up in here. * However, we rely on this codepath for indexes, synchronization databases and in race-conditions * where the database is not yet fully created when the client initializes it for reading. * * There are a few things to consider: * * dbi's (DataBase Identifier) should be opened once (ideally), and then be persisted in the environment. * * To open a dbi we need a transaction and must commit the transaction. From then on any open transaction will have access to the dbi. * * Already running transactions will not have access to the dbi. * * There *must* only ever be one active transaction opening dbi's (using mdb_dbi_open), and that transaction *must* * commit or abort before any other transaction opens a dbi. * * We solve this the following way: * * For read-only transactions we abort the transaction, open the dbi and persist it in the environment, and reopen the transaction (so the dbi is available). This may result in the db content changing unexpectedly and referenced memory becoming unavailable, but isn't a problem as long as we don't rely on memory remaining valid for the duration of the transaction (which is anyways not given since any operation would invalidate the memory region).. * * For write transactions we open the dbi for future use, and then open it as well in the current transaction. */ SinkTrace() << "Creating database dynamically: " << dbiName << readOnly; //Only one transaction may ever create dbis at a time. QMutexLocker createDbiLocker(&sCreateDbiLock); //Double checked locking if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); Q_ASSERT(dbiValidForTransaction(dbi, transaction)); return true; } //Create a transaction to open the dbi MDB_txn *dbiTransaction; if (readOnly) { MDB_env *env = mdb_txn_env(transaction); Q_ASSERT(env); mdb_txn_reset(transaction); if (const int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &dbiTransaction)) { SinkError() << "Failed to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; return false; } } else { dbiTransaction = transaction; } if (createDbi(dbiTransaction, db, readOnly, allowDuplicates, dbi)) { if (readOnly) { mdb_txn_commit(dbiTransaction); dbiLocker.unlock(); QWriteLocker dbiWriteLocker(&sDbisLock); sDbis.insert(dbiName, dbi); //We reopen the read-only transaction so the dbi becomes available in it. mdb_txn_renew(transaction); } else { createdNewDbi = true; createdNewDbiName = dbiName; } //Ensure the dbi is valid for the parent transaction Q_ASSERT(dbiValidForTransaction(dbi, transaction)); } else { if (readOnly) { mdb_txn_abort(dbiTransaction); mdb_txn_renew(transaction); } else { SinkWarning() << "Failed to create the dbi: " << dbiName; } dbi = 0; transaction = 0; return false; } } return true; } }; DataStore::NamedDatabase::NamedDatabase() : d(nullptr) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) { *this = std::move(other); } DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) { if (&other != this) { delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::NamedDatabase::~NamedDatabase() { delete d; } bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); if (d) { errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return false; } const void *keyPtr = sKey.data(); const size_t keySize = sKey.size(); const void *valuePtr = sValue.data(); const size_t valueSize = sValue.size(); if (!keyPtr || keySize == 0) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key."); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } int rc; MDB_val key, data; key.mv_size = keySize; key.mv_data = const_cast(keyPtr); data.mv_size = valueSize; data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return !rc; } void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } int rc; MDB_val key; key.mv_size = k.size(); key.mv_data = const_cast(static_cast(k.data())); if (value.isEmpty()) { rc = mdb_del(d->transaction, d->dbi, &key, 0); } else { MDB_val data; data.mv_size = value.size(); data.mv_data = const_cast(static_cast(value.data())); rc = mdb_del(d->transaction, d->dbi, &key, &data); } if (rc) { auto errorCode = ErrorCodes::GenericError; if (rc == MDB_NOTFOUND) { errorCode = ErrorCodes::NotFound; } Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } } int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return 0; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { //Invalid arguments can mean that the transaction doesn't contain the db dbi Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates || findSubstringKeys) { MDB_cursor_op op = d->allowDuplicates ? MDB_SET : MDB_FIRST; if (findSubstringKeys) { op = MDB_SET_RANGE; } if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // The first lookup will find a key that is equal or greather than our key if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; } if (!callResultHandler || resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { if (findSubstringKeys) { // Reset the key to what we search for key.mv_data = (void *)k.constData(); key.mv_size = k.size(); } MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? MDB_NEXT_DUP : MDB_NEXT; while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // Every consequitive lookup simply iterates through the list if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { break; } } } } } } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } } else { if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { numberOfRetrievedValues++; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return numberOfRetrievedValues; } void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return; } if (k.isEmpty()) { Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } bool foundValue = false; MDB_cursor_op op = MDB_SET_RANGE; if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { // The first lookup will find a key that is equal or greather than our key if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { //Read next value until we no longer match while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { MDB_cursor_op nextOp = MDB_NEXT; rc = mdb_cursor_get(cursor, &key, &data, nextOp); if (rc) { break; } } //Now read the previous value, and that's the latest one MDB_cursor_op prefOp = MDB_PREV; // We read past the end above, just take the last value if (rc == MDB_NOTFOUND) { prefOp = MDB_LAST; } rc = mdb_cursor_get(cursor, &key, &data, prefOp); foundValue = true; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } else if (!foundValue) { Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } int DataStore::NamedDatabase::findAllInRange(const QByteArray &lowerBound, const QByteArray &upperBound, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return 0; } MDB_cursor *cursor; if (int rc = mdb_cursor_open(d->transaction, d->dbi, &cursor)) { // Invalid arguments can mean that the transaction doesn't contain the db dbi Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Lower bound: " + lowerBound + " Upper bound: " + upperBound); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } - int numberOfRetrievedValues = 0; - - MDB_val firstKey = { .mv_size = (size_t)lowerBound.size(), .mv_data = (void *)lowerBound.constData() }; - MDB_val idealLastKey = { .mv_size = (size_t)upperBound.size(), .mv_data = (void *)upperBound.constData() }; - MDB_val lastKey = idealLastKey; + MDB_val firstKey = {(size_t)lowerBound.size(), (void *)lowerBound.constData()}; + MDB_val idealLastKey = {(size_t)upperBound.size(), (void *)upperBound.constData()}; MDB_val currentKey; MDB_val data; // Find the first key in the range int rc = mdb_cursor_get(cursor, &firstKey, &data, MDB_SET_RANGE); if (rc != MDB_SUCCESS) { // Nothing is greater or equal than the lower bound, meaning no result mdb_cursor_close(cursor); return 0; } currentKey = firstKey; // If already bigger than the upper bound if (mdb_cmp(d->transaction, d->dbi, ¤tKey, &idealLastKey) > 0) { mdb_cursor_close(cursor); return 0; } int count = 0; do { const auto currentBAKey = QByteArray::fromRawData((char *)currentKey.mv_data, currentKey.mv_size); const auto currentBAValue = QByteArray::fromRawData((char *)data.mv_data, data.mv_size); resultHandler(currentBAKey, currentBAValue); count++; } while (mdb_cursor_get(cursor, ¤tKey, &data, MDB_NEXT) == MDB_SUCCESS && mdb_cmp(d->transaction, d->dbi, ¤tKey, &idealLastKey) <= 0); mdb_cursor_close(cursor); return count; } qint64 DataStore::NamedDatabase::getSize() { if (!d || !d->transaction) { return -1; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); } return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages); } DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat() { if (!d || !d->transaction) { return {}; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); return {}; } return {stat.ms_branch_pages, stat.ms_leaf_pages, stat.ms_overflow_pages, stat.ms_entries}; // std::cout << "page size: " << stat.ms_psize << std::endl; // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl; // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl; // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; // std::cout << "depth: " << stat.ms_depth << std::endl; // std::cout << "entries: " << stat.ms_entries << std::endl; } bool DataStore::NamedDatabase::allowsDuplicates() const { unsigned int flags; mdb_dbi_flags(d->transaction, d->dbi, &flags); return flags & MDB_DUPSORT; } class DataStore::Transaction::Private { public: Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false) { } ~Private() { } MDB_env *env; MDB_txn *transaction; bool requestedRead; std::function defaultErrorHandler; QString name; bool implicitCommit; bool error; QMap createdDbs; void startTransaction() { Q_ASSERT(!transaction); Q_ASSERT(sEnvironments.values().contains(env)); Q_ASSERT(env); // auto f = [](const char *msg, void *ctx) -> int { // qDebug() << msg; // return 0; // }; // mdb_reader_list(env, f, nullptr); // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; if (rc) { unsigned int flags; mdb_env_get_flags(env, &flags); if (flags & MDB_RDONLY && !requestedRead) { SinkError() << "Tried to open a write transation in a read-only enironment"; } defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); } } }; DataStore::Transaction::Transaction() : d(nullptr) { } DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) { *this = std::move(other); } DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) { if (&other != this) { abort(); delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { commit(); } else { // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; abort(); } } delete d; } DataStore::Transaction::operator bool() const { return (d && d->transaction); } bool DataStore::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; } // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); const int rc = mdb_txn_commit(d->transaction); if (rc) { abort(); Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application. throw std::runtime_error("Fatal error while committing transaction."); } //Add the created dbis to the shared environment if (!d->createdDbs.isEmpty()) { sDbisLock.lockForWrite(); for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) { //This means we opened the dbi again in a read-only transaction while the write transaction was ongoing. Q_ASSERT(!sDbis.contains(it.key())); if (!sDbis.contains(it.key())) { sDbis.insert(it.key(), it.value()); } } d->createdDbs.clear(); sDbisLock.unlock(); } d->transaction = nullptr; return !rc; } void DataStore::Transaction::abort() { if (!d || !d->transaction) { return; } // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); mdb_txn_abort(d->transaction); d->createdDbs.clear(); d->transaction = nullptr; } //Ensure that we opened the correct database by comparing the expected identifier with the one //we write to the database on first open. static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly) { bool openedTheWrongDatabase = false; auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { if (value != db) { SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db; openedTheWrongDatabase = true; } return false; }, [&](const DataStore::Error &) { }, false); //This is the first time we open this database in a write transaction, write the db name if (!count) { if (!readOnly) { database.write("__internal_dbname", db); } } return !openedTheWrongDatabase; } DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const { if (!d) { SinkError() << "Tried to open database on invalid transaction: " << db; return DataStore::NamedDatabase(); } Q_ASSERT(d->transaction); // We don't now if anything changed d->implicitCommit = true; auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); auto ret = p->openDatabase(d->requestedRead, errorHandler); if (!ret) { delete p; return DataStore::NamedDatabase(); } if (p->createdNewDbi) { d->createdDbs.insert(p->createdNewDbiName, p->dbi); } auto database = DataStore::NamedDatabase(p); if (!ensureCorrectDb(database, db, d->requestedRead)) { SinkWarning() << "Failed to open the database correctly" << db; Q_ASSERT(false); return DataStore::NamedDatabase(); } return database; } QList DataStore::Transaction::getDatabaseNames() const { if (!d) { SinkWarning() << "Invalid transaction"; return QList(); } return Sink::Storage::getDatabaseNames(d->transaction); } DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails) { const int freeDbi = 0; const int mainDbi = 1; MDB_envinfo mei; mdb_env_info(d->env, &mei); MDB_stat mst; mdb_stat(d->transaction, freeDbi, &mst); auto freeStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; mdb_stat(d->transaction, mainDbi, &mst); auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; MDB_cursor *cursor; MDB_val key, data; size_t freePages = 0, *iptr; int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor); if (rc) { fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); return {}; } while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { iptr = static_cast(data.mv_data); freePages += *iptr; bool bad = false; size_t pg, prev; ssize_t i, j, span = 0; j = *iptr++; for (i = j, prev = 1; --i >= 0; ) { pg = iptr[i]; if (pg <= prev) { bad = true; } prev = pg; pg += span; for (; i >= span && iptr[i-span] == pg; span++, pg++) ; } if (printDetails) { std::cout << " Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? " [bad sequence]" : "") << std::endl; for (--j; j >= 0; ) { pg = iptr[j]; for (span=1; --j >= 0 && iptr[j] == pg+span; span++); if (span > 1) { std::cout << " " << pg << "[" << span << "]\n"; } else { std::cout << " " << pg << std::endl; } } } } mdb_cursor_close(cursor); return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat}; } class DataStore::Private { public: Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {}); ~Private(); QString storageRoot; QString name; MDB_env *env = nullptr; AccessMode mode; Sink::Log::Context logCtx; void initEnvironment(const QString &fullPath, const DbLayout &layout) { // Ensure the environment is only created once, and that we only have one environment per process QReadLocker locker(&sEnvironmentsLock); if (!(env = sEnvironments.value(fullPath))) { locker.unlock(); QWriteLocker envLocker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); if (!(env = sEnvironments.value(fullPath))) { int rc = 0; if ((rc = mdb_env_create(&env))) { SinkWarningCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc); qCritical() << "mdb_env_create: " << rc << " " << mdb_strerror(rc); env = nullptr; } else { //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table. mdb_env_set_maxdbs(env, 50); if (RUNNING_ON_VALGRIND) { // In order to run valgrind this size must be smaller than half your available RAM // https://github.com/BVLC/caffe/issues/2404 mdb_env_set_mapsize(env, (size_t)1048576 * (size_t)1000); // 1MB * 1000 } else { //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit. #ifdef Q_OS_WIN //Windows home 10 has a virtual address space limit of 128GB(https://msdn.microsoft.com/en-us/library/windows/desktop/aa366778(v=vs.85).aspx#physical_memory_limits_windows_10) mdb_env_set_mapsize(env, (size_t)1048576 * (size_t)100000); // 1MB * 100'000 #else mdb_env_set_mapsize(env, (size_t)1048576 * (size_t)100000); // 1MB * 100'000 #endif } const bool readOnly = (mode == ReadOnly); unsigned int flags = MDB_NOTLS; if (readOnly) { flags |= MDB_RDONLY; } if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { if (readOnly) { SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath; } else { SinkWarningCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc); } mdb_env_close(env); env = 0; } else { Q_ASSERT(env); sEnvironments.insert(fullPath, env); //Open all available dbi's MDB_txn *transaction; if (const int rc = mdb_txn_begin(env, nullptr, readOnly ? MDB_RDONLY : 0, &transaction)) { SinkWarning() << "Failed to to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; return; } if (!layout.tables.isEmpty()) { //TODO upgrade db if the layout has changed: //* read existing layout //* if layout is not the same create new layout //Create dbis from the given layout. for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { const bool allowDuplicates = it.value(); MDB_dbi dbi = 0; const auto db = it.key(); const auto dbiName = name + db; if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { sDbis.insert(dbiName, dbi); } } } else { //Open all available databases for (const auto &db : getDatabaseNames(transaction)) { MDB_dbi dbi = 0; const auto dbiName = name + db; //We're going to load the flags anyways. bool allowDuplicates = false; if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { sDbis.insert(dbiName, dbi); } } } //To persist the dbis (this is also necessary for read-only transactions) mdb_txn_commit(transaction); } } } } } }; DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1()) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); if (!dirInfo.exists() && mode == ReadWrite) { QDir().mkpath(fullPath); dirInfo.refresh(); } if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { qCritical() << fullPath << "does not have write permissions. Aborting"; } else if (dirInfo.exists()) { initEnvironment(fullPath, layout); } } DataStore::Private::~Private() { //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) //and create storage instance from all over the place. Thus, we're not closing it here on purpose. } DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout)) { } DataStore::~DataStore() { delete d; } bool DataStore::exists(const QString &storageRoot, const QString &name) { return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists(); } bool DataStore::exists() const { return (d->env != 0) && DataStore::exists(d->storageRoot, d->name); } DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function &errorHandlerArg) { auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); if (!d->env) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment")); return Transaction(); } bool requestedRead = type == ReadOnly; if (d->mode == ReadOnly && !requestedRead) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode.")); return Transaction(); } QReadLocker locker(&sEnvironmentsLock); if (!sEnvironments.values().contains(d->env)) { return {}; } return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); } qint64 DataStore::diskUsage() const { QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); if (!info.exists()) { SinkWarning() << "Tried to get filesize for non-existant file: " << info.path(); } return info.size(); } void DataStore::removeFromDisk() const { const QString fullPath(d->storageRoot + '/' + d->name); QWriteLocker dbiLocker(&sDbisLock); QWriteLocker envLocker(&sEnvironmentsLock); SinkTrace() << "Removing database from disk: " << fullPath; auto env = sEnvironments.take(fullPath); for (const auto &key : sDbis.keys()) { if (key.startsWith(d->name)) { sDbis.remove(key); } } mdb_env_close(env); QDir dir(fullPath); if (!dir.removeRecursively()) { Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); defaultErrorHandler()(error); } } void DataStore::clearEnv() { SinkTrace() << "Clearing environment"; QWriteLocker locker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); for (const auto &envName : sEnvironments.keys()) { auto env = sEnvironments.value(envName); mdb_env_sync(env, true); for (const auto &k : sDbis.keys()) { if (k.startsWith(envName)) { auto dbi = sDbis.value(k); mdb_dbi_close(env, dbi); } } mdb_env_close(env); } sDbis.clear(); sEnvironments.clear(); } } } // namespace Sink diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 037a01f0..41ab1e92 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -1,748 +1,743 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "synchronizer.h" #include "definitions.h" #include "commands.h" #include "bufferutils.h" #include "synchronizerstore.h" #include "datastorequery.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "flush_generated.h" #include "notification_generated.h" #include "utils.h" using namespace Sink; Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context, {"synchronizer"}), mLogCtx{"synchronizer"}, mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), mSyncInProgress(false) { mCurrentState.push(ApplicationDomain::Status::NoStatus); SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() { } void Synchronizer::setSecret(const QString &s) { mSecret = s; if (!mSyncRequestQueue.isEmpty()) { processSyncQueue().exec(); } } QString Synchronizer::secret() const { return mSecret; } void Synchronizer::setup(const std::function &enqueueCommandCallback, MessageQueue &mq) { mEnqueue = enqueueCommandCallback; mMessageQueue = &mq; } void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) { Q_ASSERT(mEnqueue); mEnqueue(commandId, data); } Storage::EntityStore &Synchronizer::store() { Q_ASSERT(mEntityStore->hasTransaction()); return *mEntityStore; } SynchronizerStore &Synchronizer::syncStore() { if (!mSyncStore) { mSyncStore = QSharedPointer::create(syncTransaction()); } return *mSyncStore; } void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); auto deletions = BufferUtils::toVector(fbb, deletedProperties); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { scanForRemovals(bufferType, [this, &bufferType](const std::function &callback) { store().readAllUids(bufferType, [callback](const QByteArray &uid) { callback(uid); }); }, exists ); } void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current.getProperty(property)) { SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; changed = true; } } if (changed) { SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } }); } void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Can't modify entity that is not locally existing " << remoteId; return; } Storage::EntityStore store(mResourceContext, mLogCtx); modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; Storage::EntityStore store(mResourceContext, mLogCtx); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); const auto found = store.contains(bufferType, sinkId); if (!found) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } else { // modification modify(bufferType, remoteId, entity); } } template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); Storage::EntityStore store(mResourceContext, mLogCtx); const auto found = store.contains(bufferType, sinkId); if (!found) { if (!mergeCriteria.isEmpty()) { Sink::Query query; for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) { query.filter(it.key(), it.value()); } bool merge = false; Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { merge = true; SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); }); if (!merge) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) { QByteArrayList result; if (filter.value.canConvert()) { const auto value = filter.value.value(); if (value.isEmpty()) { SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter; } else { result << filter.value.value(); } } else if (filter.value.canConvert()) { auto query = filter.value.value(); Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, query.type(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 0, [&result](const ResultSet::Result &r) { result << r.entity.identifier(); }); } else { SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; Q_ASSERT(false); } return result; } template void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity, newResource, remove); } QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { return QList() << Synchronizer::SyncRequest{query, "sync"}; } void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue) { mSyncRequestQueue << request; } void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTraceCtx(mLogCtx) << "Synchronizing"; auto newRequests = getSyncRequests(query); for (const auto &request: newRequests) { mergeIntoQueue(request, mSyncRequestQueue); } processSyncQueue().exec(); } void Synchronizer::flush(int commandId, const QByteArray &flushId) { Q_ASSERT(!flushId.isEmpty()); SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; processSyncQueue().exec(); } void Synchronizer::flushComplete(const QByteArray &flushId) { SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; if (mPendingSyncRequests.contains(flushId)) { const auto requests = mPendingSyncRequests.values(flushId); for (const auto &r : requests) { //We want to process the pending request before any others in the queue mSyncRequestQueue.prepend(r); } mPendingSyncRequests.remove(flushId); processSyncQueue().exec(); } } void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.message = message; n.code = code; n.entities = entities; emit notify(n); } void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.progress = progress; n.total = total; n.entities = entities; emit notify(n); } void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) { if (progress > 0 && total > 0) { //Limit progress updates for large amounts if (total >= 100 && progress % 10 != 0) { return; } SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; const auto applicableEntities = [&] { if (entities.isEmpty()) { return mCurrentRequest.applicableEntities; } return entities; }(); emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities); } } void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) { if (error) { if (error.errorCode == ApplicationDomain::ConnectionError) { //Couldn't connect, so we assume we don't have a network connection. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::NoServerError) { //Failed to contact the server. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConfigurationError) { //There is an error with the configuration. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::LoginError) { //If we failed to login altough we could connect that indicates a problem with our setup. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConnectionLostError) { //We've lost the connection so we assume the connection to the server broke. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } //We don't know what kind of error this was, so we assume it's transient and don't change our status. } else { //An operation against the server worked, so we're probably online. setStatus(ApplicationDomain::ConnectedStatus, s, requestId); } } KAsync::Job Synchronizer::processRequest(const SyncRequest &request) { if (request.options & SyncRequest::RequestFlush) { return KAsync::start([=] { //Trigger a flush and record original request without flush option auto modifiedRequest = request; modifiedRequest.options = SyncRequest::NoOptions; //Normally we won't have a requestId here if (modifiedRequest.requestId.isEmpty()) { modifiedRequest.requestId = createUuid(); } SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId; //The sync request will be executed once the flush has completed mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); }); } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Synchronizing: " << request.query; setBusy(true, "Synchronization has started.", request.requestId); emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Synchronization has ended.", request.requestId); if (error) { //Emit notification with error SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done Synchronizing"; emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); return KAsync::null(); } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { return KAsync::start([=] { Q_ASSERT(!request.requestId.isEmpty()); //FIXME it looks like this is emitted before the replay actually finishes if (request.flushType == Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } else { flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(request.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); } }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { if (ChangeReplay::allChangesReplayed()) { return KAsync::null(); } else { return KAsync::start([this, request] { setBusy(true, "ChangeReplay has started.", request.requestId); SinkLogCtx(mLogCtx) << "Replaying changes."; }) .then(replayNextRevision()) .then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Changereplay has ended.", request.requestId); if (error) { SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error.errorMessage; return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done replaying changes"; return KAsync::null(); } }); } } else { SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; return KAsync::error(KAsync::Error{"Unknown request type."}); } } void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) { if (state != mCurrentState.top()) { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { mCurrentState.pop(); } mCurrentState.push(state); emitNotification(Notification::Status, state, reason, requestId); } } void Synchronizer::resetStatus(const QByteArray requestId) { mCurrentState.pop(); emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); } void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) { if (busy) { setStatus(ApplicationDomain::BusyStatus, reason, requestId); } else { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { resetStatus(requestId); } } } KAsync::Job Synchronizer::processSyncQueue() { if (secret().isEmpty()) { SinkWarningCtx(mLogCtx) << "Secret not available but required."; emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {}); return KAsync::null(); } if (mSyncRequestQueue.isEmpty()) { SinkLogCtx(mLogCtx) << "All requests processed."; return KAsync::null(); } if (mSyncInProgress) { SinkTraceCtx(mLogCtx) << "Sync still in progress."; return KAsync::null(); } //Don't process any new requests until we're done with the pending ones. //Otherwise we might process a flush before the previous request actually completed. if (!mPendingSyncRequests.isEmpty()) { SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; return KAsync::null(); } const auto request = mSyncRequestQueue.takeFirst(); return KAsync::start([=] { mMessageQueue->startTransaction(); mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); mSyncInProgress = true; mCurrentRequest = request; }) .then(processRequest(request)) .then([this, request](const KAsync::Error &error) { SinkTraceCtx(mLogCtx) << "Sync request processed"; setBusy(false, {}, request.requestId); mCurrentRequest = {}; mEntityStore->abortTransaction(); mSyncTransaction.abort(); mMessageQueue->commit(); mSyncStore.clear(); mSyncInProgress = false; if (allChangesReplayed()) { emit changesReplayed(); } if (error) { SinkWarningCtx(mLogCtx) << "Error during sync: " << error; emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); } //In case we got more requests meanwhile. return processSyncQueue(); }); } void Synchronizer::commit() { mMessageQueue->commit(); mSyncTransaction.commit(); mSyncStore.clear(); if (mSyncInProgress) { mMessageQueue->startTransaction(); } } Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } void Synchronizer::revisionChanged() { //One replay request is enough for (const auto &r : mSyncRequestQueue) { if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) { return; } } mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; processSyncQueue().exec(); } bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); if (!metadataBuffer->replayToSource()) { SinkTraceCtx(mLogCtx) << "Change is coming from the source"; } return metadataBuffer->replayToSource(); } KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { SinkTraceCtx(mLogCtx) << "Replaying" << type << key; Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); if (!metadataBuffer) { SinkErrorCtx(mLogCtx) << "No metadata buffer available."; return KAsync::error("No metadata buffer"); } if (mSyncTransaction) { SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; mSyncTransaction.abort(); } if (mSyncStore) { SinkErrorCtx(mLogCtx) << "Leftover sync store."; mSyncStore.clear(); } Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); Q_ASSERT(!mSyncTransaction); //The entitystore transaction is handled by processSyncQueue Q_ASSERT(mEntityStore->hasTransaction()); const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; const auto uid = Sink::Storage::DataStore::uidFromKey(key); const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); QByteArray oldRemoteId; if (operation != Sink::Operation_Creation) { oldRemoteId = syncStore().resolveLocalId(type, uid); //oldRemoteId can be empty if the resource implementation didn't return a remoteid } SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision(); //If the entity has been removed already and this is not the removal, skip over. //This is important so we can unblock changereplay by removing entities. bool skipOver = false; store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) { if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) { skipOver = true; } }); if (skipOver) { SinkLogCtx(mLogCtx) << "Skipping over already removed entity"; return KAsync::null(); } KAsync::Job job = KAsync::null(); //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else { SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; } return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().recordRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Modification) { SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().updateRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; if (!oldRemoteId.isEmpty()) { syncStore().removeRemoteId(type, uid, oldRemoteId); } } else { SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; } }) .then([this](const KAsync::Error &error) { //We need to commit here otherwise the next change-replay step will abort the transaction mSyncStore.clear(); mSyncTransaction.commit(); if (error) { SinkWarningCtx(mLogCtx) << "Failed to replay change: " << error.errorMessage; return KAsync::error(error); } return KAsync::null(); }); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } bool Synchronizer::allChangesReplayed() { if (!mSyncRequestQueue.isEmpty()) { SinkTraceCtx(mLogCtx) << "Queue is not empty"; return false; } return ChangeReplay::allChangesReplayed(); } #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove); SINK_REGISTER_TYPES() -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" -#include "synchronizer.moc" -#include "moc_synchronizer.cpp" -#pragma clang diagnostic pop