diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index e8ec5ab3..388e7c23 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -1,704 +1,708 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "entitystore.h" #include #include #include "entitybuffer.h" #include "log.h" #include "typeindex.h" #include "definitions.h" #include "resourcecontext.h" #include "index.h" #include "bufferutils.h" #include "entity_generated.h" #include "applicationdomaintype_p.h" #include "typeimplementations.h" using namespace Sink; using namespace Sink::Storage; static QMap baseDbs() { return {{"revisionType", Storage::IntegerKeys}, {"revisions", Storage::IntegerKeys}, {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues}, {"uids", 0}, {"default", 0}, {"__flagtable", 0}}; } template void mergeImpl(T &map, First f) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } } template void mergeImpl(T &map, First f, Tail ...maps) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } mergeImpl(map, maps...); } template First merge(First f, Tail ...maps) { First map; mergeImpl(map, f, maps...); return map; } template struct DbLayoutHelper { void operator()(QMap map) const { mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); } }; static Sink::Storage::DbLayout dbLayout(const QByteArray &instanceId) { static auto databases = [] { QMap map; mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); return merge(baseDbs(), map); }(); return {instanceId, databases}; } class EntityStore::Private { public: Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) { } ResourceContext resourceContext; DataStore::Transaction transaction; QHash > indexByType; Sink::Log::Context logCtx; bool exists() { return Storage::DataStore::exists(Sink::storageLocation(), resourceContext.instanceId()); } DataStore::Transaction &getTransaction() { if (transaction) { return transaction; } DataStore store(Sink::storageLocation(), dbLayout(resourceContext.instanceId()), DataStore::ReadOnly); transaction = store.createTransaction(DataStore::ReadOnly); return transaction; } template struct ConfigureHelper { void operator()(TypeIndex &arg) const { ApplicationDomain::TypeImplementation::configure(arg); } }; TypeIndex &cachedIndex(const QByteArray &type) { if (indexByType.contains(type)) { return *indexByType.value(type); } auto index = QSharedPointer::create(type, logCtx); TypeHelper{type}.template operator()(*index); indexByType.insert(type, index); return *index; } TypeIndex &typeIndex(const QByteArray &type) { auto &index = cachedIndex(type); index.mTransaction = &transaction; return index; } ApplicationDomainType createApplicationDomainType(const QByteArray &type, const QByteArray &uid, qint64 revision, const EntityBuffer &buffer) { auto adaptor = resourceContext.adaptorFactory(type).createAdaptor(buffer.entity(), &typeIndex(type)); return ApplicationDomainType{resourceContext.instanceId(), uid, revision, adaptor}; } }; EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx) : d(new EntityStore::Private{context, ctx}) { } void EntityStore::initialize() { //This function is only called in the resource code where we want to be able to write to the databse. //Check for the existience of the db without creating it or the envrionment. //This is required to be able to set the database version only in the case where we create a new database. if (!Storage::DataStore::exists(Sink::storageLocation(), d->resourceContext.instanceId())) { //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. startTransaction(DataStore::ReadWrite); //Create the database with the correct version if it wasn't existing before SinkLogCtx(d->logCtx) << "Creating resource database."; Storage::DataStore::setDatabaseVersion(d->transaction, Sink::latestDatabaseVersion()); } else { //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. startTransaction(DataStore::ReadWrite); } commitTransaction(); } void EntityStore::startTransaction(DataStore::AccessMode accessMode) { SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode; Q_ASSERT(!d->transaction); d->transaction = DataStore(Sink::storageLocation(), dbLayout(d->resourceContext.instanceId()), accessMode).createTransaction(accessMode); } void EntityStore::commitTransaction() { SinkTraceCtx(d->logCtx) << "Committing transaction"; for (const auto &type : d->indexByType.keys()) { d->typeIndex(type).commitTransaction(); } Q_ASSERT(d->transaction); d->transaction.commit(); d->transaction = {}; } void EntityStore::abortTransaction() { SinkTraceCtx(d->logCtx) << "Aborting transaction"; d->transaction.abort(); d->transaction = {}; } bool EntityStore::hasTransaction() const { return d->transaction; } bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool replayToSource) { if (entity.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; return false; } SinkTraceCtx(d->logCtx) << "New entity " << entity; const auto identifier = Identifier::fromDisplayByteArray(entity.identifier()); d->typeIndex(type).add(identifier, entity, d->transaction, d->resourceContext.instanceId()); //The maxRevision may have changed meanwhile if the entity created sub-entities const qint64 newRevision = maxRevision() + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Creation); metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); DataStore::recordUid(d->transaction, entity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type; return true; } ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet &excludeProperties) const { SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties; auto newEntity = *ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); // Apply diff for (const auto &property : diff.changedProperties()) { if (!excludeProperties.contains(property)) { const auto value = diff.getProperty(property); if (value.isValid()) { newEntity.setProperty(property, value); } } } // Remove deletions for (const auto &property : deletions) { if (!excludeProperties.contains(property)) { newEntity.setProperty(property, QVariant()); } } return newEntity; } bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) { const auto current = readLatest(type, diff.identifier()); if (current.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); return false; } auto newEntity = applyDiff(type, current, diff, deletions); return modify(type, current, newEntity, replayToSource); } bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource) { SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; const auto identifier = Identifier::fromDisplayByteArray(newEntity.identifier()); d->typeIndex(type).modify(identifier, current, newEntity, d->transaction, d->resourceContext.instanceId()); const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; { //We add availableProperties to account for the properties that have been changed by the preprocessors auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Modification); metadataBuilder.add_replayToSource(replayToSource); metadataBuilder.add_modifiedProperties(modifiedProperties); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); } SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); newEntity.setChangedProperties(newEntity.availableProperties().toSet()); flatbuffers::FlatBufferBuilder fbb; d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); DataStore::mainDatabase(d->transaction, type) .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; return true; } bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource) { const auto uid = current.identifier(); if (!exists(type, uid)) { SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; return false; } const auto identifier = Identifier::fromDisplayByteArray(uid); d->typeIndex(type).remove(identifier, current, d->transaction, d->resourceContext.instanceId()); SinkTraceCtx(d->logCtx) << "Removed entity " << current; const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Removal); metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); DataStore::mainDatabase(d->transaction, type) .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, uid, type); DataStore::removeUid(d->transaction, uid, type); return true; } void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) { const auto uid = DataStore::getUidFromRevision(d->transaction, revision); const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); if (bufferType.isEmpty() || uid.isEmpty()) { SinkErrorCtx(d->logCtx) << "Failed to find revision during cleanup: " << revision; Q_ASSERT(false); return; } SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); // Remove old revisions const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision); for (const auto &revisionToRemove : revisionsToRemove) { DataStore::removeRevision(d->transaction, revisionToRemove); DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove); } // And remove the specified revision only if marked for removal DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) { EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; return false; } const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); if (metadata->operation() == Operation_Removal) { DataStore::removeRevision(d->transaction, revision); DataStore::mainDatabase(d->transaction, bufferType).remove(revision); } return false; }); DataStore::setCleanedUpRevision(d->transaction, revision); } bool EntityStore::cleanupRevisions(qint64 revision) { Q_ASSERT(d->exists()); bool implicitTransaction = false; if (!d->transaction) { startTransaction(DataStore::ReadWrite); Q_ASSERT(d->transaction); implicitTransaction = true; } const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction); const auto firstRevisionToCleanup = lastCleanRevision + 1; bool cleanupIsNecessary = firstRevisionToCleanup <= revision; if (cleanupIsNecessary) { SinkTraceCtx(d->logCtx) << "Cleaning up from " << firstRevisionToCleanup << " to " << revision; for (qint64 rev = firstRevisionToCleanup; rev <= revision; rev++) { cleanupEntityRevisionsUntil(rev); } } if (implicitTransaction) { commitTransaction(); } return cleanupIsNecessary; } QVector EntityStore::fullScan(const QByteArray &type) { SinkTraceCtx(d->logCtx) << "Looking for : " << type; if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return {}; } QSet keys; DataStore::getUids(type, d->getTransaction(), [&keys] (const QByteArray &uid) { keys << Identifier::fromDisplayByteArray(uid); }); SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; return keys.toList().toVector(); } QVector EntityStore::indexLookup(const QByteArray &type, const QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return {}; } return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction(), d->resourceContext.instanceId()); } QVector EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return {}; } return d->typeIndex(type).lookup(property, value, d->getTransaction()); } void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return; } auto list = indexLookup(type, property, value); for (const auto &id : list) { callback(id.toDisplayByteArray()); } /* Index index(type + ".index." + property, d->transaction); */ /* index.lookup(value, [&](const QByteArray &sinkId) { */ /* callback(sinkId); */ /* }, */ /* [&](const Index::Error &error) { */ /* SinkWarningCtx(d->logCtx) << "Error in index: " << error.message << property; */ /* }); */ } void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function callback) { Q_ASSERT(d); const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray()); + if (!revision) { + SinkWarningCtx(d->logCtx) << "Failed to readLatest: " << type << id; + return; + } auto db = DataStore::mainDatabase(d->getTransaction(), type); db.scan(revision, [=](size_t, const QByteArray &value) { callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { readLatest(type, Identifier::fromDisplayByteArray(uid), callback); } void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function callback) { readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { //TODO cache max revision for the duration of the transaction. callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { readLatest(type, Identifier::fromDisplayByteArray(uid), callback); } void EntityStore::readLatest(const QByteArray &type, const Identifier &uid, const std::function callback) { readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { //TODO cache max revision for the duration of the transaction. callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer), buffer.operation()); }); } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { readLatest(type, Identifier::fromDisplayByteArray(uid), callback); } ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid) { ApplicationDomainType dt; readLatest(type, uid, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKey, const std::function callback) { const auto key = Key::fromDisplayByteArray(displayKey); auto db = DataStore::mainDatabase(d->getTransaction(), type); db.scan(key.revision().toSizeT(), [=](size_t rev, const QByteArray &value) -> bool { const auto uid = DataStore::getUidFromRevision(d->transaction, rev); callback(uid, Sink::EntityBuffer(value.data(), value.size())); return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; }); } void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback) { readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid) { ApplicationDomainType dt; readEntity(type, uid, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readAll(const QByteArray &type, const std::function &callback) { readAllUids(type, [&] (const QByteArray &uid) { readLatest(type, uid, callback); }); } void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function &callback) { qint64 revisionCounter = baseRevision; const qint64 topRevision = DataStore::maxRevision(d->getTransaction()); // Spit out the revision keys one by one. while (revisionCounter <= topRevision) { const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter); const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter); // SinkTrace() << "Revision" << *revisionCounter << type << uid; Q_ASSERT(!uid.isEmpty()); Q_ASSERT(!type.isEmpty()); if (type != expectedType) { // Skip revision revisionCounter++; continue; } const auto key = Key(Identifier::fromDisplayByteArray(uid), revisionCounter); revisionCounter++; callback(key); } } void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function callback) { const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id.toDisplayByteArray(), revision); const size_t latestRevision = previousRevisions[previousRevisions.size() - 1]; const auto key = Key(id, latestRevision); readEntity(type, key.toDisplayByteArray(), callback); } void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function callback) { readPrevious(type, id, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) { callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision) { ApplicationDomainType dt; readPrevious(type, id, revision, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readAllUids(const QByteArray &type, const std::function callback) { DataStore::getUids(type, d->getTransaction(), callback); } bool EntityStore::contains(const QByteArray & /* type */, const QByteArray &uid) { Q_ASSERT(!uid.isEmpty()); return !DataStore::getRevisionsFromUid(d->getTransaction(), uid).isEmpty(); } bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) { bool found = false; bool alreadyRemoved = false; const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid); DataStore::mainDatabase(d->transaction, type) .scan(revision, [&found, &alreadyRemoved](size_t, const QByteArray &data) { auto entity = GetEntity(data.data()); if (entity && entity->metadata()) { auto metadata = GetMetadata(entity->metadata()->Data()); found = true; if (metadata->operation() == Operation_Removal) { alreadyRemoved = true; } } return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); if (!found) { SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; return false; } if (alreadyRemoved) { SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; return false; } return true; } void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, size_t startingRevision, const std::function callback) { Q_ASSERT(d); Q_ASSERT(!uid.isEmpty()); const auto revisions = DataStore::getRevisionsFromUid(d->transaction, uid); const auto db = DataStore::mainDatabase(d->transaction, type); for (const auto revision : revisions) { if (revision < static_cast(startingRevision)) { continue; } db.scan(revision, [&](size_t rev, const QByteArray &value) { Q_ASSERT(rev == revision); callback(uid, revision, Sink::EntityBuffer(value.data(), value.size())); return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); } } qint64 EntityStore::maxRevision() { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing."; return 0; } return DataStore::maxRevision(d->getTransaction()); } Sink::Log::Context EntityStore::logContext() const { return d->logCtx; } diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 7c794c35..11e5b466 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -1,317 +1,317 @@ /* * Copyright (C) 2014 Aaron Seigo * Copyright (C) 2014 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "storage.h" #include "log.h" #include "utils.h" QDebug& operator<<(QDebug &dbg, const Sink::Storage::DataStore::Error &error) { dbg << error.message << "Code: " << error.code << "Db: " << error.store; return dbg; } namespace Sink { namespace Storage { static const char *s_internalPrefix = "__internal"; static const int s_internalPrefixSize = strlen(s_internalPrefix); DbLayout::DbLayout() { } DbLayout::DbLayout(const QByteArray &n, const Databases &t) : name(n), tables(t) { } void errorHandler(const DataStore::Error &error) { if (error.code == DataStore::TransactionError) { - SinkError() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message; + SinkError() << "Transaction error:" << error; } else { - SinkWarning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message; + SinkWarning() << "Database error:" << error; } } std::function DataStore::basicErrorHandler() { return errorHandler; } void DataStore::setDefaultErrorHandler(const std::function &errorHandler) { mErrorHandler = errorHandler; } std::function DataStore::defaultErrorHandler() const { if (mErrorHandler) { return mErrorHandler; } return basicErrorHandler(); } void DataStore::setMaxRevision(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase().write("__internal_maxRevision", QByteArray::number(revision)); } qint64 DataStore::maxRevision(const DataStore::Transaction &transaction) { qint64 r = 0; transaction.openDatabase().scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { r = revision.toLongLong(); return false; }, [](const Error &error) { if (error.code != DataStore::NotFound) { SinkWarning() << "Couldn't find the maximum revision: " << error; } }); return r; } void DataStore::setCleanedUpRevision(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision)); } qint64 DataStore::cleanedUpRevision(const DataStore::Transaction &transaction) { qint64 r = 0; transaction.openDatabase().scan("__internal_cleanedUpRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { r = revision.toLongLong(); return false; }, [](const Error &error) { if (error.code != DataStore::NotFound) { SinkWarning() << "Couldn't find the cleanedUpRevision: " << error; } }); return r; } QByteArray DataStore::getUidFromRevision(const DataStore::Transaction &transaction, size_t revision) { QByteArray uid; transaction .openDatabase("revisions", /* errorHandler = */ {}, IntegerKeys) .scan(revision, [&](const size_t, const QByteArray &value) -> bool { uid = QByteArray{ value.constData(), value.size() }; return false; }, [revision](const Error &error) { SinkWarning() << "Couldn't find uid for revision: " << revision << error.message; }); Q_ASSERT(!uid.isEmpty()); return uid; } size_t DataStore::getLatestRevisionFromUid(DataStore::Transaction &t, const QByteArray &uid) { - size_t revision; + size_t revision = 0; t.openDatabase("uidsToRevisions", {}, AllowDuplicates | IntegerValues) .findLatest(uid, [&revision](const QByteArray &key, const QByteArray &value) { revision = byteArrayToSizeT(value); }); return revision; } QList DataStore::getRevisionsUntilFromUid(DataStore::Transaction &t, const QByteArray &uid, size_t lastRevision) { QList queriedRevisions; t.openDatabase("uidsToRevisions", {}, AllowDuplicates | IntegerValues) .scan(uid, [&queriedRevisions, lastRevision](const QByteArray &, const QByteArray &value) { size_t currentRevision = byteArrayToSizeT(value); if (currentRevision < lastRevision) { queriedRevisions << currentRevision; return true; } return false; }); return queriedRevisions; } QList DataStore::getRevisionsFromUid(DataStore::Transaction &t, const QByteArray &uid) { QList queriedRevisions; t.openDatabase("uidsToRevisions", {}, AllowDuplicates | IntegerValues) .scan(uid, [&queriedRevisions](const QByteArray &, const QByteArray &value) { queriedRevisions << byteArrayToSizeT(value); return true; }); return queriedRevisions; } QByteArray DataStore::getTypeFromRevision(const DataStore::Transaction &transaction, size_t revision) { QByteArray type; transaction.openDatabase("revisionType", /* errorHandler = */ {}, IntegerKeys) .scan(revision, [&](const size_t, const QByteArray &value) -> bool { type = QByteArray{value.constData(), value.size()}; return false; }, [revision](const Error &error) { SinkWarning() << "Couldn't find type for revision " << revision; }); Q_ASSERT(!type.isEmpty()); return type; } void DataStore::recordRevision(DataStore::Transaction &transaction, size_t revision, const QByteArray &uid, const QByteArray &type) { transaction .openDatabase("revisions", /* errorHandler = */ {}, IntegerKeys) .write(revision, uid); transaction.openDatabase("uidsToRevisions", /* errorHandler = */ {}, AllowDuplicates | IntegerValues) .write(uid, sizeTToByteArray(revision)); transaction .openDatabase("revisionType", /* errorHandler = */ {}, IntegerKeys) .write(revision, type); } void DataStore::removeRevision(DataStore::Transaction &transaction, size_t revision) { const QByteArray uid = getUidFromRevision(transaction, revision); transaction .openDatabase("revisions", /* errorHandler = */ {}, IntegerKeys) .remove(revision); transaction.openDatabase("uidsToRevisions", /* errorHandler = */ {}, AllowDuplicates | IntegerValues) .remove(uid, sizeTToByteArray(revision)); transaction .openDatabase("revisionType", /* errorHandler = */ {}, IntegerKeys) .remove(revision); } void DataStore::recordUid(DataStore::Transaction &transaction, const QByteArray &uid, const QByteArray &type) { transaction.openDatabase(type + "uids").write(uid, ""); } void DataStore::removeUid(DataStore::Transaction &transaction, const QByteArray &uid, const QByteArray &type) { transaction.openDatabase(type + "uids").remove(uid); } void DataStore::getUids(const QByteArray &type, const Transaction &transaction, const std::function &callback) { transaction.openDatabase(type + "uids").scan("", [&] (const QByteArray &key, const QByteArray &) { callback(key); return true; }); } bool DataStore::hasUid(const QByteArray &type, const Transaction &transaction, const QByteArray &uid) { bool hasTheUid = false; transaction.openDatabase(type + "uids").scan(uid, [&](const QByteArray &key, const QByteArray &) { Q_ASSERT(uid == key); hasTheUid = true; return false; }); return hasTheUid; } bool DataStore::isInternalKey(const char *key) { return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; } bool DataStore::isInternalKey(void *key, int size) { if (size < 1) { return false; } return key && strncmp(static_cast(key), s_internalPrefix, (size > s_internalPrefixSize ? s_internalPrefixSize : size)) == 0; } bool DataStore::isInternalKey(const QByteArray &key) { return key.startsWith(s_internalPrefix); } QByteArray DataStore::generateUid() { return createUuid(); } DataStore::NamedDatabase DataStore::mainDatabase(const DataStore::Transaction &t, const QByteArray &type) { if (type.isEmpty()) { SinkError() << "Tried to open main database for empty type."; Q_ASSERT(false); return {}; } return t.openDatabase(type + ".main", /* errorHandler= */ {}, IntegerKeys); } bool DataStore::NamedDatabase::contains(const QByteArray &uid) { bool found = false; scan(uid, [&found](const QByteArray &, const QByteArray &) -> bool { found = true; return false; }, [](const DataStore::Error &error) {}, true); return found; } void DataStore::setDatabaseVersion(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase().write("__internal_databaseVersion", QByteArray::number(revision)); } qint64 DataStore::databaseVersion(const DataStore::Transaction &transaction) { qint64 r = 0; transaction.openDatabase().scan("__internal_databaseVersion", [&](const QByteArray &, const QByteArray &revision) -> bool { r = revision.toLongLong(); return false; }, [](const Error &error) { if (error.code != DataStore::NotFound) { SinkWarning() << "Couldn't find the database version: " << error; } }); return r; } } } // namespace Sink