diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 68157c9e..d4dd6c17 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -1,1085 +1,1078 @@ /* * Copyright (C) 2014 Christian Mollekopf * Copyright (C) 2014 Aaron Seigo * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see <http://www.gnu.org/licenses/>. 
*/ #include "storage.h" #include #include #include #include #include #include #include #include #include #include #include "log.h" #ifdef Q_OS_WIN #include typedef SSIZE_T ssize_t; #endif namespace Sink { namespace Storage { -extern QReadWriteLock sDbisLock; -extern QReadWriteLock sEnvironmentsLock; -extern QMutex sCreateDbiLock; -extern QHash sEnvironments; -extern QHash sDbis; - - -QReadWriteLock sDbisLock; -QReadWriteLock sEnvironmentsLock; -QMutex sCreateDbiLock; -QHash sEnvironments; -QHash sDbis; +static QReadWriteLock sDbisLock; +static QReadWriteLock sEnvironmentsLock; +static QMutex sCreateDbiLock; +static QHash sEnvironments; +static QHash sDbis; int getErrorCode(int e) { switch (e) { case MDB_NOTFOUND: return DataStore::ErrorCodes::NotFound; default: break; } return -1; } static QList getDatabaseNames(MDB_txn *transaction) { if (!transaction) { SinkWarning() << "Invalid transaction"; return QList(); } int rc; QList list; MDB_dbi dbi; if ((rc = mdb_dbi_open(transaction, nullptr, 0, &dbi) == 0)) { MDB_val key; MDB_val data; MDB_cursor *cursor; mdb_cursor_open(transaction, dbi, &cursor); if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); } } else { //Normal if we don't have any databases yet if (rc == MDB_NOTFOUND) { rc = 0; } if (rc) { SinkWarning() << "Failed to get a value" << rc; } } mdb_cursor_close(cursor); } else { SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); } return list; } /* * To create a dbi we always need a write transaction, * and we always need to commit the transaction ASAP * We can only ever enter from one point per process. 
*/ static bool createDbi(MDB_txn *transaction, const QByteArray &db, bool readOnly, bool allowDuplicates, MDB_dbi &dbi) { unsigned int flags = 0; if (allowDuplicates) { flags |= MDB_DUPSORT; } MDB_dbi flagtableDbi; if (const int rc = mdb_dbi_open(transaction, "__flagtable", readOnly ? 0 : MDB_CREATE, &flagtableDbi)) { if (!readOnly) { SinkWarning() << "Failed to to open flagdb: " << QByteArray(mdb_strerror(rc)); } } else { MDB_val key, value; key.mv_data = const_cast(static_cast(db.constData())); key.mv_size = db.size(); if (const auto rc = mdb_get(transaction, flagtableDbi, &key, &value)) { //We expect this to fail for new databases if (rc != MDB_NOTFOUND) { SinkWarning() << "Failed to read flags from flag db: " << QByteArray(mdb_strerror(rc)); } } else { //Found the flags const auto ba = QByteArray::fromRawData((char *)value.mv_data, value.mv_size); flags = ba.toInt(); } } if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { //Create the db if it is not existing already if (rc == MDB_NOTFOUND && !readOnly) { //Sanity check db name { auto parts = db.split('.'); for (const auto &p : parts) { auto containsSpecialCharacter = [] (const QByteArray &p) { for (int i = 0; i < p.size(); i++) { const auto c = p.at(i); //Between 0 and z in the ascii table. Essentially ensures that the name is printable and doesn't contain special chars if (c < 0x30 || c > 0x7A) { return true; } } return false; }; if (p.isEmpty() || containsSpecialCharacter(p)) { SinkError() << "Tried to create a db with an invalid name. 
Hex:" << db.toHex() << " ASCII:" << db; Q_ASSERT(false); throw std::runtime_error("Fatal error while creating db."); } } } if (const int rc = mdb_dbi_open(transaction, db.constData(), flags | MDB_CREATE, &dbi)) { SinkWarning() << "Failed to create db " << QByteArray(mdb_strerror(rc)); return false; } //Record the db flags MDB_val key, value; key.mv_data = const_cast(static_cast(db.constData())); key.mv_size = db.size(); //Store the flags without the create option const auto ba = QByteArray::number(flags); value.mv_data = const_cast(static_cast(db.constData())); value.mv_size = db.size(); if (const int rc = mdb_put(transaction, flagtableDbi, &key, &value, MDB_NOOVERWRITE)) { //We expect this to fail if we're only creating the dbi but not the db if (rc != MDB_KEYEXIST) { SinkWarning() << "Failed to write flags to flag db: " << QByteArray(mdb_strerror(rc)); } } } else { //It's not an error if we only want to read if (!readOnly) { SinkWarning() << "Failed to open db " << QByteArray(mdb_strerror(rc)); return true; } return false; } } return true; } class DataStore::NamedDatabase::Private { public: Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } ~Private() { } QByteArray db; MDB_txn *transaction; MDB_dbi dbi; bool allowDuplicates; std::function defaultErrorHandler; QString name; bool createdNewDbi = false; QString createdNewDbiName; bool dbiValidForTransaction(MDB_dbi dbi, MDB_txn *transaction) { //sDbis can contain dbi's that are not available to this transaction. //We use mdb_dbi_flags to check if the dbi is valid for this transaction. 
uint f; if (mdb_dbi_flags(transaction, dbi, &f) == EINVAL) { return false; } return true; } bool openDatabase(bool readOnly, std::function errorHandler) { const auto dbiName = name + db; QReadLocker dbiLocker{&sDbisLock}; if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); Q_ASSERT(dbiValidForTransaction(dbi, transaction)); } else { /* * Dynamic creation of databases. * If all databases were defined via the database layout we wouldn't ever end up in here. * However, we rely on this codepath for indexes, synchronization databases and in race-conditions * where the database is not yet fully created when the client initializes it for reading. * * There are a few things to consider: * * dbi's (DataBase Identifier) should be opened once (ideally), and then be persisted in the environment. * * To open a dbi we need a transaction and must commit the transaction. From then on any open transaction will have access to the dbi. * * Already running transactions will not have access to the dbi. * * There *must* only ever be one active transaction opening dbi's (using mdb_dbi_open), and that transaction *must* * commit or abort before any other transaction opens a dbi. * * We solve this the following way: * * For read-only transactions we abort the transaction, open the dbi and persist it in the environment, and reopen the transaction (so the dbi is available). This may result in the db content changing unexpectedly and referenced memory becoming unavailable, but isn't a problem as long as we don't rely on memory remaining valid for the duration of the transaction (which is anyways not given since any operation would invalidate the memory region).. * * For write transactions we open the dbi for future use, and then open it as well in the current transaction. */ SinkTrace() << "Creating database dynamically: " << dbiName << readOnly; //Only one transaction may ever create dbis at a time. 
QMutexLocker createDbiLocker(&sCreateDbiLock); //Double checked locking if (sDbis.contains(dbiName)) { dbi = sDbis.value(dbiName); Q_ASSERT(dbiValidForTransaction(dbi, transaction)); return true; } //Create a transaction to open the dbi MDB_txn *dbiTransaction; if (readOnly) { MDB_env *env = mdb_txn_env(transaction); Q_ASSERT(env); mdb_txn_reset(transaction); if (const int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &dbiTransaction)) { SinkError() << "Failed to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; return false; } } else { dbiTransaction = transaction; } if (createDbi(dbiTransaction, db, readOnly, allowDuplicates, dbi)) { if (readOnly) { mdb_txn_commit(dbiTransaction); dbiLocker.unlock(); QWriteLocker dbiWriteLocker(&sDbisLock); sDbis.insert(dbiName, dbi); //We reopen the read-only transaction so the dbi becomes available in it. mdb_txn_renew(transaction); } else { createdNewDbi = true; createdNewDbiName = dbiName; } //Ensure the dbi is valid for the parent transaction Q_ASSERT(dbiValidForTransaction(dbi, transaction)); } else { if (readOnly) { mdb_txn_abort(dbiTransaction); mdb_txn_renew(transaction); } SinkWarning() << "Failed to create the dbi: " << dbiName; dbi = 0; transaction = 0; return false; } } return true; } }; DataStore::NamedDatabase::NamedDatabase() : d(nullptr) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) { *this = std::move(other); } DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) { if (&other != this) { delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::NamedDatabase::~NamedDatabase() { delete d; } bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); if (d) { errorHandler 
? errorHandler(error) : d->defaultErrorHandler(error); } return false; } const void *keyPtr = sKey.data(); const size_t keySize = sKey.size(); const void *valuePtr = sValue.data(); const size_t valueSize = sValue.size(); if (!keyPtr || keySize == 0) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Tried to write empty key."); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return false; } int rc; MDB_val key, data; key.mv_size = keySize; key.mv_data = const_cast(keyPtr); data.mv_size = valueSize; data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "mdb_put: " + QByteArray(mdb_strerror(rc)) + " Key: " + sKey + " Value: " + sValue); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return !rc; } void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open"); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return; } int rc; MDB_val key; key.mv_size = k.size(); key.mv_data = const_cast(static_cast(k.data())); if (value.isEmpty()) { rc = mdb_del(d->transaction, d->dbi, &key, 0); } else { MDB_val data; data.mv_size = value.size(); data.mv_data = const_cast(static_cast(value.data())); rc = mdb_del(d->transaction, d->dbi, &key, &data); } if (rc) { auto errorCode = ErrorCodes::GenericError; if (rc == MDB_NOTFOUND) { errorCode = ErrorCodes::NotFound; } Error error(d->name.toLatin1() + d->db, errorCode, QString("Error on mdb_del: %1 %2").arg(rc).arg(mdb_strerror(rc)).toLatin1()); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); } } int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return 0; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { //Invalid arguments can mean that the transaction doesn't contain the db dbi Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc)) + ". Key: " + k); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return 0; } int numberOfRetrievedValues = 0; if (k.isEmpty() || d->allowDuplicates || findSubstringKeys) { MDB_cursor_op op = d->allowDuplicates ? MDB_SET : MDB_FIRST; if (findSubstringKeys) { op = MDB_SET_RANGE; } if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // The first lookup will find a key that is equal or greather than our key if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; } if (!callResultHandler || resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { if (findSubstringKeys) { // Reset the key to what we search for key.mv_data = (void *)k.constData(); key.mv_size = k.size(); } MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? 
MDB_NEXT_DUP : MDB_NEXT; while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { const auto current = QByteArray::fromRawData((char *)key.mv_data, key.mv_size); // Every consequitive lookup simply iterates through the list if (current.startsWith(k)) { const bool callResultHandler = !(skipInternalKeys && isInternalKey(current)); if (callResultHandler) { numberOfRetrievedValues++; if (!resultHandler(current, QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { break; } } } } } } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } } else { if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { numberOfRetrievedValues++; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during scan. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } return numberOfRetrievedValues; } void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. return; } if (k.isEmpty()) { Error error(d->name.toLatin1() + d->db, GenericError, QByteArray("Can't use findLatest with empty key.")); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); return; } int rc; MDB_val key; MDB_val data; MDB_cursor *cursor; key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); if (rc) { Error error(d->name.toLatin1() + d->db, getErrorCode(rc), QByteArray("Error during mdb_cursor_open: ") + QByteArray(mdb_strerror(rc))); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); return; } bool foundValue = false; MDB_cursor_op op = MDB_SET_RANGE; if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { // The first lookup will find a key that is equal or greather than our key if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { //Read next value until we no longer match while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { MDB_cursor_op nextOp = MDB_NEXT; rc = mdb_cursor_get(cursor, &key, &data, nextOp); if (rc) { break; } } //Now read the previous value, and that's the latest one MDB_cursor_op prefOp = MDB_PREV; // We read past the end above, just take the last value if (rc == MDB_NOTFOUND) { prefOp = MDB_LAST; } rc = mdb_cursor_get(cursor, &key, &data, prefOp); foundValue = true; resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } mdb_cursor_close(cursor); if (rc) { Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during find latest. Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); } else if (!foundValue) { Error error(d->name.toLatin1(), 1, QByteArray("Error during find latest. Key: ") + k + " : No value found"); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); } return; } qint64 DataStore::NamedDatabase::getSize() { if (!d || !d->transaction) { return -1; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); } return stat.ms_psize * (stat.ms_leaf_pages + stat.ms_branch_pages + stat.ms_overflow_pages); } DataStore::NamedDatabase::Stat DataStore::NamedDatabase::stat() { if (!d || !d->transaction) { return {}; } int rc; MDB_stat stat; rc = mdb_stat(d->transaction, d->dbi, &stat); if (rc) { SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); return {}; } return {stat.ms_branch_pages, stat.ms_leaf_pages, stat.ms_overflow_pages, stat.ms_entries}; // std::cout << "page size: " << stat.ms_psize << std::endl; // std::cout << "leaf_pages: " << stat.ms_leaf_pages << std::endl; // std::cout << "branch_pages: " << stat.ms_branch_pages << std::endl; // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; // std::cout << "depth: " << stat.ms_depth << std::endl; // std::cout << "entries: " << stat.ms_entries << std::endl; } bool DataStore::NamedDatabase::allowsDuplicates() const { unsigned int flags; mdb_dbi_flags(d->transaction, d->dbi, &flags); return flags & MDB_DUPSORT; } class DataStore::Transaction::Private { public: Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false) { } ~Private() { } MDB_env *env; MDB_txn *transaction; bool requestedRead; std::function defaultErrorHandler; QString name; bool implicitCommit; bool error; QMap createdDbs; void startTransaction() { Q_ASSERT(!transaction); Q_ASSERT(sEnvironments.values().contains(env)); Q_ASSERT(env); // auto f = [](const char *msg, void *ctx) -> int { // qDebug() << msg; // return 0; 
// }; // mdb_reader_list(env, f, nullptr); // Trace_area("storage." + name.toLatin1()) << "Opening transaction " << requestedRead; const int rc = mdb_txn_begin(env, NULL, requestedRead ? MDB_RDONLY : 0, &transaction); // Trace_area("storage." + name.toLatin1()) << "Started transaction " << mdb_txn_id(transaction) << transaction; if (rc) { unsigned int flags; mdb_env_get_flags(env, &flags); if (flags & MDB_RDONLY && !requestedRead) { SinkError() << "Tried to open a write transation in a read-only enironment"; } defaultErrorHandler(Error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening transaction: " + QByteArray(mdb_strerror(rc)))); } } }; DataStore::Transaction::Transaction() : d(nullptr) { } DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) { *this = std::move(other); } DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) { if (&other != this) { abort(); delete d; d = other.d; other.d = nullptr; } return *this; } DataStore::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { commit(); } else { // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; abort(); } } delete d; } DataStore::Transaction::operator bool() const { return (d && d->transaction); } bool DataStore::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; } // Trace_area("storage." + d->name.toLatin1()) << "Committing transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); const int rc = mdb_txn_commit(d->transaction); if (rc) { abort(); Error error(d->name.toLatin1(), ErrorCodes::TransactionError, "Error during transaction commit: " + QByteArray(mdb_strerror(rc))); errorHandler ? 
errorHandler(error) : d->defaultErrorHandler(error); //If transactions start failing we're in an unrecoverable situation (i.e. out of diskspace). So throw an exception that will terminate the application. throw std::runtime_error("Fatal error while committing transaction."); } //Add the created dbis to the shared environment if (!d->createdDbs.isEmpty()) { sDbisLock.lockForWrite(); for (auto it = d->createdDbs.constBegin(); it != d->createdDbs.constEnd(); it++) { //This means we opened the dbi again in a read-only transaction while the write transaction was ongoing. Q_ASSERT(!sDbis.contains(it.key())); if (!sDbis.contains(it.key())) { sDbis.insert(it.key(), it.value()); } } d->createdDbs.clear(); sDbisLock.unlock(); } d->transaction = nullptr; return !rc; } void DataStore::Transaction::abort() { if (!d || !d->transaction) { return; } // Trace_area("storage." + d->name.toLatin1()) << "Aborting transaction" << mdb_txn_id(d->transaction) << d->transaction; Q_ASSERT(sEnvironments.values().contains(d->env)); mdb_txn_abort(d->transaction); d->createdDbs.clear(); d->transaction = nullptr; } //Ensure that we opened the correct database by comparing the expected identifier with the one //we write to the database on first open. 
static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly) { bool openedTheWrongDatabase = false; auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { if (value != db) { SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db; openedTheWrongDatabase = true; } return false; }, [&](const DataStore::Error &) { }, false); //This is the first time we open this database in a write transaction, write the db name if (!count) { if (!readOnly) { database.write("__internal_dbname", db); } } return !openedTheWrongDatabase; } DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const { if (!d) { SinkError() << "Tried to open database on invalid transaction: " << db; return DataStore::NamedDatabase(); } Q_ASSERT(d->transaction); // We don't now if anything changed d->implicitCommit = true; auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); auto ret = p->openDatabase(d->requestedRead, errorHandler); if (!ret) { delete p; return DataStore::NamedDatabase(); } if (p->createdNewDbi) { d->createdDbs.insert(p->createdNewDbiName, p->dbi); } auto database = DataStore::NamedDatabase(p); if (!ensureCorrectDb(database, db, d->requestedRead)) { SinkWarning() << "Failed to open the database correctly" << db; Q_ASSERT(false); return DataStore::NamedDatabase(); } return database; } QList DataStore::Transaction::getDatabaseNames() const { if (!d) { SinkWarning() << "Invalid transaction"; return QList(); } return Sink::Storage::getDatabaseNames(d->transaction); } DataStore::Transaction::Stat DataStore::Transaction::stat(bool printDetails) { const int freeDbi = 0; const int mainDbi = 1; MDB_envinfo mei; mdb_env_info(d->env, &mei); MDB_stat mst; mdb_stat(d->transaction, freeDbi, &mst); auto 
freeStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; mdb_stat(d->transaction, mainDbi, &mst); auto mainStat = NamedDatabase::Stat{mst.ms_branch_pages, mst.ms_leaf_pages, mst.ms_overflow_pages, mst.ms_entries}; MDB_cursor *cursor; MDB_val key, data; size_t freePages = 0, *iptr; int rc = mdb_cursor_open(d->transaction, freeDbi, &cursor); if (rc) { fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); return {}; } while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { iptr = static_cast(data.mv_data); freePages += *iptr; bool bad = false; size_t pg, prev; ssize_t i, j, span = 0; j = *iptr++; for (i = j, prev = 1; --i >= 0; ) { pg = iptr[i]; if (pg <= prev) { bad = true; } prev = pg; pg += span; for (; i >= span && iptr[i-span] == pg; span++, pg++) ; } if (printDetails) { std::cout << " Transaction " << *(size_t *)key.mv_data << ", "<< j << " pages, maxspan " << span << (bad ? " [bad sequence]" : "") << std::endl; for (--j; j >= 0; ) { pg = iptr[j]; for (span=1; --j >= 0 && iptr[j] == pg+span; span++); if (span > 1) { std::cout << " " << pg << "[" << span << "]\n"; } else { std::cout << " " << pg << std::endl; } } } } mdb_cursor_close(cursor); return {mei.me_last_pgno + 1, freePages, mst.ms_psize, mainStat, freeStat}; } class DataStore::Private { public: Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout = {}); ~Private(); QString storageRoot; QString name; MDB_env *env = nullptr; AccessMode mode; Sink::Log::Context logCtx; void initEnvironment(const QString &fullPath, const DbLayout &layout) { // Ensure the environment is only created once, and that we only have one environment per process QReadLocker locker(&sEnvironmentsLock); if (!(env = sEnvironments.value(fullPath))) { locker.unlock(); QWriteLocker envLocker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); if (!(env = sEnvironments.value(fullPath))) { int rc = 0; if ((rc = 
mdb_env_create(&env))) { SinkWarningCtx(logCtx) << "mdb_env_create: " << rc << " " << mdb_strerror(rc); qCritical() << "mdb_env_create: " << rc << " " << mdb_strerror(rc); env = nullptr; } else { //Limit large enough to accomodate all our named dbs. This only starts to matter if the number gets large, otherwise it's just a bunch of extra entries in the main table. mdb_env_set_maxdbs(env, 50); if (RUNNING_ON_VALGRIND) { // In order to run valgrind this size must be smaller than half your available RAM // https://github.com/BVLC/caffe/issues/2404 mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)1000); // 1MB * 1000 } else { //This is the maximum size of the db (but will not be used directly), so we make it large enough that we hopefully never run into the limit. mdb_env_set_mapsize(env, (size_t)10485760 * (size_t)100000); // 1MB * 1000 } const bool readOnly = (mode == ReadOnly); unsigned int flags = MDB_NOTLS; if (readOnly) { flags |= MDB_RDONLY; } if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { if (readOnly) { SinkLogCtx(logCtx) << "Tried to open non-existing db: " << fullPath; } else { SinkWarningCtx(logCtx) << "mdb_env_open: " << rc << ":" << mdb_strerror(rc); } mdb_env_close(env); env = 0; } else { Q_ASSERT(env); sEnvironments.insert(fullPath, env); //Open all available dbi's MDB_txn *transaction; if (const int rc = mdb_txn_begin(env, nullptr, readOnly ? MDB_RDONLY : 0, &transaction)) { SinkWarning() << "Failed to to open transaction: " << QByteArray(mdb_strerror(rc)) << readOnly << transaction; return; } if (!layout.tables.isEmpty()) { //TODO upgrade db if the layout has changed: //* read existing layout //* if layout is not the same create new layout //Create dbis from the given layout. 
for (auto it = layout.tables.constBegin(); it != layout.tables.constEnd(); it++) { const bool allowDuplicates = it.value(); MDB_dbi dbi = 0; const auto db = it.key(); const auto dbiName = name + db; if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { sDbis.insert(dbiName, dbi); } } } else { //Open all available databases for (const auto &db : getDatabaseNames(transaction)) { MDB_dbi dbi = 0; const auto dbiName = name + db; //We're going to load the flags anyways. bool allowDuplicates = false; if (createDbi(transaction, db, readOnly, allowDuplicates, dbi)) { sDbis.insert(dbiName, dbi); } } } //To persist the dbis (this is also necessary for read-only transactions) mdb_txn_commit(transaction); } } } } } }; DataStore::Private::Private(const QString &s, const QString &n, AccessMode m, const DbLayout &layout) : storageRoot(s), name(n), env(0), mode(m), logCtx(n.toLatin1()) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); if (!dirInfo.exists() && mode == ReadWrite) { QDir().mkpath(fullPath); dirInfo.refresh(); } if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { qCritical() << fullPath << "does not have write permissions. Aborting"; } else if (dirInfo.exists()) { initEnvironment(fullPath, layout); } } DataStore::Private::~Private() { //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) //and create storage instance from all over the place. Thus, we're not closing it here on purpose. 
} DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } DataStore::DataStore(const QString &storageRoot, const DbLayout &dbLayout, AccessMode mode) : d(new Private(storageRoot, dbLayout.name, mode, dbLayout)) { } DataStore::~DataStore() { delete d; } bool DataStore::exists(const QString &storageRoot, const QString &name) { return QFileInfo(storageRoot + '/' + name + "/data.mdb").exists(); } bool DataStore::exists() const { return (d->env != 0) && DataStore::exists(d->storageRoot, d->name); } DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function &errorHandlerArg) { auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); if (!d->env) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Missing database environment")); return Transaction(); } bool requestedRead = type == ReadOnly; if (d->mode == ReadOnly && !requestedRead) { errorHandler(Error(d->name.toLatin1(), ErrorCodes::GenericError, "Failed to create transaction: Requested read/write transaction in read-only mode.")); return Transaction(); } QReadLocker locker(&sEnvironmentsLock); if (!sEnvironments.values().contains(d->env)) { return {}; } return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); } qint64 DataStore::diskUsage() const { QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); if (!info.exists()) { SinkWarning() << "Tried to get filesize for non-existant file: " << info.path(); } return info.size(); } void DataStore::removeFromDisk() const { const QString fullPath(d->storageRoot + '/' + d->name); QWriteLocker dbiLocker(&sDbisLock); QWriteLocker envLocker(&sEnvironmentsLock); SinkTrace() << "Removing database from disk: " << fullPath; auto env = sEnvironments.take(fullPath); for (const auto &key : sDbis.keys()) { if (key.startsWith(d->name)) { sDbis.remove(key); } } 
mdb_env_close(env); QDir dir(fullPath); if (!dir.removeRecursively()) { Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); defaultErrorHandler()(error); } } void DataStore::clearEnv() { SinkTrace() << "Clearing environment"; QWriteLocker locker(&sEnvironmentsLock); QWriteLocker dbiLocker(&sDbisLock); for (const auto &envName : sEnvironments.keys()) { auto env = sEnvironments.value(envName); mdb_env_sync(env, true); for (const auto &k : sDbis.keys()) { if (k.startsWith(envName)) { auto dbi = sDbis.value(k); mdb_dbi_close(env, dbi); } } mdb_env_close(env); } sDbis.clear(); sEnvironments.clear(); } } } // namespace Sink