diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index f1b0ed21..12c0ae1e 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -1,698 +1,703 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "datastorequery.h" #include "log.h" #include "applicationdomaintype.h" using namespace Sink; using namespace Sink::Storage; static QByteArray operationName(const Sink::Operation op) { switch(op) { case Sink::Operation_Creation: return "Creation"; case Sink::Operation_Modification: return "Modification"; case Sink::Operation_Removal: return "Removal"; } return ""; } class Source : public FilterBase { public: typedef QSharedPointer Ptr; QVector mIds; QVector::ConstIterator mIt; QVector mIncrementalIds; QVector::ConstIterator mIncrementalIt; Source (const QVector &ids, DataStoreQuery *store) : FilterBase(store), mIds(ids), mIt(mIds.constBegin()) { } virtual ~Source(){} virtual void skip() Q_DECL_OVERRIDE { if (mIt != mIds.constEnd()) { mIt++; } }; void add(const QVector &ids) { mIncrementalIds = ids; mIncrementalIt = mIncrementalIds.constBegin(); } bool next(const std::function &callback) Q_DECL_OVERRIDE { if (!mIncrementalIds.isEmpty()) { if (mIncrementalIt == mIncrementalIds.constEnd()) { return false; } readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); callback({entity, operation}); }); mIncrementalIt++; if (mIncrementalIt == mIncrementalIds.constEnd()) { return false; } return true; } else { if (mIt == mIds.constEnd()) { return false; } readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); callback({entity, operation}); }); mIt++; return mIt != mIds.constEnd(); } } }; class Collector : public FilterBase { public: typedef QSharedPointer Ptr; Collector(FilterBase::Ptr source, DataStoreQuery *store) : FilterBase(source, store) { } virtual ~Collector(){} bool next(const std::function &callback) Q_DECL_OVERRIDE { return mSource->next(callback); } }; class Filter : public FilterBase { public: typedef QSharedPointer Ptr; QHash propertyFilter; Filter(FilterBase::Ptr source, DataStoreQuery *store) : FilterBase(source, store) { } virtual ~Filter(){} virtual bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); //Always accept removals. 
            //They can't match the filter since the data is gone.
            if (result.operation == Sink::Operation_Removal) {
                SinkTraceCtx(mDatastore->mLogCtx) << "Removal: " << result.entity.identifier() << operationName(result.operation);
                callback(result);
                foundValue = true;
            } else if (matchesFilter(result.entity)) {
                SinkTraceCtx(mDatastore->mLogCtx) << "Accepted: " << result.entity.identifier() << operationName(result.operation);
                callback(result);
                foundValue = true;
                //TODO if something did not match the filter so far but does now, turn into an add operation.
            } else {
                SinkTraceCtx(mDatastore->mLogCtx) << "Rejected: " << result.entity.identifier() << operationName(result.operation);
                //TODO emit a removal if we had the uid in the result set and this is a modification.
                //We don't know if this results in a removal from the dataset, so we emit a removal notification anyway.
                callback({result.entity, Sink::Operation_Removal, result.aggregateValues});
            }
            return false;
        })) {}
        return foundValue;
    }

    bool matchesFilter(const ApplicationDomain::ApplicationDomainType &entity) {
        for (const auto &filterProperty : propertyFilter.keys()) {
            const auto property = entity.getProperty(filterProperty);
            const auto comparator = propertyFilter.value(filterProperty);
            //We can't deal with a fulltext filter
            if (comparator.comparator == QueryBase::Comparator::Fulltext) {
                continue;
            }
            if (!comparator.matches(property)) {
                SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value;
                return false;
            }
        }
        return true;
    }
};

class Reduce : public Filter {
public:
    typedef QSharedPointer<Reduce> Ptr;

    struct Aggregator {
        QueryBase::Reduce::Aggregator::Operation operation;
        QByteArray property;
        QByteArray resultProperty;

        Aggregator(QueryBase::Reduce::Aggregator::Operation o, const QByteArray &property_, const QByteArray &resultProperty_)
            : operation(o), property(property_), resultProperty(resultProperty_)
        {
        }

        void process(const QVariant &value) {
            if (operation == QueryBase::Reduce::Aggregator::Collect) {
                mResult = mResult.toList() << value;
            } else if (operation == QueryBase::Reduce::Aggregator::Count) {
                mResult = mResult.toInt() + 1;
            } else {
                Q_ASSERT(false);
            }
        }

        void reset() {
            mResult.clear();
        }

        QVariant result() const {
            return mResult;
        }
    private:
        QVariant mResult;
    };

    QSet<QByteArray> mReducedValues;
    QSet<QByteArray> mIncrementallyReducedValues;
    QHash<QByteArray, QByteArray> mSelectedValues;
    QByteArray mReductionProperty;
    QByteArray mSelectionProperty;
    QueryBase::Reduce::Selector::Comparator mSelectionComparator;
    QList<Aggregator> mAggregators;

    Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, QueryBase::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store)
        : Filter(source, store),
        mReductionProperty(reductionProperty),
        mSelectionProperty(selectionProperty),
        mSelectionComparator(comparator)
    {
    }

    virtual ~Reduce(){}

    void updateComplete() Q_DECL_OVERRIDE {
+        SinkTraceCtx(mDatastore->mLogCtx) << "Reduction update is complete.";
        mIncrementallyReducedValues.clear();
    }

    static QByteArray getByteArray(const QVariant &value) {
        if (value.type() == QVariant::DateTime) {
            return value.toDateTime().toString().toLatin1();
        }
        if (value.isValid() && !value.toByteArray().isEmpty()) {
            return value.toByteArray();
        }
        return QByteArray();
    }

    static bool compare(const QVariant &left, const QVariant &right, QueryBase::Reduce::Selector::Comparator comparator) {
        if (comparator == QueryBase::Reduce::Selector::Max) {
            return left > right;
        }
        return false;
    }

    struct ReductionResult {
        QByteArray selection;
        QVector<QByteArray> aggregateIds;
        QMap<QByteArray, QVariant> aggregateValues;
    };

    ReductionResult reduceOnValue(const QVariant &reductionValue) {
        QMap<QByteArray, QVariant> aggregateValues;
        QVariant selectionResultValue;
        QByteArray selectionResult;
        const auto results = indexLookup(mReductionProperty, reductionValue);
        for (auto &aggregator : mAggregators) {
            aggregator.reset();
        }
        QVector<QByteArray> reducedAndFilteredResults;
        for (const auto &r : results) {
            readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
                //We need to apply all property filters that we have until the reduction, because the index lookup was unfiltered.
                if (!matchesFilter(entity)) {
                    return;
                }
                reducedAndFilteredResults << r;
                Q_ASSERT(operation != Sink::Operation_Removal);
                for (auto &aggregator : mAggregators) {
                    if (!aggregator.property.isEmpty()) {
                        aggregator.process(entity.getProperty(aggregator.property));
                    } else {
                        aggregator.process(QVariant{});
                    }
                }
                auto selectionValue = entity.getProperty(mSelectionProperty);
                if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
                    selectionResultValue = selectionValue;
                    selectionResult = entity.identifier();
                }
            });
        }
        for (auto &aggregator : mAggregators) {
            aggregateValues.insert(aggregator.resultProperty, aggregator.result());
        }
        return {selectionResult, reducedAndFilteredResults, aggregateValues};
    }

    bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
        bool foundValue = false;
        while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
            const auto reductionValue = [&] {
                const auto v = result.entity.getProperty(mReductionProperty);
                //Because we also get Operation_Removal for filtered entities. We use the fact that actually removed entities
                //won't have the property to reduce on.
                //TODO: Perhaps find a cleaner solution than abusing Operation::Removed for filtered properties.
                if (v.isNull() && result.operation == Sink::Operation_Removal) {
                    //For removals we have to read the last revision to get a value, and thus be able to find the correct thread.
                    QVariant reductionValue;
                    readPrevious(result.entity.identifier(), [&] (const ApplicationDomain::ApplicationDomainType &prev) {
                        reductionValue = prev.getProperty(mReductionProperty);
                    });
                    return reductionValue;
                } else {
                    return v;
                }
            }();
            if (reductionValue.isNull()) {
+                SinkTraceCtx(mDatastore->mLogCtx) << "No reduction value: " << result.entity.identifier();
                //We failed to find a value to reduce on, so ignore this entity.
                //Can happen if the entity was already removed and we have no previous revision.
                return;
            }
            const auto reductionValueBa = getByteArray(reductionValue);
            if (!mReducedValues.contains(reductionValueBa)) {
                //Only reduce every value once.
                mReducedValues.insert(reductionValueBa);
+                SinkTraceCtx(mDatastore->mLogCtx) << "Reducing new value: " << result.entity.identifier() << reductionValueBa;
                auto reductionResult = reduceOnValue(reductionValue);
                //This can happen if we get a removal message from a filtered entity and all entities of the reduction are filtered.
                if (reductionResult.selection.isEmpty()) {
                    return;
                }
                mSelectedValues.insert(reductionValueBa, reductionResult.selection);
                readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
                    callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds});
                    foundValue = true;
                });
            } else {
                //During the initial query, do nothing. The lookup above will take care of it.
                //During updates, adjust the reduction according to the modification/addition or removal.
                //We have to redo the reduction for every element, because of the aggregation values.
                if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) {
+                    SinkTraceCtx(mDatastore->mLogCtx) << "Incremental reduction update: " << result.entity.identifier() << reductionValueBa;
                    mIncrementallyReducedValues.insert(reductionValueBa);
                    //Redo the reduction to find new aggregated values
                    auto selectionResult = reduceOnValue(reductionValue);
                    auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
+                    SinkTraceCtx(mDatastore->mLogCtx) << "Old selection result: " << oldSelectionResult << " New selection result: " << selectionResult.selection;
                    //If mSelectedValues did not contain the value, oldSelectionResult will be empty. (Happens if entities have been filtered.)
                    if (oldSelectionResult.isEmpty()) {
                        return;
                    }
                    if (oldSelectionResult == selectionResult.selection) {
                        mSelectedValues.insert(reductionValueBa, selectionResult.selection);
                        Q_ASSERT(!selectionResult.selection.isEmpty());
                        readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
                            callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds});
                        });
                    } else {
                        //Remove the old result
                        Q_ASSERT(!oldSelectionResult.isEmpty());
                        readEntity(oldSelectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
                            callback({entity, Sink::Operation_Removal});
                        });
                        //If the last item has been removed, then there's nothing to add
                        if (!selectionResult.selection.isEmpty()) {
                            //Add the new result
                            mSelectedValues.insert(reductionValueBa, selectionResult.selection);
                            Q_ASSERT(!selectionResult.selection.isEmpty());
                            readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
                                callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds});
                            });
                        }
                    }
                }
            }
        })) {}
        return foundValue;
    }
};

class Bloom : public Filter {
public:
    typedef QSharedPointer<Bloom> Ptr;

    QByteArray mBloomProperty;

    Bloom(const QByteArray &bloomProperty, FilterBase::Ptr source, DataStoreQuery *store)
        : Filter(source, store), mBloomProperty(bloomProperty)
    {
    }

    virtual ~Bloom(){}

    bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
        if (!mBloomed) {
            //Initially we bloom on the first value that matches.
            //From there on we just filter.
bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { mBloomValue = result.entity.getProperty(mBloomProperty); auto results = indexLookup(mBloomProperty, mBloomValue); for (const auto &r : results) { readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { callback({entity, Sink::Operation_Creation}); SinkTraceCtx(mDatastore->mLogCtx) << "Bloom result: " << entity.identifier() << operationName(operation); foundValue = true; }); } return false; })) {} mBloomed = true; propertyFilter.insert(mBloomProperty, mBloomValue); return foundValue; } else { //Filter on bloom value return Filter::next(callback); } } QVariant mBloomValue; bool mBloomed = false; }; DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) { //This is what we use during a new query setupQuery(query); } DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental) : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) { //This is what we use when fetching more data, without having a new revision with incremental=false //And this is what we use when the data changed and we want to update with incremental = true mCollector = state.mCollector; mSource = state.mSource; auto source = mCollector; while (source) { source->mDatastore = this; source->mIncremental = incremental; source = source->mSource; } } DataStoreQuery::~DataStoreQuery() { } DataStoreQuery::State::Ptr DataStoreQuery::getState() { auto state = State::Ptr::create(); state->mSource = mSource; state->mCollector = mCollector; return state; } void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) { mStore.readLatest(mType, key, resultCallback); } void DataStoreQuery::readPrevious(const QByteArray &key, const std::function &callback) { mStore.readPrevious(mType, key, mStore.maxRevision(), callback); } QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { return mStore.indexLookup(mType, property, value); } /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ /* { */ /* const bool sortingRequired = !sortProperty.isEmpty(); */ /* if (mInitialQuery && sortingRequired) { */ /* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */ /* // Sort the complete set by reading the sort property and filling into a sorted map */ /* auto sortedMap = QSharedPointer>::create(); */ /* while (resultSet.next()) { */ /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ /* readEntity(resultSet.id(), */ /* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* const auto operation = buffer.operation(); */ /* // We're not interested in removals during the initial query */ /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ /* if (!sortProperty.isEmpty()) { */ /* const auto sortValue = getProperty(buffer.entity(), sortProperty); */ /* if (sortValue.type() == QVariant::DateTime) { */ /* sortedMap->insert(QByteArray::number(std::numeric_limits::max() - 
sortValue.toDateTime().toTime_t()), uid); */ /* } else { */ /* sortedMap->insert(sortValue.toString().toLatin1(), uid); */ /* } */ /* } else { */ /* sortedMap->insert(uid, uid); */ /* } */ /* } */ /* }); */ /* } */ /* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */ /* auto iterator = QSharedPointer>::create(*sortedMap); */ /* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */ /* std::function callback) -> bool { */ /* if (iterator->hasNext()) { */ /* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* callback(uid, buffer, Sink::Operation_Creation); */ /* }); */ /* return true; */ /* } */ /* return false; */ /* }; */ /* auto skip = [iterator]() { */ /* if (iterator->hasNext()) { */ /* iterator->next(); */ /* } */ /* }; */ /* return ResultSet(generator, skip); */ /* } else { */ /* auto resultSetPtr = QSharedPointer::create(resultSet); */ /* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */ /* if (resultSetPtr->next()) { */ /* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */ /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ /* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* const auto operation = buffer.operation(); */ /* if (mInitialQuery) { */ /* // We're not interested in removals during the initial query */ /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ /* // In the initial set every entity is new */ /* callback(uid, buffer, Sink::Operation_Creation); */ /* } */ /* } else { */ /* // Always remove removals, they probably don't match due to non-available properties */ /* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */ /* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */ /* callback(uid, buffer, operation); */ /* } */ /* } */ /* }); */ /* return true; */ /* } */ /* return false; */ /* }; */ /* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */ /* return ResultSet(generator, skip); */ /* } */ /* } */ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) { Q_ASSERT(!subquery.type().isEmpty()); auto sub = DataStoreQuery(subquery, subquery.type(), mStore); auto result = sub.execute(); QByteArrayList ids; while (result.next([&ids](const ResultSet::Result &result) { ids << result.entity.identifier(); })) {} return ids; } void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) { auto query = query_; auto baseFilters = query.getBaseFilters(); //Resolve any subqueries we have for (const auto &k : baseFilters.keys()) { const auto comparator = baseFilters.value(k); if (comparator.value.canConvert()) { SinkTraceCtx(mLogCtx) << "Executing subquery for property: " << k; const auto result = executeSubquery(comparator.value.value()); baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); } } query.setBaseFilters(baseFilters); QByteArray appliedSorting; //Determine initial set mSource = [&]() { if (!query.ids().isEmpty()) { //We have a set of ids as a starting point return Source::Ptr::create(query.ids().toVector(), this); } else { QSet appliedFilters; auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); if 
(!appliedFilters.isEmpty()) { //We have an index lookup as starting point return Source::Ptr::create(resultSet, this); } // We do a full scan if there were no indexes available to create the initial set (this is going to be expensive for large sets). return Source::Ptr::create(mStore.fullScan(mType), this); } }(); FilterBase::Ptr baseSet = mSource; if (!query.getBaseFilters().isEmpty()) { auto filter = Filter::Ptr::create(baseSet, this); //For incremental queries the remaining filters are not sufficient for (const auto &f : query.getBaseFilters().keys()) { filter->propertyFilter.insert(f, query.getFilter(f)); } baseSet = filter; } /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ /* //Apply manual sorting */ /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ /* } */ //Setup the rest of the filter stages on top of the base set for (const auto &stage : query.getFilterStages()) { if (auto filter = stage.dynamicCast()) { auto f = Filter::Ptr::create(baseSet, this); f->propertyFilter = filter->propertyFilter; baseSet = f; } else if (auto filter = stage.dynamicCast()) { auto reduction = Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this); for (const auto &aggregator : filter->aggregators) { reduction->mAggregators << Reduce::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty); } reduction->propertyFilter = query.getBaseFilters(); baseSet = reduction; } else if (auto filter = stage.dynamicCast()) { baseSet = Bloom::Ptr::create(filter->property, baseSet, this); } } mCollector = Collector::Ptr::create(baseSet, this); } QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) { QVector changedKeys; mStore.readRevisions(baseRevision, mType, [&](const QByteArray &key) { changedKeys << key; }); return changedKeys; } ResultSet DataStoreQuery::update(qint64 baseRevision) { SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; auto incrementalResultSet = loadIncrementalResultSet(baseRevision); SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; mSource->add(incrementalResultSet); ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { if (mCollector->next([this, callback](const ResultSet::Result &result) { SinkTraceCtx(mLogCtx) << "Got incremental result: " << result.entity.identifier() << operationName(result.operation); callback(result); })) { return true; } return false; }; return ResultSet(generator, [this]() { mCollector->skip(); }); } void DataStoreQuery::updateComplete() { mSource->mIncrementalIds.clear(); auto source = mCollector; while (source) { source->updateComplete(); source = source->mSource; } } ResultSet DataStoreQuery::execute() { SinkTraceCtx(mLogCtx) << "Executing query"; Q_ASSERT(mCollector); ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { if (mCollector->next([this, callback](const ResultSet::Result &result) { if (result.operation != Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds}); } })) { return true; } return false; }; return ResultSet(generator, [this]() { mCollector->skip(); }); } diff --git a/common/typeindex.cpp b/common/typeindex.cpp index a111134e..41821cb2 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ 
-1,440 +1,440 @@
/*
    Copyright (c) 2015 Christian Mollekopf

    This library is free software; you can redistribute it and/or modify it
    under the terms of the GNU Library General Public License as published by
    the Free Software Foundation; either version 2 of the License, or (at your
    option) any later version.

    This library is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
    License for more details.

    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB. If not, write to the
    Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.
*/
#include "typeindex.h"

#include "log.h"
#include "index.h"
#include "fulltextindex.h"

#include <QDateTime>
#include <QDataStream>
#include <cmath>

using namespace Sink;

static QByteArray getByteArray(const QVariant &value)
{
    if (value.type() == QVariant::DateTime) {
        QByteArray result;
        QDataStream ds(&result, QIODevice::WriteOnly);
        ds << value.toDateTime();
        return result;
    }
    if (value.type() == QVariant::Bool) {
        return value.toBool() ? "t" : "f";
    }
    if (value.canConvert<ApplicationDomain::Reference>()) {
        const auto ba = value.value<ApplicationDomain::Reference>().value;
        if (!ba.isEmpty()) {
            return ba;
        }
    }
    if (value.isValid() && !value.toByteArray().isEmpty()) {
        return value.toByteArray();
    }
    // LMDB can't handle empty keys, so use something different
    return "toplevel";
}

static QByteArray toSortableByteArrayImpl(const QDateTime &date)
{
    // Sort invalid dates last
    if (!date.isValid()) {
        return QByteArray::number(std::numeric_limits<unsigned int>::max());
    }
    static unsigned int uint_num_digits = (unsigned int)std::log10(std::numeric_limits<unsigned int>::max()) + 1;
    return QByteArray::number(std::numeric_limits<unsigned int>::max() - date.toTime_t()).rightJustified(uint_num_digits, '0');
}

static QByteArray toSortableByteArray(const QVariant &value)
{
    if (!value.isValid()) {
        // FIXME: we don't know the type, so we don't know what to return.
        // This means we're fixing every sorted index key to use unsigned int.
        return QByteArray::number(std::numeric_limits<unsigned int>::max());
    }
    switch (value.type()) {
        case QMetaType::QDateTime:
            return toSortableByteArrayImpl(value.toDateTime());
        default:
            SinkWarning() << "Not knowing how to convert a" << value.typeName() << "to a sortable key, falling back to default conversion";
            return getByteArray(value);
    }
}

TypeIndex::TypeIndex(const QByteArray &type, const Sink::Log::Context &ctx) : mLogCtx(ctx), mType(type)
{
}

QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &sortProperty) const
{
    if (sortProperty.isEmpty()) {
        return mType + ".index." + property;
    }
    return mType + ".index." + property + ".sort." + sortProperty;
}

QByteArray TypeIndex::sortedIndexName(const QByteArray &property) const
{
    return mType + ".index." + property + ".sorted";
}

template <>
void TypeIndex::addProperty(const QByteArray &property)
{
    auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) {
        // SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index."
+ property << value.toByteArray(); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { // SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index." + property << value.toByteArray(); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { //SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index." + property << getByteArray(value); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { addProperty(property); } template <> void TypeIndex::addSortedProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { const auto sortableDate = toSortableByteArray(value); if (add) { Index(sortedIndexName(property), transaction).add(sortableDate, identifier); } else { Index(sortedIndexName(property), transaction).remove(sortableDate, identifier); } }; mSortIndexer.insert(property, indexer); mSortedProperties << property; } template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { auto indexer = [=](bool add, const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::DataStore::Transaction &transaction) { const auto date = sortValue.toDateTime(); const auto propertyValue = getByteArray(value); if (add) { Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier); } else { Index(indexName(property, sortProperty), transaction).remove(propertyValue + toSortableByteArray(date), identifier); } }; mGroupedSortIndexer.insert(property + sortProperty, indexer); mGroupedSortedProperties.insert(property, sortProperty); } template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { addPropertyWithSorting(property, sortProperty); } void TypeIndex::updateIndex(bool add, const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, 
Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { for (const auto &property : mProperties) { const auto value = entity.getProperty(property); auto indexer = mIndexer.value(property); indexer(add, identifier, value, transaction); } for (const auto &property : mSortedProperties) { const auto value = entity.getProperty(property); auto indexer = mSortIndexer.value(property); indexer(add, identifier, value, transaction); } for (auto it = mGroupedSortedProperties.constBegin(); it != mGroupedSortedProperties.constEnd(); it++) { const auto value = entity.getProperty(it.key()); const auto sortValue = entity.getProperty(it.value()); auto indexer = mGroupedSortIndexer.value(it.key() + it.value()); indexer(add, identifier, value, sortValue, transaction); } for (const auto &indexer : mCustomIndexer) { indexer->setup(this, &transaction, resourceInstanceId); if (add) { indexer->add(entity); } else { indexer->remove(entity); } } } void TypeIndex::commitTransaction() { for (const auto &indexer : mCustomIndexer) { indexer->commitTransaction(); } } void TypeIndex::abortTransaction() { for (const auto &indexer : mCustomIndexer) { indexer->abortTransaction(); } } void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { updateIndex(true, identifier, entity, transaction, resourceInstanceId); } void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { updateIndex(false, identifier, entity, transaction, resourceInstanceId); } static QVector indexLookup(Index &index, QueryBase::Comparator filter, std::function valueToKey = getByteArray) { QVector keys; QByteArrayList lookupKeys; if (filter.comparator == Query::Comparator::Equals) { lookupKeys << valueToKey(filter.value); } else if (filter.comparator == Query::Comparator::In) { for(const QVariant &value : filter.value.value()) { lookupKeys << valueToKey(value); } } else { Q_ASSERT(false); } for (const auto &lookupKey : lookupKeys) { index.lookup(lookupKey, [&](const QByteArray &value) { keys << value; }, [lookupKey](const Index::Error &error) { SinkWarning() << "Lookup error in index: " << error.message << lookupKey; }, true); } return keys; } static QVector sortedIndexLookup(Index &index, QueryBase::Comparator filter) { if (filter.comparator == Query::Comparator::In || filter.comparator == Query::Comparator::Contains) { SinkWarning() << "In and Contains comparison not supported on sorted indexes"; } if (filter.comparator != Query::Comparator::Within) { return indexLookup(index, filter, toSortableByteArray); } QVector keys; QByteArray lowerBound, upperBound; auto bounds = filter.value.value(); if (bounds[0].canConvert()) { // Inverse the bounds because dates are stored newest first upperBound = toSortableByteArray(bounds[0].toDateTime()); lowerBound = toSortableByteArray(bounds[1].toDateTime()); } else { lowerBound = bounds[0].toByteArray(); upperBound = bounds[1].toByteArray(); } index.rangeLookup(lowerBound, upperBound, [&](const QByteArray &value) { keys << value; }, [bounds](const Index::Error &error) { SinkWarning() << "Lookup error in index:" << error.message << "with bounds:" << bounds[0] << bounds[1]; }); return keys; } QVector TypeIndex::query(const Sink::QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting, 
    Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId)
{
    const auto baseFilters = query.getBaseFilters();
    for (auto it = baseFilters.constBegin(); it != baseFilters.constEnd(); it++) {
        if (it.value().comparator == QueryBase::Comparator::Fulltext) {
            FulltextIndex fulltextIndex{resourceInstanceId};
            const auto keys = fulltextIndex.lookup(it.value().value.toString());
            appliedFilters << it.key();
            SinkTraceCtx(mLogCtx) << "Fulltext index lookup found " << keys.size() << " keys.";
            return keys;
        }
    }

    for (auto it = mGroupedSortedProperties.constBegin(); it != mGroupedSortedProperties.constEnd(); it++) {
        if (query.hasFilter(it.key()) && query.sortProperty() == it.value()) {
            Index index(indexName(it.key(), it.value()), transaction);
            const auto keys = indexLookup(index, query.getFilter(it.key()));
            appliedFilters << it.key();
            appliedSorting = it.value();
            SinkTraceCtx(mLogCtx) << "Grouped sorted index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys.";
            return keys;
        }
    }

    for (const auto &property : mSortedProperties) {
        if (query.hasFilter(property)) {
            Index index(sortedIndexName(property), transaction);
            const auto keys = sortedIndexLookup(index, query.getFilter(property));
            appliedFilters << property;
            SinkTraceCtx(mLogCtx) << "Sorted index lookup on " << property << " found " << keys.size() << " keys.";
            return keys;
        }
    }

    for (const auto &property : mProperties) {
        if (query.hasFilter(property)) {
            Index index(indexName(property), transaction);
            const auto keys = indexLookup(index, query.getFilter(property));
            appliedFilters << property;
            SinkTraceCtx(mLogCtx) << "Index lookup on " << property << " found " << keys.size() << " keys.";
            return keys;
        }
    }
    SinkTraceCtx(mLogCtx) << "No matching index";
    return {};
}

QVector<QByteArray> TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)
{
    SinkTraceCtx(mLogCtx) << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties;
    if (mProperties.contains(property)) {
        QVector<QByteArray> keys;
        Index index(indexName(property), transaction);
        const auto lookupKey = getByteArray(value);
        index.lookup(
            lookupKey, [&](const QByteArray &value) { keys << value; },
            [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; });
        SinkTraceCtx(mLogCtx) << "Index lookup on " << property << " found " << keys.size() << " keys.";
        return keys;
    } else if (mSecondaryProperties.contains(property)) {
        //Lookups on secondary indexes first lookup the key, and then lookup the results again to resolve to entity id's
        QVector<QByteArray> keys;
        auto resultProperty = mSecondaryProperties.value(property);
        QVector<QByteArray> secondaryKeys;
        Index index(indexName(property + resultProperty), transaction);
        const auto lookupKey = getByteArray(value);
        index.lookup(
            lookupKey, [&](const QByteArray &value) { secondaryKeys << value; },
            [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; });
-        SinkTraceCtx(mLogCtx) << "Looked up secondary keys: " << secondaryKeys;
+        SinkTraceCtx(mLogCtx) << "Looked up secondary keys for the following lookup key: " << lookupKey << " => " << secondaryKeys;
        for (const auto &secondary : secondaryKeys) {
            keys += lookup(resultProperty, secondary, transaction);
        }
        return keys;
    } else {
        SinkWarning() << "Tried to lookup " << property << " but couldn't find value";
    }
    return QVector<QByteArray>();
}

template <> void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant
&leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::unindex(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).remove(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::unindex(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).remove(getByteArray(leftValue), getByteArray(rightValue)); } template <> QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value) { QVector keys; Index index(indexName(leftName + rightName), *mTransaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { keys << QByteArray{value.constData(), value.size()}; }, [=](const Index::Error &error) { SinkWarning() << "Lookup error in secondary index: " << error.message << value << lookupKey; }); return keys; } template <> QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value) { QVector keys; Index index(indexName(leftName + rightName), *mTransaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { keys << QByteArray{value.constData(), value.size()}; }, [=](const Index::Error &error) { SinkWarning() << "Lookup error in secondary index: " << error.message << value << lookupKey; }); return keys; }