diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 08a9a647..ff151633 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -1,175 +1,180 @@ /* * Copyright (C) 2019 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "messagequeue.h" #include "storage.h" #include "storage/key.h" #include using namespace Sink::Storage; -MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, DataStore::ReadWrite), mReplayedRevision{-1} +MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, DataStore::ReadWrite), mReplayedRevision{-1}, mName{name} { } MessageQueue::~MessageQueue() { if (mWriteTransaction) { mWriteTransaction.abort(); } } +QString MessageQueue::name() const +{ + return mName; +} + void MessageQueue::enqueue(void const *msg, size_t size) { enqueue(QByteArray::fromRawData(static_cast(msg), size)); } void MessageQueue::startTransaction() { if (mWriteTransaction) { return; } processRemovals(); mWriteTransaction = mStorage.createTransaction(DataStore::ReadWrite); } void MessageQueue::commit() { mWriteTransaction.commit(); mWriteTransaction = DataStore::Transaction(); processRemovals(); emit messageReady(); } void MessageQueue::enqueue(const QByteArray &value) { bool implicitTransaction = false; if (!mWriteTransaction) { implicitTransaction = true; startTransaction(); } const qint64 revision = DataStore::maxRevision(mWriteTransaction) + 1; mWriteTransaction.openDatabase().write(Revision{size_t(revision)}.toDisplayByteArray(), value); DataStore::setMaxRevision(mWriteTransaction, revision); if (implicitTransaction) { commit(); } } void MessageQueue::processRemovals() { if (mWriteTransaction) { if (mReplayedRevision > 0) { auto dequedRevisions = mReplayedRevision - DataStore::cleanedUpRevision(mWriteTransaction); if (dequedRevisions > 500) { SinkTrace() << "We're building up a large backlog of dequeued revisions " << dequedRevisions; } } return; } if (mReplayedRevision >= 0) { auto transaction = mStorage.createTransaction(DataStore::ReadWrite); auto db = transaction.openDatabase(); for (auto revision = DataStore::cleanedUpRevision(transaction) + 1; revision <= mReplayedRevision; revision++) { db.remove(Revision{size_t(revision)}.toDisplayByteArray()); } DataStore::setCleanedUpRevision(transaction, mReplayedRevision); transaction.commit(); mReplayedRevision = -1; } } void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { dequeueBatch(1, [resultHandler](const QByteArray &value) { return KAsync::start([&value, resultHandler](KAsync::Future &future) { resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); }); }).onError([errorHandler](const KAsync::Error &error) { errorHandler(Error("messagequeue", error.errorCode, error.errorMessage.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) { return KAsync::start([this, maxBatchSize, resultHandler](KAsync::Future &future) { int count = 0; QList> waitCondition; mStorage.createTransaction(DataStore::ReadOnly) .openDatabase() .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { const auto revision = key.toLongLong(); if (revision <= mReplayedRevision) { return true; } mReplayedRevision = revision; waitCondition << resultHandler(value).exec(); count++; if (count < maxBatchSize) { return true; } return false; }, [](const DataStore::Error &error) { SinkError() << "Error while retrieving value" << error.message; // errorHandler(Error(error.store, error.code, error.message)); }); // Trace() << "Waiting on " << waitCondition.size() << " results"; KAsync::waitForCompletion(waitCondition) .then([this, count, &future]() { processRemovals(); if (count == 0) { future.setFinished(); } else { if (isEmpty()) { emit this->drained(); } future.setFinished(); } }) .exec(); }); } bool MessageQueue::isEmpty() { int count = 0; auto t = mStorage.createTransaction(DataStore::ReadOnly); auto db = t.openDatabase(); if (db) { db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { const auto revision = key.toLongLong(); if (revision <= mReplayedRevision) { return true; } count++; return false; }, [](const DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); } return count == 0; } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_messagequeue.cpp" #pragma clang diagnostic pop diff --git a/common/messagequeue.h b/common/messagequeue.h index 93c29832..3729ee44 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -1,81 +1,84 @@ /* * Copyright (C) 2019 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include #include #include #include "storage.h" /** * A persistent FIFO message queue. */ class SINK_EXPORT MessageQueue : public QObject { Q_OBJECT public: enum ErrorCodes { NoMessageFound }; class Error { public: Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c) { } QByteArray store; QByteArray message; int code; }; MessageQueue(const QString &storageRoot, const QString &name); ~MessageQueue(); + QString name() const; + void startTransaction(); void enqueue(void const *msg, size_t size); void enqueue(const QByteArray &value); // Dequeue a message. This will return a new message everytime called. // Call the result handler with a success response to remove the message from the store. // TODO track processing progress to avoid processing the same message with the same preprocessor twice? void dequeue(const std::function)> &resultHandler, const std::function &errorHandler); KAsync::Job dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler); bool isEmpty(); public slots: void commit(); signals: void messageReady(); void drained(); private slots: void processRemovals(); private: Q_DISABLE_COPY(MessageQueue); Sink::Storage::DataStore mStorage; Sink::Storage::DataStore::Transaction mWriteTransaction; qint64 mReplayedRevision; + QString mName; };