diff --git a/src/server/handler/fetchhelper.cpp b/src/server/handler/fetchhelper.cpp index 26a3aaea1..b9c652755 100644 --- a/src/server/handler/fetchhelper.cpp +++ b/src/server/handler/fetchhelper.cpp @@ -1,767 +1,748 @@ /*************************************************************************** * Copyright (C) 2006-2009 by Tobias Koenig * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "fetchhelper.h" #include "akonadi.h" #include "connection.h" #include "handler.h" #include "handlerhelper.h" #include "storage/selectquerybuilder.h" #include "storage/itemqueryhelper.h" #include "storage/itemretrievalmanager.h" #include "storage/itemretrievalrequest.h" #include "storage/parthelper.h" #include "storage/parttypehelper.h" #include "storage/transaction.h" #include "utils.h" #include "intervalcheck.h" #include "agentmanagerinterface.h" #include "dbusconnectionpool.h" #include "tagfetchhelper.h" #include "relationfetch.h" #include "akonadiserver_debug.h" #include #include #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; #define ENABLE_FETCH_PROFILING 0 #if ENABLE_FETCH_PROFILING #define BEGIN_TIMER(name) \ QElapsedTimer name##Timer; \ name##Timer.start(); #define END_TIMER(name) \ const double name##Elapsed = name##Timer.nsecsElapsed() / 1000000.0; #define PROF_INC(name) \ ++name; #else #define BEGIN_TIMER(name) #define END_TIMER(name) #define PROF_INC(name) #endif -namespace { - -class ConnectionResponseCollector : public FetchHelper::ResponseCollectorInterface -{ -public: - ConnectionResponseCollector(Connection *connection) - : mConnection(connection) - {} - - ~ConnectionResponseCollector() override {} - - void addResponse(const Protocol::CommandPtr &response) override - { - mConnection->sendResponse(response); - } - -private: - Connection *mConnection = nullptr; -}; - -} - FetchHelper::FetchHelper(Connection *connection, const Scope &scope, const Protocol::ItemFetchScope &fetchScope) - : FetchHelper(new ConnectionResponseCollector(connection), connection, connection->context(), scope, fetchScope) + : FetchHelper(connection, connection->context(), scope, fetchScope) { } -FetchHelper::FetchHelper(ResponseCollectorInterface *collector, Connection *connection, - CommandContext *context, const Scope &scope, const Protocol::ItemFetchScope &fetchScope) - : mCollector(collector) - , mConnection(connection) +FetchHelper::FetchHelper(Connection *connection, CommandContext *context, + const Scope &scope, const Protocol::ItemFetchScope &fetchScope) + : mConnection(connection) , mContext(context) , mScope(scope) , mFetchScope(fetchScope) { std::fill(mItemQueryColumnMap, mItemQueryColumnMap + ItemQueryColumnCount, -1); } enum PartQueryColumns { PartQueryPimIdColumn, PartQueryTypeIdColumn, PartQueryDataColumn, PartQueryStorageColumn, PartQueryVersionColumn, PartQueryDataSizeColumn }; QSqlQuery FetchHelper::buildPartQuery(const QVector &partList, bool allPayload, bool allAttrs) { ///TODO: merge with ItemQuery QueryBuilder partQuery(PimItem::tableName()); if (!partList.isEmpty() || allPayload || allAttrs) { partQuery.addJoin(QueryBuilder::InnerJoin, Part::tableName(), PimItem::idFullColumnName(), Part::pimItemIdFullColumnName()); partQuery.addColumn(PimItem::idFullColumnName()); partQuery.addColumn(Part::partTypeIdFullColumnName()); partQuery.addColumn(Part::dataFullColumnName()); partQuery.addColumn(Part::storageFullColumnName()); partQuery.addColumn(Part::versionFullColumnName()); partQuery.addColumn(Part::datasizeFullColumnName()); partQuery.addSortColumn(PimItem::idFullColumnName(), Query::Descending); if (!partList.isEmpty() || allPayload || allAttrs) { Query::Condition cond(Query::Or); for (const QByteArray &b : qAsConst(partList)) { if (b.startsWith("PLD") || b.startsWith("ATR")) { cond.addValueCondition(Part::partTypeIdFullColumnName(), Query::Equals, PartTypeHelper::fromFqName(b).id()); } } if (allPayload || allAttrs) { partQuery.addJoin(QueryBuilder::InnerJoin, PartType::tableName(), Part::partTypeIdFullColumnName(), PartType::idFullColumnName()); if (allPayload) { cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral("PLD")); } if (allAttrs) { cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral("ATR")); } } partQuery.addCondition(cond); } ItemQueryHelper::scopeToQuery(mScope, mContext, partQuery); if (!partQuery.exec()) { throw HandlerException("Unable to list item parts"); } partQuery.query().next(); } return partQuery.query(); } QSqlQuery FetchHelper::buildItemQuery() { QueryBuilder itemQuery(PimItem::tableName()); int column = 0; #define ADD_COLUMN(colName, colId) { itemQuery.addColumn( colName ); mItemQueryColumnMap[colId] = column++; } ADD_COLUMN(PimItem::idFullColumnName(), ItemQueryPimItemIdColumn); if (mFetchScope.fetchRemoteId()) { ADD_COLUMN(PimItem::remoteIdFullColumnName(), ItemQueryPimItemRidColumn) } ADD_COLUMN(PimItem::mimeTypeIdFullColumnName(), ItemQueryMimeTypeIdColumn) ADD_COLUMN(PimItem::revFullColumnName(), ItemQueryRevColumn) if (mFetchScope.fetchRemoteRevision()) { ADD_COLUMN(PimItem::remoteRevisionFullColumnName(), ItemQueryRemoteRevisionColumn) } if (mFetchScope.fetchSize()) { ADD_COLUMN(PimItem::sizeFullColumnName(), ItemQuerySizeColumn) } if (mFetchScope.fetchMTime()) { ADD_COLUMN(PimItem::datetimeFullColumnName(), ItemQueryDatetimeColumn) } ADD_COLUMN(PimItem::collectionIdFullColumnName(), ItemQueryCollectionIdColumn) if (mFetchScope.fetchGID()) { ADD_COLUMN(PimItem::gidFullColumnName(), ItemQueryPimItemGidColumn) } #undef ADD_COLUMN itemQuery.addSortColumn(PimItem::idFullColumnName(), Query::Descending); ItemQueryHelper::scopeToQuery(mScope, mContext, itemQuery); if (mFetchScope.changedSince().isValid()) { itemQuery.addValueCondition(PimItem::datetimeFullColumnName(), Query::GreaterOrEqual, mFetchScope.changedSince().toUTC()); } if (!itemQuery.exec()) { throw HandlerException("Unable to list items"); } itemQuery.query().next(); return itemQuery.query(); } enum FlagQueryColumns { FlagQueryPimItemIdColumn, FlagQueryFlagIdColumn }; QSqlQuery FetchHelper::buildFlagQuery() { QueryBuilder flagQuery(PimItem::tableName()); flagQuery.addJoin(QueryBuilder::InnerJoin, PimItemFlagRelation::tableName(), PimItem::idFullColumnName(), PimItemFlagRelation::leftFullColumnName()); flagQuery.addColumn(PimItem::idFullColumnName()); flagQuery.addColumn(PimItemFlagRelation::rightFullColumnName()); ItemQueryHelper::scopeToQuery(mScope, mContext, flagQuery); flagQuery.addSortColumn(PimItem::idFullColumnName(), Query::Descending); if (!flagQuery.exec()) { throw HandlerException("Unable to retrieve item flags"); } flagQuery.query().next(); return flagQuery.query(); } enum TagQueryColumns { TagQueryItemIdColumn, TagQueryTagIdColumn, }; QSqlQuery FetchHelper::buildTagQuery() { QueryBuilder tagQuery(PimItem::tableName()); tagQuery.addJoin(QueryBuilder::InnerJoin, PimItemTagRelation::tableName(), PimItem::idFullColumnName(), PimItemTagRelation::leftFullColumnName()); tagQuery.addJoin(QueryBuilder::InnerJoin, Tag::tableName(), Tag::idFullColumnName(), PimItemTagRelation::rightFullColumnName()); tagQuery.addColumn(PimItem::idFullColumnName()); tagQuery.addColumn(Tag::idFullColumnName()); ItemQueryHelper::scopeToQuery(mScope, mContext, tagQuery); tagQuery.addSortColumn(PimItem::idFullColumnName(), Query::Descending); if (!tagQuery.exec()) { throw HandlerException("Unable to retrieve item tags"); } tagQuery.query().next(); return tagQuery.query(); } enum VRefQueryColumns { VRefQueryCollectionIdColumn, VRefQueryItemIdColumn }; QSqlQuery FetchHelper::buildVRefQuery() { QueryBuilder vRefQuery(PimItem::tableName()); vRefQuery.addJoin(QueryBuilder::LeftJoin, CollectionPimItemRelation::tableName(), CollectionPimItemRelation::rightFullColumnName(), PimItem::idFullColumnName()); vRefQuery.addColumn(CollectionPimItemRelation::leftFullColumnName()); vRefQuery.addColumn(CollectionPimItemRelation::rightFullColumnName()); ItemQueryHelper::scopeToQuery(mScope, mContext, vRefQuery); vRefQuery.addSortColumn(PimItem::idFullColumnName(), Query::Descending); if (!vRefQuery.exec()) { throw HandlerException("Unable to retrieve virtual references"); } vRefQuery.query().next(); return vRefQuery.query(); } bool FetchHelper::isScopeLocal(const Scope &scope) { // The only agent allowed to override local scope is the Baloo Indexer if (!mConnection->sessionId().startsWith("akonadi_indexing_agent")) { return false; } // Get list of all resources that own all items in the scope QueryBuilder qb(PimItem::tableName(), QueryBuilder::Select); qb.setDistinct(true); qb.addColumn(Resource::nameFullColumnName()); qb.addJoin(QueryBuilder::LeftJoin, Collection::tableName(), PimItem::collectionIdFullColumnName(), Collection::idFullColumnName()); qb.addJoin(QueryBuilder::LeftJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName()); ItemQueryHelper::scopeToQuery(scope, mContext, qb); if (mContext->resource().isValid()) { qb.addValueCondition(Resource::nameFullColumnName(), Query::NotEquals, mContext->resource().name()); } if (!qb.exec()) { throw HandlerException("Failed to query database"); return false; } // If there is more than one resource, i.e. this is a fetch from multiple // collections, then don't bother and just return FALSE. This case is aimed // specifically on Baloo, which fetches items from each collection independently, // so it will pass this check. QSqlQuery query = qb.query(); if (query.size() != 1) { return false; } query.next(); const QString resourceName = query.value(0).toString(); org::freedesktop::Akonadi::AgentManager manager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"), DBusConnectionPool::threadConnection()); const QString typeIdentifier = manager.agentInstanceType(resourceName); const QVariantMap properties = manager.agentCustomProperties(typeIdentifier); return properties.value(QStringLiteral("HasLocalStorage"), false).toBool(); } DataStore *FetchHelper::storageBackend() const { if (mConnection) { if (auto store = mConnection->storageBackend()) { return store; } } return DataStore::self(); } -bool FetchHelper::fetchItems() +bool FetchHelper::fetchItems(std::function &&itemCallback) { BEGIN_TIMER(fetch) // retrieve missing parts // HACK: isScopeLocal() is a workaround for resources that have cache expiration // because when the cache expires, Baloo is not able to content of the items. So // we allow fetch of items that belong to local resources (like maildir) to ignore // cacheOnly and retrieve missing parts from the resource. However ItemRetriever // is painfully slow with many items and is generally designed to fetch a few // messages, not all of them. In the long term, we need a better way to do this. BEGIN_TIMER(itemRetriever) BEGIN_TIMER(scopeLocal) #if ENABLE_FETCH_PROFILING double scopeLocalElapsed = 0; #endif if (!mFetchScope.cacheOnly() || isScopeLocal(mScope)) { #if ENABLE_FETCH_PROFILING scopeLocalElapsed = scopeLocalTimer.elapsed(); #endif // trigger a collection sync if configured to do so triggerOnDemandFetch(); // Prepare for a call to ItemRetriever::exec(); // From a resource perspective the only parts that can be fetched are payloads. ItemRetriever retriever(mConnection); retriever.setScope(mScope); retriever.setRetrieveParts(mFetchScope.requestedPayloads()); retriever.setRetrieveFullPayload(mFetchScope.fullPayload()); retriever.setChangedSince(mFetchScope.changedSince()); if (!retriever.exec() && !mFetchScope.ignoreErrors()) { // There we go, retrieve the missing parts from the resource. if (mContext->resource().isValid()) { throw HandlerException(QStringLiteral("Unable to fetch item from backend (collection %1, resource %2) : %3") .arg(mContext->collectionId()) .arg(mContext->resource().id()) .arg(QString::fromLatin1(retriever.lastError()))); } else { throw HandlerException(QStringLiteral("Unable to fetch item from backend (collection %1) : %2") .arg(mContext->collectionId()) .arg(QString::fromLatin1(retriever.lastError()))); } } } END_TIMER(itemRetriever) BEGIN_TIMER(items) QSqlQuery itemQuery = buildItemQuery(); END_TIMER(items) // error if query did not find any item and scope is not listing items but // a request for a specific item if (!itemQuery.isValid()) { if (mFetchScope.ignoreErrors()) { return true; } switch (mScope.scope()) { case Scope::Uid: // fall through case Scope::Rid: // fall through case Scope::HierarchicalRid: // fall through case Scope::Gid: throw HandlerException("Item query returned empty result set"); break; default: break; } } // build part query if needed BEGIN_TIMER(parts) QSqlQuery partQuery(DataStore::self()->database()); if (!mFetchScope.requestedParts().isEmpty() || mFetchScope.fullPayload() || mFetchScope.allAttributes()) { partQuery = buildPartQuery(mFetchScope.requestedParts(), mFetchScope.fullPayload(), mFetchScope.allAttributes()); } END_TIMER(parts) // build flag query if needed BEGIN_TIMER(flags) QSqlQuery flagQuery(DataStore::self()->database()); if (mFetchScope.fetchFlags()) { flagQuery = buildFlagQuery(); } END_TIMER(flags) // build tag query if needed BEGIN_TIMER(tags) QSqlQuery tagQuery(DataStore::self()->database()); if (mFetchScope.fetchTags()) { tagQuery = buildTagQuery(); } END_TIMER(tags) BEGIN_TIMER(vRefs) QSqlQuery vRefQuery(DataStore::self()->database()); if (mFetchScope.fetchVirtualReferences()) { vRefQuery = buildVRefQuery(); } END_TIMER(vRefs) #if ENABLE_FETCH_PROFILING int itemsCount = 0; int flagsCount = 0; int partsCount = 0; int tagsCount = 0; int vRefsCount = 0; #endif BEGIN_TIMER(processing) QHash flagIdNameCache; QHash mimeTypeIdNameCache; QHash partTypeIdNameCache; while (itemQuery.isValid()) { PROF_INC(itemsCount) const qint64 pimItemId = extractQueryResult(itemQuery, ItemQueryPimItemIdColumn).toLongLong(); const int pimItemRev = extractQueryResult(itemQuery, ItemQueryRevColumn).toInt(); auto response = Protocol::FetchItemsResponsePtr::create(); response->setId(pimItemId); response->setRevision(pimItemRev); const qint64 mimeTypeId = extractQueryResult(itemQuery, ItemQueryMimeTypeIdColumn).toLongLong(); auto mtIter = mimeTypeIdNameCache.find(mimeTypeId); if (mtIter == mimeTypeIdNameCache.end()) { mtIter = mimeTypeIdNameCache.insert(mimeTypeId, MimeType::retrieveById(mimeTypeId).name()); } response->setMimeType(mtIter.value()); if (mFetchScope.fetchRemoteId()) { response->setRemoteId(extractQueryResult(itemQuery, ItemQueryPimItemRidColumn).toString()); } response->setParentId(extractQueryResult(itemQuery, ItemQueryCollectionIdColumn).toLongLong()); if (mFetchScope.fetchSize()) { response->setSize(extractQueryResult(itemQuery, ItemQuerySizeColumn).toLongLong()); } if (mFetchScope.fetchMTime()) { response->setMTime(Utils::variantToDateTime(extractQueryResult(itemQuery, ItemQueryDatetimeColumn))); } if (mFetchScope.fetchRemoteRevision()) { response->setRemoteRevision(extractQueryResult(itemQuery, ItemQueryRemoteRevisionColumn).toString()); } if (mFetchScope.fetchGID()) { response->setGid(extractQueryResult(itemQuery, ItemQueryPimItemGidColumn).toString()); } if (mFetchScope.fetchFlags()) { QVector flags; while (flagQuery.isValid()) { const qint64 id = flagQuery.value(FlagQueryPimItemIdColumn).toLongLong(); if (id > pimItemId) { flagQuery.next(); continue; } else if (id < pimItemId) { break; } const qint64 flagId = flagQuery.value(FlagQueryFlagIdColumn).toLongLong(); auto flagNameIter = flagIdNameCache.find(flagId); if (flagNameIter == flagIdNameCache.end()) { flagNameIter = flagIdNameCache.insert(flagId, Flag::retrieveById(flagId).name().toUtf8()); } flags << flagNameIter.value(); flagQuery.next(); } response->setFlags(flags); } if (mFetchScope.fetchTags()) { QVector tagIds; QVector tags; //We don't take the fetch scope into account yet. It's either id only or the full tag. const bool fullTagsRequested = !mFetchScope.tagFetchScope().isEmpty(); while (tagQuery.isValid()) { PROF_INC(tagsCount) const qint64 id = tagQuery.value(TagQueryItemIdColumn).toLongLong(); if (id > pimItemId) { tagQuery.next(); continue; } else if (id < pimItemId) { break; } tagIds << tagQuery.value(TagQueryTagIdColumn).toLongLong(); tagQuery.next(); } tags.reserve(tagIds.count()); if (!fullTagsRequested) { for (qint64 tagId : qAsConst(tagIds)) { Protocol::FetchTagsResponse resp; resp.setId(tagId); tags << resp; } } else { for (qint64 tagId : qAsConst(tagIds)) { tags << *HandlerHelper::fetchTagsResponse(Tag::retrieveById(tagId)); } } response->setTags(tags); } if (mFetchScope.fetchVirtualReferences()) { QVector vRefs; while (vRefQuery.isValid()) { PROF_INC(vRefsCount) const qint64 id = vRefQuery.value(VRefQueryItemIdColumn).toLongLong(); if (id > pimItemId) { vRefQuery.next(); continue; } else if (id < pimItemId) { break; } vRefs << vRefQuery.value(VRefQueryCollectionIdColumn).toLongLong(); vRefQuery.next(); } response->setVirtualReferences(vRefs); } if (mFetchScope.fetchRelations()) { SelectQueryBuilder qb; Query::Condition condition; condition.setSubQueryMode(Query::Or); condition.addValueCondition(Relation::leftIdFullColumnName(), Query::Equals, pimItemId); condition.addValueCondition(Relation::rightIdFullColumnName(), Query::Equals, pimItemId); qb.addCondition(condition); qb.addGroupColumns(QStringList() << Relation::leftIdColumn() << Relation::rightIdColumn() << Relation::typeIdColumn() << Relation::remoteIdColumn()); if (!qb.exec()) { throw HandlerException("Unable to list item relations"); } QVector relations; const auto result = qb.result(); relations.reserve(result.size()); for (const Relation &rel : result) { relations << *HandlerHelper::fetchRelationsResponse(rel); } response->setRelations(relations); } if (mFetchScope.ancestorDepth() != Protocol::ItemFetchScope::NoAncestor) { response->setAncestors(ancestorsForItem(response->parentId())); } bool skipItem = false; QVector cachedParts; QVector parts; while (partQuery.isValid()) { PROF_INC(partsCount) const qint64 id = partQuery.value(PartQueryPimIdColumn).toLongLong(); if (id > pimItemId) { partQuery.next(); continue; } else if (id < pimItemId) { break; } const qint64 partTypeId = partQuery.value(PartQueryTypeIdColumn).toLongLong(); auto ptIter = partTypeIdNameCache.find(partTypeId); if (ptIter == partTypeIdNameCache.end()) { ptIter = partTypeIdNameCache.insert(partTypeId, PartTypeHelper::fullName(PartType::retrieveById(partTypeId)).toUtf8()); } Protocol::PartMetaData metaPart; Protocol::StreamPayloadResponse partData; partData.setPayloadName(ptIter.value()); metaPart.setName(ptIter.value()); metaPart.setVersion(partQuery.value(PartQueryVersionColumn).toInt()); metaPart.setSize(partQuery.value(PartQueryDataSizeColumn).toLongLong()); const QByteArray data = Utils::variantToByteArray(partQuery.value(PartQueryDataColumn)); if (mFetchScope.checkCachedPayloadPartsOnly()) { if (!data.isEmpty()) { cachedParts << ptIter.value(); } partQuery.next(); } else { if (mFetchScope.ignoreErrors() && data.isEmpty()) { //We wanted the payload, couldn't get it, and are ignoring errors. Skip the item. //This is not an error though, it's fine to have empty payload parts (to denote existing but not cached parts) qCDebug(AKONADISERVER_LOG) << "item" << id << "has an empty payload part in parttable for part" << metaPart.name(); skipItem = true; break; } metaPart.setStorageType(static_cast( partQuery.value(PartQueryStorageColumn).toInt())); if (data.isEmpty()) { partData.setData(QByteArray("")); } else { partData.setData(data); } partData.setMetaData(metaPart); if (mFetchScope.requestedParts().contains(ptIter.value()) || mFetchScope.fullPayload() || mFetchScope.allAttributes()) { parts.append(partData); } partQuery.next(); } } response->setParts(parts); if (skipItem) { itemQuery.next(); continue; } if (mFetchScope.checkCachedPayloadPartsOnly()) { response->setCachedParts(cachedParts); } - mCollector->addResponse(response); + if (itemCallback) { + itemCallback(std::move(response)); + } else { + mConnection->sendResponse(std::move(response)); + } itemQuery.next(); } END_TIMER(processing) // update atime (only if the payload was actually requested, otherwise a simple resource sync prevents cache clearing) BEGIN_TIMER(aTime) if (needsAccessTimeUpdate(mFetchScope.requestedParts()) || mFetchScope.fullPayload()) { updateItemAccessTime(); } END_TIMER(aTime) END_TIMER(fetch) #if ENABLE_FETCH_PROFILING qCDebug(AKONADISERVER_LOG) << "FetchHelper execution stats:"; qCDebug(AKONADISERVER_LOG) << "\tItems query:" << itemsElapsed << "ms," << itemsCount << " items in total"; qCDebug(AKONADISERVER_LOG) << "\tFlags query:" << flagsElapsed << "ms, " << flagsCount << " flags in total"; qCDebug(AKONADISERVER_LOG) << "\tParts query:" << partsElapsed << "ms, " << partsCount << " parts in total"; qCDebug(AKONADISERVER_LOG) << "\tTags query: " << tagsElapsed << "ms, " << tagsCount << " tags in total"; qCDebug(AKONADISERVER_LOG) << "\tVRefs query:" << vRefsElapsed << "ms, " << vRefsCount << " vRefs in total"; qCDebug(AKONADISERVER_LOG) << "\t------------"; qCDebug(AKONADISERVER_LOG) << "\tItem retriever:" << itemRetrieverElapsed << "ms (scope local:" << scopeLocalElapsed << "ms)"; qCDebug(AKONADISERVER_LOG) << "\tTotal query:" << (itemsElapsed + flagsElapsed + partsElapsed + tagsElapsed + vRefsElapsed) << "ms"; qCDebug(AKONADISERVER_LOG) << "\tTotal processing: " << processingElapsed << "ms"; qCDebug(AKONADISERVER_LOG) << "\tATime update:" << aTimeElapsed << "ms"; qCDebug(AKONADISERVER_LOG) << "\t============"; qCDebug(AKONADISERVER_LOG) << "\tTotal FETCH:" << fetchElapsed << "ms"; qCDebug(AKONADISERVER_LOG); qCDebug(AKONADISERVER_LOG); #endif return true; } bool FetchHelper::needsAccessTimeUpdate(const QVector &parts) { // TODO technically we should compare the part list with the cache policy of // the parent collection of the retrieved items, but that's kinda expensive // Only updating the atime if the full payload was requested is a good // approximation though. return parts.contains(AKONADI_PARAM_PLD_RFC822); } void FetchHelper::updateItemAccessTime() { Transaction transaction(storageBackend(), QStringLiteral("update atime")); QueryBuilder qb(PimItem::tableName(), QueryBuilder::Update); qb.setColumnValue(PimItem::atimeColumn(), QDateTime::currentDateTimeUtc()); ItemQueryHelper::scopeToQuery(mScope, mContext, qb); if (!qb.exec()) { qCWarning(AKONADISERVER_LOG) << "Unable to update item access time"; } else { transaction.commit(); } } void FetchHelper::triggerOnDemandFetch() { if (mContext->collectionId() <= 0 || mFetchScope.cacheOnly()) { return; } Collection collection = mContext->collection(); // HACK: don't trigger on-demand syncing if the resource is the one triggering it if (mConnection->sessionId() == collection.resource().name().toLatin1()) { return; } storageBackend()->activeCachePolicy(collection); if (!collection.cachePolicySyncOnDemand()) { return; } if (AkonadiServer::instance()->intervalChecker()) { AkonadiServer::instance()->intervalChecker()->requestCollectionSync(collection); } } QVector FetchHelper::ancestorsForItem(Collection::Id parentColId) { if (mFetchScope.ancestorDepth() == Protocol::ItemFetchScope::NoAncestor || parentColId == 0) { return QVector(); } const auto it = mAncestorCache.constFind(parentColId); if (it != mAncestorCache.cend()) { return *it; } QVector ancestors; Collection col = Collection::retrieveById(parentColId); const int depthNum = mFetchScope.ancestorDepth() == Protocol::ItemFetchScope::ParentAncestor ? 1 : INT_MAX; for (int i = 0; i < depthNum; ++i) { if (!col.isValid()) { Protocol::Ancestor ancestor; ancestor.setId(0); ancestors << ancestor; break; } Protocol::Ancestor ancestor; ancestor.setId(col.id()); ancestor.setRemoteId(col.remoteId()); ancestors << ancestor; col = col.parent(); } mAncestorCache.insert(parentColId, ancestors); return ancestors; } QVariant FetchHelper::extractQueryResult(const QSqlQuery &query, FetchHelper::ItemQueryColumns column) const { const int colId = mItemQueryColumnMap[column]; Q_ASSERT(colId >= 0); return query.value(colId); } diff --git a/src/server/handler/fetchhelper.h b/src/server/handler/fetchhelper.h index f3cce0eaa..29d8a302f 100644 --- a/src/server/handler/fetchhelper.h +++ b/src/server/handler/fetchhelper.h @@ -1,107 +1,98 @@ /*************************************************************************** * Copyright (C) 2006-2009 by Tobias Koenig * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #ifndef AKONADI_FETCHHELPER_H #define AKONADI_FETCHHELPER_H #include "storage/countquerybuilder.h" #include "storage/datastore.h" #include "storage/itemretriever.h" #include "commandcontext.h" #include #include #include +#include + class FetchHelperTest; namespace Akonadi { namespace Server { class AggregatedItemFetchScope; class Connection; -class FetchHelper : public QObject +class FetchHelper { - Q_OBJECT public: - class ResponseCollectorInterface - { - public: - virtual ~ResponseCollectorInterface() {}; - virtual void addResponse(const Protocol::CommandPtr &response) = 0; - }; - FetchHelper(Connection *connection, const Scope &scope, const Protocol::ItemFetchScope &fetchScope); - FetchHelper(ResponseCollectorInterface *collector, Connection *connection, - CommandContext *context, const Scope &scope, const Protocol::ItemFetchScope &fetchScope); - ~FetchHelper() override = default; + FetchHelper(Connection *connection, CommandContext *context, const Scope &scope, const Protocol::ItemFetchScope &fetchScope); - bool fetchItems(); + bool fetchItems(std::function &&callback = {}); private: enum ItemQueryColumns { ItemQueryPimItemIdColumn, ItemQueryPimItemRidColumn, ItemQueryMimeTypeIdColumn, ItemQueryRevColumn, ItemQueryRemoteRevisionColumn, ItemQuerySizeColumn, ItemQueryDatetimeColumn, ItemQueryCollectionIdColumn, ItemQueryPimItemGidColumn, ItemQueryColumnCount }; void updateItemAccessTime(); void triggerOnDemandFetch(); QSqlQuery buildItemQuery(); QSqlQuery buildPartQuery(const QVector &partList, bool allPayload, bool allAttrs); QSqlQuery buildFlagQuery(); QSqlQuery buildTagQuery(); QSqlQuery buildVRefQuery(); QVector ancestorsForItem(Collection::Id parentColId); static bool needsAccessTimeUpdate(const QVector &parts); QVariant extractQueryResult(const QSqlQuery &query, ItemQueryColumns column) const; bool isScopeLocal(const Scope &scope); DataStore *storageBackend() const; static QByteArray tagsToByteArray(const Tag::List &tags); static QByteArray relationsToByteArray(const Relation::List &relations); private: - ResponseCollectorInterface *mCollector = nullptr; Connection *mConnection = nullptr; CommandContext *mContext = nullptr; QHash> mAncestorCache; Scope mScope; Protocol::ItemFetchScope mFetchScope; int mItemQueryColumnMap[ItemQueryColumnCount]; friend class ::FetchHelperTest; }; } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/storage/notificationcollector.cpp b/src/server/storage/notificationcollector.cpp index 5bc774f12..1e9264fad 100644 --- a/src/server/storage/notificationcollector.cpp +++ b/src/server/storage/notificationcollector.cpp @@ -1,603 +1,583 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationcollector.h" #include "storage/datastore.h" #include "storage/entity.h" #include "storage/collectionstatistics.h" #include "handlerhelper.h" #include "cachecleaner.h" #include "intervalcheck.h" #include "search/searchmanager.h" #include "akonadi.h" #include "handler/search.h" #include "notificationmanager.h" #include "aggregatedfetchscope.h" #include "selectquerybuilder.h" #include "handler/fetchhelper.h" #include "akonadiserver_debug.h" using namespace Akonadi; using namespace Akonadi::Server; - -namespace { - -class ItemCollector : public FetchHelper::ResponseCollectorInterface -{ -public: - void addResponse(const Protocol::CommandPtr &response) override - { - mItems.push_back(response.staticCast()); - } - - QVector items() const - { - return mItems; - } - -private: - QVector mItems; -}; - -} - - NotificationCollector::NotificationCollector(QObject *parent) : QObject(parent) , mDb(nullptr) { } NotificationCollector::NotificationCollector(DataStore *db) : QObject(db) , mDb(db) { connect(db, &DataStore::transactionCommitted, this, &NotificationCollector::transactionCommitted); connect(db, &DataStore::transactionRolledBack, this, &NotificationCollector::transactionRolledBack); } NotificationCollector::~NotificationCollector() { } void NotificationCollector::itemAdded(const PimItem &item, bool seen, const Collection &collection, const QByteArray &resource) { SearchManager::instance()->scheduleSearchUpdate(); CollectionStatistics::self()->itemAdded(collection, item.size(), seen); itemNotification(Protocol::ItemChangeNotification::Add, item, collection, Collection(), resource); } void NotificationCollector::itemChanged(const PimItem &item, const QSet &changedParts, const Collection &collection, const QByteArray &resource) { SearchManager::instance()->scheduleSearchUpdate(); itemNotification(Protocol::ItemChangeNotification::Modify, item, collection, Collection(), resource, changedParts); } void NotificationCollector::itemsFlagsChanged(const PimItem::List &items, const QSet &addedFlags, const QSet &removedFlags, const Collection &collection, const QByteArray &resource) { int seenCount = (addedFlags.contains(AKONADI_FLAG_SEEN) || addedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); seenCount -= (removedFlags.contains(AKONADI_FLAG_SEEN) || removedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); CollectionStatistics::self()->itemsSeenChanged(collection, seenCount); itemNotification(Protocol::ItemChangeNotification::ModifyFlags, items, collection, Collection(), resource, QSet(), addedFlags, removedFlags); } void NotificationCollector::itemsTagsChanged(const PimItem::List &items, const QSet &addedTags, const QSet &removedTags, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::ModifyTags, items, collection, Collection(), resource, QSet(), QSet(), QSet(), addedTags, removedTags); } void NotificationCollector::itemsRelationsChanged(const PimItem::List &items, const Relation::List &addedRelations, const Relation::List &removedRelations, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::ModifyRelations, items, collection, Collection(), resource, QSet(), QSet(), QSet(), QSet(), QSet(), addedRelations, removedRelations); } void NotificationCollector::itemsMoved(const PimItem::List &items, const Collection &collectionSrc, const Collection &collectionDest, const QByteArray &sourceResource) { SearchManager::instance()->scheduleSearchUpdate(); itemNotification(Protocol::ItemChangeNotification::Move, items, collectionSrc, collectionDest, sourceResource); } void NotificationCollector::itemsRemoved(const PimItem::List &items, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::Remove, items, collection, Collection(), resource); } void NotificationCollector::itemsLinked(const PimItem::List &items, const Collection &collection) { itemNotification(Protocol::ItemChangeNotification::Link, items, collection, Collection(), QByteArray()); } void NotificationCollector::itemsUnlinked(const PimItem::List &items, const Collection &collection) { itemNotification(Protocol::ItemChangeNotification::Unlink, items, collection, Collection(), QByteArray()); } void NotificationCollector::collectionAdded(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionAdded(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionAdded(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Add, collection, collection.parentId(), -1, resource); } void NotificationCollector::collectionChanged(const Collection &collection, const QList &changes, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionChanged(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionChanged(collection.id()); } if (changes.contains(AKONADI_PARAM_ENABLED) || changes.contains(AKONADI_PARAM_REFERENCED)) { CollectionStatistics::self()->invalidateCollection(collection); } collectionNotification(Protocol::CollectionChangeNotification::Modify, collection, collection.parentId(), -1, resource, changes.toSet()); } void NotificationCollector::collectionMoved(const Collection &collection, const Collection &source, const QByteArray &resource, const QByteArray &destResource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionChanged(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionChanged(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Move, collection, source.id(), collection.parentId(), resource, QSet(), destResource); } void NotificationCollector::collectionRemoved(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionRemoved(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionRemoved(collection.id()); } CollectionStatistics::self()->invalidateCollection(collection); collectionNotification(Protocol::CollectionChangeNotification::Remove, collection, collection.parentId(), -1, resource); } void NotificationCollector::collectionSubscribed(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionAdded(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionAdded(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Subscribe, collection, collection.parentId(), -1, resource, QSet()); } void NotificationCollector::collectionUnsubscribed(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionRemoved(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionRemoved(collection.id()); } CollectionStatistics::self()->invalidateCollection(collection); collectionNotification(Protocol::CollectionChangeNotification::Unsubscribe, collection, collection.parentId(), -1, resource, QSet()); } void NotificationCollector::tagAdded(const Tag &tag) { tagNotification(Protocol::TagChangeNotification::Add, tag); } void NotificationCollector::tagChanged(const Tag &tag) { tagNotification(Protocol::TagChangeNotification::Modify, tag); } void NotificationCollector::tagRemoved(const Tag &tag, const QByteArray &resource, const QString &remoteId) { tagNotification(Protocol::TagChangeNotification::Remove, tag, resource, remoteId); } void NotificationCollector::relationAdded(const Relation &relation) { relationNotification(Protocol::RelationChangeNotification::Add, relation); } void NotificationCollector::relationRemoved(const Relation &relation) { relationNotification(Protocol::RelationChangeNotification::Remove, relation); } void NotificationCollector::transactionCommitted() { dispatchNotifications(); } void NotificationCollector::transactionRolledBack() { clear(); } void NotificationCollector::clear() { mNotifications.clear(); } void NotificationCollector::setSessionId(const QByteArray &sessionId) { mSessionId = sessionId; } void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, const PimItem &item, const Collection &collection, const Collection &collectionDest, const QByteArray &resource, const QSet &parts) { PimItem::List items; items << item; itemNotification(op, items, collection, collectionDest, resource, parts); } void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, const PimItem::List &items, const Collection &collection, const Collection &collectionDest, const QByteArray &resource, const QSet &parts, const QSet &addedFlags, const QSet &removedFlags, const QSet &addedTags, const QSet &removedTags, const Relation::List &addedRelations, const Relation::List &removedRelations) { QMap > vCollections; if ((op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::ModifyRelations)) { vCollections = DataStore::self()->virtualCollections(items); } auto msg = Protocol::ItemChangeNotificationPtr::create(); msg->setSessionId(mSessionId); msg->setOperation(op); msg->setItemParts(parts); msg->setAddedFlags(addedFlags); msg->setRemovedFlags(removedFlags); msg->setAddedTags(addedTags); msg->setRemovedTags(removedTags); if (!addedRelations.isEmpty()) { QSet rels; Q_FOREACH (const Relation &rel, addedRelations) { rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); } msg->setAddedRelations(rels); } if (!removedRelations.isEmpty()) { QSet rels; Q_FOREACH (const Relation &rel, removedRelations) { rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); } msg->setRemovedRelations(rels); } if (collectionDest.isValid()) { QByteArray destResourceName; destResourceName = collectionDest.resource().name().toLatin1(); msg->setDestinationResource(destResourceName); } msg->setParentDestCollection(collectionDest.id()); QVector ntfItems; Q_FOREACH (const PimItem &item, items) { auto i = Protocol::FetchItemsResponsePtr::create(); i->setId(item.id()); i->setRemoteId(item.remoteId()); i->setRemoteRevision(item.remoteRevision()); i->setMimeType(item.mimeType().name()); ntfItems.push_back(i); } /* Notify all virtual collections the items are linked to. */ QHash virtItems; for (const auto &ntfItem : ntfItems) { virtItems.insert(ntfItem->id(), ntfItem); } auto iter = vCollections.constBegin(), endIter = vCollections.constEnd(); for (; iter != endIter; ++iter) { auto copy = Protocol::ItemChangeNotificationPtr::create(*msg); QVector items; items.reserve(iter->size()); for (const auto &item : qAsConst(*iter)) { items.append(virtItems.value(item.id())); } copy->setItems(items); copy->setParentCollection(iter.key()); copy->setResource(resource); CollectionStatistics::self()->invalidateCollection(Collection::retrieveById(iter.key())); dispatchNotification(copy); } msg->setItems(ntfItems); Collection col; if (!collection.isValid()) { msg->setParentCollection(items.first().collection().id()); col = items.first().collection(); } else { msg->setParentCollection(collection.id()); col = collection; } QByteArray res = resource; if (res.isEmpty()) { if (col.resourceId() <= 0) { col = Collection::retrieveById(col.id()); } res = col.resource().name().toLatin1(); } msg->setResource(res); // Add and ModifyFlags are handled incrementally // (see itemAdded() and itemsFlagsChanged()) if (msg->operation() != Protocol::ItemChangeNotification::Add && msg->operation() != Protocol::ItemChangeNotification::ModifyFlags) { CollectionStatistics::self()->invalidateCollection(col); } dispatchNotification(msg); } void NotificationCollector::collectionNotification(Protocol::CollectionChangeNotification::Operation op, const Collection &collection, Collection::Id source, Collection::Id destination, const QByteArray &resource, const QSet &changes, const QByteArray &destResource) { auto msg = Protocol::CollectionChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setParentCollection(source); msg->setParentDestCollection(destination); msg->setDestinationResource(destResource); msg->setChangedParts(changes); auto msgCollection = HandlerHelper::fetchCollectionsResponse(collection); if (auto mgr = AkonadiServer::instance()->notificationManager()) { auto fetchScope = mgr->collectionFetchScope(); // Make sure we have all the data if (!fetchScope->fetchIdOnly() && msgCollection->name().isEmpty()) { const auto col = Collection::retrieveById(msgCollection->id()); const auto mts = col.mimeTypes(); QStringList mimeTypes; mimeTypes.reserve(mts.size()); for (const auto &mt : mts) { mimeTypes.push_back(mt.name()); } msgCollection = HandlerHelper::fetchCollectionsResponse(col, {}, false, 0, {}, {}, false, mimeTypes); } // Get up-to-date statistics if (fetchScope->fetchStatistics()) { Collection col; col.setId(msgCollection->id()); const auto stats = CollectionStatistics::self()->statistics(col); msgCollection->setStatistics(Protocol::FetchCollectionStatsResponse(stats.count, stats.count - stats.read, stats.size)); } // Get attributes const auto requestedAttrs = fetchScope->attributes(); auto msgColAttrs = msgCollection->attributes(); // TODO: This assumes that we have either none or all attributes in msgCollection if (msgColAttrs.isEmpty() && !requestedAttrs.isEmpty()) { SelectQueryBuilder qb; qb.addColumn(CollectionAttribute::typeFullColumnName()); qb.addColumn(CollectionAttribute::valueFullColumnName()); qb.addValueCondition(CollectionAttribute::collectionIdFullColumnName(), Query::Equals, msgCollection->id()); Query::Condition cond(Query::Or); for (const auto &attr : requestedAttrs) { cond.addValueCondition(CollectionAttribute::typeFullColumnName(), Query::Equals, attr); } qb.addCondition(cond); if (!qb.exec()) { qCWarning(AKONADISERVER_LOG) << "Failed to obtain collection attributes!"; } const auto attrs = qb.result(); for (const auto &attr : attrs) { msgColAttrs.insert(attr.type(), attr.value()); } msgCollection->setAttributes(msgColAttrs); } } msg->setCollection(msgCollection); if (!collection.enabled()) { msg->addMetadata("DISABLED"); } QByteArray res = resource; if (res.isEmpty()) { res = collection.resource().name().toLatin1(); } msg->setResource(res); dispatchNotification(msg); } void NotificationCollector::tagNotification(Protocol::TagChangeNotification::Operation op, const Tag &tag, const QByteArray &resource, const QString &remoteId) { auto msg = Protocol::TagChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setResource(resource); auto msgTag = HandlerHelper::fetchTagsResponse(tag, false); msgTag->setRemoteId(remoteId.toUtf8()); if (auto mgr = AkonadiServer::instance()->notificationManager()) { auto fetchScope = mgr->tagFetchScope(); if (!fetchScope->fetchIdOnly() && msgTag->gid().isEmpty()) { msgTag = HandlerHelper::fetchTagsResponse(Tag::retrieveById(msgTag->id()), false); } const auto requestedAttrs = fetchScope->attributes(); auto msgTagAttrs = msgTag->attributes(); if (msgTagAttrs.isEmpty() && !requestedAttrs.isEmpty()) { SelectQueryBuilder qb; qb.addColumn(TagAttribute::typeFullColumnName()); qb.addColumn(TagAttribute::valueFullColumnName()); qb.addValueCondition(TagAttribute::tagIdFullColumnName(), Query::Equals, msgTag->id()); Query::Condition cond(Query::Or); for (const auto &attr : requestedAttrs) { cond.addValueCondition(TagAttribute::typeFullColumnName(), Query::Equals, attr); } qb.addCondition(cond); if (!qb.exec()) { qCWarning(AKONADISERVER_LOG) << "Failed to obtain tag attributes!"; } const auto attrs = qb.result(); for (const auto &attr : attrs) { msgTagAttrs.insert(attr.type(), attr.value()); } msgTag->setAttributes(msgTagAttrs); } } msg->setTag(msgTag); dispatchNotification(msg); } void NotificationCollector::relationNotification(Protocol::RelationChangeNotification::Operation op, const Relation &relation) { auto msg = Protocol::RelationChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setRelation(HandlerHelper::fetchRelationsResponse(relation)); dispatchNotification(msg); } void NotificationCollector::completeNotification(const Protocol::ChangeNotificationPtr &changeMsg) { if (changeMsg->type() == Protocol::Command::ItemChangeNotification) { const auto msg = changeMsg.staticCast(); const auto mgr = AkonadiServer::instance()->notificationManager(); if (mgr && msg->operation() != Protocol::ItemChangeNotification::Remove) { if (mDb->inTransaction()) { qCWarning(AKONADISERVER_LOG) << "FetchHelper requested from within a transaction, aborting, since this would deadlock!"; return; } auto fetchScope = mgr->itemFetchScope(); // NOTE: Checking and retrieving missing elements for each Item manually // here would require a complex code (and I'm too lazy), so instead we simply // feed the Items to FetchHelper and retrieve them all with the setup from // the aggregated fetch scope. The worst case is that we re-fetch everything // we already have, but that's stil better than the pre-ntf-payload situation QVector ids; const auto items = msg->items(); ids.reserve(items.size()); for (const auto &item : items) { ids.push_back(item->id()); } // Prevent transactions inside FetchHelper to recursively call our slot disconnect(mDb, &DataStore::transactionCommitted, this, &NotificationCollector::transactionCommitted); disconnect(mDb, &DataStore::transactionRolledBack, this, &NotificationCollector::transactionRolledBack); - ItemCollector collector; CommandContext context; auto scope = fetchScope->toFetchScope(); - FetchHelper helper(&collector, Connection::self(), &context, Scope(ids), scope); - if (helper.fetchItems()) { - msg->setItems(collector.items()); + FetchHelper helper(Connection::self(), &context, Scope(ids), scope); + QVector fetchedItems; + std::function callback = [&fetchedItems](Protocol::CommandPtr cmd) { + fetchedItems.push_back(cmd.staticCast()); + }; + if (helper.fetchItems(std::move(callback))) { + msg->setItems(fetchedItems); } else { qCWarning(AKONADISERVER_LOG) << "Failed to retrieve Items for notification!"; } connect(mDb, &DataStore::transactionCommitted, this, &NotificationCollector::transactionCommitted); connect(mDb, &DataStore::transactionRolledBack, this, &NotificationCollector::transactionRolledBack); } } } void NotificationCollector::dispatchNotification(const Protocol::ChangeNotificationPtr &msg) { if (!mDb || mDb->inTransaction()) { if (msg->type() == Protocol::Command::CollectionChangeNotification) { Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg); } else { mNotifications.append(msg); } } else { completeNotification(msg); Q_EMIT notify({ msg }); } } void NotificationCollector::dispatchNotifications() { if (!mNotifications.isEmpty()) { for (auto &ntf : mNotifications) { completeNotification(ntf); } Q_EMIT notify(mNotifications); clear(); } }