diff --git a/autotests/libs/itemsynctest.cpp b/autotests/libs/itemsynctest.cpp index 3ef84d203..75660d777 100644 --- a/autotests/libs/itemsynctest.cpp +++ b/autotests/libs/itemsynctest.cpp @@ -1,639 +1,676 @@ /* Copyright (c) 2008 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "test_utils.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace Akonadi; Q_DECLARE_METATYPE(KJob *) Q_DECLARE_METATYPE(ItemSync::TransactionMode) class ItemsyncTest : public QObject { Q_OBJECT private: Item::List fetchItems(const Collection &col) { qDebug() << "fetching items from collection" << col.remoteId() << col.name(); ItemFetchJob *fetch = new ItemFetchJob(col, this); fetch->fetchScope().fetchFullPayload(); fetch->fetchScope().fetchAllAttributes(); fetch->fetchScope().setCacheOnly(true); // resources are switched off anyway if (!fetch->exec()) { []() { QFAIL("Failed to fetch items!"); }(); } return fetch->items(); } + void createItems(const Collection &col, int itemCount) + { + for (int i = 0; i < itemCount; ++i) { + Item item(QStringLiteral("application/octet-stream")); + item.setRemoteId(QStringLiteral("rid") + QString::number(i)); + item.setGid(QStringLiteral("gid") + 
QString::number(i)); + item.setPayload("payload1"); + ItemCreateJob *job = new ItemCreateJob(item, col); + AKVERIFYEXEC(job); + } + } + private Q_SLOTS: void initTestCase() { AkonadiTest::checkTestIsIsolated(); Control::start(); AkonadiTest::setAllResourcesOffline(); qRegisterMetaType(); qRegisterMetaType(); } static Item modifyItem(Item item) { static int counter = 0; item.setFlag(QByteArray("\\READ") + QByteArray::number(counter)); counter++; return item; } void testFullSync() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); //Since the item sync affects the knut resource we ensure we actually managed to load all items //This needs to be adjusted should the testdataset change QCOMPARE(origItems.size(), 15); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); ItemSync *syncer = new ItemSync(col); syncer->setTransactionMode(ItemSync::SingleTransaction); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setFullSyncItems(origItems); AKVERIFYEXEC(syncer); QCOMPARE(transactionSpy.count(), 1); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); QTest::qWait(100); QCOMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QCOMPARE(changedSpy.count(), 0); } void testFullStreamingSync_data() { QTest::addColumn("transactionMode"); QTest::addColumn("goToEventLoopAfterAddingItems"); QTest::newRow("single transaction, no eventloop") << ItemSync::SingleTransaction << false; QTest::newRow("multi transaction, no eventloop") << 
ItemSync::MultipleTransactions << false; QTest::newRow("single transaction, with eventloop") << ItemSync::SingleTransaction << true; QTest::newRow("multi transaction, with eventloop") << ItemSync::MultipleTransactions << true; } void testFullStreamingSync() { QFETCH(ItemSync::TransactionMode, transactionMode); QFETCH(bool, goToEventLoopAfterAddingItems); const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); QCOMPARE(origItems.size(), 15); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); ItemSync *syncer = new ItemSync(col); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setTransactionMode(transactionMode); syncer->setBatchSize(1); syncer->setAutoDelete(false); syncer->setStreamingEnabled(true); QSignalSpy spy(syncer, SIGNAL(result(KJob*))); QVERIFY(spy.isValid()); syncer->setTotalItems(origItems.count()); QTest::qWait(0); QCOMPARE(spy.count(), 0); for (int i = 0; i < origItems.count(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setFullSyncItems(l); if (goToEventLoopAfterAddingItems) { QTest::qWait(0); } if (i < origItems.count() - 1) { QCOMPARE(spy.count(), 0); } } syncer->deliveryDone(); QTRY_COMPARE(spy.count(), 1); KJob *job = spy.at(0).at(0).value(); QCOMPARE(job, syncer); QCOMPARE(job->error(), 0); if (transactionMode == ItemSync::SingleTransaction) { QCOMPARE(transactionSpy.count(), 1); } if (transactionMode == ItemSync::MultipleTransactions) { QCOMPARE(transactionSpy.count(), origItems.count()); } Item::List resultItems = 
fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); delete syncer; QTest::qWait(100); QTRY_COMPARE(deletedSpy.count(), 0); QTRY_COMPARE(addedSpy.count(), 0); QTRY_COMPARE(changedSpy.count(), origItems.count()); } void testIncrementalSync() { { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("akonadi_knut_resource_0")); AKVERIFYEXEC(select); } const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); QCOMPARE(origItems.size(), 15); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); { ItemSync *syncer = new ItemSync(col); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setTransactionMode(ItemSync::SingleTransaction); syncer->setIncrementalSyncItems(origItems, Item::List()); AKVERIFYEXEC(syncer); QCOMPARE(transactionSpy.count(), 1); } QTest::qWait(100); QTRY_COMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QTRY_COMPARE(changedSpy.count(), 0); deletedSpy.clear(); addedSpy.clear(); changedSpy.clear(); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); Item::List delItems; delItems << resultItems.takeFirst(); Item itemWithOnlyRemoteId; itemWithOnlyRemoteId.setRemoteId(resultItems.front().remoteId()); delItems << itemWithOnlyRemoteId; resultItems.takeFirst(); //This item will not be removed since it isn't existing locally Item itemWithRandomRemoteId; itemWithRandomRemoteId.setRemoteId(KRandom::randomString(100)); delItems << itemWithRandomRemoteId; { ItemSync *syncer = new ItemSync(col); 
syncer->setTransactionMode(ItemSync::SingleTransaction); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setIncrementalSyncItems(resultItems, delItems); AKVERIFYEXEC(syncer); QCOMPARE(transactionSpy.count(), 1); } Item::List resultItems2 = fetchItems(col); QCOMPARE(resultItems2.count(), resultItems.count()); QTest::qWait(100); QTRY_COMPARE(deletedSpy.count(), 2); QCOMPARE(addedSpy.count(), 0); QTRY_COMPARE(changedSpy.count(), 0); { ResourceSelectJob *select = new ResourceSelectJob(QStringLiteral("")); AKVERIFYEXEC(select); } } void testIncrementalStreamingSync() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); ItemSync *syncer = new ItemSync(col); syncer->setTransactionMode(ItemSync::SingleTransaction); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setAutoDelete(false); QSignalSpy spy(syncer, SIGNAL(result(KJob*))); QVERIFY(spy.isValid()); syncer->setStreamingEnabled(true); QTest::qWait(0); QCOMPARE(spy.count(), 0); for (int i = 0; i < origItems.count(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < origItems.count() - 1) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } syncer->deliveryDone(); QTRY_COMPARE(spy.count(), 1); KJob *job = spy.at(0).at(0).value(); QCOMPARE(job, syncer); QCOMPARE(job->error(), 
0); QCOMPARE(transactionSpy.count(), 1); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); delete syncer; QTest::qWait(100); QCOMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QTRY_COMPARE(changedSpy.count(), origItems.size()); } void testEmptyIncrementalSync() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); ItemSync *syncer = new ItemSync(col); syncer->setTransactionMode(ItemSync::SingleTransaction); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setIncrementalSyncItems(Item::List(), Item::List()); AKVERIFYEXEC(syncer); //It would be better if we didn't have a transaction at all, but so far the transaction is still created QCOMPARE(transactionSpy.count(), 1); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); QTest::qWait(100); QCOMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QCOMPARE(changedSpy.count(), 0); } void testIncrementalStreamingSyncBatchProcessing() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); QSignalSpy changedSpy(&monitor, 
SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); ItemSync *syncer = new ItemSync(col); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); QSignalSpy spy(syncer, SIGNAL(result(KJob*))); QVERIFY(spy.isValid()); syncer->setStreamingEnabled(true); syncer->setTransactionMode(ItemSync::MultipleTransactions); QTest::qWait(0); QCOMPARE(spy.count(), 0); for (int i = 0; i < syncer->batchSize(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < (syncer->batchSize() - 1)) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } QTest::qWait(100); //this should process one batch of batchSize() items QTRY_COMPARE(changedSpy.count(), syncer->batchSize()); QCOMPARE(transactionSpy.count(), 1); //one per batch for (int i = syncer->batchSize(); i < origItems.count(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < origItems.count() - 1) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } syncer->deliveryDone(); QTRY_COMPARE(spy.count(), 1); QCOMPARE(transactionSpy.count(), 2); //one per batch QTest::qWait(100); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); QTest::qWait(100); QCOMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QTRY_COMPARE(changedSpy.count(), resultItems.count()); } void testGidMerge() { Collection col(collectionIdFromPath(QStringLiteral("res3"))); { Item item(QStringLiteral("application/octet-stream")); item.setRemoteId(QStringLiteral("rid1")); item.setGid(QStringLiteral("gid1")); item.setPayload("payload1"); ItemCreateJob *job = new ItemCreateJob(item, col); AKVERIFYEXEC(job); } { Item 
item(QStringLiteral("application/octet-stream")); item.setRemoteId(QStringLiteral("rid2")); item.setGid(QStringLiteral("gid2")); item.setPayload("payload1"); ItemCreateJob *job = new ItemCreateJob(item, col); AKVERIFYEXEC(job); } Item modifiedItem(QStringLiteral("application/octet-stream")); modifiedItem.setRemoteId(QStringLiteral("rid3")); modifiedItem.setGid(QStringLiteral("gid2")); modifiedItem.setPayload("payload2"); ItemSync *syncer = new ItemSync(col); syncer->setTransactionMode(ItemSync::MultipleTransactions); syncer->setIncrementalSyncItems(Item::List() << modifiedItem, Item::List()); AKVERIFYEXEC(syncer); Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), 3); Item item; item.setGid(QStringLiteral("gid2")); ItemFetchJob *fetchJob = new ItemFetchJob(item); fetchJob->fetchScope().fetchFullPayload(); AKVERIFYEXEC(fetchJob); QCOMPARE(fetchJob->items().size(), 2); QCOMPARE(fetchJob->items().first().payload(), QByteArray("payload2")); QCOMPARE(fetchJob->items().first().remoteId(), QString::fromLatin1("rid3")); QCOMPARE(fetchJob->items().at(1).payload(), QByteArray("payload1")); QCOMPARE(fetchJob->items().at(1).remoteId(), QStringLiteral("rid2")); } /* - * This test verifies that ItemSync doesn't prematurely emit it's result if a job inside a transaction fails. + * This test verifies that ItemSync doesn't prematurely emit its result if a job inside a transaction fails. * ItemSync is supposed to continue the sync but simply ignoring all delivered data. 
*/ void testFailingJob() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); ItemSync *syncer = new ItemSync(col); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); QSignalSpy spy(syncer, SIGNAL(result(KJob*))); QVERIFY(spy.isValid()); syncer->setStreamingEnabled(true); syncer->setTransactionMode(ItemSync::MultipleTransactions); QTest::qWait(0); QCOMPARE(spy.count(), 0); for (int i = 0; i < syncer->batchSize(); ++i) { Item::List l; //Modify to trigger a changed signal Item item = modifyItem(origItems[i]); // item.setRemoteId(QByteArray("foo")); item.setRemoteId(QString()); item.setId(-1); l << item; syncer->setIncrementalSyncItems(l, Item::List()); if (i < (syncer->batchSize() - 1)) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } QTest::qWait(100); QTRY_COMPARE(spy.count(), 0); for (int i = syncer->batchSize(); i < origItems.count(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < origItems.count() - 1) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } syncer->deliveryDone(); QTRY_COMPARE(spy.count(), 1); } /* - * This test verifies that ItemSync doesn't prematurly emit it's result if a job inside a transaction fails, due to a duplicate. + * This test verifies that ItemSync doesn't prematurely emit its result if a job inside a transaction fails, due to a duplicate. * This case used to break the TransactionSequence. * ItemSync is supposed to continue the sync but simply ignoring all delivered data. 
*/ - void testFailingDueToDuplicateJob() + void testFailingDueToDuplicateItem() { const Collection col = Collection(collectionIdFromPath(QStringLiteral("res1/foo"))); QVERIFY(col.isValid()); Item::List origItems = fetchItems(col); //Create a duplicate that will trigger an error during the first batch Item duplicate = origItems.first(); duplicate.setId(-1); { ItemCreateJob *job = new ItemCreateJob(duplicate, col); AKVERIFYEXEC(job); } origItems = fetchItems(col); ItemSync *syncer = new ItemSync(col); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); QSignalSpy spy(syncer, SIGNAL(result(KJob*))); QVERIFY(spy.isValid()); syncer->setStreamingEnabled(true); syncer->setTransactionMode(ItemSync::MultipleTransactions); QTest::qWait(0); QCOMPARE(spy.count(), 0); for (int i = 0; i < syncer->batchSize(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < (syncer->batchSize() - 1)) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } QTest::qWait(100); //Ensure the job hasn't finished yet due to the errors QTRY_COMPARE(spy.count(), 0); for (int i = syncer->batchSize(); i < origItems.count(); ++i) { Item::List l; //Modify to trigger a changed signal l << modifyItem(origItems[i]); syncer->setIncrementalSyncItems(l, Item::List()); if (i < origItems.count() - 1) { QTest::qWait(0); // enter the event loop so itemsync actually can do something } QCOMPARE(spy.count(), 0); } syncer->deliveryDone(); QTRY_COMPARE(spy.count(), 1); } void testFullSyncManyItems() { + // Given a collection with 1000 items const Collection col = Collection(collectionIdFromPath(QStringLiteral("res2/foo2"))); QVERIFY(col.isValid()); Akonadi::Monitor monitor; monitor.setCollectionMonitored(col); QSignalSpy addedSpy(&monitor, SIGNAL(itemAdded(Akonadi::Item,Akonadi::Collection))); QVERIFY(addedSpy.isValid()); 
const int itemCount = 1000; - for (int i = 0; i < itemCount; ++i) { - Item item(QStringLiteral("application/octet-stream")); - item.setRemoteId(QStringLiteral("rid") + QString::number(i)); - item.setGid(QStringLiteral("gid") + QString::number(i)); - item.setPayload("payload1"); - ItemCreateJob *job = new ItemCreateJob(item, col); - AKVERIFYEXEC(job); - } - + createItems(col, itemCount); QTRY_COMPARE(addedSpy.count(), itemCount); addedSpy.clear(); const Item::List origItems = fetchItems(col); - - //Since the item sync affects the knut resource we ensure we actually managed to load all items - //This needs to be adjusted should the testdataset change QCOMPARE(origItems.size(), itemCount); QSignalSpy deletedSpy(&monitor, SIGNAL(itemRemoved(Akonadi::Item))); QVERIFY(deletedSpy.isValid()); QSignalSpy changedSpy(&monitor, SIGNAL(itemChanged(Akonadi::Item,QSet))); QVERIFY(changedSpy.isValid()); QBENCHMARK { ItemSync *syncer = new ItemSync(col); syncer->setTransactionMode(ItemSync::SingleTransaction); QSignalSpy transactionSpy(syncer, SIGNAL(transactionCommitted())); QVERIFY(transactionSpy.isValid()); syncer->setFullSyncItems(origItems); + AKVERIFYEXEC(syncer); QCOMPARE(transactionSpy.count(), 1); } const Item::List resultItems = fetchItems(col); QCOMPARE(resultItems.count(), origItems.count()); QTest::qWait(100); QCOMPARE(deletedSpy.count(), 0); QCOMPARE(addedSpy.count(), 0); QCOMPARE(changedSpy.count(), 0); // delete all items; QBENCHMARK leads to the whole method being called more than once ItemDeleteJob *job = new ItemDeleteJob(resultItems); AKVERIFYEXEC(job); } + + void testUserCancel() + { + // Given a collection with 100 items + const Collection col = Collection(collectionIdFromPath(QStringLiteral("res2/foo2"))); + QVERIFY(col.isValid()); + + const Item::List itemsToDelete = fetchItems(col); + if (!itemsToDelete.isEmpty()) { + ItemDeleteJob *deleteJob = new ItemDeleteJob(itemsToDelete); + AKVERIFYEXEC(deleteJob); + } + + const int itemCount = 100; + createItems(col, 
itemCount); + const Item::List origItems = fetchItems(col); + QCOMPARE(origItems.size(), itemCount); + + // and an ItemSync running + ItemSync *syncer = new ItemSync(col); + syncer->setTransactionMode(ItemSync::SingleTransaction); + syncer->setFullSyncItems(origItems); + + // When the user cancels the ItemSync + QTimer::singleShot(10, syncer, &ItemSync::rollback); + + // Then the itemsync should finish at some point, and not crash + QVERIFY(!syncer->exec()); + QCOMPARE(syncer->errorString(), QStringLiteral("User canceled operation.")); + + // Cleanup + ItemDeleteJob *job = new ItemDeleteJob(origItems); + AKVERIFYEXEC(job); + } }; QTEST_AKONADIMAIN(ItemsyncTest) #include "itemsynctest.moc" diff --git a/src/core/itemsync.cpp b/src/core/itemsync.cpp index 107fb315e..171441cb3 100644 --- a/src/core/itemsync.cpp +++ b/src/core/itemsync.cpp @@ -1,554 +1,567 @@ /* Copyright (c) 2007 Tobias Koenig Copyright (c) 2007 Volker Krause Copyright (c) 2014 Christian Mollekopf This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
*/ #include "itemsync.h" #include "job_p.h" #include "collection.h" #include "item.h" #include "item_p.h" #include "itemcreatejob.h" #include "itemdeletejob.h" #include "itemfetchjob.h" #include "itemmodifyjob.h" #include "transactionsequence.h" #include "itemfetchscope.h" #include "akonadicore_debug.h" using namespace Akonadi; /** * @internal */ class Akonadi::ItemSyncPrivate : public JobPrivate { public: ItemSyncPrivate(ItemSync *parent) : JobPrivate(parent) , mTransactionMode(ItemSync::SingleTransaction) , mCurrentTransaction(nullptr) , mTransactionJobs(0) , mPendingJobs(0) , mProgress(0) , mTotalItems(-1) , mTotalItemsProcessed(0) , mStreaming(false) , mIncremental(false) , mDeliveryDone(false) , mFinished(false) , mFullListingDone(false) , mProcessingBatch(false) , mDisableAutomaticDeliveryDone(false) , mBatchSize(10) , mMergeMode(Akonadi::ItemSync::RIDMerge) { // we want to fetch all data by default mFetchScope.fetchFullPayload(); mFetchScope.fetchAllAttributes(); } void createOrMerge(const Item &item); void checkDone(); void slotItemsReceived(const Item::List &items); void slotLocalListDone(KJob *job); void slotLocalDeleteDone(KJob *job); void slotLocalChangeDone(KJob *job); void execute(); void processItems(); void processBatch(); void deleteItems(const Item::List &items); void slotTransactionResult(KJob *job); void requestTransaction(); Job *subjobParent() const; void fetchLocalItemsToDelete(); QString jobDebuggingString() const override; bool allProcessed() const; Q_DECLARE_PUBLIC(ItemSync) Collection mSyncCollection; QSet mListedItems; ItemSync::TransactionMode mTransactionMode; TransactionSequence *mCurrentTransaction; int mTransactionJobs; // fetch scope for initial item listing ItemFetchScope mFetchScope; Akonadi::Item::List mRemoteItemQueue; Akonadi::Item::List mRemovedRemoteItemQueue; Akonadi::Item::List mCurrentBatchRemoteItems; Akonadi::Item::List mCurrentBatchRemovedRemoteItems; Akonadi::Item::List mItemsToDelete; // create counter int 
mPendingJobs; int mProgress; int mTotalItems; int mTotalItemsProcessed; bool mStreaming; bool mIncremental; bool mDeliveryDone; bool mFinished; bool mFullListingDone; bool mProcessingBatch; bool mDisableAutomaticDeliveryDone; int mBatchSize; Akonadi::ItemSync::MergeMode mMergeMode; }; void ItemSyncPrivate::createOrMerge(const Item &item) { Q_Q(ItemSync); // don't try to do anything in error state if (q->error()) { return; } mPendingJobs++; ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, subjobParent()); ItemCreateJob::MergeOptions merge = ItemCreateJob::Silent; if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) { merge |= ItemCreateJob::GID; } else { merge |= ItemCreateJob::RID; } create->setMerge(merge); q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) {slotLocalChangeDone(job);}); } bool ItemSyncPrivate::allProcessed() const { return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && mCurrentBatchRemovedRemoteItems.isEmpty(); } void ItemSyncPrivate::checkDone() { Q_Q(ItemSync); q->setProcessedAmount(KJob::Bytes, mProgress); if (mPendingJobs > 0) { return; } if (mTransactionJobs > 0) { //Commit the current transaction if we're in batch processing mode or done //and wait until the transaction is committed to process the next batch if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) { if (mCurrentTransaction) { Q_EMIT q->transactionCommitted(); mCurrentTransaction->commit(); mCurrentTransaction = nullptr; } return; } } mProcessingBatch = false; + + if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) { + qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling"; + mFinished = true; + q->emitResult(); + return; + } + if (!mRemoteItemQueue.isEmpty()) { execute(); //We don't have enough items, request more if 
(!mProcessingBatch) { Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size()); } return; } Q_EMIT q->readyForNextBatch(mBatchSize); if (allProcessed() && !mFinished) { // prevent double result emission, can happen since checkDone() is called from all over the place qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished"; mFinished = true; q->emitResult(); } } ItemSync::ItemSync(const Collection &collection, QObject *parent) : Job(new ItemSyncPrivate(this), parent) { Q_D(ItemSync); d->mSyncCollection = collection; } ItemSync::~ItemSync() { } void ItemSync::setFullSyncItems(const Item::List &items) { /* * We received a list of items from the server: * * fetch all local id's + rid's only * * check each full sync item whether it's locally available * * if it is modify the item * * if it's not create it * * delete all superfluous items */ Q_D(ItemSync); Q_ASSERT(!d->mIncremental); if (!d->mStreaming) { d->mDeliveryDone = true; } d->mRemoteItemQueue += items; d->mTotalItemsProcessed += items.count(); qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count() << "Already processed: " << d->mTotalItemsProcessed << "Expected total amount: " << d->mTotalItems; if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { d->mDeliveryDone = true; } d->execute(); } void ItemSync::setTotalItems(int amount) { Q_D(ItemSync); Q_ASSERT(!d->mIncremental); Q_ASSERT(amount >= 0); setStreamingEnabled(true); qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount; d->mTotalItems = amount; setTotalAmount(KJob::Bytes, amount); if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) { d->mDeliveryDone = true; d->execute(); } } void ItemSync::setDisableAutomaticDeliveryDone(bool disable) { Q_D(ItemSync); d->mDisableAutomaticDeliveryDone = disable; } void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems) { /* * We received an incremental listing of items: 
* * for each changed item: * ** If locally available => modify * ** else => create * * removed items can be removed right away */ Q_D(ItemSync); d->mIncremental = true; if (!d->mStreaming) { d->mDeliveryDone = true; } d->mRemoteItemQueue += changedItems; d->mRemovedRemoteItemQueue += removedItems; d->mTotalItemsProcessed += changedItems.count() + removedItems.count(); qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems; if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) { d->mDeliveryDone = true; } d->execute(); } void ItemSync::setFetchScope(ItemFetchScope &fetchScope) { Q_D(ItemSync); d->mFetchScope = fetchScope; } ItemFetchScope &ItemSync::fetchScope() { Q_D(ItemSync); return d->mFetchScope; } void ItemSync::doStart() { } void ItemSyncPrivate::fetchLocalItemsToDelete() { Q_Q(ItemSync); if (mIncremental) { qFatal("This must not be called while in incremental mode"); return; } ItemFetchJob *job = new ItemFetchJob(mSyncCollection, subjobParent()); job->fetchScope().setFetchRemoteIdentification(true); job->fetchScope().setFetchModificationTime(false); job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually); // we only can fetch parts already in the cache, otherwise this will deadlock job->fetchScope().setCacheOnly(true); QObject::connect(job, &ItemFetchJob::itemsReceived, q, [this](const Akonadi::Item::List &lst) { slotItemsReceived(lst); }); QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) { slotLocalListDone(job); }); mPendingJobs++; } void ItemSyncPrivate::slotItemsReceived(const Item::List &items) { for (const Akonadi::Item &item : items) { //Don't delete items that have not yet been synchronized if (item.remoteId().isEmpty()) { continue; } if (!mListedItems.contains(item.remoteId())) { mItemsToDelete << Item(item.id()); } } } void ItemSyncPrivate::slotLocalListDone(KJob *job) { 
mPendingJobs--; if (job->error()) { qCWarning(AKONADICORE_LOG) << job->errorString(); } deleteItems(mItemsToDelete); checkDone(); } QString ItemSyncPrivate::jobDebuggingString() const { // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job // started, so this requires passing jobDebuggingString to jobEnded(). return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name()); } void ItemSyncPrivate::execute() { //shouldn't happen if (mFinished) { qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job."; Q_ASSERT(false); return; } //not doing anything, start processing if (!mProcessingBatch) { if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) { //we have a new batch to process const int num = qMin(mBatchSize, mRemoteItemQueue.size()); mCurrentBatchRemoteItems.reserve(mBatchSize); std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems)); mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num); mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue; mRemovedRemoteItemQueue.clear(); } else { //nothing to do, let's wait for more data return; } mProcessingBatch = true; processBatch(); return; } checkDone(); } //process the current batch of items void ItemSyncPrivate::processBatch() { + Q_Q(ItemSync); if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) { return; } + if (q->error() == Job::UserCanceled) { + checkDone(); + return; + } //request a transaction, there are items that require processing requestTransaction(); processItems(); // removed if (!mIncremental && allProcessed()) { //the full listing is done and we know which items to remove fetchLocalItemsToDelete(); } else { deleteItems(mCurrentBatchRemovedRemoteItems); mCurrentBatchRemovedRemoteItems.clear(); } checkDone(); } void ItemSyncPrivate::processItems() { // added / updated for (const Item &remoteItem : 
qAsConst(mCurrentBatchRemoteItems)) { if (remoteItem.remoteId().isEmpty()) { qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier"; continue; } if (!mIncremental) { mListedItems << remoteItem.remoteId(); } createOrMerge(remoteItem); } mCurrentBatchRemoteItems.clear(); } void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete) { Q_Q(ItemSync); // if in error state, better not change anything anymore if (q->error()) { return; } if (itemsToDelete.isEmpty()) { return; } mPendingJobs++; ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent()); q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) { slotLocalDeleteDone(job); }); // It can happen that the groupware servers report us deleted items // twice, in this case this item delete job will fail on the second try. // To avoid a rollback of the complete transaction we gracefully allow the job // to fail :) TransactionSequence *transaction = qobject_cast(subjobParent()); if (transaction) { transaction->setIgnoreJobFailure(job); } } void ItemSyncPrivate::slotLocalDeleteDone(KJob *job) { if (job->error()) { qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString(); } mPendingJobs--; mProgress++; checkDone(); } void ItemSyncPrivate::slotLocalChangeDone(KJob *job) { if (job->error() && job->error() != Job::KilledJobError) { qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString(); } mPendingJobs--; mProgress++; checkDone(); } void ItemSyncPrivate::slotTransactionResult(KJob *job) { --mTransactionJobs; if (mCurrentTransaction == job) { mCurrentTransaction = nullptr; } checkDone(); } void ItemSyncPrivate::requestTransaction() { Q_Q(ItemSync); //we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially if (!mCurrentTransaction) { ++mTransactionJobs; 
mCurrentTransaction = new TransactionSequence(q); mCurrentTransaction->setAutomaticCommittingEnabled(false); QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) { slotTransactionResult(job); }); } } Job *ItemSyncPrivate::subjobParent() const { Q_Q(const ItemSync); if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) { return mCurrentTransaction; } return const_cast(q); } void ItemSync::setStreamingEnabled(bool enable) { Q_D(ItemSync); d->mStreaming = enable; } void ItemSync::deliveryDone() { Q_D(ItemSync); Q_ASSERT(d->mStreaming); d->mDeliveryDone = true; d->execute(); } void ItemSync::slotResult(KJob *job) { if (job->error()) { qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString(); // pretend there were no errors Akonadi::Job::removeSubjob(job); // propagate the first error we got but continue, we might still be fed with stuff from a resource if (!error()) { setError(job->error()); setErrorText(job->errorText()); } } else { Akonadi::Job::slotResult(job); } } void ItemSync::rollback() { Q_D(ItemSync); qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back."; setError(UserCanceled); if (d->mCurrentTransaction) { d->mCurrentTransaction->rollback(); } - d->mDeliveryDone = true; // user wont deliver more data + d->mDeliveryDone = true; // user won't deliver more data d->execute(); // end this in an ordered way, since we have an error set no real change will be done } void ItemSync::setTransactionMode(ItemSync::TransactionMode mode) { Q_D(ItemSync); d->mTransactionMode = mode; } int ItemSync::batchSize() const { Q_D(const ItemSync); return d->mBatchSize; } void ItemSync::setBatchSize(int size) { Q_D(ItemSync); d->mBatchSize = size; } ItemSync::MergeMode ItemSync::mergeMode() const { Q_D(const ItemSync); return d->mMergeMode; } void ItemSync::setMergeMode(MergeMode mergeMode) { Q_D(ItemSync); d->mMergeMode = mergeMode; } #include "moc_itemsync.cpp" diff --git 
a/src/core/jobs/transactionsequence.cpp b/src/core/jobs/transactionsequence.cpp index 89b956d5f..eef781393 100644 --- a/src/core/jobs/transactionsequence.cpp +++ b/src/core/jobs/transactionsequence.cpp @@ -1,241 +1,253 @@ /* Copyright (c) 2006-2008 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "transactionsequence.h" #include "transactionjobs.h" #include "job_p.h" #include using namespace Akonadi; class Akonadi::TransactionSequencePrivate : public JobPrivate { public: TransactionSequencePrivate(TransactionSequence *parent) : JobPrivate(parent) , mState(Idle) { } enum TransactionState { Idle, Running, WaitingForSubjobs, RollingBack, Committing }; Q_DECLARE_PUBLIC(TransactionSequence) TransactionState mState; QSet mIgnoredErrorJobs; bool mAutoCommit = true; void commitResult(KJob *job) { Q_Q(TransactionSequence); if (job->error()) { q->setError(job->error()); q->setErrorText(job->errorText()); } q->emitResult(); } void rollbackResult(KJob *job) { Q_Q(TransactionSequence); Q_UNUSED(job); q->emitResult(); } QString jobDebuggingString() const override; }; QString Akonadi::TransactionSequencePrivate::jobDebuggingString() const { //TODO add state return QStringLiteral("autocommit %1").arg(mAutoCommit); } TransactionSequence::TransactionSequence(QObject *parent) : Job(new 
TransactionSequencePrivate(this), parent) { } TransactionSequence::~TransactionSequence() { } bool TransactionSequence::addSubjob(KJob *job) { Q_D(TransactionSequence); //Don't abort the rollback job, while keeping the state set. if (d->mState == TransactionSequencePrivate::RollingBack) { return Job::addSubjob(job); } if (error()) { //This can happen if a rollback is in progress, so make sure we don't set the state back to running. job->kill(EmitResult); return false; } // TODO KDE5: remove property hack once SpecialCollectionsRequestJob has been fixed if (d->mState == TransactionSequencePrivate::Idle && !property("transactionsDisabled").toBool()) { d->mState = TransactionSequencePrivate::Running; // needs to be set before creating the transaction job to avoid infinite recursion new TransactionBeginJob(this); } else { d->mState = TransactionSequencePrivate::Running; } return Job::addSubjob(job); } void TransactionSequence::slotResult(KJob *job) { Q_D(TransactionSequence); if (!job->error() || d->mIgnoredErrorJobs.contains(job)) { // If we have an error but want to ignore it, we can't call Job::slotResult // because it would confuse the subjob queue processing logic. Just removing // the subjob instead is fine. 
if (!job->error()) { Job::slotResult(job); } else { Job::removeSubjob(job); } if (!hasSubjobs()) { if (d->mState == TransactionSequencePrivate::WaitingForSubjobs) { if (property("transactionsDisabled").toBool()) { emitResult(); return; } d->mState = TransactionSequencePrivate::Committing; TransactionCommitJob *job = new TransactionCommitJob(this); connect(job, &TransactionCommitJob::result, [d](KJob *job) { d->commitResult(job);}); } } + } else if (job->error() == KJob::KilledJobError) { + Job::slotResult(job); } else { setError(job->error()); setErrorText(job->errorText()); removeSubjob(job); - // cancel all subjobs in case someone else is listening (such as ItemSync), but without notifying ourselves again + // cancel all subjobs in case someone else is listening (such as ItemSync) foreach (KJob *job, subjobs()) { - disconnect(job, &KJob::result, this, &TransactionSequence::slotResult); - job->kill(EmitResult); + job->kill(KJob::EmitResult); } clearSubjobs(); if (d->mState == TransactionSequencePrivate::Running || d->mState == TransactionSequencePrivate::WaitingForSubjobs) { if (property("transactionsDisabled").toBool()) { emitResult(); return; } d->mState = TransactionSequencePrivate::RollingBack; TransactionRollbackJob *job = new TransactionRollbackJob(this); connect(job, &TransactionRollbackJob::result, this, [d](KJob *job) { d->rollbackResult(job);}); } } } void TransactionSequence::commit() { Q_D(TransactionSequence); if (d->mState == TransactionSequencePrivate::Running) { d->mState = TransactionSequencePrivate::WaitingForSubjobs; + } else if (d->mState == TransactionSequencePrivate::RollingBack) { + return; } else { // we never got any subjobs, that means we never started a transaction // so we can just quit here if (d->mState == TransactionSequencePrivate::Idle) { emitResult(); } return; } if (subjobs().isEmpty()) { if (property("transactionsDisabled").toBool()) { emitResult(); return; } if (!error()) { d->mState = TransactionSequencePrivate::Committing; 
TransactionCommitJob *job = new TransactionCommitJob(this); connect(job, &TransactionCommitJob::result, this, [d](KJob *job) { d->commitResult(job);}); } else { d->mState = TransactionSequencePrivate::RollingBack; TransactionRollbackJob *job = new TransactionRollbackJob(this); connect(job, &TransactionRollbackJob::result, this, [d](KJob *job) { d->rollbackResult(job);}); } } } void TransactionSequence::setIgnoreJobFailure(KJob *job) { Q_D(TransactionSequence); // make sure this is one of our sub jobs Q_ASSERT(subjobs().contains(job)); d->mIgnoredErrorJobs.insert(job); } void TransactionSequence::doStart() { Q_D(TransactionSequence); if (d->mAutoCommit) { if (d->mState == TransactionSequencePrivate::Idle) { emitResult(); } else { commit(); } } } void TransactionSequence::setAutomaticCommittingEnabled(bool enable) { Q_D(TransactionSequence); d->mAutoCommit = enable; } void TransactionSequence::rollback() { Q_D(TransactionSequence); setError(UserCanceled); // we never really started if (d->mState == TransactionSequencePrivate::Idle) { emitResult(); return; } - // TODO cancel not yet executed sub-jobs here + const auto jobList = subjobs(); + for (KJob *job : jobList) { + // Killing the currently running subjob would forcibly close the akonadiserver socket + // (with a slight delay, since that happens in a secondary thread), which would disconnect + // the next job and make the itemsync finish with the error "Cannot connect to the Akonadi service.", + // so skip the current subjob and kill only the not-yet-executed ones. + if (job != d->mCurrentSubJob) { + job->kill(KJob::EmitResult); + } + } d->mState = TransactionSequencePrivate::RollingBack; TransactionRollbackJob *job = new TransactionRollbackJob(this); connect(job, &TransactionRollbackJob::result, this, [d](KJob *job) { d->rollbackResult(job);}); } #include "moc_transactionsequence.cpp"