diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index db70fb41..b84f2892 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -1,1008 +1,1019 @@ /* * Copyright (C) 2014 Christian Mollekopf * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see <http://www.gnu.org/licenses/>.
*/ #include "storage.h" #include #include #include #include +#include +#include #include #include #include #include #include "log.h" #ifdef Q_OS_WIN #include typedef SSIZE_T ssize_t; #endif namespace Sink { namespace Storage { extern QReadWriteLock sDbisLock; extern QReadWriteLock sEnvironmentsLock; extern QHash sEnvironments; extern QHash sDbis; QReadWriteLock sDbisLock; QReadWriteLock sEnvironmentsLock; QHash sEnvironments; QHash sDbis; int getErrorCode(int e) { switch (e) { case MDB_NOTFOUND: return DataStore::ErrorCodes::NotFound; default: break; } return -1; } static QList getDatabaseNames(MDB_txn *transaction) { if (!transaction) { SinkWarning() << "Invalid transaction"; return QList(); } int rc; QList list; MDB_dbi dbi; if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) { MDB_val key; MDB_val data; MDB_cursor *cursor; mdb_cursor_open(transaction, dbi, &cursor); if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); } } else { //Normal if we don't have any databases yet if (rc == MDB_NOTFOUND) { rc = 0; } if (rc) { SinkWarning() << "Failed to get a value" << rc; } } mdb_cursor_close(cursor); } else { SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); } return list; } +/* + * To create a dbi we always need a write transaction, + * and we always need to commit the transaction ASAP + * We can only ever enter from one point per process. 
+ */ + +QMutex sCreateDbiLock; + +static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, bool allowDuplicates, MDB_dbi &dbi) +{ + Q_ASSERT(transaction); + QMutexLocker locker{&sCreateDbiLock}; + + unsigned int flags = 0; + if (allowDuplicates) { + flags |= MDB_DUPSORT; + } + + MDB_dbi flagtableDbi; + if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { + if (!readOnly) { + SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); + } + } else { + MDB_val key, value; + key.mv_data = const_cast(static_cast(db.constData())); + key.mv_size = db.size(); + if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { + //We expect this to fail for new databases + if (rc != MDB_NOTFOUND) { + SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); + } + } else { + //Found the flags + const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); + flags = ba.toInt(); + } + } + + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { + //Create the db if it is not existing already + if (rc == MDB_NOTFOUND && !readOnly) { + //Sanity check db name + { + auto parts = db.split('.'); + for (const auto &p : parts) { + auto containsSpecialCharacter = [] (const QByteArray &p) { + for (int i = 0; i < p.size(); i++) { + const auto c = p.at(i); + //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars + if (c < 0x30 || c > 0x7A) { + return true; + } + } + return false; + }; + if (p.isEmpty() || containsSpecialCharacter(p)) { + SinkError() << "Tried to create a db with an invalid name. 
Hex:" << db.toHex() << " ASCII:" << db; + Q_ASSERT(false); + throw std::runtime_error("Fatal error while creating db."); + } + } + } + if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { + SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); + // Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while creating database: " + QByteArray(mdb_strerror(rc))); + // errorHandler ? errorHandler(error) : defaultErrorHandler(error); + return false; + } + //Record the db flags + MDB_val key, value; + key.mv_data = const_cast(static_cast(db.constData())); + key.mv_size = db.size(); + //Store the flags without the create option + const auto ba = QByteArray::number(flags); + value.mv_data = const_cast(static_cast(db.constData())); + value.mv_size = db.size(); + if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { + //We expect this to fail if we're only creating the dbi but not the db + if (rc != MDB_KEYEXIST) { + SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); + } + } + } else { + dbi = 0; + transaction = 0; + //It's not an error if we only want to read + if (!readOnly) { + SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); + // Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); + // errorHandler ? 
errorHandler(error) : defaultErrorHandler(error); + } + return false; + } + } + return true; +} class DataStore::NamedDatabase::Private { public: Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } ~Private() { } QByteArray db; MDB_txn *transaction; MDB_dbi dbi; bool allowDuplicates; std::function defaultErrorHandler; QString name; bool createdNewDbi = false; QString createdDbName; bool openDatabase(bool readOnly, std::function errorHandler, bool noLock = false) { - unsigned int flags = 0; - if (allowDuplicates) { - flags |= MDB_DUPSORT; - } - const auto dbiName = name + db; if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); //sDbis can contain dbi's that are not available to this transaction. //We use mdb_dbi_flags to check if the dbi is valid for this transaction. uint f; if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { //In readonly mode we can just ignore this. In read-write we would have tried to concurrently create a db. if (!readOnly) { SinkWarning() << "Tried to create database in second transaction: " << dbiName; } dbi = 0; transaction = 0; return false; } } else { //Only go in here from initEnvironment if (!noLock) { SinkError() << "Tried to create db " << dbiName; Q_ASSERT(false); abort(); } - MDB_dbi flagtableDbi; - if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 
0 : MDB_CREATE, &flagtableDbi)) { - if (!readOnly) { - SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); - } - } else { - MDB_val key, value; - key.mv_data = const_cast(static_cast(db.constData())); - key.mv_size = db.size(); - if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { - //We expect this to fail for new databases - if (rc != MDB_NOTFOUND) { - SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); - } - } else { - //Found the flags - const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); - flags = ba.toInt(); - } - } - - Q_ASSERT(transaction); - if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { - //Create the db if it is not existing already - if (rc == MDB_NOTFOUND && !readOnly) { - //Sanity check db name - { - auto parts = db.split('.'); - for (const auto &p : parts) { - auto containsSpecialCharacter = [] (const QByteArray &p) { - for (int i = 0; i < p.size(); i++) { - const auto c = p.at(i); - //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars - if (c < 0x30 || c > 0x7A) { - return true; - } - } - return false; - }; - if (p.isEmpty() || containsSpecialCharacter(p)) { - SinkError() << "Tried to create a db with an invalid name. Hex:" << db.toHex() << " ASCII:" << db; - Q_ASSERT(false); - throw std::runtime_error("Fatal error while creating db."); - } - } - } - if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { - SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); - Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while creating database: " + QByteArray(mdb_strerror(rc))); - errorHandler ? 
errorHandler(error) : defaultErrorHandler(error); - return false; - } - //Record the db flags - MDB_val key, value; - key.mv_data = const_cast(static_cast(db.constData())); - key.mv_size = db.size(); - //Store the flags without the create option - const auto ba = QByteArray::number(flags); - value.mv_data = const_cast(static_cast(db.constData())); - value.mv_size = db.size(); - if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { - //We expect this to fail if we're only creating the dbi but not the db - if (rc != MDB_KEYEXIST) { - SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); - } - } - } else { - dbi = 0; - transaction = 0; - //It's not an error if we only want to read - if (!readOnly) { - SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); - Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); - errorHandler ? errorHandler(error) : defaultErrorHandler(error); - } - return false; - } + if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { + createdNewDbi = true; + createdDbName = dbiName; } - - createdNewDbi = true; - createdDbName = dbiName; } return true; } }; DataStore::NamedDatabase::NamedDatabase() : d(nullptr) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) { *this = std::move(other); } DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) { if (&other != this) { delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::NamedDatabase::~NamedDatabase() { delete d; } bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); if (d) { errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); } return false; } const void *keyPtr = sKey.data(); const size_t keySize = sKey.size(); const void *valuePtr = sValue.data(); const size_t valueSize = sValue.size(); if (!keyPtr || keySize == 0) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key."); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } int rc; MDB_val key, data; key.mv_size = keySize; key.mv_data = const_cast(keyPtr); data.mv_size = valueSize; data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return !rc; } void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } int rc; MDB_val key; key.mv_size = k.size(); key.mv_data = const_cast(static_cast(k.data())); if (value.isEmpty()) { rc = mdb_del(d->transaction, d->dbi, &key, 0); } else { MDB_val data; data.mv_size = value.size(); data.mv_data = const_cast(static_cast(value.data())); rc = mdb_del(d->transaction, d->dbi, &key, &data); } if (rc) { auto errorCode = ErrorCodes::GenericError; if (rc == MDB_NOTFOUND) { errorCode = ErrorCodes::NotFound; } Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); } } int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return 0; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { //Invalid arguments can mean that the transaction doesn't contain the db dbi Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates || findSubstringKeys) { MDB_cursor_op op = d->allowDuplicates ? MDB_SET : MDB_FIRST; if (findSubstringKeys) { op = MDB_SET_RANGE; } if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // The first lookup will find a key that is equal or greather than our key if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; } if (!callResultHandler || resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { if (findSubstringKeys) { // Reset the key to what we search for key.mv_data = (void *)k.constData(); key.mv_size = k.size(); } MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? 
MDB_NEXT_DUP : MDB_NEXT; while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // Every consequitive lookup simply iterates through the list if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { break; } } } } } } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } } else { if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { numberOfRetrievedValues++; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return numberOfRetrievedValues; } void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return; } if (k.isEmpty()) { Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); return; } bool foundValue = false; MDB_cursor_op op = MDB_SET_RANGE; if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { // The first lookup will find a key that is equal or greather than our key if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { //Read next value until we no longer match while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { MDB_cursor_op nextOp = MDB_NEXT; rc = mdb_cursor_get(cursor, &key, &data, nextOp); if (rc) { break; } } //Now read the previous value, and that's the latest one MDB_cursor_op prefOp = MDB_PREV; // We read past the end above, just take the last value if (rc == MDB_NOTFOUND) { prefOp = MDB_LAST; } rc = mdb_cursor_get(cursor, &key, &data, prefOp); foundValue = true; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } else if (!foundValue) { Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); } return; } qint64 DataStore::NamedDatabase::getSize() { if (!d || !d->transaction) { return -1; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); } return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages); } DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat() { if (!d || !d->transaction) { return {}; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); return {}; } return {stat.ms_branch_pages, stat.ms_leaf_pages, stat.ms_overflow_pages, stat.ms_entries}; // std::cout << "page size: " << stat.ms_psize << std::endl; // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl; // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl; // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; // std::cout << "depth: " << stat.ms_depth << std::endl; // std::cout << "entries: " << stat.ms_entries << std::endl; } bool DataStore::NamedDatabase::allowsDuplicates() const { unsigned int flags; mdb_dbi_flags(d->transaction, d->dbi, &flags); return flags & MDB_DUPSORT; } class DataStore::Transaction::Private { public: Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env, bool _noLock = false) : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0), noLock(_noLock) { } ~Private() { } MDB_env *env; MDB_txn *transaction; bool requestedRead; std::function defaultErrorHandler; QString name; bool implicitCommit; bool error; int modificationCounter; bool noLock; QMap createdDbs; void startTransaction() { Q_ASSERT(!transaction); 
Q_ASSERT(sEnvironments.values().contains(env)); Q_ASSERT(env); // auto f = [](const char *msg, void *ctx) -> int { // qDebug() << msg; // return 0; // }; // mdb_reader_list(env, f, nullptr); // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; if (rc) { unsigned int flags; mdb_env_get_flags(env, &flags); if (flags & MDB_RDONLY && !requestedRead) { SinkError() << "Tried to open a write transation in a read-only enironment"; } defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); } } }; DataStore::Transaction::Transaction() : d(nullptr) { } DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) { *this = std::move(other); } DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) { if (&other != this) { abort(); delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { commit(); } else { // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; abort(); } } delete d; } DataStore::Transaction::operator bool() const { return (d && d->transaction); } bool DataStore::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; } // Trace_area("storage." 
+ d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); const int rc = mdb_txn_commit(d->transaction); if (rc) { abort(); Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application. throw std::runtime_error("Fatal error while committing transaction."); } d->transaction = nullptr; //Add the created dbis to the shared environment if (!d->createdDbs.isEmpty()) { if (!d->noLock) { sDbisLock.lockForWrite(); } for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) { Q_ASSERT(!sDbis.contains(it.key())); sDbis.insert(it.key(), it.value()); } d->createdDbs.clear(); if (!d->noLock) { sDbisLock.unlock(); } } return !rc; } void DataStore::Transaction::abort() { if (!d || !d->transaction) { return; } d->createdDbs.clear(); // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); mdb_txn_abort(d->transaction); d->transaction = nullptr; } //Ensure that we opened the correct database by comparing the expected identifier with the one //we write to the database on first open. 
static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly) { bool openedTheWrongDatabase = false; auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { if (value != db) { SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db; openedTheWrongDatabase = true; } return false; }, [&](const DataStore::Error &) { }, false); //This is the first time we open this database in a write transaction, write the db name if (!count) { if (!readOnly) { database.write("__internal_dbname", db); } } return !openedTheWrongDatabase; } DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const { if (!d) { SinkError() << "Tried to open database on invalid transaction: " << db; return DataStore::NamedDatabase(); } Q_ASSERT(d->transaction); // We don't now if anything changed d->implicitCommit = true; auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); - if (!d->noLock) { - sDbisLock.lockForRead(); - } - if (!p->openDatabase(d->requestedRead, errorHandler, d->noLock)) { - if (!d->noLock) { - sDbisLock.unlock(); - } + if (!d->noLock) { sDbisLock.lockForRead(); } + auto ret = p->openDatabase(d->requestedRead, errorHandler, d->noLock); + if (!d->noLock) { sDbisLock.unlock(); } + if (!ret) { delete p; return DataStore::NamedDatabase(); } - if (!d->noLock) { - sDbisLock.unlock(); - } if (p->createdNewDbi) { d->createdDbs.insert(p->createdDbName, p->dbi); } auto database = DataStore::NamedDatabase(p); if (!ensureCorrectDb(database, db, d->requestedRead)) { SinkWarning() << "Failed to open the database correctly" << db; Q_ASSERT(false); return DataStore::NamedDatabase(); } return database; } QList DataStore::Transaction::getDatabaseNames() const { if (!d) { SinkWarning() << "Invalid transaction"; 
return QList(); } return Sink::Storage::getDatabaseNames(d->transaction); } DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails) { const int freeDbi = 0; const int mainDbi = 1; MDB_envinfo mei; mdb_env_info(d->env, &mei); MDB_stat mst; mdb_stat(d->transaction, freeDbi, &mst); auto freeStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; mdb_stat(d->transaction, mainDbi, &mst); auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; MDB_cursor *cursor; MDB_val key, data; size_t freePages = 0, *iptr; int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor); if (rc) { fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); return {}; } while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { iptr = static_cast(data.mv_data); freePages += *iptr; bool bad = false; size_t pg, prev; ssize_t i, j, span = 0; j = *iptr++; for (i = j, prev = 1; --i >= 0; ) { pg = iptr[i]; if (pg <= prev) { bad = true; } prev = pg; pg += span; for (; i >= span && iptr[i-span] == pg; span++, pg++) ; } if (printDetails) { std::cout << " Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? 
" [bad sequence]" : "") << std::endl; for (--j; j >= 0; ) { pg = iptr[j]; for (span=1; --j >= 0 && iptr[j] == pg+span; span++); if (span > 1) { std::cout << " " << pg << "[" << span << "]\n"; } else { std::cout << " " << pg << std::endl; } } } } mdb_cursor_close(cursor); return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat}; } class DataStore::Private { public: Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {}); ~Private(); QString storageRoot; QString name; MDB_env *env = nullptr; AccessMode mode; Sink::Log::Context logCtx; void initEnvironment(const QString &fullPath, const DbLayout &layout) { // Ensure the environment is only created once, and that we only have one environment per process QReadLocker locker(&sEnvironmentsLock); if (!(env = sEnvironments.value(fullPath))) { locker.unlock(); QWriteLocker envLocker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); if (!(env = sEnvironments.value(fullPath))) { int rc = 0; if ((rc = mdb_env_create(&env))) { SinkWarningCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc); qCritical() << "mdb_env_create: " << rc << " " << mdb_strerror(rc); env = nullptr; } else { //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table. mdb_env_set_maxdbs(env, 50); if (RUNNING_ON_VALGRIND) { // In order to run valgrind this size must be smaller than half your available RAM // https://github.com/BVLC/caffe/issues/2404 mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)1000); // 1MB * 1000 } else { //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit. 
mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)100000); // 1MB * 1000 } const bool readOnly = (mode == ReadOnly); unsigned int flags = MDB_NOTLS; if (readOnly) { flags |= MDB_RDONLY; } if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { if (readOnly) { SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath; } else { SinkWarningCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc); } mdb_env_close(env); env = 0; } else { Q_ASSERT(env); sEnvironments.insert(fullPath, env); //Open all available dbi's bool noLock = true; auto t = Transaction(new Transaction::Private(readOnly, nullptr, name, env, noLock)); if (!layout.tables.isEmpty()) { //TODO upgrade db if the layout has changed: //* read existing layout //* if layout is not the same create new layout //If the db is read only, abort if the db is not yet existing. //If the db is not read-only but is not existing, ensure we have a layout and create all tables. for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { bool allowDuplicates = it.value(); t.openDatabase(it.key(), {}, allowDuplicates); } } else { for (const auto &db : t.getDatabaseNames()) { //Get dbi to store for future use. t.openDatabase(db); } } //To persist the dbis (this is also necessary for read-only transactions) t.commit(); } } } } } }; DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1()) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); if (!dirInfo.exists() && mode == ReadWrite) { QDir().mkpath(fullPath); dirInfo.refresh(); } if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { qCritical() << fullPath << "does not have write permissions. 
Aborting"; } else if (dirInfo.exists()) { initEnvironment(fullPath, layout); } } DataStore::Private::~Private() { //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) //and create storage instance from all over the place. Thus, we're not closing it here on purpose. } DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout)) { } DataStore::~DataStore() { delete d; } bool DataStore::exists(const QString &storageRoot, const QString &name) { return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists(); } bool DataStore::exists() const { return (d->env != 0) && DataStore::exists(d->storageRoot, d->name); } DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function &errorHandlerArg) { auto errorHandler = errorHandlerArg ? 
errorHandlerArg : defaultErrorHandler(); if (!d->env) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment")); return Transaction(); } bool requestedRead = type == ReadOnly; if (d->mode == ReadOnly && !requestedRead) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode.")); return Transaction(); } QReadLocker locker(&sEnvironmentsLock); if (!sEnvironments.values().contains(d->env)) { return {}; } return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); } qint64 DataStore::diskUsage() const { QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); if (!info.exists()) { SinkWarning() << "Tried to get filesize for non-existant file: " << info.path(); } return info.size(); } void DataStore::removeFromDisk() const { const QString fullPath(d->storageRoot + '/' + d->name); QWriteLocker dbiLocker(&sDbisLock); QWriteLocker envLocker(&sEnvironmentsLock); SinkTrace() << "Removing database from disk: " << fullPath; auto env = sEnvironments.take(fullPath); for (const auto &key : sDbis.keys()) { if (key.startsWith(d->name)) { sDbis.remove(key); } } mdb_env_close(env); QDir dir(fullPath); if (!dir.removeRecursively()) { Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); defaultErrorHandler()(error); } } void DataStore::clearEnv() { QWriteLocker locker(&sEnvironmentsLock); for (auto env : sEnvironments) { mdb_env_close(env); } sDbis.clear(); sEnvironments.clear(); } } } // namespace Sink