diff --git a/autotests/libs/tagtest.cpp b/autotests/libs/tagtest.cpp index ec1476d81..935f91fb4 100644 --- a/autotests/libs/tagtest.cpp +++ b/autotests/libs/tagtest.cpp @@ -1,822 +1,823 @@ /* Copyright (c) 2014 Christian Mollekopf This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include "test_utils.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace Akonadi; class TagTest : public QObject { Q_OBJECT private Q_SLOTS: void initTestCase(); void testTag(); void testCreateFetch(); void testRID(); void testDelete(); void testDeleteRIDIsolation(); void testModify(); void testModifyFromResource(); void testCreateMerge(); void testAttributes(); void testTagItem(); void testCreateItem(); void testRIDIsolation(); void testFetchTagIdWithItem(); void testFetchFullTagWithItem(); void testModifyItemWithTagByGID(); void testModifyItemWithTagByRID(); void testMonitor(); void testFetchItemsByTag(); }; void TagTest::initTestCase() { AkonadiTest::checkTestIsIsolated(); AkonadiTest::setAllResourcesOffline(); AttributeFactory::registerAttribute(); qRegisterMetaType(); qRegisterMetaType >(); qRegisterMetaType(); // Delete the default Knut tag - it's interfering with this test TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); TagDeleteJob *deleteJob = new TagDeleteJob(fetchJob->tags().first(), this); AKVERIFYEXEC(deleteJob); } void TagTest::testTag() { Tag tag1; Tag tag2; // Invalid tags are equal QVERIFY(tag1 == tag2); // Invalid tags with different GIDs are not equal tag1.setGid("GID1"); QVERIFY(tag1 != tag2); tag2.setGid("GID2"); QVERIFY(tag1 != tag2); // Invalid tags with equal GIDs are equal tag1.setGid("GID2"); QVERIFY(tag1 == tag2); // Valid tags with different IDs are not equal tag1 = Tag(1); tag2 = Tag(2); QVERIFY(tag1 != tag2); // Valid tags with different IDs and equal GIDs are still not equal tag1.setGid("GID1"); tag2.setGid("GID1"); QVERIFY(tag1 != tag2); // Valid tags with equal ID are equal regardless of GIDs tag2 = Tag(1); tag2.setGid("GID2"); QVERIFY(tag1 == tag2); } void TagTest::testCreateFetch() { Tag tag; tag.setGid("gid"); tag.setType("mytype"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QCOMPARE(fetchJob->tags().first().gid(), QByteArray("gid")); QCOMPARE(fetchJob->tags().first().type(), QByteArray("mytype")); TagDeleteJob *deleteJob = new TagDeleteJob(fetchJob->tags().first(), this); AKVERIFYEXEC(deleteJob); } { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 0); } } void TagTest::testRID() { { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); } Tag tag; tag.setGid("gid"); tag.setType("mytype"); tag.setRemoteId("rid"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QCOMPARE(fetchJob->tags().first().gid(), QByteArray("gid")); QCOMPARE(fetchJob->tags().first().type(), QByteArray("mytype")); QCOMPARE(fetchJob->tags().first().remoteId(), QByteArray("rid")); TagDeleteJob *deleteJob = new TagDeleteJob(fetchJob->tags().first(), this); AKVERIFYEXEC(deleteJob); } { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("")); AKVERIFYEXEC(select); } } void TagTest::testRIDIsolation() { { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); } Tag tag; tag.setGid("gid"); tag.setType("mytype"); tag.setRemoteId("rid_0"); TagCreateJob *createJob = new TagCreateJob(tag, this); AKVERIFYEXEC(createJob); QVERIFY(createJob->tag().isValid()); qint64 tagId; { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); Q_FOREACH (const Tag &tag, fetchJob->tags()) { qDebug() << tag.gid(); } QCOMPARE(fetchJob->tags().count(), 1); QCOMPARE(fetchJob->tags().first().gid(), QByteArray("gid")); QCOMPARE(fetchJob->tags().first().type(), QByteArray("mytype")); QCOMPARE(fetchJob->tags().first().remoteId(), QByteArray("rid_0")); tagId = fetchJob->tags().first().id(); } { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_1")); AKVERIFYEXEC(select); } tag.setRemoteId("rid_1"); createJob = new TagCreateJob(tag, this); createJob->setMergeIfExisting(true); AKVERIFYEXEC(createJob); QVERIFY(createJob->tag().isValid()); { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().count(), 1); QCOMPARE(fetchJob->tags().first().gid(), QByteArray("gid")); QCOMPARE(fetchJob->tags().first().type(), QByteArray("mytype")); QCOMPARE(fetchJob->tags().first().remoteId(), QByteArray("rid_1")); QCOMPARE(fetchJob->tags().first().id(), tagId); } TagDeleteJob *deleteJob = new TagDeleteJob(Tag(tagId), this); AKVERIFYEXEC(deleteJob); { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("")); AKVERIFYEXEC(select); } } void TagTest::testDelete() { Akonadi::Monitor monitor; monitor.setTypeMonitored(Monitor::Tags); QSignalSpy spy(&monitor, SIGNAL(tagRemoved(Akonadi::Tag))); Tag tag1; { tag1.setGid("tag1"); TagCreateJob *createjob = new TagCreateJob(tag1, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag1 = createjob->tag(); } Tag tag2; { tag2.setGid("tag2"); TagCreateJob *createjob = new TagCreateJob(tag2, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag2 = createjob->tag(); } { TagDeleteJob *deleteJob = new TagDeleteJob(tag1, this); AKVERIFYEXEC(deleteJob); } { TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QCOMPARE(fetchJob->tags().first().gid(), tag2.gid()); } { TagDeleteJob *deleteJob = new TagDeleteJob(tag2, this); AKVERIFYEXEC(deleteJob); } // Collect Remove notification, so that they don't interfere with testDeleteRIDIsolation QTRY_VERIFY(!spy.isEmpty()); } void TagTest::testDeleteRIDIsolation() { Tag tag; tag.setGid("gid"); tag.setType("mytype"); tag.setRemoteId("rid_0"); { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); TagCreateJob *createJob = new TagCreateJob(tag, this); AKVERIFYEXEC(createJob); QVERIFY(createJob->tag().isValid()); tag.setId(createJob->tag().id()); } tag.setRemoteId("rid_1"); { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_1")); AKVERIFYEXEC(select); TagCreateJob *createJob = new TagCreateJob(tag, this); createJob->setMergeIfExisting(true); AKVERIFYEXEC(createJob); QVERIFY(createJob->tag().isValid()); } Akonadi::Monitor monitor; monitor.setTypeMonitored(Akonadi::Monitor::Tags); QSignalSpy signalSpy(&monitor, SIGNAL(tagRemoved(Akonadi::Tag))); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); // Other tests notifications might interfere due to notification compression on server QTRY_VERIFY(signalSpy.count() >= 1); Tag removedTag; while (!signalSpy.isEmpty()) { const Tag t = signalSpy.takeFirst().takeFirst().value(); if (t.id() == tag.id()) { removedTag = t; break; } } QVERIFY(removedTag.isValid()); QVERIFY(removedTag.remoteId().isEmpty()); { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral(""), this); AKVERIFYEXEC(select); } } void TagTest::testModify() { Tag tag; { tag.setGid("gid"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag = createjob->tag(); } //We can add an attribute { Akonadi::TagAttribute *attr = tag.attribute(Tag::AddIfMissing); attr->setDisplayName(QStringLiteral("display name")); tag.addAttribute(attr); tag.setParent(Tag(0)); tag.setType("mytype"); TagModifyJob *modJob = new TagModifyJob(tag, this); AKVERIFYEXEC(modJob); TagFetchJob *fetchJob = new TagFetchJob(this); fetchJob->fetchScope().fetchAttribute(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QVERIFY(fetchJob->tags().first().hasAttribute()); } //We can update an attribute { Akonadi::TagAttribute *attr = tag.attribute(Tag::AddIfMissing); attr->setDisplayName(QStringLiteral("display name2")); TagModifyJob *modJob = new TagModifyJob(tag, this); AKVERIFYEXEC(modJob); TagFetchJob *fetchJob = new TagFetchJob(this); fetchJob->fetchScope().fetchAttribute(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QVERIFY(fetchJob->tags().first().hasAttribute()); QCOMPARE(fetchJob->tags().first().attribute()->displayName(), attr->displayName()); } //We can clear an attribute { tag.removeAttribute(); TagModifyJob *modJob = new TagModifyJob(tag, this); AKVERIFYEXEC(modJob); TagFetchJob *fetchJob = new TagFetchJob(this); fetchJob->fetchScope().fetchAttribute(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QVERIFY(!fetchJob->tags().first().hasAttribute()); } TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testModifyFromResource() { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); Tag tag; { tag.setGid("gid"); tag.setRemoteId("rid"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag = createjob->tag(); } { tag.setRemoteId(QByteArray("")); TagModifyJob *modJob = new TagModifyJob(tag, this); AKVERIFYEXEC(modJob); // The tag is removed on the server, because we just removed the last // RemoteID TagFetchJob *fetchJob = new TagFetchJob(this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 0); } } void TagTest::testCreateMerge() { Tag tag; { tag.setGid("gid"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag = createjob->tag(); } { Tag tag2; tag2.setGid("gid"); TagCreateJob *createjob = new TagCreateJob(tag2, this); createjob->setMergeIfExisting(true); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); QCOMPARE(createjob->tag().id(), tag.id()); } TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testAttributes() { Tag tag; { tag.setGid("gid2"); TagAttribute *attr = tag.attribute(Tag::AddIfMissing); attr->setDisplayName(QStringLiteral("name")); attr->setInToolbar(true); tag.addAttribute(attr); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag = createjob->tag(); { TagFetchJob *fetchJob = new TagFetchJob(createjob->tag(), this); fetchJob->fetchScope().fetchAttribute(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 1); QVERIFY(fetchJob->tags().first().hasAttribute()); //we need to clone because the returned attribute is just a reference and destroyed on the next line //FIXME we should find a better solution for this (like returning a smart pointer or value object) QScopedPointer tagAttr(fetchJob->tags().first().attribute()->clone()); QVERIFY(tagAttr); QCOMPARE(tagAttr->displayName(), QStringLiteral("name")); QCOMPARE(tagAttr->inToolbar(), true); } } //Try fetching multiple items Tag tag2; { tag2.setGid("gid22"); TagAttribute *attr = tag.attribute(Tag::AddIfMissing)->clone(); attr->setDisplayName(QStringLiteral("name2")); attr->setInToolbar(true); tag2.addAttribute(attr); TagCreateJob *createjob = new TagCreateJob(tag2, this); AKVERIFYEXEC(createjob); QVERIFY(createjob->tag().isValid()); tag2 = createjob->tag(); { TagFetchJob *fetchJob = new TagFetchJob(Tag::List() << tag << tag2, this); fetchJob->fetchScope().fetchAttribute(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->tags().size(), 2); QVERIFY(fetchJob->tags().at(0).hasAttribute()); QVERIFY(fetchJob->tags().at(1).hasAttribute()); } } TagDeleteJob *deleteJob = new TagDeleteJob(Tag::List() << tag << tag2, this); AKVERIFYEXEC(deleteJob); } void TagTest::testTagItem() { Akonadi::Monitor monitor; monitor.itemFetchScope().setFetchTags(true); monitor.setAllMonitored(true); const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag; { TagCreateJob *createjob = new TagCreateJob(Tag(QStringLiteral("gid1")), this); AKVERIFYEXEC(createjob); tag = createjob->tag(); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); } item1.setTag(tag); QSignalSpy tagsSpy(&monitor, SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))); QVERIFY(tagsSpy.isValid()); ItemModifyJob *modJob = new ItemModifyJob(item1, this); AKVERIFYEXEC(modJob); QTRY_VERIFY(tagsSpy.count() >= 1); QTRY_COMPARE(tagsSpy.last().first().value().first().id(), item1.id()); QTRY_COMPARE(tagsSpy.last().at(1).value< QSet >().size(), 1); //1 added tag ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testCreateItem() { // Akonadi::Monitor monitor; // monitor.itemFetchScope().setFetchTags(true); // monitor.setAllMonitored(true); const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag; { TagCreateJob *createjob = new TagCreateJob(Tag(QStringLiteral("gid1")), this); AKVERIFYEXEC(createjob); tag = createjob->tag(); } // QSignalSpy tagsSpy(&monitor, SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))); // QVERIFY(tagsSpy.isValid()); Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); item1.setTag(tag); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); } // QTRY_VERIFY(tagsSpy.count() >= 1); // QTest::qWait(10); // kDebug() << tagsSpy.count(); // QTRY_COMPARE(tagsSpy.last().first().value().first().id(), item1.id()); // QTRY_COMPARE(tagsSpy.last().at(1).value< QSet >().size(), 1); //1 added tag ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testFetchTagIdWithItem() { const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag; { TagCreateJob *createjob = new TagCreateJob(Tag(QStringLiteral("gid1")), this); AKVERIFYEXEC(createjob); tag = createjob->tag(); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); item1.setTag(tag); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); } ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); fetchJob->fetchScope().tagFetchScope().setFetchIdOnly(true); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); Tag t = fetchJob->items().first().tags().first(); QCOMPARE(t.id(), tag.id()); QVERIFY(t.gid().isEmpty()); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testFetchFullTagWithItem() { const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag; { TagCreateJob *createjob = new TagCreateJob(Tag(QStringLiteral("gid1")), this); AKVERIFYEXEC(createjob); tag = createjob->tag(); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); //FIXME This should also be possible with create, but isn't item1.setTag(tag); } ItemModifyJob *modJob = new ItemModifyJob(item1, this); AKVERIFYEXEC(modJob); ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); fetchJob->fetchScope().tagFetchScope().setFetchIdOnly(false); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); Tag t = fetchJob->items().first().tags().first(); QCOMPARE(t, tag); QVERIFY(!t.gid().isEmpty()); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } void TagTest::testModifyItemWithTagByGID() { const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); { Tag tag; tag.setGid("gid2"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); } Tag tag; tag.setGid("gid2"); item1.setTag(tag); ItemModifyJob *modJob = new ItemModifyJob(item1, this); AKVERIFYEXEC(modJob); ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); TagDeleteJob *deleteJob = new TagDeleteJob(fetchJob->items().first().tags().first(), this); AKVERIFYEXEC(deleteJob); } void TagTest::testModifyItemWithTagByRID() { { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); } const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag3; { tag3.setGid("gid3"); tag3.setRemoteId("rid3"); TagCreateJob *createjob = new TagCreateJob(tag3, this); AKVERIFYEXEC(createjob); tag3 = createjob->tag(); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); } Tag tag; tag.setRemoteId("rid2"); item1.setTag(tag); ItemModifyJob *modJob = new ItemModifyJob(item1, this); AKVERIFYEXEC(modJob); ItemFetchJob *fetchJob = new ItemFetchJob(item1, this); fetchJob->fetchScope().setFetchTags(true); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().first().tags().size(), 1); { TagDeleteJob *deleteJob = new TagDeleteJob(fetchJob->items().first().tags().first(), this); AKVERIFYEXEC(deleteJob); } { TagDeleteJob *deleteJob = new TagDeleteJob(tag3, this); AKVERIFYEXEC(deleteJob); } { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("")); AKVERIFYEXEC(select); } } void TagTest::testMonitor() { Akonadi::Monitor monitor; monitor.setTypeMonitored(Akonadi::Monitor::Tags); monitor.tagFetchScope().fetchAttribute(); + QTest::qWait(10); // give Monitor time to upload settings Akonadi::Tag createdTag; { QSignalSpy addedSpy(&monitor, SIGNAL(tagAdded(Akonadi::Tag))); QVERIFY(addedSpy.isValid()); Tag tag; tag.setGid("gid2"); tag.setName(QStringLiteral("name2")); tag.setType("type2"); TagCreateJob *createjob = new TagCreateJob(tag, this); AKVERIFYEXEC(createjob); createdTag = createjob->tag(); //We usually pick up signals from the previous tests as well (due to server-side notification caching) QTRY_VERIFY(addedSpy.count() >= 1); QTRY_COMPARE(addedSpy.last().first().value().id(), createdTag.id()); QVERIFY(addedSpy.last().first().value().hasAttribute()); } { QSignalSpy modifedSpy(&monitor, SIGNAL(tagChanged(Akonadi::Tag))); QVERIFY(modifedSpy.isValid()); createdTag.setName(QStringLiteral("name3")); TagModifyJob *modJob = new TagModifyJob(createdTag, this); AKVERIFYEXEC(modJob); //We usually pick up signals from the previous tests as well (due to server-side notification caching) QTRY_VERIFY(modifedSpy.count() >= 1); QTRY_COMPARE(modifedSpy.last().first().value().id(), createdTag.id()); QVERIFY(modifedSpy.last().first().value().hasAttribute()); } { QSignalSpy removedSpy(&monitor, SIGNAL(tagRemoved(Akonadi::Tag))); QVERIFY(removedSpy.isValid()); TagDeleteJob *deletejob = new TagDeleteJob(createdTag, this); AKVERIFYEXEC(deletejob); QTRY_VERIFY(removedSpy.count() >= 1); QTRY_COMPARE(removedSpy.last().first().value().id(), createdTag.id()); } } void TagTest::testFetchItemsByTag() { const Collection res3 = Collection(collectionIdFromPath(QStringLiteral("res3"))); Tag tag; { TagCreateJob *createjob = new TagCreateJob(Tag(QStringLiteral("gid1")), this); AKVERIFYEXEC(createjob); tag = createjob->tag(); } Item item1; { item1.setMimeType(QStringLiteral("application/octet-stream")); ItemCreateJob *append = new ItemCreateJob(item1, res3, this); AKVERIFYEXEC(append); item1 = append->item(); //FIXME This should also be possible with create, but isn't item1.setTag(tag); } ItemModifyJob *modJob = new ItemModifyJob(item1, this); AKVERIFYEXEC(modJob); ItemFetchJob *fetchJob = new ItemFetchJob(tag, this); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().size(), 1); Item i = fetchJob->items().first(); QCOMPARE(i, item1); TagDeleteJob *deleteJob = new TagDeleteJob(tag, this); AKVERIFYEXEC(deleteJob); } #include "tagtest.moc" QTEST_AKONADIMAIN(TagTest) diff --git a/autotests/server/notificationmanagertest.cpp b/autotests/server/notificationmanagertest.cpp index 6595444b0..678f52700 100644 --- a/autotests/server/notificationmanagertest.cpp +++ b/autotests/server/notificationmanagertest.cpp @@ -1,385 +1,389 @@ /* Copyright (c) 2013 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include "entities.h" #include "notificationmanager.h" #include "notificationsubscriber.h" #include #include using namespace Akonadi; using namespace Akonadi::Server; Q_DECLARE_METATYPE(QVector) class TestableNotificationSubscriber : public NotificationSubscriber { public: TestableNotificationSubscriber() : NotificationSubscriber() { mSubscriber = "TestSubscriber"; } void setAllMonitored(bool allMonitored) { mAllMonitored = allMonitored; } void setMonitoredCollection(qint64 collection, bool monitored) { if (monitored) { mMonitoredCollections.insert(collection); } else { mMonitoredCollections.remove(collection); } } void setMonitoredItem(qint64 item, bool monitored) { if (monitored) { mMonitoredItems.insert(item); } else { mMonitoredItems.remove(item); } } void setMonitoredResource(const QByteArray &resource, bool monitored) { if (monitored) { mMonitoredResources.insert(resource); } else { mMonitoredResources.remove(resource); } } void setMonitoredMimeType(const QString &mimeType, bool monitored) { if (monitored) { mMonitoredMimeTypes.insert(mimeType); } else { mMonitoredMimeTypes.remove(mimeType); } } void setIgnoredSession(const QByteArray &session, bool ignored) { if (ignored) { mIgnoredSessions.insert(session); } else { mIgnoredSessions.remove(session); } } void writeNotification(const Protocol::ChangeNotificationPtr ¬ification) override { emittedNotifications << notification; } Protocol::ChangeNotificationList emittedNotifications; }; class NotificationManagerTest : public QObject { Q_OBJECT typedef QList NSList; private Q_SLOTS: void testSourceFilter_data() { qRegisterMetaType(); QTest::addColumn("allMonitored"); QTest::addColumn >("monitoredCollections"); QTest::addColumn >("monitoredItems"); QTest::addColumn >("monitoredResources"); QTest::addColumn >("monitoredMimeTypes"); QTest::addColumn >("ignoredSessions"); QTest::addColumn("notification"); QTest::addColumn("accepted"); #define EmptyList(T) (QVector()) #define List(T,x) (QVector() << x) auto itemMsg = Protocol::ItemChangeNotificationPtr::create(); itemMsg->setOperation(Protocol::ItemChangeNotification::Add); itemMsg->setParentCollection(1); QTest::newRow("monitorAll vs notification without items") << true << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << EmptyList(QByteArray) << itemMsg.staticCast() << false; itemMsg = Protocol::ItemChangeNotificationPtr::create(*itemMsg); itemMsg->setItems({ { 1, QString(), QString(), QStringLiteral("message/rfc822") } }); QTest::newRow("monitorAll vs notification with one item") << true << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << EmptyList(QByteArray) << itemMsg.staticCast() << true; QTest::newRow("item monitored but different mimetype") << false << EmptyList(Entity::Id) << List(Entity::Id, 1 << 2) << EmptyList(QByteArray) << List(QString, QStringLiteral("random/mimetype")) << EmptyList(QByteArray) << Protocol::ItemChangeNotificationPtr::create(*itemMsg).staticCast() << false; QTest::newRow("item not monitored, but mimetype matches") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << List(QString, QStringLiteral("message/rfc822")) << EmptyList(QByteArray) << Protocol::ItemChangeNotificationPtr::create(*itemMsg).staticCast() << true; itemMsg = Protocol::ItemChangeNotificationPtr::create(*itemMsg); itemMsg->setSessionId("testSession"); QTest::newRow("item monitored but session ignored") << false << EmptyList(Entity::Id) << List(Entity::Id, 1) << EmptyList(QByteArray) << EmptyList(QString) << List(QByteArray, "testSession") << itemMsg.staticCast() << false; // Simulate adding a new resource auto colMsg = Protocol::CollectionChangeNotificationPtr::create(); colMsg->setOperation(Protocol::CollectionChangeNotification::Add); auto col = Protocol::FetchCollectionsResponsePtr::create(); col->setId(1); col->setRemoteId(QStringLiteral("imap://user@some.domain/")); colMsg->setCollection(col); colMsg->setParentCollection(0); colMsg->setSessionId("akonadi_imap_resource_0"); colMsg->setResource("akonadi_imap_resource_0"); QTest::newRow("new root collection in non-monitored resource") << false << List(Entity::Id, 0) << EmptyList(Entity::Id) << List(QByteArray, "akonadi_search_resource") << List(QString, QStringLiteral("message/rfc822")) << EmptyList(QByteArray) << colMsg.staticCast() << true; itemMsg = Protocol::ItemChangeNotificationPtr::create(); itemMsg->setOperation(Protocol::ItemChangeNotification::Move); itemMsg->setResource("akonadi_resource_1"); itemMsg->setDestinationResource("akonadi_resource_2"); itemMsg->setParentCollection(1); itemMsg->setParentDestCollection(2); itemMsg->setSessionId("kmail"); itemMsg->setItems({ { 10, QStringLiteral("123"), QStringLiteral("1"), QStringLiteral("message/rfc822") } }); QTest::newRow("inter-resource move, source source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << List(QByteArray, "akonadi_resource_1") << List(QString, QStringLiteral("message/rfc822")) << List(QByteArray, "akonadi_resource_1") << itemMsg.staticCast() << true; QTest::newRow("inter-resource move, destination source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << List(QByteArray, "akonadi_resource_2") << List(QString, QStringLiteral("message/rfc822")) << List(QByteArray, "akonadi_resource_2") << itemMsg.staticCast() << true; QTest::newRow("inter-resource move, uninterested party") << false << List(Entity::Id, 12) << EmptyList(Entity::Id) << EmptyList(QByteArray) << List(QString, QStringLiteral("inode/directory")) << EmptyList(QByteArray) << itemMsg.staticCast() << false; itemMsg = Protocol::ItemChangeNotificationPtr::create(); itemMsg->setOperation(Protocol::ItemChangeNotification::Move); itemMsg->setResource("akonadi_resource_0"); itemMsg->setDestinationResource("akonadi_resource_0"); itemMsg->setParentCollection(1); itemMsg->setParentDestCollection(2); itemMsg->setSessionId("kmail"); itemMsg->setItems({ { 10, QStringLiteral("123"), QStringLiteral("1"), QStringLiteral("message/rfc822") }, { 11, QStringLiteral("456"), QStringLiteral("1"), QStringLiteral("message/rfc822") } }); QTest::newRow("intra-resource move, owning resource") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << List(QByteArray, "akonadi_imap_resource_0") << List(QString, QStringLiteral("message/rfc822")) << List(QByteArray, "akonadi_imap_resource_0") << itemMsg.staticCast() << true; colMsg = Protocol::CollectionChangeNotificationPtr::create(); colMsg->setOperation(Protocol::CollectionChangeNotification::Add); colMsg->setSessionId("kmail"); colMsg->setResource("akonadi_resource_1"); colMsg->setParentCollection(1); QTest::newRow("new subfolder") << false << List(Entity::Id, 0) << EmptyList(Entity::Id) << EmptyList(QByteArray) << List(QString, QStringLiteral("message/rfc822")) << EmptyList(QByteArray) << colMsg.staticCast() << false; itemMsg = Protocol::ItemChangeNotificationPtr::create(); itemMsg->setOperation(Protocol::ItemChangeNotification::Add); itemMsg->setSessionId("randomSession"); itemMsg->setResource("randomResource"); itemMsg->setParentCollection(1); itemMsg->setItems({ { 10, QString(), QString(), QStringLiteral("message/rfc822") } }); QTest::newRow("new mail for mailfilter or maildispatcher") << false << List(Entity::Id, 0) << EmptyList(Entity::Id) << EmptyList(QByteArray) << List(QString, QStringLiteral("message/rfc822")) << EmptyList(QByteArray) << itemMsg.staticCast() << true; auto tagMsg = Protocol::TagChangeNotificationPtr::create(); tagMsg->setOperation(Protocol::TagChangeNotification::Remove); tagMsg->setSessionId("randomSession"); tagMsg->setResource("akonadi_random_resource_0"); - tagMsg->setId(1); - tagMsg->setRemoteId(QStringLiteral("TAG")); + auto tagMsgTag = Protocol::FetchTagsResponsePtr::create(); + tagMsgTag->setId(1); + tagMsgTag->setRemoteId("TAG"); + tagMsg->setTag(tagMsgTag); QTest::newRow("Tag removal - resource notification - matching resource source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << List(QByteArray, "akonadi_random_resource_0") << tagMsg.staticCast() << true; QTest::newRow("Tag removal - resource notification - wrong resource source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << List(QByteArray, "akonadi_another_resource_1") << tagMsg.staticCast() << false; tagMsg = Protocol::TagChangeNotificationPtr::create(); tagMsg->setOperation(Protocol::TagChangeNotification::Remove); tagMsg->setSessionId("randomSession"); - tagMsg->setId(1); - tagMsg->setRemoteId(QStringLiteral("TAG")); + tagMsgTag = Protocol::FetchTagsResponsePtr::create(); + tagMsgTag->setId(1); + tagMsgTag->setRemoteId("TAG"); + tagMsg->setTag(tagMsgTag); QTest::newRow("Tag removal - client notification - client source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << EmptyList(QByteArray) << tagMsg.staticCast() << true; QTest::newRow("Tag removal - client notification - resource source") << false << EmptyList(Entity::Id) << EmptyList(Entity::Id) << EmptyList(QByteArray) << EmptyList(QString) << List( QByteArray, "akonadi_some_resource_0" ) << tagMsg.staticCast() << false; } void testSourceFilter() { QFETCH(bool, allMonitored); QFETCH(QVector, monitoredCollections); QFETCH(QVector, monitoredItems); QFETCH(QVector, monitoredResources); QFETCH(QVector, monitoredMimeTypes); QFETCH(QVector, ignoredSessions); QFETCH(Protocol::ChangeNotificationPtr, notification); QFETCH(bool, accepted); TestableNotificationSubscriber subscriber; subscriber.setAllMonitored(allMonitored); Q_FOREACH (Entity::Id id, monitoredCollections) { subscriber.setMonitoredCollection(id, true); } Q_FOREACH (Entity::Id id, monitoredItems) { subscriber.setMonitoredItem(id, true); } Q_FOREACH (const QByteArray &res, monitoredResources) { subscriber.setMonitoredResource(res, true); } Q_FOREACH (const QString &mimeType, monitoredMimeTypes) { subscriber.setMonitoredMimeType(mimeType, true); } Q_FOREACH (const QByteArray &session, ignoredSessions) { subscriber.setIgnoredSession(session, true); } subscriber.notify({ notification }); QTRY_COMPARE(subscriber.emittedNotifications.count(), accepted ? 1 : 0); if (accepted) { const Protocol::ChangeNotificationPtr ntf = subscriber.emittedNotifications.at(0); QVERIFY(ntf->isValid()); } } }; AKTEST_MAIN(NotificationManagerTest) #include "notificationmanagertest.moc" diff --git a/autotests/server/taghandlertest.cpp b/autotests/server/taghandlertest.cpp index 333aad925..f18204ae5 100644 --- a/autotests/server/taghandlertest.cpp +++ b/autotests/server/taghandlertest.cpp @@ -1,465 +1,483 @@ /* Copyright (c) 2014 Christian Mollekopf This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include "fakeakonadiserver.h" #include "aktest.h" #include "entities.h" #include "dbinitializer.h" #include #include using namespace Akonadi; using namespace Akonadi::Server; typedef QPair TagTagAttributeListPair; Q_DECLARE_METATYPE(Akonadi::Server::Tag::List) Q_DECLARE_METATYPE(Akonadi::Server::Tag) Q_DECLARE_METATYPE(QVector) static Protocol::ChangeNotificationList extractNotifications(const QSharedPointer ¬ificationSpy) { Protocol::ChangeNotificationList receivedNotifications; for (int q = 0; q < notificationSpy->size(); q++) { //Only one notify call if (notificationSpy->at(q).count() != 1) { qWarning() << "Error: We're assuming only one notify call."; return Protocol::ChangeNotificationList(); } const auto n = notificationSpy->at(q).first().value(); for (int i = 0; i < n.size(); i++) { // qDebug() << n.at(i); receivedNotifications.append(n.at(i)); } } return receivedNotifications; } class TagHandlerTest : public QObject { Q_OBJECT public: TagHandlerTest() : QObject() { qRegisterMetaType(); try { FakeAkonadiServer::instance()->setPopulateDb(false); FakeAkonadiServer::instance()->init(); } catch (const FakeAkonadiServerException &e) { qWarning() << "Server exception: " << e.what(); qFatal("Fake Akonadi Server failed to start up, aborting test"); } } ~TagHandlerTest() { FakeAkonadiServer::instance()->quit(); } Protocol::FetchTagsResponsePtr createResponse(const Tag &tag, const QByteArray &remoteId = QByteArray(), const Protocol::Attributes &attrs = Protocol::Attributes()) { auto resp = Protocol::FetchTagsResponsePtr::create(tag.id()); resp->setGid(tag.gid().toUtf8()); resp->setParentId(tag.parentId()); resp->setType(tag.tagType().name().toUtf8()); resp->setRemoteId(remoteId); resp->setAttributes(attrs); return resp; } QScopedPointer initializer; private Q_SLOTS: void testStoreTag_data() { initializer.reset(new DbInitializer); Resource res = initializer->createResource("testresource"); // Make sure the type exists TagType type = type.retrieveByName(QStringLiteral("PLAIN")); if (!type.isValid()) { type.setName(QStringLiteral("PLAIN")); type.insert(); } QTest::addColumn("scenarios"); QTest::addColumn>>("expectedTags"); QTest::addColumn("expectedNotifications"); { auto cmd = Protocol::CreateTagCommandPtr::create(); cmd->setGid("tag"); cmd->setParentId(0); cmd->setType("PLAIN"); cmd->setAttributes({ { "TAG", "(\\\"tag2\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")" } }); auto resp = Protocol::FetchTagsResponsePtr::create(1); resp->setGid(cmd->gid()); resp->setParentId(cmd->parentId()); resp->setType(cmd->type()); resp->setAttributes(cmd->attributes()); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, resp) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::CreateTagResponsePtr::create()); Tag tag; tag.setId(1); tag.setTagType(type); tag.setParentId(0); TagAttribute attribute; attribute.setTagId(1); attribute.setType("TAG"); attribute.setValue("(\\\"tag2\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")"); auto notification = Protocol::TagChangeNotificationPtr::create(); notification->setOperation(Protocol::TagChangeNotification::Add); notification->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); - notification->setId(1); + auto ntfTag = Protocol::FetchTagsResponsePtr::create(); + ntfTag->setId(1); + notification->setTag(ntfTag); QTest::newRow("uid create relation") << scenarios << QVector{ { tag, { attribute } } } << Protocol::ChangeNotificationList{ notification }; } { auto cmd = Protocol::CreateTagCommandPtr::create(); cmd->setGid("tag2"); cmd->setParentId(1); cmd->setType("PLAIN"); cmd->setAttributes({ { "TAG", "(\\\"tag3\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")" } }); auto resp = Protocol::FetchTagsResponsePtr::create(2); resp->setGid(cmd->gid()); resp->setParentId(cmd->parentId()); resp->setType(cmd->type()); resp->setAttributes(cmd->attributes()); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, resp) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::CreateTagResponsePtr::create()); Tag tag; tag.setId(2); tag.setTagType(type); tag.setParentId(1); TagAttribute attribute; attribute.setTagId(2); attribute.setType("TAG"); attribute.setValue("(\\\"tag3\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")"); auto notification = Protocol::TagChangeNotificationPtr::create(); notification->setOperation(Protocol::TagChangeNotification::Add); notification->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); - notification->setId(2); + auto ntfTag = Protocol::FetchTagsResponsePtr::create(); + ntfTag->setId(2); + notification->setTag(ntfTag); QTest::newRow("create child tag") << scenarios << QVector{ { tag, { attribute } } } << Protocol::ChangeNotificationList{ notification }; } } void testStoreTag() { QFETCH(TestScenario::List, scenarios); QFETCH(QVector, expectedTags); QFETCH(Protocol::ChangeNotificationList, expectedNotifications); FakeAkonadiServer::instance()->setScenarios(scenarios); FakeAkonadiServer::instance()->runTest(); const auto receivedNotifications = extractNotifications(FakeAkonadiServer::instance()->notificationSpy()); QVariantList ids; QCOMPARE(receivedNotifications.size(), expectedNotifications.count()); for (int i = 0; i < expectedNotifications.size(); i++) { QCOMPARE(*receivedNotifications.at(i), *expectedNotifications.at(i)); - ids << Protocol::cmdCast(receivedNotifications.at(i)).id(); + ids << Protocol::cmdCast(receivedNotifications.at(i)).tag()->id(); } SelectQueryBuilder qb; qb.addValueCondition(Tag::idColumn(), Query::In, ids); QVERIFY(qb.exec()); const Tag::List tags = qb.result(); QCOMPARE(tags.size(), expectedTags.size()); for (int i = 0; i < tags.size(); i++) { const Tag actual = tags.at(i); const Tag expected = expectedTags.at(i).first; const TagAttribute::List expectedAttrs = expectedTags.at(i).second; QCOMPARE(actual.id(), expected.id()); QCOMPARE(actual.typeId(), expected.typeId()); QCOMPARE(actual.parentId(), expected.parentId()); TagAttribute::List attributes = TagAttribute::retrieveFiltered( TagAttribute::tagIdColumn(), tags.at(i).id()); QCOMPARE(attributes.size(), expectedAttrs.size()); for (int j = 0; j < attributes.size(); ++j) { const TagAttribute actualAttr = attributes.at(i); const TagAttribute expectedAttr = expectedAttrs.at(i); QCOMPARE(actualAttr.tagId(), expectedAttr.tagId()); QCOMPARE(actualAttr.type(), expectedAttr.type()); QCOMPARE(actualAttr.value(), expectedAttr.value()); } } } void testModifyTag_data() { initializer.reset(new DbInitializer); Resource res = initializer->createResource("testresource"); Resource res2 = initializer->createResource("testresource2"); Collection col = initializer->createCollection("Col 1"); PimItem pimItem = initializer->createItem("Item 1", col); Tag tag; TagType type; type.setName(QStringLiteral("PLAIN")); type.insert(); tag.setTagType(type); tag.setGid(QStringLiteral("gid")); tag.insert(); pimItem.addTag(tag); TagRemoteIdResourceRelation rel; rel.setRemoteId(QStringLiteral("TAG1RES2RID")); rel.setResource(res2); rel.setTag(tag); rel.insert(); QTest::addColumn("scenarios"); QTest::addColumn("expectedTags"); QTest::addColumn("expectedNotifications"); { auto cmd = Protocol::ModifyTagCommandPtr::create(tag.id()); cmd->setAttributes({ { "TAG", "(\\\"tag2\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")" } }); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, createResponse(tag, QByteArray(), cmd->attributes())) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::ModifyTagResponsePtr::create()); auto notification = Protocol::TagChangeNotificationPtr::create(); notification->setOperation(Protocol::TagChangeNotification::Modify); notification->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); - notification->setId(tag.id()); + auto ntfTag = Protocol::FetchTagsResponsePtr::create(); + ntfTag->setId(tag.id()); + notification->setTag(ntfTag); QTest::newRow("uid store name") << scenarios << (Tag::List() << tag) << (Protocol::ChangeNotificationList() << notification); } { auto cmd = Protocol::ModifyTagCommandPtr::create(tag.id()); cmd->setRemoteId("remote1"); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << FakeAkonadiServer::selectResourceScenario(QStringLiteral("testresource")) << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, createResponse(tag, "remote1", { { "TAG", "(\\\"tag2\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")" } })) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::ModifyTagResponsePtr::create()); // RID-only changes don't emit notifications /* Akonadi::Protocol::ChangeNotification notification; notification.setType(Protocol::ChangeNotification::Tags); notification.setOperation(Protocol::ChangeNotification::Modify); notification.setSessionId(FakeAkonadiServer::instanceName().toLatin1()); notification.addEntity(tag.id()); */ QTest::newRow("uid store rid") << scenarios << (Tag::List() << tag) << Protocol::ChangeNotificationList(); } { auto cmd = Protocol::ModifyTagCommandPtr::create(tag.id()); cmd->setRemoteId(QByteArray()); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << FakeAkonadiServer::selectResourceScenario(res.name()) << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, createResponse(tag, QByteArray(), { { "TAG", "(\\\"tag2\\\" \\\"\\\" \\\"\\\" \\\"\\\" \\\"0\\\" () () \\\"-1\\\")" } })) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::ModifyTagResponsePtr::create()); // RID-only changes don't emit notifications /* Akonadi::Protocol::ChangeNotification tagChangeNtf; tagChangeNtf.setType(Protocol::ChangeNotification::Tags); tagChangeNtf.setOperation(Protocol::ChangeNotification::Modify); tagChangeNtf.setSessionId(FakeAkonadiServer::instanceName().toLatin1()); tagChangeNtf.addEntity(tag.id()); */ QTest::newRow("uid store unset one rid") << scenarios << (Tag::List() << tag) << Protocol::ChangeNotificationList(); } { auto cmd = Protocol::ModifyTagCommandPtr::create(tag.id()); cmd->setRemoteId(QByteArray()); TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << FakeAkonadiServer::selectResourceScenario(res2.name()) << TestScenario::create(5, TestScenario::ClientCmd, cmd) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::DeleteTagResponsePtr::create()) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::ModifyTagResponsePtr::create()); auto itemUntaggedNtf = Protocol::ItemChangeNotificationPtr::create(); itemUntaggedNtf->setOperation(Protocol::ItemChangeNotification::ModifyTags); itemUntaggedNtf->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); itemUntaggedNtf->setItems({ { pimItem.id(), pimItem.remoteId(), QString(), pimItem.mimeType().name() } }); itemUntaggedNtf->setResource(res2.name().toLatin1()); itemUntaggedNtf->setParentCollection(col.id()); itemUntaggedNtf->setRemovedTags(QSet() << tag.id()); auto tagRemoveNtf = Protocol::TagChangeNotificationPtr::create(); tagRemoveNtf->setOperation(Protocol::TagChangeNotification::Remove); tagRemoveNtf->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); - tagRemoveNtf->setId(tag.id()); + auto ntfTag = Protocol::FetchTagsResponsePtr::create(); + ntfTag->setId(tag.id()); + ntfTag->setGid("gid"); + ntfTag->setType("PLAIN"); + tagRemoveNtf->setTag(ntfTag); QTest::newRow("uid store unset last rid") << scenarios << Tag::List() << (Protocol::ChangeNotificationList() << itemUntaggedNtf << tagRemoveNtf); } } void testModifyTag() { QFETCH(TestScenario::List, scenarios); QFETCH(Tag::List, expectedTags); QFETCH(Protocol::ChangeNotificationList, expectedNotifications); FakeAkonadiServer::instance()->setScenarios(scenarios); FakeAkonadiServer::instance()->runTest(); const auto receivedNotifications = extractNotifications(FakeAkonadiServer::instance()->notificationSpy()); QCOMPARE(receivedNotifications.size(), expectedNotifications.count()); for (int i = 0; i < receivedNotifications.size(); i++) { + qDebug() << Protocol::debugString(receivedNotifications.at(i)); + qDebug() << Protocol::debugString(expectedNotifications.at(i)); QCOMPARE(*receivedNotifications.at(i), *expectedNotifications.at(i)); } const Tag::List tags = Tag::retrieveAll(); QCOMPARE(tags.size(), expectedTags.size()); for (int i = 0; i < tags.size(); i++) { QCOMPARE(tags.at(i).id(), expectedTags.at(i).id()); QCOMPARE(tags.at(i).tagType().name(), expectedTags.at(i).tagType().name()); } } void testRemoveTag_data() { initializer.reset(new DbInitializer); Resource res1 = initializer->createResource("testresource3"); Resource res2 = initializer->createResource("testresource4"); Tag tag; TagType type; type.setName(QStringLiteral("PLAIN")); type.insert(); tag.setTagType(type); tag.setGid(QStringLiteral("gid2")); tag.insert(); TagRemoteIdResourceRelation rel1; rel1.setRemoteId(QStringLiteral("TAG2RES1RID")); rel1.setResource(res1); rel1.setTag(tag); rel1.insert(); TagRemoteIdResourceRelation rel2; rel2.setRemoteId(QStringLiteral("TAG2RES2RID")); rel2.setResource(res2); rel2.setTag(tag); rel2.insert(); QTest::addColumn("scenarios"); QTest::addColumn("expectedTags"); QTest::addColumn("expectedNotifications"); { TestScenario::List scenarios; scenarios << FakeAkonadiServer::loginScenario() << TestScenario::create(5, TestScenario::ClientCmd, Protocol::DeleteTagCommandPtr::create(tag.id())) << TestScenario::create(5, TestScenario::ServerCmd, Protocol::DeleteTagResponsePtr::create()); auto ntf = Protocol::TagChangeNotificationPtr::create(); ntf->setOperation(Protocol::TagChangeNotification::Remove); ntf->setSessionId(FakeAkonadiServer::instanceName().toLatin1()); auto res1Ntf = Protocol::TagChangeNotificationPtr::create(*ntf); - res1Ntf->setId(tag.id()); - res1Ntf->setRemoteId(rel1.remoteId()); + auto res1NtfTag = Protocol::FetchTagsResponsePtr::create(); + res1NtfTag->setId(tag.id()); + res1NtfTag->setRemoteId(rel1.remoteId().toLatin1()); + res1Ntf->setTag(res1NtfTag); res1Ntf->setResource(res1.name().toLatin1()); auto res2Ntf = Protocol::TagChangeNotificationPtr::create(*ntf); - res2Ntf->setId(tag.id()); - res2Ntf->setRemoteId(rel2.remoteId()); + auto res2NtfTag = Protocol::FetchTagsResponsePtr::create(); + res2NtfTag->setId(tag.id()); + res2NtfTag->setRemoteId(rel2.remoteId().toLatin1()); + res2Ntf->setTag(res2NtfTag); res2Ntf->setResource(res2.name().toLatin1()); auto clientNtf = Protocol::TagChangeNotificationPtr::create(*ntf); - clientNtf->setId(tag.id()); + auto clientNtfTag = Protocol::FetchTagsResponsePtr::create(); + clientNtfTag->setId(tag.id()); + clientNtf->setTag(clientNtfTag); QTest::newRow("uid remove") << scenarios << Tag::List() << (Protocol::ChangeNotificationList() << res1Ntf << res2Ntf << clientNtf); } } void testRemoveTag() { QFETCH(TestScenario::List, scenarios); QFETCH(Tag::List, expectedTags); QFETCH(Protocol::ChangeNotificationList, expectedNotifications); FakeAkonadiServer::instance()->setScenarios(scenarios); FakeAkonadiServer::instance()->runTest(); const auto receivedNotifications = extractNotifications(FakeAkonadiServer::instance()->notificationSpy()); QCOMPARE(receivedNotifications.size(), expectedNotifications.count()); for (int i = 0; i < receivedNotifications.size(); i++) { QCOMPARE(*receivedNotifications.at(i), *expectedNotifications.at(i)); } const Tag::List tags = Tag::retrieveAll(); QCOMPARE(tags.size(), 0); } }; AKTEST_FAKESERVER_MAIN(TagHandlerTest) #include "taghandlertest.moc" diff --git a/src/core/changerecorder_p.cpp b/src/core/changerecorder_p.cpp index 00bc01527..1206c172f 100644 --- a/src/core/changerecorder_p.cpp +++ b/src/core/changerecorder_p.cpp @@ -1,1029 +1,1053 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "changerecorder_p.h" #include "akonadicore_debug.h" #include #include #include #include #include using namespace Akonadi; ChangeRecorderPrivate::ChangeRecorderPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, ChangeRecorder *parent) : MonitorPrivate(dependenciesFactory_, parent) , settings(nullptr) , enableChangeRecording(true) , m_lastKnownNotificationsCount(0) , m_startOffset(0) , m_needFullSave(true) { } int ChangeRecorderPrivate::pipelineSize() const { if (enableChangeRecording) { return 0; // we fill the pipeline ourselves when using change recording } return MonitorPrivate::pipelineSize(); } void ChangeRecorderPrivate::slotNotify(const Protocol::ChangeNotificationPtr &msg) { Q_Q(ChangeRecorder); const int oldChanges = pendingNotifications.size(); // with change recording disabled this will automatically take care of dispatching notification messages and saving MonitorPrivate::slotNotify(msg); if (enableChangeRecording && pendingNotifications.size() != oldChanges) { emit q->changesAdded(); } } // The QSettings object isn't actually used anymore, except for migrating old data // and it gives us the base of the filename to use. This is all historical. QString ChangeRecorderPrivate::notificationsFileName() const { return settings->fileName() + QStringLiteral("_changes.dat"); } void ChangeRecorderPrivate::loadNotifications() { pendingNotifications.clear(); Q_ASSERT(pipeline.isEmpty()); pipeline.clear(); const QString changesFileName = notificationsFileName(); /** * In an older version we recorded changes inside the settings object, however * for performance reasons we changed that to store them in a separated file. * If this file doesn't exists, it means we run the new version the first time, * so we have to read in the legacy list of changes first. */ if (!QFile::exists(changesFileName)) { QStringList list; settings->beginGroup(QStringLiteral("ChangeRecorder")); const int size = settings->beginReadArray(QStringLiteral("change")); for (int i = 0; i < size; ++i) { settings->setArrayIndex(i); Protocol::ChangeNotificationPtr msg; switch (static_cast(settings->value(QStringLiteral("type")).toInt())) { case Item: msg = loadItemNotification(settings); break; case Collection: msg = loadCollectionNotification(settings); break; case Tag: case Relation: case InvalidType: qWarning() << "Unexpected notification type in legacy store"; continue; } if (msg->isValid()) { pendingNotifications << msg; } } settings->endArray(); // save notifications to the new file... saveNotifications(); // ...delete the legacy list... settings->remove(QString()); settings->endGroup(); // ...and continue as usually } QFile file(changesFileName); if (file.open(QIODevice::ReadOnly)) { m_needFullSave = false; pendingNotifications = loadFrom(&file, m_needFullSave); } else { m_needFullSave = true; } notificationsLoaded(); } static const quint64 s_currentVersion = Q_UINT64_C(0x000700000000); static const quint64 s_versionMask = Q_UINT64_C(0xFFFF00000000); static const quint64 s_sizeMask = Q_UINT64_C(0x0000FFFFFFFF); QQueue ChangeRecorderPrivate::loadFrom(QFile *device, bool &needsFullSave) const { QDataStream stream(device); stream.setVersion(QDataStream::Qt_4_6); QByteArray sessionId; int type; QQueue list; quint64 sizeAndVersion; stream >> sizeAndVersion; const quint64 size = sizeAndVersion & s_sizeMask; const quint64 version = (sizeAndVersion & s_versionMask) >> 32; quint64 startOffset = 0; if (version >= 1) { stream >> startOffset; } // If we skip the first N items, then we'll need to rewrite the file on saving. // Also, if the file is old, it needs to be rewritten. needsFullSave = startOffset > 0 || version == 0; for (quint64 i = 0; i < size && !stream.atEnd(); ++i) { Protocol::ChangeNotificationPtr msg; stream >> sessionId; stream >> type; if (stream.status() != QDataStream::Ok) { qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting. Corrupt file:" << device->fileName(); break; } switch (static_cast(type)) { case Item: msg = loadItemNotification(stream, version); break; case Collection: msg = loadCollectionNotification(stream, version); break; case Tag: msg = loadTagNotification(stream, version); break; case Relation: msg = loadRelationNotification(stream, version); break; default: qCWarning(AKONADICORE_LOG) << "Unknown notification type"; break; } if (i < startOffset) { continue; } if (msg && msg->isValid()) { msg->setSessionId(sessionId); list << msg; } } return list; } QString ChangeRecorderPrivate::dumpNotificationListToString() const { if (!settings) { return QStringLiteral("No settings set in ChangeRecorder yet."); } const QString changesFileName = notificationsFileName(); QFile file(changesFileName); if (!file.open(QIODevice::ReadOnly)) { return QLatin1String("Error reading ") + changesFileName; } QString result; bool dummy; const QQueue notifications = loadFrom(&file, dummy); for (const Protocol::ChangeNotificationPtr &n : notifications) { result += Protocol::debugString(n) + QLatin1Char('\n'); } return result; } void ChangeRecorderPrivate::addToStream(QDataStream &stream, const Protocol::ChangeNotificationPtr &msg) { // We deliberately don't use Factory::serialize(), because the internal // serialization format could change at any point stream << msg->sessionId(); stream << int(mapToLegacyType(msg->type())); switch (msg->type()) { case Protocol::Command::ItemChangeNotification: saveItemNotification(stream, Protocol::cmdCast(msg)); break; case Protocol::Command::CollectionChangeNotification: saveCollectionNotification(stream, Protocol::cmdCast(msg)); break; case Protocol::Command::TagChangeNotification: saveTagNotification(stream, Protocol::cmdCast(msg)); break; case Protocol::Command::RelationChangeNotification: saveRelationNotification(stream, Protocol::cmdCast(msg)); break; default: qCWarning(AKONADICORE_LOG) << "Unexpected type?"; return; } } void ChangeRecorderPrivate::writeStartOffset() { if (!settings) { return; } QFile file(notificationsFileName()); if (!file.open(QIODevice::ReadWrite)) { qCWarning(AKONADICORE_LOG) << "Could not update notifications in file" << file.fileName(); return; } // Skip "countAndVersion" file.seek(8); //qCDebug(AKONADICORE_LOG) << "Writing start offset=" << m_startOffset; QDataStream stream(&file); stream.setVersion(QDataStream::Qt_4_6); stream << static_cast(m_startOffset); // Everything else stays unchanged } void ChangeRecorderPrivate::saveNotifications() { if (!settings) { return; } QFile file(notificationsFileName()); QFileInfo info(file); if (!QFile::exists(info.absolutePath())) { QDir dir; dir.mkpath(info.absolutePath()); } if (!file.open(QIODevice::WriteOnly)) { qCWarning(AKONADICORE_LOG) << "Could not save notifications to file" << file.fileName(); return; } saveTo(&file); m_needFullSave = false; m_startOffset = 0; } void ChangeRecorderPrivate::saveTo(QIODevice *device) { // Version 0 of this file format was writing a quint64 count, followed by the notifications. // Version 1 bundles a version number into that quint64, to be able to detect a version number at load time. const quint64 countAndVersion = static_cast(pendingNotifications.count()) | s_currentVersion; QDataStream stream(device); stream.setVersion(QDataStream::Qt_4_6); stream << countAndVersion; stream << quint64(0); // no start offset //qCDebug(AKONADICORE_LOG) << "Saving" << pendingNotifications.count() << "notifications (full save)"; for (int i = 0; i < pendingNotifications.count(); ++i) { const Protocol::ChangeNotificationPtr msg = pendingNotifications.at(i); addToStream(stream, msg); } } void ChangeRecorderPrivate::notificationsEnqueued(int count) { // Just to ensure the contract is kept, and these two methods are always properly called. if (enableChangeRecording) { m_lastKnownNotificationsCount += count; if (m_lastKnownNotificationsCount != pendingNotifications.count()) { qCWarning(AKONADICORE_LOG) << this << "The number of pending notifications changed without telling us! Expected" << m_lastKnownNotificationsCount << "but got" << pendingNotifications.count() << "Caller just added" << count; Q_ASSERT(pendingNotifications.count() == m_lastKnownNotificationsCount); } saveNotifications(); } } void ChangeRecorderPrivate::dequeueNotification() { if (pendingNotifications.isEmpty()) { return; } pendingNotifications.dequeue(); if (enableChangeRecording) { Q_ASSERT(pendingNotifications.count() == m_lastKnownNotificationsCount - 1); --m_lastKnownNotificationsCount; if (m_needFullSave || pendingNotifications.isEmpty()) { saveNotifications(); } else { ++m_startOffset; writeStartOffset(); } } } void ChangeRecorderPrivate::notificationsErased() { if (enableChangeRecording) { m_lastKnownNotificationsCount = pendingNotifications.count(); m_needFullSave = true; saveNotifications(); } } void ChangeRecorderPrivate::notificationsLoaded() { m_lastKnownNotificationsCount = pendingNotifications.count(); m_startOffset = 0; } bool ChangeRecorderPrivate::emitNotification(const Protocol::ChangeNotificationPtr &msg) { const bool someoneWasListening = MonitorPrivate::emitNotification(msg); if (!someoneWasListening && enableChangeRecording) { //If no signal was emitted (e.g. because no one was connected to it), no one is going to call changeProcessed, so we help ourselves. dequeueNotification(); QMetaObject::invokeMethod(q_ptr, "replayNext", Qt::QueuedConnection); } return someoneWasListening; } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadItemNotification(QSettings *settings) const { auto msg = Protocol::ItemChangeNotificationPtr::create(); msg->setSessionId(settings->value(QStringLiteral("sessionId")).toByteArray()); msg->setOperation(mapItemOperation(static_cast(settings->value(QStringLiteral("op")).toInt()))); msg->setItems({ { settings->value(QStringLiteral("uid")).toLongLong(), settings->value(QStringLiteral("rid")).toString(), QString(), settings->value(QStringLiteral("mimeType")).toString() } }); msg->setResource(settings->value(QStringLiteral("resource")).toByteArray()); msg->setParentCollection(settings->value(QStringLiteral("parentCol")).toLongLong()); msg->setParentDestCollection(settings->value(QStringLiteral("parentDestCol")).toLongLong()); const QStringList list = settings->value(QStringLiteral("itemParts")).toStringList(); QSet itemParts; for (const QString &entry : list) { itemParts.insert(entry.toLatin1()); } msg->setItemParts(itemParts); return msg; } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadCollectionNotification(QSettings *settings) const { auto msg = Protocol::CollectionChangeNotificationPtr::create(); msg->setSessionId(settings->value(QStringLiteral("sessionId")).toByteArray()); msg->setOperation(mapCollectionOperation(static_cast(settings->value(QStringLiteral("op")).toInt()))); auto collection = Protocol::FetchCollectionsResponsePtr::create(); collection->setId(settings->value(QStringLiteral("uid")).toLongLong()); collection->setRemoteId(settings->value(QStringLiteral("rid")).toString()); msg->setCollection(collection); msg->addMetadata("FETCH_COLLECTION"); msg->setResource(settings->value(QStringLiteral("resource")).toByteArray()); msg->setParentCollection(settings->value(QStringLiteral("parentCol")).toLongLong()); msg->setParentDestCollection(settings->value(QStringLiteral("parentDestCol")).toLongLong()); const QStringList list = settings->value(QStringLiteral("itemParts")).toStringList(); QSet changedParts; for (const QString &entry : list) { changedParts.insert(entry.toLatin1()); } msg->setChangedParts(changedParts); return msg; } QSet ChangeRecorderPrivate::extractRelations(QSet &flags) const { QSet relations; auto iter = flags.begin(); while (iter != flags.end()) { if (iter->startsWith("RELATION")) { const QByteArrayList parts = iter->split(' '); Q_ASSERT(parts.size() == 4); Protocol::ItemChangeNotification::Relation relation; relation.type = QString::fromLatin1(parts[1]); relation.leftId = parts[2].toLongLong(); relation.rightId = parts[3].toLongLong(); relations.insert(relation); iter = flags.erase(iter); } else { ++iter; } } return relations; } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadItemNotification(QDataStream &stream, quint64 version) const { QByteArray resource, destinationResource; int operation, entityCnt; qint64 uid, parentCollection, parentDestCollection; QString remoteId, mimeType, remoteRevision; QSet itemParts, addedFlags, removedFlags; QSet addedTags, removedTags; QVector items; auto msg = Protocol::ItemChangeNotificationPtr::create(); if (version == 1) { stream >> operation; stream >> uid; stream >> remoteId; stream >> resource; stream >> parentCollection; stream >> parentDestCollection; stream >> mimeType; stream >> itemParts; items << Protocol::ChangeNotification::Item{ uid, remoteId, QString(), mimeType }; } else if (version >= 2) { stream >> operation; stream >> entityCnt; for (int j = 0; j < entityCnt; ++j) { stream >> uid; stream >> remoteId; stream >> remoteRevision; stream >> mimeType; if (stream.status() != QDataStream::Ok) { qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting"; return msg; } items << Protocol::ChangeNotification::Item{ uid, remoteId, remoteRevision, mimeType }; } stream >> resource; stream >> destinationResource; stream >> parentCollection; stream >> parentDestCollection; stream >> itemParts; stream >> addedFlags; stream >> removedFlags; if (version >= 3) { stream >> addedTags; stream >> removedTags; } } else { qCWarning(AKONADICORE_LOG) << "Error version is not correct here" << version; return msg; } if (version >= 5) { msg->setOperation(static_cast(operation)); } else { msg->setOperation(mapItemOperation(static_cast(operation))); } msg->setItems(items); msg->setResource(resource); msg->setDestinationResource(destinationResource); msg->setParentCollection(parentCollection); msg->setParentDestCollection(parentDestCollection); msg->setItemParts(itemParts); msg->setAddedRelations(extractRelations(addedFlags)); msg->setAddedFlags(addedFlags); msg->setRemovedRelations(extractRelations(removedFlags)); msg->setRemovedFlags(removedFlags); msg->setAddedTags(addedTags); msg->setRemovedTags(removedTags); return msg; } QSet ChangeRecorderPrivate::encodeRelations(const QSet &relations) const { QSet rv; for (const auto &rel : relations) { rv.insert("RELATION " + rel.type.toLatin1() + ' ' + QByteArray::number(rel.leftId) + ' ' + QByteArray::number(rel.rightId)); } return rv; } void ChangeRecorderPrivate::saveItemNotification(QDataStream &stream, const Protocol::ItemChangeNotification &msg) { stream << int(msg.operation()); const auto items = msg.items(); stream << items.count(); for (const Protocol::ItemChangeNotification::Item &item : items) { stream << quint64(item.id); stream << item.remoteId; stream << item.remoteRevision; stream << item.mimeType; } stream << msg.resource(); stream << msg.destinationResource(); stream << quint64(msg.parentCollection()); stream << quint64(msg.parentDestCollection()); stream << msg.itemParts(); stream << msg.addedFlags() + encodeRelations(msg.addedRelations()); stream << msg.removedFlags() + encodeRelations(msg.removedRelations()); stream << msg.addedTags(); stream << msg.removedTags(); } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadCollectionNotification(QDataStream &stream, quint64 version) const { QByteArray resource, destinationResource; int operation, entityCnt; quint64 uid, parentCollection, parentDestCollection; QString remoteId, remoteRevision, dummyString; QSet changedParts, dummyBa; QSet dummyIv; auto msg = Protocol::CollectionChangeNotificationPtr::create(); if (version == 1) { stream >> operation; stream >> uid; stream >> remoteId; stream >> resource; stream >> parentCollection; stream >> parentDestCollection; stream >> dummyString; stream >> changedParts; auto collection = Protocol::FetchCollectionsResponsePtr::create(); collection->setId(uid); collection->setRemoteId(remoteId); msg->setCollection(collection); msg->addMetadata("FETCH_COLLECTION"); } else if (version >= 2) { stream >> operation; stream >> entityCnt; if (version >= 7) { QString str; QStringList stringList; qint64 i64; QVector vb; QMap attrs; bool b; int i; Tristate tristate; auto collection = Protocol::FetchCollectionsResponsePtr::create(); stream >> uid; collection->setId(uid); stream >> uid; collection->setParentId(uid); stream >> str; collection->setName(str); stream >> stringList; collection->setMimeTypes(stringList); stream >> str; collection->setRemoteId(str); stream >> str; collection->setRemoteRevision(str); stream >> str; collection->setResource(str); Protocol::FetchCollectionStatsResponse stats; stream >> i64; stats.setCount(i64); stream >> i64; stats.setUnseen(i64); stream >> i64; stats.setSize(i64); collection->setStatistics(stats); stream >> str; collection->setSearchQuery(str); stream >> vb; collection->setSearchCollections(vb); stream >> entityCnt; QVector ancestors; for (int i = 0; i < entityCnt; ++i) { Protocol::Ancestor ancestor; stream >> i64; ancestor.setId(i64); stream >> str; ancestor.setRemoteId(str); stream >> str; ancestor.setName(str); stream >> attrs; ancestor.setAttributes(attrs); ancestors.push_back(ancestor); if (stream.status() != QDataStream::Ok) { qCWarning(AKONADICORE_LOG) << "Erorr reading saved notifications! Aborting"; return msg; } } collection->setAncestors(ancestors); Protocol::CachePolicy cachePolicy; stream >> b; cachePolicy.setInherit(b); stream >> i; cachePolicy.setCheckInterval(i); stream >> i; cachePolicy.setCacheTimeout(i); stream >> b; cachePolicy.setSyncOnDemand(b); stream >> stringList; cachePolicy.setLocalParts(stringList); collection->setCachePolicy(cachePolicy); stream >> attrs; collection->setAttributes(attrs); stream >> b; collection->setEnabled(b); stream >> reinterpret_cast(tristate); collection->setDisplayPref(tristate); stream >> reinterpret_cast(tristate); collection->setSyncPref(tristate); stream >> reinterpret_cast(tristate); collection->setIndexPref(tristate); stream >> b; collection->setReferenced(b); stream >> b; collection->setIsVirtual(b); msg->setCollection(collection); } else { for (int j = 0; j < entityCnt; ++j) { stream >> uid; stream >> remoteId; stream >> remoteRevision; stream >> dummyString; if (stream.status() != QDataStream::Ok) { qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting"; return msg; } auto collection = Protocol::FetchCollectionsResponsePtr::create(); collection->setId(uid); collection->setRemoteId(remoteId); collection->setRemoteRevision(remoteRevision); msg->setCollection(collection); msg->addMetadata("FETCH_COLLECTION"); } } stream >> resource; stream >> destinationResource; stream >> parentCollection; stream >> parentDestCollection; stream >> changedParts; stream >> dummyBa; stream >> dummyBa; if (version >= 3) { stream >> dummyIv; stream >> dummyIv; } } else { qCWarning(AKONADICORE_LOG) << "Error version is not correct here" << version; return msg; } if (version >= 5) { msg->setOperation(static_cast(operation)); } else { msg->setOperation(mapCollectionOperation(static_cast(operation))); } msg->setResource(resource); msg->setDestinationResource(destinationResource); msg->setParentCollection(parentCollection); msg->setParentDestCollection(parentDestCollection); msg->setChangedParts(changedParts); return msg; } void Akonadi::ChangeRecorderPrivate::saveCollectionNotification(QDataStream &stream, const Protocol::CollectionChangeNotification &msg) { // Version 7 const auto col = msg.collection(); stream << int(msg.operation()); stream << int(1); stream << col->id(); stream << col->parentId(); stream << col->name(); stream << col->mimeTypes(); stream << col->remoteId(); stream << col->remoteRevision(); stream << col->resource(); const auto stats = col->statistics(); stream << stats.count(); stream << stats.unseen(); stream << stats.size(); stream << col->searchQuery(); stream << col->searchCollections(); const auto ancestors = col->ancestors(); stream << ancestors.count(); for (const auto &ancestor : ancestors) { stream << ancestor.id() << ancestor.remoteId() << ancestor.name() << ancestor.attributes(); } const auto cachePolicy = col->cachePolicy(); stream << cachePolicy.inherit(); stream << cachePolicy.checkInterval(); stream << cachePolicy.cacheTimeout(); stream << cachePolicy.syncOnDemand(); stream << cachePolicy.localParts(); stream << col->attributes(); stream << col->enabled(); stream << static_cast(col->displayPref()); stream << static_cast(col->syncPref()); stream << static_cast(col->indexPref()); stream << col->referenced(); stream << col->isVirtual(); stream << msg.resource(); stream << msg.destinationResource(); stream << quint64(msg.parentCollection()); stream << quint64(msg.parentDestCollection()); stream << msg.changedParts(); stream << QSet(); stream << QSet(); stream << QSet(); stream << QSet(); } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadTagNotification(QDataStream &stream, quint64 version) const { QByteArray resource, dummyBa; int operation, entityCnt; quint64 uid, dummyI; QString remoteId, dummyString; QSet dummyBaV; QSet dummyIv; auto msg = Protocol::TagChangeNotificationPtr::create(); if (version == 1) { stream >> operation; stream >> uid; stream >> remoteId; stream >> dummyBa; stream >> dummyI; stream >> dummyI; stream >> dummyString; stream >> dummyBaV; - msg->setId(uid); - msg->setRemoteId(remoteId); + auto tag = Protocol::FetchTagsResponsePtr::create(); + tag->setId(uid); + tag->setRemoteId(remoteId.toLatin1()); + msg->setTag(tag); + msg->addMetadata("FETCH_TAG"); } else if (version >= 2) { stream >> operation; stream >> entityCnt; - for (int j = 0; j < entityCnt; ++j) { + if (version >= 7) { + QByteArray ba; + QMap attrs; + + auto tag = Protocol::FetchTagsResponsePtr::create(); + stream >> uid; - stream >> remoteId; - stream >> dummyString; - stream >> dummyString; - if (stream.status() != QDataStream::Ok) { - qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting"; - return msg; + tag->setId(uid); + stream >> ba; + tag->setParentId(uid); + stream >> attrs; + tag->setGid(ba); + stream >> ba; + tag->setType(ba); + stream >> uid; + tag->setRemoteId(ba); + stream >> ba; + tag->setAttributes(attrs); + msg->setTag(tag); + + stream >> resource; + } else { + for (int j = 0; j < entityCnt; ++j) { + stream >> uid; + stream >> remoteId; + stream >> dummyString; + stream >> dummyString; + if (stream.status() != QDataStream::Ok) { + qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting"; + return msg; + } + auto tag = Protocol::FetchTagsResponsePtr::create(); + tag->setId(uid); + tag->setRemoteId(remoteId.toLatin1()); + msg->setTag(tag); + msg->addMetadata("FETCH_TAG"); + } + stream >> resource; + stream >> dummyBa; + stream >> dummyI; + stream >> dummyI; + stream >> dummyBaV; + stream >> dummyBaV; + stream >> dummyBaV; + if (version >= 3) { + stream >> dummyIv; + stream >> dummyIv; } - msg->setId(uid); - msg->setRemoteId(remoteId); - } - stream >> resource; - stream >> dummyBa; - stream >> dummyI; - stream >> dummyI; - stream >> dummyBaV; - stream >> dummyBaV; - stream >> dummyBaV; - if (version >= 3) { - stream >> dummyIv; - stream >> dummyIv; } } if (version >= 5) { msg->setOperation(static_cast(operation)); } else { msg->setOperation(mapTagOperation(static_cast(operation))); } msg->setResource(resource); return msg; } void Akonadi::ChangeRecorderPrivate::saveTagNotification(QDataStream &stream, const Protocol::TagChangeNotification &msg) { + const auto tag = msg.tag(); stream << int(msg.operation()); stream << int(1); - stream << msg.id(); - stream << msg.remoteId(); - stream << QString(); - stream << QString(); + stream << tag->id(); + stream << tag->parentId(); + stream << tag->gid(); + stream << tag->type(); + stream << tag->remoteId(); + stream << tag->attributes(); stream << msg.resource(); - stream << qint64(0); - stream << qint64(0); - stream << qint64(0); - stream << QSet(); - stream << QSet(); - stream << QSet(); - stream << QSet(); - stream << QSet(); } Protocol::ChangeNotificationPtr ChangeRecorderPrivate::loadRelationNotification(QDataStream &stream, quint64 version) const { QByteArray dummyBa; int operation, entityCnt; quint64 dummyI; QString dummyString; QSet itemParts, dummyBaV; QSet dummyIv; auto msg = Protocol::RelationChangeNotificationPtr::create(); if (version == 1) { qCWarning(AKONADICORE_LOG) << "Invalid version of relation notification"; return msg; } else if (version >= 2) { stream >> operation; stream >> entityCnt; for (int j = 0; j < entityCnt; ++j) { stream >> dummyI; stream >> dummyString; stream >> dummyString; stream >> dummyString; if (stream.status() != QDataStream::Ok) { qCWarning(AKONADICORE_LOG) << "Error reading saved notifications! Aborting"; return msg; } } stream >> dummyBa; if (version == 5) { // there was a bug in version 5 serializer that serialized this // field as qint64 (8 bytes) instead of empty QByteArray (which is // 4 bytes) stream >> dummyI; } else { stream >> dummyBa; } stream >> dummyI; stream >> dummyI; stream >> itemParts; stream >> dummyBaV; stream >> dummyBaV; if (version >= 3) { stream >> dummyIv; stream >> dummyIv; } } if (version >= 5) { msg->setOperation(static_cast(operation)); } else { msg->setOperation(mapRelationOperation(static_cast(operation))); } for (const QByteArray &part : qAsConst(itemParts)) { const QByteArrayList p = part.split(' '); if (p.size() < 2) { continue; } if (p[0] == "LEFT") { msg->setLeftItem(p[1].toLongLong()); } else if (p[0] == "RIGHT") { msg->setRightItem(p[1].toLongLong()); } else if (p[0] == "RID") { msg->setRemoteId(QString::fromLatin1(p[1])); } else if (p[0] == "TYPE") { msg->setType(QString::fromLatin1(p[1])); } } return msg; } void Akonadi::ChangeRecorderPrivate::saveRelationNotification(QDataStream &stream, const Protocol::RelationChangeNotification &msg) { QSet rv; rv.insert("LEFT " + QByteArray::number(msg.leftItem())); rv.insert("RIGHT " + QByteArray::number(msg.rightItem())); rv.insert("RID " + msg.remoteId().toLatin1()); rv.insert("TYPE " + msg.type().toLatin1()); stream << int(msg.operation()); stream << int(0); stream << qint64(0); stream << QString(); stream << QString(); stream << QString(); stream << QByteArray(); stream << QByteArray(); stream << qint64(0); stream << qint64(0); stream << rv; stream << QSet(); stream << QSet(); stream << QSet(); stream << QSet(); } Protocol::ItemChangeNotification::Operation ChangeRecorderPrivate::mapItemOperation(LegacyOp op) const { switch (op) { case Add: return Protocol::ItemChangeNotification::Add; case Modify: return Protocol::ItemChangeNotification::Modify; case Move: return Protocol::ItemChangeNotification::Move; case Remove: return Protocol::ItemChangeNotification::Remove; case Link: return Protocol::ItemChangeNotification::Link; case Unlink: return Protocol::ItemChangeNotification::Unlink; case ModifyFlags: return Protocol::ItemChangeNotification::ModifyFlags; case ModifyTags: return Protocol::ItemChangeNotification::ModifyTags; case ModifyRelations: return Protocol::ItemChangeNotification::ModifyRelations; default: qWarning() << "Unexpected operation type in item notification"; return Protocol::ItemChangeNotification::InvalidOp; } } Protocol::CollectionChangeNotification::Operation ChangeRecorderPrivate::mapCollectionOperation(LegacyOp op) const { switch (op) { case Add: return Protocol::CollectionChangeNotification::Add; case Modify: return Protocol::CollectionChangeNotification::Modify; case Move: return Protocol::CollectionChangeNotification::Move; case Remove: return Protocol::CollectionChangeNotification::Remove; case Subscribe: return Protocol::CollectionChangeNotification::Subscribe; case Unsubscribe: return Protocol::CollectionChangeNotification::Unsubscribe; default: qCWarning(AKONADICORE_LOG) << "Unexpected operation type in collection notification"; return Protocol::CollectionChangeNotification::InvalidOp; } } Protocol::TagChangeNotification::Operation ChangeRecorderPrivate::mapTagOperation(LegacyOp op) const { switch (op) { case Add: return Protocol::TagChangeNotification::Add; case Modify: return Protocol::TagChangeNotification::Modify; case Remove: return Protocol::TagChangeNotification::Remove; default: qCWarning(AKONADICORE_LOG) << "Unexpected operation type in tag notification"; return Protocol::TagChangeNotification::InvalidOp; } } Protocol::RelationChangeNotification::Operation ChangeRecorderPrivate::mapRelationOperation(LegacyOp op) const { switch (op) { case Add: return Protocol::RelationChangeNotification::Add; case Remove: return Protocol::RelationChangeNotification::Remove; default: qCWarning(AKONADICORE_LOG) << "Unexpected operation type in relation notification"; return Protocol::RelationChangeNotification::InvalidOp; } } ChangeRecorderPrivate::LegacyType ChangeRecorderPrivate::mapToLegacyType(Protocol::Command::Type type) const { switch (type) { case Protocol::Command::ItemChangeNotification: return Item; case Protocol::Command::CollectionChangeNotification: return Collection; case Protocol::Command::TagChangeNotification: return Tag; case Protocol::Command::RelationChangeNotification: return Relation; default: qCWarning(AKONADICORE_LOG) << "Unexpected notification type"; return InvalidType; } } diff --git a/src/core/monitor.cpp b/src/core/monitor.cpp index 172db6b48..351960a6d 100644 --- a/src/core/monitor.cpp +++ b/src/core/monitor.cpp @@ -1,369 +1,375 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "monitor.h" #include "monitor_p.h" #include "changemediator_p.h" #include "collectionfetchscope.h" #include "itemfetchjob.h" #include "session.h" #include #include using namespace Akonadi; Monitor::Monitor(QObject *parent) : QObject(parent) , d_ptr(new MonitorPrivate(nullptr, this)) { d_ptr->init(); d_ptr->connectToNotificationManager(); ChangeMediator::registerMonitor(this); } //@cond PRIVATE Monitor::Monitor(MonitorPrivate *d, QObject *parent) : QObject(parent) , d_ptr(d) { d_ptr->init(); d_ptr->connectToNotificationManager(); ChangeMediator::registerMonitor(this); } //@endcond Monitor::~Monitor() { ChangeMediator::unregisterMonitor(this); delete d_ptr; } void Monitor::setCollectionMonitored(const Collection &collection, bool monitored) { Q_D(Monitor); if (!d->collections.contains(collection) && monitored) { d->collections << collection; d->pendingModification.startMonitoringCollection(collection.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->collections.removeAll(collection)) { d->pendingModification.stopMonitoringCollection(collection.id()); d->scheduleSubscriptionUpdate(); } } emit collectionMonitored(collection, monitored); } void Monitor::setItemMonitored(const Item &item, bool monitored) { Q_D(Monitor); if (!d->items.contains(item.id()) && monitored) { d->items.insert(item.id()); d->pendingModification.startMonitoringItem(item.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->items.remove(item.id())) { d->pendingModification.stopMonitoringItem(item.id()); d->scheduleSubscriptionUpdate(); } } emit itemMonitored(item, monitored); } void Monitor::setResourceMonitored(const QByteArray &resource, bool monitored) { Q_D(Monitor); if (!d->resources.contains(resource) && monitored) { d->resources.insert(resource); d->pendingModification.startMonitoringResource(resource); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->resources.remove(resource)) { d->pendingModification.stopMonitoringResource(resource); d->scheduleSubscriptionUpdate(); } } emit resourceMonitored(resource, monitored); } void Monitor::setMimeTypeMonitored(const QString &mimetype, bool monitored) { Q_D(Monitor); if (!d->mimetypes.contains(mimetype) && monitored) { d->mimetypes.insert(mimetype); d->pendingModification.startMonitoringMimeType(mimetype); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->mimetypes.remove(mimetype)) { d->pendingModification.stopMonitoringMimeType(mimetype); d->scheduleSubscriptionUpdate(); } } emit mimeTypeMonitored(mimetype, monitored); } void Monitor::setTagMonitored(const Akonadi::Tag &tag, bool monitored) { Q_D(Monitor); if (!d->tags.contains(tag.id()) && monitored) { d->tags.insert(tag.id()); d->pendingModification.startMonitoringTag(tag.id()); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->tags.remove(tag.id())) { d->pendingModification.stopMonitoringTag(tag.id()); d->scheduleSubscriptionUpdate(); } } emit tagMonitored(tag, monitored); } void Monitor::setTypeMonitored(Monitor::Type type, bool monitored) { Q_D(Monitor); if (!d->types.contains(type) && monitored) { d->types.insert(type); d->pendingModification.startMonitoringType(static_cast(type)); d->scheduleSubscriptionUpdate(); } else if (!monitored) { if (d->types.remove(type)) { d->pendingModification.stopMonitoringType(static_cast(type)); d->scheduleSubscriptionUpdate(); } } emit typeMonitored(type, monitored); } void Akonadi::Monitor::setAllMonitored(bool monitored) { Q_D(Monitor); if (d->monitorAll == monitored) { return; } d->monitorAll = monitored; d->pendingModification.setAllMonitored(monitored); d->scheduleSubscriptionUpdate(); emit allMonitored(monitored); } void Monitor::setExclusive(bool exclusive) { Q_D(Monitor); d->exclusive = exclusive; d->pendingModification.setIsExclusive(exclusive); d->scheduleSubscriptionUpdate(); } bool Monitor::exclusive() const { Q_D(const Monitor); return d->exclusive; } void Monitor::ignoreSession(Session *session) { Q_D(Monitor); if (!d->sessions.contains(session->sessionId())) { d->sessions << session->sessionId(); connect(session, SIGNAL(destroyed(QObject*)), this, SLOT(slotSessionDestroyed(QObject*))); d->pendingModification.startIgnoringSession(session->sessionId()); d->scheduleSubscriptionUpdate(); } } void Monitor::fetchCollection(bool enable) { Q_D(Monitor); d->fetchCollection = enable; } void Monitor::fetchCollectionStatistics(bool enable) { Q_D(Monitor); d->fetchCollectionStatistics = enable; } void Monitor::setItemFetchScope(const ItemFetchScope &fetchScope) { Q_D(Monitor); d->mItemFetchScope = fetchScope; + d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::ItemFetchScope; d->scheduleSubscriptionUpdate(); } ItemFetchScope &Monitor::itemFetchScope() { Q_D(Monitor); d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::ItemFetchScope; d->scheduleSubscriptionUpdate(); return d->mItemFetchScope; } void Monitor::fetchChangedOnly(bool enable) { Q_D(Monitor); d->mFetchChangedOnly = enable; } void Monitor::setCollectionFetchScope(const CollectionFetchScope &fetchScope) { Q_D(Monitor); d->mCollectionFetchScope = fetchScope; + d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::CollectionFetchScope; d->scheduleSubscriptionUpdate(); } CollectionFetchScope &Monitor::collectionFetchScope() { Q_D(Monitor); d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::CollectionFetchScope; d->scheduleSubscriptionUpdate(); return d->mCollectionFetchScope; } void Monitor::setTagFetchScope(const TagFetchScope &fetchScope) { Q_D(Monitor); d->mTagFetchScope = fetchScope; + d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::TagFetchScope; + d->scheduleSubscriptionUpdate(); } TagFetchScope &Monitor::tagFetchScope() { Q_D(Monitor); + d->pendingModificationChanges |= Protocol::ModifySubscriptionCommand::TagFetchScope; + d->scheduleSubscriptionUpdate(); return d->mTagFetchScope; } Akonadi::Collection::List Monitor::collectionsMonitored() const { Q_D(const Monitor); return d->collections; } QVector Monitor::itemsMonitoredEx() const { Q_D(const Monitor); QVector result; result.reserve(d->items.size()); std::copy(d->items.begin(), d->items.end(), std::back_inserter(result)); return result; } int Monitor::numItemsMonitored() const { Q_D(const Monitor); return d->items.size(); } QVector Monitor::tagsMonitored() const { Q_D(const Monitor); QVector result; result.reserve(d->tags.size()); std::copy(d->tags.begin(), d->tags.end(), std::back_inserter(result)); return result; } QVector Monitor::typesMonitored() const { Q_D(const Monitor); QVector result; result.reserve(d->types.size()); std::copy(d->types.begin(), d->types.end(), std::back_inserter(result)); return result; } QStringList Monitor::mimeTypesMonitored() const { Q_D(const Monitor); return d->mimetypes.toList(); } int Monitor::numMimeTypesMonitored() const { Q_D(const Monitor); return d->mimetypes.count(); } QList Monitor::resourcesMonitored() const { Q_D(const Monitor); return d->resources.toList(); } int Monitor::numResourcesMonitored() const { Q_D(const Monitor); return d->resources.count(); } bool Monitor::isAllMonitored() const { Q_D(const Monitor); return d->monitorAll; } void Monitor::setSession(Akonadi::Session *session) { Q_D(Monitor); if (session == d->session) { return; } if (!session) { d->session = Session::defaultSession(); } else { d->session = session; } d->itemCache->setSession(d->session); d->collectionCache->setSession(d->session); d->tagCache->setSession(d->session); // Reconnect with a new session d->connectToNotificationManager(); } Session *Monitor::session() const { Q_D(const Monitor); return d->session; } void Monitor::setCollectionMoveTranslationEnabled(bool enabled) { Q_D(Monitor); d->collectionMoveTranslationEnabled = enabled; } #include "moc_monitor.cpp" diff --git a/src/core/monitor_p.cpp b/src/core/monitor_p.cpp index 54a5164c1..bdb868b21 100644 --- a/src/core/monitor_p.cpp +++ b/src/core/monitor_p.cpp @@ -1,1463 +1,1469 @@ /* Copyright (c) 2007 Tobias Koenig This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // @cond PRIVATE #include "monitor_p.h" #include "collectionfetchjob.h" #include "collectionstatistics.h" #include "itemfetchjob.h" #include "notificationmanagerinterface.h" #include "session.h" #include "changemediator_p.h" #include "vectorhelper.h" #include "akonadicore_debug.h" #include "notificationsubscriber.h" #include "changenotification.h" #include "protocolhelper_p.h" using namespace Akonadi; class operation; static const int PipelineSize = 5; MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent) : q_ptr(parent) , dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory) , ntfConnection(nullptr) , monitorAll(false) , exclusive(false) , mFetchChangedOnly(false) , session(Session::defaultSession()) , collectionCache(nullptr) , itemCache(nullptr) , tagCache(nullptr) , mCommandBuffer(parent, "handleCommands") , pendingModificationChanges(Protocol::ModifySubscriptionCommand::None) , pendingModificationTimer(nullptr) , monitorReady(false) , fetchCollection(false) , fetchCollectionStatistics(false) , collectionMoveTranslationEnabled(true) , useRefCounting(false) { } MonitorPrivate::~MonitorPrivate() { disconnectFromNotificationManager(); delete dependenciesFactory; delete collectionCache; delete itemCache; delete tagCache; } void MonitorPrivate::init() { // needs to be at least 3x pipeline size for the collection move case collectionCache = dependenciesFactory->createCollectionCache(3 * PipelineSize, session); // needs to be at least 1x pipeline size itemCache = dependenciesFactory->createItemListCache(PipelineSize, session); // 20 tags looks like a reasonable amount to keep around tagCache = dependenciesFactory->createTagListCache(20, session); QObject::connect(collectionCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(itemCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(tagCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable())); QObject::connect(ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)), q_ptr, SLOT(serverStateChanged(Akonadi::ServerManager::State))); statisticsCompressionTimer.setSingleShot(true); statisticsCompressionTimer.setInterval(500); QObject::connect(&statisticsCompressionTimer, SIGNAL(timeout()), q_ptr, SLOT(slotFlushRecentlyChangedCollections())); } bool MonitorPrivate::connectToNotificationManager() { if (ntfConnection) { ntfConnection->deleteLater(); ntfConnection = nullptr; } if (!session) { return false; } ntfConnection = dependenciesFactory->createNotificationConnection(session, &mCommandBuffer); if (!ntfConnection) { return false; } - pendingModification = Protocol::ModifySubscriptionCommand(); - for (const auto &col : qAsConst(collections)) { - pendingModification.startMonitoringCollection(col.id()); - } - for (const auto &res : qAsConst(resources)) { - pendingModification.startMonitoringResource(res); - } - for (auto itemId : qAsConst(items)) { - pendingModification.startMonitoringItem(itemId); - } - for (auto tagId : qAsConst(tags)) { - pendingModification.startMonitoringTag(tagId); - } - for (auto type : qAsConst(types)) { - pendingModification.startMonitoringType(static_cast(type)); - } - for (const auto &mimetype : qAsConst(mimetypes)) { - pendingModification.startMonitoringMimeType(mimetype); - } - for (const auto &session : qAsConst(sessions)) { - pendingModification.startIgnoringSession(session); - } - pendingModification.setAllMonitored(monitorAll); - pendingModification.setIsExclusive(exclusive); + slotUpdateSubscription(); ntfConnection->reconnect(); return true; } void MonitorPrivate::disconnectFromNotificationManager() { if (ntfConnection) { ntfConnection->disconnect(q_ptr); dependenciesFactory->destroyNotificationConnection(session, ntfConnection.data()); } } void MonitorPrivate::serverStateChanged(ServerManager::State state) { if (state == ServerManager::Running) { connectToNotificationManager(); } } void MonitorPrivate::invalidateCollectionCache(qint64 id) { collectionCache->update(id, mCollectionFetchScope); } void MonitorPrivate::invalidateItemCache(qint64 id) { itemCache->update(QList() << id, mItemFetchScope); } void MonitorPrivate::invalidateTagCache(qint64 id) { tagCache->update({ id }, mTagFetchScope); } int MonitorPrivate::pipelineSize() const { return PipelineSize; } void MonitorPrivate::scheduleSubscriptionUpdate() { if (pendingModificationTimer || !monitorReady) { return; } pendingModificationTimer = new QTimer(); pendingModificationTimer->setSingleShot(true); pendingModificationTimer->setInterval(0); pendingModificationTimer->start(); q_ptr->connect(pendingModificationTimer, SIGNAL(timeout()), q_ptr, SLOT(slotUpdateSubscription())); } void MonitorPrivate::slotUpdateSubscription() { delete pendingModificationTimer; pendingModificationTimer = nullptr; - if (ntfConnection) { - if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::ItemFetchScope) { - pendingModification.setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); - } - if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { - pendingModification.setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); - } - pendingModificationChanges = Protocol::ModifySubscriptionCommand::None; + if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::ItemFetchScope) { + pendingModification.setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); + } + if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { + pendingModification.setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); + } + if (pendingModificationChanges & Protocol::ModifySubscriptionCommand::TagFetchScope) { + pendingModification.setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); + } + pendingModificationChanges = Protocol::ModifySubscriptionCommand::None; + if (ntfConnection) { ntfConnection->sendCommand(3, Protocol::ModifySubscriptionCommandPtr::create(pendingModification)); pendingModification = Protocol::ModifySubscriptionCommand(); } } bool MonitorPrivate::isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion) const { if (msg->type() == Protocol::Command::CollectionChangeNotification) { // Lazy fetching can only affects items. return false; } if (msg->type() == Protocol::Command::TagChangeNotification) { const auto op = Protocol::cmdCast(msg).operation(); return ((op == Protocol::TagChangeNotification::Add && q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0) || (op == Protocol::TagChangeNotification::Modify && q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0) || (op == Protocol::TagChangeNotification::Remove && q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0)); } if (!fetchCollectionStatistics && msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); const auto op = itemNtf.operation(); if ((op == Protocol::ItemChangeNotification::Add && q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) == 0) || (op == Protocol::ItemChangeNotification::Remove && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) == 0 && q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) == 0) || (op == Protocol::ItemChangeNotification::Modify && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) == 0) || (op == Protocol::ItemChangeNotification::ModifyFlags && (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) == 0 // Newly delivered ModifyFlags notifications will be converted to // itemChanged(item, "FLAGS") for legacy clients. && (!allowModifyFlagsConversion || q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) == 0))) || (op == Protocol::ItemChangeNotification::ModifyTags && q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))) == 0) || (op == Protocol::ItemChangeNotification::Move && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) == 0 && q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) == 0) || (op == Protocol::ItemChangeNotification::Link && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) == 0 && q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) == 0) || (op == Protocol::ItemChangeNotification::Unlink && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) == 0 && q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) == 0)) { return true; } if (!useRefCounting) { return false; } const Collection::Id parentCollectionId = itemNtf.parentCollection(); if ((op == Protocol::ItemChangeNotification::Add) || (op == Protocol::ItemChangeNotification::Remove) || (op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::Link) || (op == Protocol::ItemChangeNotification::Unlink)) { if (isMonitored(parentCollectionId)) { return false; } } if (op == Protocol::ItemChangeNotification::Move) { if (!isMonitored(parentCollectionId) && !isMonitored(itemNtf.parentDestCollection())) { return true; } // We can't ignore the move. It must be transformed later into a removal or insertion. return false; } return true; } return false; } void MonitorPrivate::checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const { if (msg->type() != Protocol::Command::ItemChangeNotification) { needsSplit = false; batchSupported = false; return; } const auto &itemNtf = Protocol::cmdCast(msg); const bool isBatch = (itemNtf.items().count() > 1); switch (itemNtf.operation()) { case Protocol::ItemChangeNotification::Add: needsSplit = isBatch; batchSupported = false; return; case Protocol::ItemChangeNotification::Modify: needsSplit = isBatch; batchSupported = false; return; case Protocol::ItemChangeNotification::ModifyFlags: batchSupported = q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) > 0; needsSplit = isBatch && !batchSupported && q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) > 0; return; case Protocol::ItemChangeNotification::ModifyTags: // Tags were added after batch notifications, so they are always supported batchSupported = true; needsSplit = false; return; case Protocol::ItemChangeNotification::ModifyRelations: // Relations were added after batch notifications, so they are always supported batchSupported = true; needsSplit = false; return; case Protocol::ItemChangeNotification::Move: needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0; batchSupported = q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0; return; case Protocol::ItemChangeNotification::Remove: needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0; batchSupported = q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0; return; case Protocol::ItemChangeNotification::Link: needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0; batchSupported = q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0; return; case Protocol::ItemChangeNotification::Unlink: needsSplit = isBatch && q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0; batchSupported = q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0; return; default: needsSplit = isBatch; batchSupported = false; qCDebug(AKONADICORE_LOG) << "Unknown operation type" << itemNtf.operation() << "in item change notification"; return; } } Protocol::ChangeNotificationList MonitorPrivate::splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const { Protocol::ChangeNotificationList list; Protocol::ItemChangeNotification baseMsg; baseMsg.setSessionId(msg.sessionId()); if (legacy && msg.operation() == Protocol::ItemChangeNotification::ModifyFlags) { baseMsg.setOperation(Protocol::ItemChangeNotification::Modify); baseMsg.setItemParts(QSet() << "FLAGS"); } else { baseMsg.setOperation(msg.operation()); baseMsg.setItemParts(msg.itemParts()); } baseMsg.setParentCollection(msg.parentCollection()); baseMsg.setParentDestCollection(msg.parentDestCollection()); baseMsg.setResource(msg.resource()); baseMsg.setDestinationResource(msg.destinationResource()); baseMsg.setAddedFlags(msg.addedFlags()); baseMsg.setRemovedFlags(msg.removedFlags()); baseMsg.setAddedTags(msg.addedTags()); baseMsg.setRemovedTags(msg.removedTags()); const auto items = msg.items(); list.reserve(items.count()); for (const Protocol::ItemChangeNotification::Item &item : items) { auto copy = Protocol::ItemChangeNotificationPtr::create(baseMsg); copy->setItems({ { item.id, item.remoteId, item.remoteRevision, item.mimeType } }); list << copy; } return list; } bool MonitorPrivate::fetchCollections() const { return fetchCollection; } bool MonitorPrivate::fetchItems() const { return !mItemFetchScope.isEmpty(); } bool MonitorPrivate::ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg) { bool allCached = true; if (msg->type() == Protocol::Command::TagChangeNotification) { - return tagCache->ensureCached({ Protocol::cmdCast(msg).id() }, mTagFetchScope); + const auto tagMsg = Protocol::cmdCast(msg); + if (tagMsg.metadata().contains("FETCH_TAG")) { + if (!tagCache->ensureCached({ tagMsg.tag()->id() }, mTagFetchScope)) { + allCached = false; + } + } + return true; } + if (msg->type() == Protocol::Command::RelationChangeNotification) { return true; } if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { return true; } if (msg->type() == Protocol::Command::DebugChangeNotification) { return true; } if (msg->type() == Protocol::Command::CollectionChangeNotification && Protocol::cmdCast(msg).operation() == Protocol::CollectionChangeNotification::Remove) { // For collection removals the collection is gone already, so we can't fetch it, // but we have to at least obtain the ancestor chain. const qint64 parentCollection = Protocol::cmdCast(msg).parentCollection(); return parentCollection <= -1 || collectionCache->ensureCached(parentCollection, mCollectionFetchScope); } if (fetchCollections()) { const qint64 parentCollection = (msg->type() == Protocol::Command::ItemChangeNotification) ? Protocol::cmdCast(msg).parentCollection() : (msg->type() == Protocol::Command::CollectionChangeNotification) ? Protocol::cmdCast(msg).parentCollection() : -1; if (parentCollection > -1 && !collectionCache->ensureCached(parentCollection, mCollectionFetchScope)) { allCached = false; } qint64 parentDestCollection = -1; if ((msg->type() == Protocol::Command::ItemChangeNotification) && (Protocol::cmdCast(msg).operation() == Protocol::ItemChangeNotification::Move)) { parentDestCollection = Protocol::cmdCast(msg).parentDestCollection(); } else if ((msg->type() == Protocol::Command::CollectionChangeNotification) && (Protocol::cmdCast(msg).operation() == Protocol::CollectionChangeNotification::Move)) { parentDestCollection = Protocol::cmdCast(msg).parentDestCollection(); } if (parentDestCollection > -1 && !collectionCache->ensureCached(parentDestCollection, mCollectionFetchScope)) { allCached = false; } } if (msg->isRemove()) { return allCached; } if (msg->type() == Protocol::Command::ItemChangeNotification && fetchItems()) { ItemFetchScope scope(mItemFetchScope); const auto &itemNtf = Protocol::cmdCast(msg); if (mFetchChangedOnly && (itemNtf.operation() == Protocol::ItemChangeNotification::Modify || itemNtf.operation() == Protocol::ItemChangeNotification::ModifyFlags)) { bool fullPayloadWasRequested = scope.fullPayload(); scope.fetchFullPayload(false); const QSet requestedPayloadParts = scope.payloadParts(); for (const QByteArray &part : requestedPayloadParts) { scope.fetchPayloadPart(part, false); } bool allAttributesWereRequested = scope.allAttributes(); const QSet requestedAttrParts = scope.attributes(); for (const QByteArray &part : requestedAttrParts) { scope.fetchAttribute(part, false); } const QSet changedParts = itemNtf.itemParts(); for (const QByteArray &part : changedParts) { if (part.startsWith("PLD:") && //krazy:exclude=strings since QByteArray (fullPayloadWasRequested || requestedPayloadParts.contains(part))) { scope.fetchPayloadPart(part.mid(4), true); } if (part.startsWith("ATR:") && //krazy:exclude=strings since QByteArray (allAttributesWereRequested || requestedAttrParts.contains(part))) { scope.fetchAttribute(part.mid(4), true); } } } if (!itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), scope)) { allCached = false; } // Make sure all tags for ModifyTags operation are in cache too if (itemNtf.operation() == Protocol::ItemChangeNotification::ModifyTags) { if (!tagCache->ensureCached((itemNtf.addedTags() + itemNtf.removedTags()).toList(), mTagFetchScope)) { allCached = false; } } } else if (msg->type() == Protocol::Command::CollectionChangeNotification && fetchCollections()) { const auto &colMsg = Protocol::cmdCast(msg); if (colMsg.metadata().contains("FETCH_COLLECTION")) { if (!collectionCache->ensureCached(colMsg.collection()->id(), mCollectionFetchScope)) { allCached = false; } } } return allCached; } bool MonitorPrivate::emitNotification(const Protocol::ChangeNotificationPtr &msg) { bool someoneWasListening = false; if (msg->type() == Protocol::Command::TagChangeNotification) { const auto &tagNtf = Protocol::cmdCast(msg); - //In case of a Remove notification this will return a list of invalid entities (we'll deal later with them) - const Tag::List tags = tagCache->retrieve({ tagNtf.id() }); - someoneWasListening = emitTagNotification(tagNtf, tags.isEmpty() ? Tag() : tags[0]); + const bool fetched = tagNtf.metadata().contains("FETCH_TAG"); + Tag tag; + if (fetched) { + const auto tags = tagCache->retrieve({ tagNtf.tag()->id() }); + tag = tags.isEmpty() ? Tag() : tags.at(0); + } else { + tag = ProtocolHelper::parseTag(*tagNtf.tag()); + } + someoneWasListening = emitTagNotification(tagNtf, tag); } else if (msg->type() == Protocol::Command::RelationChangeNotification) { const auto &relNtf = Protocol::cmdCast(msg); Relation rel; rel.setLeft(Akonadi::Item(relNtf.leftItem())); rel.setRight(Akonadi::Item(relNtf.rightItem())); rel.setType(relNtf.type().toLatin1()); rel.setRemoteId(relNtf.remoteId().toLatin1()); someoneWasListening = emitRelationNotification(relNtf, rel); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); const Collection parent = collectionCache->retrieve(colNtf.parentCollection()); Collection destParent; if (colNtf.operation() == Protocol::CollectionChangeNotification::Move) { destParent = collectionCache->retrieve(colNtf.parentDestCollection()); } //For removals this will retrieve an invalid collection. We'll deal with that in emitCollectionNotification const bool fetched = colNtf.metadata().contains("FETCH_COLLECTION"); - qCDebug(AKONADICORE_LOG) << "Monitor::emitNotification: Collection notification bypassed CollectionCache?" << (!fetched); const Collection col = fetched ? collectionCache->retrieve(colNtf.collection()->id()) : ProtocolHelper::parseCollection(*colNtf.collection(), true); //It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while //the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them. if (col.isValid() || colNtf.operation() == Protocol::CollectionChangeNotification::Remove || !fetchCollections()) { someoneWasListening = emitCollectionNotification(colNtf, col, parent, destParent); } } else if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); const Collection parent = collectionCache->retrieve(itemNtf.parentCollection()); Collection destParent; if (itemNtf.operation() == Protocol::ItemChangeNotification::Move) { destParent = collectionCache->retrieve(itemNtf.parentDestCollection()); } //For removals this will retrieve an empty set. We'll deal with that in emitItemNotification const Item::List items = itemCache->retrieve(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); //It is possible that the retrieval fails also in the non-removal case (e.g. because the item was meanwhile removed while //the changerecorder stored the notification or the notification was in the queue). In order to drop such invalid notifications we have to ignore them. if (!items.isEmpty() || itemNtf.operation() == Protocol::ItemChangeNotification::Remove || !fetchItems()) { someoneWasListening = emitItemsNotification(itemNtf, items, parent, destParent); } } else if (msg->type() == Protocol::Command::SubscriptionChangeNotification) { const auto &subNtf = Protocol::cmdCast(msg); NotificationSubscriber subscriber; subscriber.setSubscriber(subNtf.subscriber()); subscriber.setSessionId(subNtf.sessionId()); subscriber.setMonitoredCollections(subNtf.collections()); subscriber.setMonitoredItems(subNtf.items()); subscriber.setMonitoredTags(subNtf.tags()); QSet monitorTypes; Q_FOREACH (auto type, subNtf.types()) { monitorTypes.insert(static_cast(type)); } subscriber.setMonitoredTypes(monitorTypes); subscriber.setMonitoredMimeTypes(subNtf.mimeTypes()); subscriber.setMonitoredResources(subNtf.resources()); subscriber.setIgnoredSessions(subNtf.ignoredSessions()); subscriber.setIsAllMonitored(subNtf.allMonitored()); subscriber.setIsExclusive(subNtf.exclusive()); subscriber.setItemFetchScope(ProtocolHelper::parseItemFetchScope(subNtf.itemFetchScope())); subscriber.setCollectionFetchScope(ProtocolHelper::parseCollectionFetchScope(subNtf.collectionFetchScope())); someoneWasListening = emitSubscriptionChangeNotification(subNtf, subscriber); } else if (msg->type() == Protocol::Command::DebugChangeNotification) { const auto &changeNtf = Protocol::cmdCast(msg); ChangeNotification notification; notification.setListeners(changeNtf.listeners()); notification.setTimestamp(QDateTime::fromMSecsSinceEpoch(changeNtf.timestamp())); notification.setNotification(changeNtf.notification()); switch (changeNtf.notification()->type()) { case Protocol::Command::ItemChangeNotification: notification.setType(ChangeNotification::Items); break; case Protocol::Command::CollectionChangeNotification: notification.setType(ChangeNotification::Collection); break; case Protocol::Command::TagChangeNotification: notification.setType(ChangeNotification::Tag); break; case Protocol::Command::RelationChangeNotification: notification.setType(ChangeNotification::Relation); break; case Protocol::Command::SubscriptionChangeNotification: notification.setType(ChangeNotification::Subscription); break; default: Q_ASSERT(false); // huh? return false; } someoneWasListening = emitDebugChangeNotification(changeNtf, notification); } return someoneWasListening; } void MonitorPrivate::updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg) { if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); notifyCollectionStatisticsWatchers(itemNtf.parentCollection(), itemNtf.resource()); // FIXME use the proper resource of the target collection, for cross resource moves notifyCollectionStatisticsWatchers(itemNtf.parentDestCollection(), itemNtf.destinationResource()); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); if (colNtf.operation() == Protocol::CollectionChangeNotification::Remove) { // no need for statistics updates anymore recentlyChangedCollections.remove(colNtf.collection()->id()); } } } void MonitorPrivate::slotSessionDestroyed(QObject *object) { Session *objectSession = qobject_cast(object); if (objectSession) { sessions.removeAll(objectSession->sessionId()); pendingModification.stopIgnoringSession(objectSession->sessionId()); scheduleSubscriptionUpdate(); } } void MonitorPrivate::slotStatisticsChangedFinished(KJob *job) { if (job->error()) { qCWarning(AKONADICORE_LOG) << "Error on fetching collection statistics: " << job->errorText(); } else { CollectionStatisticsJob *statisticsJob = static_cast(job); Q_ASSERT(statisticsJob->collection().isValid()); emit q_ptr->collectionStatisticsChanged(statisticsJob->collection().id(), statisticsJob->statistics()); } } void MonitorPrivate::slotFlushRecentlyChangedCollections() { for (Collection::Id collection : qAsConst(recentlyChangedCollections)) { Q_ASSERT(collection >= 0); if (fetchCollectionStatistics) { fetchStatistics(collection); } else { static const CollectionStatistics dummyStatistics; emit q_ptr->collectionStatisticsChanged(collection, dummyStatistics); } } recentlyChangedCollections.clear(); } int MonitorPrivate::translateAndCompress(QQueue ¬ificationQueue, const Protocol::ChangeNotificationPtr &msg) { // Always handle tags and relations if (msg->type() == Protocol::Command::TagChangeNotification || msg->type() == Protocol::Command::RelationChangeNotification) { notificationQueue.enqueue(msg); return 1; } // We have to split moves into insert or remove if the source or destination // is not monitored. if (!msg->isMove()) { notificationQueue.enqueue(msg); return 1; } bool sourceWatched = false; bool destWatched = false; if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); if (useRefCounting) { sourceWatched = isMonitored(itemNtf.parentCollection()); destWatched = isMonitored(itemNtf.parentDestCollection()); } else { if (!resources.isEmpty()) { sourceWatched = resources.contains(itemNtf.resource()); destWatched = isMoveDestinationResourceMonitored(itemNtf); } if (!sourceWatched) { sourceWatched = isCollectionMonitored(itemNtf.parentCollection()); } if (!destWatched) { destWatched = isCollectionMonitored(itemNtf.parentDestCollection()); } } } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { const auto &colNtf = Protocol::cmdCast(msg); if (!resources.isEmpty()) { sourceWatched = resources.contains(colNtf.resource()); destWatched = isMoveDestinationResourceMonitored(colNtf); } if (!sourceWatched) { sourceWatched = isCollectionMonitored(colNtf.parentCollection()); } if (!destWatched) { destWatched = isCollectionMonitored(colNtf.parentDestCollection()); } } else { Q_ASSERT(false); return 0; } if (!sourceWatched && !destWatched) { return 0; } if ((sourceWatched && destWatched) || (!collectionMoveTranslationEnabled && msg->type() == Protocol::Command::CollectionChangeNotification)) { notificationQueue.enqueue(msg); return 1; } if (sourceWatched) { if (msg->type() == Protocol::Command::ItemChangeNotification) { auto removalMessage = Protocol::ItemChangeNotificationPtr::create( Protocol::cmdCast(msg)); removalMessage->setOperation(Protocol::ItemChangeNotification::Remove); removalMessage->setParentDestCollection(-1); notificationQueue.enqueue(removalMessage); return 1; } else { auto removalMessage = Protocol::CollectionChangeNotificationPtr::create( Protocol::cmdCast(msg)); removalMessage->setOperation(Protocol::CollectionChangeNotification::Remove); removalMessage->setParentDestCollection(-1); notificationQueue.enqueue(removalMessage); return 1; } } // Transform into an insertion if (msg->type() == Protocol::Command::ItemChangeNotification) { auto insertionMessage = Protocol::ItemChangeNotificationPtr::create( Protocol::cmdCast(msg)); insertionMessage->setOperation(Protocol::ItemChangeNotification::Add); insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); insertionMessage->setParentDestCollection(-1); // We don't support batch insertion, so we have to do it one by one const auto split = splitMessage(*insertionMessage, false); for (const Protocol::ChangeNotificationPtr &insertion : split) { notificationQueue.enqueue(insertion); } return split.count(); } else if (msg->type() == Protocol::Command::CollectionChangeNotification) { auto insertionMessage = Protocol::CollectionChangeNotificationPtr::create( Protocol::cmdCast(msg)); insertionMessage->setOperation(Protocol::CollectionChangeNotification::Add); insertionMessage->setParentCollection(insertionMessage->parentDestCollection()); insertionMessage->setParentDestCollection(-1); notificationQueue.enqueue(insertionMessage); return 1; } Q_ASSERT(false); return 0; } void MonitorPrivate::handleCommands() { Q_Q(Monitor); CommandBufferLocker lock(&mCommandBuffer); CommandBufferNotifyBlocker notify(&mCommandBuffer); while (!mCommandBuffer.isEmpty()) { const auto cmd = mCommandBuffer.dequeue(); lock.unlock(); const auto command = cmd.command; if (command->isResponse()) { switch (command->type()) { case Protocol::Command::Hello: { qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus"; QByteArray subname; if (!q->objectName().isEmpty()) { subname = q->objectName().toLatin1(); } else { subname = session->sessionId(); } subname += " - " + QByteArray::number(quintptr(q)); qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\""; auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId()); ntfConnection->sendCommand(2, subCmd); break; } case Protocol::Command::CreateSubscription: { - auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(pendingModification); + auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(); + for (const auto &col : qAsConst(collections)) { + msubCmd->startMonitoringCollection(col.id()); + } + for (const auto &res : qAsConst(resources)) { + msubCmd->startMonitoringResource(res); + } + for (auto itemId : qAsConst(items)) { + msubCmd->startMonitoringItem(itemId); + } + for (auto tagId : qAsConst(tags)) { + msubCmd->startMonitoringTag(tagId); + } + for (auto type : qAsConst(types)) { + msubCmd->startMonitoringType(static_cast(type)); + } + for (const auto &mimetype : qAsConst(mimetypes)) { + msubCmd->startMonitoringMimeType(mimetype); + } + for (const auto &session : qAsConst(sessions)) { + msubCmd->startIgnoringSession(session); + } + msubCmd->setAllMonitored(monitorAll); + msubCmd->setIsExclusive(exclusive); + msubCmd->setItemFetchScope(ProtocolHelper::itemFetchScopeToProtocol(mItemFetchScope)); + msubCmd->setCollectionFetchScope(ProtocolHelper::collectionFetchScopeToProtocol(mCollectionFetchScope)); + msubCmd->setTagFetchScope(ProtocolHelper::tagFetchScopeToProtocol(mTagFetchScope)); pendingModification = Protocol::ModifySubscriptionCommand(); ntfConnection->sendCommand(3, msubCmd); break; } case Protocol::Command::ModifySubscription: // TODO: Handle errors if (!monitorReady) { monitorReady = true; Q_EMIT q_ptr->monitorReady(); } break; default: qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command); break; } } else { switch (command->type()) { case Protocol::Command::ItemChangeNotification: case Protocol::Command::CollectionChangeNotification: case Protocol::Command::TagChangeNotification: case Protocol::Command::RelationChangeNotification: case Protocol::Command::SubscriptionChangeNotification: case Protocol::Command::DebugChangeNotification: slotNotify(command.staticCast()); break; default: qCWarning(AKONADICORE_LOG) << "Received an unexpected message on Notification stream:" << Protocol::debugString(command); break; } } lock.relock(); } } /* server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit | | x --> discard x --> pipeline fetchJobDone --> pipeline ?dataAvailable --> emit */ void MonitorPrivate::slotNotify(const Protocol::ChangeNotificationPtr &msg) { int appendedMessages = 0; int modifiedMessages = 0; int erasedMessages = 0; invalidateCaches(msg); updatePendingStatistics(msg); bool needsSplit = true; bool supportsBatch = false; if (isLazilyIgnored(msg, true)) { return; } checkBatchSupport(msg, needsSplit, supportsBatch); const bool isModifyFlags = (msg->type() == Protocol::Command::ItemChangeNotification && Protocol::cmdCast(msg).operation() == Protocol::ItemChangeNotification::ModifyFlags); if (supportsBatch || (!needsSplit && !supportsBatch && !isModifyFlags) || msg->type() == Protocol::Command::CollectionChangeNotification) { // Make sure the batch msg is always queued before the split notifications const int oldSize = pendingNotifications.size(); const int appended = translateAndCompress(pendingNotifications, msg); if (appended > 0) { appendedMessages += appended; } else { ++modifiedMessages; } // translateAndCompress can remove an existing "modify" when msg is a "delete". // Or it can merge two ModifyFlags and return false. // We need to detect such removals, for ChangeRecorder. if (pendingNotifications.count() != oldSize + appended) { ++erasedMessages; // this count isn't exact, but it doesn't matter } } else if (needsSplit) { // If it's not queued at least make sure we fetch all the items from split // notifications in one go. if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto items = Protocol::cmdCast(msg).items(); itemCache->ensureCached(Protocol::ChangeNotification::itemsToUids(items), mItemFetchScope); } } // if the message contains more items, but we need to emit single-item notification, // split the message into one message per item and queue them // if the message contains only one item, but batches are not supported // (and thus neither is flagsModified), splitMessage() will convert the // notification to regular Modify with "FLAGS" part changed if (needsSplit || (!needsSplit && !supportsBatch && isModifyFlags)) { // Make sure inter-resource move notifications are translated into // Add/Remove notifications if (msg->type() == Protocol::Command::ItemChangeNotification) { const auto &itemNtf = Protocol::cmdCast(msg); if (itemNtf.operation() == Protocol::ItemChangeNotification::Move && itemNtf.resource() != itemNtf.destinationResource()) { if (needsSplit) { const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); for (const auto &splitMsg : split) { appendedMessages += translateAndCompress(pendingNotifications, splitMsg); } } else { appendedMessages += translateAndCompress(pendingNotifications, msg); } } else { const Protocol::ChangeNotificationList split = splitMessage(itemNtf, !supportsBatch); pendingNotifications << split.toList(); appendedMessages += split.count(); } } } // tell ChangeRecorder (even if 0 appended, the compression could have made changes to existing messages) if (appendedMessages > 0 || modifiedMessages > 0 || erasedMessages > 0) { if (erasedMessages > 0) { notificationsErased(); } else { notificationsEnqueued(appendedMessages); } } dispatchNotifications(); } void MonitorPrivate::flushPipeline() { while (!pipeline.isEmpty()) { const auto msg = pipeline.head(); if (ensureDataAvailable(msg)) { // dequeue should be before emit, otherwise stuff might happen (like dataAvailable // being called again) and we end up dequeuing an empty pipeline pipeline.dequeue(); emitNotification(msg); } else { break; } } } void MonitorPrivate::dataAvailable() { flushPipeline(); dispatchNotifications(); } void MonitorPrivate::dispatchNotifications() { // Note that this code is not used in a ChangeRecorder (pipelineSize==0) while (pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty()) { const auto msg = pendingNotifications.dequeue(); if (ensureDataAvailable(msg) && pipeline.isEmpty()) { emitNotification(msg); } else { pipeline.enqueue(msg); } } } static Relation::List extractRelations(const QSet &rels) { Relation::List relations; if (rels.isEmpty()) { return relations; } relations.reserve(rels.size()); for (const auto &rel : rels) { relations.push_back(Relation(rel.type.toLatin1(), Akonadi::Item(rel.leftId), Akonadi::Item(rel.rightId))); } return relations; } bool MonitorPrivate::emitItemsNotification(const Protocol::ItemChangeNotification &msg_, const Item::List &items, const Collection &collection, const Collection &collectionDest) { Protocol::ItemChangeNotification msg = msg_; Collection col = collection; Collection colDest = collectionDest; if (!col.isValid()) { col = Collection(msg.parentCollection()); col.setResource(QString::fromUtf8(msg.resource())); } if (!colDest.isValid()) { colDest = Collection(msg.parentDestCollection()); // HACK: destination resource is delivered in the parts field... if (!msg.itemParts().isEmpty()) { colDest.setResource(QString::fromLatin1(*(msg.itemParts().cbegin()))); } } const QSet addedFlags = msg.addedFlags(); const QSet removedFlags = msg.removedFlags(); Relation::List addedRelations, removedRelations; if (msg.operation() == Protocol::ItemChangeNotification::ModifyRelations) { addedRelations = extractRelations(msg.addedRelations()); removedRelations = extractRelations(msg.removedRelations()); } Tag::List addedTags, removedTags; if (msg.operation() == Protocol::ItemChangeNotification::ModifyTags) { addedTags = tagCache->retrieve(msg.addedTags().toList()); removedTags = tagCache->retrieve(msg.removedTags().toList()); } auto msgItems = msg.items(); Item::List its = items; QMutableVectorIterator iter(its); while (iter.hasNext()) { Item it = iter.next(); if (it.isValid()) { const auto msgItem = std::find_if(msgItems.begin(), msgItems.end(), [&it](const Protocol::ChangeNotification::Item &i) { return i.id == it.id(); }); if (msg.operation() == Protocol::ItemChangeNotification::Remove) { it.setRemoteId(msgItem->remoteId); it.setRemoteRevision(msgItem->remoteRevision); it.setMimeType(msgItem->mimeType); } else if (msg.operation() == Protocol::ItemChangeNotification::Move) { // For moves we remove the RID from the PimItemTable to prevent // RID conflict during merge (see T3904 in Phab), so restore the // RID from notification. // TODO: Should we do this for all items with empty RID? Right now // I only know about this usecase. it.setRemoteId(msgItem->remoteId); } if (!it.parentCollection().isValid()) { if (msg.operation() == Protocol::ItemChangeNotification::Move) { it.setParentCollection(colDest); } else { it.setParentCollection(col); } } else { // item has a valid parent collection, most likely due to retrieved ancestors // still, collection might contain extra info, so inject that if (it.parentCollection() == col) { const Collection oldParent = it.parentCollection(); if (oldParent.parentCollection().isValid() && !col.parentCollection().isValid()) { col.setParentCollection(oldParent.parentCollection()); // preserve ancestor chain } it.setParentCollection(col); } else { // If one client does a modify followed by a move we have to make sure that the // AgentBase::itemChanged() in another client always sees the parent collection // of the item before it has been moved. if (msg.operation() != Protocol::ItemChangeNotification::Move) { it.setParentCollection(col); } else { it.setParentCollection(colDest); } } } iter.setValue(it); msgItems.erase(msgItem); } else { // remove the invalid item iter.remove(); } } its.reserve(its.size() + msgItems.size()); // Now reconstruct any items there were left in msgItems Q_FOREACH (const Protocol::ItemChangeNotification::Item &msgItem, msgItems) { Item it(msgItem.id); it.setRemoteId(msgItem.remoteId); it.setRemoteRevision(msgItem.remoteRevision); it.setMimeType(msgItem.mimeType); if (msg.operation() == Protocol::ItemChangeNotification::Move) { it.setParentCollection(colDest); } else { it.setParentCollection(col); } its << it; } bool handled = false; switch (msg.operation()) { case Protocol::ItemChangeNotification::Add: if (q_ptr->receivers(SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemAdded(its.first(), col); return true; } return false; case Protocol::ItemChangeNotification::Modify: if (q_ptr->receivers(SIGNAL(itemChanged(Akonadi::Item,QSet))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemChanged(its.first(), msg.itemParts()); return true; } return false; case Protocol::ItemChangeNotification::ModifyFlags: if (q_ptr->receivers(SIGNAL(itemsFlagsChanged(Akonadi::Item::List,QSet,QSet))) > 0) { emit q_ptr->itemsFlagsChanged(its, msg.addedFlags(), msg.removedFlags()); handled = true; } return handled; case Protocol::ItemChangeNotification::Move: if (q_ptr->receivers(SIGNAL(itemMoved(Akonadi::Item,Akonadi::Collection,Akonadi::Collection))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemMoved(its.first(), col, colDest); handled = true; } if (q_ptr->receivers(SIGNAL(itemsMoved(Akonadi::Item::List,Akonadi::Collection,Akonadi::Collection))) > 0) { emit q_ptr->itemsMoved(its, col, colDest); handled = true; } return handled; case Protocol::ItemChangeNotification::Remove: if (q_ptr->receivers(SIGNAL(itemRemoved(Akonadi::Item))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemRemoved(its.first()); handled = true; } if (q_ptr->receivers(SIGNAL(itemsRemoved(Akonadi::Item::List))) > 0) { emit q_ptr->itemsRemoved(its); handled = true; } return handled; case Protocol::ItemChangeNotification::Link: if (q_ptr->receivers(SIGNAL(itemLinked(Akonadi::Item,Akonadi::Collection))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemLinked(its.first(), col); handled = true; } if (q_ptr->receivers(SIGNAL(itemsLinked(Akonadi::Item::List,Akonadi::Collection))) > 0) { emit q_ptr->itemsLinked(its, col); handled = true; } return handled; case Protocol::ItemChangeNotification::Unlink: if (q_ptr->receivers(SIGNAL(itemUnlinked(Akonadi::Item,Akonadi::Collection))) > 0) { Q_ASSERT(its.count() == 1); emit q_ptr->itemUnlinked(its.first(), col); handled = true; } if (q_ptr->receivers(SIGNAL(itemsUnlinked(Akonadi::Item::List,Akonadi::Collection))) > 0) { emit q_ptr->itemsUnlinked(its, col); handled = true; } return handled; case Protocol::ItemChangeNotification::ModifyTags: if (q_ptr->receivers(SIGNAL(itemsTagsChanged(Akonadi::Item::List,QSet,QSet))) > 0) { emit q_ptr->itemsTagsChanged(its, Akonadi::vectorToSet(addedTags), Akonadi::vectorToSet(removedTags)); return true; } return false; case Protocol::ItemChangeNotification::ModifyRelations: if (q_ptr->receivers(SIGNAL(itemsRelationsChanged(Akonadi::Item::List,Akonadi::Relation::List,Akonadi::Relation::List))) > 0) { emit q_ptr->itemsRelationsChanged(its, addedRelations, removedRelations); return true; } return false; default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in item change notification"; } return false; } bool MonitorPrivate::emitCollectionNotification(const Protocol::CollectionChangeNotification &msg, const Collection &col, const Collection &par, const Collection &dest) { Collection parent = par; if (!parent.isValid()) { parent = Collection(msg.parentCollection()); } Collection destination = dest; if (!destination.isValid()) { destination = Collection(msg.parentDestCollection()); } Collection collection = col; Q_ASSERT(collection.isValid()); if (!collection.isValid()) { qCWarning(AKONADICORE_LOG) << "Failed to get valid Collection for a Collection change!"; return true; // prevent Monitor disconnecting from a signal } if (!collection.parentCollection().isValid()) { if (msg.operation() == Protocol::CollectionChangeNotification::Move) { collection.setParentCollection(destination); } else { collection.setParentCollection(parent); } } switch (msg.operation()) { case Protocol::CollectionChangeNotification::Add: if (q_ptr->receivers(SIGNAL(collectionAdded(Akonadi::Collection,Akonadi::Collection))) == 0) { return false; } emit q_ptr->collectionAdded(collection, parent); return true; case Protocol::CollectionChangeNotification::Modify: if (q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection))) == 0 && q_ptr->receivers(SIGNAL(collectionChanged(Akonadi::Collection,QSet))) == 0) { return false; } emit q_ptr->collectionChanged(collection); emit q_ptr->collectionChanged(collection, msg.changedParts()); return true; case Protocol::CollectionChangeNotification::Move: if (q_ptr->receivers(SIGNAL(collectionMoved(Akonadi::Collection,Akonadi::Collection,Akonadi::Collection))) == 0) { return false; } emit q_ptr->collectionMoved(collection, parent, destination); return true; case Protocol::CollectionChangeNotification::Remove: if (q_ptr->receivers(SIGNAL(collectionRemoved(Akonadi::Collection))) == 0) { return false; } emit q_ptr->collectionRemoved(collection); return true; case Protocol::CollectionChangeNotification::Subscribe: if (q_ptr->receivers(SIGNAL(collectionSubscribed(Akonadi::Collection,Akonadi::Collection))) == 0) { return false; } if (!monitorAll) { // ### why?? emit q_ptr->collectionSubscribed(collection, parent); } return true; case Protocol::CollectionChangeNotification::Unsubscribe: if (q_ptr->receivers(SIGNAL(collectionUnsubscribed(Akonadi::Collection))) == 0) { return false; } if (!monitorAll) { // ### why?? emit q_ptr->collectionUnsubscribed(collection); } return true; default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in collection change notification"; } return false; } bool MonitorPrivate::emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tag) { - Tag validTag; - if (msg.operation() == Protocol::TagChangeNotification::Remove) { - //In case of a removed signal the cache entry was already invalidated, and we therefore received an empty list of tags - validTag = Tag(msg.id()); - validTag.setRemoteId(msg.remoteId().toLatin1()); - } else { - validTag = tag; - } - - if (!validTag.isValid()) { - return false; - } - + Q_UNUSED(msg); switch (msg.operation()) { case Protocol::TagChangeNotification::Add: if (q_ptr->receivers(SIGNAL(tagAdded(Akonadi::Tag))) == 0) { return false; } - Q_EMIT q_ptr->tagAdded(validTag); + Q_EMIT q_ptr->tagAdded(tag); return true; case Protocol::TagChangeNotification::Modify: if (q_ptr->receivers(SIGNAL(tagChanged(Akonadi::Tag))) == 0) { return false; } - Q_EMIT q_ptr->tagChanged(validTag); + Q_EMIT q_ptr->tagChanged(tag); return true; case Protocol::TagChangeNotification::Remove: if (q_ptr->receivers(SIGNAL(tagRemoved(Akonadi::Tag))) == 0) { return false; } - Q_EMIT q_ptr->tagRemoved(validTag); + Q_EMIT q_ptr->tagRemoved(tag); return true; default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; } return false; } bool MonitorPrivate::emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation) { if (!relation.isValid()) { return false; } switch (msg.operation()) { case Protocol::RelationChangeNotification::Add: if (q_ptr->receivers(SIGNAL(relationAdded(Akonadi::Relation))) == 0) { return false; } Q_EMIT q_ptr->relationAdded(relation); return true; case Protocol::RelationChangeNotification::Remove: if (q_ptr->receivers(SIGNAL(relationRemoved(Akonadi::Relation))) == 0) { return false; } Q_EMIT q_ptr->relationRemoved(relation); return true; default: qCDebug(AKONADICORE_LOG) << "Unknown operation type" << msg.operation() << "in tag change notification"; } return false; } bool MonitorPrivate::emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const Akonadi::NotificationSubscriber &subscriber) { if (!subscriber.isValid()) { return false; } switch (msg.operation()) { case Protocol::SubscriptionChangeNotification::Add: if (q_ptr->receivers(SIGNAL(notificationSubscriberAdded(Akonadi::NotificationSubscriber))) == 0) { return false; } Q_EMIT q_ptr->notificationSubscriberAdded(subscriber); return true; case Protocol::SubscriptionChangeNotification::Modify: if (q_ptr->receivers(SIGNAL(notificationSubscriberChanged(Akonadi::NotificationSubscriber))) == 0) { return false; } Q_EMIT q_ptr->notificationSubscriberChanged(subscriber); return true; case Protocol::SubscriptionChangeNotification::Remove: if (q_ptr->receivers(SIGNAL(notificationSubscriberRemoved(Akonadi::NotificationSubscriber))) == 0) { return false; } Q_EMIT q_ptr->notificationSubscriberRemoved(subscriber); return true; default: break; } return false; } bool MonitorPrivate::emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf) { Q_UNUSED(msg); if (!ntf.isValid()) { return false; } if (q_ptr->receivers(SIGNAL(debugNotification(Akonadi::ChangeNotification))) == 0) { return false; } Q_EMIT q_ptr->debugNotification(ntf); return true; } void MonitorPrivate::invalidateCaches(const Protocol::ChangeNotificationPtr &msg) { // remove invalidates // modify removes the cache entry, as we need to re-fetch // And subscription modify the visibility of the collection by the collectionFetchScope. switch (msg->type()) { case Protocol::Command::CollectionChangeNotification: { const auto &colNtf = Protocol::cmdCast(msg); switch (colNtf.operation()) { case Protocol::CollectionChangeNotification::Modify: case Protocol::CollectionChangeNotification::Move: case Protocol::CollectionChangeNotification::Subscribe: collectionCache->update(colNtf.collection()->id(), mCollectionFetchScope); break; case Protocol::CollectionChangeNotification::Remove: collectionCache->invalidate(colNtf.collection()->id()); break; default: break; } } break; case Protocol::Command::ItemChangeNotification: { const auto &itemNtf = Protocol::cmdCast(msg); switch (itemNtf.operation()) { case Protocol::ItemChangeNotification::Modify: case Protocol::ItemChangeNotification::ModifyFlags: case Protocol::ItemChangeNotification::ModifyTags: case Protocol::ItemChangeNotification::ModifyRelations: case Protocol::ItemChangeNotification::Move: itemCache->update(Protocol::ChangeNotification::itemsToUids(itemNtf.items()), mItemFetchScope); break; case Protocol::ItemChangeNotification::Remove: itemCache->invalidate(Protocol::ChangeNotification::itemsToUids(itemNtf.items())); break; default: break; } } break; case Protocol::Command::TagChangeNotification: { const auto &tagNtf = Protocol::cmdCast(msg); switch (tagNtf.operation()) { case Protocol::TagChangeNotification::Modify: - tagCache->update({ tagNtf.id() }, mTagFetchScope); + tagCache->update({ tagNtf.tag()->id() }, mTagFetchScope); break; case Protocol::TagChangeNotification::Remove: - tagCache->invalidate({ tagNtf.id() }); + tagCache->invalidate({ tagNtf.tag()->id() }); break; default: break; } } break; default: break; } } void MonitorPrivate::invalidateCache(const Collection &col) { collectionCache->update(col.id(), mCollectionFetchScope); } void MonitorPrivate::ref(Collection::Id id) { if (!refCountMap.contains(id)) { refCountMap.insert(id, 0); } ++refCountMap[id]; if (m_buffer.isBuffered(id)) { m_buffer.purge(id); } } Akonadi::Collection::Id MonitorPrivate::deref(Collection::Id id) { Q_ASSERT(refCountMap.contains(id)); if (--refCountMap[id] == 0) { refCountMap.remove(id); return m_buffer.buffer(id); } return -1; } void MonitorPrivate::PurgeBuffer::purge(Collection::Id id) { m_buffer.removeOne(id); } Akonadi::Collection::Id MonitorPrivate::PurgeBuffer::buffer(Collection::Id id) { // Ensure that we don't put a duplicate @p id into the buffer. purge(id); Collection::Id bumpedId = -1; if (m_buffer.size() == MAXBUFFERSIZE) { bumpedId = m_buffer.dequeue(); purge(bumpedId); } m_buffer.enqueue(id); return bumpedId; } int MonitorPrivate::PurgeBuffer::buffersize() { return MAXBUFFERSIZE; } bool MonitorPrivate::isMonitored(Collection::Id colId) const { if (!useRefCounting) { return true; } return refCountMap.contains(colId) || m_buffer.isBuffered(colId); } void MonitorPrivate::notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource) { if (collection > 0 && (monitorAll || isCollectionMonitored(collection) || resources.contains(resource))) { recentlyChangedCollections.insert(collection); if (!statisticsCompressionTimer.isActive()) { statisticsCompressionTimer.start(); } } } // @endcond diff --git a/src/core/notificationsubscriber.cpp b/src/core/notificationsubscriber.cpp index 2ff180b63..a127d9588 100644 --- a/src/core/notificationsubscriber.cpp +++ b/src/core/notificationsubscriber.cpp @@ -1,224 +1,237 @@ /* Copyright (c) 2016 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationsubscriber.h" #include "itemfetchscope.h" #include "collectionfetchscope.h" +#include "tagfetchscope.h" namespace Akonadi { class AKONADICORE_NO_EXPORT NotificationSubscriber::Private : public QSharedData { public: explicit Private() : QSharedData() , isAllMonitored(false) , isExclusive(false) {} Private(const Private &other) : QSharedData(other) , subscriber(other.subscriber) , sessionId(other.sessionId) , collections(other.collections) , items(other.items) , tags(other.tags) , types(other.types) , mimeTypes(other.mimeTypes) , resources(other.resources) , ignoredSessions(other.ignoredSessions) , itemFetchScope(other.itemFetchScope) , collectionFetchScope(other.collectionFetchScope) + , tagFetchScope(other.tagFetchScope) , isAllMonitored(other.isAllMonitored) , isExclusive(other.isExclusive) {} QByteArray subscriber; QByteArray sessionId; QSet collections; QSet items; QSet tags; QSet types; QSet mimeTypes; QSet resources; QSet ignoredSessions; ItemFetchScope itemFetchScope; CollectionFetchScope collectionFetchScope; + TagFetchScope tagFetchScope; bool isAllMonitored; bool isExclusive; }; } using namespace Akonadi; NotificationSubscriber::NotificationSubscriber() : d(new Private) { } NotificationSubscriber::NotificationSubscriber(const NotificationSubscriber &other) : d(other.d) { } NotificationSubscriber::~NotificationSubscriber() { } NotificationSubscriber &NotificationSubscriber::operator=(const NotificationSubscriber &other) { d = other.d; return *this; } bool NotificationSubscriber::isValid() const { return !d->subscriber.isEmpty(); } QByteArray NotificationSubscriber::subscriber() const { return d->subscriber; } void NotificationSubscriber::setSubscriber(const QByteArray &subscriber) { d->subscriber = subscriber; } QByteArray NotificationSubscriber::sessionId() const { return d->sessionId; } void NotificationSubscriber::setSessionId(const QByteArray &sessionId) { d->sessionId = sessionId; } QSet NotificationSubscriber::monitoredCollections() const { return d->collections; } void NotificationSubscriber::setMonitoredCollections(const QSet &collections) { d->collections = collections; } QSet NotificationSubscriber::monitoredItems() const { return d->items; } void NotificationSubscriber::setMonitoredItems(const QSet &items) { d->items = items; } QSet NotificationSubscriber::monitoredTags() const { return d->tags; } void NotificationSubscriber::setMonitoredTags(const QSet &tags) { d->tags = tags; } QSet NotificationSubscriber::monitoredTypes() const { return d->types; } void NotificationSubscriber::setMonitoredTypes(const QSet &types) { d->types = types; } QSet NotificationSubscriber::monitoredMimeTypes() const { return d->mimeTypes; } void NotificationSubscriber::setMonitoredMimeTypes(const QSet &mimeTypes) { d->mimeTypes = mimeTypes; } QSet NotificationSubscriber::monitoredResources() const { return d->resources; } void NotificationSubscriber::setMonitoredResources(const QSet &resources) { d->resources = resources; } QSet NotificationSubscriber::ignoredSessions() const { return d->ignoredSessions; } void NotificationSubscriber::setIgnoredSessions(const QSet &ignoredSessions) { d->ignoredSessions = ignoredSessions; } bool NotificationSubscriber::isAllMonitored() const { return d->isAllMonitored; } void NotificationSubscriber::setIsAllMonitored(bool isAllMonitored) { d->isAllMonitored = isAllMonitored; } bool NotificationSubscriber::isExclusive() const { return d->isExclusive; } void NotificationSubscriber::setIsExclusive(bool isExclusive) { d->isExclusive = isExclusive; } ItemFetchScope NotificationSubscriber::itemFetchScope() const { return d->itemFetchScope; } void NotificationSubscriber::setItemFetchScope(const ItemFetchScope &itemFetchScope) { d->itemFetchScope = itemFetchScope; } CollectionFetchScope NotificationSubscriber::collectionFetchScope() const { return d->collectionFetchScope; } void NotificationSubscriber::setCollectionFetchScope(const CollectionFetchScope &fetchScope) { d->collectionFetchScope = fetchScope; } + +TagFetchScope NotificationSubscriber::tagFetchScope() const +{ + return d->tagFetchScope; +} + +void NotificationSubscriber::setTagFetchScope(const TagFetchScope &tagFetchScope) +{ + d->tagFetchScope = tagFetchScope; +} diff --git a/src/core/notificationsubscriber.h b/src/core/notificationsubscriber.h index 70c5a23c4..7cae92446 100644 --- a/src/core/notificationsubscriber.h +++ b/src/core/notificationsubscriber.h @@ -1,87 +1,90 @@ /* Copyright (c) 2016 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_NOTIFICATIONSUBSCRIBER_H #define AKONADI_NOTIFICATIONSUBSCRIBER_H #include #include "monitor.h" #include namespace Akonadi { class AKONADICORE_EXPORT NotificationSubscriber { public: explicit NotificationSubscriber(); NotificationSubscriber(const NotificationSubscriber &other); ~NotificationSubscriber(); NotificationSubscriber &operator=(const NotificationSubscriber &other); bool isValid() const; QByteArray subscriber() const; void setSubscriber(const QByteArray &subscriber); QByteArray sessionId() const; void setSessionId(const QByteArray &sessionId); QSet monitoredCollections() const; void setMonitoredCollections(const QSet &collections); QSet monitoredItems() const; void setMonitoredItems(const QSet &items); QSet monitoredTags() const; void setMonitoredTags(const QSet &tags); QSet monitoredTypes() const; void setMonitoredTypes(const QSet &type); QSet monitoredMimeTypes() const; void setMonitoredMimeTypes(const QSet &mimeTypes); QSet monitoredResources() const; void setMonitoredResources(const QSet &resources); QSet ignoredSessions() const; void setIgnoredSessions(const QSet &ignoredSessions); bool isAllMonitored() const; void setIsAllMonitored(bool isAllMonitored); bool isExclusive() const; void setIsExclusive(bool isExclusive); ItemFetchScope itemFetchScope() const; void setItemFetchScope(const ItemFetchScope &itemFetchScope); CollectionFetchScope collectionFetchScope() const; void setCollectionFetchScope(const CollectionFetchScope &collectionFetchScope); + TagFetchScope tagFetchScope() const; + void setTagFetchScope(const TagFetchScope &tagFetchScope); + private: class Private; QSharedDataPointer d; }; } #endif diff --git a/src/core/protocolhelper.cpp b/src/core/protocolhelper.cpp index 4ebf25045..4bb72a91e 100644 --- a/src/core/protocolhelper.cpp +++ b/src/core/protocolhelper.cpp @@ -1,749 +1,778 @@ /* Copyright (c) 2008 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "protocolhelper_p.h" #include "akonadicore_debug.h" #include "attributefactory.h" #include "collectionstatistics.h" #include "item_p.h" #include "collection_p.h" #include "exceptionbase.h" #include "itemserializer_p.h" #include "itemserializerplugin.h" #include "servermanager.h" #include "tagfetchscope.h" #include "persistentsearchattribute.h" #include "private/protocol_p.h" #include "private/externalpartstorage_p.h" #include #include using namespace Akonadi; CachePolicy ProtocolHelper::parseCachePolicy(const Protocol::CachePolicy &policy) { CachePolicy cp; cp.setCacheTimeout(policy.cacheTimeout()); cp.setIntervalCheckTime(policy.checkInterval()); cp.setInheritFromParent(policy.inherit()); cp.setSyncOnDemand(policy.syncOnDemand()); cp.setLocalParts(policy.localParts()); return cp; } Protocol::CachePolicy ProtocolHelper::cachePolicyToProtocol(const CachePolicy &policy) { Protocol::CachePolicy proto; proto.setCacheTimeout(policy.cacheTimeout()); proto.setCheckInterval(policy.intervalCheckTime()); proto.setInherit(policy.inheritFromParent()); proto.setSyncOnDemand(policy.syncOnDemand()); proto.setLocalParts(policy.localParts()); return proto; } template inline static void parseAttributesImpl(const Protocol::Attributes &attributes, T *entity) { for (auto iter = attributes.cbegin(), end = attributes.cend(); iter != end; ++iter) { Attribute *attribute = AttributeFactory::createAttribute(iter.key()); if (!attribute) { qCWarning(AKONADICORE_LOG) << "Warning: unknown attribute" << iter.key(); continue; } attribute->deserialize(iter.value()); entity->addAttribute(attribute); } } template inline static void parseAncestorsCachedImpl(const QVector &ancestors, T *entity, Collection::Id parentCollection, ProtocolHelperValuePool *pool) { if (!pool || parentCollection == -1) { // if no pool or parent collection id is provided we can't cache anything, so continue as usual ProtocolHelper::parseAncestors(ancestors, entity); return; } if (pool->ancestorCollections.contains(parentCollection)) { // ancestor chain is cached already, so use the cached value entity->setParentCollection(pool->ancestorCollections.value(parentCollection)); } else { // not cached yet, parse the chain ProtocolHelper::parseAncestors(ancestors, entity); pool->ancestorCollections.insert(parentCollection, entity->parentCollection()); } } template inline static Protocol::Attributes attributesToProtocolImpl(const T &entity, bool ns) { Protocol::Attributes attributes; Q_FOREACH (const Attribute *attr, entity.attributes()) { attributes.insert(ProtocolHelper::encodePartIdentifier(ns ? ProtocolHelper::PartAttribute : ProtocolHelper::PartGlobal, attr->type()), attr->serialized()); } return attributes; } void ProtocolHelper::parseAncestorsCached(const QVector &ancestors, Item *item, Collection::Id parentCollection, ProtocolHelperValuePool *pool) { parseAncestorsCachedImpl(ancestors, item, parentCollection, pool); } void ProtocolHelper::parseAncestorsCached(const QVector &ancestors, Collection *collection, Collection::Id parentCollection, ProtocolHelperValuePool *pool) { parseAncestorsCachedImpl(ancestors, collection, parentCollection, pool); } void ProtocolHelper::parseAncestors(const QVector &ancestors, Item *item) { Collection fakeCollection; parseAncestors(ancestors, &fakeCollection); item->setParentCollection(fakeCollection.parentCollection()); } void ProtocolHelper::parseAncestors(const QVector &ancestors, Collection *collection) { static const Collection::Id rootCollectionId = Collection::root().id(); QList parentIds; Collection *current = collection; for (const Protocol::Ancestor &ancestor : ancestors) { if (ancestor.id() == rootCollectionId) { current->setParentCollection(Collection::root()); break; } Akonadi::Collection parentCollection(ancestor.id()); parentCollection.setName(ancestor.name()); parentCollection.setRemoteId(ancestor.remoteId()); parseAttributesImpl(ancestor.attributes(), &parentCollection); current->setParentCollection(parentCollection); current = ¤t->parentCollection(); } } static Collection::ListPreference parsePreference(Tristate value) { switch (value) { case Tristate::True: return Collection::ListEnabled; case Tristate::False: return Collection::ListDisabled; case Tristate::Undefined: return Collection::ListDefault; } Q_ASSERT(false); return Collection::ListDefault; } CollectionStatistics ProtocolHelper::parseCollectionStatistics(const Protocol::FetchCollectionStatsResponse &stats) { CollectionStatistics cs; cs.setCount(stats.count()); cs.setSize(stats.size()); cs.setUnreadCount(stats.unseen()); return cs; } void ProtocolHelper::parseAttributes(const Protocol::Attributes &attributes, Item *item) { parseAttributesImpl(attributes, item); } void ProtocolHelper::parseAttributes(const Protocol::Attributes &attributes, Collection *collection) { parseAttributesImpl(attributes, collection); } void ProtocolHelper::parseAttributes(const Protocol::Attributes &attributes, Tag *tag) { parseAttributesImpl(attributes, tag); } Protocol::Attributes ProtocolHelper::attributesToProtocol(const Item &item, bool ns) { return attributesToProtocolImpl(item, ns); } Protocol::Attributes ProtocolHelper::attributesToProtocol(const Collection &collection, bool ns) { return attributesToProtocolImpl(collection, ns); } Protocol::Attributes ProtocolHelper::attributesToProtocol(const Tag &tag, bool ns) { return attributesToProtocolImpl(tag, ns); } Collection ProtocolHelper::parseCollection(const Protocol::FetchCollectionsResponse &data, bool requireParent) { Collection collection(data.id()); if (requireParent) { collection.setParentCollection(Collection(data.parentId())); } collection.setName(data.name()); collection.setRemoteId(data.remoteId()); collection.setRemoteRevision(data.remoteRevision()); collection.setResource(data.resource()); collection.setContentMimeTypes(data.mimeTypes()); collection.setVirtual(data.isVirtual()); collection.setStatistics(parseCollectionStatistics(data.statistics())); collection.setCachePolicy(parseCachePolicy(data.cachePolicy())); parseAncestors(data.ancestors(), &collection); collection.setEnabled(data.enabled()); collection.setLocalListPreference(Collection::ListDisplay, parsePreference(data.displayPref())); collection.setLocalListPreference(Collection::ListIndex, parsePreference(data.indexPref())); collection.setLocalListPreference(Collection::ListSync, parsePreference(data.syncPref())); collection.setReferenced(data.referenced()); if (!data.searchQuery().isEmpty()) { auto attr = collection.attribute(Collection::AddIfMissing); attr->setQueryString(data.searchQuery()); QVector cols; cols.reserve(data.searchCollections().size()); foreach (auto id, data.searchCollections()) { cols.push_back(Collection(id)); } attr->setQueryCollections(cols); } parseAttributes(data.attributes(), &collection); collection.d_ptr->resetChangeLog(); return collection; } +Tag ProtocolHelper::parseTag(const Protocol::FetchTagsResponse &data) +{ + Tag tag(data.id()); + tag.setRemoteId(data.remoteId()); + tag.setGid(data.gid()); + tag.setType(data.type()); + tag.setParent(Tag(data.parentId())); + parseAttributes(data.attributes(), &tag); + + return tag; +} + QByteArray ProtocolHelper::encodePartIdentifier(PartNamespace ns, const QByteArray &label) { switch (ns) { case PartGlobal: return label; case PartPayload: return "PLD:" + label; case PartAttribute: return "ATR:" + label; default: Q_ASSERT(false); } return QByteArray(); } QByteArray ProtocolHelper::decodePartIdentifier(const QByteArray &data, PartNamespace &ns) { if (data.startsWith("PLD:")) { //krazy:exclude=strings ns = PartPayload; return data.mid(4); } else if (data.startsWith("ATR:")) { //krazy:exclude=strings ns = PartAttribute; return data.mid(4); } else { ns = PartGlobal; return data; } } Protocol::ScopeContext ProtocolHelper::commandContextToProtocol(const Akonadi::Collection &collection, const Akonadi::Tag &tag, const Item::List &requestedItems) { Protocol::ScopeContext ctx; if (tag.isValid()) { ctx.setContext(Protocol::ScopeContext::Tag, tag.id()); } if (collection == Collection::root()) { if (requestedItems.isEmpty() && !tag.isValid()) { // collection content listing throw Exception("Cannot perform item operations on root collection."); } } else { if (collection.isValid()) { ctx.setContext(Protocol::ScopeContext::Collection, collection.id()); } else if (!collection.remoteId().isEmpty()) { ctx.setContext(Protocol::ScopeContext::Collection, collection.remoteId()); } } return ctx; } Scope ProtocolHelper::hierarchicalRidToScope(const Collection &col) { if (col == Collection::root()) { return Scope({ Scope::HRID(0) }); } if (col.remoteId().isEmpty()) { return Scope(); } QVector chain; Collection c = col; while (!c.remoteId().isEmpty()) { chain.append(Scope::HRID(c.id(), c.remoteId())); c = c.parentCollection(); } return Scope(chain + QVector { Scope::HRID(0) }); } Scope ProtocolHelper::hierarchicalRidToScope(const Item &item) { return Scope(QVector({ Scope::HRID(item.id(), item.remoteId()) }) + hierarchicalRidToScope(item.parentCollection()).hridChain()); } Protocol::ItemFetchScope ProtocolHelper::itemFetchScopeToProtocol(const ItemFetchScope &fetchScope) { Protocol::ItemFetchScope fs; QVector parts; parts.reserve(fetchScope.payloadParts().size() + fetchScope.attributes().size()); Q_FOREACH (const QByteArray &part, fetchScope.payloadParts()) { parts << ProtocolHelper::encodePartIdentifier(ProtocolHelper::PartPayload, part); } Q_FOREACH (const QByteArray &part, fetchScope.attributes()) { parts << ProtocolHelper::encodePartIdentifier(ProtocolHelper::PartAttribute, part); } fs.setRequestedParts(parts); // The default scope fs.setFetch(Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::RemoteID | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::MTime); fs.setFetch(Protocol::ItemFetchScope::FullPayload, fetchScope.fullPayload()); fs.setFetch(Protocol::ItemFetchScope::AllAttributes, fetchScope.allAttributes()); fs.setFetch(Protocol::ItemFetchScope::CacheOnly, fetchScope.cacheOnly()); fs.setFetch(Protocol::ItemFetchScope::CheckCachedPayloadPartsOnly, fetchScope.checkForCachedPayloadPartsOnly()); fs.setFetch(Protocol::ItemFetchScope::IgnoreErrors, fetchScope.ignoreRetrievalErrors()); switch (fetchScope.ancestorRetrieval()) { case ItemFetchScope::Parent: fs.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor); break; case ItemFetchScope::All: fs.setAncestorDepth(Protocol::ItemFetchScope::AllAncestors); break; case ItemFetchScope::None: fs.setAncestorDepth(Protocol::ItemFetchScope::NoAncestor); break; default: Q_ASSERT(false); break; } if (fetchScope.fetchChangedSince().isValid()) { fs.setChangedSince(fetchScope.fetchChangedSince()); } fs.setFetch(Protocol::ItemFetchScope::RemoteID, fetchScope.fetchRemoteIdentification()); fs.setFetch(Protocol::ItemFetchScope::RemoteRevision, fetchScope.fetchRemoteIdentification()); fs.setFetch(Protocol::ItemFetchScope::GID, fetchScope.fetchGid()); if (fetchScope.fetchTags()) { fs.setFetch(Protocol::ItemFetchScope::Tags); if (!fetchScope.tagFetchScope().fetchIdOnly()) { if (fetchScope.tagFetchScope().attributes().isEmpty()) { fs.setTagFetchScope({ "ALL" }); } else { fs.setTagFetchScope(fetchScope.tagFetchScope().attributes()); } } } fs.setFetch(Protocol::ItemFetchScope::VirtReferences, fetchScope.fetchVirtualReferences()); fs.setFetch(Protocol::ItemFetchScope::MTime, fetchScope.fetchModificationTime()); fs.setFetch(Protocol::ItemFetchScope::Relations, fetchScope.fetchRelations()); return fs; } ItemFetchScope ProtocolHelper::parseItemFetchScope(const Protocol::ItemFetchScope &fetchScope) { ItemFetchScope ifs; Q_FOREACH (const auto &part, fetchScope.requestedParts()) { if (part.startsWith("PLD:")) { ifs.fetchPayloadPart(part.mid(4), true); } else if (part.startsWith("ATR:")) { ifs.fetchAttribute(part.mid(4), true); } } if (fetchScope.fetch(Protocol::ItemFetchScope::FullPayload)) { ifs.fetchFullPayload(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::AllAttributes)) { ifs.fetchAllAttributes(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::CacheOnly)) { ifs.setCacheOnly(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::CheckCachedPayloadPartsOnly)) { ifs.setCheckForCachedPayloadPartsOnly(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::IgnoreErrors)) { ifs.setIgnoreRetrievalErrors(true); } switch (fetchScope.ancestorDepth()) { case Protocol::Ancestor::ParentAncestor: ifs.setAncestorRetrieval(ItemFetchScope::Parent); break; case Protocol::Ancestor::AllAncestors: ifs.setAncestorRetrieval(ItemFetchScope::All); break; default: ifs.setAncestorRetrieval(ItemFetchScope::None); break; } if (fetchScope.changedSince().isValid()) { ifs.setFetchChangedSince(fetchScope.changedSince()); } if (fetchScope.fetch(Protocol::ItemFetchScope::RemoteID) || fetchScope.fetch(Protocol::ItemFetchScope::RemoteRevision)) { ifs.setFetchRemoteIdentification(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::GID)) { ifs.setFetchGid(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::Tags)) { ifs.setFetchTags(true); const auto tfs = fetchScope.tagFetchScope(); if (tfs.isEmpty()) { ifs.tagFetchScope().setFetchIdOnly(true); } else if (QSet({ "ALL" }) != tfs) { for (const auto &attr : tfs) { ifs.tagFetchScope().fetchAttribute(attr, true); } } } if (fetchScope.fetch(Protocol::ItemFetchScope::VirtReferences)) { ifs.setFetchVirtualReferences(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::MTime)) { ifs.setFetchModificationTime(true); } if (fetchScope.fetch(Protocol::ItemFetchScope::Relations)) { ifs.setFetchRelations(true); } return ifs; } Protocol::CollectionFetchScope ProtocolHelper::collectionFetchScopeToProtocol(const CollectionFetchScope &fetchScope) { Protocol::CollectionFetchScope cfs; switch (fetchScope.listFilter()) { case CollectionFetchScope::NoFilter: cfs.setListFilter(Protocol::CollectionFetchScope::NoFilter); break; case CollectionFetchScope::Display: cfs.setListFilter(Protocol::CollectionFetchScope::Display); break; case CollectionFetchScope::Sync: cfs.setListFilter(Protocol::CollectionFetchScope::Sync); break; case CollectionFetchScope::Index: cfs.setListFilter(Protocol::CollectionFetchScope::Index); break; case CollectionFetchScope::Enabled: cfs.setListFilter(Protocol::CollectionFetchScope::Enabled); break; } cfs.setIncludeStatistics(fetchScope.includeStatistics()); cfs.setResource(fetchScope.resource()); cfs.setContentMimeTypes(fetchScope.contentMimeTypes()); cfs.setAttributes(fetchScope.attributes()); cfs.setFetchIdOnly(fetchScope.fetchIdOnly()); switch (fetchScope.ancestorRetrieval()) { case CollectionFetchScope::None: cfs.setAncestorRetrieval(Protocol::CollectionFetchScope::None); break; case CollectionFetchScope::Parent: cfs.setAncestorRetrieval(Protocol::CollectionFetchScope::Parent); break; case CollectionFetchScope::All: cfs.setAncestorRetrieval(Protocol::CollectionFetchScope::All); break; } if (cfs.ancestorRetrieval() != Protocol::CollectionFetchScope::None) { cfs.setAncestorAttributes(fetchScope.ancestorFetchScope().attributes()); cfs.setAncestorFetchIdOnly(fetchScope.ancestorFetchScope().fetchIdOnly()); } cfs.setIgnoreRetrievalErrors(fetchScope.ignoreRetrievalErrors()); return cfs; } CollectionFetchScope ProtocolHelper::parseCollectionFetchScope(const Protocol::CollectionFetchScope &fetchScope) { CollectionFetchScope cfs; switch (fetchScope.listFilter()) { case Protocol::CollectionFetchScope::NoFilter: cfs.setListFilter(CollectionFetchScope::NoFilter); break; case Protocol::CollectionFetchScope::Display: cfs.setListFilter(CollectionFetchScope::Display); break; case Protocol::CollectionFetchScope::Sync: cfs.setListFilter(CollectionFetchScope::Sync); break; case Protocol::CollectionFetchScope::Index: cfs.setListFilter(CollectionFetchScope::Index); break; case Protocol::CollectionFetchScope::Enabled: cfs.setListFilter(CollectionFetchScope::Enabled); break; } cfs.setIncludeStatistics(fetchScope.includeStatistics()); cfs.setResource(fetchScope.resource()); cfs.setContentMimeTypes(fetchScope.contentMimeTypes()); switch (fetchScope.ancestorRetrieval()) { case Protocol::CollectionFetchScope::None: cfs.setAncestorRetrieval(CollectionFetchScope::None); break; case Protocol::CollectionFetchScope::Parent: cfs.setAncestorRetrieval(CollectionFetchScope::Parent); break; case Protocol::CollectionFetchScope::All: cfs.setAncestorRetrieval(CollectionFetchScope::All); break; } if (cfs.ancestorRetrieval() != CollectionFetchScope::None) { cfs.ancestorFetchScope().setFetchIdOnly(fetchScope.ancestorFetchIdOnly()); const auto attrs = fetchScope.ancestorAttributes(); for (const auto attr : attrs) { cfs.ancestorFetchScope().fetchAttribute(attr, true); } } const auto attrs = fetchScope.attributes(); - for (const auto attr : attrs) { + for (const auto &attr : attrs) { cfs.fetchAttribute(attr, true); } cfs.setFetchIdOnly(fetchScope.fetchIdOnly()); cfs.setIgnoreRetrievalErrors(fetchScope.ignoreRetrievalErrors()); return cfs; } +Protocol::TagFetchScope ProtocolHelper::tagFetchScopeToProtocol(const TagFetchScope &fetchScope) +{ + Protocol::TagFetchScope tfs; + tfs.setFetchIdOnly(fetchScope.fetchIdOnly()); + tfs.setAttributes(fetchScope.attributes()); + return tfs; +} +TagFetchScope ProtocolHelper::parseTagFetchScope(const Protocol::TagFetchScope &fetchScope) +{ + TagFetchScope tfs; + tfs.setFetchIdOnly(fetchScope.fetchIdOnly()); + const auto attrs = fetchScope.attributes(); + for (const auto &attr : attrs) { + tfs.fetchAttribute(attr, true); + } + return tfs; +} static Item::Flags convertFlags(const QVector &flags, ProtocolHelperValuePool *valuePool) { #if __cplusplus >= 201103L || defined(__GNUC__) || defined(__clang__) // When the compiler supports thread-safe static initialization (mandated by the C++11 memory model) // then use it to share the common case of a single-item set only containing the \SEEN flag. // NOTE: GCC and clang has threadsafe static initialization for some time now, even without C++11. if (flags.size() == 1 && flags.first() == "\\SEEN") { static const Item::Flags sharedSeen = Item::Flags() << QByteArray("\\SEEN"); return sharedSeen; } #endif Item::Flags convertedFlags; convertedFlags.reserve(flags.size()); for (const QByteArray &flag : flags) { if (valuePool) { convertedFlags.insert(valuePool->flagPool.sharedValue(flag)); } else { convertedFlags.insert(flag); } } return convertedFlags; } Item ProtocolHelper::parseItemFetchResult(const Protocol::FetchItemsResponse &data, ProtocolHelperValuePool *valuePool) { Item item; item.setId(data.id()); item.setRevision(data.revision()); item.setRemoteId(data.remoteId()); item.setRemoteRevision(data.remoteRevision()); item.setGid(data.gid()); item.setStorageCollectionId(data.parentId()); if (valuePool) { item.setMimeType(valuePool->mimeTypePool.sharedValue(data.mimeType())); } else { item.setMimeType(data.mimeType()); } if (!item.isValid()) { return Item(); } item.setFlags(convertFlags(data.flags(), valuePool)); if (!data.tags().isEmpty()) { Tag::List tags; tags.reserve(data.tags().size()); Q_FOREACH (const Protocol::FetchTagsResponse &tag, data.tags()) { tags.append(parseTagFetchResult(tag)); } item.setTags(tags); } if (!data.relations().isEmpty()) { Relation::List relations; relations.reserve(data.relations().size()); Q_FOREACH (const Protocol::FetchRelationsResponse &rel, data.relations()) { relations.append(parseRelationFetchResult(rel)); } item.d_ptr->mRelations = relations; } if (!data.virtualReferences().isEmpty()) { Collection::List virtRefs; virtRefs.reserve(data.virtualReferences().size()); Q_FOREACH (qint64 colId, data.virtualReferences()) { virtRefs.append(Collection(colId)); } item.setVirtualReferences(virtRefs); } if (!data.cachedParts().isEmpty()) { QSet cp; cp.reserve(data.cachedParts().size()); Q_FOREACH (const QByteArray &ba, data.cachedParts()) { cp.insert(ba); } item.setCachedPayloadParts(cp); } item.setSize(data.size()); item.setModificationTime(data.mTime()); parseAncestorsCached(data.ancestors(), &item, data.parentId(), valuePool); Q_FOREACH (const Protocol::StreamPayloadResponse &part, data.parts()) { ProtocolHelper::PartNamespace ns; const QByteArray plainKey = decodePartIdentifier(part.payloadName(), ns); const auto metaData = part.metaData(); switch (ns) { case ProtocolHelper::PartPayload: ItemSerializer::deserialize(item, plainKey, part.data(), metaData.version(), static_cast(metaData.storageType())); if (metaData.storageType() == Protocol::PartMetaData::Foreign) { item.d_ptr->mPayloadPath = QString::fromUtf8(part.data()); } break; case ProtocolHelper::PartAttribute: { Attribute *attr = AttributeFactory::createAttribute(plainKey); Q_ASSERT(attr); if (metaData.storageType() == Protocol::PartMetaData::External) { const QString filename = ExternalPartStorage::resolveAbsolutePath(part.data()); QFile file(filename); if (file.open(QFile::ReadOnly)) { attr->deserialize(file.readAll()); } else { qCWarning(AKONADICORE_LOG) << "Failed to open attribute file: " << filename; delete attr; attr = nullptr; } } else { attr->deserialize(part.data()); } if (attr) { item.addAttribute(attr); } break; } case ProtocolHelper::PartGlobal: default: qCWarning(AKONADICORE_LOG) << "Unknown item part type:" << part.payloadName(); } } item.d_ptr->resetChangeLog(); return item; } Tag ProtocolHelper::parseTagFetchResult(const Protocol::FetchTagsResponse &data) { Tag tag; tag.setId(data.id()); tag.setGid(data.gid()); tag.setRemoteId(data.remoteId()); tag.setType(data.type()); tag.setParent(data.parentId() > 0 ? Tag(data.parentId()) : Tag()); parseAttributes(data.attributes(), &tag); return tag; } Relation ProtocolHelper::parseRelationFetchResult(const Protocol::FetchRelationsResponse &data) { Relation relation; relation.setLeft(Item(data.left())); relation.setRight(Item(data.right())); relation.setRemoteId(data.remoteId()); relation.setType(data.type()); return relation; } bool ProtocolHelper::streamPayloadToFile(const QString &fileName, const QByteArray &data, QByteArray &error) { const QString filePath = ExternalPartStorage::resolveAbsolutePath(fileName); qCDebug(AKONADICORE_LOG) << filePath << fileName; if (!filePath.startsWith(ExternalPartStorage::akonadiStoragePath())) { qCWarning(AKONADICORE_LOG) << "Invalid file path" << fileName; error = "Invalid file path"; return false; } QFile file(filePath); if (!file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { qCWarning(AKONADICORE_LOG) << "Failed to open destination payload file" << file.errorString(); error = "Failed to store payload into file"; return false; } if (file.write(data) != data.size()) { qCWarning(AKONADICORE_LOG) << "Failed to write all payload data to file"; error = "Failed to store payload into file"; return false; } qCDebug(AKONADICORE_LOG) << "Wrote" << data.size() << "bytes to " << file.fileName(); // Make sure stuff is written to disk file.close(); return true; } Akonadi::Tristate ProtocolHelper::listPreference(Collection::ListPreference pref) { switch (pref) { case Collection::ListEnabled: return Tristate::True; case Collection::ListDisabled: return Tristate::False; case Collection::ListDefault: return Tristate::Undefined; } Q_ASSERT(false); return Tristate::Undefined; } diff --git a/src/core/protocolhelper_p.h b/src/core/protocolhelper_p.h index 2fb0a8df7..07fd8b97b 100644 --- a/src/core/protocolhelper_p.h +++ b/src/core/protocolhelper_p.h @@ -1,344 +1,344 @@ /* Copyright (c) 2008 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_PROTOCOLHELPER_P_H #define AKONADI_PROTOCOLHELPER_P_H #include "cachepolicy.h" #include "collection.h" #include "collectionutils.h" #include "item.h" #include "itemfetchscope.h" #include "collectionfetchscope.h" #include "sharedvaluepool_p.h" #include "tag.h" #include "private/imapparser_p.h" #include "private/protocol_p.h" #include "private/scope_p.h" #include "private/tristate_p.h" #include #include #include #include #include namespace Akonadi { struct ProtocolHelperValuePool { typedef Internal::SharedValuePool FlagPool; typedef Internal::SharedValuePool MimeTypePool; FlagPool flagPool; MimeTypePool mimeTypePool; QHash ancestorCollections; }; /** @internal Helper methods for converting between libakonadi objects and their protocol representation. @todo Add unit tests for this. @todo Use exceptions for a useful error handling */ class ProtocolHelper { public: /** Part namespaces. */ enum PartNamespace { PartGlobal, PartPayload, PartAttribute }; /** Parse a cache policy definition. @param policy The parsed cache policy. @returns Akonadi::CachePolicy */ static CachePolicy parseCachePolicy(const Protocol::CachePolicy &policy); /** Convert a cache policy object into its protocol representation. */ static Protocol::CachePolicy cachePolicyToProtocol(const CachePolicy &policy); /** Convert a ancestor chain from its protocol representation into an Item object. */ static void parseAncestors(const QVector &ancestors, Item *item); /** Convert a ancestor chain from its protocol representation into a Collection object. */ static void parseAncestors(const QVector &ancestors, Collection *collection); /** Convert a ancestor chain from its protocol representation into an Item object. This method allows to pass a @p valuePool which acts as cache, so ancestor paths for the same @p parentCollection don't have to be parsed twice. */ static void parseAncestorsCached(const QVector &ancestors, Item *item, Collection::Id parentCollection, ProtocolHelperValuePool *valuePool = nullptr); /** Convert a ancestor chain from its protocol representation into an Collection object. This method allows to pass a @p valuePool which acts as cache, so ancestor paths for the same @p parentCollection don't have to be parsed twice. */ static void parseAncestorsCached(const QVector &ancestors, Collection *collection, Collection::Id parentCollection, ProtocolHelperValuePool *valuePool = nullptr); /** Parse a collection description. @param data The input data. @param requireParent Whether or not we require a parent as part of the data. @returns The parsed collection */ static Collection parseCollection(const Protocol::FetchCollectionsResponse &data, bool requireParent = true); + static Tag parseTag(const Protocol::FetchTagsResponse &data); + static void parseAttributes(const Protocol::Attributes &attributes, Item *item); static void parseAttributes(const Protocol::Attributes &attributes, Collection *collection); static void parseAttributes(const Protocol::Attributes &attributes, Tag *entity); static CollectionStatistics parseCollectionStatistics(const Protocol::FetchCollectionStatsResponse &stats); /** Convert attributes to their protocol representation. */ static Protocol::Attributes attributesToProtocol(const Item &item, bool ns = false); static Protocol::Attributes attributesToProtocol(const Collection &collection, bool ns = false); static Protocol::Attributes attributesToProtocol(const Tag &entity, bool ns = false); /** Encodes part label and namespace. */ static QByteArray encodePartIdentifier(PartNamespace ns, const QByteArray &label); /** Decode part label and namespace. */ static QByteArray decodePartIdentifier(const QByteArray &data, PartNamespace &ns); /** Converts the given set of items into a protocol representation. @throws A Akonadi::Exception if the item set contains items with missing/invalid identifiers. */ template class Container> static Scope entitySetToScope(const Container &_objects) { if (_objects.isEmpty()) { throw Exception("No objects specified"); } Container objects(_objects); using namespace std::placeholders; std::sort(objects.begin(), objects.end(), [](const T & a, const T & b) -> bool { return a.id() < b.id(); }); if (objects.at(0).isValid()) { QVector uids; uids.reserve(objects.size()); for (const T &object : objects) { uids << object.id(); } ImapSet set; set.add(uids); return Scope(set); } if (entitySetHasGID(_objects)) { return entitySetToGID(_objects); } if (!entitySetHasRemoteIdentifier(_objects, std::mem_fn(&T::remoteId))) { throw Exception("No remote identifier specified"); } // check if we have RIDs or HRIDs if (entitySetHasHRID(_objects)) { return hierarchicalRidToScope(objects.first()); } return entitySetToRemoteIdentifier(Scope::Rid, _objects, std::mem_fn(&T::remoteId)); } static Protocol::ScopeContext commandContextToProtocol(const Akonadi::Collection &collection, const Akonadi::Tag &tag, const Item::List &requestedItems); /** Converts the given object identifier into a protocol representation. @throws A Akonadi::Exception if the item set contains items with missing/invalid identifiers. */ template static Scope entityToScope(const T &object) { return entitySetToScope(QVector() << object); } /** Converts the given collection's hierarchical RID into a protocol representation. Assumes @p col has a valid hierarchical RID, so check that before! */ static Scope hierarchicalRidToScope(const Collection &col); /** Converts the HRID of the given item into an ASAP protocol representation. Assumes @p item has a valid HRID. */ static Scope hierarchicalRidToScope(const Item &item); static Scope hierarchicalRidToScope(const Tag &/*tag*/) { assert(false); return Scope(); } /** Converts a given ItemFetchScope object into a protocol representation. */ static Protocol::ItemFetchScope itemFetchScopeToProtocol(const ItemFetchScope &fetchScope); static ItemFetchScope parseItemFetchScope(const Protocol::ItemFetchScope &fetchScope); static Protocol::CollectionFetchScope collectionFetchScopeToProtocol(const CollectionFetchScope &fetchScope); static CollectionFetchScope parseCollectionFetchScope(const Protocol::CollectionFetchScope &fetchScope); - /** - Converts a given TagFetchScope object into a protocol representation. - */ - static QVector tagFetchScopeToProtocol(const TagFetchScope &fetchScope); + static Protocol::TagFetchScope tagFetchScopeToProtocol(const TagFetchScope &fetchScope); + static TagFetchScope parseTagFetchScope(const Protocol::TagFetchScope &fetchScope); /** Parses a single line from an item fetch job result into an Item object. */ static Item parseItemFetchResult(const Protocol::FetchItemsResponse &data, ProtocolHelperValuePool *valuePool = nullptr); static Tag parseTagFetchResult(const Protocol::FetchTagsResponse &data); static Relation parseRelationFetchResult(const Protocol::FetchRelationsResponse &data); static bool streamPayloadToFile(const QString &file, const QByteArray &data, QByteArray &error); static Akonadi::Tristate listPreference(const Collection::ListPreference pref); private: template class Container> inline static typename std::enable_if < !std::is_same::value, bool >::type entitySetHasGID(const Container &objects) { return entitySetHasRemoteIdentifier(objects, std::mem_fn(&T::gid)); } template class Container> inline static typename std::enable_if::value, bool>::type entitySetHasGID(const Container &/*objects*/, int * /*dummy*/ = nullptr) { return false; } template class Container> inline static typename std::enable_if < !std::is_same::value, Scope >::type entitySetToGID(const Container &objects) { return entitySetToRemoteIdentifier(Scope::Gid, objects, std::mem_fn(&T::gid)); } template class Container> inline static typename std::enable_if::value, Scope>::type entitySetToGID(const Container &/*objects*/, int * /*dummy*/ = nullptr) { return Scope(); } template class Container, typename RIDFunc> inline static bool entitySetHasRemoteIdentifier(const Container &objects, const RIDFunc &ridFunc) { return std::find_if(objects.constBegin(), objects.constEnd(), [ = ](const T & obj) { return ridFunc(obj).isEmpty(); }) == objects.constEnd(); } template class Container, typename RIDFunc> inline static typename std::enable_if::value, Scope>::type entitySetToRemoteIdentifier(Scope::SelectionScope scope, const Container &objects, const RIDFunc &ridFunc) { QStringList rids; rids.reserve(objects.size()); std::transform(objects.cbegin(), objects.cend(), std::back_inserter(rids), [ = ](const T & obj) -> QString { return ridFunc(obj); }); return Scope(scope, rids); } template class Container, typename RIDFunc> inline static typename std::enable_if::value, Scope>::type entitySetToRemoteIdentifier(Scope::SelectionScope scope, const Container &objects, const RIDFunc &ridFunc, int * /*dummy*/ = nullptr) { QStringList rids; rids.reserve(objects.size()); std::transform(objects.cbegin(), objects.cend(), std::back_inserter(rids), [ = ](const T & obj) -> QString { return QString::fromLatin1(ridFunc(obj)); }); return Scope(scope, rids); } template class Container> inline static typename std::enable_if < !std::is_same::value, bool >::type entitySetHasHRID(const Container &objects) { return objects.size() == 1 && std::find_if(objects.constBegin(), objects.constEnd(), [](const T & obj) -> bool { return !CollectionUtils::hasValidHierarchicalRID(obj); }) == objects.constEnd(); // ### HRID sets are not yet specified } template class Container> inline static typename std::enable_if::value, bool>::type entitySetHasHRID(const Container &/*objects*/, int * /*dummy*/ = nullptr) { return false; } }; } #endif diff --git a/src/private/protocol.xml b/src/private/protocol.xml index 93942eb55..8234a3529 100644 --- a/src/private/protocol.xml +++ b/src/private/protocol.xml @@ -1,1102 +1,1111 @@ + + + + + - - + + + + + + diff --git a/src/server/aggregatedfetchscope.cpp b/src/server/aggregatedfetchscope.cpp index db7ed1a6f..5c05f1a85 100644 --- a/src/server/aggregatedfetchscope.cpp +++ b/src/server/aggregatedfetchscope.cpp @@ -1,132 +1,205 @@ /* Copyright (c) 2017 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "aggregatedfetchscope.h" #include #include namespace Akonadi { namespace Server { class AggregatedCollectionFetchScopePrivate { public: explicit AggregatedCollectionFetchScopePrivate() : fetchIdOnly(0) , fetchStats(0) { } - ~AggregatedCollectionFetchScopePrivate() - { - } - mutable QMutex lock; QSet attrs; QHash attrsCount; int fetchIdOnly; int fetchStats; +}; +class AggregatedTagFetchScopePrivate +{ +public: + explicit AggregatedTagFetchScopePrivate() + : fetchIdOnly(0) + { + } + + mutable QMutex lock; + QSet attrs; + QHash attrsCount; + int fetchIdOnly; }; } // namespace Server } // namespace Akonadi using namespace Akonadi::Server; AggregatedCollectionFetchScope::AggregatedCollectionFetchScope() : d_ptr(new AggregatedCollectionFetchScopePrivate) { } AggregatedCollectionFetchScope::~AggregatedCollectionFetchScope() { delete d_ptr; } QSet AggregatedCollectionFetchScope::attributes() const { Q_D(const AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); return d->attrs; } void AggregatedCollectionFetchScope::addAttribute(const QByteArray &attribute) { Q_D(AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); auto it = d->attrsCount.find(attribute); if (it == d->attrsCount.end()) { it = d->attrsCount.insert(attribute, 0); d->attrs.insert(attribute); } ++(*it); } void AggregatedCollectionFetchScope::removeAttribute(const QByteArray &attribute) { Q_D(AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); auto it = d->attrsCount.find(attribute); if (it == d->attrsCount.end()) { return; } if (--(*it) == 0) { d->attrsCount.erase(it); d->attrs.remove(attribute); } } bool AggregatedCollectionFetchScope::fetchIdOnly() const { Q_D(const AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); // Aggregation: we can return true only if everyone wants fetchIdOnly, // otherwise there's at least one subscriber who wants everything return d->fetchIdOnly == 0; } void AggregatedCollectionFetchScope::setFetchIdOnly(bool fetchIdOnly) { Q_D(AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); d->fetchIdOnly += fetchIdOnly ? 1 : -1; } bool AggregatedCollectionFetchScope::fetchStatistics() const { Q_D(const AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); // Aggregation: return true if at least one subscriber wants stats return d->fetchStats > 0; } void AggregatedCollectionFetchScope::setFetchStatistics(bool fetchStats) { Q_D(AggregatedCollectionFetchScope); QMutexLocker locker(&d->lock); d->fetchStats += fetchStats ? 1 : -1; } + + + +AggregatedTagFetchScope::AggregatedTagFetchScope() + : d_ptr(new AggregatedCollectionFetchScopePrivate) +{ +} + +AggregatedTagFetchScope::~AggregatedTagFetchScope() +{ + delete d_ptr; +} + +bool AggregatedTagFetchScope::fetchIdOnly() const +{ + Q_D(const AggregatedTagFetchScope); + QMutexLocker locker(&d->lock); + + // Aggregation: we can return true only if everyone wants fetchIdOnly, + // otherwise there's at least one subscriber who wants everything + return d->fetchIdOnly == 0; +} + +void AggregatedTagFetchScope::setFetchIdOnly(bool fetchIdOnly) +{ + Q_D(AggregatedTagFetchScope); + QMutexLocker locker(&d->lock); + + d->fetchIdOnly += fetchIdOnly ? 1 : -1; +} + +QSet AggregatedTagFetchScope::attributes() const +{ + Q_D(const AggregatedTagFetchScope); + QMutexLocker locker(&d->lock); + return d->attrs; +} + +void AggregatedTagFetchScope::addAttribute(const QByteArray &attribute) +{ + Q_D(AggregatedTagFetchScope); + QMutexLocker locker(&d->lock); + auto it = d->attrsCount.find(attribute); + if (it == d->attrsCount.end()) { + it = d->attrsCount.insert(attribute, 0); + d->attrs.insert(attribute); + } + ++(*it); +} + +void AggregatedTagFetchScope::removeAttribute(const QByteArray &attribute) +{ + Q_D(AggregatedTagFetchScope); + QMutexLocker locker(&d->lock); + auto it = d->attrsCount.find(attribute); + if (it == d->attrsCount.end()) { + return; + } + + if (--(*it) == 0) { + d->attrsCount.erase(it); + d->attrs.remove(attribute); + } +} diff --git a/src/server/aggregatedfetchscope.h b/src/server/aggregatedfetchscope.h index 3b7f558d7..90fbd02b7 100644 --- a/src/server/aggregatedfetchscope.h +++ b/src/server/aggregatedfetchscope.h @@ -1,56 +1,75 @@ /* Copyright (c) 2017 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_AGGREGATED_FETCHSCOPE_H_ #define AKONADI_AGGREGATED_FETCHSCOPE_H_ #include class QByteArray; namespace Akonadi { namespace Server { class AggregatedCollectionFetchScopePrivate; class AggregatedCollectionFetchScope { public: explicit AggregatedCollectionFetchScope(); ~AggregatedCollectionFetchScope(); QSet attributes() const; void addAttribute(const QByteArray &attribute); void removeAttribute(const QByteArray &attribute); bool fetchIdOnly() const; void setFetchIdOnly(bool fetchIdOnly); bool fetchStatistics() const; void setFetchStatistics(bool fetchStats); private: AggregatedCollectionFetchScopePrivate * const d_ptr; Q_DECLARE_PRIVATE(AggregatedCollectionFetchScope) }; +class AggregatedTagFetchScopePrivate; +class AggregatedTagFetchScope +{ +public: + explicit AggregatedTagFetchScope(); + ~AggregatedTagFetchScope(); + + QSet attributes() const; + void addAttribute(const QByteArray &attribute); + void removeAttribute(const QByteArray &attribute); + + bool fetchIdOnly() const; + void setFetchIdOnly(bool fetchIdOnly); + +private: + AggregatedCollectionFetchScopePrivate * const d_ptr; + Q_DECLARE_PRIVATE(AggregatedTagFetchScope) +}; + } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/notificationmanager.cpp b/src/server/notificationmanager.cpp index b35c28025..de8ab3fd1 100644 --- a/src/server/notificationmanager.cpp +++ b/src/server/notificationmanager.cpp @@ -1,307 +1,335 @@ /* Copyright (c) 2006 - 2007 Volker Krause Copyright (c) 2010 Michael Jansen This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationmanager.h" #include "notificationsubscriber.h" #include "storage/notificationcollector.h" #include "tracer.h" #include "akonadiserver_debug.h" #include "aggregatedfetchscope.h" #include "storage/collectionstatistics.h" #include "storage/selectquerybuilder.h" #include "handlerhelper.h" #include #include #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; NotificationManager::NotificationManager() : AkThread(QStringLiteral("NotificationManager")) , mTimer(nullptr) , mNotifyThreadPool(nullptr) , mDebugNotifications(0) , mCollectionFetchScope(nullptr) { } NotificationManager::~NotificationManager() { quitThread(); } void NotificationManager::init() { AkThread::init(); const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite); QSettings settings(serverConfigFile, QSettings::IniFormat); mTimer = new QTimer(this); mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt()); mTimer->setSingleShot(true); connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications); mNotifyThreadPool = new QThreadPool(this); mNotifyThreadPool->setMaxThreadCount(5); mCollectionFetchScope = new AggregatedCollectionFetchScope(); + mTagFetchScope = new AggregatedTagFetchScope(); } void NotificationManager::quit() { mQuitting = true; if (mEventLoop) { mEventLoop->quit(); return; } mTimer->stop(); delete mTimer; mNotifyThreadPool->clear(); mNotifyThreadPool->waitForDone(); delete mNotifyThreadPool; qDeleteAll(mSubscribers); delete mCollectionFetchScope; + delete mTagFetchScope; AkThread::quit(); } void NotificationManager::registerConnection(quintptr socketDescriptor) { Q_ASSERT(thread() == QThread::currentThread()); NotificationSubscriber *subscriber = new NotificationSubscriber(this, socketDescriptor); qCDebug(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")"; connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) { if (enabled) { ++mDebugNotifications; } else { --mDebugNotifications; } Q_ASSERT(mDebugNotifications >= 0); Q_ASSERT(mDebugNotifications <= mSubscribers.count()); }); mSubscribers.push_back(subscriber); if (mEventLoop) { mEventLoop->quit(); } if (!mWaiting) { mWaiting = true; QTimer::singleShot(0, this, &NotificationManager::waitForSocketData); } } void NotificationManager::waitForSocketData() { mWaiting = true; while (!mSubscribers.isEmpty()) { QEventLoop loop; mEventLoop = &loop; for (const auto sub : qAsConst(mSubscribers)) { if (sub) { connect(sub->socket(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit); connect(sub->socket(), &QLocalSocket::disconnected, &loop, &QEventLoop::quit); } } loop.exec(); mEventLoop = nullptr; if (mQuitting) { QTimer::singleShot(0, this, &NotificationManager::quit); break; } for (const auto sub : qAsConst(mSubscribers)) { if (sub) { sub->handleIncomingData(); } } } mWaiting = false; } -AggregatedCollectionFetchScope *NotificationManager::collectionFetchScope() const -{ - return mCollectionFetchScope; -} - void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber) { Q_ASSERT(QThread::currentThread() == thread()); mSubscribers.removeAll(subscriber); if (mEventLoop) { mEventLoop->quit(); } } void NotificationManager::connectNotificationCollector(NotificationCollector *collector) { connect(collector, &NotificationCollector::notify, this, &NotificationManager::slotNotify); } void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs) { Q_ASSERT(QThread::currentThread() == thread()); for (const auto &msg : msgs) { if (msg->type() == Protocol::Command::CollectionChangeNotification) { auto &cmsg = Protocol::cmdCast(msg); auto msgCollection = cmsg.collection(); // Make sure we have all the data if (!mCollectionFetchScope->fetchIdOnly() && msgCollection->name().isEmpty()) { const auto col = Collection::retrieveById(msgCollection->id()); const auto mts = col.mimeTypes(); QStringList mimeTypes; mimeTypes.reserve(mts.size()); for (const auto &mt : mts) { mimeTypes.push_back(mt.name()); } msgCollection = HandlerHelper::fetchCollectionsResponse(col, {}, false, 0, {}, {}, false, mimeTypes); } // Get up-to-date statistics if (mCollectionFetchScope->fetchStatistics()) { Collection col; col.setId(msgCollection->id()); const auto stats = CollectionStatistics::self()->statistics(col); msgCollection->setStatistics(Protocol::FetchCollectionStatsResponse(stats.count, stats.count - stats.read, stats.size)); } // Get attributes const auto requestedAttrs = mCollectionFetchScope->attributes(); auto msgColAttrs = msgCollection->attributes(); // TODO: This assumes that we have either none or all attributes in msgCollection if (msgColAttrs.isEmpty() && !requestedAttrs.isEmpty()) { SelectQueryBuilder qb; qb.addColumn(CollectionAttribute::typeFullColumnName()); qb.addColumn(CollectionAttribute::valueFullColumnName()); qb.addValueCondition(CollectionAttribute::collectionIdFullColumnName(), Query::Equals, msgCollection->id()); Query::Condition cond(Query::Or); for (const auto &attr : requestedAttrs) { cond.addValueCondition(CollectionAttribute::typeFullColumnName(), Query::Equals, attr); } qb.addCondition(cond); if (!qb.exec()) { qCWarning(AKONADISERVER_LOG) << "Failed to obtain collection attributes!"; } const auto attrs = qb.result(); for (const auto &attr : attrs) { msgColAttrs.insert(attr.type(), attr.value()); } msgCollection->setAttributes(msgColAttrs); } Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg); + } else if (msg->type() == Protocol::Command::TagChangeNotification) { + auto &tmsg = Protocol::cmdCast(msg); + auto msgTag = tmsg.tag(); + + if (!mTagFetchScope->fetchIdOnly() && msgTag->gid().isEmpty()) { + msgTag = HandlerHelper::fetchTagsResponse(Tag::retrieveById(msgTag->id()), false); + } + + const auto requestedAttrs = mTagFetchScope->attributes(); + auto msgTagAttrs = msgTag->attributes(); + if (msgTagAttrs.isEmpty() && !requestedAttrs.isEmpty()) { + SelectQueryBuilder qb; + qb.addColumn(TagAttribute::typeFullColumnName()); + qb.addColumn(TagAttribute::valueFullColumnName()); + qb.addValueCondition(TagAttribute::tagIdFullColumnName(), Query::Equals, msgTag->id()); + Query::Condition cond(Query::Or); + for (const auto &attr : requestedAttrs) { + cond.addValueCondition(TagAttribute::typeFullColumnName(), Query::Equals, attr); + } + qb.addCondition(cond); + if (!qb.exec()) { + qCWarning(AKONADISERVER_LOG) << "Failed to obtain tag attributes!"; + } + const auto attrs = qb.result(); + for (const auto &attr : attrs) { + msgTagAttrs.insert(attr.type(), attr.value()); + } + msgTag->setAttributes(msgTagAttrs); + } + mNotifications.push_back(msg); + } else { mNotifications.push_back(msg); } } if (!mTimer->isActive()) { mTimer->start(); } } class NotifyRunnable : public QRunnable { public: explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList ¬ifications) : mSubscriber(subscriber) , mNotifications(notifications) { } ~NotifyRunnable() { } void run() override { for (const auto &ntf : qAsConst(mNotifications)) { if (mSubscriber) { mSubscriber->notify(ntf); } else { break; } } } private: QPointer mSubscriber; Protocol::ChangeNotificationList mNotifications; }; void NotificationManager::emitPendingNotifications() { Q_ASSERT(QThread::currentThread() == thread()); if (mNotifications.isEmpty()) { return; } if (mDebugNotifications == 0) { for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications)); } } else { // When debugging notification we have to use a non-threaded approach // so that we can work with return value of notify() for (const auto ¬ification : qAsConst(mNotifications)) { QVector listeners; for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { if (subscriber->notify(notification)) { listeners.push_back(subscriber->subscriber()); } } emitDebugNotification(notification, listeners); } } mNotifications.clear(); } void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QVector &listeners) { auto debugNtf = Protocol::DebugChangeNotificationPtr::create(); debugNtf->setNotification(ntf); debugNtf->setListeners(listeners); debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch()); for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) { mNotifyThreadPool->start(new NotifyRunnable(subscriber, { debugNtf })); } } diff --git a/src/server/notificationmanager.h b/src/server/notificationmanager.h index 3dfdce49a..2a1711b38 100644 --- a/src/server/notificationmanager.h +++ b/src/server/notificationmanager.h @@ -1,92 +1,95 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_NOTIFICATIONMANAGER_H #define AKONADI_NOTIFICATIONMANAGER_H #include "akthread.h" #include #include #include class NotificationManagerTest; class QThreadPool; class QEventLoop; namespace Akonadi { namespace Server { class NotificationCollector; class NotificationSubscriber; class AggregatedCollectionFetchScope; +class AggregatedTagFetchScope; class NotificationManager : public AkThread { Q_OBJECT public: explicit NotificationManager(); ~NotificationManager() override; void connectNotificationCollector(NotificationCollector *collector); void forgetSubscriber(NotificationSubscriber *subscriber); - AggregatedCollectionFetchScope *collectionFetchScope() const; + AggregatedCollectionFetchScope *collectionFetchScope() const { return mCollectionFetchScope; } + AggregatedTagFetchScope *tagFetchScope() const { return mTagFetchScope; } public Q_SLOTS: void registerConnection(quintptr socketDescriptor); void emitPendingNotifications(); private Q_SLOTS: void slotNotify(const Akonadi::Protocol::ChangeNotificationList &msgs); void waitForSocketData(); protected: void init() override; void quit() override; void emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QVector &listeners); private: Protocol::ChangeNotificationList mNotifications; QTimer *mTimer = nullptr; QThreadPool *mNotifyThreadPool = nullptr; QVector> mSubscribers; int mDebugNotifications; AggregatedCollectionFetchScope *mCollectionFetchScope; + AggregatedTagFetchScope *mTagFetchScope; QEventLoop *mEventLoop = nullptr; bool mWaiting = false; bool mQuitting = false; friend class NotificationSubscriber; friend class ::NotificationManagerTest; }; } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/notificationsubscriber.cpp b/src/server/notificationsubscriber.cpp index 9108a076b..246d90c4b 100644 --- a/src/server/notificationsubscriber.cpp +++ b/src/server/notificationsubscriber.cpp @@ -1,726 +1,745 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationsubscriber.h" #include "akonadiserver_debug.h" #include "notificationmanager.h" #include "collectionreferencemanager.h" #include "aggregatedfetchscope.h" +#include "storage/querybuilder.h" +#include "utils.h" #include #include #include #include #include using namespace Akonadi; using namespace Akonadi::Server; QMimeDatabase NotificationSubscriber::sMimeDatabase; #define TRACE_NTF(x) //#define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x NotificationSubscriber::NotificationSubscriber(NotificationManager *manager) : QObject() , mManager(manager) , mSocket(nullptr) , mAllMonitored(false) , mExclusive(false) , mNotificationDebugging(false) { } NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor) : NotificationSubscriber(manager) { mSocket = new QLocalSocket(this); connect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected); mSocket->setSocketDescriptor(socketDescriptor); const SchemaVersion schema = SchemaVersion::retrieveAll().first(); auto hello = Protocol::HelloResponsePtr::create(); hello->setServerName(QStringLiteral("Akonadi")); hello->setMessage(QStringLiteral("Not really IMAP server")); hello->setProtocolVersion(Protocol::version()); hello->setGeneration(schema.generation()); writeCommand(0, hello); } NotificationSubscriber::~NotificationSubscriber() { QMutexLocker locker(&mLock); if (mNotificationDebugging) { Q_EMIT notificationDebuggingChanged(false); } } void NotificationSubscriber::handleIncomingData() { while (mSocket->bytesAvailable() > (int) sizeof(qint64)) { QDataStream stream(mSocket); // Ignored atm qint64 tag = -1; stream >> tag; Protocol::CommandPtr cmd; try { cmd = Protocol::deserialize(mSocket); } catch (const Akonadi::ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "ProtocolException:" << e.what(); disconnectSubscriber(); return; } catch (const std::exception &e) { qCWarning(AKONADISERVER_LOG) << "Unknown exception:" << e.what(); disconnectSubscriber(); return; } if (cmd->type() == Protocol::Command::Invalid) { qCWarning(AKONADISERVER_LOG) << "Received an invalid command: resetting connection"; disconnectSubscriber(); return; } switch (cmd->type()) { case Protocol::Command::CreateSubscription: registerSubscriber(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create()); break; case Protocol::Command::ModifySubscription: if (mSubscriber.isEmpty()) { qCWarning(AKONADISERVER_LOG) << "Received ModifySubscription command before RegisterSubscriber"; disconnectSubscriber(); return; } modifySubscription(Protocol::cmdCast(cmd)); writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create()); break; case Protocol::Command::Logout: disconnectSubscriber(); break; default: qCWarning(AKONADISERVER_LOG) << "Invalid command" << cmd->type() << "received by NotificationSubscriber" << mSubscriber; disconnectSubscriber(); break; } } } void NotificationSubscriber::socketDisconnected() { qCDebug(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected"; disconnectSubscriber(); } void NotificationSubscriber::disconnectSubscriber() { QMutexLocker locker(&mLock); if (mManager) { auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove); mManager->slotNotify({ changeNtf }); } disconnect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected); mSocket->close(); // Unregister ourselves from the aggergated collection fetch scope auto cfs = mManager->collectionFetchScope(); if (mCollectionFetchScope.fetchIdOnly()) { cfs->setFetchIdOnly(false); } if (mCollectionFetchScope.includeStatistics()) { cfs->setFetchStatistics(false); } const auto attrs = mCollectionFetchScope.attributes(); for (const auto &attr : attrs) { cfs->removeAttribute(attr); } mManager->forgetSubscriber(this); deleteLater(); } void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command) { QMutexLocker locker(&mLock); qCDebug(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName(); mSubscriber = command.subscriberName(); mSession = command.session(); if (mManager) { auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create(); changeNtf->setSubscriber(mSubscriber); changeNtf->setSessionId(mSession); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add); mManager->slotNotify({ changeNtf }); } } void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command) { QMutexLocker locker(&mLock); const auto modifiedParts = command.modifiedParts(); #define START_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add)) #define STOP_MONITORING(type) \ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove)) #define APPEND(set, newItems) \ Q_FOREACH (const auto &entity, newItems) { \ set.insert(entity); \ } #define REMOVE(set, items) \ Q_FOREACH (const auto &entity, items) { \ set.remove(entity); \ } if (START_MONITORING(Types)) { APPEND(mMonitoredTypes, command.startMonitoringTypes()) } if (STOP_MONITORING(Types)) { REMOVE(mMonitoredTypes, command.stopMonitoringTypes()) } if (START_MONITORING(Collections)) { APPEND(mMonitoredCollections, command.startMonitoringCollections()) } if (STOP_MONITORING(Collections)) { REMOVE(mMonitoredCollections, command.stopMonitoringCollections()) } if (START_MONITORING(Items)) { APPEND(mMonitoredItems, command.startMonitoringItems()) } if (STOP_MONITORING(Items)) { REMOVE(mMonitoredItems, command.stopMonitoringItems()) } if (START_MONITORING(Tags)) { APPEND(mMonitoredTags, command.startMonitoringTags()) } if (STOP_MONITORING(Tags)) { REMOVE(mMonitoredTags, command.stopMonitoringTags()) } if (START_MONITORING(Resources)) { APPEND(mMonitoredResources, command.startMonitoringResources()) } if (STOP_MONITORING(Resources)) { REMOVE(mMonitoredResources, command.stopMonitoringResources()) } if (START_MONITORING(MimeTypes)) { APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes()) } if (STOP_MONITORING(MimeTypes)) { REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes()) } if (START_MONITORING(Sessions)) { APPEND(mIgnoredSessions, command.startIgnoringSessions()) } if (STOP_MONITORING(Sessions)) { REMOVE(mIgnoredSessions, command.stopIgnoringSessions()) } if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) { mAllMonitored = command.allMonitored(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) { mExclusive = command.isExclusive(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) { mItemFetchScope = command.itemFetchScope(); } if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) { const auto newScope = command.collectionFetchScope(); auto cfs = mManager->collectionFetchScope(); if (newScope.includeStatistics() != mCollectionFetchScope.includeStatistics()) { cfs->setFetchStatistics(newScope.includeStatistics()); } if (newScope.fetchIdOnly() != mCollectionFetchScope.fetchIdOnly()) { cfs->setFetchIdOnly(newScope.fetchIdOnly()); } if (newScope.attributes() != mCollectionFetchScope.attributes()) { const auto added = newScope.attributes() - mCollectionFetchScope.attributes(); for (const auto &attr : added) { cfs->addAttribute(attr); } const auto removed = mCollectionFetchScope.attributes() - newScope.attributes(); for (const auto &attr : removed) { cfs->removeAttribute(attr); } } - mCollectionFetchScope = command.collectionFetchScope(); + mCollectionFetchScope = newScope; + } + if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) { + const auto newScope = command.tagFetchScope(); + auto tfs = mManager->tagFetchScope(); + if (newScope.fetchIdOnly() != mTagFetchScope.fetchIdOnly()) { + tfs->setFetchIdOnly(newScope.fetchIdOnly()); + } + if (newScope.attributes() != mTagFetchScope.attributes()) { + const auto added = newScope.attributes() - mTagFetchScope.attributes(); + for (const auto &attr : added) { + tfs->addAttribute(attr); + } + const auto removed = mTagFetchScope.attributes() - newScope.attributes(); + for (const auto &attr : removed) { + tfs->removeAttribute(attr); + } + } + mTagFetchScope = newScope; } if (mManager) { if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) { // Did the caller just subscribed to subscription changes? if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) { // If yes, then send them list of all existing subscribers Q_FOREACH (const NotificationSubscriber *subscriber, mManager->mSubscribers) { // Send them back to caller if (subscriber) { QMetaObject::invokeMethod(this, "notify", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification())); } } } if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (!mNotificationDebugging) { mNotificationDebugging = true; Q_EMIT notificationDebuggingChanged(true); } } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) { if (mNotificationDebugging) { mNotificationDebugging = false; Q_EMIT notificationDebuggingChanged(false); } } } // Emit subscription change notification auto changeNtf = toChangeNotification(); changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify); mManager->slotNotify({ changeNtf }); } #undef START_MONITORING #undef STOP_MONITORING #undef APPEND #undef REMOVE } Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const { // Assumes mLock being locked by caller auto ntf = Protocol::SubscriptionChangeNotificationPtr::create(); ntf->setSessionId(mSession); ntf->setSubscriber(mSubscriber); ntf->setOperation(Protocol::SubscriptionChangeNotification::Add); ntf->setCollections(mMonitoredCollections); ntf->setItems(mMonitoredItems); ntf->setTags(mMonitoredTags); ntf->setTypes(mMonitoredTypes); ntf->setMimeTypes(mMonitoredMimeTypes); ntf->setResources(mMonitoredResources); ntf->setIgnoredSessions(mIgnoredSessions); ntf->setAllMonitored(mAllMonitored); ntf->setExclusive(mExclusive); ntf->setItemFetchScope(mItemFetchScope); ntf->setCollectionFetchScope(mCollectionFetchScope); return ntf; } bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const { // Assumes mLock being locked by caller if (id < 0) { return false; } else if (mMonitoredCollections.contains(id)) { return true; } else if (mMonitoredCollections.contains(0)) { return true; } return false; } bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const { // Assumes mLock being locked by caller const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType); if (mMonitoredMimeTypes.contains(mimeType)) { return true; } const QStringList lst = mt.aliases(); for (const QString &alias : lst) { if (mMonitoredMimeTypes.contains(alias)) { return true; } } return false; } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::ItemChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.operation() != Protocol::CollectionChangeNotification::Move) { return false; } return mMonitoredResources.contains(msg.destinationResource()); } bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const { // Assumes mLock being locked by caller if (msg.items().isEmpty()) { return false; } if (CollectionReferenceManager::instance()->isReferenced(msg.parentCollection())) { //We always want notifications that affect the parent resource (like an item added to a referenced collection) const bool notificationForParentResource = (mSession == msg.resource()); const bool accepts = mExclusive || isCollectionMonitored(msg.parentCollection()) || isMoveDestinationResourceMonitored(msg) || notificationForParentResource; TRACE_NTF("ACCEPTS ITEM: parent col referenced" << "exclusive:" << mExclusive << "," << "parent monitored:" << isCollectionMonitored(notification.parentCollection()) << "," << "destination monitored:" << isMoveDestinationResourceMonitored(notification) << "," << "ntf for parent resource:" << notificationForParentResource << ":" << "ACCEPTED:" << accepts); return accepts; } if (mAllMonitored) { TRACE_NTF("ACCEPTS ITEM: all monitored"); return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) { TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored"); return false; } // we have a resource or mimetype filter if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) { if (mMonitoredResources.contains(msg.resource())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored"); return true; } if (isMoveDestinationResourceMonitored(msg)) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored"); return true; } Q_FOREACH (const auto &item, msg.items()) { if (isMimeTypeMonitored(item.mimeType)) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored"); return true; } } TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored"); return false; } // we explicitly monitor that item or the collections it's in Q_FOREACH (const auto &item, msg.items()) { if (mMonitoredItems.contains(item.id)) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored"); return true; } } if (isCollectionMonitored(msg.parentCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored"); return true; } if (isCollectionMonitored(msg.parentDestCollection())) { TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored"); return true; } TRACE_NTF("ACCEPTS ITEM: REJECTED"); return false; } bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const { // Assumes mLock being locked by caller const auto collection = msg.collection(); if (!collection || collection->id() < 0) { return false; } // HACK: We need to dispatch notifications about disabled collections to SOME // agents (that's what we have the exclusive subscription for) - but because // querying each Collection from database would be expensive, we use the // metadata hack to transfer this information from NotificationCollector if (msg.metadata().contains("DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe) && !msg.changedParts().contains("ENABLED")) { // Exclusive subscriber always gets it if (mExclusive) { return true; } //Deliver the notification if referenced from this session if (CollectionReferenceManager::instance()->isReferenced(collection->id(), mSession)) { return true; } //Exclusive subscribers still want the notification if (mExclusive && CollectionReferenceManager::instance()->isReferenced(collection->id())) { return true; } //The session belonging to this monitor referenced or dereferenced the collection. We always want this notification. //The referencemanager no longer holds a reference, so we have to check this way. if (msg.changedParts().contains(AKONADI_PARAM_REFERENCED) && mSession == msg.sessionId()) { return true; } // If the collection is not referenced, monitored or the subscriber is not // exclusive (i.e. if we got here), then the subscriber does not care about // this one, so drop it return false; } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) { return false; } // we have a resource filter if (!mMonitoredResources.isEmpty()) { const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg); // a bit hacky, but match the behaviour from the item case, // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course if (mMonitoredMimeTypes.isEmpty() || resourceMatches) { return resourceMatches; } // else continue } // we explicitly monitor that colleciton, or all of them if (isCollectionMonitored(collection->id())) { return true; } return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection()); } bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const { // Assumes mLock being locked by caller - if (msg.id() < 0) { + if (msg.tag()->id() < 0) { return false; } // Special handling for Tag removal notifications: When a Tag is removed, // a notification is emitted for each Resource that owns the tag (i.e. // each resource that owns a Tag RID - Tag RIDs are resource-specific). // Additionally then we send one more notification without any RID that is // destined for regular applications (which don't know anything about Tag RIDs) if (msg.operation() == Protocol::TagChangeNotification::Remove) { // HACK: Since have no way to determine which resource this NotificationSource // belongs to, we are abusing the fact that each resource ignores it's own // main session, which is called the same name as the resource. // If there are any ignored sessions, but this notification does not have // a specific resource set, then we ignore it, as this notification is // for clients, not resources (does not have tag RID) if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) { return false; } // If this source ignores a session (i.e. we assume it is a resource), // but this notification is for another resource, then we ignore it if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) { return false; } - // Now we got here, which means that this notification either has empty // resource, i.e. it is destined for a client applications, or it's // destined for resource that we *think* (see the hack above) this // NotificationSource belongs too. Which means we approve this notification, // but it can still be discarded in the generic Tag notification filter // below } if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) { return false; } if (mMonitoredTags.isEmpty()) { return true; } - if (mMonitoredTags.contains(msg.id())) { + if (mMonitoredTags.contains(msg.tag()->id())) { return true; } - return false; + return true; } bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); if (mAllMonitored) { return true; } if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges)) { return false; } return true; } bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const { // Assumes mLock being locked by caller Q_UNUSED(msg); // Unlike other types, subscription notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges); } bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const { // Assumes mLock being locked by caller // We should never end up sending debug notification about a debug notification. // This could get very messy very quickly... Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification); if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) { return false; } // Unlike other types, debug change notifications must be explicitly enabled // by caller and are excluded from "monitor all" as well return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications); } bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const { // Assumes mLock being locked // Uninitialized subscriber gets nothing if (mSubscriber.isEmpty()) { return false; } // session is ignored // TODO: Should this afect SubscriptionChangeNotification and DebugChangeNotification? if (mIgnoredSessions.contains(msg.sessionId())) { return false; } switch (msg.type()) { case Protocol::Command::ItemChangeNotification: return acceptsItemNotification(static_cast(msg)); case Protocol::Command::CollectionChangeNotification: return acceptsCollectionNotification(static_cast(msg)); case Protocol::Command::TagChangeNotification: return acceptsTagNotification(static_cast(msg)); case Protocol::Command::RelationChangeNotification: return acceptsRelationNotification(static_cast(msg)); case Protocol::Command::SubscriptionChangeNotification: return acceptsSubscriptionNotification(static_cast(msg)); case Protocol::Command::DebugChangeNotification: return acceptsDebugChangeNotification(static_cast(msg)); default: qCDebug(AKONADISERVER_LOG) << "Received invalid change notification!"; return false; } } bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr ¬ification) { // Guard against this object being deleted while we are waiting for the lock QPointer ptr(this); QMutexLocker locker(&mLock); if (!ptr) { return false; } if (acceptsNotification(*notification)) { QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, notification)); return true; } return false; } void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr ¬ification) { // tag chosen by fair dice roll writeCommand(4, notification); } void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd) { Q_ASSERT(QThread::currentThread() == thread()); QDataStream stream(mSocket); stream << tag; try { Protocol::serialize(mSocket, cmd); if (!mSocket->waitForBytesWritten()) { if (mSocket->state() == QLocalSocket::ConnectedState) { qCWarning(AKONADISERVER_LOG) << "Notification socket write timeout!"; } else { // client has disconnected, just discard the message } } } catch (const ProtocolException &e) { qCWarning(AKONADISERVER_LOG) << "Notification protocol exception:" << e.what(); } } diff --git a/src/server/notificationsubscriber.h b/src/server/notificationsubscriber.h index 5783a5659..68827e70a 100644 --- a/src/server/notificationsubscriber.h +++ b/src/server/notificationsubscriber.h @@ -1,122 +1,123 @@ /* Copyright (c) 2015 Daniel Vrátil This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef NOTIFICATIONSUBSCRIBER_H #define NOTIFICATIONSUBSCRIBER_H #include #include #include #include #include #include "entities.h" class QLocalSocket; namespace Akonadi { namespace Server { class NotificationManager; class NotificationSubscriber : public QObject { Q_OBJECT public: explicit NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor); ~NotificationSubscriber(); inline QByteArray subscriber() const { return mSubscriber; } QLocalSocket *socket() const { return mSocket; } void handleIncomingData(); public Q_SLOTS: bool notify(const Akonadi::Protocol::ChangeNotificationPtr ¬ification); private Q_SLOTS: void socketDisconnected(); Q_SIGNALS: void notificationDebuggingChanged(bool enabled); protected: void registerSubscriber(const Protocol::CreateSubscriptionCommand &command); void modifySubscription(const Protocol::ModifySubscriptionCommand &command); void disconnectSubscriber(); private: bool acceptsNotification(const Protocol::ChangeNotification ¬ification) const; bool acceptsItemNotification(const Protocol::ItemChangeNotification ¬ification) const; bool acceptsCollectionNotification(const Protocol::CollectionChangeNotification ¬ification) const; bool acceptsTagNotification(const Protocol::TagChangeNotification ¬ification) const; bool acceptsRelationNotification(const Protocol::RelationChangeNotification ¬ification) const; bool acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification ¬ification) const; bool acceptsDebugChangeNotification(const Protocol::DebugChangeNotification ¬ification) const; bool isCollectionMonitored(Entity::Id id) const; bool isMimeTypeMonitored(const QString &mimeType) const; bool isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const; bool isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const; Protocol::SubscriptionChangeNotificationPtr toChangeNotification() const; protected Q_SLOTS: virtual void writeNotification(const Akonadi::Protocol::ChangeNotificationPtr ¬ification); protected: explicit NotificationSubscriber(NotificationManager *manager = nullptr); void writeCommand(qint64 tag, const Protocol::CommandPtr &cmd); mutable QMutex mLock; NotificationManager *mManager = nullptr; QLocalSocket *mSocket = nullptr; QByteArray mSubscriber; QSet mMonitoredCollections; QSet mMonitoredItems; QSet mMonitoredTags; QSet mMonitoredTypes; QSet mMonitoredMimeTypes; QSet mMonitoredResources; QSet mIgnoredSessions; QByteArray mSession; Protocol::ItemFetchScope mItemFetchScope; Protocol::CollectionFetchScope mCollectionFetchScope; + Protocol::TagFetchScope mTagFetchScope; bool mAllMonitored; bool mExclusive; bool mNotificationDebugging; static QMimeDatabase sMimeDatabase; }; } // namespace Server } // namespace Akonadi #endif diff --git a/src/server/storage/notificationcollector.cpp b/src/server/storage/notificationcollector.cpp index 47708f415..95d6b0288 100644 --- a/src/server/storage/notificationcollector.cpp +++ b/src/server/storage/notificationcollector.cpp @@ -1,450 +1,449 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "notificationcollector.h" #include "storage/datastore.h" #include "storage/entity.h" #include "storage/collectionstatistics.h" #include "handlerhelper.h" #include "cachecleaner.h" #include "intervalcheck.h" #include "search/searchmanager.h" #include "akonadi.h" #include "handler/search.h" using namespace Akonadi; using namespace Akonadi::Server; NotificationCollector::NotificationCollector(QObject *parent) : QObject(parent) , mDb(nullptr) { } NotificationCollector::NotificationCollector(DataStore *db) : QObject(db) , mDb(db) { connect(db, &DataStore::transactionCommitted, this, &NotificationCollector::transactionCommitted); connect(db, &DataStore::transactionRolledBack, this, &NotificationCollector::transactionRolledBack); } NotificationCollector::~NotificationCollector() { } void NotificationCollector::itemAdded(const PimItem &item, bool seen, const Collection &collection, const QByteArray &resource) { SearchManager::instance()->scheduleSearchUpdate(); CollectionStatistics::self()->itemAdded(collection, item.size(), seen); itemNotification(Protocol::ItemChangeNotification::Add, item, collection, Collection(), resource); } void NotificationCollector::itemChanged(const PimItem &item, const QSet &changedParts, const Collection &collection, const QByteArray &resource) { SearchManager::instance()->scheduleSearchUpdate(); itemNotification(Protocol::ItemChangeNotification::Modify, item, collection, Collection(), resource, changedParts); } void NotificationCollector::itemsFlagsChanged(const PimItem::List &items, const QSet &addedFlags, const QSet &removedFlags, const Collection &collection, const QByteArray &resource) { int seenCount = (addedFlags.contains(AKONADI_FLAG_SEEN) || addedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); seenCount -= (removedFlags.contains(AKONADI_FLAG_SEEN) || removedFlags.contains(AKONADI_FLAG_IGNORED) ? items.count() : 0); CollectionStatistics::self()->itemsSeenChanged(collection, seenCount); itemNotification(Protocol::ItemChangeNotification::ModifyFlags, items, collection, Collection(), resource, QSet(), addedFlags, removedFlags); } void NotificationCollector::itemsTagsChanged(const PimItem::List &items, const QSet &addedTags, const QSet &removedTags, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::ModifyTags, items, collection, Collection(), resource, QSet(), QSet(), QSet(), addedTags, removedTags); } void NotificationCollector::itemsRelationsChanged(const PimItem::List &items, const Relation::List &addedRelations, const Relation::List &removedRelations, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::ModifyRelations, items, collection, Collection(), resource, QSet(), QSet(), QSet(), QSet(), QSet(), addedRelations, removedRelations); } void NotificationCollector::itemsMoved(const PimItem::List &items, const Collection &collectionSrc, const Collection &collectionDest, const QByteArray &sourceResource) { SearchManager::instance()->scheduleSearchUpdate(); itemNotification(Protocol::ItemChangeNotification::Move, items, collectionSrc, collectionDest, sourceResource); } void NotificationCollector::itemsRemoved(const PimItem::List &items, const Collection &collection, const QByteArray &resource) { itemNotification(Protocol::ItemChangeNotification::Remove, items, collection, Collection(), resource); } void NotificationCollector::itemsLinked(const PimItem::List &items, const Collection &collection) { itemNotification(Protocol::ItemChangeNotification::Link, items, collection, Collection(), QByteArray()); } void NotificationCollector::itemsUnlinked(const PimItem::List &items, const Collection &collection) { itemNotification(Protocol::ItemChangeNotification::Unlink, items, collection, Collection(), QByteArray()); } void NotificationCollector::collectionAdded(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionAdded(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionAdded(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Add, collection, collection.parentId(), -1, resource); } void NotificationCollector::collectionChanged(const Collection &collection, const QList &changes, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionChanged(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionChanged(collection.id()); } if (changes.contains(AKONADI_PARAM_ENABLED) || changes.contains(AKONADI_PARAM_REFERENCED)) { CollectionStatistics::self()->invalidateCollection(collection); } collectionNotification(Protocol::CollectionChangeNotification::Modify, collection, collection.parentId(), -1, resource, changes.toSet()); } void NotificationCollector::collectionMoved(const Collection &collection, const Collection &source, const QByteArray &resource, const QByteArray &destResource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionChanged(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionChanged(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Move, collection, source.id(), collection.parentId(), resource, QSet(), destResource); } void NotificationCollector::collectionRemoved(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionRemoved(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionRemoved(collection.id()); } CollectionStatistics::self()->invalidateCollection(collection); collectionNotification(Protocol::CollectionChangeNotification::Remove, collection, collection.parentId(), -1, resource); } void NotificationCollector::collectionSubscribed(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionAdded(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionAdded(collection.id()); } collectionNotification(Protocol::CollectionChangeNotification::Subscribe, collection, collection.parentId(), -1, resource, QSet()); } void NotificationCollector::collectionUnsubscribed(const Collection &collection, const QByteArray &resource) { if (auto cleaner = AkonadiServer::instance()->cacheCleaner()) { cleaner->collectionRemoved(collection.id()); } if (auto checker = AkonadiServer::instance()->intervalChecker()) { checker->collectionRemoved(collection.id()); } CollectionStatistics::self()->invalidateCollection(collection); collectionNotification(Protocol::CollectionChangeNotification::Unsubscribe, collection, collection.parentId(), -1, resource, QSet()); } void NotificationCollector::tagAdded(const Tag &tag) { tagNotification(Protocol::TagChangeNotification::Add, tag); } void NotificationCollector::tagChanged(const Tag &tag) { tagNotification(Protocol::TagChangeNotification::Modify, tag); } void NotificationCollector::tagRemoved(const Tag &tag, const QByteArray &resource, const QString &remoteId) { tagNotification(Protocol::TagChangeNotification::Remove, tag, resource, remoteId); } void NotificationCollector::relationAdded(const Relation &relation) { relationNotification(Protocol::RelationChangeNotification::Add, relation); } void NotificationCollector::relationRemoved(const Relation &relation) { relationNotification(Protocol::RelationChangeNotification::Remove, relation); } void NotificationCollector::transactionCommitted() { dispatchNotifications(); } void NotificationCollector::transactionRolledBack() { clear(); } void NotificationCollector::clear() { mNotifications.clear(); } void NotificationCollector::setSessionId(const QByteArray &sessionId) { mSessionId = sessionId; } void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, const PimItem &item, const Collection &collection, const Collection &collectionDest, const QByteArray &resource, const QSet &parts) { PimItem::List items; items << item; itemNotification(op, items, collection, collectionDest, resource, parts); } void NotificationCollector::itemNotification(Protocol::ItemChangeNotification::Operation op, const PimItem::List &items, const Collection &collection, const Collection &collectionDest, const QByteArray &resource, const QSet &parts, const QSet &addedFlags, const QSet &removedFlags, const QSet &addedTags, const QSet &removedTags, const Relation::List &addedRelations, const Relation::List &removedRelations) { QMap > vCollections; if ((op == Protocol::ItemChangeNotification::Modify) || (op == Protocol::ItemChangeNotification::ModifyFlags) || (op == Protocol::ItemChangeNotification::ModifyTags) || (op == Protocol::ItemChangeNotification::ModifyRelations)) { vCollections = DataStore::self()->virtualCollections(items); } auto msg = Protocol::ItemChangeNotificationPtr::create(); msg->setSessionId(mSessionId); msg->setOperation(op); msg->setItemParts(parts); msg->setAddedFlags(addedFlags); msg->setRemovedFlags(removedFlags); msg->setAddedTags(addedTags); msg->setRemovedTags(removedTags); if (!addedRelations.isEmpty()) { QSet rels; Q_FOREACH (const Relation &rel, addedRelations) { rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); } msg->setAddedRelations(rels); } if (!removedRelations.isEmpty()) { QSet rels; Q_FOREACH (const Relation &rel, removedRelations) { rels.insert(Protocol::ItemChangeNotification::Relation(rel.leftId(), rel.rightId(), rel.relationType().name())); } msg->setRemovedRelations(rels); } if (collectionDest.isValid()) { QByteArray destResourceName; destResourceName = collectionDest.resource().name().toLatin1(); msg->setDestinationResource(destResourceName); } msg->setParentDestCollection(collectionDest.id()); /* Notify all virtual collections the items are linked to. */ auto iter = vCollections.constBegin(), endIter = vCollections.constEnd(); for (; iter != endIter; ++iter) { auto copy = Protocol::ItemChangeNotificationPtr::create(*msg); QVector items; items.reserve(iter.value().size()); Q_FOREACH (const PimItem &item, iter.value()) { items.push_back({ item.id(), item.remoteId(), item.remoteRevision(), item.mimeType().name() }); } copy->setItems(items); copy->setParentCollection(iter.key()); copy->setResource(resource); CollectionStatistics::self()->invalidateCollection(Collection::retrieveById(iter.key())); dispatchNotification(copy); } QVector ntfItems; ntfItems.reserve(items.size()); Q_FOREACH (const PimItem &item, items) { ntfItems.push_back({ item.id(), item.remoteId(), item.remoteRevision(), item.mimeType().name() }); } msg->setItems(ntfItems); Collection col; if (!collection.isValid()) { msg->setParentCollection(items.first().collection().id()); col = items.first().collection(); } else { msg->setParentCollection(collection.id()); col = collection; } QByteArray res = resource; if (res.isEmpty()) { if (col.resourceId() <= 0) { col = Collection::retrieveById(col.id()); } res = col.resource().name().toLatin1(); } msg->setResource(res); // Add and ModifyFlags are handled incrementally // (see itemAdded() and itemsFlagsChanged()) if (msg->operation() != Protocol::ItemChangeNotification::Add && msg->operation() != Protocol::ItemChangeNotification::ModifyFlags) { CollectionStatistics::self()->invalidateCollection(col); } dispatchNotification(msg); } void NotificationCollector::collectionNotification(Protocol::CollectionChangeNotification::Operation op, const Collection &collection, Collection::Id source, Collection::Id destination, const QByteArray &resource, const QSet &changes, const QByteArray &destResource) { auto msg = Protocol::CollectionChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setCollection(HandlerHelper::fetchCollectionsResponse(collection)); msg->setParentCollection(source); msg->setParentDestCollection(destination); msg->setDestinationResource(destResource); msg->setChangedParts(changes); if (!collection.enabled()) { msg->addMetadata("DISABLED"); } QByteArray res = resource; if (res.isEmpty()) { res = collection.resource().name().toLatin1(); } msg->setResource(res); dispatchNotification(msg); } void NotificationCollector::tagNotification(Protocol::TagChangeNotification::Operation op, const Tag &tag, const QByteArray &resource, - const QString &remoteId - ) + const QString &remoteId) { auto msg = Protocol::TagChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setResource(resource); - msg->setId(tag.id()); - msg->setRemoteId(remoteId); + msg->setTag(HandlerHelper::fetchTagsResponse(tag, false)); + msg->tag()->setRemoteId(remoteId.toLatin1()); dispatchNotification(msg); } void NotificationCollector::relationNotification(Protocol::RelationChangeNotification::Operation op, const Relation &relation) { auto msg = Protocol::RelationChangeNotificationPtr::create(); msg->setOperation(op); msg->setSessionId(mSessionId); msg->setLeftItem(relation.leftId()); msg->setRightItem(relation.rightId()); msg->setRemoteId(relation.remoteId()); msg->setType(relation.relationType().name()); dispatchNotification(msg); } void NotificationCollector::dispatchNotification(const Protocol::ChangeNotificationPtr &msg) { if (!mDb || mDb->inTransaction()) { if (msg->type() == Protocol::Command::CollectionChangeNotification) { Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg); } else { mNotifications.append(msg); } } else { Q_EMIT notify({ msg }); } } void NotificationCollector::dispatchNotifications() { if (!mNotifications.isEmpty()) { Q_EMIT notify(mNotifications); clear(); } }