diff --git a/src/core/monitor.cpp b/src/core/monitor.cpp index 24fc2e854..11eefe0f4 100644 --- a/src/core/monitor.cpp +++ b/src/core/monitor.cpp @@ -1,375 +1,384 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "monitor.h" #include "monitor_p.h" #include "changemediator_p.h" #include "collectionfetchscope.h" #include "itemfetchjob.h" #include "session.h" -#include - - -#include +#include using namespace Akonadi; Monitor::Monitor(QObject *parent) : QObject(parent) , d_ptr(new MonitorPrivate(nullptr, this)) { d_ptr->init(); d_ptr->connectToNotificationManager(); ChangeMediator::registerMonitor(this); } //@cond PRIVATE Monitor::Monitor(MonitorPrivate *d, QObject *parent) : QObject(parent) , d_ptr(d) { d_ptr->init(); d_ptr->connectToNotificationManager(); ChangeMediator::registerMonitor(this); } //@endcond Monitor::~Monitor() { ChangeMediator::unregisterMonitor(this); delete d_ptr; } void Monitor::setCollectionMonitored(const Collection &collection, bool monitored) { Q_D(Monitor); if (!d->collections.contains(collection) && monitored) { d->collections << collection; d->pendingModification.startMonitoringCollection(collection.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->collections.removeAll(collection)) { d->pendingModification.stopMonitoringCollection(collection.id()); d->scheduleSubscriptionUpdate(); } } emit collectionMonitored(collection, monitored); } void Monitor::setItemMonitored(const Item &item, bool monitored) { Q_D(Monitor); if (!d->items.contains(item.id()) && monitored) { d->items.insert(item.id()); d->pendingModification.startMonitoringItem(item.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->items.remove(item.id())) { d->pendingModification.stopMonitoringItem(item.id()); d->scheduleSubscriptionUpdate(); } } emit itemMonitored(item, monitored); } void Monitor::setResourceMonitored(const QByteArray &resource, bool monitored) { Q_D(Monitor); if (!d->resources.contains(resource) && monitored) { d->resources.insert(resource); d->pendingModification.startMonitoringResource(resource); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->resources.remove(resource)) { d->pendingModification.stopMonitoringResource(resource); d->scheduleSubscriptionUpdate(); } } emit resourceMonitored(resource, monitored); } void Monitor::setMimeTypeMonitored(const QString &mimetype, bool monitored) { Q_D(Monitor); if (!d->mimetypes.contains(mimetype) && monitored) { d->mimetypes.insert(mimetype); d->pendingModification.startMonitoringMimeType(mimetype); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->mimetypes.remove(mimetype)) { d->pendingModification.stopMonitoringMimeType(mimetype); d->scheduleSubscriptionUpdate(); } } emit mimeTypeMonitored(mimetype, monitored); } void Monitor::setTagMonitored(const Akonadi::Tag &tag, bool monitored) { Q_D(Monitor); if (!d->tags.contains(tag.id()) && monitored) { d->tags.insert(tag.id()); d->pendingModification.startMonitoringTag(tag.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->tags.remove(tag.id())) { d->pendingModification.stopMonitoringTag(tag.id()); d->scheduleSubscriptionUpdate(); } } emit tagMonitored(tag, monitored); } void Monitor::setTypeMonitored(Monitor::Type type, bool monitored) { Q_D(Monitor); if (!d->types.contains(type) && monitored) { d->types.insert(type); d->pendingModification.startMonitoringType(MonitorPrivate::monitorTypeToProtocol(type)); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->types.remove(type)) { d->pendingModification.stopMonitoringType(MonitorPrivate::monitorTypeToProtocol(type)); d->scheduleSubscriptionUpdate(); } } emit typeMonitored(type, monitored); } void Akonadi::Monitor::setAllMonitored(bool monitored) { Q_D(Monitor); if (d->monitorAll == monitored) { return; } d->monitorAll = monitored; d->pendingModification.setAllMonitored(monitored); d->scheduleSubscriptionUpdate(); emit allMonitored(monitored); } void Monitor::setExclusive(bool exclusive) { Q_D(Monitor); d->exclusive = exclusive; d->pendingModification.setIsExclusive(exclusive); d->scheduleSubscriptionUpdate(); } bool Monitor::exclusive() const { Q_D(const Monitor); return d->exclusive; } void Monitor::ignoreSession(Session *session) { Q_D(Monitor); if (!d->sessions.contains(session->sessionId())) { d->sessions << session->sessionId(); connect(session, SIGNAL(destroyed(QObject*)), this, SLOT(slotSessionDestroyed(QObject*))); d->pendingModification.startIgnoringSession(session->sessionId()); d->scheduleSubscriptionUpdate(); } } void Monitor::fetchCollection(bool enable) { Q_D(Monitor); d->fetchCollection = enable; } void Monitor::fetchCollectionStatistics(bool enable) { Q_D(Monitor); d->fetchCollectionStatistics = enable; } void Monitor::setItemFetchScope(const ItemFetchScope &fetchScope) { Q_D(Monitor); d->mItemFetchScope = fetchScope; d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::ItemFetchScope; d->scheduleSubscriptionUpdate(); } ItemFetchScope &Monitor::itemFetchScope() { Q_D(Monitor); d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::ItemFetchScope; d->scheduleSubscriptionUpdate(); return d->mItemFetchScope; } void Monitor::fetchChangedOnly(bool enable) { Q_D(Monitor); d->mFetchChangedOnly = enable; } void Monitor::setCollectionFetchScope(const CollectionFetchScope &fetchScope) { Q_D(Monitor); d->mCollectionFetchScope = fetchScope; d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::CollectionFetchScope; d->scheduleSubscriptionUpdate(); } CollectionFetchScope &Monitor::collectionFetchScope() { Q_D(Monitor); d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::CollectionFetchScope; d->scheduleSubscriptionUpdate(); return d->mCollectionFetchScope; } void Monitor::setTagFetchScope(const TagFetchScope &fetchScope) { Q_D(Monitor); d->mTagFetchScope = fetchScope; d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::TagFetchScope; d->scheduleSubscriptionUpdate(); } TagFetchScope &Monitor::tagFetchScope() { Q_D(Monitor); d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::TagFetchScope; d->scheduleSubscriptionUpdate(); return d->mTagFetchScope; } Akonadi::Collection::List Monitor::collectionsMonitored() const { Q_D(const Monitor); return d->collections; } QVector Monitor::itemsMonitoredEx() const { Q_D(const Monitor); QVector result; result.reserve(d->items.size()); std::copy(d->items.begin(), d->items.end(), std::back_inserter(result)); return result; } int Monitor::numItemsMonitored() const { Q_D(const Monitor); return d->items.size(); } QVector Monitor::tagsMonitored() const { Q_D(const Monitor); QVector result; result.reserve(d->tags.size()); std::copy(d->tags.begin(), d->tags.end(), std::back_inserter(result)); return result; } QVector Monitor::typesMonitored() const { Q_D(const Monitor); QVector result; result.reserve(d->types.size()); std::copy(d->types.begin(), d->types.end(), std::back_inserter(result)); return result; } QStringList Monitor::mimeTypesMonitored() const { Q_D(const Monitor); return d->mimetypes.toList(); } int Monitor::numMimeTypesMonitored() const { Q_D(const Monitor); return d->mimetypes.count(); } QList Monitor::resourcesMonitored() const { Q_D(const Monitor); return d->resources.toList(); } int Monitor::numResourcesMonitored() const { Q_D(const Monitor); return d->resources.count(); } bool Monitor::isAllMonitored() const { Q_D(const Monitor); return d->monitorAll; } void Monitor::setSession(Akonadi::Session *session) { Q_D(Monitor); if (session == d->session) { return; } if (!session) { d->session = Session::defaultSession(); } else { d->session = session; } d->itemCache->setSession(d->session); d->collectionCache->setSession(d->session); d->tagCache->setSession(d->session); // Reconnect with a new session d->connectToNotificationManager(); } Session *Monitor::session() const { Q_D(const Monitor); return d->session; } void Monitor::setCollectionMoveTranslationEnabled(bool enabled) { Q_D(Monitor); d->collectionMoveTranslationEnabled = enabled; } +void Monitor::connectNotify(const QMetaMethod &signal) +{ + Q_D(Monitor); + d->updateListeners(signal, MonitorPrivate::AddListener); +} + +void Monitor::disconnectNotify(const QMetaMethod &signal) +{ + Q_D(Monitor); + d->updateListeners(signal, MonitorPrivate::RemoveListener); +} + #include "moc_monitor.cpp" diff --git a/src/core/monitor.h b/src/core/monitor.h index 3e2e3956a..6b1370ca2 100644 --- a/src/core/monitor.h +++ b/src/core/monitor.h @@ -1,837 +1,840 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_MONITOR_H #define AKONADI_MONITOR_H #include "akonadicore_export.h" #include "tag.h" #include "collection.h" #include "item.h" #include "relation.h" #include namespace Akonadi { class CollectionFetchScope; class CollectionStatistics; class Item; class ItemFetchScope; class MonitorPrivate; class Session; class TagFetchScope; class NotificationSubscriber; class ChangeNotification; namespace Protocol { class Command; } /** * @short Monitors an item or collection for changes. * * The Monitor emits signals if some of these objects are changed or * removed or new ones are added to the Akonadi storage. * * There are various ways to filter these notifications. There are three types of filter * evaluation: * - (-) removal-only filter, ie. if the filter matches the notification is dropped, * if not filter evaluation continues with the next one * - (+) pass-exit filter, ie. if the filter matches the notification is delivered, * if not evaluation is continued * - (f) final filter, ie. evaluation ends here if the corresponding filter criteria is set, * the notification is delievered depending on the result, evaluation is only continued * if no filter criteria is defined * * The following filter are available, listed in evaluation order: * (1) ignored sessions (-) * (2) monitor everything (+) * (3a) resource and mimetype filters (f) (items only) * (3b) resource filters (f) (collections only) * (4) item is monitored (+) * (5) collection is monitored (+) * * Optionally, the changed objects can be fetched automatically from the server. * To enable this, see itemFetchScope() and collectionFetchScope(). * * Note that as a consequence of rule 3a, it is not possible to monitor (more than zero resources * OR more than zero mimetypes) AND more than zero collections. * * @todo Distinguish between monitoring collection properties and collection content. * @todo Special case for collection content counts changed * * @author Volker Krause */ class AKONADICORE_EXPORT Monitor : public QObject { Q_OBJECT public: enum Type { /** * @internal This must be kept in sync with Akonadi::NotificationMessageV2::Type */ Collections = 1, Items, Tags, Relations, /** * Listen to subscription changes of other Monitors connected to Akonadi. * This is only for debugging purposes and should not be used in real * applications. * @since 5.4 */ Subscribers, /** * Listens to all notifications being emitted by the server and provides * additional information about them. This is only for debugging purposes * and should not be used in real applications. * * @note Enabling monitoring this type has performance impact on the * Akonadi Server. * * @since 5.4 */ Notifications }; /** * Creates a new monitor. * * @param parent The parent object. */ explicit Monitor(QObject *parent = nullptr); /** * Destroys the monitor. */ ~Monitor() override; /** * Sets whether the specified collection shall be monitored for changes. If * monitoring is turned on for the collection, all notifications for items * in that collection will be emitted, and its child collections will also * be monitored. Note that move notifications will be emitted if either one * of the collections involved is being monitored. * * Note that if a session is being ignored, this takes precedence over * setCollectionMonitored() on that session. * * @param collection The collection to monitor. * If this collection is Collection::root(), all collections * in the Akonadi storage will be monitored. * @param monitored Whether to monitor the collection. */ void setCollectionMonitored(const Collection &collection, bool monitored = true); /** * Sets whether the specified item shall be monitored for changes. * * Note that if a session is being ignored, this takes precedence over * setItemMonitored() on that session. * * @param item The item to monitor. * @param monitored Whether to monitor the item. */ void setItemMonitored(const Item &item, bool monitored = true); /** * Sets whether the specified resource shall be monitored for changes. If * monitoring is turned on for the resource, all notifications for * collections and items in that resource will be emitted. * * Note that if a session is being ignored, this takes precedence over * setResourceMonitored() on that session. * * @param resource The resource identifier. * @param monitored Whether to monitor the resource. */ void setResourceMonitored(const QByteArray &resource, bool monitored = true); /** * Sets whether items of the specified mime type shall be monitored for changes. * If monitoring is turned on for the mime type, all notifications for items * matching that mime type will be emitted, but notifications for collections * matching that mime type will only be emitted if this is otherwise specified, * for example by setCollectionMonitored(). * * Note that if a session is being ignored, this takes precedence over * setMimeTypeMonitored() on that session. * * @param mimetype The mime type to monitor. * @param monitored Whether to monitor the mime type. */ void setMimeTypeMonitored(const QString &mimetype, bool monitored = true); /** * Sets whether the specified tag shall be monitored for changes. * * Same rules as for item monitoring apply. * * @param tag Tag to monitor. * @param monitored Whether to monitor the tag. * @since 4.13 */ void setTagMonitored(const Tag &tag, bool monitored = true); /** * Sets whether given type (Collection, Item, Tag should be monitored). * * By default all types are monitored, but once you change one, you have * to explicitly enable all other types you want to monitor. * * @param type Type to monitor. * @param monitored Whether to monitor the type * @since 4.13 */ void setTypeMonitored(Type type, bool monitored = true); /** * Sets whether all items shall be monitored. * @param monitored sets all items as monitored if set as @c true * Note that if a session is being ignored, this takes precedence over * setAllMonitored() on that session. */ void setAllMonitored(bool monitored = true); void setExclusive(bool exclusive = true); Q_REQUIRED_RESULT bool exclusive() const; /** * Ignores all change notifications caused by the given session. This * overrides all other settings on this session. * * @param session The session you want to ignore. */ void ignoreSession(Session *session); /** * Enables automatic fetching of changed collections from the Akonadi storage. * * @param enable @c true enables automatic fetching, @c false disables automatic fetching. */ void fetchCollection(bool enable); /** * Enables automatic fetching of changed collection statistics information from * the Akonadi storage. * * @param enable @c true to enables automatic fetching, @c false disables automatic fetching. */ void fetchCollectionStatistics(bool enable); /** * Sets the item fetch scope. * * Controls how much of an item's data is fetched from the server, e.g. * whether to fetch the full item payload or only meta data. * * @param fetchScope The new scope for item fetch operations. * * @see itemFetchScope() */ void setItemFetchScope(const ItemFetchScope &fetchScope); /** * Instructs the monitor to fetch only those parts that were changed and * were requested in the fetch scope. * * This is taken in account only for item modifications. * Example usage: * @code * monitor->itemFetchScope().fetchFullPayload( true ); * monitor->fetchChangedOnly(true); * @endcode * * In the example if an item was changed, but its payload was not, the full * payload will not be retrieved. * If the item's payload was changed, the monitor retrieves the changed * payload as well. * * The default is to fetch everything requested. * * @since 4.8 * * @param enable @c true to enable the feature, @c false means everything * that was requested will be fetched. */ void fetchChangedOnly(bool enable); /** * Returns the item fetch scope. * * Since this returns a reference it can be used to conveniently modify the * current scope in-place, i.e. by calling a method on the returned reference * without storing it in a local variable. See the ItemFetchScope documentation * for an example. * * @return a reference to the current item fetch scope * * @see setItemFetchScope() for replacing the current item fetch scope */ ItemFetchScope &itemFetchScope(); /** * Sets the collection fetch scope. * * Controls which collections are monitored and how much of a collection's data * is fetched from the server. * * @param fetchScope The new scope for collection fetch operations. * * @see collectionFetchScope() * @since 4.4 */ void setCollectionFetchScope(const CollectionFetchScope &fetchScope); /** * Returns the collection fetch scope. * * Since this returns a reference it can be used to conveniently modify the * current scope in-place, i.e. by calling a method on the returned reference * without storing it in a local variable. See the CollectionFetchScope documentation * for an example. * * @return a reference to the current collection fetch scope * * @see setCollectionFetchScope() for replacing the current collection fetch scope * @since 4.4 */ CollectionFetchScope &collectionFetchScope(); /** * Sets the tag fetch scope. * * Controls how much of an tag's data is fetched from the server. * * @param fetchScope The new scope for tag fetch operations. * * @see tagFetchScope() */ void setTagFetchScope(const TagFetchScope &fetchScope); /** * Returns the tag fetch scope. * * Since this returns a reference it can be used to conveniently modify the * current scope in-place, i.e. by calling a method on the returned reference * without storing it in a local variable. * * @return a reference to the current tag fetch scope * * @see setTagFetchScope() for replacing the current tag fetch scope */ TagFetchScope &tagFetchScope(); /** * Returns the list of collections being monitored. * * @since 4.3 */ Q_REQUIRED_RESULT Collection::List collectionsMonitored() const; /** * Returns the set of items being monitored. * * Faster version (at least on 32-bit systems) of itemsMonitored(). * * @since 4.6 */ Q_REQUIRED_RESULT QVector itemsMonitoredEx() const; /** * Returns the number of items being monitored. * Optimization. * @since 4.14.3 */ Q_REQUIRED_RESULT int numItemsMonitored() const; /** * Returns the set of mimetypes being monitored. * * @since 4.3 */ Q_REQUIRED_RESULT QStringList mimeTypesMonitored() const; /** * Returns the number of mimetypes being monitored. * Optimization. * @since 4.14.3 */ Q_REQUIRED_RESULT int numMimeTypesMonitored() const; /** * Returns the set of tags being monitored. * * @since 4.13 */ Q_REQUIRED_RESULT QVector tagsMonitored() const; /** * Returns the set of types being monitored. * * @since 4.13 */ Q_REQUIRED_RESULT QVector typesMonitored() const; /** * Returns the set of identifiers for resources being monitored. * * @since 4.3 */ Q_REQUIRED_RESULT QList resourcesMonitored() const; /** * Returns the number of resources being monitored. * Optimization. * @since 4.14.3 */ Q_REQUIRED_RESULT int numResourcesMonitored() const; /** * Returns true if everything is being monitored. * * @since 4.3 */ Q_REQUIRED_RESULT bool isAllMonitored() const; /** * Sets the session used by the Monitor to communicate with the %Akonadi server. * If not set, the Akonadi::Session::defaultSession is used. * @param session the session to be set * @since 4.4 */ void setSession(Akonadi::Session *session); /** * Returns the Session used by the monitor to communicate with Akonadi. * * @since 4.4 */ Q_REQUIRED_RESULT Session *session() const; /** * Allows to enable/disable collection move translation. If enabled (the default), move * notifications are automatically translated into add/remove notifications if the source/destination * is outside of the monitored collection hierarchy. * @param enabled enables collection move translation if set as @c true * @since 4.9 */ void setCollectionMoveTranslationEnabled(bool enabled); Q_SIGNALS: /** * This signal is emitted if a monitored item has changed, e.g. item parts have been modified. * * @param item The changed item. * @param partIdentifiers The identifiers of the item parts that has been changed. */ void itemChanged(const Akonadi::Item &item, const QSet &partIdentifiers); /** * This signal is emitted if flags of monitored items have changed. * * @param items Items that were changed * @param addedFlags Flags that have been added to each item in @p items * @param removedFlags Flags that have been removed from each item in @p items * @since 4.11 */ void itemsFlagsChanged(const Akonadi::Item::List &items, const QSet &addedFlags, const QSet &removedFlags); /** * This signal is emitted if tags of monitored items have changed. * * @param items Items that were changed * @param addedTags Tags that have been added to each item in @p items. * @param removedTags Tags that have been removed from each item in @p items * @since 4.13 */ void itemsTagsChanged(const Akonadi::Item::List &items, const QSet &addedTags, const QSet &removedTags); /** * This signal is emitted if relations of monitored items have changed. * * @param items Items that were changed * @param addedRelations Relations that have been added to each item in @p items. * @param removedRelations Relations that have been removed from each item in @p items * @since 4.15 */ void itemsRelationsChanged(const Akonadi::Item::List &items, const Akonadi::Relation::List &addedRelations, const Akonadi::Relation::List &removedRelations); /** * This signal is emitted if a monitored item has been moved between two collections * * @param item The moved item. * @param collectionSource The collection the item has been moved from. * @param collectionDestination The collection the item has been moved to. */ void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &collectionSource, const Akonadi::Collection &collectionDestination); /** * This is signal is emitted when multiple monitored items have been moved between two collections * * @param items Moved items * @param collectionSource The collection the items have been moved from. * @param collectionDestination The collection the items have been moved to. * * @since 4.11 */ void itemsMoved(const Akonadi::Item::List &items, const Akonadi::Collection &collectionSource, const Akonadi::Collection &collectionDestination); /** * This signal is emitted if an item has been added to a monitored collection in the Akonadi storage. * * @param item The new item. * @param collection The collection the item has been added to. */ void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection); /** * This signal is emitted if * - a monitored item has been removed from the Akonadi storage * or * - a item has been removed from a monitored collection. * * @param item The removed item. */ void itemRemoved(const Akonadi::Item &item); /** * This signal is emitted if monitored items have been removed from Akonadi * storage of items have been removed from a monitored collection. * * @param items Removed items * * @since 4.11 */ void itemsRemoved(const Akonadi::Item::List &items); /** * This signal is emitted if a reference to an item is added to a virtual collection. * @param item The linked item. * @param collection The collection the item is linked to. * * @since 4.2 */ void itemLinked(const Akonadi::Item &item, const Akonadi::Collection &collection); /** * This signal is emitted if a reference to multiple items is added to a virtual collection * * @param items The linked items * @param collection The collections the items are linked to * * @since 4.11 */ void itemsLinked(const Akonadi::Item::List &items, const Akonadi::Collection &collection); /** * This signal is emitted if a reference to an item is removed from a virtual collection. * @param item The unlinked item. * @param collection The collection the item is unlinked from. * * @since 4.2 */ void itemUnlinked(const Akonadi::Item &item, const Akonadi::Collection &collection); /** * This signal is emitted if a refernece to items is removed from a virtual collection * * @param items The unlinked items * @param collection The collections the items are unlinked from * * @since 4.11 */ void itemsUnlinked(const Akonadi::Item::List &items, const Akonadi::Collection &collection); /** * This signal is emitted if a new collection has been added to a monitored collection in the Akonadi storage. * * @param collection The new collection. * @param parent The parent collection. */ void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent); /** * This signal is emitted if a monitored collection has been changed (properties or content). * * @param collection The changed collection. */ void collectionChanged(const Akonadi::Collection &collection); /** * This signal is emitted if a monitored collection has been changed (properties or attributes). * * @param collection The changed collection. * @param attributeNames The names of the collection attributes that have been changed. * * @since 4.4 */ void collectionChanged(const Akonadi::Collection &collection, const QSet &attributeNames); /** * This signals is emitted if a monitored collection has been moved. * * @param collection The moved collection. * @param source The previous parent collection. * @param destination The new parent collection. * * @since 4.4 */ void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination); /** * This signal is emitted if a monitored collection has been removed from the Akonadi storage. * * @param collection The removed collection. */ void collectionRemoved(const Akonadi::Collection &collection); /** * This signal is emitted if a collection has been subscribed to by the user. * It will be emitted even for unmonitored collections as the check for whether to * monitor it has not been applied yet. * * @param collection The subscribed collection * @param parent The parent collection of the subscribed collection. * * @since 4.6 */ void collectionSubscribed(const Akonadi::Collection &collection, const Akonadi::Collection &parent); /** * This signal is emitted if a user unsubscribes from a collection. * * @param collection The unsubscribed collection * * @since 4.6 */ void collectionUnsubscribed(const Akonadi::Collection &collection); /** * This signal is emitted if the statistics information of a monitored collection * has changed. * * @param id The collection identifier of the changed collection. * @param statistics The updated collection statistics, invalid if automatic * fetching of statistics changes is disabled. */ void collectionStatisticsChanged(Akonadi::Collection::Id id, const Akonadi::CollectionStatistics &statistics); /** * This signal is emitted if a tag has been added to Akonadi storage. * * @param tag The added tag * @since 4.13 */ void tagAdded(const Akonadi::Tag &tag); /** * This signal is emitted if a monitored tag is changed on the server. * * @param tag The changed tag. * @since 4.13 */ void tagChanged(const Akonadi::Tag &tag); /** * This signal is emitted if a monitored tag is removed from the server storage. * * The monitor will also emit itemTagsChanged() signal for all monitored items * (if any) that were tagged by @p tag. * * @param tag The removed tag. * @since 4.13 */ void tagRemoved(const Akonadi::Tag &tag); /** * This signal is emitted if a relation has been added to Akonadi storage. * * The monitor will also emit itemRelationsChanged() signal for all monitored items * hat are affected by @p relation. * * @param relation The added relation * @since 4.13 */ void relationAdded(const Akonadi::Relation &relation); /** * This signal is emitted if a monitored relation is removed from the server storage. * * The monitor will also emit itemRelationsChanged() signal for all monitored items * that were affected by @p relation. * * @param relation The removed relation. * @since 4.13 */ void relationRemoved(const Akonadi::Relation &relation); /** * This signal is emitted when Subscribers are monitored and a new subscriber * subscribers to the server. * * @param subscriber The new subscriber * @since 5.4 * * @note Monitoring for subscribers and listening to this signal only makes * sense if you want to globally debug Monitors. There is no reason to use * this in regular applications. */ void notificationSubscriberAdded(const Akonadi::NotificationSubscriber &subscriber); /** * This signal is emitted when Subscribers are monitored and an existing * subscriber changes its subscription. * * @param subscriber The changed subscriber * @since 5.4 * * @note Monitoring for subscribers and listening to this signal only makes * sense if you want to globally debug Monitors. There is no reason to use * this in regular applications. */ void notificationSubscriberChanged(const Akonadi::NotificationSubscriber &subscriber); /** * This signal is emitted when Subscribers are monitored and an existing * subscriber unsubscribes from the server. * * @param subscriber The removed subscriber * @since 5.4 * * @note Monitoring for subscribers and listening to this signal only makes * sense if you want to globally debug Monitors. There is no reason to use * this in regular applications. */ void notificationSubscriberRemoved(const Akonadi::NotificationSubscriber &subscriber); /** * This signal is emitted when Notifications are monitored and the server emits * anny change notification. * * @since 5.4 * * @note Getting introspection into all change notifications only makes sense * if you want to globally debug Notifications. There is no reason to use * this in regular applications. */ void debugNotification(const Akonadi::ChangeNotification ¬ification); /** * This signal is emitted if the Monitor starts or stops monitoring @p collection explicitly. * @param collection The collection * @param monitored Whether the collection is now being monitored or not. * * @since 4.3 */ void collectionMonitored(const Akonadi::Collection &collection, bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring @p item explicitly. * @param item The item * @param monitored Whether the item is now being monitored or not. * * @since 4.3 */ void itemMonitored(const Akonadi::Item &item, bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring the resource with the identifier @p identifier explicitly. * @param identifier The identifier of the resource. * @param monitored Whether the resource is now being monitored or not. * * @since 4.3 */ void resourceMonitored(const QByteArray &identifier, bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring @p mimeType explicitly. * @param mimeType The mimeType. * @param monitored Whether the mimeType is now being monitored or not. * * @since 4.3 */ void mimeTypeMonitored(const QString &mimeType, bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring everything. * @param monitored Whether everything is now being monitored or not. * * @since 4.3 */ void allMonitored(bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring @p tag explicitly. * @param tag The tag. * @param monitored Whether the tag is now being monitored or not. * @since 4.13 */ void tagMonitored(const Akonadi::Tag &tag, bool monitored); /** * This signal is emitted if the Monitor starts or stops monitoring @p type explicitly * @param type The type. * @param monitored Whether the type is now being monitored or not. * @since 4.13 */ void typeMonitored(const Akonadi::Monitor::Type type, bool monitored); void monitorReady(); protected: //@cond PRIVATE + void connectNotify(const QMetaMethod &signal) override; + void disconnectNotify(const QMetaMethod &signal) override; + friend class EntityTreeModel; friend class EntityTreeModelPrivate; MonitorPrivate *d_ptr; explicit Monitor(MonitorPrivate *d, QObject *parent = nullptr); //@endcond private: Q_DECLARE_PRIVATE(Monitor) //@cond PRIVATE Q_PRIVATE_SLOT(d_ptr, void slotSessionDestroyed(QObject *)) Q_PRIVATE_SLOT(d_ptr, void slotStatisticsChangedFinished(KJob *)) Q_PRIVATE_SLOT(d_ptr, void slotFlushRecentlyChangedCollections()) Q_PRIVATE_SLOT(d_ptr, void slotUpdateSubscription()) Q_PRIVATE_SLOT(d_ptr, void handleCommands()) Q_PRIVATE_SLOT(d_ptr, void dataAvailable()) Q_PRIVATE_SLOT(d_ptr, void serverStateChanged(Akonadi::ServerManager::State)) Q_PRIVATE_SLOT(d_ptr, void invalidateCollectionCache(qint64)) Q_PRIVATE_SLOT(d_ptr, void invalidateItemCache(qint64)) Q_PRIVATE_SLOT(d_ptr, void invalidateTagCache(qint64)) friend class ResourceBasePrivate; //@endcond }; } #endif diff --git a/src/core/monitor_p.cpp b/src/core/monitor_p.cpp index 1173c8a99..9aae174de 100644 --- a/src/core/monitor_p.cpp +++ b/src/core/monitor_p.cpp @@ -1,1475 +1,1400 @@ /* Copyright (c) 2007 Tobias Koenig This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // @cond PRIVATE #include "monitor_p.h" #include "collectionfetchjob.h" #include "collectionstatistics.h" #include "itemfetchjob.h" #include "notificationmanagerinterface.h" #include "session.h" #include "changemediator_p.h" #include "vectorhelper.h" #include "akonadicore_debug.h" #include "notificationsubscriber.h" #include "changenotification.h" #include "protocolhelper_p.h" using namespace Akonadi; class operation; static const int PipelineSize = 5; MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent) : q_ptr(parent) , dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory) , ntfConnection(nullptr) , monitorAll(false) , exclusive(false) , mFetchChangedOnly(false) , session(Session::defaultSession()) , collectionCache(nullptr) , itemCache(nullptr) , tagCache(nullptr) , mCommandBuffer(parent, "handleCommands") , pendingModificationChanges(Protocol::ModifySubscriptionCommand::None) , pendingModificationTimer(nullptr) , monitorReady(false) , fetchCollection(false) , fetchCollectionStatistics(false) , collectionMoveTranslationEnabled(true) , useRefCounting(false) { } MonitorPrivate::~MonitorPrivate() { disconnectFromNotificationManager(); delete dependenciesFactory; delete collectionCache; delete itemCache; delete tagCache; } void MonitorPrivate::init() { // needs to be at least 3x pipeline size for the collection move case collectionCache = dependenciesFactory->createCollectionCache(3 * PipelineSize, session); // needs to be at least 1x pipeline size itemCache = dependenciesFactory->createItemListCache(PipelineSize, session); // 20 tags looks like a reasonable amount to keep around tagCache = dependenciesFactory->createTagListCache(20, session); QObject::connect(collectionCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(itemCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(tagCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)), q_ptr, SLOT(serverStateChanged(Akonadi::ServerManager::State))); statisticsCompressionTimer.setSingleShot(true); statisticsCompressionTimer.setInterval(500); QObject::connect(&statisticsCompressionTimer, SIGNAL(timeout()), q_ptr, SLOT(slotFlushRecentlyChangedCollections())); } bool MonitorPrivate::connectToNotificationManager() { if (ntfConnection) { ntfConnection->deleteLater(); ntfConnection = nullptr; } if (!session) { return false; } ntfConnection = dependenciesFactory->createNotificationConnection(session, &mCommandBuffer); if (!ntfConnection) { return false; } slotUpdateSubscription(); ntfConnection->reconnect(); return true; } void MonitorPrivate::disconnectFromNotificationManager() { if (ntfConnection) { ntfConnection->disconnect(q_ptr); dependenciesFactory->destroyNotificationConnection(session, ntfConnection.data()); } } void MonitorPrivate::serverStateChanged(ServerManager::State state) { if (state == ServerManager::Running) { connectToNotificationManager(); } } void MonitorPrivate::invalidateCollectionCache(qint64 id) { collectionCache->update(id, mCollectionFetchScope); } void MonitorPrivate::invalidateItemCache(qint64 id) { itemCache->update(QList() << id, mItemFetchScope); // Also invalidate content of all any pending notification for given item for (auto it = pendingNotifications.begin(), end = pendingNotifications.end(); it != end; ++it) { if ((*it)->type() == Protocol::Command::ItemChangeNotification) { auto &ntf = Protocol::cmdCast(*it); const auto items = ntf.items(); if (std::any_of(items.cbegin(), items.cend(), [id](const Protocol::FetchItemsResponse &r) { return r.id() == id; })) { ntf.setMustRetrieve(true); } } } } void MonitorPrivate::invalidateTagCache(qint64 id) { tagCache->update({ id }, mTagFetchScope); } int MonitorPrivate::pipelineSize() const { return PipelineSize; } void MonitorPrivate::scheduleSubscriptionUpdate() { if (pendingModificationTimer || !monitorReady) { return; } pendingModificationTimer = new QTimer(); pendingModificationTimer->setSingleShot(true); pendingModificationTimer->setInterval(0); pendingModificationTimer->start(); q_ptr->connect(pendingModificationTimer, SIGNAL(timeout()), q_ptr, SLOT(slotUpdateSubscription())); } void MonitorPrivate::slotUpdateSubscription() { Q_Q(Monitor); pendingModificationTimer->deleteLater(); pendingModificationTimer = nullptr; if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::ItemFetchScope) { pendingModification.setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); } if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { pendingModification.setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); } if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::TagFetchScope) { pendingModification.setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); } pendingModificationChanges = Protocol::ModifySubscriptionCommand::None; if (ntfConnection) { ntfConnection->sendCommand(3, Protocol::ModifySubscriptionCommandPtr::create(pendingModification)); pendingModification = Protocol::ModifySubscriptionCommand(); } } bool MonitorPrivate::isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion) const { if (msg->type() == Protocol::Command::CollectionChangeNotification) { // Lazy fetching can only affects items. return false; } if (msg->type() == Protocol::Command::TagChangeNotification) { const auto op = Protocol::cmdCast(msg).operation(); - return ((op == Protocol::TagChangeNotification::Add && q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0) - || (op == Protocol::TagChangeNotification::Modify && q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0) - || (op == Protocol::TagChangeNotification::Remove && q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0)); + return ((op == Protocol::TagChangeNotification::Add && !hasListeners(&Monitor::tagAdded)) + || (op == Protocol::TagChangeNotification::Modify && !hasListeners(&Monitor::tagChanged)) + || (op == Protocol::TagChangeNotification::Remove && !hasListeners(&Monitor::tagRemoved))); } if (!fetchCollectionStatistics && msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); const auto op = itemNtf.operation(); - if ((op == Protocol::ItemChangeNotification::Add && q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) == 0) - || (op == Protocol::ItemChangeNotification::Remove && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) == 0 - && q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) == 0) - || (op == Protocol::ItemChangeNotification::Modify && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) == 0) - || (op == Protocol::ItemChangeNotification::ModifyFlags - && (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) == 0 + if ((op == Protocol::ItemChangeNotification::Add && !hasListeners(&Monitor::itemAdded)) + || (op == Protocol::ItemChangeNotification::Remove && !hasListeners(&Monitor::itemRemoved) && !hasListeners(&Monitor::itemsRemoved)) + || (op == Protocol::ItemChangeNotification::Modify && !hasListeners(&Monitor::itemChanged)) + || (op == Protocol::ItemChangeNotification::ModifyFlags && !hasListeners(&Monitor::itemsFlagsChanged) // Newly delivered ModifyFlags notifications will be converted to // itemChanged(item, "FLAGS") for legacy clients. - && (!allowModifyFlagsConversion || q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) == 0))) - || (op == Protocol::ItemChangeNotification::ModifyTags && q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))) == 0) - || (op == Protocol::ItemChangeNotification::Move && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) == 0 - && q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) == 0) - || (op == Protocol::ItemChangeNotification::Link && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) == 0 - && q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) == 0) - || (op == Protocol::ItemChangeNotification::Unlink && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) == 0 - && q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) == 0)) { + && (!allowModifyFlagsConversion || !hasListeners(&Monitor::itemChanged))) + || (op == Protocol::ItemChangeNotification::ModifyTags && !hasListeners(&Monitor::itemsTagsChanged)) + || (op == Protocol::ItemChangeNotification::Move && !hasListeners(&Monitor::itemMoved) && !hasListeners(&Monitor::itemsMoved)) + || (op == Protocol::ItemChangeNotification::Link && !hasListeners(&Monitor::itemLinked) && !hasListeners(&Monitor::itemsLinked)) + || (op == Protocol::ItemChangeNotification::Unlink && !hasListeners(&Monitor::itemUnlinked) && !hasListeners(&Monitor::itemsUnlinked))) { return true; } if (!useRefCounting) { return false; } const Collection::Id parentCollectionId = itemNtf.parentCollection(); if ((op == Protocol::ItemChangeNotification::Add) || (op == Protocol::ItemChangeNotification::Remove) || (op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::Link) || (op == Protocol::ItemChangeNotification::Unlink)) { if (isMonitored(parentCollectionId)) { return false; } } if (op == Protocol::ItemChangeNotification::Move) { if (!isMonitored(parentCollectionId) && !isMonitored(itemNtf.parentDestCollection())) { return true; } // We can't ignore the move. It must be transformed later into a removal or insertion. return false; } return true; } return false; } void MonitorPrivate::checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const { if (msg->type() != Protocol::Command::ItemChangeNotification) { needsSplit = false; batchSupported = false; return; } const auto &itemNtf = Protocol::cmdCast(msg); const bool isBatch = (itemNtf.items().count() > 1); switch (itemNtf.operation()) { case Protocol::ItemChangeNotification::Add: needsSplit = isBatch; batchSupported = false; return; case Protocol::ItemChangeNotification::Modify: needsSplit = isBatch; batchSupported = false; return; case Protocol::ItemChangeNotification::ModifyFlags: - batchSupported = q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) > 0; - needsSplit = isBatch && !batchSupported && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) > 0; + batchSupported = hasListeners(&Monitor::itemsFlagsChanged); + needsSplit = isBatch && !batchSupported && hasListeners(&Monitor::itemChanged); return; case Protocol::ItemChangeNotification::ModifyTags: // Tags were added after batch notifications, so they are always supported batchSupported = true; needsSplit = false; return; case Protocol::ItemChangeNotification::ModifyRelations: // Relations were added after batch notifications, so they are always supported batchSupported = true; needsSplit = false; return; case Protocol::ItemChangeNotification::Move: - needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0; - batchSupported = q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0; + needsSplit = isBatch && hasListeners(&Monitor::itemMoved); + batchSupported = hasListeners(&Monitor::itemsMoved); return; case Protocol::ItemChangeNotification::Remove: - needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0; - batchSupported = q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0; + needsSplit = isBatch && hasListeners(&Monitor::itemRemoved); + batchSupported = hasListeners(&Monitor::itemsRemoved); return; case Protocol::ItemChangeNotification::Link: - needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0; - batchSupported = q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0; + needsSplit = isBatch && hasListeners(&Monitor::itemLinked); + batchSupported = hasListeners(&Monitor::itemsLinked); return; case Protocol::ItemChangeNotification::Unlink: - needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0; - batchSupported = q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0; + needsSplit = isBatch && hasListeners(&Monitor::itemUnlinked); + batchSupported = hasListeners(&Monitor::itemsUnlinked); return; default: needsSplit = isBatch; batchSupported = false; qCDebug(AKONADICORE_LOG) << "Unknown operation type" << itemNtf.operation() << "in item change notification"; return; } } Protocol::ChangeNotificationList MonitorPrivate::splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const { Protocol::ChangeNotificationList list; Protocol::ItemChangeNotification baseMsg; baseMsg.setSessionId(msg.sessionId()); if (legacy && msg.operation() == Protocol::ItemChangeNotification::ModifyFlags) { baseMsg.setOperation(Protocol::ItemChangeNotification::Modify); baseMsg.setItemParts(QSet() << "FLAGS"); } else { baseMsg.setOperation(msg.operation()); baseMsg.setItemParts(msg.itemParts()); } baseMsg.setParentCollection(msg.parentCollection()); baseMsg.setParentDestCollection(msg.parentDestCollection()); baseMsg.setResource(msg.resource()); baseMsg.setDestinationResource(msg.destinationResource()); baseMsg.setAddedFlags(msg.addedFlags()); baseMsg.setRemovedFlags(msg.removedFlags()); baseMsg.setAddedTags(msg.addedTags()); baseMsg.setRemovedTags(msg.removedTags()); const auto items = msg.items(); list.reserve(items.count()); for (const auto &item : items) { auto copy = Protocol::ItemChangeNotificationPtr::create(baseMsg); - copy->setItems({std::move(Protocol::FetchItemsResponse(item))}); + copy->setItems({Protocol::FetchItemsResponse(item)}); list.push_back(std::move(copy)); } return list; } bool MonitorPrivate::fetchCollections() const { return fetchCollection; } bool MonitorPrivate::fetchItems() const { return !mItemFetchScope.isEmpty(); } bool MonitorPrivate::ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg) { if (msg->type() == Protocol::Command::TagChangeNotification) { const auto tagMsg = Protocol::cmdCast(msg); if (tagMsg.metadata().contains("FETCH_TAG")) { if (!tagCache->ensureCached({ tagMsg.tag().id() }, mTagFetchScope)) { return false; } } return true; } if (msg->type() == Protocol::Command::RelationChangeNotification) { return true; } if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { return true; } if (msg->type() == Protocol::Command::DebugChangeNotification) { return true; } if (msg->type() == Protocol::Command::CollectionChangeNotification && Protocol::cmdCast(msg).operation() == Protocol::CollectionChangeNotification::Remove) { // For collection removals the collection is gone already, so we can't fetch it, // but we have to at least obtain the ancestor chain. const qint64 parentCollection = Protocol::cmdCast(msg).parentCollection(); return parentCollection <= -1 || collectionCache->ensureCached(parentCollection, mCollectionFetchScope); } bool allCached = true; if (fetchCollections()) { const qint64 parentCollection = (msg->type() == Protocol::Command::ItemChangeNotification) ? Protocol::cmdCast(msg).parentCollection() : (msg->type() == Protocol::Command::CollectionChangeNotification) ? Protocol::cmdCast(msg).parentCollection() : -1; if (parentCollection > -1 && !collectionCache->ensureCached(parentCollection, mCollectionFetchScope)) { allCached = false; } qint64 parentDestCollection = -1; if ((msg->type() == Protocol::Command::ItemChangeNotification) && (Protocol::cmdCast(msg).operation() == Protocol::ItemChangeNotification::Move)) { parentDestCollection = Protocol::cmdCast(msg).parentDestCollection(); } else if ((msg->type() == Protocol::Command::CollectionChangeNotification) && (Protocol::cmdCast(msg).operation() == Protocol::CollectionChangeNotification::Move)) { parentDestCollection = Protocol::cmdCast(msg).parentDestCollection(); } if (parentDestCollection > -1 && !collectionCache->ensureCached(parentDestCollection, mCollectionFetchScope)) { allCached = false; } } if (msg->isRemove()) { return allCached; } if (msg->type() == Protocol::Command::ItemChangeNotification && fetchItems()) { const auto &itemNtf = Protocol::cmdCast(msg); if (mFetchChangedOnly && (itemNtf.operation() == Protocol::ItemChangeNotification::Modify || itemNtf.operation() == Protocol::ItemChangeNotification::ModifyFlags)) { const auto changedParts = itemNtf.itemParts(); const auto requestedParts = mItemFetchScope.payloadParts(); const auto requestedAttrs = mItemFetchScope.attributes(); QSet missingParts, missingAttributes; for (const QByteArray &part : changedParts) { const auto partName = part.mid(4); if (part.startsWith("PLD:") && //krazy:exclude=strings since QByteArray (!mItemFetchScope.fullPayload() || !requestedParts.contains(partName))) { missingParts.insert(partName); } else if (part.startsWith("ATR:") && //krazy:exclude=strings since QByteArray (!mItemFetchScope.allAttributes() || !requestedAttrs.contains(partName))) { missingAttributes.insert(partName); } } if (!missingParts.isEmpty() || !missingAttributes.isEmpty()) { ItemFetchScope scope(mItemFetchScope); scope.fetchFullPayload(false); for (const auto &part : requestedParts) { scope.fetchPayloadPart(part, false); } for (const auto &attr : requestedAttrs) { scope.fetchAttribute(attr, false); } for (const auto &part : missingParts) { scope.fetchPayloadPart(part, true); } for (const auto &attr : missingAttributes) { scope.fetchAttribute(attr, true); } if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), scope)) { return false; } } return allCached; } // Make sure all tags for ModifyTags operation are in cache too if (itemNtf.operation() == Protocol::ItemChangeNotification::ModifyTags) { if (!tagCache->ensureCached((itemNtf.addedTags() + itemNtf.removedTags()).toList(), mTagFetchScope)) { return false; } } if (itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve()) { if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope)) { return false; } } return allCached; } else if (msg->type() == Protocol::Command::CollectionChangeNotification && fetchCollections()) { const auto &colMsg = Protocol::cmdCast(msg); if (colMsg.metadata().contains("FETCH_COLLECTION")) { if (!collectionCache->ensureCached(colMsg.collection().id(), mCollectionFetchScope)) { return false; } } return allCached; } return allCached; } bool MonitorPrivate::emitNotification(const Protocol::ChangeNotificationPtr &msg) { bool someoneWasListening = false; if (msg->type() == Protocol::Command::TagChangeNotification) { const auto &tagNtf = Protocol::cmdCast(msg); const bool fetched = tagNtf.metadata().contains("FETCH_TAG"); Tag tag; if (fetched) { const auto tags = tagCache->retrieve({ tagNtf.tag().id() }); tag = tags.isEmpty() ? Tag() : tags.at(0); } else { tag = ProtocolHelper::parseTag(tagNtf.tag()); } someoneWasListening = emitTagNotification(tagNtf, tag); } else if (msg->type() == Protocol::Command::RelationChangeNotification) { const auto &relNtf = Protocol::cmdCast(msg); const Relation rel = ProtocolHelper::parseRelationFetchResult(relNtf.relation()); someoneWasListening = emitRelationNotification(relNtf, rel); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); const Collection parent = collectionCache->retrieve(colNtf.parentCollection()); Collection destParent; if (colNtf.operation() == Protocol::CollectionChangeNotification::Move) { destParent = collectionCache->retrieve(colNtf.parentDestCollection()); } const bool fetched = colNtf.metadata().contains("FETCH_COLLECTION"); //For removals this will retrieve an invalid collection. We'll deal with that in emitCollectionNotification const Collection col = fetched ? collectionCache->retrieve(colNtf.collection().id()) : ProtocolHelper::parseCollection(colNtf.collection(), true); //It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while //the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them. if (col.isValid() || colNtf.operation() == Protocol::CollectionChangeNotification::Remove || !fetchCollections()) { someoneWasListening = emitCollectionNotification(colNtf, col, parent, destParent); } } else if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); const Collection parent = collectionCache->retrieve(itemNtf.parentCollection()); Collection destParent; if (itemNtf.operation() == Protocol::ItemChangeNotification::Move) { destParent = collectionCache->retrieve(itemNtf.parentDestCollection()); } const bool fetched = itemNtf.metadata().contains("FETCH_ITEM") || itemNtf.mustRetrieve(); //For removals this will retrieve an empty set. We'll deal with that in emitItemNotification Item::List items; if (fetched) { items = itemCache->retrieve(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); } else { const auto ntfItems = itemNtf.items(); items.reserve(ntfItems.size()); for (const auto &ntfItem : ntfItems) { items.push_back(ProtocolHelper::parseItemFetchResult(ntfItem)); } } //It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while //the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them. if (!items.isEmpty() || itemNtf.operation() == Protocol::ItemChangeNotification::Remove || !fetchItems()) { someoneWasListening = emitItemsNotification(itemNtf, items, parent, destParent); } } else if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { const auto &subNtf = Protocol::cmdCast(msg); NotificationSubscriber subscriber; subscriber.setSubscriber(subNtf.subscriber()); subscriber.setSessionId(subNtf.sessionId()); subscriber.setMonitoredCollections(subNtf.collections()); subscriber.setMonitoredItems(subNtf.items()); subscriber.setMonitoredTags(subNtf.tags()); QSet monitorTypes; Q_FOREACH (auto type, subNtf.types()) { if (type == Protocol::ModifySubscriptionCommand::NoType) { continue; } monitorTypes.insert([](Protocol::ModifySubscriptionCommand::ChangeType type) { switch (type) { case Protocol::ModifySubscriptionCommand::ItemChanges: return Monitor::Items; case Protocol::ModifySubscriptionCommand::CollectionChanges: return Monitor::Collections; case Protocol::ModifySubscriptionCommand::TagChanges: return Monitor::Tags; case Protocol::ModifySubscriptionCommand::RelationChanges: return Monitor::Relations; case Protocol::ModifySubscriptionCommand::SubscriptionChanges: return Monitor::Subscribers; case Protocol::ModifySubscriptionCommand::ChangeNotifications: return Monitor::Notifications; default: Q_ASSERT(false); return Monitor::Items; //unreachable } }(type)); } subscriber.setMonitoredTypes(monitorTypes); subscriber.setMonitoredMimeTypes(subNtf.mimeTypes()); subscriber.setMonitoredResources(subNtf.resources()); subscriber.setIgnoredSessions(subNtf.ignoredSessions()); subscriber.setIsAllMonitored(subNtf.allMonitored()); subscriber.setIsExclusive(subNtf.exclusive()); subscriber.setItemFetchScope(ProtocolHelper::parseItemFetchScope(subNtf.itemFetchScope())); subscriber.setCollectionFetchScope(ProtocolHelper::parseCollectionFetchScope(subNtf.collectionFetchScope())); someoneWasListening = emitSubscriptionChangeNotification(subNtf, subscriber); } else if (msg->type() == Protocol::Command::DebugChangeNotification) { const auto &changeNtf = Protocol::cmdCast(msg); ChangeNotification notification; notification.setListeners(changeNtf.listeners()); notification.setTimestamp(QDateTime::fromMSecsSinceEpoch(changeNtf.timestamp())); notification.setNotification(changeNtf.notification()); switch (changeNtf.notification()->type()) { case Protocol::Command::ItemChangeNotification: notification.setType(ChangeNotification::Items); break; case Protocol::Command::CollectionChangeNotification: notification.setType(ChangeNotification::Collection); break; case Protocol::Command::TagChangeNotification: notification.setType(ChangeNotification::Tag); break; case Protocol::Command::RelationChangeNotification: notification.setType(ChangeNotification::Relation); break; case Protocol::Command::SubscriptionChangeNotification: notification.setType(ChangeNotification::Subscription); break; default: Q_ASSERT(false); // huh? return false; } someoneWasListening = emitDebugChangeNotification(changeNtf, notification); } return someoneWasListening; } void MonitorPrivate::updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg) { if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); notifyCollectionStatisticsWatchers(itemNtf.parentCollection(), itemNtf.resource()); // FIXME use the proper resource of the target collection, for cross resource moves notifyCollectionStatisticsWatchers(itemNtf.parentDestCollection(), itemNtf.destinationResource()); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); if (colNtf.operation() == Protocol::CollectionChangeNotification::Remove) { // no need for statistics updates anymore recentlyChangedCollections.remove(colNtf.collection().id()); } } } void MonitorPrivate::slotSessionDestroyed(QObject *object) { Session *objectSession = qobject_cast(object); if (objectSession) { sessions.removeAll(objectSession->sessionId()); pendingModification.stopIgnoringSession(objectSession->sessionId()); scheduleSubscriptionUpdate(); } } void MonitorPrivate::slotStatisticsChangedFinished(KJob *job) { if (job->error()) { qCWarning(AKONADICORE_LOG) << "Error on fetching collection statistics: " << job->errorText(); } else { CollectionStatisticsJob *statisticsJob = static_cast(job); Q_ASSERT(statisticsJob->collection().isValid()); emit q_ptr->collectionStatisticsChanged(statisticsJob->collection().id(), statisticsJob->statistics()); } } void MonitorPrivate::slotFlushRecentlyChangedCollections() { for (Collection::Id collection : qAsConst(recentlyChangedCollections)) { Q_ASSERT(collection >= 0); if (fetchCollectionStatistics) { fetchStatistics(collection); } else { static const CollectionStatistics dummyStatistics; emit q_ptr->collectionStatisticsChanged(collection, dummyStatistics); } } recentlyChangedCollections.clear(); } int MonitorPrivate::translateAndCompress(QQueue ¬ificationQueue, const Protocol::ChangeNotificationPtr &msg) { // Always handle tags and relations if (msg->type() == Protocol::Command::TagChangeNotification || msg->type() == Protocol::Command::RelationChangeNotification) { notificationQueue.enqueue(msg); return 1; } // We have to split moves into insert or remove if the source or destination // is not monitored. if (!msg->isMove()) { notificationQueue.enqueue(msg); return 1; } bool sourceWatched = false; bool destWatched = false; if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); if (useRefCounting) { sourceWatched = isMonitored(itemNtf.parentCollection()); destWatched = isMonitored(itemNtf.parentDestCollection()); } else { if (!resources.isEmpty()) { sourceWatched = resources.contains(itemNtf.resource()); destWatched = isMoveDestinationResourceMonitored(itemNtf); } if (!sourceWatched) { sourceWatched = isCollectionMonitored(itemNtf.parentCollection()); } if (!destWatched) { destWatched = isCollectionMonitored(itemNtf.parentDestCollection()); } } } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); if (!resources.isEmpty()) { sourceWatched = resources.contains(colNtf.resource()); destWatched = isMoveDestinationResourceMonitored(colNtf); } if (!sourceWatched) { sourceWatched = isCollectionMonitored(colNtf.parentCollection()); } if (!destWatched) { destWatched = isCollectionMonitored(colNtf.parentDestCollection()); } } else { Q_ASSERT(false); return 0; } if (!sourceWatched && !destWatched) { return 0; } if ((sourceWatched && destWatched) || (!collectionMoveTranslationEnabled && msg->type() == Protocol::Command::CollectionChangeNotification)) { notificationQueue.enqueue(msg); return 1; } if (sourceWatched) { if (msg->type() == Protocol::Command::ItemChangeNotification) { auto removalMessage = Protocol::ItemChangeNotificationPtr::create( Protocol::cmdCast(msg)); removalMessage->setOperation(Protocol::ItemChangeNotification::Remove); removalMessage->setParentDestCollection(-1); notificationQueue.enqueue(removalMessage); return 1; } else { auto removalMessage = Protocol::CollectionChangeNotificationPtr::create( Protocol::cmdCast(msg)); removalMessage->setOperation(Protocol::CollectionChangeNotification::Remove); removalMessage->setParentDestCollection(-1); notificationQueue.enqueue(removalMessage); return 1; } } // Transform into an insertion if (msg->type() == Protocol::Command::ItemChangeNotification) { auto insertionMessage = Protocol::ItemChangeNotificationPtr::create( Protocol::cmdCast(msg)); insertionMessage->setOperation(Protocol::ItemChangeNotification::Add); insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); insertionMessage->setParentDestCollection(-1); // We don't support batch insertion, so we have to do it one by one const auto split = splitMessage(*insertionMessage, false); for (const Protocol::ChangeNotificationPtr &insertion : split) { notificationQueue.enqueue(insertion); } return split.count(); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { auto insertionMessage = Protocol::CollectionChangeNotificationPtr::create( Protocol::cmdCast(msg)); insertionMessage->setOperation(Protocol::CollectionChangeNotification::Add); insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); insertionMessage->setParentDestCollection(-1); notificationQueue.enqueue(insertionMessage); return 1; } Q_ASSERT(false); return 0; } void MonitorPrivate::handleCommands() { Q_Q(Monitor); CommandBufferLocker lock(&mCommandBuffer); CommandBufferNotifyBlocker notify(&mCommandBuffer); while (!mCommandBuffer.isEmpty()) { const auto cmd = mCommandBuffer.dequeue(); lock.unlock(); const auto command = cmd.command; if (command->isResponse()) { switch (command->type()) { case Protocol::Command::Hello: { qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus"; QByteArray subname; if (!q->objectName().isEmpty()) { subname = q->objectName().toLatin1(); } else { subname = session->sessionId(); } subname += " - " + QByteArray::number(quintptr(q)); qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\""; auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId()); ntfConnection->sendCommand(2, subCmd); break; } case Protocol::Command::CreateSubscription: { auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(); for (const auto &col : qAsConst(collections)) { msubCmd->startMonitoringCollection(col.id()); } for (const auto &res : qAsConst(resources)) { msubCmd->startMonitoringResource(res); } for (auto itemId : qAsConst(items)) { msubCmd->startMonitoringItem(itemId); } for (auto tagId : qAsConst(tags)) { msubCmd->startMonitoringTag(tagId); } for (auto type : qAsConst(types)) { msubCmd->startMonitoringType(monitorTypeToProtocol(type)); } for (const auto &mimetype : qAsConst(mimetypes)) { msubCmd->startMonitoringMimeType(mimetype); } for (const auto &session : qAsConst(sessions)) { msubCmd->startIgnoringSession(session); } msubCmd->setAllMonitored(monitorAll); msubCmd->setIsExclusive(exclusive); msubCmd->setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); msubCmd->setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); msubCmd->setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); pendingModification = Protocol::ModifySubscriptionCommand(); ntfConnection->sendCommand(3, msubCmd); break; } case Protocol::Command::ModifySubscription: // TODO: Handle errors if (!monitorReady) { monitorReady = true; Q_EMIT q_ptr->monitorReady(); } break; default: qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command); break; } } else { switch (command->type()) { case Protocol::Command::ItemChangeNotification: case Protocol::Command::CollectionChangeNotification: case Protocol::Command::TagChangeNotification: case Protocol::Command::RelationChangeNotification: case Protocol::Command::SubscriptionChangeNotification: case Protocol::Command::DebugChangeNotification: slotNotify(command.staticCast()); break; default: qCWarning(AKONADICORE_LOG) << "Received an unexpected message on Notification stream:" << Protocol::debugString(command); break; } } lock.relock(); } notify.unblock(); lock.unlock(); } /* server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit | | x --> discard x --> pipeline fetchJobDone --> pipeline ?dataAvailable --> emit */ void MonitorPrivate::slotNotify(const Protocol::ChangeNotificationPtr &msg) { int appendedMessages = 0; int modifiedMessages = 0; int erasedMessages = 0; invalidateCaches(msg); updatePendingStatistics(msg); bool needsSplit = true; bool supportsBatch = false; if (isLazilyIgnored(msg, true)) { return; } checkBatchSupport(msg, needsSplit, supportsBatch); const bool isModifyFlags = (msg->type() == Protocol::Command::ItemChangeNotification && Protocol::cmdCast(msg).operation() == Protocol::ItemChangeNotification::ModifyFlags); if (supportsBatch || (!needsSplit && !supportsBatch && !isModifyFlags) || msg->type() == Protocol::Command::CollectionChangeNotification) { // Make sure the batch msg is always queued before the split notifications const int oldSize = pendingNotifications.size(); const int appended = translateAndCompress(pendingNotifications, msg); if (appended > 0) { appendedMessages += appended; } else { ++modifiedMessages; } // translateAndCompress can remove an existing "modify" when msg is a "delete". // Or it can merge two ModifyFlags and return false. // We need to detect such removals, for ChangeRecorder. if (pendingNotifications.count() != oldSize + appended) { ++erasedMessages; // this count isn't exact, but it doesn't matter } } else if (needsSplit) { // If it's not queued at least make sure we fetch all the items from split // notifications in one go. if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto items = Protocol::cmdCast(msg).items(); itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(items), mItemFetchScope); } } // if the message contains more items, but we need to emit single-item notification, // split the message into one message per item and queue them // if the message contains only one item, but batches are not supported // (and thus neither is flagsModified), splitMessage() will convert the // notification to regular Modify with "FLAGS" part changed if (needsSplit || (!needsSplit && !supportsBatch && isModifyFlags)) { // Make sure inter-resource move notifications are translated into // Add/Remove notifications if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); if (itemNtf.operation() == Protocol::ItemChangeNotification::Move && itemNtf.resource() != itemNtf.destinationResource()) { if (needsSplit) { const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); for (const auto &splitMsg : split) { appendedMessages += translateAndCompress(pendingNotifications, splitMsg); } } else { appendedMessages += translateAndCompress(pendingNotifications, msg); } } else { const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); pendingNotifications << split.toList(); appendedMessages += split.count(); } } } // tell ChangeRecorder (even if 0 appended, the compression could have made changes to existing messages) if (appendedMessages > 0 || modifiedMessages > 0 || erasedMessages > 0) { if (erasedMessages > 0) { notificationsErased(); } else { notificationsEnqueued(appendedMessages); } } dispatchNotifications(); } void MonitorPrivate::flushPipeline() { while (!pipeline.isEmpty()) { const auto msg = pipeline.head(); if (ensureDataAvailable(msg)) { // dequeue should be before emit, otherwise stuff might happen (like dataAvailable // being called again) and we end up dequeuing an empty pipeline pipeline.dequeue(); emitNotification(msg); } else { break; } } } void MonitorPrivate::dataAvailable() { flushPipeline(); dispatchNotifications(); } void MonitorPrivate::dispatchNotifications() { // Note that this code is not used in a ChangeRecorder (pipelineSize==0) while (pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty()) { const auto msg = pendingNotifications.dequeue(); const bool avail = ensureDataAvailable(msg); if (avail && pipeline.isEmpty()) { emitNotification(msg); } else { pipeline.enqueue(msg); } } } static Relation::List extractRelations(const QSet &rels) { Relation::List relations; if (rels.isEmpty()) { return relations; } relations.reserve(rels.size()); for (const auto &rel : rels) { relations.push_back(Relation(rel.type.toLatin1(), Akonadi::Item(rel.leftId), Akonadi::Item(rel.rightId))); } return relations; } bool MonitorPrivate::emitItemsNotification(const Protocol::ItemChangeNotification &msg_, const Item::List &items, const Collection &collection, const Collection &collectionDest) { Protocol::ItemChangeNotification msg = msg_; Collection col = collection; Collection colDest = collectionDest; if (!col.isValid()) { col = Collection(msg.parentCollection()); col.setResource(QString::fromUtf8(msg.resource())); } if (!colDest.isValid()) { colDest = Collection(msg.parentDestCollection()); // HACK: destination resource is delivered in the parts field... if (!msg.itemParts().isEmpty()) { colDest.setResource(QString::fromLatin1(*(msg.itemParts().cbegin()))); } } const QSet addedFlags = msg.addedFlags(); const QSet removedFlags = msg.removedFlags(); Relation::List addedRelations, removedRelations; if (msg.operation() == Protocol::ItemChangeNotification::ModifyRelations) { addedRelations = extractRelations(msg.addedRelations()); removedRelations = extractRelations(msg.removedRelations()); } Tag::List addedTags, removedTags; if (msg.operation() == Protocol::ItemChangeNotification::ModifyTags) { addedTags = tagCache->retrieve(msg.addedTags().toList()); removedTags = tagCache->retrieve(msg.removedTags().toList()); } Item::List its = items; for (auto it = its.begin(), end = its.end(); it != end; ++it) { if (msg.operation() == Protocol::ItemChangeNotification::Move) { it->setParentCollection(colDest); } else { it->setParentCollection(col); } } bool handled = false; switch (msg.operation()) { case Protocol::ItemChangeNotification::Add: - if (q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemAdded(its.first(), col); - return true; - } - return false; + return emitToListeners(&Monitor::itemAdded, its.first(), col); case Protocol::ItemChangeNotification::Modify: - if (q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemChanged(its.first(), msg.itemParts()); - return true; - } - return false; + return emitToListeners(&Monitor::itemChanged, its.first(), msg.itemParts()); case Protocol::ItemChangeNotification::ModifyFlags: - if (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) > 0) { - emit q_ptr->itemsFlagsChanged(its, msg.addedFlags(), msg.removedFlags()); - handled = true; - } - return handled; + return emitToListeners(&Monitor::itemsFlagsChanged, its, msg.addedFlags(), msg.removedFlags()); case Protocol::ItemChangeNotification::Move: - if (q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemMoved(its.first(), col, colDest); - handled = true; - } - if (q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0) { - emit q_ptr->itemsMoved(its, col, colDest); - handled = true; - } + handled |= emitToListeners(&Monitor::itemMoved, its.first(), col, colDest); + handled |= emitToListeners(&Monitor::itemsMoved, its, col, colDest); return handled; case Protocol::ItemChangeNotification::Remove: - if (q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemRemoved(its.first()); - handled = true; - } - if (q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0) { - emit q_ptr->itemsRemoved(its); - handled = true; - } + handled |= emitToListeners(&Monitor::itemRemoved, its.first()); + handled |= emitToListeners(&Monitor::itemsRemoved, its); return handled; case Protocol::ItemChangeNotification::Link: - if (q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemLinked(its.first(), col); - handled = true; - } - if (q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0) { - emit q_ptr->itemsLinked(its, col); - handled = true; - } + handled |= emitToListeners(&Monitor::itemLinked, its.first(), col); + handled |= emitToListeners(&Monitor::itemsLinked, its, col); return handled; case Protocol::ItemChangeNotification::Unlink: - if (q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0) { - Q_ASSERT(its.count() == 1); - emit q_ptr->itemUnlinked(its.first(), col); - handled = true; - } - if (q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0) { - emit q_ptr->itemsUnlinked(its, col); - handled = true; - } + handled |= emitToListeners(&Monitor::itemUnlinked, its.first(), col); + handled |= emitToListeners(&Monitor::itemsUnlinked, its, col); return handled; case Protocol::ItemChangeNotification::ModifyTags: - if (q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))) > 0) { - emit q_ptr->itemsTagsChanged(its, Akonadi::vectorToSet(addedTags), Akonadi::vectorToSet(removedTags)); - return true; - } - return false; + return emitToListeners(&Monitor::itemsTagsChanged, its, Akonadi::vectorToSet(addedTags), Akonadi::vectorToSet(removedTags)); case Protocol::ItemChangeNotification::ModifyRelations: - if (q_ptr->receivers(SIGNAL(itemsRelationsChanged(Akonadi::Item::List,Akonadi::Relation::List,Akonadi::Relation::List))) > 0) { - emit q_ptr->itemsRelationsChanged(its, addedRelations, removedRelations); - return true; - } - return false; + return emitToListeners(&Monitor::itemsRelationsChanged, its, addedRelations, removedRelations); default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in item change notification"; + return false; } - - return false; } bool MonitorPrivate::emitCollectionNotification(const Protocol::CollectionChangeNotification &msg, const Collection &col, const Collection &par, const Collection &dest) { Collection parent = par; if (!parent.isValid()) { parent = Collection(msg.parentCollection()); } Collection destination = dest; if (!destination.isValid()) { destination = Collection(msg.parentDestCollection()); } Collection collection = col; Q_ASSERT(collection.isValid()); if (!collection.isValid()) { qCWarning(AKONADICORE_LOG) << "Failed to get valid Collection for a Collection change!"; return true; // prevent Monitor disconnecting from a signal } if (msg.operation() == Protocol::CollectionChangeNotification::Move) { collection.setParentCollection(destination); } else { collection.setParentCollection(parent); } + bool handled = false; switch (msg.operation()) { case Protocol::CollectionChangeNotification::Add: - if (q_ptr->receivers(SIGNAL(collectionAdded(Akonadi::Collection,Akonadi::Collection))) == 0) { - return false; - } - emit q_ptr->collectionAdded(collection, parent); - return true; + return emitToListeners(&Monitor::collectionAdded, collection, parent); case Protocol::CollectionChangeNotification::Modify: - if (q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection))) == 0 - && q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection,QSet))) == 0) { - return false; - } - emit q_ptr->collectionChanged(collection); - emit q_ptr->collectionChanged(collection, msg.changedParts()); - return true; + handled |= emitToListeners(qOverload(&Monitor::collectionChanged), collection); + handled |= emitToListeners(qOverload &>(&Monitor::collectionChanged), collection, msg.changedParts()); + return handled; case Protocol::CollectionChangeNotification::Move: - if (q_ptr->receivers(SIGNAL(collectionMoved(Akonadi::Collection,Akonadi::Collection,Akonadi::Collection))) == 0) { - return false; - } - emit q_ptr->collectionMoved(collection, parent, destination); - return true; + return emitToListeners(&Monitor::collectionMoved, collection, parent, destination); case Protocol::CollectionChangeNotification::Remove: - if (q_ptr->receivers(SIGNAL(collectionRemoved(Akonadi::Collection))) == 0) { - return false; - } - emit q_ptr->collectionRemoved(collection); - return true; + return emitToListeners(&Monitor::collectionRemoved, collection); case Protocol::CollectionChangeNotification::Subscribe: - if (q_ptr->receivers(SIGNAL(collectionSubscribed(Akonadi::Collection,Akonadi::Collection))) == 0) { - return false; - } - if (!monitorAll) { // ### why?? - emit q_ptr->collectionSubscribed(collection, parent); - } - return true; + // ### why?? + return !monitorAll && emitToListeners(&Monitor::collectionSubscribed, collection, parent); case Protocol::CollectionChangeNotification::Unsubscribe: - if (q_ptr->receivers(SIGNAL(collectionUnsubscribed(Akonadi::Collection))) == 0) { - return false; - } - if (!monitorAll) { // ### why?? - emit q_ptr->collectionUnsubscribed(collection); - } - return true; + // ### why?? + return !monitorAll && emitToListeners(&Monitor::collectionUnsubscribed, collection); default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in collection change notification"; + return false; } - - return false; } bool MonitorPrivate::emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tag) { Q_UNUSED(msg); switch (msg.operation()) { case Protocol::TagChangeNotification::Add: - if (q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0) { - return false; - } - Q_EMIT q_ptr->tagAdded(tag); - return true; + return emitToListeners(&Monitor::tagAdded, tag); case Protocol::TagChangeNotification::Modify: - if (q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0) { - return false; - } - Q_EMIT q_ptr->tagChanged(tag); - return true; + return emitToListeners(&Monitor::tagChanged, tag); case Protocol::TagChangeNotification::Remove: - if (q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0) { - return false; - } - Q_EMIT q_ptr->tagRemoved(tag); - return true; + return emitToListeners(&Monitor::tagRemoved, tag); default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; + return false; } - - return false; } bool MonitorPrivate::emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation) { if (!relation.isValid()) { return false; } switch (msg.operation()) { case Protocol::RelationChangeNotification::Add: - if (q_ptr->receivers(SIGNAL(relationAdded(Akonadi::Relation))) == 0) { - return false; - } - Q_EMIT q_ptr->relationAdded(relation); - return true; + return emitToListeners(&Monitor::relationAdded, relation); case Protocol::RelationChangeNotification::Remove: - if (q_ptr->receivers(SIGNAL(relationRemoved(Akonadi::Relation))) == 0) { - return false; - } - Q_EMIT q_ptr->relationRemoved(relation); - return true; + return emitToListeners(&Monitor::relationRemoved, relation); default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; + return false; } - - return false; } bool MonitorPrivate::emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const Akonadi::NotificationSubscriber &subscriber) { if (!subscriber.isValid()) { return false; } switch (msg.operation()) { case Protocol::SubscriptionChangeNotification::Add: - if (q_ptr->receivers(SIGNAL(notificationSubscriberAdded(Akonadi::NotificationSubscriber))) == 0) { - return false; - } - Q_EMIT q_ptr->notificationSubscriberAdded(subscriber); - return true; + return emitToListeners(&Monitor::notificationSubscriberAdded, subscriber); case Protocol::SubscriptionChangeNotification::Modify: - if (q_ptr->receivers(SIGNAL(notificationSubscriberChanged(Akonadi::NotificationSubscriber))) == 0) { - return false; - } - Q_EMIT q_ptr->notificationSubscriberChanged(subscriber); - return true; + return emitToListeners(&Monitor::notificationSubscriberChanged, subscriber); case Protocol::SubscriptionChangeNotification::Remove: - if (q_ptr->receivers(SIGNAL(notificationSubscriberRemoved(Akonadi::NotificationSubscriber))) == 0) { - return false; - } - Q_EMIT q_ptr->notificationSubscriberRemoved(subscriber); - return true; + return emitToListeners(&Monitor::notificationSubscriberRemoved, subscriber); default: - break; + qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in subscription change notification"; + return false; } - - return false; } bool MonitorPrivate::emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf) { Q_UNUSED(msg); if (!ntf.isValid()) { return false; } - if (q_ptr->receivers(SIGNAL(debugNotification(Akonadi::ChangeNotification))) == 0) { - return false; - } - Q_EMIT q_ptr->debugNotification(ntf); - return true; + return emitToListeners(&Monitor::debugNotification, ntf); } void MonitorPrivate::invalidateCaches(const Protocol::ChangeNotificationPtr &msg) { // remove invalidates // modify removes the cache entry, as we need to re-fetch // And subscription modify the visibility of the collection by the collectionFetchScope. switch (msg->type()) { case Protocol::Command::CollectionChangeNotification: { const auto &colNtf = Protocol::cmdCast(msg); switch (colNtf.operation()) { case Protocol::CollectionChangeNotification::Modify: case Protocol::CollectionChangeNotification::Move: case Protocol::CollectionChangeNotification::Subscribe: collectionCache->update(colNtf.collection().id(), mCollectionFetchScope); break; case Protocol::CollectionChangeNotification::Remove: collectionCache->invalidate(colNtf.collection().id()); break; default: break; } } break; case Protocol::Command::ItemChangeNotification: { const auto &itemNtf = Protocol::cmdCast(msg); switch (itemNtf.operation()) { case Protocol::ItemChangeNotification::Modify: case Protocol::ItemChangeNotification::ModifyFlags: case Protocol::ItemChangeNotification::ModifyTags: case Protocol::ItemChangeNotification::ModifyRelations: case Protocol::ItemChangeNotification::Move: itemCache->update(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope); break; case Protocol::ItemChangeNotification::Remove: itemCache->invalidate(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); break; default: break; } } break; case Protocol::Command::TagChangeNotification: { const auto &tagNtf = Protocol::cmdCast(msg); switch (tagNtf.operation()) { case Protocol::TagChangeNotification::Modify: tagCache->update({ tagNtf.tag().id() }, mTagFetchScope); break; case Protocol::TagChangeNotification::Remove: tagCache->invalidate({ tagNtf.tag().id() }); break; default: break; } } break; default: break; } } void MonitorPrivate::invalidateCache(const Collection &col) { collectionCache->update(col.id(), mCollectionFetchScope); } void MonitorPrivate::ref(Collection::Id id) { if (!refCountMap.contains(id)) { refCountMap.insert(id, 0); } ++refCountMap[id]; if (m_buffer.isBuffered(id)) { m_buffer.purge(id); } } Akonadi::Collection::Id MonitorPrivate::deref(Collection::Id id) { Q_ASSERT(refCountMap.contains(id)); if (--refCountMap[id] == 0) { refCountMap.remove(id); return m_buffer.buffer(id); } return -1; } void MonitorPrivate::PurgeBuffer::purge(Collection::Id id) { m_buffer.removeOne(id); } Akonadi::Collection::Id MonitorPrivate::PurgeBuffer::buffer(Collection::Id id) { // Ensure that we don't put a duplicate @p id into the buffer. purge(id); Collection::Id bumpedId = -1; if (m_buffer.size() == MAXBUFFERSIZE) { bumpedId = m_buffer.dequeue(); purge(bumpedId); } m_buffer.enqueue(id); return bumpedId; } int MonitorPrivate::PurgeBuffer::buffersize() { return MAXBUFFERSIZE; } bool MonitorPrivate::isMonitored(Collection::Id colId) const { if (!useRefCounting) { return true; } return refCountMap.contains(colId) || m_buffer.isBuffered(colId); } void MonitorPrivate::notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource) { if (collection > 0 && (monitorAll || isCollectionMonitored(collection) || resources.contains(resource))) { recentlyChangedCollections.insert(collection); if (!statisticsCompressionTimer.isActive()) { statisticsCompressionTimer.start(); } } } Protocol::ModifySubscriptionCommand::ChangeType MonitorPrivate::monitorTypeToProtocol(Monitor::Type type) { switch (type) { case Monitor::Collections: return Protocol::ModifySubscriptionCommand::CollectionChanges; case Monitor::Items: return Protocol::ModifySubscriptionCommand::ItemChanges; case Monitor::Tags: return Protocol::ModifySubscriptionCommand::TagChanges; case Monitor::Relations: return Protocol::ModifySubscriptionCommand::RelationChanges; case Monitor::Subscribers: return Protocol::ModifySubscriptionCommand::SubscriptionChanges; case Monitor::Notifications: return Protocol::ModifySubscriptionCommand::ChangeNotifications; default: Q_ASSERT(false); return Protocol::ModifySubscriptionCommand::NoType; } } +void MonitorPrivate::updateListeners(const QMetaMethod &signal, ListenerAction action) +{ + #define UPDATE_LISTENERS(sig) \ + if (signal == QMetaMethod::fromSignal(sig)) { \ + updateListener(sig, action); \ + return; \ + } + + UPDATE_LISTENERS(&Monitor::itemChanged) + UPDATE_LISTENERS(&Monitor::itemChanged) + UPDATE_LISTENERS(&Monitor::itemsFlagsChanged) + UPDATE_LISTENERS(&Monitor::itemsTagsChanged) + UPDATE_LISTENERS(&Monitor::itemsRelationsChanged) + UPDATE_LISTENERS(&Monitor::itemMoved) + UPDATE_LISTENERS(&Monitor::itemsMoved) + UPDATE_LISTENERS(&Monitor::itemAdded) + UPDATE_LISTENERS(&Monitor::itemRemoved) + UPDATE_LISTENERS(&Monitor::itemsRemoved) + UPDATE_LISTENERS(&Monitor::itemLinked) + UPDATE_LISTENERS(&Monitor::itemsLinked) + UPDATE_LISTENERS(&Monitor::itemUnlinked) + UPDATE_LISTENERS(&Monitor::itemsUnlinked) + UPDATE_LISTENERS(&Monitor::collectionAdded) + + UPDATE_LISTENERS(qOverload(&Monitor::collectionChanged)) + UPDATE_LISTENERS((qOverload &>(&Monitor::collectionChanged))) + UPDATE_LISTENERS(&Monitor::collectionMoved) + UPDATE_LISTENERS(&Monitor::collectionRemoved) + UPDATE_LISTENERS(&Monitor::collectionSubscribed) + UPDATE_LISTENERS(&Monitor::collectionUnsubscribed) + UPDATE_LISTENERS(&Monitor::collectionStatisticsChanged) + + UPDATE_LISTENERS(&Monitor::tagAdded) + UPDATE_LISTENERS(&Monitor::tagChanged) + UPDATE_LISTENERS(&Monitor::tagRemoved) + + UPDATE_LISTENERS(&Monitor::relationAdded) + UPDATE_LISTENERS(&Monitor::relationRemoved) + + UPDATE_LISTENERS(&Monitor::notificationSubscriberAdded) + UPDATE_LISTENERS(&Monitor::notificationSubscriberChanged) + UPDATE_LISTENERS(&Monitor::notificationSubscriberRemoved) + UPDATE_LISTENERS(&Monitor::debugNotification) + +#undef UPDATE_LISTENERS +} // @endcond diff --git a/src/core/monitor_p.h b/src/core/monitor_p.h index e85e882f9..683583c50 100644 --- a/src/core/monitor_p.h +++ b/src/core/monitor_p.h @@ -1,335 +1,414 @@ /* Copyright (c) 2007 Tobias Koenig This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_MONITOR_P_H #define AKONADI_MONITOR_P_H #include "akonadicore_export.h" #include "monitor.h" #include "collection.h" #include "collectionstatisticsjob.h" #include "collectionfetchscope.h" #include "item.h" #include "itemfetchscope.h" #include "tagfetchscope.h" #include "job.h" #include "entitycache_p.h" #include "servermanager.h" #include "changenotificationdependenciesfactory_p.h" #include "connection_p.h" #include "commandbuffer_p.h" #include "private/protocol_p.h" #include #include #include #include #include namespace Akonadi { class Monitor; class ChangeNotification; +// A helper struct to wrap pointer to member function (which cannot be contained +// in a regular ponter) +struct SignalId { + constexpr SignalId() = default; + + using Unit = uint; + static constexpr int Size = sizeof(&Monitor::itemAdded) / sizeof(Unit); + Unit data[sizeof(&Monitor::itemAdded) / sizeof(Unit)] = { 0 }; + + inline bool operator==(const SignalId &other) const { + for (int i = Size - 1; i >= 0; --i) { + if (data[i] != other.data[i]) { + return false; + } + } + return true; + } +}; + +inline uint qHash(const SignalId &sig) +{ + // The 4 LSBs of the address should be enough to give us a good hash + return sig.data[SignalId::Size - 1]; +} + /** * @internal */ class AKONADICORE_EXPORT MonitorPrivate { public: + enum ListenerAction { + AddListener, + RemoveListener + }; + MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent); virtual ~MonitorPrivate(); void init(); Monitor *q_ptr; Q_DECLARE_PUBLIC(Monitor) ChangeNotificationDependenciesFactory *dependenciesFactory = nullptr; QPointer ntfConnection; Collection::List collections; QSet resources; QSet items; QSet tags; QSet types; QSet mimetypes; bool monitorAll; bool exclusive; QList sessions; ItemFetchScope mItemFetchScope; TagFetchScope mTagFetchScope; CollectionFetchScope mCollectionFetchScope; bool mFetchChangedOnly; Session *session = nullptr; CollectionCache *collectionCache = nullptr; ItemListCache *itemCache = nullptr; TagListCache *tagCache = nullptr; QMimeDatabase mimeDatabase; + QHash listeners; CommandBuffer mCommandBuffer; Protocol::ModifySubscriptionCommand::ModifiedParts pendingModificationChanges; Protocol::ModifySubscriptionCommand pendingModification; QTimer *pendingModificationTimer; bool monitorReady; // The waiting list QQueue pendingNotifications; // The messages for which data is currently being fetched QQueue pipeline; // In a pure Monitor, the pipeline contains items that were dequeued from pendingNotifications. // The ordering [pipeline] [pendingNotifications] is kept at all times. // [] [A B C] -> [A B] [C] -> [B] [C] -> [B C] [] -> [C] [] -> [] // In a ChangeRecorder, the pipeline contains one item only, and not dequeued yet. // [] [A B C] -> [A] [A B C] -> [] [A B C] -> (changeProcessed) [] [B C] -> [B] [B C] etc... bool fetchCollection; bool fetchCollectionStatistics; bool collectionMoveTranslationEnabled; // Virtual methods for ChangeRecorder virtual void notificationsEnqueued(int) { } virtual void notificationsErased() { } // Virtual so it can be overridden in FakeMonitor. virtual bool connectToNotificationManager(); void disconnectFromNotificationManager(); void dispatchNotifications(); void flushPipeline(); bool ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg); /** * Sends out the change notification @p msg. * @param msg the change notification to send * @return @c true if the notification was actually send to someone, @c false if no one was listening. */ virtual bool emitNotification(const Protocol::ChangeNotificationPtr &msg); void updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg); void invalidateCaches(const Protocol::ChangeNotificationPtr &msg); /** Used by ResourceBase to inform us about collection changes before the notifications are emitted, needed to avoid the missing RID race on change replay. */ void invalidateCache(const Collection &col); /// Virtual so that ChangeRecorder can set it to 0 and handle the pipeline itself virtual int pipelineSize() const; // private Q_SLOTS void dataAvailable(); void slotSessionDestroyed(QObject *object); void slotStatisticsChangedFinished(KJob *job); void slotFlushRecentlyChangedCollections(); /** Returns whether a message was appended to @p notificationQueue */ int translateAndCompress(QQueue ¬ificationQueue, const Protocol::ChangeNotificationPtr &msg); void handleCommands(); virtual void slotNotify(const Protocol::ChangeNotificationPtr &msg); /** * Sends out a change notification for an item. * @return @c true if the notification was actually send to someone, @c false if no one was listening. */ bool emitItemsNotification(const Protocol::ItemChangeNotification &msg, const Item::List &items = Item::List(), const Collection &collection = Collection(), const Collection &collectionDest = Collection()); /** * Sends out a change notification for a collection. * @return @c true if the notification was actually send to someone, @c false if no one was listening. */ bool emitCollectionNotification(const Protocol::CollectionChangeNotification &msg, const Collection &col = Collection(), const Collection &par = Collection(), const Collection &dest = Collection()); bool emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tags); bool emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation); bool emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const NotificationSubscriber &subscriber); bool emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf); void serverStateChanged(Akonadi::ServerManager::State state); /** * This method is called by the ChangeMediator to enforce an invalidation of the passed collection. */ void invalidateCollectionCache(qint64 collectionId); /** * This method is called by the ChangeMediator to enforce an invalidation of the passed item. */ void invalidateItemCache(qint64 itemId); /** * This method is called by the ChangeMediator to enforce an invalidation of the passed tag. */ void invalidateTagCache(qint64 tagId); void scheduleSubscriptionUpdate(); void slotUpdateSubscription(); + void updateListeners(const QMetaMethod &signal, ListenerAction action); + + template + void updateListener(Signal signal, ListenerAction action) + { + auto it = listeners.find(signalId(signal)); + if (action == AddListener) { + if (it == listeners.end()) { + it = listeners.insert(signalId(signal), 0); + } + ++(*it); + } else { + if (--(*it) == 0) { + listeners.erase(it); + } + } + } + static Protocol::ModifySubscriptionCommand::ChangeType monitorTypeToProtocol(Monitor::Type type); /** @brief Class used to determine when to purge items in a Collection The buffer method can be used to buffer a Collection. This may cause another Collection to be purged if it is removed from the buffer. The purge method is used to purge a Collection from the buffer, but not the model. This is used for example, to not buffer Collections anymore if they get referenced, and to ensure that one Collection does not appear twice in the buffer. Check whether a Collection is buffered using the isBuffered method. */ class AKONADI_TESTS_EXPORT PurgeBuffer { // Buffer the most recent 10 unreferenced Collections static const int MAXBUFFERSIZE = 10; public: explicit PurgeBuffer() { } /** Adds @p id to the Collections to be buffered @returns The collection id which was removed form the buffer or -1 if none. */ Collection::Id buffer(Collection::Id id); /** Removes @p id from the Collections being buffered */ void purge(Collection::Id id); bool isBuffered(Collection::Id id) const { return m_buffer.contains(id); } static int buffersize(); private: QQueue m_buffer; } m_buffer; QHash refCountMap; bool useRefCounting; void ref(Collection::Id id); Collection::Id deref(Collection::Id id); /** * Returns true if the collection is monitored by monitor. * * A collection is always monitored if useRefCounting is false. * If ref counting is used, the collection is only monitored, * if the collection is either in refCountMap or m_buffer. * If ref counting is used and the collection is not in refCountMap or m_buffer, * no updates for the contained items are emitted, because they are lazily ignored. */ bool isMonitored(Collection::Id colId) const; private: // collections that need a statistics update QSet recentlyChangedCollections; QTimer statisticsCompressionTimer; /** @returns True if @p msg should be ignored. Otherwise appropriate signals are emitted for it. */ bool isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion = false) const; /** Sets @p needsSplit to True when @p msg contains more than one item and there's at least one listener that does not support batch operations. Sets @p batchSupported to True when there's at least one listener that supports batch operations. */ void checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const; Protocol::ChangeNotificationList splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const; bool isCollectionMonitored(Collection::Id collection) const { if (collection < 0) { return false; } if (collections.contains(Collection(collection))) { return true; } if (collections.contains(Collection::root())) { return true; } return false; } bool isMimeTypeMonitored(const QString &mimetype) const { if (mimetypes.contains(mimetype)) { return true; } const QMimeType mimeType = mimeDatabase.mimeTypeForName(mimetype); if (!mimeType.isValid()) { return false; } for (const QString &mt : mimetypes) { if (mimeType.inherits(mt)) { return true; } } return false; } template bool isMoveDestinationResourceMonitored(const T &msg) const { if (msg.operation() != T::Move) { return false; } return resources.contains(msg.destinationResource()); } void fetchStatistics(Collection::Id colId) { CollectionStatisticsJob *job = new CollectionStatisticsJob(Collection(colId), session); QObject::connect(job, SIGNAL(result(KJob *)), q_ptr, SLOT(slotStatisticsChangedFinished(KJob *))); } void notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource); bool fetchCollections() const; bool fetchItems() const; + + // A hack to "cast" pointer to member function to something we can easilly + // use as a key in the hashtable + template + constexpr SignalId signalId(Signal signal) const + { + union { + Signal in; + SignalId out; + } h = {signal}; + return h.out; + } + + template + bool hasListeners(Signal signal) const + { + auto it = listeners.find(signalId(signal)); + return it != listeners.end(); + } + + template + bool emitToListeners(Signal signal, Args ... args) + { + Q_Q(Monitor); + if (hasListeners(signal)) { + Q_EMIT (q_ptr->*signal)(std::forward(args) ...); + return true; + } + return false; + } }; } #endif