diff --git a/common/changereplay.cpp b/common/changereplay.cpp index 45b8897a..7d281550 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -1,200 +1,200 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "changereplay.h" -#include "entitybuffer.h" #include "log.h" #include "definitions.h" #include "bufferutils.h" #include "storage/key.h" #include using namespace Sink; using namespace Sink::Storage; ChangeReplay::ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx) : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false), mLogCtx{ctx.subContext("changereplay")} { } qint64 ChangeReplay::getLastReplayedRevision() { qint64 lastReplayedRevision = 0; auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly); replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { lastReplayedRevision = value.toLongLong(); return false; }, [](const DataStore::Error &) {}); return lastReplayedRevision; } bool ChangeReplay::allChangesReplayed() { const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [this](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(mLogCtx) << error.message; })); const qint64 lastReplayedRevision = getLastReplayedRevision(); return (lastReplayedRevision >= topRevision); } void ChangeReplay::recordReplayedRevision(qint64 revision) { auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [this](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(mLogCtx) << error.message; }); replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); replayStoreTransaction.commit(); }; KAsync::Job ChangeReplay::replayNextRevision() { Q_ASSERT(!mReplayInProgress); return KAsync::start([this]() { if (mReplayInProgress) { SinkErrorCtx(mLogCtx) << "Replay still in progress!!!!!"; return KAsync::null(); } auto lastReplayedRevision = QSharedPointer::create(0); auto topRevision = QSharedPointer::create(0); emit replayingChanges(); mReplayInProgress = true; mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { SinkWarningCtx(mLogCtx) << error.message; }); auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [this](const DataStore::Error &error) { SinkWarningCtx(mLogCtx) << error.message; }); Q_ASSERT(mMainStoreTransaction); Q_ASSERT(replayStoreTransaction); replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { *lastReplayedRevision = value.toLongLong(); return false; }, [](const DataStore::Error &) {}); *topRevision = DataStore::maxRevision(mMainStoreTransaction); if (*lastReplayedRevision >= *topRevision) { SinkTraceCtx(mLogCtx) << "Nothing to replay"; return KAsync::null(); } SinkTraceCtx(mLogCtx) << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; return KAsync::doWhile( [this, lastReplayedRevision, topRevision]() -> KAsync::Job { if (*lastReplayedRevision >= *topRevision) { SinkTraceCtx(mLogCtx) << "Done replaying" << *lastReplayedRevision << *topRevision; return KAsync::value(KAsync::Break); } Q_ASSERT(mMainStoreTransaction); auto replayJob = KAsync::null(); qint64 revision = *lastReplayedRevision + 1; while (revision <= *topRevision) { const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); if (uid.isEmpty() || type.isEmpty()) { SinkErrorCtx(mLogCtx) << "Failed to read uid or type for revison: " << revision << uid << type; } else { // TODO: should not use internal representations const auto key = Storage::Key(Storage::Identifier::fromDisplayByteArray(uid), revision); const auto displayKey = key.toDisplayByteArray(); QByteArray entityBuffer; DataStore::mainDatabase(mMainStoreTransaction, type) .scan(revision, [&entityBuffer](const size_t, const QByteArray &value) -> bool { entityBuffer = value; return false; }, [this, key](const DataStore::Error &e) { SinkErrorCtx(mLogCtx) << "Failed to read the entity buffer " << key << "error:" << e; }); if (entityBuffer.isEmpty()) { SinkErrorCtx(mLogCtx) << "Failed to replay change " << key; } else { if (canReplay(type, displayKey, entityBuffer)) { SinkTraceCtx(mLogCtx) << "Replaying " << displayKey; replayJob = replay(type, displayKey, entityBuffer); //Set the last revision we tried to replay *lastReplayedRevision = revision; //Execute replay job and commit break; } else { SinkTraceCtx(mLogCtx) << "Not replaying " << key; + notReplaying(type, displayKey, entityBuffer); //We silently skip over revisions that cannot be replayed, as this is not an error. } } } //Bump the revision if we failed to even attempt to replay. This will simply skip over those revisions, as we can't recover from those situations. *lastReplayedRevision = revision; revision++; } return replayJob.then([=](const KAsync::Error &error) { if (error) { SinkWarningCtx(mLogCtx) << "Change replay failed: " << error << "Last replayed revision: " << *lastReplayedRevision; //We're probably not online or so, so postpone retrying return KAsync::value(KAsync::Break).then(KAsync::error(error)); } SinkTraceCtx(mLogCtx) << "Replayed until: " << *lastReplayedRevision; recordReplayedRevision(*lastReplayedRevision); reportProgress(*lastReplayedRevision, *topRevision); const bool gotMoreToReplay = (*lastReplayedRevision < *topRevision); if (gotMoreToReplay) { SinkTraceCtx(mLogCtx) << "Replaying some more..."; //Replay more if we have more return KAsync::wait(0).then(KAsync::value(KAsync::Continue)); } else { return KAsync::value(KAsync::Break); } }).guard(&mGuard); }); }) .then([this](const KAsync::Error &error) { SinkTraceCtx(mLogCtx) << "Change replay complete."; mMainStoreTransaction.abort(); mReplayInProgress = false; if (ChangeReplay::allChangesReplayed()) { //In case we have a derived implementation if (allChangesReplayed()) { SinkTraceCtx(mLogCtx) << "All changes replayed"; emit changesReplayed(); } } if (error) { return KAsync::error(error); } else { return KAsync::null(); } }).guard(&mGuard); } void ChangeReplay::revisionChanged() { if (!mReplayInProgress) { replayNextRevision().exec(); } } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_changereplay.cpp" #pragma clang diagnostic pop diff --git a/common/changereplay.h b/common/changereplay.h index 22e26a5b..dc6db334 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -1,79 +1,81 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include "storage.h" #include "resourcecontext.h" namespace Sink { /** * Replays changes from the storage one by one. * * Uses a local database to: * * Remember what changes have been replayed already. * * store a mapping of remote to local buffers */ class SINK_EXPORT ChangeReplay : public QObject { Q_OBJECT public: ChangeReplay(const ResourceContext &resourceContext, const Sink::Log::Context &ctx= {}); qint64 getLastReplayedRevision(); virtual bool allChangesReplayed(); + KAsync::Job replayNextRevision(); + signals: void changesReplayed(); void replayingChanges(); public slots: virtual void revisionChanged(); protected: virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; + virtual void notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; virtual void reportProgress(int progress, int total, const QByteArrayList &applicableEntities = {}){}; Sink::Storage::DataStore mStorage; - KAsync::Job replayNextRevision(); private: void recordReplayedRevision(qint64 revision); Sink::Storage::DataStore mChangeReplayStore; bool mReplayInProgress; Sink::Storage::DataStore::Transaction mMainStoreTransaction; Sink::Log::Context mLogCtx; QObject mGuard; }; class NullChangeReplay : public ChangeReplay { public: NullChangeReplay(const ResourceContext &resourceContext) : ChangeReplay(resourceContext) {} KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null(); } bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; } }; } diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index e971b475..8d606af9 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -1,842 +1,865 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "synchronizer.h" #include "definitions.h" #include "commands.h" #include "bufferutils.h" #include "synchronizerstore.h" #include "datastorequery.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" #include "flush_generated.h" #include "notification_generated.h" #include "utils.h" using namespace Sink; bool operator==(const Synchronizer::SyncRequest &left, const Synchronizer::SyncRequest &right) { return left.flushType == right.flushType && left.requestId == right.requestId && left.requestType == right.requestType && left.options == right.options && left.query == right.query && left.applicableEntities == right.applicableEntities; } Synchronizer::Synchronizer(const Sink::ResourceContext &context) : ChangeReplay(context, {"synchronizer"}), mLogCtx{"synchronizer"}, mResourceContext(context), mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext, mLogCtx)), mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite), mSyncInProgress(false), mAbort(false) { mCurrentState.push(ApplicationDomain::Status::NoStatus); SinkTraceCtx(mLogCtx) << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() { } void Synchronizer::setSecret(const QString &s) { mSecret = s; if (!mSyncRequestQueue.isEmpty()) { processSyncQueue().exec(); } } QString Synchronizer::secret() const { return mSecret; } void Synchronizer::setup(const std::function &enqueueCommandCallback, MessageQueue &mq) { mEnqueue = enqueueCommandCallback; mMessageQueue = &mq; } void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) { Q_ASSERT(mEnqueue); mEnqueue(commandId, data); } Storage::EntityStore &Synchronizer::store() { Q_ASSERT(mEntityStore->hasTransaction()); return *mEntityStore; } SynchronizerStore &Synchronizer::syncStore() { if (!mSyncStore) { mSyncStore = QSharedPointer::create(syncTransaction()); } return *mSyncStore; } void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource, bool remove) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); auto deletions = BufferUtils::toVector(fbb, deletedProperties); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto resource = newResource.isEmpty() ? 0 : fbb.CreateString(newResource.constData()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties, resource, remove); Sink::Commands::FinishModifyEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); SinkTraceCtx(mLogCtx) << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTraceCtx(mLogCtx) << "Found a removed entity: " << sinkId; deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { scanForRemovals(bufferType, [this, &bufferType](const std::function &callback) { store().readAllUids(bufferType, [callback](const QByteArray &uid) { callback(uid); }); }, exists ); } void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { const bool changed = [&] { for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current.getProperty(property)) { SinkTraceCtx(mLogCtx) << "Property changed " << sinkId << property; return true; } } return false; }(); if (changed) { SinkTraceCtx(mLogCtx) << "Found a modified entity: " << sinkId; modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } else { SinkTraceCtx(mLogCtx) << "Entity was not modified: " << sinkId; } }); } void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId, false); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to find the local id for " << remoteId; return; } Storage::EntityStore store(mResourceContext, mLogCtx); modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; Q_ASSERT(false); return; } Storage::EntityStore store(mResourceContext, mLogCtx); if (!store.contains(bufferType, sinkId)) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria) { SinkTraceCtx(mLogCtx) << "Create or modify" << bufferType << remoteId; const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); if (sinkId.isEmpty()) { SinkWarningCtx(mLogCtx) << "Failed to create a local id for " << remoteId; Q_ASSERT(false); return; } Storage::EntityStore store(mResourceContext, mLogCtx); if (!store.contains(bufferType, sinkId)) { if (!mergeCriteria.isEmpty()) { Sink::Query query; for (auto it = mergeCriteria.constBegin(); it != mergeCriteria.constEnd(); it++) { query.filter(it.key(), it.value()); } bool merge = false; DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { merge = true; SinkTraceCtx(mLogCtx) << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); }); if (!merge) { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { SinkTraceCtx(mLogCtx) << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); } } else { // modification modifyIfChanged(store, bufferType, sinkId, entity); } } QByteArrayList Synchronizer::resolveQuery(const QueryBase &query) { if (query.type().isEmpty()) { SinkWarningCtx(mLogCtx) << "Can't resolve a query without a type" << query; return {}; } QByteArrayList result; Storage::EntityStore store{mResourceContext, mLogCtx}; DataStoreQuery dataStoreQuery{query, query.type(), store}; auto resultSet = dataStoreQuery.execute(); resultSet.replaySet(0, 0, [&](const ResultSet::Result &r) { result << r.entity.identifier(); }); return result; } QByteArrayList Synchronizer::resolveFilter(const QueryBase::Comparator &filter) { if (filter.value.canConvert()) { const auto value = filter.value.value(); if (value.isEmpty()) { SinkErrorCtx(mLogCtx) << "Tried to filter for an empty value: " << filter; } else { return {filter.value.value()}; } } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else if (filter.value.canConvert()) { return resolveQuery(filter.value.value()); } else { SinkWarningCtx(mLogCtx) << "unknown filter type: " << filter; Q_ASSERT(false); } return {}; } template void Synchronizer::modify(const DomainType &entity, const QByteArray &newResource, bool remove) { modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity, newResource, remove); } QList Synchronizer::getSyncRequests(const Sink::QueryBase &query) { return {Synchronizer::SyncRequest{query, "sync"}}; } void Synchronizer::mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue) { queue << request; } void Synchronizer::addToQueue(const Synchronizer::SyncRequest &request) { mergeIntoQueue(request, mSyncRequestQueue); } void Synchronizer::synchronize(const Sink::QueryBase &query) { SinkTraceCtx(mLogCtx) << "Synchronizing" << query; auto newRequests = getSyncRequests(query); for (const auto &request: newRequests) { auto shouldSkip = [&] { for (auto &r : mSyncRequestQueue) { if (r == request) { //Merge SinkTraceCtx(mLogCtx) << "Merging equal request " << request.query << "\n to" << r.query; return true; } } return false; }; if (shouldSkip()) { continue; } mergeIntoQueue(request, mSyncRequestQueue); } processSyncQueue().exec(); } void Synchronizer::clearQueue() { //Complete all pending flushes. Without this pending flushes would get stuck indefinitely when we clear the queue on failure. //TODO we should probably fail them instead for (const auto &request : mSyncRequestQueue) { if (request.requestType == Synchronizer::SyncRequest::Flush) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } } mSyncRequestQueue.clear(); } void Synchronizer::abort() { SinkLogCtx(mLogCtx) << "Aborting all running synchronization requests"; clearQueue(); mAbort = true; } void Synchronizer::flush(int commandId, const QByteArray &flushId) { Q_ASSERT(!flushId.isEmpty()); SinkTraceCtx(mLogCtx) << "Flushing the synchronization queue " << flushId; mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; processSyncQueue().exec(); } void Synchronizer::flushComplete(const QByteArray &flushId) { SinkTraceCtx(mLogCtx) << "Flush complete: " << flushId; if (mPendingSyncRequests.contains(flushId)) { const auto requests = mPendingSyncRequests.values(flushId); for (const auto &r : requests) { //We want to process the pending request before any others in the queue mSyncRequestQueue.prepend(r); } mPendingSyncRequests.remove(flushId); processSyncQueue().exec(); } } void Synchronizer::emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.message = message; n.code = code; n.entities = entities; emit notify(n); } void Synchronizer::emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities) { Sink::Notification n; n.id = id; n.type = type; n.progress = progress; n.total = total; n.entities = entities; emit notify(n); } void Synchronizer::reportProgress(int progress, int total, const QByteArrayList &entities) { if (progress > 0 && total > 0) { //Limit progress updates for large amounts if (total >= 100 && progress % 10 != 0) { return; } SinkLogCtx(mLogCtx) << "Progress: " << progress << " out of " << total << mCurrentRequest.requestId << mCurrentRequest.applicableEntities; const auto applicableEntities = [&] { if (entities.isEmpty()) { return mCurrentRequest.applicableEntities; } return entities; }(); emitProgressNotification(Notification::Progress, progress, total, mCurrentRequest.requestId, applicableEntities); } } void Synchronizer::setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId) { if (error) { if (error.errorCode == ApplicationDomain::ConnectionError) { //Couldn't connect, so we assume we don't have a network connection. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::NoServerError) { //Failed to contact the server. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConfigurationError) { //There is an error with the configuration. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::LoginError) { //If we failed to login altough we could connect that indicates a problem with our setup. setStatus(ApplicationDomain::ErrorStatus, s, requestId); } else if (error.errorCode == ApplicationDomain::ConnectionLostError) { //We've lost the connection so we assume the connection to the server broke. setStatus(ApplicationDomain::OfflineStatus, s, requestId); } //We don't know what kind of error this was, so we assume it's transient and don't change our status. } else { //An operation against the server worked, so we're probably online. setStatus(ApplicationDomain::ConnectedStatus, s, requestId); } } KAsync::Job Synchronizer::processRequest(const SyncRequest &request) { if (request.options & SyncRequest::RequestFlush) { return KAsync::start([=] { //Trigger a flush and record original request without flush option auto modifiedRequest = request; modifiedRequest.options = SyncRequest::NoOptions; //Normally we won't have a requestId here if (modifiedRequest.requestId.isEmpty()) { modifiedRequest.requestId = createUuid(); } SinkTraceCtx(mLogCtx) << "Enqueuing flush request " << modifiedRequest.requestId; //The sync request will be executed once the flush has completed mPendingSyncRequests.insert(modifiedRequest.requestId, modifiedRequest); flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(modifiedRequest.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); }); } else if (request.requestType == Synchronizer::SyncRequest::Synchronization) { return KAsync::start([this, request] { SinkLogCtx(mLogCtx) << "Synchronizing:" << request.query; setBusy(true, "Synchronization has started.", request.requestId); emitNotification(Notification::Info, ApplicationDomain::SyncInProgress, {}, {}, request.applicableEntities); }).then(synchronizeWithSource(request.query)).then([this] { //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }).then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Synchronization has ended.", request.requestId); if (error) { //Emit notification with error SinkWarningCtx(mLogCtx) << "Synchronization failed: " << error; emitNotification(Notification::Warning, ApplicationDomain::SyncError, {}, {}, request.applicableEntities); return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done Synchronizing"; emitNotification(Notification::Info, ApplicationDomain::SyncSuccess, {}, {}, request.applicableEntities); return KAsync::null(); } }); } else if (request.requestType == Synchronizer::SyncRequest::Flush) { return KAsync::start([=] { Q_ASSERT(!request.requestId.isEmpty()); //FIXME it looks like this is emitted before the replay actually finishes if (request.flushType == Flush::FlushReplayQueue) { SinkTraceCtx(mLogCtx) << "Emitting flush completion: " << request.requestId; emitNotification(Notification::FlushCompletion, 0, "", request.requestId); } else { flatbuffers::FlatBufferBuilder fbb; auto flushId = fbb.CreateString(request.requestId.toStdString()); auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); Sink::Commands::FinishFlushBuffer(fbb, location); enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); } }); } else if (request.requestType == Synchronizer::SyncRequest::ChangeReplay) { if (ChangeReplay::allChangesReplayed()) { return KAsync::null(); } else { return KAsync::start([this, request] { setBusy(true, "ChangeReplay has started.", request.requestId); SinkLogCtx(mLogCtx) << "Replaying changes."; }) .then(replayNextRevision()) .then([this, request](const KAsync::Error &error) { setStatusFromResult(error, "Changereplay has ended.", request.requestId); if (error) { SinkWarningCtx(mLogCtx) << "Changereplay failed: " << error; return KAsync::error(error); } else { SinkLogCtx(mLogCtx) << "Done replaying changes"; return KAsync::null(); } }); } } else { SinkWarningCtx(mLogCtx) << "Unknown request type: " << request.requestType; return KAsync::error(KAsync::Error{"Unknown request type."}); } } /* * We're using a stack so we can go back to whatever we had after the temporary busy status. * Whenever we do change the status we emit a status notification. */ void Synchronizer::setStatus(ApplicationDomain::Status state, const QString &reason, const QByteArray requestId) { //We won't be able to execute any of the coming requests, so clear them if (state == ApplicationDomain::OfflineStatus || state == ApplicationDomain::ErrorStatus) { clearQueue(); } if (state != mCurrentState.top()) { //The busy state is transient and we want to override it. if (mCurrentState.top() == ApplicationDomain::BusyStatus) { mCurrentState.pop(); } if (state != mCurrentState.top()) { //Always leave the first state intact if (mCurrentState.count() > 1 && state != ApplicationDomain::BusyStatus) { mCurrentState.pop(); } mCurrentState.push(state); } //We should never have more than: (NoStatus, $SOMESTATUS, BusyStatus) if (mCurrentState.count() > 3) { qWarning() << mCurrentState; Q_ASSERT(false); } emitNotification(Notification::Status, state, reason, requestId); } } void Synchronizer::resetStatus(const QByteArray requestId) { mCurrentState.pop(); emitNotification(Notification::Status, mCurrentState.top(), {}, requestId); } void Synchronizer::setBusy(bool busy, const QString &reason, const QByteArray requestId) { if (busy) { setStatus(ApplicationDomain::BusyStatus, reason, requestId); } else { if (mCurrentState.top() == ApplicationDomain::BusyStatus) { resetStatus(requestId); } } } KAsync::Job Synchronizer::processSyncQueue() { if (secret().isEmpty()) { SinkLogCtx(mLogCtx) << "Secret not available but required."; emitNotification(Notification::Warning, ApplicationDomain::SyncError, "Secret is not available.", {}, {}); return KAsync::null(); } if (mSyncRequestQueue.isEmpty()) { SinkLogCtx(mLogCtx) << "All requests processed."; return KAsync::null(); } if (mSyncInProgress) { SinkTraceCtx(mLogCtx) << "Sync still in progress."; return KAsync::null(); } //Don't process any new requests until we're done with the pending ones. //Otherwise we might process a flush before the previous request actually completed. if (!mPendingSyncRequests.isEmpty()) { SinkTraceCtx(mLogCtx) << "We still have pending sync requests. Not executing next request."; return KAsync::null(); } const auto request = mSyncRequestQueue.takeFirst(); return KAsync::start([=] { mMessageQueue->startTransaction(); mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); mSyncInProgress = true; mCurrentRequest = request; }) .then(processRequest(request)) .then([this, request](const KAsync::Error &error) { SinkTraceCtx(mLogCtx) << "Sync request processed"; setBusy(false, {}, request.requestId); mCurrentRequest = {}; mEntityStore->abortTransaction(); mSyncTransaction.abort(); mMessageQueue->commit(); mSyncStore.clear(); mSyncInProgress = false; mAbort = false; if (allChangesReplayed()) { emit changesReplayed(); } if (error) { SinkWarningCtx(mLogCtx) << "Error during sync: " << error; emitNotification(Notification::Error, error.errorCode, error.errorMessage, request.requestId); } //In case we got more requests meanwhile. return processSyncQueue(); }); } bool Synchronizer::aborting() const { return mAbort; } void Synchronizer::commit() { mMessageQueue->commit(); mSyncTransaction.commit(); mSyncStore.clear(); if (mSyncInProgress) { mMessageQueue->startTransaction(); } } Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTraceCtx(mLogCtx) << "Starting transaction on sync store."; mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } void Synchronizer::revisionChanged() { //One replay request is enough for (const auto &r : mSyncRequestQueue) { if (r.requestType == Synchronizer::SyncRequest::ChangeReplay) { return; } } mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::ChangeReplay, "changereplay"}; processSyncQueue().exec(); } bool Synchronizer::canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); if (!metadataBuffer->replayToSource()) { SinkTraceCtx(mLogCtx) << "Change is coming from the source"; } return metadataBuffer->replayToSource(); } KAsync::Job Synchronizer::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { SinkTraceCtx(mLogCtx) << "Replaying" << type << key; Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); if (!metadataBuffer) { SinkErrorCtx(mLogCtx) << "No metadata buffer available."; return KAsync::error("No metadata buffer"); } if (mSyncTransaction) { SinkErrorCtx(mLogCtx) << "Leftover sync transaction."; mSyncTransaction.abort(); } if (mSyncStore) { SinkErrorCtx(mLogCtx) << "Leftover sync store."; mSyncStore.clear(); } Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); Q_ASSERT(!mSyncTransaction); //The entitystore transaction is handled by processSyncQueue Q_ASSERT(mEntityStore->hasTransaction()); - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + const auto operation = metadataBuffer->operation(); // TODO: should not use internal representations const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); QByteArray oldRemoteId; if (operation != Sink::Operation_Creation) { oldRemoteId = syncStore().resolveLocalId(type, uid); //oldRemoteId can be empty if the resource implementation didn't return a remoteid } SinkLogCtx(mLogCtx) << "Replaying: " << key << "Type: " << type << "Uid: " << uid << "Rid: " << oldRemoteId << "Revision: " << metadataBuffer->revision(); //If the entity has been removed already and this is not the removal, skip over. //This is important so we can unblock changereplay by removing entities. bool skipOver = false; store().readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &, Sink::Operation latestOperation) { if (latestOperation == Sink::Operation_Removal && operation != Sink::Operation_Removal) { skipOver = true; } }); if (skipOver) { SinkLogCtx(mLogCtx) << "Skipping over already removed entity"; return KAsync::null(); } KAsync::Job job = KAsync::null(); //TODO This requires supporting every domain type here as well. Can we solve this better so we can do the dispatch somewhere centrally? if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else if (type == ApplicationDomain::getTypeName()) { job = replay(store().readEntity(key), operation, oldRemoteId, modifiedProperties); } else { SinkErrorCtx(mLogCtx) << "Replayed unknown type: " << type; } return job.then([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { if (operation == Sink::Operation_Creation) { SinkTraceCtx(mLogCtx) << "Replayed creation with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().recordRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Modification) { SinkTraceCtx(mLogCtx) << "Replayed modification with remote id: " << remoteId; if (!remoteId.isEmpty()) { syncStore().updateRemoteId(type, uid, remoteId); } } else if (operation == Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Replayed removal with remote id: " << oldRemoteId; if (!oldRemoteId.isEmpty()) { syncStore().removeRemoteId(type, uid, oldRemoteId); } } else { SinkErrorCtx(mLogCtx) << "Unkown operation" << operation; } }) .then([this](const KAsync::Error &error) { //We need to commit here otherwise the next change-replay step will abort the transaction mSyncStore.clear(); mSyncTransaction.commit(); if (error) { return KAsync::error(error); } return KAsync::null(); }); } +void Synchronizer::notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) +{ + + Sink::EntityBuffer buffer(value); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + if (!metadataBuffer) { + SinkErrorCtx(mLogCtx) << "No metadata buffer available."; + Q_ASSERT(false); + return; + } + if (metadataBuffer->operation() == Sink::Operation_Removal) { + const auto uid = Sink::Storage::Key::fromDisplayByteArray(key).identifier().toDisplayByteArray(); + const auto oldRemoteId = syncStore().resolveLocalId(type, uid); + SinkLogCtx(mLogCtx) << "Cleaning up removal with remote id: " << oldRemoteId; + if (!oldRemoteId.isEmpty()) { + syncStore().removeRemoteId(type, uid, oldRemoteId); + } + } + mSyncStore.clear(); + mSyncTransaction.commit(); +} + KAsync::Job Synchronizer::replay(const ApplicationDomain::Contact &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Event &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Todo &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } KAsync::Job Synchronizer::replay(const ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &, const QList &) { return KAsync::null(); } bool Synchronizer::allChangesReplayed() { if (!mSyncRequestQueue.isEmpty()) { SinkTraceCtx(mLogCtx) << "Queue is not empty"; return false; } return ChangeReplay::allChangesReplayed(); } #define REGISTER_TYPE(T) \ template void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const T &entity, const QHash &mergeCriteria); \ template void Synchronizer::modify(const T &entity, const QByteArray &newResource, bool remove); SINK_REGISTER_TYPES() diff --git a/common/synchronizer.h b/common/synchronizer.h index 546f477a..efcc76d6 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -1,255 +1,256 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include #include #include #include #include #include "changereplay.h" #include "synchronizerstore.h" namespace Sink { class SynchronizerStore; /** * Synchronize and add what we don't already have to local queue */ class SINK_EXPORT Synchronizer : public ChangeReplay { Q_OBJECT public: Synchronizer(const Sink::ResourceContext &resourceContext); virtual ~Synchronizer() Q_DECL_OVERRIDE; void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); void synchronize(const Sink::QueryBase &query); void flush(int commandId, const QByteArray &flushId); //Read only access to main storage Storage::EntityStore &store(); //Read/Write access to sync storage SynchronizerStore &syncStore(); void commit(); Sink::Storage::DataStore::Transaction &syncTransaction(); bool allChangesReplayed() Q_DECL_OVERRIDE; void flushComplete(const QByteArray &flushId); void setSecret(const QString &s); //Abort all running synchronization requests void abort(); + KAsync::Job processSyncQueue(); + signals: void notify(Notification); public slots: virtual void revisionChanged() Q_DECL_OVERRIDE; protected: ///Base implementation calls the replay$Type calls - KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; - virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; + virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; + virtual void notReplaying(const QByteArray &type, const QByteArray &key, const QByteArray &value) override; protected: ///Implement to write back changes to the server virtual KAsync::Job replay(const Sink::ApplicationDomain::Contact &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Addressbook &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Event &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Todo &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); virtual KAsync::Job replay(const Sink::ApplicationDomain::Calendar &, Sink::Operation, const QByteArray &oldRemoteId, const QList &); protected: QString secret() const; ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, const QByteArray &newResource = QByteArray(), bool remove = false); void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType); /** * A synchronous algorithm to remove entities that are no longer existing. * * A list of entities is generated by @param entryGenerator. * The entiry Generator typically iterates over an index to produce all existing entries. * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, * an entity delete command is enqueued. * * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ void scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); void scanForRemovals(const QByteArray &bufferType, std::function exists); /** * An algorithm to create or modify the entity. * * Depending on whether the entity is locally available, or has changed. */ void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); template void SINK_EXPORT createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash &mergeCriteria); void modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); // template // void create(const DomainType &entity); template void SINK_EXPORT modify(const DomainType &entity, const QByteArray &newResource = QByteArray(), bool remove = false); // template // void remove(const DomainType &entity); QByteArrayList resolveQuery(const QueryBase &query); QByteArrayList resolveFilter(const QueryBase::Comparator &filter); virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) = 0; public: struct SyncRequest { enum RequestType { Synchronization, ChangeReplay, Flush }; enum RequestOptions { NoOptions, RequestFlush }; SyncRequest() = default; SyncRequest(const Sink::QueryBase &q, const QByteArray &requestId_ = QByteArray(), RequestOptions o = NoOptions) : requestId(requestId_), requestType(Synchronization), options(o), query(q), applicableEntities(q.ids()) { } SyncRequest(RequestType type) : requestType(type) { } SyncRequest(RequestType type, const QByteArray &requestId_) : requestId(requestId_), requestType(type) { } SyncRequest(RequestType type, int flushType_, const QByteArray &requestId_) : flushType(flushType_), requestId(requestId_), requestType(type) { } int flushType = 0; QByteArray requestId; RequestType requestType; RequestOptions options = NoOptions; Sink::QueryBase query; QByteArrayList applicableEntities; }; protected: /** * This allows the synchronizer to turn a single query into multiple synchronization requests. * * The idea is the following; * The input query is a specification by the application of what data needs to be made available. * Requests could be: * * Give me everything (signified by the default constructed/empty query) * * Give me all mails of folder X * * Give me all mails of folders matching some constraints * * getSyncRequests allows the resource implementation to apply it's own defaults to that request; * * While a maildir resource might give you always all emails of a folder, an IMAP resource might have a date limit, to i.e. only retrieve the last 14 days worth of data. * * A resource get's to define what "give me everything" means. For email that may be turned into first a requests for folders, and then a request for all emails in those folders. * * This will allow synchronizeWithSource to focus on just getting to the content. */ virtual QList getSyncRequests(const Sink::QueryBase &query); /** * This allows the synchronizer to merge new requests with existing requests in the queue. */ virtual void mergeIntoQueue(const Synchronizer::SyncRequest &request, QList &queue); void addToQueue(const Synchronizer::SyncRequest &request); void emitNotification(Notification::NoticationType type, int code, const QString &message, const QByteArray &id = QByteArray{}, const QByteArrayList &entiteis = QByteArrayList{}); void emitProgressNotification(Notification::NoticationType type, int progress, int total, const QByteArray &id, const QByteArrayList &entities); /** * Report progress for current task */ virtual void reportProgress(int progress, int total, const QByteArrayList &entities = {}) Q_DECL_OVERRIDE; Sink::Log::Context mLogCtx; /** * True while aborting. * * Stop the synchronization as soon as possible. */ bool aborting() const; - KAsync::Job processSyncQueue(); - private: QStack mCurrentState; void setStatusFromResult(const KAsync::Error &error, const QString &s, const QByteArray &requestId); void setStatus(ApplicationDomain::Status busy, const QString &reason, const QByteArray requestId); void resetStatus(const QByteArray requestId); void setBusy(bool busy, const QString &reason, const QByteArray requestId); void clearQueue(); void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); KAsync::Job processRequest(const SyncRequest &request); Sink::ResourceContext mResourceContext; Sink::Storage::EntityStore::Ptr mEntityStore; QSharedPointer mSyncStore; Sink::Storage::DataStore mSyncStorage; Sink::Storage::DataStore::Transaction mSyncTransaction; std::function mEnqueue; QList mSyncRequestQueue; SyncRequest mCurrentRequest; MessageQueue *mMessageQueue; bool mSyncInProgress; bool mAbort; QMultiHash mPendingSyncRequests; QString mSecret; }; } diff --git a/tests/synchronizertest.cpp b/tests/synchronizertest.cpp index 121868a5..8fae42c0 100644 --- a/tests/synchronizertest.cpp +++ b/tests/synchronizertest.cpp @@ -1,192 +1,212 @@ #include #include #include -#include "testimplementations.h" - -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "createentity_generated.h" -#include "modifyentity_generated.h" -#include "deleteentity_generated.h" -#include "dummyresource/resourcefactory.h" #include "store.h" #include "commands.h" #include "entitybuffer.h" -#include "resourceconfig.h" #include "pipeline.h" #include "synchronizer.h" #include "commandprocessor.h" -#include "log.h" -#include "domainadaptor.h" #include "definitions.h" #include "adaptorfactoryregistry.h" -#include "storage/key.h" +#include "datastorequery.h" #include "genericresource.h" #include "testutils.h" #include "test.h" class TestSynchronizer: public Sink::Synchronizer { public: TestSynchronizer(const Sink::ResourceContext &context): Sink::Synchronizer(context) { } QMap> mSyncCallbacks; KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) override { return KAsync::start([this, query] { Q_ASSERT(mSyncCallbacks.contains(query.id())); mSyncCallbacks.value(query.id())(); }); } void createOrModify(const QByteArray &rid, Sink::ApplicationDomain::ApplicationDomainType &entity) { Sink::Synchronizer::createOrModify("calendar", rid, entity); } void scanForRemovals(const QSet &set) { Sink::Synchronizer::scanForRemovals("calendar", [&](const QByteArray &remoteId) { return set.contains(remoteId); }); } QByteArray resolveRemoteId(const QByteArray &remoteId) { return syncStore().resolveRemoteId("calendar", remoteId); } void synchronize(std::function callback, const QByteArray &id = {}, Synchronizer::SyncRequest::RequestOptions options = Synchronizer::SyncRequest::NoOptions) { mSyncCallbacks.insert(id, callback); Sink::Query query; query.setId(id); addToQueue(Synchronizer::SyncRequest{query, id, options}); VERIFYEXEC(processSyncQueue()); } }; class SynchronizerTest : public QObject { Q_OBJECT QByteArray instanceIdentifier() { return "synchronizertest.instance1"; } Sink::ResourceContext getContext() { return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")}; } + bool queryFor(const QByteArray &sinkId, const QByteArray &type, Sink::Storage::EntityStore &store) { + bool foundInQuery = false; + DataStoreQuery dataStoreQuery{{sinkId}, type, store}; + auto resultSet = dataStoreQuery.execute(); + resultSet.replaySet(0, 1, [&](const ResultSet::Result &r) { + if (r.entity.identifier() == sinkId) { + foundInQuery = true; + } + }); + return foundInQuery; + } + private slots: void initTestCase() { Sink::Test::initTest(); Sink::Storage::DataStore{Sink::Store::storageLocation(), instanceIdentifier(), Sink::Storage::DataStore::ReadWrite}.removeFromDisk(); Sink::AdaptorFactoryRegistry::instance().registerFactory>("test"); } void init() { Sink::GenericResource::removeFromDisk(instanceIdentifier()); } - void testSynchronizer() + /* + * Ensure we can remove an recreate an entity. + */ + void testTemporaryRemoval() { const auto context = getContext(); Sink::Pipeline pipeline(context, instanceIdentifier()); Sink::CommandProcessor processor(&pipeline, instanceIdentifier(), Sink::Log::Context{"processor"}); auto synchronizer = QSharedPointer::create(context); processor.setSynchronizer(synchronizer); synchronizer->setSecret("secret"); synchronizer->synchronize([&] { Sink::ApplicationDomain::Calendar calendar; calendar.setName("Name"); synchronizer->createOrModify("1", calendar); }); VERIFYEXEC(processor.processAllMessages()); const auto sinkId = synchronizer->resolveRemoteId("1"); QVERIFY(!sinkId.isEmpty()); { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(store.contains("calendar", sinkId)); QVERIFY(store.exists("calendar", sinkId)); + QVERIFY(queryFor(sinkId, "calendar", store)); } //Remove the calendar synchronizer->synchronize([&] { synchronizer->scanForRemovals({}); }); - synchronizer->revisionChanged(); + //Process the removal VERIFYEXEC(processor.processAllMessages()); + //Ensure we replay the revision generated by the removal. + //This is necessary to remove the rid mapping + synchronizer->replayNextRevision().exec(); { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(!store.exists("calendar", sinkId)); QVERIFY(store.contains("calendar", sinkId)); + QVERIFY(!queryFor(sinkId, "calendar", store)); } //Recreate the same calendar synchronizer->synchronize([&] { Sink::ApplicationDomain::Calendar calendar; calendar.setName("Name"); synchronizer->createOrModify("1", calendar); }); - synchronizer->revisionChanged(); VERIFYEXEC(processor.processAllMessages()); + + //Ensure we got a new sink id (if not we failed to remove the rid mapping from the previous instance). + const auto newSinkId = synchronizer->resolveRemoteId("1"); + QVERIFY(!newSinkId.isEmpty()); + QVERIFY(newSinkId != sinkId); + { Sink::Storage::EntityStore store(context, {"entitystore"}); - QVERIFY(store.contains("calendar", sinkId)); - QVERIFY(store.exists("calendar", sinkId)); + QVERIFY(store.contains("calendar", newSinkId)); + QVERIFY(store.exists("calendar", newSinkId)); + + // store.readRevisions("calendar", newSinkId, 0, [] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &buffer) { + // qWarning() << uid << revision << buffer.operation(); + // }); + + QVERIFY(!queryFor(sinkId, "calendar", store)); + QVERIFY(queryFor(newSinkId, "calendar", store)); } } /* * Ensure the flushed content is available during the next sync request */ void testFlush() { const auto context = getContext(); Sink::Pipeline pipeline(context, instanceIdentifier()); Sink::CommandProcessor processor(&pipeline, instanceIdentifier(), Sink::Log::Context{"processor"}); auto synchronizer = QSharedPointer::create(context); processor.setSynchronizer(synchronizer); synchronizer->setSecret("secret"); QByteArray sinkId; synchronizer->synchronize([&] { Sink::ApplicationDomain::Calendar calendar; calendar.setName("Name"); synchronizer->createOrModify("1", calendar); sinkId = synchronizer->resolveRemoteId("1"); }, "1"); QVERIFY(!sinkId.isEmpty()); //With a flush the calendar should be available during the next sync synchronizer->synchronize([&] { Sink::Storage::EntityStore store(context, {"entitystore"}); QVERIFY(store.contains("calendar", sinkId)); }, "2", Sink::Synchronizer::SyncRequest::RequestFlush); VERIFYEXEC(processor.processAllMessages()); } }; QTEST_MAIN(SynchronizerTest) #include "synchronizertest.moc"