diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index da8236f6..e9c930f5 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -1,367 +1,377 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "commandprocessor.h" #include #include "commands.h" #include "messagequeue.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" #include "definitions.h" #include "storage.h" #include "queuedcommand_generated.h" #include "revisionreplayed_generated.h" #include "synchronize_generated.h" static int sBatchSize = 100; // This interval directly affects the roundtrip time of single commands static int sCommitInterval = 10; using namespace Sink; using namespace Sink::Storage; CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx) : QObject(), mLogCtx(ctx.subContext("commandprocessor")), mPipeline(pipeline), mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), mCommandQueues({&mUserQueue, &mSynchronizerQueue}), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { /* * This is a queued connection because otherwise we would execute CommandProcessor::process in the middle of * Synchronizer::commit, which is not what we want. */ const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process, Qt::QueuedConnection); Q_UNUSED(ret); } mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) { flatbuffers::FlatBufferBuilder fbb; auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); Sink::FinishQueuedCommandBuffer(fbb, buffer); mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); } void CommandProcessor::processCommand(int commandId, const QByteArray &data) { switch (commandId) { case Commands::FlushCommand: processFlushCommand(data); break; case Commands::SynchronizeCommand: processSynchronizeCommand(data); break; case Commands::AbortSynchronizationCommand: mSynchronizer->abort(); break; // case Commands::RevisionReplayedCommand: // processRevisionReplayedCommand(data); // break; default: { static int modifications = 0; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); modifications++; if (modifications >= sBatchSize) { mUserQueue.commit(); modifications = 0; mCommitQueueTimer.stop(); } else { mCommitQueueTimer.start(); } } }; } void CommandProcessor::processFlushCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(data.constData()); const auto flushType = buffer->type(); const auto flushId = BufferUtils::extractBufferCopy(buffer->id()); if (flushType == Sink::Flush::FlushSynchronization) { mSynchronizer->flush(flushType, flushId); } else { mUserQueue.startTransaction(); enqueueCommand(mUserQueue, Commands::FlushCommand, data); mUserQueue.commit(); } } } void CommandProcessor::processSynchronizeCommand(const QByteArray &data) { flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { auto buffer = Sink::Commands::GetSynchronize(data.constData()); Sink::QueryBase query; if (buffer->query()) { auto data = QByteArray::fromStdString(buffer->query()->str()); QDataStream stream(&data, QIODevice::ReadOnly); stream >> query; } mSynchronizer->synchronize(query); } else { SinkWarningCtx(mLogCtx) << "received invalid command"; } } // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) // { // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); // client.currentRevision = buffer->revision(); // } else { // SinkWarningCtx(mLogCtx) << "received invalid command"; // } // loadResource().setLowerBoundRevision(lowerBoundRevision()); // } void CommandProcessor::setOldestUsedRevision(qint64 revision) { mLowerBoundRevision = revision; } bool CommandProcessor::messagesToProcessAvailable() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { return true; } } return false; } void CommandProcessor::process() { if (mProcessingLock) { return; } mProcessingLock = true; auto job = processPipeline() .then([this]() { mProcessingLock = false; if (messagesToProcessAvailable()) { process(); } }) .exec(); } KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCommand *queuedCommand) { SinkTraceCtx(mLogCtx) << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); const auto data = queuedCommand->command()->Data(); const auto size = queuedCommand->command()->size(); switch (queuedCommand->commandId()) { case Sink::Commands::DeleteEntityCommand: return mPipeline->deletedEntity(data, size); case Sink::Commands::ModifyEntityCommand: return mPipeline->modifiedEntity(data, size); case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: Q_ASSERT(mInspector); return mInspector->processCommand(data, size) .then(KAsync::value(-1)); case Sink::Commands::FlushCommand: return flush(data, size) .then(KAsync::value(-1)); default: return KAsync::error(-1, "Unhandled command"); } } KAsync::Job CommandProcessor::processQueuedCommand(const QByteArray &data) { flatbuffers::Verifier verifyer(reinterpret_cast(data.constData()), data.size()); if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { SinkWarningCtx(mLogCtx) << "invalid buffer"; // return KAsync::error(1, "Invalid Buffer"); } auto queuedCommand = Sink::GetQueuedCommand(data.constData()); const auto commandId = queuedCommand->commandId(); return processQueuedCommand(queuedCommand) .then( [this, commandId](const KAsync::Error &error, qint64 createdRevision) -> KAsync::Job { if (error) { SinkWarningCtx(mLogCtx) << "Error while processing queue command: " << error.errorMessage; return KAsync::error(error); } SinkTraceCtx(mLogCtx) << "Command pipeline processed: " << Sink::Commands::name(commandId); return KAsync::value(createdRevision); }); } // Process one batch of messages from this queue KAsync::Job CommandProcessor::processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); return KAsync::start([=] { mPipeline->startTransaction(); }) .then([=] { return queue->dequeueBatch(sBatchSize, [=](const QByteArray &data) { time->start(); return processQueuedCommand(data) .then([=](qint64 createdRevision) { SinkTraceCtx(mLogCtx) << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); }); }) .then([=](const KAsync::Error &error) { if (error) { if (error.errorCode != MessageQueue::ErrorCodes::NoMessageFound) { SinkWarningCtx(mLogCtx) << "Error while getting message from messagequeue: " << error.errorMessage; } } }); }) - .then([=](const KAsync::Error &) { mPipeline->commit(); }); + .then([=](const KAsync::Error &) { + mPipeline->commit(); + //The flushed content has been persistet, we can notify the world + for (const auto &flushId : mCompleteFlushes) { + SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; + mSynchronizer->flushComplete(flushId); + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = flushId; + emit notify(n); + } + mCompleteFlushes.clear(); + }); + + } KAsync::Job CommandProcessor::processPipeline() { auto time = QSharedPointer::create(); time->start(); mPipeline->cleanupRevisions(mLowerBoundRevision); SinkTraceCtx(mLogCtx) << "Cleanup done." << Log::TraceTime(time->elapsed()); // Go through all message queues if (mCommandQueues.isEmpty()) { return KAsync::null(); } return KAsync::doWhile([this]() { for (auto queue : mCommandQueues) { if (!queue->isEmpty()) { auto time = QSharedPointer::create(); time->start(); return processQueue(queue) .then([=] { SinkTraceCtx(mLogCtx) << "Queue processed." << Log::TraceTime(time->elapsed()); return KAsync::Continue; }); } } return KAsync::value(KAsync::Break); }); } void CommandProcessor::setInspector(const QSharedPointer &inspector) { mInspector = inspector; QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); } void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; mSynchronizer->setup([this](int commandId, const QByteArray &data) { enqueueCommand(mSynchronizerQueue, commandId, data); }, mSynchronizerQueue); QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } KAsync::Job CommandProcessor::flush(void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { auto buffer = Sink::Commands::GetFlush(command); const auto flushType = buffer->type(); const QByteArray flushId = BufferUtils::extractBufferCopy(buffer->id()); Q_ASSERT(!flushId.isEmpty()); if (flushType == Sink::Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Flushing synchronizer "; Q_ASSERT(mSynchronizer); mSynchronizer->flush(flushType, flushId); } else { - SinkTraceCtx(mLogCtx) << "Emitting flush completion" << flushId; - mSynchronizer->flushComplete(flushId); - Sink::Notification n; - n.type = Sink::Notification::FlushCompletion; - n.id = flushId; - emit notify(n); + //Defer notification until the results have been comitted + mCompleteFlushes << flushId; } return KAsync::null(); } return KAsync::error(-1, "Invalid flush command."); } static void waitForDrained(KAsync::Future &f, MessageQueue &queue) { if (queue.isEmpty()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(&queue, &MessageQueue::drained, context, [&f, context]() { delete context; f.setFinished(); }); } }; KAsync::Job CommandProcessor::processAllMessages() { // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. // TODO: report errors while processing sync? // TODO JOBAPI: A helper that waits for n events and then continues? return KAsync::start([this](KAsync::Future &f) { if (mCommitQueueTimer.isActive()) { auto context = new QObject; QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { delete context; f.setFinished(); }); } else { f.setFinished(); } }) .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) .then([this](KAsync::Future &f) { if (mSynchronizer->allChangesReplayed()) { f.setFinished(); } else { auto context = new QObject; QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { delete context; f.setFinished(); }); } }); } diff --git a/common/commandprocessor.h b/common/commandprocessor.h index f3a0742d..e18ee8e1 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -1,95 +1,96 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include #include "log.h" #include "notification.h" #include "messagequeue.h" namespace Sink { class Pipeline; class Inspector; class Synchronizer; struct QueuedCommand; class QueryBase; /** * Drives the pipeline using the output from all command queues */ class CommandProcessor : public QObject { Q_OBJECT public: CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId, const Sink::Log::Context &ctx); void setOldestUsedRevision(qint64 revision); void setInspector(const QSharedPointer &inspector); void setSynchronizer(const QSharedPointer &synchronizer); void processCommand(int commandId, const QByteArray &data); KAsync::Job processAllMessages(); signals: void notify(Notification); void error(int errorCode, const QString &errorMessage); private: bool messagesToProcessAvailable(); private slots: void process(); KAsync::Job processQueuedCommand(const Sink::QueuedCommand *queuedCommand); KAsync::Job processQueuedCommand(const QByteArray &data); // Process all messages of this queue KAsync::Job processQueue(MessageQueue *queue); KAsync::Job processPipeline(); private: void processFlushCommand(const QByteArray &data); void processSynchronizeCommand(const QByteArray &data); // void processRevisionReplayedCommand(const QByteArray &data); KAsync::Job flush(void const *command, size_t size); Sink::Log::Context mLogCtx; Sink::Pipeline *mPipeline; MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; // Ordered by priority QList mCommandQueues; bool mProcessingLock; // The lowest revision we no longer need qint64 mLowerBoundRevision; QSharedPointer mSynchronizer; QSharedPointer mInspector; QTimer mCommitQueueTimer; + QVector mCompleteFlushes; }; }; diff --git a/tests/synchronizertest.cpp b/tests/synchronizertest.cpp index a3c85177..121868a5 100644 --- a/tests/synchronizertest.cpp +++ b/tests/synchronizertest.cpp @@ -1,155 +1,192 @@ #include #include #include #include "testimplementations.h" #include "event_generated.h" #include "entity_generated.h" #include "metadata_generated.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "dummyresource/resourcefactory.h" #include "store.h" #include "commands.h" #include "entitybuffer.h" #include "resourceconfig.h" #include "pipeline.h" #include "synchronizer.h" #include "commandprocessor.h" #include "log.h" #include "domainadaptor.h" #include "definitions.h" #include "adaptorfactoryregistry.h" #include "storage/key.h" +#include "genericresource.h" #include "testutils.h" #include "test.h" class TestSynchronizer: public Sink::Synchronizer { public: TestSynchronizer(const Sink::ResourceContext &context): Sink::Synchronizer(context) { } - std::function mSyncCallback; + QMap> mSyncCallbacks; KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) override { - return KAsync::start([this] { - qWarning() << "Synchronizing with the source"; - mSyncCallback(); + return KAsync::start([this, query] { + Q_ASSERT(mSyncCallbacks.contains(query.id())); + mSyncCallbacks.value(query.id())(); }); } void createOrModify(const QByteArray &rid, Sink::ApplicationDomain::ApplicationDomainType &entity) { Sink::Synchronizer::createOrModify("calendar", rid, entity); } void scanForRemovals(const QSet &set) { Sink::Synchronizer::scanForRemovals("calendar", [&](const QByteArray &remoteId) { return set.contains(remoteId); }); } QByteArray resolveRemoteId(const QByteArray &remoteId) { return syncStore().resolveRemoteId("calendar", remoteId); } - void synchronize(std::function callback) { - mSyncCallback = callback; - addToQueue(Synchronizer::SyncRequest{{}, "sync"}); + void synchronize(std::function callback, const QByteArray &id = {}, Synchronizer::SyncRequest::RequestOptions options = Synchronizer::SyncRequest::NoOptions) { + mSyncCallbacks.insert(id, callback); + Sink::Query query; + query.setId(id); + addToQueue(Synchronizer::SyncRequest{query, id, options}); VERIFYEXEC(processSyncQueue()); } }; class SynchronizerTest : public QObject { Q_OBJECT QByteArray instanceIdentifier() { return "synchronizertest.instance1"; } Sink::ResourceContext getContext() { return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")}; } private slots: void initTestCase() { Sink::Test::initTest(); Sink::Storage::DataStore{Sink::Store::storageLocation(), instanceIdentifier(), Sink::Storage::DataStore::ReadWrite}.removeFromDisk(); Sink::AdaptorFactoryRegistry::instance().registerFactory>("test"); } void init() { + Sink::GenericResource::removeFromDisk(instanceIdentifier()); } void testSynchronizer() { const auto context = getContext(); Sink::Pipeline pipeline(context, instanceIdentifier()); Sink::CommandProcessor processor(&pipeline, instanceIdentifier(), Sink::Log::Context{"processor"}); auto synchronizer = QSharedPointer::create(context); processor.setSynchronizer(synchronizer); synchronizer->setSecret("secret"); synchronizer->synchronize([&] { Sink::ApplicationDomain::Calendar calendar; calendar.setName("Name"); synchronizer->createOrModify("1", calendar); }); VERIFYEXEC(processor.processAllMessages()); const auto sinkId = synchronizer->resolveRemoteId("1"); QVERIFY(!sinkId.isEmpty()); { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(store.contains("calendar", sinkId)); QVERIFY(store.exists("calendar", sinkId)); } //Remove the calendar synchronizer->synchronize([&] { synchronizer->scanForRemovals({}); }); synchronizer->revisionChanged(); VERIFYEXEC(processor.processAllMessages()); { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(!store.exists("calendar", sinkId)); QVERIFY(store.contains("calendar", sinkId)); } //Recreate the same calendar synchronizer->synchronize([&] { Sink::ApplicationDomain::Calendar calendar; calendar.setName("Name"); synchronizer->createOrModify("1", calendar); }); synchronizer->revisionChanged(); VERIFYEXEC(processor.processAllMessages()); { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(store.contains("calendar", sinkId)); QVERIFY(store.exists("calendar", sinkId)); } } + /* + * Ensure the flushed content is available during the next sync request + */ + void testFlush() + { + const auto context = getContext(); + Sink::Pipeline pipeline(context, instanceIdentifier()); + Sink::CommandProcessor processor(&pipeline, instanceIdentifier(), Sink::Log::Context{"processor"}); + + auto synchronizer = QSharedPointer::create(context); + processor.setSynchronizer(synchronizer); + + synchronizer->setSecret("secret"); + + QByteArray sinkId; + synchronizer->synchronize([&] { + Sink::ApplicationDomain::Calendar calendar; + calendar.setName("Name"); + synchronizer->createOrModify("1", calendar); + sinkId = synchronizer->resolveRemoteId("1"); + }, "1"); + QVERIFY(!sinkId.isEmpty()); + + //With a flush the calendar should be available during the next sync + synchronizer->synchronize([&] { + Sink::Storage::EntityStore store(context, {"entitystore"}); + QVERIFY(store.contains("calendar", sinkId)); + + }, "2", Sink::Synchronizer::SyncRequest::RequestFlush); + + VERIFYEXEC(processor.processAllMessages()); + } + }; QTEST_MAIN(SynchronizerTest) #include "synchronizertest.moc"