diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 362ddfd4..f1fc1429 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -1,146 +1,166 @@ +/* + * Copyright (C) 2019 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ #include "messagequeue.h" #include "storage.h" -#include #include -MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite) +MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite), mReplayedRevision{-1} { } MessageQueue::~MessageQueue() { if (mWriteTransaction) { mWriteTransaction.abort(); } } void MessageQueue::enqueue(void const *msg, size_t size) { enqueue(QByteArray::fromRawData(static_cast(msg), size)); } void MessageQueue::startTransaction() { if (mWriteTransaction) { return; } processRemovals(); mWriteTransaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); } void MessageQueue::commit() { mWriteTransaction.commit(); mWriteTransaction = Sink::Storage::DataStore::Transaction(); processRemovals(); emit messageReady(); } void MessageQueue::enqueue(const QByteArray &value) { bool implicitTransaction = false; if (!mWriteTransaction) { implicitTransaction = true; startTransaction(); } const qint64 revision = Sink::Storage::DataStore::maxRevision(mWriteTransaction) + 1; - const QByteArray key = QString("%1").arg(revision).toUtf8(); - mWriteTransaction.openDatabase().write(key, value); + mWriteTransaction.openDatabase().write(QByteArray::number(revision), value); Sink::Storage::DataStore::setMaxRevision(mWriteTransaction, revision); if (implicitTransaction) { commit(); } } void MessageQueue::processRemovals() { if (mWriteTransaction) { return; } - auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); - for (const auto &key : mPendingRemoval) { - transaction.openDatabase().remove(key); + if (mReplayedRevision >= 0) { + auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); + auto db = transaction.openDatabase(); + for (auto revision = Sink::Storage::DataStore::cleanedUpRevision(transaction) + 1; revision <= mReplayedRevision; revision++) { + db.remove(QByteArray::number(revision)); + } + Sink::Storage::DataStore::setCleanedUpRevision(transaction, mReplayedRevision); + transaction.commit(); + mReplayedRevision = -1; } - transaction.commit(); - mPendingRemoval.clear(); } void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { dequeueBatch(1, [resultHandler](const QByteArray &value) { return KAsync::start([&value, resultHandler](KAsync::Future &future) { resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); }); }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) { - auto resultCount = QSharedPointer::create(0); - return KAsync::start([this, maxBatchSize, resultHandler, resultCount](KAsync::Future &future) { + return KAsync::start([this, maxBatchSize, resultHandler](KAsync::Future &future) { int count = 0; QList> waitCondition; mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("", - [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { - if (mPendingRemoval.contains(key)) { + [&](const QByteArray &key, const QByteArray &value) -> bool { + auto revision = key.toLongLong(); + if (revision <= mReplayedRevision) { return true; } - *resultCount += 1; - // We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) - mPendingRemoval << QByteArray(key.constData(), key.size()); + mReplayedRevision = revision; waitCondition << resultHandler(value).exec(); count++; if (count < maxBatchSize) { return true; } return false; }, [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while retrieving value" << error.message; // errorHandler(Error(error.store, error.code, error.message)); }); // Trace() << "Waiting on " << waitCondition.size() << " results"; KAsync::waitForCompletion(waitCondition) - .then([this, resultCount, &future]() { + .then([this, count, &future]() { processRemovals(); - if (*resultCount == 0) { + if (count == 0) { future.setFinished(); } else { if (isEmpty()) { emit this->drained(); } future.setFinished(); } }) .exec(); }); } bool MessageQueue::isEmpty() { int count = 0; auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = t.openDatabase(); if (db) { db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { - if (!mPendingRemoval.contains(key)) { - count++; - return false; + const auto revision = key.toLongLong(); + if (revision <= mReplayedRevision) { + return true; } - return true; + count++; + return false; }, [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); } return count == 0; } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_messagequeue.cpp" #pragma clang diagnostic pop diff --git a/common/messagequeue.h b/common/messagequeue.h index a2b72261..93c29832 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -1,62 +1,81 @@ +/* + * Copyright (C) 2019 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ #pragma once #include "sink_export.h" #include #include #include #include #include #include #include "storage.h" /** * A persistent FIFO message queue. */ class SINK_EXPORT MessageQueue : public QObject { Q_OBJECT public: enum ErrorCodes { NoMessageFound }; class Error { public: Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c) { } QByteArray store; QByteArray message; int code; }; MessageQueue(const QString &storageRoot, const QString &name); ~MessageQueue(); void startTransaction(); void enqueue(void const *msg, size_t size); void enqueue(const QByteArray &value); // Dequeue a message. This will return a new message everytime called. // Call the result handler with a success response to remove the message from the store. // TODO track processing progress to avoid processing the same message with the same preprocessor twice? void dequeue(const std::function)> &resultHandler, const std::function &errorHandler); KAsync::Job dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler); bool isEmpty(); public slots: void commit(); signals: void messageReady(); void drained(); private slots: void processRemovals(); private: Q_DISABLE_COPY(MessageQueue); Sink::Storage::DataStore mStorage; Sink::Storage::DataStore::Transaction mWriteTransaction; - QByteArrayList mPendingRemoval; + qint64 mReplayedRevision; }; diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index 2e3bd750..0caee04d 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp @@ -1,200 +1,243 @@ #include #include #include #include "store.h" #include "storage.h" #include "messagequeue.h" #include "log.h" #include "test.h" +#include "testutils.h" /** * Test of the messagequeue implementation. */ class MessageQueueTest : public QObject { Q_OBJECT private slots: void initTestCase() { Sink::Test::initTest(); Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } void cleanupTestCase() { } void cleanup() { Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } void testEmpty() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); QVERIFY(queue.isEmpty()); queue.enqueue("value"); QVERIFY(!queue.isEmpty()); + queue.dequeue([](void *ptr, int size, std::function callback) { callback(true); }, [](const MessageQueue::Error &error) {}); + QVERIFY(queue.isEmpty()); } void testDequeueEmpty() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); bool gotValue = false; bool gotError = false; queue.dequeue([&](void *ptr, int size, std::function callback) { gotValue = true; }, [&](const MessageQueue::Error &error) { gotError = true; }); QVERIFY(!gotValue); QVERIFY(!gotError); } void testEnqueue() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); QSignalSpy spy(&queue, SIGNAL(messageReady())); queue.enqueue("value1"); QCOMPARE(spy.size(), 1); } void testDrained() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); QSignalSpy spy(&queue, SIGNAL(drained())); queue.enqueue("value1"); queue.dequeue([](void *ptr, int size, std::function callback) { callback(true); }, [](const MessageQueue::Error &error) {}); QCOMPARE(spy.size(), 1); } void testSyncDequeue() { QQueue values; values << "value1"; values << "value2"; MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); for (const QByteArray &value : values) { queue.enqueue(value); } while (!queue.isEmpty()) { SinkLog() << "start"; const auto expected = values.dequeue(); bool gotValue = false; bool gotError = false; queue.dequeue( [&](void *ptr, int size, std::function callback) { if (QByteArray(static_cast(ptr), size) == expected) { gotValue = true; } callback(true); }, [&](const MessageQueue::Error &error) { gotError = true; }); QVERIFY(gotValue); QVERIFY(!gotError); } QVERIFY(values.isEmpty()); } void testAsyncDequeue() { QQueue values; values << "value1"; values << "value2"; MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); for (const QByteArray &value : values) { queue.enqueue(value); } while (!queue.isEmpty()) { QEventLoop eventLoop; const auto expected = values.dequeue(); bool gotValue = false; bool gotError = false; queue.dequeue( [&](void *ptr, int size, std::function callback) { if (QByteArray(static_cast(ptr), size) == expected) { gotValue = true; } auto timer = new QTimer(); timer->setSingleShot(true); QObject::connect(timer, &QTimer::timeout, [timer, callback, &eventLoop]() { delete timer; callback(true); eventLoop.exit(); }); timer->start(0); }, [&](const MessageQueue::Error &error) { gotError = true; }); eventLoop.exec(); QVERIFY(gotValue); QVERIFY(!gotError); } QVERIFY(values.isEmpty()); } /* * Dequeue's are async and we want to be able to enqueue new items in between. */ void testNestedEnqueue() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); queue.enqueue("value1"); bool gotError = false; queue.dequeue( [&](void *ptr, int size, std::function callback) { queue.enqueue("value3"); callback(true); }, [&](const MessageQueue::Error &error) { gotError = true; }); QVERIFY(!gotError); } void testBatchDequeue() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); queue.enqueue("value1"); queue.enqueue("value2"); queue.enqueue("value3"); int count = 0; queue.dequeueBatch(2, [&count](const QByteArray &data) { count++; + ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); return KAsync::null(); }).exec().waitForFinished(); QCOMPARE(count, 2); queue.dequeueBatch(1, [&count](const QByteArray &data) { count++; + ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); + return KAsync::null(); + }).exec().waitForFinished(); + QCOMPARE(count, 3); + } + + void testBatchDequeueDuringWriteTransaction() + { + MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); + queue.enqueue("value1"); + queue.enqueue("value2"); + queue.enqueue("value3"); + + queue.startTransaction(); + //Inivisible to dequeues because in write transaction + queue.enqueue("value4"); + + int count = 0; + queue.dequeueBatch(2, [&count](const QByteArray &data) { + count++; + ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); + return KAsync::null(); + }).exec().waitForFinished(); + QCOMPARE(count, 2); + + queue.dequeueBatch(2, [&count](const QByteArray &data) { + count++; + ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); return KAsync::null(); }).exec().waitForFinished(); QCOMPARE(count, 3); + QVERIFY(queue.isEmpty()); + + //Commit value4 + queue.commit(); + QVERIFY(!queue.isEmpty()); + queue.dequeueBatch(2, [&count](const QByteArray &data) { + count++; + ASYNCCOMPARE(data, QByteArray{"value"} + QByteArray::number(count)); + return KAsync::null(); + }).exec().waitForFinished(); + QCOMPARE(count, 4); } void testBatchEnqueue() { MessageQueue queue(Sink::Store::storageLocation(), "sink.dummy.testqueue"); QSignalSpy spy(&queue, SIGNAL(messageReady())); queue.startTransaction(); queue.enqueue("value1"); queue.enqueue("value2"); queue.enqueue("value3"); QVERIFY(queue.isEmpty()); QCOMPARE(spy.count(), 0); queue.commit(); QVERIFY(!queue.isEmpty()); QCOMPARE(spy.count(), 1); } }; QTEST_MAIN(MessageQueueTest) #include "messagequeuetest.moc"