diff --git a/autotests/libs/itemstoretest.h b/autotests/libs/itemstoretest.h
--- a/autotests/libs/itemstoretest.h
+++ b/autotests/libs/itemstoretest.h
@@ -39,6 +39,7 @@
     void testModificationTime();
     void testRemoteIdRace();
     void itemModifyJobShouldOnlySendModifiedAttributes();
+    void testParallelJobsAddingAttributes();
 };
 
 #endif
diff --git a/autotests/libs/itemstoretest.cpp b/autotests/libs/itemstoretest.cpp
--- a/autotests/libs/itemstoretest.cpp
+++ b/autotests/libs/itemstoretest.cpp
@@ -420,3 +420,100 @@
         QCOMPARE(fetchedItem.attribute<TestAttribute>()->data, "modified");
     }
 }
+
+/**
+ * Test helper: owns a set of Sessions (deleted in the destructor), tracks the
+ * ItemModifyJobs added to it and waits until all of them have finished.
+ */
+class ParallelJobsRunner
+{
+public:
+    explicit ParallelJobsRunner(int count)
+        : numSessions(count)
+    {
+        sessions.reserve(numSessions);
+        modifyJobs.reserve(numSessions);
+        for (int i = 0; i < numSessions; ++i) {
+            auto session = new Session(QByteArray::number(i));
+            sessions.push_back(session);
+        }
+    }
+
+    ~ParallelJobsRunner()
+    {
+        qDeleteAll(sessions);
+    }
+
+    void addJob(ItemModifyJob *mjob)
+    {
+        modifyJobs.push_back(mjob);
+        // Record the outcome while the job is still alive: KJob deletes itself
+        // by default once result() has been emitted.
+        QObject::connect(mjob, &KJob::result, [mjob, this]() {
+            if (mjob->error()) {
+                errors.append(mjob->errorString());
+            }
+            doneJobs.push_back(mjob);
+        });
+    }
+
+    /// Waits until every added job has emitted result(), then fails the current
+    /// test if any job reported an error. Callers must check QTest::currentTestFailed().
+    void waitForAllJobs()
+    {
+        for (int i = 0; i < modifyJobs.count(); ++i) {
+            ItemModifyJob *mjob = modifyJobs.at(i);
+            if (!doneJobs.contains(mjob)) {
+                QSignalSpy spy(mjob, &ItemModifyJob::result);
+                QVERIFY(spy.wait());
+            }
+        }
+        QVERIFY2(errors.isEmpty(), qPrintable(errors.join(QLatin1String("; "))));
+    }
+
+    const int numSessions;
+    std::vector<Session *> sessions;
+    QVector<ItemModifyJob *> modifyJobs, doneJobs;
+    QStringList errors;
+};
+
+void ItemStoreTest::testParallelJobsAddingAttributes()
+{
+    // Given an item (created on the server)
+    Item::Id itemId;
+    {
+        Item item;
+        item.setMimeType(QStringLiteral("text/directory"));
+        ItemCreateJob *job = new ItemCreateJob(item, res1_foo);
+        AKVERIFYEXEC(job);
+        itemId = job->item().id();
+        QVERIFY(itemId >= 0);
+    }
+
+    // When adding N attributes from N different sessions (e.g. threads or processes)
+    ParallelJobsRunner runner(10);
+    for (int i = 0; i < runner.numSessions; ++i) {
+        Item item(itemId);
+        Attribute *attr = AttributeFactory::createAttribute("type" + QByteArray::number(i));
+        QVERIFY(attr);
+        attr->deserialize("attr" + QByteArray::number(i));
+        item.addAttribute(attr);
+        ItemModifyJob *mjob = new ItemModifyJob(item, runner.sessions.at(i));
+        runner.addJob(mjob);
+    }
+    runner.waitForAllJobs();
+    // QVERIFY inside the helper only returns from the helper, so stop here on failure.
+    if (QTest::currentTestFailed()) {
+        return;
+    }
+
+    // Then the item should have all attributes
+    ItemFetchJob *fetchJob = new ItemFetchJob(Item(itemId));
+    ItemFetchScope fetchScope;
+    fetchScope.fetchAllAttributes(true);
+    fetchJob->setFetchScope(fetchScope);
+    AKVERIFYEXEC(fetchJob);
+    QCOMPARE(fetchJob->items().size(), 1);
+    const Item fetchedItem = fetchJob->items().first();
+    QCOMPARE(fetchedItem.attributes().count(), runner.numSessions);
+}
diff --git a/autotests/server/CMakeLists.txt b/autotests/server/CMakeLists.txt
--- a/autotests/server/CMakeLists.txt
+++ b/autotests/server/CMakeLists.txt
@@ -75,6 +75,7 @@
   )
 endmacro()
 
+add_server_test(dbdeadlockcatchertest.cpp)
 add_server_test(dbtypetest.cpp)
 add_server_test(dbintrospectortest.cpp)
 add_server_test(querybuildertest.cpp)
diff --git a/autotests/server/dbdeadlockcatchertest.cpp b/autotests/server/dbdeadlockcatchertest.cpp
new file mode 100644
--- /dev/null
+++ b/autotests/server/dbdeadlockcatchertest.cpp
@@ -0,0 +1,80 @@
+/*
+    Copyright (c) 2019 David Faure <faure@kde.org>
+
+    This library is free software; you can redistribute it and/or modify it
+    under the terms of the GNU Library General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or (at your
+    option) any later version.
+
+    This library is distributed in the hope that it will be useful, but WITHOUT
+    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
+    License for more details.
+
+    You should have received a copy of the GNU Library General Public License
+    along with this library; see the file COPYING.LIB.  If not, write to the
+    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+    02110-1301, USA.
+*/
+
+#include <QObject>
+#include <QSqlQuery>
+#include <QTest>
+
+#include "storage/dbdeadlockcatcher.h"
+
+#include <aktest.h>
+
+using namespace Akonadi::Server;
+
+class DbDeadlockCatcherTest : public QObject
+{
+    Q_OBJECT
+
+private:
+    int m_myFuncCalled = 0;
+    // Throws DbDeadlockException during the first maxRecursion calls, then returns normally.
+    void myFunc(int maxRecursion)
+    {
+        ++m_myFuncCalled;
+        if (m_myFuncCalled <= maxRecursion) {
+            throw DbDeadlockException(QSqlQuery());
+        }
+    }
+
+private Q_SLOTS:
+    void testNoDeadlock()
+    {
+        m_myFuncCalled = 0;
+        DbDeadlockCatcher catcher([this](){ myFunc(0); });
+        QCOMPARE(m_myFuncCalled, 1);
+    }
+
+    void testRecurseOnce()
+    {
+        m_myFuncCalled = 0;
+        DbDeadlockCatcher catcher([this](){ myFunc(1); });
+        QCOMPARE(m_myFuncCalled, 2);
+    }
+
+    void testRecurseTwice()
+    {
+        m_myFuncCalled = 0;
+        DbDeadlockCatcher catcher([this](){ myFunc(2); });
+        QCOMPARE(m_myFuncCalled, 3);
+    }
+
+    void testHitRecursionLimit()
+    {
+        m_myFuncCalled = 0;
+        QVERIFY_EXCEPTION_THROWN(
+            DbDeadlockCatcher catcher([this](){ myFunc(10); }),
+            DbDeadlockException);
+        // The first call plus MaxRecursion (5) retries.
+        QCOMPARE(m_myFuncCalled, 6);
+    }
+};
+
+AKTEST_MAIN(DbDeadlockCatcherTest)
+
+#include "dbdeadlockcatchertest.moc"
diff --git a/src/core/jobs/itemmodifyjob.cpp b/src/core/jobs/itemmodifyjob.cpp
--- a/src/core/jobs/itemmodifyjob.cpp
+++ b/src/core/jobs/itemmodifyjob.cpp
@@ -55,7 +55,7 @@
 {
     ProtocolHelper::PartNamespace ns; // dummy
     const QByteArray partLabel = ProtocolHelper::decodePartIdentifier(partName, ns);
-    if (!mParts.remove(partLabel)) {
+    if (!mParts.contains(partLabel)) { // keep the part: presumably it can be requested again when the server retries the command after a DB deadlock
         // Error?
return Protocol::PartMetaData(); } diff --git a/src/server/connection.h b/src/server/connection.h --- a/src/server/connection.h +++ b/src/server/connection.h @@ -128,6 +128,7 @@ bool m_connectionClosing = false; private: + void parseStream(const Protocol::CommandPtr &cmd); template inline typename std::enable_if::value>::type sendResponse(qint64 tag, T &&response); diff --git a/src/server/connection.cpp b/src/server/connection.cpp --- a/src/server/connection.cpp +++ b/src/server/connection.cpp @@ -28,6 +28,7 @@ #include #include "storage/datastore.h" +#include "storage/dbdeadlockcatcher.h" #include "handler.h" #include "notificationmanager.h" @@ -194,6 +195,19 @@ Q_EMIT disconnected(); } +void Connection::parseStream(const Protocol::CommandPtr &cmd) +{ + if (!m_currentHandler->parseStream()) { + try { + m_currentHandler->failureResponse("Error while handling a command"); + } catch (...) { + m_connectionClosing = true; + } + qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() + << "on connection" << m_identifier; + } +} + void Connection::handleIncomingData() { Q_FOREVER { @@ -274,15 +288,7 @@ m_currentHandler->setTag(tag); m_currentHandler->setCommand(cmd); try { - if (!m_currentHandler->parseStream()) { - try { - m_currentHandler->failureResponse("Error while handling a command"); - } catch (...) { - m_connectionClosing = true; - } - qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() - << "on connection" << m_identifier; - } + DbDeadlockCatcher catcher([this, cmd]() { parseStream(cmd); }); } catch (const Akonadi::Server::HandlerException &e) { if (m_currentHandler) { try { diff --git a/src/server/storage/datastore.h b/src/server/storage/datastore.h --- a/src/server/storage/datastore.h +++ b/src/server/storage/datastore.h @@ -287,6 +287,9 @@ return m_dbOpened; } + bool doRollback(); + void transactionKilledByDB(); + Q_SIGNALS: /** Emitted if a transaction has been successfully committed. 
@@ -330,55 +333,26 @@
      */
     static QDateTime dateTimeToQDateTime(const QByteArray &dateTime);
 
-    /**
-     * Adds the @p query to current transaction, so that it can be replayed in
-     * case the transaction deadlocks or timeouts.
-     *
-     * When DataStore is not in transaction or SQLite is configured, this method
-     * does nothing.
-     *
-     * All queries will automatically be removed when transaction is committed.
-     *
-     * This method should only be used by QueryBuilder.
-     */
-    void addQueryToTransaction(const QString &statement, const QVector<QVariant> &bindValues, bool isBatch);
-
-    /**
-     * Tries to execute all queries from last transaction again. If any of the
-     * queries fails, the entire transaction is rolled back and fails.
-     *
-     * This method can only be used by QueryBuilder when database rolls back
-     * transaction due to deadlock or timeout.
-     *
-     * @return Returns an invalid query when error occurs, or the last replayed
-     * query on success.
-     */
-    QSqlQuery retryLastTransaction(bool rollbackFirst);
-
 private Q_SLOTS:
     void sendKeepAliveQuery();
 
 protected:
-    static QThreadStorage<DataStore *> sInstances;
     static std::unique_ptr<DataStoreFactory> sFactory;
+    std::unique_ptr<NotificationCollector> mNotificationCollector;
 
+private:
+    void cleanupAfterRollback();
     QString m_connectionName;
     QSqlDatabase m_database;
     bool m_dbOpened;
+    // Set by transactionKilledByDB() when the database aborted the transaction (deadlock/timeout);
+    // commitTransaction() then refuses to commit and the next beginTransaction() clears it.
+    bool m_transactionKilledByDB = false;
     uint m_transactionLevel;
-    struct TransactionQuery {
-        QString query;
-        QVector<QVariant> boundValues;
-        bool isBatch;
-    };
-    QVector<TransactionQuery> m_transactionQueries;
     QByteArray mSessionId;
-    std::unique_ptr<NotificationCollector> mNotificationCollector;
     QTimer *m_keepAliveTimer = nullptr;
     static bool s_hasForeignKeyConstraints;
 
-    // Gives QueryBuilder access to addQueryToTransaction() and retryLastTransaction()
-    friend class QueryBuilder;
     friend class DataStoreFactory;
 };
 
diff --git a/src/server/storage/datastore.cpp b/src/server/storage/datastore.cpp
--- a/src/server/storage/datastore.cpp
+++ b/src/server/storage/datastore.cpp
@@ -21,6 +21,7 @@
 #include "datastore.h"
 
 #include "akonadi.h"
+#include "collectionstatistics.h"
#include "dbconfig.h" #include "dbinitializer.h" #include "dbupdater.h" @@ -65,7 +66,7 @@ static QMutex sTransactionMutex; bool DataStore::s_hasForeignKeyConstraints = false; -QThreadStorage DataStore::sInstances; +static QThreadStorage sInstances; #define TRANSACTION_MUTEX_LOCK if ( DbType::isSystemSQLite( m_database ) ) sTransactionMutex.lock() #define TRANSACTION_MUTEX_UNLOCK if ( DbType::isSystemSQLite( m_database ) ) sTransactionMutex.unlock() @@ -178,7 +179,6 @@ QueryCache::clear(); m_database.close(); m_database = QSqlDatabase(); - m_transactionQueries.clear(); QSqlDatabase::removeDatabase(m_connectionName); StorageDebugger::instance()->removeConnection(reinterpret_cast(this)); @@ -1361,92 +1361,38 @@ return QDateTime::fromString(QString::fromLatin1(dateTime), QStringLiteral("yyyy-MM-dd hh:mm:ss")); } -void DataStore::addQueryToTransaction(const QString &statement, const QVector &bindValues, bool isBatch) +bool DataStore::doRollback() { - // This is used for replaying deadlocked transactions, so only record queries - // for backends that support concurrent transactions. 
-    // for backends that support concurrent transactions.
-    if (!inTransaction() || DbType::isSystemSQLite(m_database)) {
-        return;
+    QSqlDriver *driver = m_database.driver();
+    QElapsedTimer timer; timer.start();
+    driver->rollbackTransaction();
+    StorageDebugger::instance()->removeTransaction(reinterpret_cast<qint64>(this),
+                                                   false, timer.elapsed(),
+                                                   m_database.lastError().text());
+    if (m_database.lastError().isValid()) {
+        TRANSACTION_MUTEX_UNLOCK; // no-op unless the system SQLite driver is in use
+        debugLastDbError("DataStore::rollbackTransaction");
+        return false;
     }
-
-    m_transactionQueries.append({ statement, bindValues, isBatch });
+    TRANSACTION_MUTEX_UNLOCK; // no-op unless the system SQLite driver is in use
+    return true;
 }
 
-QSqlQuery DataStore::retryLastTransaction(bool rollbackFirst)
+void DataStore::transactionKilledByDB()
 {
-    if (!inTransaction() || DbType::isSystemSQLite(m_database)) {
-        return QSqlQuery();
-    }
-
-    if (rollbackFirst) {
-        // In some cases the SQL database won't rollback the failed transaction, so
-        // we need to do it manually
-        QElapsedTimer timer; timer.start();
-        m_database.driver()->rollbackTransaction();
-        StorageDebugger::instance()->removeTransaction(reinterpret_cast<qint64>(this),
-                false, timer.elapsed(),
-                m_database.lastError().text());
-    }
-
-    // The database has rolled back the actual transaction, so reset the counter
-    // to 0 and start a new one in beginTransaction(). Then restore the level
-    // because this has to be completely transparent to the original caller
-    const int oldTransactionLevel = m_transactionLevel;
-    m_transactionLevel = 0;
-    if (!beginTransaction(QStringLiteral("RETRY LAST TRX"))) {
-        m_transactionLevel = oldTransactionLevel;
-        return QSqlQuery();
-    }
-    m_transactionLevel = oldTransactionLevel;
-
-    QSqlQuery lastQuery;
-    for (auto q = m_transactionQueries.begin(), qEnd = m_transactionQueries.end(); q != qEnd; ++q) {
-        QSqlQuery query(database());
-        query.prepare(q->query);
-        for (int i = 0, total = q->boundValues.count(); i < total; ++i) {
-            query.bindValue(QLatin1Char(':') + QString::number(i), q->boundValues.at(i));
-        }
-
-        bool res = false;
-        QElapsedTimer t; t.start();
-        if (q->isBatch) {
-            res = query.execBatch();
-        } else {
-            res = query.exec();
-        }
-        if (StorageDebugger::instance()->isSQLDebuggingEnabled()) {
-            Q_EMIT StorageDebugger::instance()->queryExecuted(reinterpret_cast<qint64>(this),
-                    query, t.elapsed());
-        } else {
-            StorageDebugger::instance()->incSequence();
-        }
-
-        if (!res) {
-            // Don't do another deadlock detection here, just give up.
-            qCCritical(AKONADISERVER_LOG) << "DATABASE ERROR when retrying transaction";
-            qCCritical(AKONADISERVER_LOG) << "  Error code:" << query.lastError().nativeErrorCode();
-            qCCritical(AKONADISERVER_LOG) << "  DB error: " << query.lastError().databaseText();
-            qCCritical(AKONADISERVER_LOG) << "  Error text:" << query.lastError().text();
-            qCCritical(AKONADISERVER_LOG) << "  Query:" << query.executedQuery();
-
-            // Return the last query, because that's what caller expects to retrieve
- return query; - } - - lastQuery = query; - } - - return lastQuery; + m_transactionKilledByDB = true; + cleanupAfterRollback(); + Q_EMIT transactionRolledBack(); } bool DataStore::beginTransaction(const QString &name) { if (!m_dbOpened) { return false; } - if (m_transactionLevel == 0) { + if (m_transactionLevel == 0 || m_transactionKilledByDB) { + m_transactionKilledByDB = false; QElapsedTimer timer; timer.start(); TRANSACTION_MUTEX_LOCK; @@ -1499,22 +1445,10 @@ --m_transactionLevel; - if (m_transactionLevel == 0) { - QSqlDriver *driver = m_database.driver(); + if (m_transactionLevel == 0 && !m_transactionKilledByDB) { + doRollback(); + cleanupAfterRollback(); Q_EMIT transactionRolledBack(); - QElapsedTimer timer; timer.start(); - driver->rollbackTransaction(); - StorageDebugger::instance()->removeTransaction(reinterpret_cast(this), - false, timer.elapsed(), - m_database.lastError().text()); - if (m_database.lastError().isValid()) { - TRANSACTION_MUTEX_UNLOCK; - debugLastDbError("DataStore::rollbackTransaction"); - return false; - } - TRANSACTION_MUTEX_UNLOCK; - - m_transactionQueries.clear(); } return true; @@ -1532,6 +1466,10 @@ } if (m_transactionLevel == 1) { + if (m_transactionKilledByDB) { + qCWarning(AKONADISERVER_LOG) << "DataStore::commitTransaction(): Cannot commit, transaction was killed by mysql deadlock handling!"; + return false; + } QSqlDriver *driver = m_database.driver(); QElapsedTimer timer; timer.start(); @@ -1548,8 +1486,6 @@ m_transactionLevel--; Q_EMIT transactionCommitted(); } - - m_transactionQueries.clear(); } else { m_transactionLevel--; } @@ -1568,3 +1504,14 @@ query.exec(QStringLiteral("SELECT 1")); } } + +void DataStore::cleanupAfterRollback() +{ + MimeType::invalidateCompleteCache(); + Flag::invalidateCompleteCache(); + Resource::invalidateCompleteCache(); + Collection::invalidateCompleteCache(); + PartType::invalidateCompleteCache(); + CollectionStatistics::self()->expireCache(); + QueryCache::clear(); +} diff --git 
a/src/server/storage/dbexception.h b/src/server/storage/dbdeadlockcatcher.h copy from src/server/storage/dbexception.h copy to src/server/storage/dbdeadlockcatcher.h --- a/src/server/storage/dbexception.h +++ b/src/server/storage/dbdeadlockcatcher.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2012 Volker Krause + Copyright (c) 2019 David Faure This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by @@ -17,27 +17,45 @@ 02110-1301, USA. */ -#ifndef DBEXCEPTION_H -#define DBEXCEPTION_H +#ifndef AKONADI_DBDEADLOCKCATCHER_H +#define AKONADI_DBDEADLOCKCATCHER_H -#include "exception.h" - -class QSqlQuery; +#include "dbexception.h" namespace Akonadi { + namespace Server { -/** Exception for reporting SQL errors. */ -class DbException : public Exception +/** + This class catches DbDeadlockException (as emitted by QueryBuilder) + and retries execution of the method when it happens, as required by + SQL databases. +*/ +class DbDeadlockCatcher { public: - explicit DbException(const QSqlQuery &query, const char *what = nullptr); - const char *type() const throw() override; + template + explicit DbDeadlockCatcher(Func &&func) { callFunc(func, 0); } + +private: + static const int MaxRecursion = 5; + template + void callFunc(Func &&func, int recursionCounter) { + try { + func(); + } catch(const DbDeadlockException &) { + if (recursionCounter == MaxRecursion) { + throw; + } else { + callFunc(func, ++recursionCounter); // recurse + } + } + } }; } // namespace Server } // namespace Akonadi -#endif // DBEXCEPTION_H +#endif diff --git a/src/server/storage/dbexception.h b/src/server/storage/dbexception.h --- a/src/server/storage/dbexception.h +++ b/src/server/storage/dbexception.h @@ -37,6 +37,12 @@ const char *type() const throw() override; }; +class DbDeadlockException : public DbException +{ +public: + explicit DbDeadlockException(const QSqlQuery &query); +}; + } // namespace Server } // namespace 
Akonadi diff --git a/src/server/storage/dbexception.cpp b/src/server/storage/dbexception.cpp --- a/src/server/storage/dbexception.cpp +++ b/src/server/storage/dbexception.cpp @@ -35,3 +35,8 @@ { return "Database Exception"; } + +DbDeadlockException::DbDeadlockException(const QSqlQuery &query) + : DbException(query, "Database deadlock, unsuccessful after multiple retries") +{ +} diff --git a/src/server/storage/querybuilder.h b/src/server/storage/querybuilder.h --- a/src/server/storage/querybuilder.h +++ b/src/server/storage/querybuilder.h @@ -270,8 +270,6 @@ */ void sqliteAdaptUpdateJoin(Query::Condition &cond); - bool retryLastTransaction(bool rollback = false); - private: QString mTable; DbType::Type mDatabaseType; diff --git a/src/server/storage/querybuilder.cpp b/src/server/storage/querybuilder.cpp --- a/src/server/storage/querybuilder.cpp +++ b/src/server/storage/querybuilder.cpp @@ -19,6 +19,7 @@ #include "querybuilder.h" #include "akonadiserver_debug.h" +#include "dbexception.h" #ifndef QUERYBUILDER_UNITTEST #include "storage/datastore.h" @@ -346,17 +347,6 @@ } } -bool QueryBuilder::retryLastTransaction(bool rollback) -{ -#ifndef QUERYBUILDER_UNITTEST - mQuery = DataStore::self()->retryLastTransaction(rollback); - return !mQuery.lastError().isValid(); -#else - Q_UNUSED(rollback); - return true; -#endif -} - bool QueryBuilder::exec() { QString statement; @@ -412,51 +402,54 @@ } } - // Add the query to DataStore so that we can replay it in case transaction deadlocks. - // The method does nothing when this query is not executed within a transaction. - // We don't care whether the query was successful or not. In case of error, the caller - // will rollback the transaction anyway, and all cached queries will be removed. - DataStore::self()->addQueryToTransaction(statement, mBindValues, isBatch); - if (!ret) { + bool needsRetry = false; // Handle transaction deadlocks and timeouts by attempting to replay the transaction. 
if (mDatabaseType == DbType::PostgreSQL) { const QString dbError = mQuery.lastError().databaseText(); if (dbError.contains(QLatin1String("40P01" /* deadlock_detected */))) { qCWarning(AKONADISERVER_LOG) << "QueryBuilder::exec(): database reported transaction deadlock, retrying transaction"; qCWarning(AKONADISERVER_LOG) << mQuery.lastError().text(); - return retryLastTransaction(); + needsRetry = true; } } else if (mDatabaseType == DbType::MySQL) { const QString lastErrorStr = mQuery.lastError().nativeErrorCode(); const int error = lastErrorStr.isEmpty() ? -1 : lastErrorStr.toInt(); if (error == 1213 /* ER_LOCK_DEADLOCK */) { qCWarning(AKONADISERVER_LOG) << "QueryBuilder::exec(): database reported transaction deadlock, retrying transaction"; qCWarning(AKONADISERVER_LOG) << mQuery.lastError().text(); - return retryLastTransaction(); + needsRetry = true; } else if (error == 1205 /* ER_LOCK_WAIT_TIMEOUT */) { qCWarning(AKONADISERVER_LOG) << "QueryBuilder::exec(): database reported transaction timeout, retrying transaction"; qCWarning(AKONADISERVER_LOG) << mQuery.lastError().text(); - return retryLastTransaction(); + // Not sure retrying helps, maybe error is good enough.... but doesn't hurt to retry a few times before giving up. + needsRetry = true; } } else if (mDatabaseType == DbType::Sqlite && !DbType::isSystemSQLite(DataStore::self()->database())) { const QString lastErrorStr = mQuery.lastError().nativeErrorCode(); const int error = lastErrorStr.isEmpty() ? 
-1 : lastErrorStr.toInt(); if (error == 6 /* SQLITE_LOCKED */) { qCWarning(AKONADISERVER_LOG) << "QueryBuilder::exec(): database reported transaction deadlock, retrying transaction"; qCWarning(AKONADISERVER_LOG) << mQuery.lastError().text(); - return retryLastTransaction(true); + DataStore::self()->doRollback(); + needsRetry = true; } else if (error == 5 /* SQLITE_BUSY */) { qCWarning(AKONADISERVER_LOG) << "QueryBuilder::exec(): database reported transaction timeout, retrying transaction"; qCWarning(AKONADISERVER_LOG) << mQuery.lastError().text(); - return retryLastTransaction(true); + DataStore::self()->doRollback(); + needsRetry = true; } } else if (mDatabaseType == DbType::Sqlite) { // We can't have a transaction deadlock in SQLite when using driver shipped // with Qt, because it does not support concurrent transactions and DataStore // serializes them through a global lock. } + if (needsRetry) { + DataStore::self()->transactionKilledByDB(); + throw DbDeadlockException(mQuery); + } + qCCritical(AKONADISERVER_LOG) << "DATABASE ERROR:"; qCCritical(AKONADISERVER_LOG) << " Error code:" << mQuery.lastError().nativeErrorCode(); qCCritical(AKONADISERVER_LOG) << " DB error: " << mQuery.lastError().databaseText();