diff --git a/common/store.cpp b/common/store.cpp index 38d8a7af..84a52fed 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -1,546 +1,553 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "store.h" #include #include #include #include #include "resourceaccess.h" #include "commands.h" #include "resourcefacade.h" #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" #include "modelresult.h" #include "storage.h" #include "log.h" #include "utils.h" #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast(A) == static_cast(B), "The enum values must match"); //Ensure the copied enum matches typedef ModelResult MailModelResult; ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole) ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole) ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole) ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole) ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole) ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole) Q_DECLARE_METATYPE(QSharedPointer>) Q_DECLARE_METATYPE(QSharedPointer); Q_DECLARE_METATYPE(std::shared_ptr); static bool sanityCheckQuery(const Sink::Query &query) { for (const auto &id : query.ids()) { if (id.isEmpty()) { SinkError() << "Empty id in query."; return false; } } return true; } +static KAsync::Job forEachResource(const Sink::SyncScope &scope, std::function(const Sink::ApplicationDomain::SinkResource::Ptr &resource)> callback) +{ + using namespace Sink; + auto resourceFilter = scope.getResourceFilter(); + //Filter resources by type by default + if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { + resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); + } + Sink::Query query; + query.setFilter(resourceFilter); + return Store::fetchAll(query) + .template each(callback); +} namespace Sink { QString Store::storageLocation() { return Sink::storageLocation(); } template KAsync::Job queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) { auto ctx = ctx_.subContext(resourceInstanceIdentifier); auto facade = FacadeFactory::instance().getFacade(resourceType, resourceInstanceIdentifier); if (facade) { SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; auto result = facade->load(query, ctx); if (result.second) { aggregatingEmitter->addEmitter(result.second); } else { SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; } return result.first; } else { SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier; // Ignore the error and carry on return KAsync::null(); } } template QPair::Ptr, typename ResultEmitter::Ptr> getEmitter(Query query, const Log::Context &ctx) { query.setType(ApplicationDomain::getTypeName()); SinkTraceCtx(ctx) << "Query: " << query; // Query all resources and aggregate results auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { //For global types we don't need to query for the resources first. queryResource("", "", query, aggregatingEmitter, ctx).exec(); } else { auto resourceCtx = ctx.subContext("resourceQuery"); auto facade = FacadeFactory::instance().getFacade(); Q_ASSERT(facade); Sink::Query resourceQuery; resourceQuery.request(); if (query.liveQuery()) { SinkTraceCtx(ctx) << "Listening for new resources."; resourceQuery.setFlags(Query::LiveQuery); } //Filter resources by available content types (unless the query already specifies a capability filter) auto resourceFilter = query.getResourceFilter(); if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) { resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName(), Query::Comparator::Contains}); } resourceQuery.setFilter(resourceFilter); for (auto const &properties : resourceFilter.propertyFilter.keys()) { resourceQuery.requestedProperties << properties; } auto result = facade->load(resourceQuery, resourceCtx); auto emitter = result.second; emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier(); const auto resourceType = ResourceConfig::getResourceType(resource->identifier()); Q_ASSERT(!resourceType.isEmpty()); queryResource(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec(); }); emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { SinkTraceCtx(resourceCtx) << "Resource query complete"; }); return qMakePair(aggregatingEmitter, emitter); } return qMakePair(aggregatingEmitter, ResultEmitter::Ptr{}); } static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type) { if (!query.id().isEmpty()) { return Log::Context{"query." + type + "." + query.id()}; } return Log::Context{"query." + type}; } template QSharedPointer Store::loadModel(const Query &query) { Q_ASSERT(sanityCheckQuery(query)); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); auto model = QSharedPointer>::create(query, query.requestedProperties, ctx); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks //* The emitter needs to live or the duration of query (respectively, the model) //* The result provider needs to live for as long as results are provided (until the last thread exits). auto result = getEmitter(query, ctx); model->setEmitter(result.first); //Keep the emitter alive if (auto resourceEmitter = result.second) { model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries resourceEmitter->fetch(); } //Automatically populate the top-level model->fetchMore(QModelIndex()); return model; } template static std::shared_ptr> getFacade(const QByteArray &resourceInstanceIdentifier) { if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { if (auto facade = FacadeFactory::instance().getFacade()) { return facade; } } if (auto facade = FacadeFactory::instance().getFacade(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) { return facade; } return std::make_shared>(); } template KAsync::Job Store::create(const DomainType &domainObject) { SinkLog() << "Create: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; }); } template KAsync::Job Store::modify(const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; }); }); } return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template KAsync::Job Store::modify(const Query &query, const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << query << domainObject; return fetchAll(query) .each([=] (const typename DomainType::Ptr &entity) { auto copy = *entity; for (const auto &p : domainObject.changedProperties()) { copy.setProperty(p, domainObject.getProperty(p)); } return modify(copy); }); } template KAsync::Job Store::move(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Move: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); }); } return facade->move(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); } template KAsync::Job Store::copy(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Copy: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); }); } return facade->copy(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); } template KAsync::Job Store::remove(const DomainType &domainObject) { SinkLog() << "Remove: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); }); } return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); } template KAsync::Job Store::remove(const Sink::Query &query) { SinkLog() << "Remove: " << query; return fetchAll(query) .each([] (const typename DomainType::Ptr &entity) { return remove(*entity); }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { // All databases are going to become invalid, nuke the environments // TODO: all clients should react to a notification from the resource Sink::Storage::DataStore::clearEnv(); SinkTrace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown auto guard = new QObject; QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) { if (!ready) { //We don't disconnect if ResourceAccess get's recycled, so ready can fire multiple times, which can result in a crash if the future is no longer valid. delete guard; future.setFinished(); } }); } else { future.setFinished(); } }) .then([time]() { SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } static KAsync::Job upgrade(const QByteArray &resource) { auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) { return KAsync::value(Store::UpgradeResult{false}); } SinkLog() << "Upgrading " << resource; //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater}; return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during upgrade."; return KAsync::error(error); } SinkTrace() << "Upgrade of resource " << resource << " complete."; return KAsync::null(); }) .then(KAsync::value(Store::UpgradeResult{true})); } KAsync::Job Store::upgrade() { SinkLog() << "Upgrading..."; //Migrate from sink.dav to sink.carddav const auto resources = ResourceConfig::getResources(); for (auto it = resources.constBegin(); it != resources.constEnd(); it++) { if (it.value() == "sink.dav") { ResourceConfig::setResourceType(it.key(), "sink.carddav"); } } auto ret = QSharedPointer::create(false); return fetchAll({}) .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return Sink::upgrade(resource->identifier()) .then([ret](UpgradeResult returnValue) { if (returnValue.upgradeExecuted) { SinkLog() << "Upgrade executed."; *ret = true; } }); }) .then([ret] { if (*ret) { SinkLog() << "Upgrade complete."; } return Store::UpgradeResult{*ret}; }); } static KAsync::Job synchronize(const QByteArray &resource, const Sink::SyncScope &scope) { SinkLog() << "Synchronizing " << resource << scope; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); return resourceAccess->synchronizeResource(scope) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during sync."; return KAsync::error(error); } SinkTrace() << "Synchronization of resource " << resource << " complete."; return KAsync::null(); }); } KAsync::Job Store::synchronize(const Sink::Query &query) { return synchronize(Sink::SyncScope{query}); } KAsync::Job Store::synchronize(const Sink::SyncScope &scope) { - auto resourceFilter = scope.getResourceFilter(); - //Filter resources by type by default - if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { - resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); - } - Sink::Query query; - query.setFilter(resourceFilter); - SinkLog() << "Synchronizing all resource matching: " << query; - return fetchAll(query) - .template each([scope](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { + SinkLog() << "Synchronizing all resource matching: " << scope; + return forEachResource(scope, [=] (const auto &resource) { return synchronize(resource->identifier(), scope); }); } -KAsync::Job Store::abortSynchronization(const QByteArray &identifier) +KAsync::Job Store::abortSynchronization(const Sink::SyncScope &scope) { - auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); - return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand) - .addToContext(resourceAccess) - .then([=](const KAsync::Error &error) { - if (error) { - SinkWarning() << "Error aborting synchronization."; - return KAsync::error(error); - } - return KAsync::null(); + return forEachResource(scope, [] (const auto &resource) { + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource->identifier(), ResourceConfig::getResourceType(resource->identifier())); + return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand) + .addToContext(resourceAccess) + .then([=](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Error aborting synchronization."; + return KAsync::error(error); + } + return KAsync::null(); + }); }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { return fetch(query, 1).template then>([](const QList &list) { return KAsync::value(*list.first()); }); } template KAsync::Job> Store::fetchAll(const Sink::Query &query) { return fetch(query); } template KAsync::Job> Store::fetch(const Sink::Query &query, int minimumAmount) { Q_ASSERT(sanityCheckQuery(query)); auto model = loadModel(query); auto list = QSharedPointer>::create(); auto context = QSharedPointer::create(); return KAsync::start>([model, list, context, minimumAmount](KAsync::Future> &future) { if (model->rowCount() >= 1) { for (int i = 0; i < model->rowCount(); i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } } else { QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) { for (int i = start; i <= end; i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } }); QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { if (roles.contains(ModelResult::ChildrenFetchedRole)) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); future.setFinished(); } } }); } if (model->data(QModelIndex(), ModelResult::ChildrenFetchedRole).toBool()) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); } future.setFinished(); } }); } template DomainType Store::readOne(const Sink::Query &query) { const auto list = read(query); if (!list.isEmpty()) { return list.first(); } SinkWarning() << "Tried to read value but no values are available."; return DomainType(); } template QList Store::read(const Sink::Query &query_) { Q_ASSERT(sanityCheckQuery(query_)); auto query = query_; query.setFlags(Query::SynchronousQuery); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); QList list; auto result = getEmitter(query, ctx); auto aggregatingEmitter = result.first; aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ SinkTraceCtx(ctx) << "Found value: " << value->identifier(); list << *value; }); if (auto resourceEmitter = result.second) { resourceEmitter->fetch(); } aggregatingEmitter->fetch(); return list; } #define REGISTER_TYPE(T) \ template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::remove(const Query &); \ template KAsync::Job Store::create(const T &domainObject); \ template KAsync::Job Store::modify(const T &domainObject); \ template KAsync::Job Store::modify(const Query &, const T &); \ template KAsync::Job Store::move(const T &domainObject, const QByteArray &newResource); \ template KAsync::Job Store::copy(const T &domainObject, const QByteArray &newResource); \ template QSharedPointer Store::loadModel(const Query &query); \ template KAsync::Job Store::fetchOne(const Query &); \ template KAsync::Job> Store::fetchAll(const Query &); \ template KAsync::Job> Store::fetch(const Query &, int); \ template T Store::readOne(const Query &); \ template QList Store::read(const Query &); SINK_REGISTER_TYPES() } // namespace Sink diff --git a/common/store.h b/common/store.h index f2001323..2104f51a 100644 --- a/common/store.h +++ b/common/store.h @@ -1,159 +1,159 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include "query.h" #include "applicationdomaintype.h" class QAbstractItemModel; namespace Sink { /** * The unified Sink Store. * * This is the primary interface for clients to interact with Sink. * It provides a unified store where all data provided by various resources can be accessed and modified. */ namespace Store { QString SINK_EXPORT storageLocation(); // Must be the same as in ModelResult enum Roles { DomainObjectRole = Qt::UserRole + 1, ChildrenFetchedRole, DomainObjectBaseRole, StatusRole, //ApplicationDomain::SyncStatus WarningRole, //ApplicationDomain::Warning, only if status == warning || status == error ProgressRole //ApplicationDomain::Progress }; /** * Asynchronusly load a dataset with tree structure information */ template QSharedPointer SINK_EXPORT loadModel(const Query &query); /** * Create a new entity. */ template KAsync::Job SINK_EXPORT create(const DomainType &domainObject); /** * Modify an entity. * * This includes moving etc. since these are also simple settings on a property. * Note that the modification will be dropped if there is no changedProperty on the domain object. */ template KAsync::Job SINK_EXPORT modify(const DomainType &domainObject); /** * Modify a set of entities identified by @param query. * * Note that the modification will be dropped if there is no changedProperty on the domain object. */ template KAsync::Job SINK_EXPORT modify(const Query &query, const DomainType &domainObject); /** * Remove an entity. */ template KAsync::Job SINK_EXPORT remove(const DomainType &domainObject); /** * Remove a set of entities identified by @param query. */ template KAsync::Job SINK_EXPORT remove(const Query &query); /** * Move an entity to a new resource. */ template KAsync::Job SINK_EXPORT move(const DomainType &domainObject, const QByteArray &newResource); /** * Copy an entity to a new resource. */ template KAsync::Job SINK_EXPORT copy(const DomainType &domainObject, const QByteArray &newResource); /** * Synchronize data to local cache. */ KAsync::Job SINK_EXPORT synchronize(const Sink::Query &query); KAsync::Job SINK_EXPORT synchronize(const Sink::SyncScope &query); /** * Abort all running synchronization commands. */ -KAsync::Job SINK_EXPORT abortSynchronization(const QByteArray &resourceIdentifier); +KAsync::Job SINK_EXPORT abortSynchronization(const Sink::SyncScope &scope); /** * Removes all resource data from disk. * * This will not touch the configuration. All commands that that arrived at the resource before this command will be dropped. All commands that arrived later will be executed. */ KAsync::Job SINK_EXPORT removeDataFromDisk(const QByteArray &resourceIdentifier); struct UpgradeResult { bool upgradeExecuted; }; /** * Run upgrade jobs. * * Run this to upgrade your local database to a new version. * Note that this may: * * take a while * * remove some/all of your local caches * * Note: The initial implementation simply calls removeDataFromDisk for all resources. */ KAsync::Job SINK_EXPORT upgrade(); template KAsync::Job SINK_EXPORT fetchOne(const Sink::Query &query); template KAsync::Job> SINK_EXPORT fetchAll(const Sink::Query &query); template KAsync::Job> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); template DomainType SINK_EXPORT readOne(const Sink::Query &query); template QList SINK_EXPORT read(const Sink::Query &query); } }