diff --git a/autotests/server/CMakeLists.txt b/autotests/server/CMakeLists.txt index 97e328483..fe10d1814 100644 --- a/autotests/server/CMakeLists.txt +++ b/autotests/server/CMakeLists.txt @@ -1,104 +1,105 @@ ########### next target ############### # QTEST_MAIN is using QApplication when QT_GUI_LIB is defined remove_definitions(-DQT_GUI_LIB) set(EXECUTABLE_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_BINARY_DIR}/src/server ${Akonadi_SOURCE_DIR}/src/server) akonadi_run_xsltproc( XSL ${Akonadi_SOURCE_DIR}/src/server/storage/schema.xsl XML ${CMAKE_CURRENT_SOURCE_DIR}/dbtest_data/unittest_schema.xml BASENAME unittestschema CLASSNAME UnitTestSchema ) akonadi_run_xsltproc( XSL ${CMAKE_CURRENT_SOURCE_DIR}/dbpopulator.xsl XML ${CMAKE_CURRENT_SOURCE_DIR}/dbtest_data/dbdata.xml BASENAME dbpopulator ) set_property(SOURCE ${CMAKE_CURRENT_BINARY_DIR}/dbpopulator.cpp PROPERTY SKIP_AUTOMOC TRUE) set(common_SRCS unittestschema.cpp fakeconnection.cpp fakedatastore.cpp fakeclient.cpp fakeakonadiserver.cpp fakesearchmanager.cpp fakeitemretrievalmanager.cpp dbinitializer.cpp inspectablenotificationcollector.cpp ${CMAKE_CURRENT_BINARY_DIR}/dbpopulator.cpp ) add_library(akonadi_unittest_common STATIC ${common_SRCS}) target_link_libraries(akonadi_unittest_common KF5AkonadiPrivate libakonadiserver Qt5::Core Qt5::DBus Qt5::Test Qt5::Sql Qt5::Network ) macro(add_server_test _source) set(_test ${_source} ../../src/server/akonadiserver_debug.cpp ../../src/server/akonadiserver_search_debug.cpp) get_filename_component(_name ${_source} NAME_WE) qt5_add_resources(_test dbtest_data/dbtest_data.qrc) add_executable(${_name} ${_test}) add_test(NAME AkonadiServer-${_name} COMMAND ${_name}) if (ENABLE_ASAN) set_tests_properties(AkonadiServer-${_name} PROPERTIES ENVIRONMENT ASAN_OPTIONS=symbolize=1 ) endif() set_tests_properties(AkonadiServer-${_name} PROPERTIES ENVIRONMENT "QT_HASH_SEED=0;QT_NO_CPU_FEATURE=sse4.2" ) target_link_libraries(${_name} akonadi_shared akonadi_unittest_common libakonadiserver KF5AkonadiPrivate Qt5::Core Qt5::DBus Qt5::Test Qt5::Sql Qt5::Network ${CMAKE_SHARED_LINKER_FLAGS_ASAN} ) endmacro() add_server_test(dbtypetest.cpp) add_server_test(dbintrospectortest.cpp) add_server_test(querybuildertest.cpp) add_server_test(dbinitializertest.cpp) add_server_test(dbupdatertest.cpp) add_server_test(handlertest.cpp) add_server_test(dbconfigtest.cpp) add_server_test(parthelpertest.cpp) add_server_test(itemretrievertest.cpp) add_server_test(notificationsubscribertest.cpp) add_server_test(parttypehelpertest.cpp) add_server_test(collectionstatisticstest.cpp) +add_server_test(aggregatedfetchscopetest.cpp) if (SQLITE_FOUND) # tests using the fake server need the QSQLITE3 plugin add_server_test(partstreamertest.cpp) add_server_test(akappendhandlertest.cpp) add_server_test(linkhandlertest.cpp) add_server_test(listhandlertest.cpp) add_server_test(modifyhandlertest.cpp) add_server_test(movehandlertest.cpp) add_server_test(createhandlertest.cpp) add_server_test(collectionreferencetest.cpp) add_server_test(searchtest.cpp akonadiprivate) add_server_test(relationhandlertest.cpp akonadiprivate) add_server_test(taghandlertest.cpp akonadiprivate) add_server_test(fetchhandlertest.cpp akonadiprivate) endif() diff --git a/autotests/server/aggregatedfetchscopetest.cpp b/autotests/server/aggregatedfetchscopetest.cpp new file mode 100644 index 000000000..eaeaa4aa4 --- /dev/null +++ b/autotests/server/aggregatedfetchscopetest.cpp @@ -0,0 +1,163 @@ +/* + Copyright (c) 2019 David Faure + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ + +#include + +#include +#include "aggregatedfetchscope.h" + +#include + +using namespace Akonadi; +using namespace Akonadi::Server; + +class AggregatedFetchScopeTest : public QObject +{ + Q_OBJECT + +private Q_SLOTS: + + void testTagApply() + { + AggregatedTagFetchScope scope; + + // first subscriber, A + scope.addSubscriber(); + Protocol::TagFetchScope oldTagScope, tagScopeA; + QSet attrs = {"FOO"}; + tagScopeA.setAttributes(attrs); + tagScopeA.setFetchIdOnly(true); + scope.apply(oldTagScope, tagScopeA); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(scope.fetchIdOnly()); + + // second subscriber, B + Protocol::TagFetchScope tagScopeB = tagScopeA; + tagScopeB.setFetchIdOnly(false); + scope.addSubscriber(); + scope.apply(oldTagScope, tagScopeB); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(!scope.fetchIdOnly()); + + // then B goes away + scope.apply(tagScopeB, oldTagScope); + scope.removeSubscriber(); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(scope.fetchIdOnly()); + + // A goes away + scope.apply(tagScopeA, oldTagScope); + scope.removeSubscriber(); + QCOMPARE(scope.attributes(), QSet()); + } + + void testCollectionApply() + { + AggregatedCollectionFetchScope scope; + + // first subscriber, A + scope.addSubscriber(); + Protocol::CollectionFetchScope oldCollectionScope, collectionScopeA; + QSet attrs = {"FOO"}; + collectionScopeA.setAttributes(attrs); + collectionScopeA.setFetchIdOnly(true); + scope.apply(oldCollectionScope, collectionScopeA); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(scope.fetchIdOnly()); + + // second subscriber, B + Protocol::CollectionFetchScope collectionScopeB = collectionScopeA; + collectionScopeB.setFetchIdOnly(false); + scope.addSubscriber(); + scope.apply(oldCollectionScope, collectionScopeB); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(!scope.fetchIdOnly()); + + // then B goes away + scope.apply(collectionScopeB, oldCollectionScope); + scope.removeSubscriber(); + QCOMPARE(scope.attributes(), attrs); + QVERIFY(scope.fetchIdOnly()); + + // A goes away + scope.apply(collectionScopeA, oldCollectionScope); + scope.removeSubscriber(); + QCOMPARE(scope.attributes(), QSet()); + } + + void testItemApply() + { + AggregatedItemFetchScope scope; + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::NoAncestor); + + // first subscriber, A + scope.addSubscriber(); + Protocol::ItemFetchScope oldItemScope, itemScopeA; + QVector parts = {"FOO"}; + QSet partsSet = {"FOO"}; + itemScopeA.setRequestedParts(parts); + itemScopeA.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor); + itemScopeA.setFetch(Protocol::ItemFetchScope::CacheOnly); + itemScopeA.setFetch(Protocol::ItemFetchScope::IgnoreErrors); + scope.apply(oldItemScope, itemScopeA); + QCOMPARE(scope.requestedParts(), partsSet); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::ParentAncestor); + QVERIFY(scope.cacheOnly()); + QVERIFY(scope.ignoreErrors()); + + // second subscriber, B + Protocol::ItemFetchScope itemScopeB = itemScopeA; + itemScopeB.setAncestorDepth(Protocol::ItemFetchScope::AllAncestors); + scope.addSubscriber(); + QVERIFY(!scope.cacheOnly()); // they don't agree so: false + QVERIFY(!scope.ignoreErrors()); + scope.apply(oldItemScope, itemScopeB); + QCOMPARE(scope.requestedParts(), partsSet); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::AllAncestors); + + // subscriber C with ParentAncestor - but that won't make change it + Protocol::ItemFetchScope itemScopeC = itemScopeA; + scope.addSubscriber(); + scope.apply(oldItemScope, itemScopeC); + QCOMPARE(scope.requestedParts(), partsSet); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::AllAncestors); // no change + + // then C goes away + scope.apply(itemScopeC, oldItemScope); + scope.removeSubscriber(); + QCOMPARE(scope.requestedParts(), partsSet); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::AllAncestors); + + // then B goes away + scope.apply(itemScopeB, oldItemScope); + scope.removeSubscriber(); + QCOMPARE(scope.requestedParts(), partsSet); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::ParentAncestor); + + // A goes away + scope.apply(itemScopeA, oldItemScope); + scope.removeSubscriber(); + QCOMPARE(scope.requestedParts(), QSet()); + QCOMPARE(scope.ancestorDepth(), Protocol::ItemFetchScope::NoAncestor); + } +}; + +QTEST_MAIN(AggregatedFetchScopeTest) + +#include "aggregatedfetchscopetest.moc" diff --git a/src/server/aggregatedfetchscope.cpp b/src/server/aggregatedfetchscope.cpp index f75456e00..e449c8ee5 100644 --- a/src/server/aggregatedfetchscope.cpp +++ b/src/server/aggregatedfetchscope.cpp @@ -1,646 +1,657 @@ /* Copyright (c) 2017 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "aggregatedfetchscope.h" #include #include #include #include #define LOCKED_D(name) \ Q_D(name); \ QMutexLocker lock(&d->lock); namespace Akonadi { namespace Server { class AggregatedFetchScopePrivate { public: AggregatedFetchScopePrivate() : lock(QMutex::Recursive) // recursive so that we can call our own getters/setters {} inline void addToSet(const QByteArray &value, QSet &set, QHash &count) { auto it = count.find(value); if (it == count.end()) { it = count.insert(value, 0); set.insert(value); } ++(*it); } inline void removeFromSet(const QByteArray &value, QSet &set, QHash &count) { auto it = count.find(value); if (it == count.end()) { return; } if (--(*it) == 0) { count.erase(it); set.remove(value); } } inline void updateBool(bool newValue, int &store) { store += newValue ? 1 : -1; } inline void applySet(const QSet &oldSet, const QSet &newSet, QSet &set, QHash &count) { const auto added = newSet - oldSet; for (const auto &value : added) { addToSet(value, set, count); } const auto removed = oldSet - newSet; for (const auto &value : removed) { removeFromSet(value, set, count); } } public: mutable QMutex lock; }; class AggregatedCollectionFetchScopePrivate : public AggregatedFetchScopePrivate { public: QSet attrs; QHash attrsCount; int subscribers = 0; int fetchIdOnly = 0; int fetchStats = 0; }; class AggregatedTagFetchScopePrivate : public AggregatedFetchScopePrivate { public: QSet attrs; QHash attrsCount; int subscribers = 0; int fetchIdOnly = 0; int fetchRemoteId = 0; int fetchAllAttributes = 0; }; class AggregatedItemFetchScopePrivate : public AggregatedFetchScopePrivate { public: mutable Protocol::ItemFetchScope mCachedScope; mutable bool mCachedScopeValid = false; // use std::optional for mCachedScope QSet parts; QHash partsCount; QSet tags; QHash tagsCount; + int subscribers = 0; int ancestors[3] = { 0, 0, 0 }; // 3 = size of AncestorDepth enum int cacheOnly = 0; int fullPayload = 0; int allAttributes = 0; int fetchSize = 0; int fetchMTime = 0; int fetchRRev = 0; int ignoreErrors = 0; int fetchFlags = 0; int fetchRID = 0; int fetchGID = 0; int fetchTags = 0; int fetchRelations = 0; int fetchVRefs = 0; }; } // namespace Server } // namespace Akonadi using namespace Akonadi; using namespace Akonadi::Protocol; using namespace Akonadi::Server; AggregatedCollectionFetchScope::AggregatedCollectionFetchScope() : d_ptr(new AggregatedCollectionFetchScopePrivate) { } AggregatedCollectionFetchScope::~AggregatedCollectionFetchScope() { delete d_ptr; } void AggregatedCollectionFetchScope::apply(const Protocol::CollectionFetchScope &oldScope, const Protocol::CollectionFetchScope &newScope) { LOCKED_D(AggregatedCollectionFetchScope) if (newScope.includeStatistics() != oldScope.includeStatistics()) { setFetchStatistics(newScope.includeStatistics()); } if (newScope.fetchIdOnly() != oldScope.fetchIdOnly()) { setFetchIdOnly(newScope.fetchIdOnly()); } if (newScope.attributes() != oldScope.attributes()) { d->applySet(oldScope.attributes(), newScope.attributes(), d->attrs, d->attrsCount); } } QSet AggregatedCollectionFetchScope::attributes() const { LOCKED_D(const AggregatedCollectionFetchScope) return d->attrs; } void AggregatedCollectionFetchScope::addAttribute(const QByteArray &attribute) { LOCKED_D(AggregatedCollectionFetchScope) d->addToSet(attribute, d->attrs, d->attrsCount); } void AggregatedCollectionFetchScope::removeAttribute(const QByteArray &attribute) { LOCKED_D(AggregatedCollectionFetchScope) d->removeFromSet(attribute, d->attrs, d->attrsCount); } bool AggregatedCollectionFetchScope::fetchIdOnly() const { LOCKED_D(const AggregatedCollectionFetchScope) // Aggregation: we can return true only if everyone wants fetchIdOnly, // otherwise there's at least one subscriber who wants everything return d->fetchIdOnly == d->subscribers; } void AggregatedCollectionFetchScope::setFetchIdOnly(bool fetchIdOnly) { LOCKED_D(AggregatedCollectionFetchScope) d->updateBool(fetchIdOnly, d->fetchIdOnly); } bool AggregatedCollectionFetchScope::fetchStatistics() const { LOCKED_D(const AggregatedCollectionFetchScope); // Aggregation: return true if at least one subscriber wants stats return d->fetchStats > 0; } void AggregatedCollectionFetchScope::setFetchStatistics(bool fetchStats) { LOCKED_D(AggregatedCollectionFetchScope); d->updateBool(fetchStats, d->fetchStats); } void AggregatedCollectionFetchScope::addSubscriber() { LOCKED_D(AggregatedCollectionFetchScope) ++d->subscribers; } void AggregatedCollectionFetchScope::removeSubscriber() { LOCKED_D(AggregatedCollectionFetchScope) --d->subscribers; } AggregatedItemFetchScope::AggregatedItemFetchScope() : d_ptr(new AggregatedItemFetchScopePrivate) { } AggregatedItemFetchScope::~AggregatedItemFetchScope() { delete d_ptr; } void AggregatedItemFetchScope::apply(const Protocol::ItemFetchScope &oldScope, const Protocol::ItemFetchScope &newScope) { LOCKED_D(AggregatedItemFetchScope); const auto newParts = vectorToSet(newScope.requestedParts()); const auto oldParts = vectorToSet(oldScope.requestedParts()); if (newParts != oldParts) { d->applySet(oldParts, newParts, d->parts, d->partsCount); } if (newScope.ancestorDepth() != oldScope.ancestorDepth()) { updateAncestorDepth(oldScope.ancestorDepth(), newScope.ancestorDepth()); } if (newScope.cacheOnly() != oldScope.cacheOnly()) { setCacheOnly(newScope.cacheOnly()); } if (newScope.fullPayload() != oldScope.fullPayload()) { setFullPayload(newScope.fullPayload()); } if (newScope.allAttributes() != oldScope.allAttributes()) { setAllAttributes(newScope.allAttributes()); } if (newScope.fetchSize() != oldScope.fetchSize()) { setFetchSize(newScope.fetchSize()); } if (newScope.fetchMTime() != oldScope.fetchMTime()) { setFetchMTime(newScope.fetchMTime()); } if (newScope.fetchRemoteRevision() != oldScope.fetchRemoteRevision()) { setFetchRemoteRevision(newScope.fetchRemoteRevision()); } if (newScope.ignoreErrors() != oldScope.ignoreErrors()) { setIgnoreErrors(newScope.ignoreErrors()); } if (newScope.fetchFlags() != oldScope.fetchFlags()) { setFetchFlags(newScope.fetchFlags()); } if (newScope.fetchRemoteId() != oldScope.fetchRemoteId()) { setFetchRemoteId(newScope.fetchRemoteId()); } if (newScope.fetchGID() != oldScope.fetchGID()) { setFetchGID(newScope.fetchGID()); } if (newScope.fetchTags() != oldScope.fetchTags()) { setFetchTags(newScope.fetchTags()); } if (newScope.fetchRelations() != oldScope.fetchRelations()) { setFetchRelations(newScope.fetchRelations()); } if (newScope.fetchVirtualReferences() != oldScope.fetchVirtualReferences()) { setFetchVirtualReferences(newScope.fetchVirtualReferences()); } d->mCachedScopeValid = false; } ItemFetchScope AggregatedItemFetchScope::toFetchScope() const { LOCKED_D(const AggregatedItemFetchScope); if (d->mCachedScopeValid) { return d->mCachedScope; } d->mCachedScope = ItemFetchScope(); d->mCachedScope.setRequestedParts(setToVector(d->parts)); d->mCachedScope.setAncestorDepth(ancestorDepth()); d->mCachedScope.setFetch(ItemFetchScope::CacheOnly, cacheOnly()); d->mCachedScope.setFetch(ItemFetchScope::FullPayload, fullPayload()); d->mCachedScope.setFetch(ItemFetchScope::AllAttributes, allAttributes()); d->mCachedScope.setFetch(ItemFetchScope::Size, fetchSize()); d->mCachedScope.setFetch(ItemFetchScope::MTime, fetchMTime()); d->mCachedScope.setFetch(ItemFetchScope::RemoteRevision, fetchRemoteRevision()); d->mCachedScope.setFetch(ItemFetchScope::IgnoreErrors, ignoreErrors()); d->mCachedScope.setFetch(ItemFetchScope::Flags, fetchFlags()); d->mCachedScope.setFetch(ItemFetchScope::RemoteID, fetchRemoteId()); d->mCachedScope.setFetch(ItemFetchScope::GID, fetchGID()); d->mCachedScope.setFetch(ItemFetchScope::Tags, fetchTags()); d->mCachedScope.setFetch(ItemFetchScope::Relations, fetchRelations()); d->mCachedScope.setFetch(ItemFetchScope::VirtReferences, fetchVirtualReferences()); d->mCachedScopeValid = true; return d->mCachedScope; } QSet AggregatedItemFetchScope::requestedParts() const { LOCKED_D(const AggregatedItemFetchScope) return d->parts; } void AggregatedItemFetchScope::addRequestedPart(const QByteArray &part) { LOCKED_D(AggregatedItemFetchScope) d->addToSet(part, d->parts, d->partsCount); } void AggregatedItemFetchScope::removeRequestedPart(const QByteArray &part) { LOCKED_D(AggregatedItemFetchScope) d->removeFromSet(part, d->parts, d->partsCount); } ItemFetchScope::AncestorDepth AggregatedItemFetchScope::ancestorDepth() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return the largest depth with at least one subscriber if (d->ancestors[ItemFetchScope::AllAncestors] > 0) { return ItemFetchScope::AllAncestors; } else if (d->ancestors[ItemFetchScope::ParentAncestor] > 0) { return ItemFetchScope::ParentAncestor; } else { return ItemFetchScope::NoAncestor; } } void AggregatedItemFetchScope::updateAncestorDepth(ItemFetchScope::AncestorDepth oldDepth, ItemFetchScope::AncestorDepth newDepth) { LOCKED_D(AggregatedItemFetchScope) if (d->ancestors[oldDepth] > 0) { --d->ancestors[oldDepth]; } ++d->ancestors[newDepth]; } bool AggregatedItemFetchScope::cacheOnly() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: we can return true only if everyone wants cached data only, // otherwise there's at least one subscriber who wants uncached data - return d->cacheOnly == 0; + return d->cacheOnly == d->subscribers; } void AggregatedItemFetchScope::setCacheOnly(bool cacheOnly) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(cacheOnly, d->cacheOnly); } bool AggregatedItemFetchScope::fullPayload() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants the // full payload return d->fullPayload > 0; } void AggregatedItemFetchScope::setFullPayload(bool fullPayload) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fullPayload, d->fullPayload); } bool AggregatedItemFetchScope::allAttributes() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants // all attributes return d->allAttributes > 0; } void AggregatedItemFetchScope::setAllAttributes(bool allAttributes) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(allAttributes, d->allAttributes); } bool AggregatedItemFetchScope::fetchSize() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants size return d->fetchSize > 0; } void AggregatedItemFetchScope::setFetchSize(bool fetchSize) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchSize, d->fetchSize); } bool AggregatedItemFetchScope::fetchMTime() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants mtime return d->fetchMTime > 0; } void AggregatedItemFetchScope::setFetchMTime(bool fetchMTime) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchMTime, d->fetchMTime); } bool AggregatedItemFetchScope::fetchRemoteRevision() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants rrev return d->fetchRRev > 0; } void AggregatedItemFetchScope::setFetchRemoteRevision(bool remoteRevision) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(remoteRevision, d->fetchRRev); } bool AggregatedItemFetchScope::ignoreErrors() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true only if everyone wants to ignore errors, otherwise // there's at least one subscriber who does not want to ignore them - return d->ignoreErrors == 0; + return d->ignoreErrors == d->subscribers; } void AggregatedItemFetchScope::setIgnoreErrors(bool ignoreErrors) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(ignoreErrors, d->ignoreErrors); } bool AggregatedItemFetchScope::fetchFlags() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants flags return d->fetchFlags > 0; } void AggregatedItemFetchScope::setFetchFlags(bool fetchFlags) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchFlags, d->fetchFlags); } bool AggregatedItemFetchScope::fetchRemoteId() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants RID return d->fetchRID > 0; } void AggregatedItemFetchScope::setFetchRemoteId(bool fetchRemoteId) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchRemoteId, d->fetchRID); } bool AggregatedItemFetchScope::fetchGID() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants GID return d->fetchGID > 0; } void AggregatedItemFetchScope::setFetchGID(bool fetchGid) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchGid, d->fetchGID); } bool AggregatedItemFetchScope::fetchTags() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants tags return d->fetchTags > 0; } void AggregatedItemFetchScope::setFetchTags(bool fetchTags) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchTags, d->fetchTags); } bool AggregatedItemFetchScope::fetchRelations() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants relations return d->fetchRelations > 0; } void AggregatedItemFetchScope::setFetchRelations(bool fetchRelations) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchRelations, d->fetchRelations); } bool AggregatedItemFetchScope::fetchVirtualReferences() const { LOCKED_D(const AggregatedItemFetchScope) // Aggregation: return true if there's at least one subscriber who wants vrefs return d->fetchVRefs > 0; } void AggregatedItemFetchScope::setFetchVirtualReferences(bool fetchVRefs) { LOCKED_D(AggregatedItemFetchScope) d->updateBool(fetchVRefs, d->fetchVRefs); } +void AggregatedItemFetchScope::addSubscriber() +{ + LOCKED_D(AggregatedItemFetchScope) + ++d->subscribers; +} +void AggregatedItemFetchScope::removeSubscriber() +{ + LOCKED_D(AggregatedItemFetchScope) + --d->subscribers; +} AggregatedTagFetchScope::AggregatedTagFetchScope() : d_ptr(new AggregatedTagFetchScopePrivate) { } AggregatedTagFetchScope::~AggregatedTagFetchScope() { delete d_ptr; } void AggregatedTagFetchScope::apply(const Protocol::TagFetchScope &oldScope, const Protocol::TagFetchScope &newScope) { LOCKED_D(AggregatedTagFetchScope) if (newScope.fetchIdOnly() != oldScope.fetchIdOnly()) { setFetchIdOnly(newScope.fetchIdOnly()); } if (newScope.fetchRemoteID() != oldScope.fetchRemoteID()) { setFetchRemoteId(newScope.fetchRemoteID()); } if (newScope.fetchAllAttributes() != oldScope.fetchAllAttributes()) { setFetchAllAttributes(newScope.fetchAllAttributes()); } if (newScope.attributes() != oldScope.attributes()) { d->applySet(oldScope.attributes(), newScope.attributes(), d->attrs, d->attrsCount); } } Protocol::TagFetchScope AggregatedTagFetchScope::toFetchScope() const { Protocol::TagFetchScope tfs; tfs.setFetchIdOnly(fetchIdOnly()); tfs.setFetchRemoteID(fetchRemoteId()); tfs.setFetchAllAttributes(fetchAllAttributes()); tfs.setAttributes(attributes()); return tfs; } bool AggregatedTagFetchScope::fetchIdOnly() const { LOCKED_D(const AggregatedTagFetchScope) // Aggregation: we can return true only if everyone wants fetchIdOnly, // otherwise there's at least one subscriber who wants everything return d->fetchIdOnly == d->subscribers; } void AggregatedTagFetchScope::setFetchIdOnly(bool fetchIdOnly) { LOCKED_D(AggregatedTagFetchScope) d->updateBool(fetchIdOnly, d->fetchIdOnly); } bool AggregatedTagFetchScope::fetchRemoteId() const { LOCKED_D(const AggregatedTagFetchScope) return d->fetchRemoteId > 0; } void AggregatedTagFetchScope::setFetchRemoteId(bool fetchRemoteId) { LOCKED_D(AggregatedTagFetchScope) d->updateBool(fetchRemoteId, d->fetchRemoteId); } bool AggregatedTagFetchScope::fetchAllAttributes() const { LOCKED_D(const AggregatedTagFetchScope) return d->fetchAllAttributes > 0; } void AggregatedTagFetchScope::setFetchAllAttributes(bool fetchAllAttributes) { LOCKED_D(AggregatedTagFetchScope) d->updateBool(fetchAllAttributes, d->fetchAllAttributes); } QSet AggregatedTagFetchScope::attributes() const { LOCKED_D(const AggregatedTagFetchScope) return d->attrs; } void AggregatedTagFetchScope::addAttribute(const QByteArray &attribute) { LOCKED_D(AggregatedTagFetchScope) d->addToSet(attribute, d->attrs, d->attrsCount); } void AggregatedTagFetchScope::removeAttribute(const QByteArray &attribute) { LOCKED_D(AggregatedTagFetchScope) d->removeFromSet(attribute, d->attrs, d->attrsCount); } void AggregatedTagFetchScope::addSubscriber() { LOCKED_D(AggregatedTagFetchScope) ++d->subscribers; } void AggregatedTagFetchScope::removeSubscriber() { LOCKED_D(AggregatedTagFetchScope) --d->subscribers; } #undef LOCKED_D diff --git a/src/server/aggregatedfetchscope.h b/src/server/aggregatedfetchscope.h index b712ad3a2..cdf866d26 100644 --- a/src/server/aggregatedfetchscope.h +++ b/src/server/aggregatedfetchscope.h @@ -1,146 +1,149 @@ /* Copyright (c) 2017 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_AGGREGATED_FETCHSCOPE_H_ #define AKONADI_AGGREGATED_FETCHSCOPE_H_ #include #include class QByteArray; namespace Akonadi { namespace Server { class AggregatedCollectionFetchScopePrivate; class AggregatedCollectionFetchScope { public: explicit AggregatedCollectionFetchScope(); ~AggregatedCollectionFetchScope(); void apply(const Protocol::CollectionFetchScope &oldScope, const Protocol::CollectionFetchScope &newScope); QSet attributes() const; void addAttribute(const QByteArray &attribute); void removeAttribute(const QByteArray &attribute); bool fetchIdOnly() const; void setFetchIdOnly(bool fetchIdOnly); bool fetchStatistics() const; void setFetchStatistics(bool fetchStats); void addSubscriber(); void removeSubscriber(); private: AggregatedCollectionFetchScopePrivate * const d_ptr; Q_DECLARE_PRIVATE(AggregatedCollectionFetchScope) }; class AggregatedItemFetchScopePrivate; class AggregatedItemFetchScope { public: explicit AggregatedItemFetchScope(); ~AggregatedItemFetchScope(); void apply(const Protocol::ItemFetchScope &oldScope, const Protocol::ItemFetchScope &newScope); Protocol::ItemFetchScope toFetchScope() const; QSet requestedParts() const; void addRequestedPart(const QByteArray &part); void removeRequestedPart(const QByteArray &part); Protocol::ItemFetchScope::AncestorDepth ancestorDepth() const; void updateAncestorDepth(Protocol::ItemFetchScope::AncestorDepth oldDepth, Protocol::ItemFetchScope::AncestorDepth newDepth); bool cacheOnly() const; void setCacheOnly(bool cacheOnly); bool fullPayload() const; void setFullPayload(bool fullPayload); bool allAttributes() const; void setAllAttributes(bool allAttributes); bool fetchSize() const; void setFetchSize(bool fetchSize); bool fetchMTime() const; void setFetchMTime(bool fetchMTime); bool fetchRemoteRevision() const; void setFetchRemoteRevision(bool remoteRevision); bool ignoreErrors() const; void setIgnoreErrors(bool ignoreErrors); bool fetchFlags() const; void setFetchFlags(bool fetchFlags); bool fetchRemoteId() const; void setFetchRemoteId(bool fetchRemoteId); bool fetchGID() const; void setFetchGID(bool fetchGid); bool fetchTags() const; void setFetchTags(bool fetchTags); bool fetchRelations() const; void setFetchRelations(bool fetchRelations); bool fetchVirtualReferences() const; void setFetchVirtualReferences(bool fetchVRefs); + void addSubscriber(); + void removeSubscriber(); + private: AggregatedItemFetchScopePrivate * const d_ptr; Q_DECLARE_PRIVATE(AggregatedItemFetchScope) }; class AggregatedTagFetchScopePrivate; class AggregatedTagFetchScope { public: explicit AggregatedTagFetchScope(); ~AggregatedTagFetchScope(); void apply(const Protocol::TagFetchScope &oldScope, const Protocol::TagFetchScope &newScope); Protocol::TagFetchScope toFetchScope() const; QSet attributes() const; void addAttribute(const QByteArray &attribute); void removeAttribute(const QByteArray &attribute); void addSubscriber(); void removeSubscriber(); bool fetchIdOnly() const; void setFetchIdOnly(bool fetchIdOnly); bool fetchRemoteId() const; void setFetchRemoteId(bool fetchRemoteId); bool fetchAllAttributes() const; void setFetchAllAttributes(bool fetchAllAttributes); private: AggregatedTagFetchScopePrivate * const d_ptr; Q_DECLARE_PRIVATE(AggregatedTagFetchScope) }; } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/notificationsubscriber.cpp b/src/server/notificationsubscriber.cpp index 39d6fc7d9..5fbe65cf2 100644 --- a/src/server/notificationsubscriber.cpp +++ b/src/server/notificationsubscriber.cpp @@ -1,745 +1,749 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationsubscriber.h" #include "akonadiserver_debug.h" #include "notificationmanager.h" #include "collectionreferencemanager.h" #include "aggregatedfetchscope.h" #include "storage/querybuilder.h" #include "utils.h" #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; QMimeDatabase NotificationSubscriber::sMimeDatabase; #define TRACE_NTF(x) //#define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x NotificationSubscriber::NotificationSubscriber(NotificationManager *manager) : QObject() , mManager(manager) , mSocket(nullptr) , mAllMonitored(false) , mExclusive(false) , mNotificationDebugging(false) { if (mManager) { + mManager->itemFetchScope()->addSubscriber(); mManager->collectionFetchScope()->addSubscriber(); mManager->tagFetchScope()->addSubscriber(); } } NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor) : NotificationSubscriber(manager) { mSocket = new QLocalSocket(this); connect(mSocket, &QLocalSocket::readyRead, this, &NotificationSubscriber::handleIncomingData); connect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected); mSocket->setSocketDescriptor(socketDescriptor); const SchemaVersion schema = SchemaVersion::retrieveAll().first(); auto hello = Protocol::HelloResponsePtr::create(); hello->setServerName(QStringLiteral("Akonadi")); hello->setMessage(QStringLiteral("Not really IMAP server")); hello->setProtocolVersion(Protocol::version()); hello->setGeneration(schema.generation()); writeCommand(0, hello); } NotificationSubscriber::~NotificationSubscriber() { QMutexLocker locker(&mLock); if (mNotificationDebugging) { Q_EMIT notificationDebuggingChanged(false); } } void NotificationSubscriber::handleIncomingData() { while (mSocket->bytesAvailable() > (int) sizeof(qint64)) { Protocol::DataStream stream(mSocket); // Ignored atm qint64 tag = -1; stream >> tag; Protocol::CommandPtr cmd; try { cmd = Protocol::deserialize(mSocket); } catch (const Akonadi::ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "ProtocolException:" << e.what(); disconnectSubscriber(); return; } catch (const std::exception &e) { qCWarning(AKONADISERVER_LOG) << "Unknown exception:" << e.what(); disconnectSubscriber(); return; } if (cmd->type() == Protocol::Command::Invalid) { qCWarning(AKONADISERVER_LOG) << "Received an invalid command: resetting connection"; disconnectSubscriber(); return; } switch (cmd->type()) { case Protocol::Command::CreateSubscription: registerSubscriber(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create()); break; case Protocol::Command::ModifySubscription: if (mSubscriber.isEmpty()) { qCWarning(AKONADISERVER_LOG) << "Received ModifySubscription command before RegisterSubscriber"; disconnectSubscriber(); return; } modifySubscription(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create()); break; case Protocol::Command::Logout: disconnectSubscriber(); break; default: qCWarning(AKONADISERVER_LOG) << "Invalid command" << cmd->type() << "received by NotificationSubscriber" << mSubscriber; disconnectSubscriber(); break; } } } void NotificationSubscriber::socketDisconnected() { qCDebug(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected"; disconnectSubscriber(); } void NotificationSubscriber::disconnectSubscriber() { QMutexLocker locker(&mLock); auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove); mManager->slotNotify({ changeNtf }); disconnect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected); mSocket->close(); // Unregister ourselves from the aggergated collection fetch scope auto cfs = mManager->collectionFetchScope(); if (mCollectionFetchScope.fetchIdOnly()) { cfs->setFetchIdOnly(false); } if (mCollectionFetchScope.includeStatistics()) { cfs->setFetchStatistics(false); } const auto attrs = mCollectionFetchScope.attributes(); for (const auto &attr : attrs) { cfs->removeAttribute(attr); } cfs->removeSubscriber(); auto tfs = mManager->tagFetchScope(); tfs->removeSubscriber(); + auto ifs = mManager->itemFetchScope(); + ifs->removeSubscriber(); + mManager->forgetSubscriber(this); deleteLater(); } void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command) { QMutexLocker locker(&mLock); qCDebug(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName(); mSubscriber = command.subscriberName(); mSession = command.session(); auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add); mManager->slotNotify({ changeNtf }); } void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command) { QMutexLocker locker(&mLock); const auto modifiedParts = command.modifiedParts(); #define START_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add)) #define STOP_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove)) #define APPEND(set, newItems) \ Q_FOREACH (const auto &entity, newItems) { \ set.insert(entity); \ } #define REMOVE(set, items) \ Q_FOREACH (const auto &entity, items) { \ set.remove(entity); \ } if (START_MONITORING(Types)) { APPEND(mMonitoredTypes, command.startMonitoringTypes()) } if (STOP_MONITORING(Types)) { REMOVE(mMonitoredTypes, command.stopMonitoringTypes()) } if (START_MONITORING(Collections)) { APPEND(mMonitoredCollections, command.startMonitoringCollections()) } if (STOP_MONITORING(Collections)) { REMOVE(mMonitoredCollections, command.stopMonitoringCollections()) } if (START_MONITORING(Items)) { APPEND(mMonitoredItems, command.startMonitoringItems()) } if (STOP_MONITORING(Items)) { REMOVE(mMonitoredItems, command.stopMonitoringItems()) } if (START_MONITORING(Tags)) { APPEND(mMonitoredTags, command.startMonitoringTags()) } if (STOP_MONITORING(Tags)) { REMOVE(mMonitoredTags, command.stopMonitoringTags()) } if (START_MONITORING(Resources)) { APPEND(mMonitoredResources, command.startMonitoringResources()) } if (STOP_MONITORING(Resources)) { REMOVE(mMonitoredResources, command.stopMonitoringResources()) } if (START_MONITORING(MimeTypes)) { APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes()) } if (STOP_MONITORING(MimeTypes)) { REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes()) } if (START_MONITORING(Sessions)) { APPEND(mIgnoredSessions, command.startIgnoringSessions()) } if (STOP_MONITORING(Sessions)) { REMOVE(mIgnoredSessions, command.stopIgnoringSessions()) } if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) { mAllMonitored = command.allMonitored(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) { mExclusive = command.isExclusive(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) { const auto newScope = command.itemFetchScope(); mManager->itemFetchScope()->apply(mItemFetchScope, newScope); mItemFetchScope = newScope; } if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { const auto newScope = command.collectionFetchScope(); mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope); mCollectionFetchScope = newScope; } if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) { const auto newScope = command.tagFetchScope(); mManager->tagFetchScope()->apply(mTagFetchScope, newScope); mTagFetchScope = newScope; if (!newScope.fetchIdOnly()) Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly()); } if (mManager) { if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) { // Did the caller just subscribed to subscription changes? if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) { // If yes, then send them list of all existing subscribers Q_FOREACH (const NotificationSubscriber *subscriber, mManager->mSubscribers) { // Send them back to caller if (subscriber) { QMetaObject::invokeMethod(this, "notify", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification())); } } } if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (!mNotificationDebugging) { mNotificationDebugging = true; Q_EMIT notificationDebuggingChanged(true); } } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (mNotificationDebugging) { mNotificationDebugging = false; Q_EMIT notificationDebuggingChanged(false); } } } // Emit subscription change notification auto changeNtf = toChangeNotification(); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify); mManager->slotNotify({ changeNtf }); } #undef START_MONITORING #undef STOP_MONITORING #undef APPEND #undef REMOVE } Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const { // Assumes mLock being locked by caller auto ntf = Protocol::SubscriptionChangeNotificationPtr::create(); ntf->setSessionId(mSession); ntf->setSubscriber(mSubscriber); ntf->setOperation(Protocol::SubscriptionChangeNotification::Add); ntf->setCollections(mMonitoredCollections); ntf->setItems(mMonitoredItems); ntf->setTags(mMonitoredTags); ntf->setTypes(mMonitoredTypes); ntf->setMimeTypes(mMonitoredMimeTypes); ntf->setResources(mMonitoredResources); ntf->setIgnoredSessions(mIgnoredSessions); ntf->setAllMonitored(mAllMonitored); ntf->setExclusive(mExclusive); ntf->setItemFetchScope(mItemFetchScope); ntf->setTagFetchScope(mTagFetchScope); ntf->setCollectionFetchScope(mCollectionFetchScope); return ntf; } bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const { // Assumes mLock being locked by caller if (id < 0) { return false; } else if (mMonitoredCollections.contains(id)) { return true; } else if (mMonitoredCollections.contains(0)) { return true; } return false; } bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const { // Assumes mLock being locked by caller const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType); if (mMonitoredMimeTypes.contains(mimeType)) { return true; } const QStringList lst = mt.aliases(); for (const QString &alias : lst) { if (mMonitoredMimeTypes.contains(alias)) { return true; } } return false; } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::ItemChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::CollectionChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.items().isEmpty()) { return false; } if (CollectionReferenceManager::instance()->isReferenced(msg.parentCollection())) { //We always want notifications that affect the parent resource (like an item added to a referenced collection) const bool notificationForParentResource = (mSession == msg.resource()); const bool accepts = mExclusive || isCollectionMonitored(msg.parentCollection()) || isMoveDestinationResourceMonitored(msg) || notificationForParentResource; TRACE_NTF("ACCEPTS ITEM: parent col referenced" << "exclusive:" << mExclusive << "," << "parent monitored:" << isCollectionMonitored(msg.parentCollection()) << "," << "destination monitored:" << isMoveDestinationResourceMonitored(msg) << "," << "ntf for parent resource:" << notificationForParentResource << ":" << "ACCEPTED:" << accepts); return accepts; } if (mAllMonitored) { TRACE_NTF("ACCEPTS ITEM: all monitored"); return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) { TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored"); return false; } // we have a resource or mimetype filter if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) { if (mMonitoredResources.contains(msg.resource())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored"); return true; } if (isMoveDestinationResourceMonitored(msg)) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored"); return true; } Q_FOREACH (const auto &item, msg.items()) { if (isMimeTypeMonitored(item.mimeType())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored"); return true; } } TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored"); return false; } // we explicitly monitor that item or the collections it's in Q_FOREACH (const auto &item, msg.items()) { if (mMonitoredItems.contains(item.id())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored"); return true; } } if (isCollectionMonitored(msg.parentCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored"); return true; } if (isCollectionMonitored(msg.parentDestCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored"); return true; } TRACE_NTF("ACCEPTS ITEM: REJECTED"); return false; } bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller const auto &collection = msg.collection(); if (collection.id() < 0) { return false; } // HACK: We need to dispatch notifications about disabled collections to SOME // agents (that's what we have the exclusive subscription for) - but because // querying each Collection from database would be expensive, we use the // metadata hack to transfer this information from NotificationCollector if (msg.metadata().contains("DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe) && !msg.changedParts().contains("ENABLED")) { // Exclusive subscriber always gets it if (mExclusive) { return true; } //Deliver the notification if referenced from this session if (CollectionReferenceManager::instance()->isReferenced(collection.id(), mSession)) { return true; } //Exclusive subscribers still want the notification if (mExclusive && CollectionReferenceManager::instance()->isReferenced(collection.id())) { return true; } //The session belonging to this monitor referenced or dereferenced the collection. We always want this notification. //The referencemanager no longer holds a reference, so we have to check this way. if (msg.changedParts().contains(AKONADI_PARAM_REFERENCED) && mSession == msg.sessionId()) { return true; } // If the collection is not referenced, monitored or the subscriber is not // exclusive (i.e. if we got here), then the subscriber does not care about // this one, so drop it return false; } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) { return false; } // we have a resource filter if (!mMonitoredResources.isEmpty()) { const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg); // a bit hacky, but match the behaviour from the item case, // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course if (mMonitoredMimeTypes.isEmpty() || resourceMatches) { return resourceMatches; } // else continue } // we explicitly monitor that collection, or all of them if (isCollectionMonitored(collection.id())) { return true; } return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection()); } bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.tag().id() < 0) { return false; } // Special handling for Tag removal notifications: When a Tag is removed, // a notification is emitted for each Resource that owns the tag (i.e. // each resource that owns a Tag RID - Tag RIDs are resource-specific). // Additionally then we send one more notification without any RID that is // destined for regular applications (which don't know anything about Tag RIDs) if (msg.operation() == Protocol::TagChangeNotification::Remove) { // HACK: Since have no way to determine which resource this NotificationSource // belongs to, we are abusing the fact that each resource ignores it's own // main session, which is called the same name as the resource. // If there are any ignored sessions, but this notification does not have // a specific resource set, then we ignore it, as this notification is // for clients, not resources (does not have tag RID) if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) { return false; } // If this source ignores a session (i.e. we assume it is a resource), // but this notification is for another resource, then we ignore it if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) { return false; } // Now we got here, which means that this notification either has empty // resource, i.e. it is destined for a client applications, or it's // destined for resource that we *think* (see the hack above) this // NotificationSource belongs too. Which means we approve this notification, // but it can still be discarded in the generic Tag notification filter // below } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) { return false; } if (mMonitoredTags.isEmpty()) { return true; } if (mMonitoredTags.contains(msg.tag().id())) { return true; } return true; } bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges)) { return false; } return true; } bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); // Unlike other types, subscription notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges); } bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const { // Assumes mLock being locked by caller // We should never end up sending debug notification about a debug notification. // This could get very messy very quickly... Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification); if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) { return false; } // Unlike other types, debug change notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications); } bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const { // Assumes mLock being locked // Uninitialized subscriber gets nothing if (mSubscriber.isEmpty()) { return false; } // session is ignored // TODO: Should this affect SubscriptionChangeNotification and DebugChangeNotification? if (mIgnoredSessions.contains(msg.sessionId())) { return false; } switch (msg.type()) { case Protocol::Command::ItemChangeNotification: return acceptsItemNotification(static_cast(msg)); case Protocol::Command::CollectionChangeNotification: return acceptsCollectionNotification(static_cast(msg)); case Protocol::Command::TagChangeNotification: return acceptsTagNotification(static_cast(msg)); case Protocol::Command::RelationChangeNotification: return acceptsRelationNotification(static_cast(msg)); case Protocol::Command::SubscriptionChangeNotification: return acceptsSubscriptionNotification(static_cast(msg)); case Protocol::Command::DebugChangeNotification: return acceptsDebugChangeNotification(static_cast(msg)); default: qCDebug(AKONADISERVER_LOG) << "Received invalid change notification!"; return false; } } Protocol::CollectionChangeNotificationPtr NotificationSubscriber::customizeCollection(const Protocol::CollectionChangeNotificationPtr &ntf) { const bool isReferencedFromSession = CollectionReferenceManager::instance()->isReferenced(ntf->collection().id(), mSession); if (isReferencedFromSession != ntf->collection().referenced()) { auto copy = Protocol::CollectionChangeNotificationPtr::create(*ntf); auto copyCol = ntf->collection(); copyCol.setReferenced(isReferencedFromSession); copy->setCollection(std::move(copyCol)); return copy; } return ntf; } bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr ¬ification) { // Guard against this object being deleted while we are waiting for the lock QPointer ptr(this); QMutexLocker locker(&mLock); if (!ptr) { return false; } if (acceptsNotification(*notification)) { auto ntf = notification; if (ntf->type() == Protocol::Command::CollectionChangeNotification) { ntf = customizeCollection(notification.staticCast()); } QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, ntf)); return true; } return false; } void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr ¬ification) { // tag chosen by fair dice roll writeCommand(4, notification); } void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd) { Q_ASSERT(QThread::currentThread() == thread()); Protocol::DataStream stream(mSocket); stream << tag; try { Protocol::serialize(mSocket, cmd); if (!mSocket->waitForBytesWritten()) { if (mSocket->state() == QLocalSocket::ConnectedState) { qCWarning(AKONADISERVER_LOG) << "Notification socket write timeout!"; } else { // client has disconnected, just discard the message } } } catch (const ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "Notification protocol exception:" << e.what(); } }