diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 660326af..5fb1d0f8 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -1,1002 +1,1086 @@ /* * Copyright (C) 2014 Christian Mollekopf * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "storage.h" #include #include #include #include +#include +#include #include #include #include #include #include "log.h" #ifdef Q_OS_WIN #include typedef SSIZE_T ssize_t; #endif namespace Sink { namespace Storage { extern QReadWriteLock sDbisLock; extern QReadWriteLock sEnvironmentsLock; extern QHash sEnvironments; extern QHash sDbis; QReadWriteLock sDbisLock; QReadWriteLock sEnvironmentsLock; QHash sEnvironments; QHash sDbis; int getErrorCode(int e) { switch (e) { case MDB_NOTFOUND: return DataStore::ErrorCodes::NotFound; default: break; } return -1; } static QList getDatabaseNames(MDB_txn *transaction) { if (!transaction) { SinkWarning() << "Invalid transaction"; return QList(); } int rc; QList list; MDB_dbi dbi; if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) { MDB_val key; MDB_val data; MDB_cursor *cursor; mdb_cursor_open(transaction, dbi, &cursor); if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); } } else { //Normal if we don't have any databases yet if (rc == MDB_NOTFOUND) { rc = 0; } if (rc) { SinkWarning() << "Failed to get a value" << rc; } } mdb_cursor_close(cursor); } else { SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); } return list; } +/* + * To create a dbi we always need a write transaction, + * and we always need to commit the transaction ASAP + * We can only ever enter from one point per process. + */ + +QMutex sCreateDbiLock; + +static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, bool allowDuplicates, MDB_dbi &dbi) +{ + + unsigned int flags = 0; + if (allowDuplicates) { + flags |= MDB_DUPSORT; + } + + MDB_dbi flagtableDbi; + if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { + if (!readOnly) { + SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); + } + } else { + MDB_val key, value; + key.mv_data = const_cast(static_cast(db.constData())); + key.mv_size = db.size(); + if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { + //We expect this to fail for new databases + if (rc != MDB_NOTFOUND) { + SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); + } + } else { + //Found the flags + const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); + flags = ba.toInt(); + } + } + + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { + //Create the db if it is not existing already + if (rc == MDB_NOTFOUND && !readOnly) { + //Sanity check db name + { + auto parts = db.split('.'); + for (const auto &p : parts) { + auto containsSpecialCharacter = [] (const QByteArray &p) { + for (int i = 0; i < p.size(); i++) { + const auto c = p.at(i); + //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars + if (c < 0x30 || c > 0x7A) { + return true; + } + } + return false; + }; + if (p.isEmpty() || containsSpecialCharacter(p)) { + SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db; + Q_ASSERT(false); + throw std::runtime_error("Fatal error while creating db."); + } + } + } + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { + SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); + return false; + } + //Record the db flags + MDB_val key, value; + key.mv_data = const_cast(static_cast(db.constData())); + key.mv_size = db.size(); + //Store the flags without the create option + const auto ba = QByteArray::number(flags); + value.mv_data = const_cast(static_cast(db.constData())); + value.mv_size = db.size(); + if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { + //We expect this to fail if we're only creating the dbi but not the db + if (rc != MDB_KEYEXIST) { + SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); + } + } + } else { + //It's not an error if we only want to read + if (!readOnly) { + SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); + return true; + } + return false; + } + } + return true; +} class DataStore::NamedDatabase::Private { public: Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } ~Private() { } QByteArray db; MDB_txn *transaction; MDB_dbi dbi; bool allowDuplicates; std::function defaultErrorHandler; QString name; bool createdNewDbi = false; - QString createdDbName; + QString createdNewDbiName; - bool openDatabase(bool readOnly, std::function errorHandler) + bool dbiValidForTransaction(MDB_dbi dbi, MDB_txn *transaction) { - unsigned int flags = 0; - if (allowDuplicates) { - flags |= MDB_DUPSORT; + //sDbis can contain dbi's that are not available to this transaction. + //We use mdb_dbi_flags to check if the dbi is valid for this transaction. + uint f; + if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { + return false; } + return true; + } + bool openDatabase(bool readOnly, std::function errorHandler) + { const auto dbiName = name + db; + QReadLocker dbiLocker{&sDbisLock}; if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); - //sDbis can contain dbi's that are not available to this transaction. - //We use mdb_dbi_flags to check if the dbi is valid for this transaction. - uint f; - if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { - //In readonly mode we can just ignore this. In read-write we would have tried to concurrently create a db. - if (!readOnly) { - SinkWarning() << "Tried to create database in second transaction: " << dbiName; - } - dbi = 0; - transaction = 0; - return false; - } + Q_ASSERT(dbiValidForTransaction(dbi, transaction)); } else { - MDB_dbi flagtableDbi; - if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { - if (!readOnly) { - SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); + /* + * Dynamic creation of databases. + * If all databases were defined via the database layout we wouldn't ever end up in here. + * However, we rely on this codepath for indexes, synchronization databases and in race-conditions + * where the database is not yet fully created when the client initializes it for reading. + * + * There are a few things to consider: + * * dbi's (DataBase Identifier) should be opened once (ideally), and then be persisted in the environment. + * * To open a dbi we need a transaction and must commit the transaction. From then on any open transaction will have access to the dbi. + * * Already running transactions will not have access to the dbi. + * * There *must* only ever be one active transaction opening dbi's (using mdb_dbi_open), and that transaction *must* + * commit or abort before any other transaction opens a dbi. + * + * We solve this the following way: + * * For read-only transactions we abort the transaction, open the dbi and persist it in the environment, and reopen the transaction (so the dbi is available). This may result in the db content changing unexpectedly and referenced memory becoming unavailable, but isn't a problem as long as we don't rely on memory remaining valid for the duration of the transaction (which is anyways not given since any operation would invalidate the memory region).. + * * For write transactions we open the dbi for future use, and then open it as well in the current transaction. + */ + SinkTrace() << "Creating database dynamically: " << dbiName << readOnly; + //Only one transaction may ever create dbis at a time. + QMutexLocker createDbiLocker(&sCreateDbiLock); + //Double checked locking + if (sDbis.contains(dbiName)) { + dbi = sDbis.value(dbiName); + Q_ASSERT(dbiValidForTransaction(dbi, transaction)); + return true; + } + + //Create a transaction to open the dbi + MDB_txn *dbiTransaction; + if (readOnly) { + MDB_env *env = mdb_txn_env(transaction); + Q_ASSERT(env); + mdb_txn_reset(transaction); + if (const int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &dbiTransaction)) { + SinkError() << "Failed to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; + return false; } } else { - MDB_val key, value; - key.mv_data = const_cast(static_cast(db.constData())); - key.mv_size = db.size(); - if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { - //We expect this to fail for new databases - if (rc != MDB_NOTFOUND) { - SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); - } - } else { - //Found the flags - const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); - flags = ba.toInt(); - } + dbiTransaction = transaction; } - - Q_ASSERT(transaction); - if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { - //Create the db if it is not existing already - if (rc == MDB_NOTFOUND && !readOnly) { - //Sanity check db name - { - auto parts = db.split('.'); - for (const auto &p : parts) { - auto containsSpecialCharacter = [] (const QByteArray &p) { - for (int i = 0; i < p.size(); i++) { - const auto c = p.at(i); - //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars - if (c < 0x30 || c > 0x7A) { - return true; - } - } - return false; - }; - if (p.isEmpty() || containsSpecialCharacter(p)) { - SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db; - Q_ASSERT(false); - throw std::runtime_error("Fatal error while creating db."); - } - } - } - if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { - SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); - Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while creating database: " + QByteArray(mdb_strerror(rc))); - errorHandler ? errorHandler(error) : defaultErrorHandler(error); - return false; - } - //Record the db flags - MDB_val key, value; - key.mv_data = const_cast(static_cast(db.constData())); - key.mv_size = db.size(); - //Store the flags without the create option - const auto ba = QByteArray::number(flags); - value.mv_data = const_cast(static_cast(db.constData())); - value.mv_size = db.size(); - if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { - //We expect this to fail if we're only creating the dbi but not the db - if (rc != MDB_KEYEXIST) { - SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); - } - } + if (createDbi(dbiTransaction, db, readOnly, allowDuplicates, dbi)) { + if (readOnly) { + mdb_txn_commit(dbiTransaction); + dbiLocker.unlock(); + QWriteLocker dbiWriteLocker(&sDbisLock); + sDbis.insert(dbiName, dbi); + //We reopen the read-only transaction so the dbi becomes available in it. + mdb_txn_renew(transaction); } else { - dbi = 0; - transaction = 0; - //It's not an error if we only want to read - if (!readOnly) { - SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); - Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); - errorHandler ? errorHandler(error) : defaultErrorHandler(error); - } - return false; + createdNewDbi = true; + createdNewDbiName = dbiName; + } + //Ensure the dbi is valid for the parent transaction + Q_ASSERT(dbiValidForTransaction(dbi, transaction)); + } else { + if (readOnly) { + mdb_txn_abort(dbiTransaction); + mdb_txn_renew(transaction); } + SinkWarning() << "Failed to create the dbi: " << dbiName; + dbi = 0; + transaction = 0; + return false; } - - createdNewDbi = true; - createdDbName = dbiName; } return true; } }; DataStore::NamedDatabase::NamedDatabase() : d(nullptr) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) { *this = std::move(other); } DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) { if (&other != this) { delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::NamedDatabase::~NamedDatabase() { delete d; } bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); if (d) { errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return false; } const void *keyPtr = sKey.data(); const size_t keySize = sKey.size(); const void *valuePtr = sValue.data(); const size_t valueSize = sValue.size(); if (!keyPtr || keySize == 0) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key."); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } int rc; MDB_val key, data; key.mv_size = keySize; key.mv_data = const_cast(keyPtr); data.mv_size = valueSize; data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return !rc; } void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } int rc; MDB_val key; key.mv_size = k.size(); key.mv_data = const_cast(static_cast(k.data())); if (value.isEmpty()) { rc = mdb_del(d->transaction, d->dbi, &key, 0); } else { MDB_val data; data.mv_size = value.size(); data.mv_data = const_cast(static_cast(value.data())); rc = mdb_del(d->transaction, d->dbi, &key, &data); } if (rc) { auto errorCode = ErrorCodes::GenericError; if (rc == MDB_NOTFOUND) { errorCode = ErrorCodes::NotFound; } Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } } int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return 0; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { //Invalid arguments can mean that the transaction doesn't contain the db dbi Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates || findSubstringKeys) { MDB_cursor_op op = d->allowDuplicates ? MDB_SET : MDB_FIRST; if (findSubstringKeys) { op = MDB_SET_RANGE; } if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // The first lookup will find a key that is equal or greather than our key if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; } if (!callResultHandler || resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { if (findSubstringKeys) { // Reset the key to what we search for key.mv_data = (void *)k.constData(); key.mv_size = k.size(); } MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? MDB_NEXT_DUP : MDB_NEXT; while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // Every consequitive lookup simply iterates through the list if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { break; } } } } } } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } } else { if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { numberOfRetrievedValues++; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return numberOfRetrievedValues; } void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return; } if (k.isEmpty()) { Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } bool foundValue = false; MDB_cursor_op op = MDB_SET_RANGE; if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { // The first lookup will find a key that is equal or greather than our key if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { //Read next value until we no longer match while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { MDB_cursor_op nextOp = MDB_NEXT; rc = mdb_cursor_get(cursor, &key, &data, nextOp); if (rc) { break; } } //Now read the previous value, and that's the latest one MDB_cursor_op prefOp = MDB_PREV; // We read past the end above, just take the last value if (rc == MDB_NOTFOUND) { prefOp = MDB_LAST; } rc = mdb_cursor_get(cursor, &key, &data, prefOp); foundValue = true; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } else if (!foundValue) { Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } qint64 DataStore::NamedDatabase::getSize() { if (!d || !d->transaction) { return -1; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); } return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages); } DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat() { if (!d || !d->transaction) { return {}; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); return {}; } return {stat.ms_branch_pages, stat.ms_leaf_pages, stat.ms_overflow_pages, stat.ms_entries}; // std::cout << "page size: " << stat.ms_psize << std::endl; // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl; // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl; // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; // std::cout << "depth: " << stat.ms_depth << std::endl; // std::cout << "entries: " << stat.ms_entries << std::endl; } bool DataStore::NamedDatabase::allowsDuplicates() const { unsigned int flags; mdb_dbi_flags(d->transaction, d->dbi, &flags); return flags & MDB_DUPSORT; } class DataStore::Transaction::Private { public: - Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env, bool _noLock = false) - : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0), noLock(_noLock) + Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) + : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false) { } ~Private() { } MDB_env *env; MDB_txn *transaction; bool requestedRead; std::function defaultErrorHandler; QString name; bool implicitCommit; bool error; - int modificationCounter; - bool noLock; - QMap createdDbs; void startTransaction() { Q_ASSERT(!transaction); Q_ASSERT(sEnvironments.values().contains(env)); Q_ASSERT(env); // auto f = [](const char *msg, void *ctx) -> int { // qDebug() << msg; // return 0; // }; // mdb_reader_list(env, f, nullptr); // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; if (rc) { unsigned int flags; mdb_env_get_flags(env, &flags); if (flags & MDB_RDONLY && !requestedRead) { SinkError() << "Tried to open a write transation in a read-only enironment"; } defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); } } }; DataStore::Transaction::Transaction() : d(nullptr) { } DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) { *this = std::move(other); } DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) { if (&other != this) { abort(); delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { commit(); } else { // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; abort(); } } delete d; } DataStore::Transaction::operator bool() const { return (d && d->transaction); } bool DataStore::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; } // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); const int rc = mdb_txn_commit(d->transaction); if (rc) { abort(); Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application. throw std::runtime_error("Fatal error while committing transaction."); } - d->transaction = nullptr; //Add the created dbis to the shared environment if (!d->createdDbs.isEmpty()) { - if (!d->noLock) { - sDbisLock.lockForWrite(); - } + sDbisLock.lockForWrite(); for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) { + //This means we opened the dbi again in a read-only transaction while the write transaction was ongoing. Q_ASSERT(!sDbis.contains(it.key())); - sDbis.insert(it.key(), it.value()); + if (!sDbis.contains(it.key())) { + sDbis.insert(it.key(), it.value()); + } } d->createdDbs.clear(); - if (!d->noLock) { - sDbisLock.unlock(); - } + sDbisLock.unlock(); } + d->transaction = nullptr; return !rc; } void DataStore::Transaction::abort() { if (!d || !d->transaction) { return; } - d->createdDbs.clear(); // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); mdb_txn_abort(d->transaction); + d->createdDbs.clear(); d->transaction = nullptr; } //Ensure that we opened the correct database by comparing the expected identifier with the one //we write to the database on first open. static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly) { bool openedTheWrongDatabase = false; auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { if (value != db) { SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db; openedTheWrongDatabase = true; } return false; }, [&](const DataStore::Error &) { }, false); //This is the first time we open this database in a write transaction, write the db name if (!count) { if (!readOnly) { database.write("__internal_dbname", db); } } return !openedTheWrongDatabase; } DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const { if (!d) { SinkError() << "Tried to open database on invalid transaction: " << db; return DataStore::NamedDatabase(); } Q_ASSERT(d->transaction); // We don't now if anything changed d->implicitCommit = true; auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); - if (!d->noLock) { - sDbisLock.lockForRead(); - } - if (!p->openDatabase(d->requestedRead, errorHandler)) { - if (!d->noLock) { - sDbisLock.unlock(); - } + auto ret = p->openDatabase(d->requestedRead, errorHandler); + if (!ret) { delete p; return DataStore::NamedDatabase(); } - if (!d->noLock) { - sDbisLock.unlock(); - } + if (p->createdNewDbi) { - d->createdDbs.insert(p->createdDbName, p->dbi); + d->createdDbs.insert(p->createdNewDbiName, p->dbi); } + auto database = DataStore::NamedDatabase(p); if (!ensureCorrectDb(database, db, d->requestedRead)) { SinkWarning() << "Failed to open the database correctly" << db; Q_ASSERT(false); return DataStore::NamedDatabase(); } return database; } QList DataStore::Transaction::getDatabaseNames() const { if (!d) { SinkWarning() << "Invalid transaction"; return QList(); } return Sink::Storage::getDatabaseNames(d->transaction); } DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails) { const int freeDbi = 0; const int mainDbi = 1; MDB_envinfo mei; mdb_env_info(d->env, &mei); MDB_stat mst; mdb_stat(d->transaction, freeDbi, &mst); auto freeStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; mdb_stat(d->transaction, mainDbi, &mst); auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; MDB_cursor *cursor; MDB_val key, data; size_t freePages = 0, *iptr; int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor); if (rc) { fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); return {}; } while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { iptr = static_cast(data.mv_data); freePages += *iptr; bool bad = false; size_t pg, prev; ssize_t i, j, span = 0; j = *iptr++; for (i = j, prev = 1; --i >= 0; ) { pg = iptr[i]; if (pg <= prev) { bad = true; } prev = pg; pg += span; for (; i >= span && iptr[i-span] == pg; span++, pg++) ; } if (printDetails) { std::cout << " Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? " [bad sequence]" : "") << std::endl; for (--j; j >= 0; ) { pg = iptr[j]; for (span=1; --j >= 0 && iptr[j] == pg+span; span++); if (span > 1) { std::cout << " " << pg << "[" << span << "]\n"; } else { std::cout << " " << pg << std::endl; } } } } mdb_cursor_close(cursor); return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat}; } class DataStore::Private { public: Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {}); ~Private(); QString storageRoot; QString name; MDB_env *env = nullptr; AccessMode mode; Sink::Log::Context logCtx; void initEnvironment(const QString &fullPath, const DbLayout &layout) { // Ensure the environment is only created once, and that we only have one environment per process QReadLocker locker(&sEnvironmentsLock); if (!(env = sEnvironments.value(fullPath))) { locker.unlock(); QWriteLocker envLocker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); if (!(env = sEnvironments.value(fullPath))) { int rc = 0; if ((rc = mdb_env_create(&env))) { SinkWarningCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc); qCritical() << "mdb_env_create: " << rc << " " << mdb_strerror(rc); env = nullptr; } else { //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table. mdb_env_set_maxdbs(env, 50); if (RUNNING_ON_VALGRIND) { // In order to run valgrind this size must be smaller than half your available RAM // https://github.com/BVLC/caffe/issues/2404 mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)1000); // 1MB * 1000 } else { //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit. mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)100000); // 1MB * 1000 } const bool readOnly = (mode == ReadOnly); unsigned int flags = MDB_NOTLS; if (readOnly) { flags |= MDB_RDONLY; } if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { if (readOnly) { SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath; } else { SinkWarningCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc); } mdb_env_close(env); env = 0; } else { Q_ASSERT(env); sEnvironments.insert(fullPath, env); //Open all available dbi's - bool noLock = true; - auto t = Transaction(new Transaction::Private(readOnly, nullptr, name, env, noLock)); + MDB_txn *transaction; + if (const int rc = mdb_txn_begin(env, nullptr, readOnly ? MDB_RDONLY : 0, &transaction)) { + SinkWarning() << "Failed to to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; + return; + } if (!layout.tables.isEmpty()) { //TODO upgrade db if the layout has changed: //* read existing layout //* if layout is not the same create new layout - //If the db is read only, abort if the db is not yet existing. - //If the db is not read-only but is not existing, ensure we have a layout and create all tables. + //Create dbis from the given layout. for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { - bool allowDuplicates = it.value(); - t.openDatabase(it.key(), {}, allowDuplicates); + const bool allowDuplicates = it.value(); + MDB_dbi dbi = 0; + const auto db = it.key(); + const auto dbiName = name + db; + if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { + sDbis.insert(dbiName, dbi); + } } } else { - for (const auto &db : t.getDatabaseNames()) { - //Get dbi to store for future use. - t.openDatabase(db); + //Open all available databases + for (const auto &db : getDatabaseNames(transaction)) { + MDB_dbi dbi = 0; + const auto dbiName = name + db; + //We're going to load the flags anyways. + bool allowDuplicates = false; + if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { + sDbis.insert(dbiName, dbi); + } } } //To persist the dbis (this is also necessary for read-only transactions) - t.commit(); + mdb_txn_commit(transaction); } } } } } }; DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1()) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); if (!dirInfo.exists() && mode == ReadWrite) { QDir().mkpath(fullPath); dirInfo.refresh(); } if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { qCritical() << fullPath << "does not have write permissions. Aborting"; } else if (dirInfo.exists()) { initEnvironment(fullPath, layout); } } DataStore::Private::~Private() { //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) //and create storage instance from all over the place. Thus, we're not closing it here on purpose. } DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout)) { } DataStore::~DataStore() { delete d; } bool DataStore::exists(const QString &storageRoot, const QString &name) { return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists(); } bool DataStore::exists() const { return (d->env != 0) && DataStore::exists(d->storageRoot, d->name); } DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function &errorHandlerArg) { auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); if (!d->env) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment")); return Transaction(); } bool requestedRead = type == ReadOnly; if (d->mode == ReadOnly && !requestedRead) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode.")); return Transaction(); } QReadLocker locker(&sEnvironmentsLock); if (!sEnvironments.values().contains(d->env)) { return {}; } return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); } qint64 DataStore::diskUsage() const { QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); if (!info.exists()) { SinkWarning() << "Tried to get filesize for non-existant file: " << info.path(); } return info.size(); } void DataStore::removeFromDisk() const { const QString fullPath(d->storageRoot + '/' + d->name); QWriteLocker dbiLocker(&sDbisLock); QWriteLocker envLocker(&sEnvironmentsLock); SinkTrace() << "Removing database from disk: " << fullPath; auto env = sEnvironments.take(fullPath); for (const auto &key : sDbis.keys()) { if (key.startsWith(d->name)) { sDbis.remove(key); } } mdb_env_close(env); QDir dir(fullPath); if (!dir.removeRecursively()) { Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); defaultErrorHandler()(error); } } void DataStore::clearEnv() { + SinkTrace() << "Clearing environment"; QWriteLocker locker(&sEnvironmentsLock); - for (auto env : sEnvironments) { + QWriteLocker dbiLocker(&sDbisLock); + for (const auto &envName : sEnvironments.keys()) { + auto env = sEnvironments.value(envName); + mdb_env_sync(env, true); + for (const auto &k : sDbis.keys()) { + if (k.startsWith(envName)) { + auto dbi = sDbis.value(k); + mdb_dbi_close(env, dbi); + } + } mdb_env_close(env); } sDbis.clear(); sEnvironments.clear(); } } } // namespace Sink diff --git a/tests/dbwriter.cpp b/tests/dbwriter.cpp index 902a6071..3045eac5 100644 --- a/tests/dbwriter.cpp +++ b/tests/dbwriter.cpp @@ -1,45 +1,49 @@ #include #include #include int main(int argc, char *argv[]) { QByteArrayList arguments; for (int i = 0; i < argc; i++) { arguments << argv[i]; } auto testDataPath = arguments.value(1); auto dbName = arguments.value(2); auto count = arguments.value(3).toInt(); if (Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly).exists()) { Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } qWarning() << "Creating db: " << testDataPath << dbName << count; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + QMap dbs = {{"a", 0}, {"b", 0}, {"c", 0}, {"p", 0}, {"q", 0}, {"db", 0}}; + for (int d = 0; d < 40; d++) { + dbs.insert("db" + QByteArray::number(d), 0); + } + Sink::Storage::DataStore store(testDataPath, {dbName, dbs}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); for (int i = 0; i < count; i++) { if (!transaction) { qWarning() << "No valid transaction"; return -1; } transaction.openDatabase("a", nullptr, false).write(QByteArray::number(i), "a"); transaction.openDatabase("b", nullptr, false).write(QByteArray::number(i), "b"); transaction.openDatabase("c", nullptr, false).write(QByteArray::number(i), "c"); transaction.openDatabase("p", nullptr, false).write(QByteArray::number(i), "c"); transaction.openDatabase("q", nullptr, false).write(QByteArray::number(i), "c"); if (i > (count/2)) { for (int d = 0; d < 40; d++) { transaction.openDatabase("db" + QByteArray::number(d), nullptr, false).write(QByteArray::number(i), "a"); } } if ((i % 1000) == 0) { transaction.commit(); transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); } } qWarning() << "Creating db done."; return 0; } diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index 802947f0..618f9d09 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp @@ -1,577 +1,658 @@ #include #include #include #include #include #include "common/storage.h" /** * Test of the storage implementation to ensure it can do the low level operations as expected. */ class StorageTest : public QObject { Q_OBJECT private: QString testDataPath; - QString dbName; + QByteArray dbName; const char *keyPrefix = "key"; void populate(int count) { - Sink::Storage::DataStore storage(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore storage(testDataPath, {dbName, {{"default", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadWrite); for (int i = 0; i < count; i++) { // This should perhaps become an implementation detail of the db? if (i % 10000 == 0) { if (i > 0) { transaction.commit(); transaction = storage.createTransaction(Sink::Storage::DataStore::ReadWrite); } } transaction.openDatabase().write(keyPrefix + QByteArray::number(i), keyPrefix + QByteArray::number(i)); } transaction.commit(); } bool verify(Sink::Storage::DataStore &storage, int i) { bool success = true; bool keyMatch = true; const auto reference = keyPrefix + QByteArray::number(i); storage.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan(keyPrefix + QByteArray::number(i), [&keyMatch, &reference](const QByteArray &key, const QByteArray &value) -> bool { if (value != reference) { qDebug() << "Mismatch while reading"; keyMatch = false; } return keyMatch; }, [&success](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; success = false; }); return success && keyMatch; } private slots: void initTestCase() { testDataPath = "./testdb"; dbName = "test"; - Sink::Storage::DataStore storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, {dbName, {{"default", 0}}}); storage.removeFromDisk(); } void cleanup() { - Sink::Storage::DataStore storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, {dbName, {{"default", 0}}}); storage.removeFromDisk(); } void testCleanup() { populate(1); - Sink::Storage::DataStore storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, {dbName, {{"default", 0}}}); storage.removeFromDisk(); QFileInfo info(testDataPath + "/" + dbName); QVERIFY(!info.exists()); } void testRead() { const int count = 100; populate(count); // ensure we can read everything back correctly { Sink::Storage::DataStore storage(testDataPath, dbName); for (int i = 0; i < count; i++) { QVERIFY(verify(storage, i)); } } } void testScan() { const int count = 100; populate(count); // ensure we can scan for values { int hit = 0; Sink::Storage::DataStore store(testDataPath, dbName); store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { if (key == "key50") { hit++; } return true; }); QCOMPARE(hit, 1); } // ensure we can read a single value { int hit = 0; bool foundInvalidValue = false; Sink::Storage::DataStore store(testDataPath, dbName); store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("key50", [&](const QByteArray &key, const QByteArray &value) -> bool { if (key != "key50") { foundInvalidValue = true; } hit++; return true; }); QVERIFY(!foundInvalidValue); QCOMPARE(hit, 1); } } void testNestedOperations() { populate(3); Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); transaction.openDatabase().scan("key1", [&](const QByteArray &key, const QByteArray &value) -> bool { transaction.openDatabase().remove(key, [](const Sink::Storage::DataStore::Error &) { QVERIFY(false); }); return false; }); } void testNestedTransactions() { populate(3); Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("key1", [&](const QByteArray &key, const QByteArray &value) -> bool { store.createTransaction(Sink::Storage::DataStore::ReadWrite).openDatabase().remove(key, [](const Sink::Storage::DataStore::Error &) { QVERIFY(false); }); return false; }); } void testReadEmptyDb() { bool gotResult = false; bool gotError = false; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"default", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase("default", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); int numValues = db.scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { gotResult = true; return false; }, [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QCOMPARE(numValues, 0); QVERIFY(!gotResult); QVERIFY(!gotError); } void testConcurrentRead() { // With a count of 10000 this test is more likely to expose problems, but also takes some time to execute. const int count = 1000; populate(count); // QTest::qWait(500); // We repeat the test a bunch of times since failing is relatively random for (int tries = 0; tries < 10; tries++) { bool error = false; // Try to concurrently read QList> futures; const int concurrencyLevel = 20; for (int num = 0; num < concurrencyLevel; num++) { futures << QtConcurrent::run([this, &error]() { Sink::Storage::DataStore storage(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly); Sink::Storage::DataStore storage2(testDataPath, dbName + "2", Sink::Storage::DataStore::ReadOnly); for (int i = 0; i < count; i++) { if (!verify(storage, i)) { error = true; break; } } }); } for (auto future : futures) { future.waitForFinished(); } QVERIFY(!error); } { Sink::Storage::DataStore storage(testDataPath, dbName); storage.removeFromDisk(); Sink::Storage::DataStore storage2(testDataPath, dbName + "2"); storage2.removeFromDisk(); } } void testNoDuplicates() { bool gotResult = false; bool gotError = false; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"default", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("default", nullptr, false); db.write("key", "value"); db.write("key", "value"); int numValues = db.scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { gotResult = true; return true; }, [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QCOMPARE(numValues, 1); QVERIFY(!gotError); QVERIFY(gotResult); } void testDuplicates() { bool gotResult = false; bool gotError = false; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"default", 0x04}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("default", nullptr, true); db.write("key", "value1"); db.write("key", "value2"); int numValues = db.scan("key", [&](const QByteArray &key, const QByteArray &value) -> bool { gotResult = true; return true; }, [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QCOMPARE(numValues, 2); QVERIFY(!gotError); } void testNonexitingNamedDb() { bool gotResult = false; bool gotError = false; Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly); int numValues = store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase("test") .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { gotResult = true; return false; }, [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QCOMPARE(numValues, 0); QVERIFY(!gotResult); QVERIFY(!gotError); } void testWriteToNamedDb() { bool gotError = false; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); store.createTransaction(Sink::Storage::DataStore::ReadWrite) .openDatabase("test") .write("key1", "value1", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QVERIFY(!gotError); } void testWriteDuplicatesToNamedDb() { bool gotError = false; - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); store.createTransaction(Sink::Storage::DataStore::ReadWrite) .openDatabase("test", nullptr, true) .write("key1", "value1", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); QVERIFY(!gotError); } // By default we want only exact matches void testSubstringKeys() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0x04}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub", "value1"); db.write("subsub", "value2"); int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { return true; }); QCOMPARE(numValues, 1); } void testFindSubstringKeys() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub", "value1"); db.write("subsub", "value2"); db.write("wubsub", "value3"); int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { return true; }, nullptr, true); QCOMPARE(numValues, 2); } void testFindSubstringKeysWithDuplicatesEnabled() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub", "value1"); db.write("subsub", "value2"); db.write("wubsub", "value3"); int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { return true; }, nullptr, true); QCOMPARE(numValues, 2); } void testKeySorting() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub_2", "value2"); db.write("sub_1", "value1"); db.write("sub_3", "value3"); QList results; int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { results << value; return true; }, nullptr, true); QCOMPARE(numValues, 3); QCOMPARE(results.at(0), QByteArray("value1")); QCOMPARE(results.at(1), QByteArray("value2")); QCOMPARE(results.at(2), QByteArray("value3")); } // Ensure we don't retrieve a key that is greater than the current key. We only want equal keys. void testKeyRange() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub1", "value1"); int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { return true; }); QCOMPARE(numValues, 0); } void testFindLatest() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub1", "value1"); db.write("sub2", "value2"); db.write("wub3", "value3"); db.write("wub4", "value4"); QByteArray result; db.findLatest("sub", [&](const QByteArray &key, const QByteArray &value) { result = value; }); QCOMPARE(result, QByteArray("value2")); } void testFindLatestInSingle() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub2", "value2"); QByteArray result; db.findLatest("sub", [&](const QByteArray &key, const QByteArray &value) { result = value; }); QCOMPARE(result, QByteArray("value2")); } void testFindLast() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub2", "value2"); db.write("wub3", "value3"); QByteArray result; db.findLatest("wub", [&](const QByteArray &key, const QByteArray &value) { result = value; }); QCOMPARE(result, QByteArray("value3")); } + static QMap baseDbs() + { + return {{"revisionType", 0}, + {"revisions", 0}, + {"uids", 0}, + {"default", 0}, + {"__flagtable", 0}}; + } + void testRecordRevision() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, baseDbs()}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); Sink::Storage::DataStore::recordRevision(transaction, 1, "uid", "type"); QCOMPARE(Sink::Storage::DataStore::getTypeFromRevision(transaction, 1), QByteArray("type")); QCOMPARE(Sink::Storage::DataStore::getUidFromRevision(transaction, 1), QByteArray("uid")); } void testRecordRevisionSorting() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"test", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); QByteArray result; auto db = transaction.openDatabase("test", nullptr, false); const auto uid = "{c5d06a9f-1534-4c52-b8ea-415db68bdadf}"; //Ensure we can sort 1 and 10 properly (by default string comparison 10 comes before 6) db.write(Sink::Storage::DataStore::assembleKey(uid, 6), "value1"); db.write(Sink::Storage::DataStore::assembleKey(uid, 10), "value2"); db.findLatest(uid, [&](const QByteArray &key, const QByteArray &value) { result = value; }); QCOMPARE(result, QByteArray("value2")); } void testTransactionVisibility() { auto readValue = [](const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray) { QByteArray result; db.scan("key1", [&](const QByteArray &, const QByteArray &value) { result = value; return true; }); return result; }; { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"testTransactionVisibility", 0}}}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("testTransactionVisibility", nullptr, false); db.write("key1", "foo"); QCOMPARE(readValue(db, "key1"), QByteArray("foo")); { auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db2 = transaction2 .openDatabase("testTransactionVisibility", nullptr, false); QCOMPARE(readValue(db2, "key1"), QByteArray()); } transaction.commit(); { auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db2 = transaction2 .openDatabase("testTransactionVisibility", nullptr, false); QCOMPARE(readValue(db2, "key1"), QByteArray("foo")); } } } void testCopyTransaction() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore store(testDataPath, {dbName, {{"a", 0}, {"b", 0}, {"c", 0}}}, Sink::Storage::DataStore::ReadWrite); { auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); transaction.openDatabase("a", nullptr, false); transaction.openDatabase("b", nullptr, false); transaction.openDatabase("c", nullptr, false); transaction.commit(); } auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); for (int i = 0; i < 1000; i++) { transaction.openDatabase("a", nullptr, false); transaction.openDatabase("b", nullptr, false); transaction.openDatabase("c", nullptr, false); transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); } } /* * This test is meant to find problems with the multi-process architecture and initial database creation. * If we create named databases dynamically (not all up front), it is possilbe that we violate the rule * that mdb_open_dbi may only be used by a single thread at a time. * This test is meant to stress that condition. * * However, it yields absolutely nothing. */ void testReadDuringExternalProcessWrite() { - QSKIP("Not running multiprocess test"); QList> futures; for (int i = 0; i < 5; i++) { futures << QtConcurrent::run([&]() { QTRY_VERIFY(Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly).exists()); Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); for (int i = 0; i < 100000; i++) { transaction.openDatabase("a", nullptr, false); transaction.openDatabase("b", nullptr, false); transaction.openDatabase("c", nullptr, false); transaction.openDatabase("p", nullptr, false); transaction.openDatabase("q", nullptr, false); } }); } //Start writing to the db from a separate process QVERIFY(QProcess::startDetached(QCoreApplication::applicationDirPath() + "/dbwriter", QStringList() << testDataPath << dbName << QString::number(100000))); for (auto future : futures) { future.waitForFinished(); } } void testRecordUid() { - Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + + QMap dbs = {{"revisionType", 0}, + {"revisions", 0}, + {"uids", 0}, + {"default", 0}, + {"__flagtable", 0}, + {"typeuids", 0}, + {"type2uids", 0} + }; + + Sink::Storage::DataStore store(testDataPath, {dbName, dbs}, Sink::Storage::DataStore::ReadWrite); auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); Sink::Storage::DataStore::recordUid(transaction, "uid1", "type"); Sink::Storage::DataStore::recordUid(transaction, "uid2", "type"); Sink::Storage::DataStore::recordUid(transaction, "uid3", "type2"); { QVector uids; Sink::Storage::DataStore::getUids("type", transaction, [&](const QByteArray &r) { uids << r; }); QVector expected{{"uid1"}, {"uid2"}}; QCOMPARE(uids, expected); } Sink::Storage::DataStore::removeUid(transaction, "uid2", "type"); { QVector uids; Sink::Storage::DataStore::getUids("type", transaction, [&](const QByteArray &r) { uids << r; }); QVector expected{{"uid1"}}; QCOMPARE(uids, expected); } } + + void testDbiVisibility() + { + auto readValue = [](const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray) { + QByteArray result; + db.scan("key1", [&](const QByteArray &, const QByteArray &value) { + result = value; + return true; + }); + return result; + }; + { + Sink::Storage::DataStore store(testDataPath, {dbName, {{"testTransactionVisibility", 0}}}, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); + + auto db = transaction.openDatabase("testTransactionVisibility", nullptr, false); + db.write("key1", "foo"); + QCOMPARE(readValue(db, "key1"), QByteArray("foo")); + transaction.commit(); + } + Sink::Storage::DataStore::clearEnv(); + + //Try to read-only dynamic opening of the db. + //This is the case if we don't have all databases available upon initializatoin and we don't (e.g. because the db hasn't been created yet) + { + // Trick the db into not loading all dbs by passing in a bogus layout. + Sink::Storage::DataStore store(testDataPath, {dbName, {{"bogus", 0}}}, Sink::Storage::DataStore::ReadOnly); + + //This transaction should open the dbi + auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadOnly); + auto db2 = transaction2.openDatabase("testTransactionVisibility", nullptr, false); + QCOMPARE(readValue(db2, "key1"), QByteArray("foo")); + + //This transaction should have the dbi available + auto transaction3 = store.createTransaction(Sink::Storage::DataStore::ReadOnly); + auto db3 = transaction3.openDatabase("testTransactionVisibility", nullptr, false); + QCOMPARE(readValue(db3, "key1"), QByteArray("foo")); + } + + Sink::Storage::DataStore::clearEnv(); + //Try to read-write dynamic opening of the db. + //This is the case if we don't have all databases available upon initializatoin and we don't (e.g. because the db hasn't been created yet) + { + // Trick the db into not loading all dbs by passing in a bogus layout. + Sink::Storage::DataStore store(testDataPath, {dbName, {{"bogus", 0}}}, Sink::Storage::DataStore::ReadWrite); + + //This transaction should open the dbi + auto transaction2 = store.createTransaction(Sink::Storage::DataStore::ReadWrite); + auto db2 = transaction2.openDatabase("testTransactionVisibility", nullptr, false); + QCOMPARE(readValue(db2, "key1"), QByteArray("foo")); + + //This transaction should have the dbi available (creating two write transactions obviously doesn't work) + //NOTE: we don't support this scenario. A write transaction must commit or abort before a read transaction opens the same database. + // auto transaction3 = store.createTransaction(Sink::Storage::DataStore::ReadOnly); + // auto db3 = transaction3.openDatabase("testTransactionVisibility", nullptr, false); + // QCOMPARE(readValue(db3, "key1"), QByteArray("foo")); + + //Ensure we can still open further dbis in the write transaction + auto db4 = transaction2.openDatabase("anotherDb", nullptr, false); + } + + } }; QTEST_MAIN(StorageTest) #include "storagetest.moc"