diff --git a/autotests/server/CMakeLists.txt b/autotests/server/CMakeLists.txt index fe10d1814..cbf0f81ac 100644 --- a/autotests/server/CMakeLists.txt +++ b/autotests/server/CMakeLists.txt @@ -1,105 +1,106 @@ ########### next target ############### # QTEST_MAIN is using QApplication when QT_GUI_LIB is defined remove_definitions(-DQT_GUI_LIB) set(EXECUTABLE_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_BINARY_DIR}/src/server ${Akonadi_SOURCE_DIR}/src/server) akonadi_run_xsltproc( XSL ${Akonadi_SOURCE_DIR}/src/server/storage/schema.xsl XML ${CMAKE_CURRENT_SOURCE_DIR}/dbtest_data/unittest_schema.xml BASENAME unittestschema CLASSNAME UnitTestSchema ) akonadi_run_xsltproc( XSL ${CMAKE_CURRENT_SOURCE_DIR}/dbpopulator.xsl XML ${CMAKE_CURRENT_SOURCE_DIR}/dbtest_data/dbdata.xml BASENAME dbpopulator ) set_property(SOURCE ${CMAKE_CURRENT_BINARY_DIR}/dbpopulator.cpp PROPERTY SKIP_AUTOMOC TRUE) set(common_SRCS unittestschema.cpp fakeconnection.cpp fakedatastore.cpp fakeclient.cpp fakeakonadiserver.cpp fakesearchmanager.cpp fakeitemretrievalmanager.cpp dbinitializer.cpp inspectablenotificationcollector.cpp ${CMAKE_CURRENT_BINARY_DIR}/dbpopulator.cpp ) add_library(akonadi_unittest_common STATIC ${common_SRCS}) target_link_libraries(akonadi_unittest_common KF5AkonadiPrivate libakonadiserver Qt5::Core Qt5::DBus Qt5::Test Qt5::Sql Qt5::Network ) macro(add_server_test _source) set(_test ${_source} ../../src/server/akonadiserver_debug.cpp ../../src/server/akonadiserver_search_debug.cpp) get_filename_component(_name ${_source} NAME_WE) qt5_add_resources(_test dbtest_data/dbtest_data.qrc) add_executable(${_name} ${_test}) add_test(NAME AkonadiServer-${_name} COMMAND ${_name}) if (ENABLE_ASAN) set_tests_properties(AkonadiServer-${_name} PROPERTIES ENVIRONMENT ASAN_OPTIONS=symbolize=1 ) endif() set_tests_properties(AkonadiServer-${_name} PROPERTIES ENVIRONMENT "QT_HASH_SEED=0;QT_NO_CPU_FEATURE=sse4.2" ) target_link_libraries(${_name} akonadi_shared akonadi_unittest_common libakonadiserver KF5AkonadiPrivate Qt5::Core Qt5::DBus Qt5::Test Qt5::Sql Qt5::Network ${CMAKE_SHARED_LINKER_FLAGS_ASAN} ) endmacro() add_server_test(dbtypetest.cpp) add_server_test(dbintrospectortest.cpp) add_server_test(querybuildertest.cpp) add_server_test(dbinitializertest.cpp) add_server_test(dbupdatertest.cpp) add_server_test(handlertest.cpp) add_server_test(dbconfigtest.cpp) add_server_test(parthelpertest.cpp) add_server_test(itemretrievertest.cpp) add_server_test(notificationsubscribertest.cpp) +add_server_test(notificationmanagertest.cpp) add_server_test(parttypehelpertest.cpp) add_server_test(collectionstatisticstest.cpp) add_server_test(aggregatedfetchscopetest.cpp) if (SQLITE_FOUND) # tests using the fake server need the QSQLITE3 plugin add_server_test(partstreamertest.cpp) add_server_test(akappendhandlertest.cpp) add_server_test(linkhandlertest.cpp) add_server_test(listhandlertest.cpp) add_server_test(modifyhandlertest.cpp) add_server_test(movehandlertest.cpp) add_server_test(createhandlertest.cpp) add_server_test(collectionreferencetest.cpp) add_server_test(searchtest.cpp akonadiprivate) add_server_test(relationhandlertest.cpp akonadiprivate) add_server_test(taghandlertest.cpp akonadiprivate) add_server_test(fetchhandlertest.cpp akonadiprivate) endif() diff --git a/autotests/server/notificationmanagertest.cpp b/autotests/server/notificationmanagertest.cpp new file mode 100644 index 000000000..ea340f921 --- /dev/null +++ b/autotests/server/notificationmanagertest.cpp @@ -0,0 +1,157 @@ +/* + Copyright (c) 2019 David Faure + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ + +#include + +#include "aggregatedfetchscope.h" +#include "entities.h" +#include "notificationmanager.h" +#include "notificationsubscriber.h" + +#include +#include + +using namespace Akonadi; +using namespace Akonadi::Server; + +class TestableNotificationSubscriber : public NotificationSubscriber +{ +public: + TestableNotificationSubscriber(NotificationManager *manager) + : NotificationSubscriber(manager) + { + mSubscriber = "TestSubscriber"; + } + + using NotificationSubscriber::registerSubscriber; + using NotificationSubscriber::modifySubscription; + using NotificationSubscriber::disconnectSubscriber; + +}; + +class NotificationManagerTest : public QObject +{ + Q_OBJECT + +private Q_SLOTS: + void testAggregatedFetchScope() + { + NotificationManager manager(AkThread::NoThread); + QMetaObject::invokeMethod(&manager, "init", Qt::DirectConnection); + + // first subscriber, A + TestableNotificationSubscriber subscriberA(&manager); + Protocol::CreateSubscriptionCommand createCmd; + createCmd.setSession("session1"); + subscriberA.registerSubscriber(createCmd); + QVERIFY(!manager.tagFetchScope()->fetchIdOnly()); + QVERIFY(!manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(!manager.collectionFetchScope()->fetchStatistics()); + + // set A's subscription settings + Protocol::ModifySubscriptionCommand modifyCmd; + { + Protocol::TagFetchScope tagFetchScope; + tagFetchScope.setFetchIdOnly(true); + modifyCmd.setTagFetchScope(tagFetchScope); + + Protocol::CollectionFetchScope collectionFetchScope; + collectionFetchScope.setFetchIdOnly(true); + collectionFetchScope.setIncludeStatistics(true); + modifyCmd.setCollectionFetchScope(collectionFetchScope); + + Protocol::ItemFetchScope itemFetchScope; + itemFetchScope.setFetch(Protocol::ItemFetchScope::FullPayload); + itemFetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes); + itemFetchScope.setFetch(Protocol::ItemFetchScope::Size); + itemFetchScope.setFetch(Protocol::ItemFetchScope::MTime); + itemFetchScope.setFetch(Protocol::ItemFetchScope::RemoteRevision); + itemFetchScope.setFetch(Protocol::ItemFetchScope::Flags); + itemFetchScope.setFetch(Protocol::ItemFetchScope::RemoteID); + itemFetchScope.setFetch(Protocol::ItemFetchScope::GID); + itemFetchScope.setFetch(Protocol::ItemFetchScope::Tags); + itemFetchScope.setFetch(Protocol::ItemFetchScope::Relations); + itemFetchScope.setFetch(Protocol::ItemFetchScope::VirtReferences); + modifyCmd.setItemFetchScope(itemFetchScope); + } + subscriberA.modifySubscription(modifyCmd); + QVERIFY(manager.tagFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchStatistics()); + QVERIFY(manager.itemFetchScope()->fullPayload()); + QVERIFY(manager.itemFetchScope()->allAttributes()); + + // second subscriber, B + TestableNotificationSubscriber subscriberB(&manager); + subscriberB.registerSubscriber(createCmd); + QVERIFY(!manager.tagFetchScope()->fetchIdOnly()); // A and B don't agree, so: false + QVERIFY(!manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchStatistics()); // at least one - so still true + QVERIFY(manager.itemFetchScope()->fullPayload()); + QVERIFY(manager.itemFetchScope()->allAttributes()); + QVERIFY(manager.itemFetchScope()->fetchSize()); + QVERIFY(manager.itemFetchScope()->fetchMTime()); + QVERIFY(manager.itemFetchScope()->fetchRemoteRevision()); + QVERIFY(manager.itemFetchScope()->fetchFlags()); + QVERIFY(manager.itemFetchScope()->fetchRemoteId()); + QVERIFY(manager.itemFetchScope()->fetchGID()); + QVERIFY(manager.itemFetchScope()->fetchTags()); + QVERIFY(manager.itemFetchScope()->fetchRelations()); + QVERIFY(manager.itemFetchScope()->fetchVirtualReferences()); + + // give it the same settings + subscriberB.modifySubscription(modifyCmd); + QVERIFY(manager.tagFetchScope()->fetchIdOnly()); // now they agree + QVERIFY(manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchStatistics()); // no change for the "at least one" settings + + // revert B's settings, so we can check what happens when disconnecting + modifyCmd.setTagFetchScope(Protocol::TagFetchScope()); + modifyCmd.setCollectionFetchScope(Protocol::CollectionFetchScope()); + subscriberB.modifySubscription(modifyCmd); + QVERIFY(!manager.tagFetchScope()->fetchIdOnly()); + QVERIFY(!manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchStatistics()); + + // B goes away + subscriberB.disconnectSubscriber(); + QVERIFY(manager.tagFetchScope()->fetchIdOnly()); // B cleaned up after itself, so A can have id-only again + QVERIFY(manager.collectionFetchScope()->fetchIdOnly()); + QVERIFY(manager.collectionFetchScope()->fetchStatistics()); + + // A goes away + subscriberA.disconnectSubscriber(); + QVERIFY(!manager.collectionFetchScope()->fetchStatistics()); + QVERIFY(!manager.itemFetchScope()->fullPayload()); + QVERIFY(!manager.itemFetchScope()->allAttributes()); + QVERIFY(!manager.itemFetchScope()->fetchSize()); + QVERIFY(!manager.itemFetchScope()->fetchMTime()); + QVERIFY(!manager.itemFetchScope()->fetchRemoteRevision()); + QVERIFY(!manager.itemFetchScope()->fetchFlags()); + QVERIFY(!manager.itemFetchScope()->fetchRemoteId()); + QVERIFY(!manager.itemFetchScope()->fetchGID()); + QVERIFY(!manager.itemFetchScope()->fetchTags()); + QVERIFY(!manager.itemFetchScope()->fetchRelations()); + QVERIFY(!manager.itemFetchScope()->fetchVirtualReferences()); + } +}; + +AKTEST_MAIN(NotificationManagerTest) + +#include "notificationmanagertest.moc" diff --git a/src/server/akthread.cpp b/src/server/akthread.cpp index 20eb5ffc9..dbd0e6c4f 100644 --- a/src/server/akthread.cpp +++ b/src/server/akthread.cpp @@ -1,98 +1,101 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "akthread.h" #include "storage/datastore.h" #include "akonadiserver_debug.h" #include #include using namespace Akonadi::Server; AkThread::AkThread(const QString &objectName, StartMode startMode, QThread::Priority priority, QObject *parent) - : QObject(parent) + : QObject(parent), m_startMode(startMode) { setObjectName(objectName); - QThread *thread = new QThread(); - thread->setObjectName(objectName + QStringLiteral("-Thread")); - moveToThread(thread); - thread->start(priority); + if (startMode != NoThread) { + QThread *thread = new QThread(); + thread->setObjectName(objectName + QStringLiteral("-Thread")); + moveToThread(thread); + thread->start(priority); + } if (startMode == AutoStart) { startThread(); } } AkThread::AkThread(const QString &objectName, QThread::Priority priority, QObject *parent) : AkThread(objectName, AutoStart, priority, parent) { } AkThread::~AkThread() { } void AkThread::startThread() { + Q_ASSERT(m_startMode != NoThread); #if QT_VERSION >= QT_VERSION_CHECK(5, 10, 0) const bool init = QMetaObject::invokeMethod(this, &AkThread::init, Qt::QueuedConnection); #else const bool init = QMetaObject::invokeMethod(this, "init", Qt::QueuedConnection); #endif Q_ASSERT(init); Q_UNUSED(init); } void AkThread::quitThread() { + if (m_startMode == NoThread) + return; qCDebug(AKONADISERVER_LOG) << "Shutting down" << objectName() << "..."; #if QT_VERSION >= QT_VERSION_CHECK(5, 10, 0) const bool invoke = QMetaObject::invokeMethod(this, &AkThread::quit, Qt::QueuedConnection); #else const bool invoke = QMetaObject::invokeMethod(this, "quit", Qt::QueuedConnection); #endif Q_ASSERT(invoke); Q_UNUSED(invoke); if (!thread()->wait(10 * 1000)) { thread()->terminate(); thread()->wait(); } delete thread(); } void AkThread::init() { Q_ASSERT(thread() == QThread::currentThread()); - Q_ASSERT(thread() != qApp->thread()); } void AkThread::quit() { + Q_ASSERT(m_startMode != NoThread); Q_ASSERT(thread() == QThread::currentThread()); - Q_ASSERT(thread() != qApp->thread()); if (DataStore::hasDataStore()) { DataStore::self()->close(); } thread()->quit(); } - diff --git a/src/server/akthread.h b/src/server/akthread.h index 20131de7a..f37e49d54 100644 --- a/src/server/akthread.h +++ b/src/server/akthread.h @@ -1,60 +1,63 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_SERVER_AKTHREAD_H #define AKONADI_SERVER_AKTHREAD_H #include #include namespace Akonadi { namespace Server { class AkThread : public QObject { Q_OBJECT public: enum StartMode { AutoStart, - ManualStart + ManualStart, + NoThread // for unit-tests }; explicit AkThread(const QString &objectName, QThread::Priority priority = QThread::InheritPriority, QObject *parent = nullptr); explicit AkThread(const QString &objectName, StartMode startMode, QThread::Priority priority = QThread::InheritPriority, QObject *parent = nullptr); ~AkThread() override; protected: void quitThread(); void startThread(); protected Q_SLOTS: virtual void init(); virtual void quit(); +private: + StartMode m_startMode; }; } } #endif // AKONADI_SERVER_AKTHREAD_H diff --git a/src/server/notificationmanager.cpp b/src/server/notificationmanager.cpp index 348ba6743..46d5aabf3 100644 --- a/src/server/notificationmanager.cpp +++ b/src/server/notificationmanager.cpp @@ -1,229 +1,230 @@ /* Copyright (c) 2006 - 2007 Volker Krause Copyright (c) 2010 Michael Jansen This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationmanager.h" #include "notificationsubscriber.h" #include "storage/notificationcollector.h" #include "tracer.h" #include "akonadiserver_debug.h" #include "aggregatedfetchscope.h" #include "storage/collectionstatistics.h" #include "storage/selectquerybuilder.h" #include "handler/fetchhelper.h" #include "handlerhelper.h" #include #include +#include #include #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; -NotificationManager::NotificationManager() - : AkThread(QStringLiteral("NotificationManager")) +NotificationManager::NotificationManager(StartMode startMode) + : AkThread(QStringLiteral("NotificationManager"), startMode) , mTimer(nullptr) , mNotifyThreadPool(nullptr) , mDebugNotifications(0) { } NotificationManager::~NotificationManager() { quitThread(); } void NotificationManager::init() { AkThread::init(); const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite); QSettings settings(serverConfigFile, QSettings::IniFormat); mTimer = new QTimer(this); mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt()); mTimer->setSingleShot(true); connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications); mNotifyThreadPool = new QThreadPool(this); mNotifyThreadPool->setMaxThreadCount(5); mCollectionFetchScope = new AggregatedCollectionFetchScope(); mItemFetchScope = new AggregatedItemFetchScope(); mTagFetchScope = new AggregatedTagFetchScope(); } void NotificationManager::quit() { mQuitting = true; if (mEventLoop) { mEventLoop->quit(); return; } mTimer->stop(); delete mTimer; mNotifyThreadPool->clear(); mNotifyThreadPool->waitForDone(); delete mNotifyThreadPool; qDeleteAll(mSubscribers); delete mCollectionFetchScope; delete mItemFetchScope; delete mTagFetchScope; AkThread::quit(); } void NotificationManager::registerConnection(quintptr socketDescriptor) { Q_ASSERT(thread() == QThread::currentThread()); NotificationSubscriber *subscriber = new NotificationSubscriber(this, socketDescriptor); qCDebug(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")"; connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) { if (enabled) { ++mDebugNotifications; } else { --mDebugNotifications; } Q_ASSERT(mDebugNotifications >= 0); Q_ASSERT(mDebugNotifications <= mSubscribers.count()); }); mSubscribers.push_back(subscriber); } void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber) { Q_ASSERT(QThread::currentThread() == thread()); mSubscribers.removeAll(subscriber); } void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs) { Q_ASSERT(QThread::currentThread() == thread()); for (const auto &msg : msgs) { switch (msg->type()) { case Protocol::Command::CollectionChangeNotification: Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg); continue; case Protocol::Command::ItemChangeNotification: case Protocol::Command::TagChangeNotification: case Protocol::Command::RelationChangeNotification: case Protocol::Command::SubscriptionChangeNotification: case Protocol::Command::DebugChangeNotification: mNotifications.push_back(msg); continue; default: Q_ASSERT_X(false, "slotNotify", "Invalid notification type!"); continue; } } if (!mTimer->isActive()) { mTimer->start(); } } class NotifyRunnable : public QRunnable { public: explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList ¬ifications) : mSubscriber(subscriber) , mNotifications(notifications) { } ~NotifyRunnable() { } void run() override { for (const auto &ntf : qAsConst(mNotifications)) { if (mSubscriber) { mSubscriber->notify(ntf); } else { break; } } } private: QPointer mSubscriber; Protocol::ChangeNotificationList mNotifications; }; void NotificationManager::emitPendingNotifications() { Q_ASSERT(QThread::currentThread() == thread()); if (mNotifications.isEmpty()) { return; } if (mDebugNotifications == 0) { for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { if (subscriber) { mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications)); } } } else { // When debugging notification we have to use a non-threaded approach // so that we can work with return value of notify() for (const auto ¬ification : qAsConst(mNotifications)) { QVector listeners; for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { if (subscriber && subscriber->notify(notification)) { listeners.push_back(subscriber->subscriber()); } } emitDebugNotification(notification, listeners); } } mNotifications.clear(); } void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QVector &listeners) { auto debugNtf = Protocol::DebugChangeNotificationPtr::create(); debugNtf->setNotification(ntf); debugNtf->setListeners(listeners); debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch()); for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { if (subscriber) { mNotifyThreadPool->start(new NotifyRunnable(subscriber, { debugNtf })); } } } diff --git a/src/server/notificationmanager.h b/src/server/notificationmanager.h index 5d3ec250e..08e0a0607 100644 --- a/src/server/notificationmanager.h +++ b/src/server/notificationmanager.h @@ -1,95 +1,95 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_NOTIFICATIONMANAGER_H #define AKONADI_NOTIFICATIONMANAGER_H #include "akthread.h" #include #include #include class NotificationManagerTest; class QThreadPool; class QEventLoop; namespace Akonadi { namespace Server { class NotificationCollector; class NotificationSubscriber; class AggregatedCollectionFetchScope; class AggregatedItemFetchScope; class AggregatedTagFetchScope; class NotificationManager : public AkThread { Q_OBJECT public: - explicit NotificationManager(); + explicit NotificationManager(StartMode startMode = AutoStart); ~NotificationManager() override; void forgetSubscriber(NotificationSubscriber *subscriber); AggregatedCollectionFetchScope *collectionFetchScope() const { return mCollectionFetchScope; } AggregatedItemFetchScope *itemFetchScope() const { return mItemFetchScope; } AggregatedTagFetchScope *tagFetchScope() const { return mTagFetchScope; } public Q_SLOTS: void registerConnection(quintptr socketDescriptor); void emitPendingNotifications(); void slotNotify(const Akonadi::Protocol::ChangeNotificationList &msgs); protected: void init() override; void quit() override; void emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QVector &listeners); private: Protocol::ChangeNotificationList mNotifications; QTimer *mTimer = nullptr; QThreadPool *mNotifyThreadPool = nullptr; QVector> mSubscribers; int mDebugNotifications; AggregatedCollectionFetchScope *mCollectionFetchScope = nullptr; AggregatedItemFetchScope *mItemFetchScope = nullptr; AggregatedTagFetchScope *mTagFetchScope = nullptr; QEventLoop *mEventLoop = nullptr; bool mWaiting = false; bool mQuitting = false; friend class NotificationSubscriber; friend class ::NotificationManagerTest; }; } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/notificationsubscriber.cpp b/src/server/notificationsubscriber.cpp index 5fbe65cf2..8c1f0bcda 100644 --- a/src/server/notificationsubscriber.cpp +++ b/src/server/notificationsubscriber.cpp @@ -1,749 +1,744 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationsubscriber.h" #include "akonadiserver_debug.h" #include "notificationmanager.h" #include "collectionreferencemanager.h" #include "aggregatedfetchscope.h" #include "storage/querybuilder.h" #include "utils.h" #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; QMimeDatabase NotificationSubscriber::sMimeDatabase; #define TRACE_NTF(x) //#define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x NotificationSubscriber::NotificationSubscriber(NotificationManager *manager) : QObject() , mManager(manager) , mSocket(nullptr) , mAllMonitored(false) , mExclusive(false) , mNotificationDebugging(false) { if (mManager) { mManager->itemFetchScope()->addSubscriber(); mManager->collectionFetchScope()->addSubscriber(); mManager->tagFetchScope()->addSubscriber(); } } NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor) : NotificationSubscriber(manager) { mSocket = new QLocalSocket(this); connect(mSocket, &QLocalSocket::readyRead, this, &NotificationSubscriber::handleIncomingData); connect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected); mSocket->setSocketDescriptor(socketDescriptor); const SchemaVersion schema = SchemaVersion::retrieveAll().first(); auto hello = Protocol::HelloResponsePtr::create(); hello->setServerName(QStringLiteral("Akonadi")); hello->setMessage(QStringLiteral("Not really IMAP server")); hello->setProtocolVersion(Protocol::version()); hello->setGeneration(schema.generation()); writeCommand(0, hello); } NotificationSubscriber::~NotificationSubscriber() { QMutexLocker locker(&mLock); if (mNotificationDebugging) { Q_EMIT notificationDebuggingChanged(false); } } void NotificationSubscriber::handleIncomingData() { while (mSocket->bytesAvailable() > (int) sizeof(qint64)) { Protocol::DataStream stream(mSocket); // Ignored atm qint64 tag = -1; stream >> tag; Protocol::CommandPtr cmd; try { cmd = Protocol::deserialize(mSocket); } catch (const Akonadi::ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "ProtocolException:" << e.what(); disconnectSubscriber(); return; } catch (const std::exception &e) { qCWarning(AKONADISERVER_LOG) << "Unknown exception:" << e.what(); disconnectSubscriber(); return; } if (cmd->type() == Protocol::Command::Invalid) { qCWarning(AKONADISERVER_LOG) << "Received an invalid command: resetting connection"; disconnectSubscriber(); return; } switch (cmd->type()) { case Protocol::Command::CreateSubscription: registerSubscriber(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create()); break; case Protocol::Command::ModifySubscription: if (mSubscriber.isEmpty()) { qCWarning(AKONADISERVER_LOG) << "Received ModifySubscription command before RegisterSubscriber"; disconnectSubscriber(); return; } modifySubscription(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create()); break; case Protocol::Command::Logout: disconnectSubscriber(); break; default: qCWarning(AKONADISERVER_LOG) << "Invalid command" << cmd->type() << "received by NotificationSubscriber" << mSubscriber; disconnectSubscriber(); break; } } } void NotificationSubscriber::socketDisconnected() { qCDebug(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected"; disconnectSubscriber(); } void NotificationSubscriber::disconnectSubscriber() { QMutexLocker locker(&mLock); auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove); mManager->slotNotify({ changeNtf }); - disconnect(mSocket, &QLocalSocket::disconnected, - this, &NotificationSubscriber::socketDisconnected); - mSocket->close(); + if (mSocket) { + disconnect(mSocket, &QLocalSocket::disconnected, + this, &NotificationSubscriber::socketDisconnected); + mSocket->close(); + } - // Unregister ourselves from the aggergated collection fetch scope + // Unregister ourselves from the aggregated collection fetch scope auto cfs = mManager->collectionFetchScope(); - if (mCollectionFetchScope.fetchIdOnly()) { - cfs->setFetchIdOnly(false); - } - if (mCollectionFetchScope.includeStatistics()) { - cfs->setFetchStatistics(false); - } - const auto attrs = mCollectionFetchScope.attributes(); - for (const auto &attr : attrs) { - cfs->removeAttribute(attr); - } + cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope()); cfs->removeSubscriber(); auto tfs = mManager->tagFetchScope(); + tfs->apply(mTagFetchScope, Protocol::TagFetchScope()); tfs->removeSubscriber(); auto ifs = mManager->itemFetchScope(); + ifs->apply(mItemFetchScope, Protocol::ItemFetchScope()); ifs->removeSubscriber(); mManager->forgetSubscriber(this); deleteLater(); } void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command) { QMutexLocker locker(&mLock); qCDebug(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName(); mSubscriber = command.subscriberName(); mSession = command.session(); auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add); mManager->slotNotify({ changeNtf }); } void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command) { QMutexLocker locker(&mLock); const auto modifiedParts = command.modifiedParts(); #define START_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add)) #define STOP_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove)) #define APPEND(set, newItems) \ Q_FOREACH (const auto &entity, newItems) { \ set.insert(entity); \ } #define REMOVE(set, items) \ Q_FOREACH (const auto &entity, items) { \ set.remove(entity); \ } if (START_MONITORING(Types)) { APPEND(mMonitoredTypes, command.startMonitoringTypes()) } if (STOP_MONITORING(Types)) { REMOVE(mMonitoredTypes, command.stopMonitoringTypes()) } if (START_MONITORING(Collections)) { APPEND(mMonitoredCollections, command.startMonitoringCollections()) } if (STOP_MONITORING(Collections)) { REMOVE(mMonitoredCollections, command.stopMonitoringCollections()) } if (START_MONITORING(Items)) { APPEND(mMonitoredItems, command.startMonitoringItems()) } if (STOP_MONITORING(Items)) { REMOVE(mMonitoredItems, command.stopMonitoringItems()) } if (START_MONITORING(Tags)) { APPEND(mMonitoredTags, command.startMonitoringTags()) } if (STOP_MONITORING(Tags)) { REMOVE(mMonitoredTags, command.stopMonitoringTags()) } if (START_MONITORING(Resources)) { APPEND(mMonitoredResources, command.startMonitoringResources()) } if (STOP_MONITORING(Resources)) { REMOVE(mMonitoredResources, command.stopMonitoringResources()) } if (START_MONITORING(MimeTypes)) { APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes()) } if (STOP_MONITORING(MimeTypes)) { REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes()) } if (START_MONITORING(Sessions)) { APPEND(mIgnoredSessions, command.startIgnoringSessions()) } if (STOP_MONITORING(Sessions)) { REMOVE(mIgnoredSessions, command.stopIgnoringSessions()) } if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) { mAllMonitored = command.allMonitored(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) { mExclusive = command.isExclusive(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) { const auto newScope = command.itemFetchScope(); mManager->itemFetchScope()->apply(mItemFetchScope, newScope); mItemFetchScope = newScope; } if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { const auto newScope = command.collectionFetchScope(); mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope); mCollectionFetchScope = newScope; } if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) { const auto newScope = command.tagFetchScope(); mManager->tagFetchScope()->apply(mTagFetchScope, newScope); mTagFetchScope = newScope; if (!newScope.fetchIdOnly()) Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly()); } if (mManager) { if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) { // Did the caller just subscribed to subscription changes? if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) { // If yes, then send them list of all existing subscribers Q_FOREACH (const NotificationSubscriber *subscriber, mManager->mSubscribers) { // Send them back to caller if (subscriber) { QMetaObject::invokeMethod(this, "notify", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification())); } } } if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (!mNotificationDebugging) { mNotificationDebugging = true; Q_EMIT notificationDebuggingChanged(true); } } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (mNotificationDebugging) { mNotificationDebugging = false; Q_EMIT notificationDebuggingChanged(false); } } } // Emit subscription change notification auto changeNtf = toChangeNotification(); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify); mManager->slotNotify({ changeNtf }); } #undef START_MONITORING #undef STOP_MONITORING #undef APPEND #undef REMOVE } Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const { // Assumes mLock being locked by caller auto ntf = Protocol::SubscriptionChangeNotificationPtr::create(); ntf->setSessionId(mSession); ntf->setSubscriber(mSubscriber); ntf->setOperation(Protocol::SubscriptionChangeNotification::Add); ntf->setCollections(mMonitoredCollections); ntf->setItems(mMonitoredItems); ntf->setTags(mMonitoredTags); ntf->setTypes(mMonitoredTypes); ntf->setMimeTypes(mMonitoredMimeTypes); ntf->setResources(mMonitoredResources); ntf->setIgnoredSessions(mIgnoredSessions); ntf->setAllMonitored(mAllMonitored); ntf->setExclusive(mExclusive); ntf->setItemFetchScope(mItemFetchScope); ntf->setTagFetchScope(mTagFetchScope); ntf->setCollectionFetchScope(mCollectionFetchScope); return ntf; } bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const { // Assumes mLock being locked by caller if (id < 0) { return false; } else if (mMonitoredCollections.contains(id)) { return true; } else if (mMonitoredCollections.contains(0)) { return true; } return false; } bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const { // Assumes mLock being locked by caller const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType); if (mMonitoredMimeTypes.contains(mimeType)) { return true; } const QStringList lst = mt.aliases(); for (const QString &alias : lst) { if (mMonitoredMimeTypes.contains(alias)) { return true; } } return false; } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::ItemChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::CollectionChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.items().isEmpty()) { return false; } if (CollectionReferenceManager::instance()->isReferenced(msg.parentCollection())) { //We always want notifications that affect the parent resource (like an item added to a referenced collection) const bool notificationForParentResource = (mSession == msg.resource()); const bool accepts = mExclusive || isCollectionMonitored(msg.parentCollection()) || isMoveDestinationResourceMonitored(msg) || notificationForParentResource; TRACE_NTF("ACCEPTS ITEM: parent col referenced" << "exclusive:" << mExclusive << "," << "parent monitored:" << isCollectionMonitored(msg.parentCollection()) << "," << "destination monitored:" << isMoveDestinationResourceMonitored(msg) << "," << "ntf for parent resource:" << notificationForParentResource << ":" << "ACCEPTED:" << accepts); return accepts; } if (mAllMonitored) { TRACE_NTF("ACCEPTS ITEM: all monitored"); return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) { TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored"); return false; } // we have a resource or mimetype filter if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) { if (mMonitoredResources.contains(msg.resource())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored"); return true; } if (isMoveDestinationResourceMonitored(msg)) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored"); return true; } Q_FOREACH (const auto &item, msg.items()) { if (isMimeTypeMonitored(item.mimeType())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored"); return true; } } TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored"); return false; } // we explicitly monitor that item or the collections it's in Q_FOREACH (const auto &item, msg.items()) { if (mMonitoredItems.contains(item.id())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored"); return true; } } if (isCollectionMonitored(msg.parentCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored"); return true; } if (isCollectionMonitored(msg.parentDestCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored"); return true; } TRACE_NTF("ACCEPTS ITEM: REJECTED"); return false; } bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller const auto &collection = msg.collection(); if (collection.id() < 0) { return false; } // HACK: We need to dispatch notifications about disabled collections to SOME // agents (that's what we have the exclusive subscription for) - but because // querying each Collection from database would be expensive, we use the // metadata hack to transfer this information from NotificationCollector if (msg.metadata().contains("DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe) && !msg.changedParts().contains("ENABLED")) { // Exclusive subscriber always gets it if (mExclusive) { return true; } //Deliver the notification if referenced from this session if (CollectionReferenceManager::instance()->isReferenced(collection.id(), mSession)) { return true; } //Exclusive subscribers still want the notification if (mExclusive && CollectionReferenceManager::instance()->isReferenced(collection.id())) { return true; } //The session belonging to this monitor referenced or dereferenced the collection. We always want this notification. //The referencemanager no longer holds a reference, so we have to check this way. if (msg.changedParts().contains(AKONADI_PARAM_REFERENCED) && mSession == msg.sessionId()) { return true; } // If the collection is not referenced, monitored or the subscriber is not // exclusive (i.e. if we got here), then the subscriber does not care about // this one, so drop it return false; } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) { return false; } // we have a resource filter if (!mMonitoredResources.isEmpty()) { const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg); // a bit hacky, but match the behaviour from the item case, // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course if (mMonitoredMimeTypes.isEmpty() || resourceMatches) { return resourceMatches; } // else continue } // we explicitly monitor that collection, or all of them if (isCollectionMonitored(collection.id())) { return true; } return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection()); } bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.tag().id() < 0) { return false; } // Special handling for Tag removal notifications: When a Tag is removed, // a notification is emitted for each Resource that owns the tag (i.e. // each resource that owns a Tag RID - Tag RIDs are resource-specific). // Additionally then we send one more notification without any RID that is // destined for regular applications (which don't know anything about Tag RIDs) if (msg.operation() == Protocol::TagChangeNotification::Remove) { // HACK: Since have no way to determine which resource this NotificationSource // belongs to, we are abusing the fact that each resource ignores it's own // main session, which is called the same name as the resource. // If there are any ignored sessions, but this notification does not have // a specific resource set, then we ignore it, as this notification is // for clients, not resources (does not have tag RID) if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) { return false; } // If this source ignores a session (i.e. we assume it is a resource), // but this notification is for another resource, then we ignore it if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) { return false; } // Now we got here, which means that this notification either has empty // resource, i.e. it is destined for a client applications, or it's // destined for resource that we *think* (see the hack above) this // NotificationSource belongs too. Which means we approve this notification, // but it can still be discarded in the generic Tag notification filter // below } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) { return false; } if (mMonitoredTags.isEmpty()) { return true; } if (mMonitoredTags.contains(msg.tag().id())) { return true; } return true; } bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges)) { return false; } return true; } bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); // Unlike other types, subscription notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges); } bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const { // Assumes mLock being locked by caller // We should never end up sending debug notification about a debug notification. // This could get very messy very quickly... Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification); if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) { return false; } // Unlike other types, debug change notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications); } bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const { // Assumes mLock being locked // Uninitialized subscriber gets nothing if (mSubscriber.isEmpty()) { return false; } // session is ignored // TODO: Should this affect SubscriptionChangeNotification and DebugChangeNotification? if (mIgnoredSessions.contains(msg.sessionId())) { return false; } switch (msg.type()) { case Protocol::Command::ItemChangeNotification: return acceptsItemNotification(static_cast(msg)); case Protocol::Command::CollectionChangeNotification: return acceptsCollectionNotification(static_cast(msg)); case Protocol::Command::TagChangeNotification: return acceptsTagNotification(static_cast(msg)); case Protocol::Command::RelationChangeNotification: return acceptsRelationNotification(static_cast(msg)); case Protocol::Command::SubscriptionChangeNotification: return acceptsSubscriptionNotification(static_cast(msg)); case Protocol::Command::DebugChangeNotification: return acceptsDebugChangeNotification(static_cast(msg)); default: qCDebug(AKONADISERVER_LOG) << "Received invalid change notification!"; return false; } } Protocol::CollectionChangeNotificationPtr NotificationSubscriber::customizeCollection(const Protocol::CollectionChangeNotificationPtr &ntf) { const bool isReferencedFromSession = CollectionReferenceManager::instance()->isReferenced(ntf->collection().id(), mSession); if (isReferencedFromSession != ntf->collection().referenced()) { auto copy = Protocol::CollectionChangeNotificationPtr::create(*ntf); auto copyCol = ntf->collection(); copyCol.setReferenced(isReferencedFromSession); copy->setCollection(std::move(copyCol)); return copy; } return ntf; } bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr ¬ification) { // Guard against this object being deleted while we are waiting for the lock QPointer ptr(this); QMutexLocker locker(&mLock); if (!ptr) { return false; } if (acceptsNotification(*notification)) { auto ntf = notification; if (ntf->type() == Protocol::Command::CollectionChangeNotification) { ntf = customizeCollection(notification.staticCast()); } QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, ntf)); return true; } return false; } void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr ¬ification) { // tag chosen by fair dice roll writeCommand(4, notification); } void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd) { Q_ASSERT(QThread::currentThread() == thread()); Protocol::DataStream stream(mSocket); stream << tag; try { Protocol::serialize(mSocket, cmd); if (!mSocket->waitForBytesWritten()) { if (mSocket->state() == QLocalSocket::ConnectedState) { qCWarning(AKONADISERVER_LOG) << "Notification socket write timeout!"; } else { // client has disconnected, just discard the message } } } catch (const ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "Notification protocol exception:" << e.what(); } }