diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 12c0ae1e..263d3eaa 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -1,703 +1,712 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "datastorequery.h" #include "log.h" #include "applicationdomaintype.h" using namespace Sink; using namespace Sink::Storage; static QByteArray operationName(const Sink::Operation op) { switch(op) { case Sink::Operation_Creation: return "Creation"; case Sink::Operation_Modification: return "Modification"; case Sink::Operation_Removal: return "Removal"; } return ""; } class Source : public FilterBase { public: typedef QSharedPointer Ptr; QVector mIds; QVector::ConstIterator mIt; QVector mIncrementalIds; QVector::ConstIterator mIncrementalIt; Source (const QVector &ids, DataStoreQuery *store) : FilterBase(store), mIds(ids), mIt(mIds.constBegin()) { } virtual ~Source(){} virtual void skip() Q_DECL_OVERRIDE { if (mIt != mIds.constEnd()) { mIt++; } }; void add(const QVector &ids) { mIncrementalIds = ids; mIncrementalIt = mIncrementalIds.constBegin(); } bool next(const std::function &callback) Q_DECL_OVERRIDE { if (!mIncrementalIds.isEmpty()) { if (mIncrementalIt == mIncrementalIds.constEnd()) { return false; } readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); callback({entity, operation}); }); mIncrementalIt++; if (mIncrementalIt == mIncrementalIds.constEnd()) { return false; } return true; } else { if (mIt == mIds.constEnd()) { return false; } readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); callback({entity, operation}); }); mIt++; return mIt != mIds.constEnd(); } } }; class Collector : public FilterBase { public: typedef QSharedPointer Ptr; Collector(FilterBase::Ptr source, DataStoreQuery *store) : FilterBase(source, store) { } virtual ~Collector(){} bool next(const std::function &callback) Q_DECL_OVERRIDE { return mSource->next(callback); } }; class Filter : public FilterBase { public: typedef QSharedPointer Ptr; - QHash propertyFilter; + QHash propertyFilter; Filter(FilterBase::Ptr source, DataStoreQuery *store) : FilterBase(source, store) { } virtual ~Filter(){} virtual bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); //Always accept removals. They can't match the filter since the data is gone. if (result.operation == Sink::Operation_Removal) { SinkTraceCtx(mDatastore->mLogCtx) << "Removal: " << result.entity.identifier() << operationName(result.operation); callback(result); foundValue = true; } else if (matchesFilter(result.entity)) { SinkTraceCtx(mDatastore->mLogCtx) << "Accepted: " << result.entity.identifier() << operationName(result.operation); callback(result); foundValue = true; //TODO if something did not match the filter so far but does now, turn into an add operation. } else { SinkTraceCtx(mDatastore->mLogCtx) << "Rejected: " << result.entity.identifier() << operationName(result.operation); //TODO emit a removal if we had the uid in the result set and this is a modification. //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways callback({result.entity, Sink::Operation_Removal, result.aggregateValues}); } return false; })) {} return foundValue; } bool matchesFilter(const ApplicationDomain::ApplicationDomainType &entity) { for (const auto &filterProperty : propertyFilter.keys()) { - const auto property = entity.getProperty(filterProperty); + QVariant property; + if (filterProperty.size() == 1) { + property = entity.getProperty(filterProperty[0]); + } else { + QVariantList propList; + for (const auto &propName : filterProperty) { + propList.push_back(entity.getProperty(propName)); + } + property = propList; + } const auto comparator = propertyFilter.value(filterProperty); //We can't deal with a fulltext filter if (comparator.comparator == QueryBase::Comparator::Fulltext) { continue; } if (!comparator.matches(property)) { SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value; return false; } } return true; } }; class Reduce : public Filter { public: typedef QSharedPointer Ptr; struct Aggregator { QueryBase::Reduce::Aggregator::Operation operation; QByteArray property; QByteArray resultProperty; Aggregator(QueryBase::Reduce::Aggregator::Operation o, const QByteArray &property_, const QByteArray &resultProperty_) : operation(o), property(property_), resultProperty(resultProperty_) { } void process(const QVariant &value) { if (operation == QueryBase::Reduce::Aggregator::Collect) { mResult = mResult.toList() << value; } else if (operation == QueryBase::Reduce::Aggregator::Count) { mResult = mResult.toInt() + 1; } else { Q_ASSERT(false); } } void reset() { mResult.clear(); } QVariant result() const { return mResult; } private: QVariant mResult; }; QSet mReducedValues; QSet mIncrementallyReducedValues; QHash mSelectedValues; QByteArray mReductionProperty; QByteArray mSelectionProperty; QueryBase::Reduce::Selector::Comparator mSelectionComparator; QList mAggregators; Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, QueryBase::Reduce::Selector::Comparator comparator, FilterBase::Ptr source, DataStoreQuery *store) : Filter(source, store), mReductionProperty(reductionProperty), mSelectionProperty(selectionProperty), mSelectionComparator(comparator) { } virtual ~Reduce(){} void updateComplete() Q_DECL_OVERRIDE { SinkTraceCtx(mDatastore->mLogCtx) << "Reduction update is complete."; mIncrementallyReducedValues.clear(); } static QByteArray getByteArray(const QVariant &value) { if (value.type() == QVariant::DateTime) { return value.toDateTime().toString().toLatin1(); } if (value.isValid() && !value.toByteArray().isEmpty()) { return value.toByteArray(); } return QByteArray(); } static bool compare(const QVariant &left, const QVariant &right, QueryBase::Reduce::Selector::Comparator comparator) { if (comparator == QueryBase::Reduce::Selector::Max) { return left > right; } return false; } struct ReductionResult { QByteArray selection; QVector aggregateIds; QMap aggregateValues; }; ReductionResult reduceOnValue(const QVariant &reductionValue) { QMap aggregateValues; QVariant selectionResultValue; QByteArray selectionResult; const auto results = indexLookup(mReductionProperty, reductionValue); for (auto &aggregator : mAggregators) { aggregator.reset(); } QVector reducedAndFilteredResults; for (const auto &r : results) { readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { //We need to apply all property filters that we have until the reduction, because the index lookup was unfiltered. if (!matchesFilter(entity)) { return; } reducedAndFilteredResults << r; Q_ASSERT(operation != Sink::Operation_Removal); for (auto &aggregator : mAggregators) { if (!aggregator.property.isEmpty()) { aggregator.process(entity.getProperty(aggregator.property)); } else { aggregator.process(QVariant{}); } } auto selectionValue = entity.getProperty(mSelectionProperty); if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { selectionResultValue = selectionValue; selectionResult = entity.identifier(); } }); } for (auto &aggregator : mAggregators) { aggregateValues.insert(aggregator.resultProperty, aggregator.result()); } return {selectionResult, reducedAndFilteredResults, aggregateValues}; } bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { const auto reductionValue = [&] { const auto v = result.entity.getProperty(mReductionProperty); //Because we also get Operation_Removal for filtered entities. We use the fact that actually removed entites //won't have the property to reduce on. //TODO: Perhaps find a cleaner solutoin than abusing Operation::Removed for filtered properties. if (v.isNull() && result.operation == Sink::Operation_Removal) { //For removals we have to read the last revision to get a value, and thus be able to find the correct thread. QVariant reductionValue; readPrevious(result.entity.identifier(), [&] (const ApplicationDomain::ApplicationDomainType &prev) { reductionValue = prev.getProperty(mReductionProperty); }); return reductionValue; } else { return v; } }(); if (reductionValue.isNull()) { SinkTraceCtx(mDatastore->mLogCtx) << "No reduction value: " << result.entity.identifier(); //We failed to find a value to reduce on, so ignore this entity. //Can happen if the entity was already removed and we have no previous revision. return; } const auto reductionValueBa = getByteArray(reductionValue); if (!mReducedValues.contains(reductionValueBa)) { //Only reduce every value once. mReducedValues.insert(reductionValueBa); SinkTraceCtx(mDatastore->mLogCtx) << "Reducing new value: " << result.entity.identifier() << reductionValueBa; auto reductionResult = reduceOnValue(reductionValue); //This can happen if we get a removal message from a filtered entity and all entites of the reduction are filtered. if (reductionResult.selection.isEmpty()) { return; } mSelectedValues.insert(reductionValueBa, reductionResult.selection); readEntity(reductionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { callback({entity, operation, reductionResult.aggregateValues, reductionResult.aggregateIds}); foundValue = true; }); } else { //During initial query, do nothing. The lookup above will take care of it. //During updates adjust the reduction according to the modification/addition or removal //We have to redo the reduction for every element, because of the aggregation values. if (mIncremental && !mIncrementallyReducedValues.contains(reductionValueBa)) { SinkTraceCtx(mDatastore->mLogCtx) << "Incremental reduction update: " << result.entity.identifier() << reductionValueBa; mIncrementallyReducedValues.insert(reductionValueBa); //Redo the reduction to find new aggregated values auto selectionResult = reduceOnValue(reductionValue); auto oldSelectionResult = mSelectedValues.take(reductionValueBa); SinkTraceCtx(mDatastore->mLogCtx) << "Old selection result: " << oldSelectionResult << " New selection result: " << selectionResult.selection; //If mSelectedValues did not containthe value, oldSelectionResult will be empty.(Happens if entites have been filtered) if (oldSelectionResult.isEmpty()) { return; } if (oldSelectionResult == selectionResult.selection) { mSelectedValues.insert(reductionValueBa, selectionResult.selection); Q_ASSERT(!selectionResult.selection.isEmpty()); readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { callback({entity, Sink::Operation_Modification, selectionResult.aggregateValues, selectionResult.aggregateIds}); }); } else { //remove old result Q_ASSERT(!oldSelectionResult.isEmpty()); readEntity(oldSelectionResult, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { callback({entity, Sink::Operation_Removal}); }); //If the last item has been removed, then there's nothing to add if (!selectionResult.selection.isEmpty()) { //add new result mSelectedValues.insert(reductionValueBa, selectionResult.selection); Q_ASSERT(!selectionResult.selection.isEmpty()); readEntity(selectionResult.selection, [&](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { callback({entity, Sink::Operation_Creation, selectionResult.aggregateValues, selectionResult.aggregateIds}); }); } } } } })) {} return foundValue; } }; class Bloom : public Filter { public: typedef QSharedPointer Ptr; QByteArray mBloomProperty; Bloom(const QByteArray &bloomProperty, FilterBase::Ptr source, DataStoreQuery *store) : Filter(source, store), mBloomProperty(bloomProperty) { } virtual ~Bloom(){} bool next(const std::function &callback) Q_DECL_OVERRIDE { if (!mBloomed) { //Initially we bloom on the first value that matches. //From there on we just filter. bool foundValue = false; while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { mBloomValue = result.entity.getProperty(mBloomProperty); auto results = indexLookup(mBloomProperty, mBloomValue); for (const auto &r : results) { readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { callback({entity, Sink::Operation_Creation}); SinkTraceCtx(mDatastore->mLogCtx) << "Bloom result: " << entity.identifier() << operationName(operation); foundValue = true; }); } return false; })) {} mBloomed = true; - propertyFilter.insert(mBloomProperty, mBloomValue); + propertyFilter.insert({mBloomProperty}, mBloomValue); return foundValue; } else { //Filter on bloom value return Filter::next(callback); } } QVariant mBloomValue; bool mBloomed = false; }; DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) { //This is what we use during a new query setupQuery(query); } DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental) : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) { //This is what we use when fetching more data, without having a new revision with incremental=false //And this is what we use when the data changed and we want to update with incremental = true mCollector = state.mCollector; mSource = state.mSource; auto source = mCollector; while (source) { source->mDatastore = this; source->mIncremental = incremental; source = source->mSource; } } DataStoreQuery::~DataStoreQuery() { } DataStoreQuery::State::Ptr DataStoreQuery::getState() { auto state = State::Ptr::create(); state->mSource = mSource; state->mCollector = mCollector; return state; } void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) { mStore.readLatest(mType, key, resultCallback); } void DataStoreQuery::readPrevious(const QByteArray &key, const std::function &callback) { mStore.readPrevious(mType, key, mStore.maxRevision(), callback); } QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { return mStore.indexLookup(mType, property, value); } /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ /* { */ /* const bool sortingRequired = !sortProperty.isEmpty(); */ /* if (mInitialQuery && sortingRequired) { */ /* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */ /* // Sort the complete set by reading the sort property and filling into a sorted map */ /* auto sortedMap = QSharedPointer>::create(); */ /* while (resultSet.next()) { */ /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ /* readEntity(resultSet.id(), */ /* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* const auto operation = buffer.operation(); */ /* // We're not interested in removals during the initial query */ /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ /* if (!sortProperty.isEmpty()) { */ /* const auto sortValue = getProperty(buffer.entity(), sortProperty); */ /* if (sortValue.type() == QVariant::DateTime) { */ /* sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), uid); */ /* } else { */ /* sortedMap->insert(sortValue.toString().toLatin1(), uid); */ /* } */ /* } else { */ /* sortedMap->insert(uid, uid); */ /* } */ /* } */ /* }); */ /* } */ /* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */ /* auto iterator = QSharedPointer>::create(*sortedMap); */ /* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */ /* std::function callback) -> bool { */ /* if (iterator->hasNext()) { */ /* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* callback(uid, buffer, Sink::Operation_Creation); */ /* }); */ /* return true; */ /* } */ /* return false; */ /* }; */ /* auto skip = [iterator]() { */ /* if (iterator->hasNext()) { */ /* iterator->next(); */ /* } */ /* }; */ /* return ResultSet(generator, skip); */ /* } else { */ /* auto resultSetPtr = QSharedPointer::create(resultSet); */ /* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */ /* if (resultSetPtr->next()) { */ /* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */ /* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ /* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ /* const auto operation = buffer.operation(); */ /* if (mInitialQuery) { */ /* // We're not interested in removals during the initial query */ /* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ /* // In the initial set every entity is new */ /* callback(uid, buffer, Sink::Operation_Creation); */ /* } */ /* } else { */ /* // Always remove removals, they probably don't match due to non-available properties */ /* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */ /* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */ /* callback(uid, buffer, operation); */ /* } */ /* } */ /* }); */ /* return true; */ /* } */ /* return false; */ /* }; */ /* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */ /* return ResultSet(generator, skip); */ /* } */ /* } */ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) { Q_ASSERT(!subquery.type().isEmpty()); auto sub = DataStoreQuery(subquery, subquery.type(), mStore); auto result = sub.execute(); QByteArrayList ids; while (result.next([&ids](const ResultSet::Result &result) { ids << result.entity.identifier(); })) {} return ids; } void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) { auto query = query_; auto baseFilters = query.getBaseFilters(); //Resolve any subqueries we have for (const auto &k : baseFilters.keys()) { const auto comparator = baseFilters.value(k); if (comparator.value.canConvert()) { SinkTraceCtx(mLogCtx) << "Executing subquery for property: " << k; const auto result = executeSubquery(comparator.value.value()); baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); } } query.setBaseFilters(baseFilters); QByteArray appliedSorting; //Determine initial set mSource = [&]() { if (!query.ids().isEmpty()) { //We have a set of ids as a starting point return Source::Ptr::create(query.ids().toVector(), this); } else { - QSet appliedFilters; + QSet appliedFilters; auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); if (!appliedFilters.isEmpty()) { //We have an index lookup as starting point return Source::Ptr::create(resultSet, this); } // We do a full scan if there were no indexes available to create the initial set (this is going to be expensive for large sets). return Source::Ptr::create(mStore.fullScan(mType), this); } }(); FilterBase::Ptr baseSet = mSource; if (!query.getBaseFilters().isEmpty()) { auto filter = Filter::Ptr::create(baseSet, this); //For incremental queries the remaining filters are not sufficient for (const auto &f : query.getBaseFilters().keys()) { filter->propertyFilter.insert(f, query.getFilter(f)); } baseSet = filter; } /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ /* //Apply manual sorting */ /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ /* } */ //Setup the rest of the filter stages on top of the base set for (const auto &stage : query.getFilterStages()) { if (auto filter = stage.dynamicCast()) { auto f = Filter::Ptr::create(baseSet, this); f->propertyFilter = filter->propertyFilter; baseSet = f; } else if (auto filter = stage.dynamicCast()) { auto reduction = Reduce::Ptr::create(filter->property, filter->selector.property, filter->selector.comparator, baseSet, this); for (const auto &aggregator : filter->aggregators) { reduction->mAggregators << Reduce::Aggregator(aggregator.operation, aggregator.propertyToCollect, aggregator.resultProperty); } reduction->propertyFilter = query.getBaseFilters(); baseSet = reduction; } else if (auto filter = stage.dynamicCast()) { baseSet = Bloom::Ptr::create(filter->property, baseSet, this); } } mCollector = Collector::Ptr::create(baseSet, this); } QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) { QVector changedKeys; mStore.readRevisions(baseRevision, mType, [&](const QByteArray &key) { changedKeys << key; }); return changedKeys; } ResultSet DataStoreQuery::update(qint64 baseRevision) { SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; auto incrementalResultSet = loadIncrementalResultSet(baseRevision); SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; mSource->add(incrementalResultSet); ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { if (mCollector->next([this, callback](const ResultSet::Result &result) { SinkTraceCtx(mLogCtx) << "Got incremental result: " << result.entity.identifier() << operationName(result.operation); callback(result); })) { return true; } return false; }; return ResultSet(generator, [this]() { mCollector->skip(); }); } void DataStoreQuery::updateComplete() { mSource->mIncrementalIds.clear(); auto source = mCollector; while (source) { source->updateComplete(); source = source->mSource; } } ResultSet DataStoreQuery::execute() { SinkTraceCtx(mLogCtx) << "Executing query"; Q_ASSERT(mCollector); ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { if (mCollector->next([this, callback](const ResultSet::Result &result) { if (result.operation != Sink::Operation_Removal) { SinkTraceCtx(mLogCtx) << "Got initial result: " << result.entity.identifier() << result.operation; callback(ResultSet::Result{result.entity, Sink::Operation_Creation, result.aggregateValues, result.aggregateIds}); } })) { return true; } return false; }; return ResultSet(generator, [this]() { mCollector->skip(); }); } diff --git a/common/domain/typeimplementations.cpp b/common/domain/typeimplementations.cpp index a8f4baf1..2b2d2ac8 100644 --- a/common/domain/typeimplementations.cpp +++ b/common/domain/typeimplementations.cpp @@ -1,273 +1,274 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "typeimplementations.h" #include #include #include #include "../propertymapper.h" #include "../typeindex.h" #include "entitybuffer.h" #include "entity_generated.h" #include "mail/threadindexer.h" #include "mail/fulltextindexer.h" #include "domainadaptor.h" #include "typeimplementations_p.h" using namespace Sink; using namespace Sink::ApplicationDomain; #define SINK_REGISTER_SERIALIZER(MAPPER, ENTITYTYPE, PROPERTY, LOWERCASEPROPERTY) \ MAPPER.addMapping(&Sink::ApplicationDomain::Buffer::ENTITYTYPE::LOWERCASEPROPERTY, &Sink::ApplicationDomain::Buffer::ENTITYTYPE##Builder::add_##LOWERCASEPROPERTY); typedef IndexConfig, ValueIndex, ValueIndex, ValueIndex, ValueIndex, SortedIndex, SecondaryIndex, SecondaryIndex, CustomSecondaryIndex, CustomSecondaryIndex > MailIndexConfig; typedef IndexConfig, ValueIndex > FolderIndexConfig; typedef IndexConfig > ContactIndexConfig; typedef IndexConfig > AddressbookIndexConfig; typedef IndexConfig, - SortedIndex + SortedIndex, + SampledPeriodIndex > EventIndexConfig; typedef IndexConfig > TodoIndexConfig; typedef IndexConfig > CalendarIndexConfig; void TypeImplementation::configure(TypeIndex &index) { MailIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Mail::name} + ".main", 0}}, MailIndexConfig::databases()); } void TypeImplementation::configure(IndexPropertyMapper &indexPropertyMapper) { indexPropertyMapper.addIndexLookupProperty([](TypeIndex &index, const ApplicationDomain::BufferAdaptor &entity) { auto messageId = entity.getProperty(Mail::MessageId::name); auto thread = index.secondaryLookup(messageId); if (!thread.isEmpty()) { return thread.first(); } return QByteArray{}; }); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Sender, sender); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, To, to); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Cc, cc); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Bcc, bcc); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Subject, subject); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Date, date); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Unread, unread); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Important, important); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Folder, folder); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, MimeMessage, mimeMessage); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, FullPayloadAvailable, fullPayloadAvailable); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Draft, draft); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Trash, trash); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, Sent, sent); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, MessageId, messageId); SINK_REGISTER_SERIALIZER(propertyMapper, Mail, ParentMessageId, parentMessageId); } void TypeImplementation::configure(TypeIndex &index) { FolderIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Folder::name} + ".main", 0}}, FolderIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Folder, Parent, parent); SINK_REGISTER_SERIALIZER(propertyMapper, Folder, Name, name); SINK_REGISTER_SERIALIZER(propertyMapper, Folder, Icon, icon); SINK_REGISTER_SERIALIZER(propertyMapper, Folder, SpecialPurpose, specialpurpose); SINK_REGISTER_SERIALIZER(propertyMapper, Folder, Enabled, enabled); } void TypeImplementation::configure(IndexPropertyMapper &) { } void TypeImplementation::configure(TypeIndex &index) { ContactIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Contact::name} + ".main", 0}}, ContactIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Uid, uid); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Fn, fn); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Emails, emails); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Vcard, vcard); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Addressbook, addressbook); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Firstname, firstname); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Lastname, lastname); SINK_REGISTER_SERIALIZER(propertyMapper, Contact, Photo, photo); } void TypeImplementation::configure(IndexPropertyMapper &) { } void TypeImplementation::configure(TypeIndex &index) { AddressbookIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Addressbook::name} + ".main", 0}}, AddressbookIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Addressbook, Parent, parent); SINK_REGISTER_SERIALIZER(propertyMapper, Addressbook, Name, name); } void TypeImplementation::configure(IndexPropertyMapper &) { } void TypeImplementation::configure(TypeIndex &index) { EventIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Event::name} + ".main", 0}}, EventIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Event, Summary, summary); SINK_REGISTER_SERIALIZER(propertyMapper, Event, Description, description); SINK_REGISTER_SERIALIZER(propertyMapper, Event, Uid, uid); SINK_REGISTER_SERIALIZER(propertyMapper, Event, StartTime, startTime); SINK_REGISTER_SERIALIZER(propertyMapper, Event, EndTime, endTime); SINK_REGISTER_SERIALIZER(propertyMapper, Event, AllDay, allDay); SINK_REGISTER_SERIALIZER(propertyMapper, Event, Ical, ical); SINK_REGISTER_SERIALIZER(propertyMapper, Event, Calendar, calendar); } void TypeImplementation::configure(IndexPropertyMapper &) { } void TypeImplementation::configure(TypeIndex &index) { TodoIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Todo::name} + ".main", 0}}, TodoIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Uid, uid); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Summary, summary); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Description, description); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, CompletedDate, completedDate); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, DueDate, dueDate); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, StartDate, startDate); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Status, status); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Priority, priority); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Categories, categories); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Ical, ical); SINK_REGISTER_SERIALIZER(propertyMapper, Todo, Calendar, calendar); } void TypeImplementation::configure(IndexPropertyMapper &) { } void TypeImplementation::configure(TypeIndex &index) { CalendarIndexConfig::configure(index); } QMap TypeImplementation::typeDatabases() { return merge(QMap{{QByteArray{Calendar::name} + ".main", 0}}, CalendarIndexConfig::databases()); } void TypeImplementation::configure(PropertyMapper &propertyMapper) { SINK_REGISTER_SERIALIZER(propertyMapper, Calendar, Name, name); } void TypeImplementation::configure(IndexPropertyMapper &) {} diff --git a/common/domain/typeimplementations_p.h b/common/domain/typeimplementations_p.h index fc08048b..51af1139 100644 --- a/common/domain/typeimplementations_p.h +++ b/common/domain/typeimplementations_p.h @@ -1,170 +1,190 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "typeindex.h" #include template void mergeImpl(T &map, First f) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } } template void mergeImpl(T &map, First f, Tail ...maps) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } mergeImpl(map, maps...); } template First merge(First f, Tail ...maps) { First map; mergeImpl(map, f, maps...); return map; } template class ValueIndex { public: static void configure(TypeIndex &index) { index.addProperty(); } template static QMap databases() { return {{QByteArray{EntityType::name} +".index." + Property::name, 1}}; } }; template class SortedIndex { public: static void configure(TypeIndex &index) { index.addPropertyWithSorting(); } template static QMap databases() { return {{QByteArray{EntityType::name} +".index." + Property::name + ".sort." + SortProperty::name, 1}}; } }; template class SortedIndex { public: static void configure(TypeIndex &index) { index.addSortedProperty(); } template static QMap databases() { return {{QByteArray{EntityType::name} +".index." + SortProperty::name + ".sorted", 1}}; } }; template class SecondaryIndex { public: static void configure(TypeIndex &index) { index.addSecondaryProperty(); } template static QMap databases() { return {{QByteArray{EntityType::name} +".index." + Property::name + SecondaryProperty::name, 1}}; } }; template class CustomSecondaryIndex { public: static void configure(TypeIndex &index) { index.addSecondaryPropertyIndexer(); } template static QMap databases() { return Indexer::databases(); } }; +template +class SampledPeriodIndex +{ + static_assert(std::is_same::value && + std::is_same::value, + "Date range index is not supported for types other than 'QDateTime's"); + +public: + static void configure(TypeIndex &index) + { + index.addSampledPeriodIndex(); + } + + template + static QMap databases() + { + return {{QByteArray{EntityType::name} +".index." + RangeBeginProperty::name + ".range." + RangeEndProperty::name, 1}}; + } +}; + template class IndexConfig { template static void applyIndex(TypeIndex &index) { T::configure(index); } ///Apply recursively for parameter pack template static void applyIndex(TypeIndex &index) { applyIndex(index); applyIndex(index); } template static QMap getDbs() { return T::template databases(); } template static QMap getDbs() { return merge(getDbs(), getDbs()); } public: static void configure(TypeIndex &index) { applyIndex(index); } static QMap databases() { return getDbs(); } }; diff --git a/common/query.cpp b/common/query.cpp index 404a304b..ceb1897c 100644 --- a/common/query.cpp +++ b/common/query.cpp @@ -1,193 +1,206 @@ /* * Copyright (C) 2014 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "query.h" #include #include using namespace Sink; static const int registerQuery = qRegisterMetaTypeStreamOperators(); QDebug operator<<(QDebug dbg, const Sink::QueryBase::Comparator &c) { if (c.comparator == Sink::Query::Comparator::Equals) { dbg.nospace() << "== " << c.value; } else if (c.comparator == Sink::Query::Comparator::Contains) { dbg.nospace() << "contains " << c.value; } else if (c.comparator == Sink::Query::Comparator::In) { dbg.nospace() << "in " << c.value; } else if (c.comparator == Sink::Query::Comparator::Fulltext) { dbg.nospace() << "fulltext contains " << c.value; } else { dbg.nospace() << "unknown comparator: " << c.value; } return dbg.space(); } QDebug operator<<(QDebug dbg, const Sink::QueryBase::Filter &filter) { if (filter.ids.isEmpty()) { dbg.nospace() << "Filter(" << filter.propertyFilter << ")"; } else { dbg.nospace() << "Filter(" << filter.ids << ")"; } return dbg.maybeSpace(); } QDebug operator<<(QDebug dbg, const Sink::QueryBase &query) { dbg.nospace() << "Query [" << query.type() << "] << Id: " << query.id() << "\n"; dbg.nospace() << " Filter: " << query.getBaseFilters() << "\n"; dbg.nospace() << " Ids: " << query.ids() << "\n"; dbg.nospace() << " Sorting: " << query.sortProperty() << "\n"; return dbg.maybeSpace(); } QDebug operator<<(QDebug dbg, const Sink::Query &query) { dbg << static_cast(query); dbg.nospace() << " Requested: " << query.requestedProperties << "\n"; dbg.nospace() << " Parent: " << query.parentProperty() << "\n"; dbg.nospace() << " IsLive: " << query.liveQuery() << "\n"; dbg.nospace() << " ResourceFilter: " << query.getResourceFilter() << "\n"; return dbg.maybeSpace(); } QDataStream & operator<< (QDataStream &stream, const Sink::QueryBase::Comparator &comparator) { stream << comparator.comparator; stream << comparator.value; return stream; } QDataStream & operator>> (QDataStream &stream, Sink::QueryBase::Comparator &comparator) { int c; stream >> c; comparator.comparator = static_cast(c); stream >> comparator.value; return stream; } QDataStream & operator<< (QDataStream &stream, const Sink::QueryBase::Filter &filter) { stream << filter.ids; stream << filter.propertyFilter; return stream; } QDataStream & operator>> (QDataStream &stream, Sink::QueryBase::Filter &filter) { stream >> filter.ids; stream >> filter.propertyFilter; return stream; } QDataStream & operator<< (QDataStream &stream, const Sink::QueryBase &query) { stream << query.type(); stream << query.sortProperty(); stream << query.getFilter(); return stream; } QDataStream & operator>> (QDataStream &stream, Sink::QueryBase &query) { QByteArray type; stream >> type; query.setType(type); QByteArray sortProperty; stream >> sortProperty; query.setSortProperty(sortProperty); Sink::QueryBase::Filter filter; stream >> filter; query.setFilter(filter); return stream; } bool QueryBase::Filter::operator==(const QueryBase::Filter &other) const { auto ret = ids == other.ids && propertyFilter == other.propertyFilter; return ret; } bool QueryBase::operator==(const QueryBase &other) const { auto ret = mType == other.mType && mSortProperty == other.mSortProperty && mBaseFilterStage == other.mBaseFilterStage; return ret; } QueryBase::Comparator::Comparator() : comparator(Invalid) { } QueryBase::Comparator::Comparator(const QVariant &v) : value(v), comparator(Equals) { } QueryBase::Comparator::Comparator(const QVariant &v, Comparators c) : value(v), comparator(c) { } bool QueryBase::Comparator::matches(const QVariant &v) const { switch(comparator) { case Equals: if (!v.isValid()) { if (!value.isValid()) { return true; } return false; } return v == value; case Contains: if (!v.isValid()) { return false; } return v.value().contains(value.toByteArray()); case In: if (!v.isValid()) { return false; } return value.value().contains(v.toByteArray()); case Within: { auto range = value.value>(); if (range.size() < 2) { return false; } return range[0] <= v && v <= range[1]; } + case Overlap: { + auto bounds = value.value>(); + if (bounds.size() < 2) { + return false; + } + + auto range = v.value>(); + if (range.size() < 2) { + return false; + } + + return range[0] <= bounds[1] && bounds[0] <= range[1]; + } case Fulltext: case Invalid: default: break; } return false; } bool Query::Comparator::operator==(const Query::Comparator &other) const { return value == other.value && comparator == other.comparator; } diff --git a/common/query.h b/common/query.h index 7130116e..cb9c8ca3 100644 --- a/common/query.h +++ b/common/query.h @@ -1,594 +1,618 @@ /* * Copyright (C) 2014 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include #include #include "applicationdomaintype.h" namespace Sink { class SINK_EXPORT QueryBase { public: struct SINK_EXPORT Comparator { enum Comparators { Invalid, Equals, Contains, In, Within, + Overlap, Fulltext }; Comparator(); Comparator(const QVariant &v); Comparator(const QVariant &v, Comparators c); bool matches(const QVariant &v) const; bool operator==(const Comparator &other) const; QVariant value; Comparators comparator; }; class SINK_EXPORT Filter { public: QByteArrayList ids; - QHash propertyFilter; + QHash propertyFilter; bool operator==(const Filter &other) const; }; QueryBase() = default; QueryBase(const QByteArray &type) : mType(type) {} bool operator==(const QueryBase &other) const; Comparator getFilter(const QByteArray &property) const { - return mBaseFilterStage.propertyFilter.value(property); + return mBaseFilterStage.propertyFilter.value({property}); + } + + Comparator getFilter(const QByteArrayList &properties) const + { + return mBaseFilterStage.propertyFilter.value(properties); } template Comparator getFilter() const { return getFilter(T::name); } + template + Comparator getFilter() const + { + return getFilter({T1::name, T2::name, Rest::name...}); + } + bool hasFilter(const QByteArray &property) const { - return mBaseFilterStage.propertyFilter.contains(property); + return mBaseFilterStage.propertyFilter.contains({property}); } template bool hasFilter() const { return hasFilter(T::name); } void setId(const QByteArray &id) { mId = id; } QByteArray id() const { return mId; } - void setBaseFilters(const QHash &filter) + void setBaseFilters(const QHash &filter) { mBaseFilterStage.propertyFilter = filter; } void setFilter(const Filter &filter) { mBaseFilterStage = filter; } - QHash getBaseFilters() const + QHash getBaseFilters() const { return mBaseFilterStage.propertyFilter; } Filter getFilter() const { return mBaseFilterStage; } QByteArrayList ids() const { return mBaseFilterStage.ids; } void filter(const QByteArray &id) { mBaseFilterStage.ids << id; } void filter(const QByteArrayList &ids) { mBaseFilterStage.ids << ids; } void filter(const QByteArray &property, const QueryBase::Comparator &comparator) { - mBaseFilterStage.propertyFilter.insert(property, comparator); + mBaseFilterStage.propertyFilter.insert({property}, comparator); + } + + void filter(const QByteArrayList &properties, const QueryBase::Comparator &comparator) + { + mBaseFilterStage.propertyFilter.insert(properties, comparator); } void setType(const QByteArray &type) { mType = type; } template void setType() { setType(ApplicationDomain::getTypeName()); } QByteArray type() const { return mType; } void setSortProperty(const QByteArray &property) { mSortProperty = property; } QByteArray sortProperty() const { return mSortProperty; } class FilterStage { public: virtual ~FilterStage(){}; }; QList> getFilterStages() { return mFilterStages; } class Reduce : public FilterStage { public: class Selector { public: enum Comparator { Min, //get the minimum value Max, //get the maximum value First //Get the first result we get }; template static Selector max() { return Selector(SelectionProperty::name, Max); } Selector(const QByteArray &p, Comparator c) : property(p), comparator(c) { } QByteArray property; Comparator comparator; }; class Aggregator { public: enum Operation { Count, Collect }; Aggregator(const QByteArray &p, Operation o, const QByteArray &c = QByteArray()) : resultProperty(p), operation(o), propertyToCollect(c) { } QByteArray resultProperty; Operation operation; QByteArray propertyToCollect; }; Reduce(const QByteArray &p, const Selector &s) : property(p), selector(s) { } Reduce &count(const QByteArray &propertyName = "count") { aggregators << Aggregator(propertyName, Aggregator::Count); return *this; } template Reduce &collect(const QByteArray &propertyName) { aggregators << Aggregator(propertyName, Aggregator::Collect, T::name); return *this; } template Reduce &collect() { aggregators << Aggregator(QByteArray{T::name} + QByteArray{"Collected"}, Aggregator::Collect, T::name); return *this; } //Reduce on property QByteArray property; Selector selector; QList aggregators; //TODO add aggregate functions like: //.count() //.collect(); //... // //Potentially pass-in an identifier under which the result will be available in the result set. }; Reduce &reduce(const QByteArray &name, const Reduce::Selector &s) { auto reduction = QSharedPointer::create(name, s); mFilterStages << reduction; return *reduction; } template Reduce &reduce(const Reduce::Selector &s) { return reduce(T::name, s); } /** * "Bloom" on a property. * * For every encountered value of a property, * a result set is generated containing all entries with the same value. * * Example: * For an input set of one mail; return all emails with the same threadId. */ class Bloom : public FilterStage { public: //Property to bloom on QByteArray property; Bloom(const QByteArray &p) : property(p) { } }; template void bloom() { mFilterStages << QSharedPointer::create(T::name); } private: Filter mBaseFilterStage; QList> mFilterStages; QByteArray mType; QByteArray mSortProperty; QByteArray mId; }; /** * A query that matches a set of entities. */ class SINK_EXPORT Query : public QueryBase { public: enum Flag { NoFlags = 0, /** Leave the query running and continuously update the result set. */ LiveQuery = 1, /** Run the query synchronously. */ SynchronousQuery = 2, /** Include status updates via notifications */ UpdateStatus = 4 }; Q_DECLARE_FLAGS(Flags, Flag) template Query &request() { requestedProperties << T::name; return *this; } template Query &requestTree() { mParentProperty = T::name; return *this; } Query &requestTree(const QByteArray &parentProperty) { mParentProperty = parentProperty; return *this; } QByteArray parentProperty() const { return mParentProperty; } template Query &sort() { setSortProperty(T::name); return *this; } template Query &filter(const typename T::Type &value) { filter(T::name, QVariant::fromValue(value)); return *this; } template Query &containsFilter(const QByteArray &value) { static_assert(std::is_same::value, "The contains filter is only implemented for QByteArray in QByteArrayList"); QueryBase::filter(T::name, QueryBase::Comparator(QVariant::fromValue(value), QueryBase::Comparator::Contains)); return *this; } template Query &filter(const QueryBase::Comparator &comparator) { QueryBase::filter(T::name, comparator); return *this; } + template + Query &filter(const QueryBase::Comparator &comparator) + { + QueryBase::filter({T1::name, T2::name, Rest::name...}, comparator); + return *this; + } + Query &filter(const QByteArray &id) { QueryBase::filter(id); return *this; } Query &filter(const QByteArrayList &ids) { QueryBase::filter(ids); return *this; } Query &filter(const QByteArray &property, const QueryBase::Comparator &comparator) { QueryBase::filter(property, comparator); return *this; } template Query &filter(const ApplicationDomain::Entity &value) { filter(T::name, QVariant::fromValue(ApplicationDomain::Reference{value.identifier()})); return *this; } template Query &filter(const Query &query) { auto q = query; q.setType(ApplicationDomain::getTypeName()); filter(T::name, QVariant::fromValue(q)); return *this; } Query(const ApplicationDomain::Entity &value) : mLimit(0) { filter(value.identifier()); resourceFilter(value.resourceInstanceIdentifier()); } Query(Flags flags = Flags()) : mLimit(0), mFlags(flags) { } QByteArrayList requestedProperties; void setFlags(Flags flags) { mFlags = flags; } Flags flags() const { return mFlags; } bool liveQuery() const { return mFlags.testFlag(LiveQuery); } bool synchronousQuery() const { return mFlags.testFlag(SynchronousQuery); } Query &limit(int l) { mLimit = l; return *this; } int limit() const { return mLimit; } Filter getResourceFilter() const { return mResourceFilter; } Query &resourceFilter(const QByteArray &id) { mResourceFilter.ids << id; return *this; } template Query &resourceFilter(const ApplicationDomain::ApplicationDomainType &entity) { - mResourceFilter.propertyFilter.insert(T::name, Comparator(entity.identifier())); + mResourceFilter.propertyFilter.insert({T::name}, Comparator(entity.identifier())); return *this; } Query &resourceFilter(const QByteArray &name, const Comparator &comparator) { - mResourceFilter.propertyFilter.insert(name, comparator); + mResourceFilter.propertyFilter.insert({name}, comparator); return *this; } template Query &resourceContainsFilter(const QVariant &value) { return resourceFilter(T::name, Comparator(value, Comparator::Contains)); } template Query &resourceFilter(const QVariant &value) { return resourceFilter(T::name, value); } private: friend class SyncScope; int mLimit; Flags mFlags; Filter mResourceFilter; QByteArray mParentProperty; }; class SyncScope : public QueryBase { public: using QueryBase::QueryBase; SyncScope() = default; SyncScope(const Query &other) : QueryBase(other), mResourceFilter(other.mResourceFilter) { } template SyncScope(const T &o) : QueryBase() { resourceFilter(o.resourceInstanceIdentifier()); filter(o.identifier()); setType(ApplicationDomain::getTypeName()); } Query::Filter getResourceFilter() const { return mResourceFilter; } SyncScope &resourceFilter(const QByteArray &id) { mResourceFilter.ids << id; return *this; } template SyncScope &resourceFilter(const ApplicationDomain::ApplicationDomainType &entity) { - mResourceFilter.propertyFilter.insert(T::name, Comparator(entity.identifier())); + mResourceFilter.propertyFilter.insert({T::name}, Comparator(entity.identifier())); return *this; } SyncScope &resourceFilter(const QByteArray &name, const Comparator &comparator) { - mResourceFilter.propertyFilter.insert(name, comparator); + mResourceFilter.propertyFilter.insert({name}, comparator); return *this; } template SyncScope &resourceContainsFilter(const QVariant &value) { return resourceFilter(T::name, Comparator(value, Comparator::Contains)); } template SyncScope &resourceFilter(const QVariant &value) { return resourceFilter(T::name, value); } template SyncScope &filter(const Query::Comparator &comparator) { return filter(T::name, comparator); } SyncScope &filter(const QByteArray &id) { QueryBase::filter(id); return *this; } SyncScope &filter(const QByteArrayList &ids) { QueryBase::filter(ids); return *this; } SyncScope &filter(const QByteArray &property, const Query::Comparator &comparator) { QueryBase::filter(property, comparator); return *this; } private: Query::Filter mResourceFilter; }; } SINK_EXPORT QDebug operator<<(QDebug dbg, const Sink::QueryBase::Comparator &); SINK_EXPORT QDebug operator<<(QDebug dbg, const Sink::QueryBase &); SINK_EXPORT QDebug operator<<(QDebug dbg, const Sink::Query &); SINK_EXPORT QDataStream &operator<< (QDataStream &stream, const Sink::QueryBase &query); SINK_EXPORT QDataStream &operator>> (QDataStream &stream, Sink::QueryBase &query); Q_DECLARE_OPERATORS_FOR_FLAGS(Sink::Query::Flags) Q_DECLARE_METATYPE(Sink::QueryBase); Q_DECLARE_METATYPE(Sink::Query); diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 79986922..90194d41 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -1,450 +1,450 @@ /* * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "resourcefacade.h" #include "resourceconfig.h" #include "query.h" #include "definitions.h" #include "store.h" #include "resourceaccess.h" #include "resource.h" #include "facadefactory.h" using namespace Sink; template ConfigNotifier LocalStorageFacade::sConfigNotifier; static void applyConfig(ConfigStore &configStore, const QByteArray &id, ApplicationDomain::ApplicationDomainType &object, const QByteArrayList &requestedProperties) { const auto configurationValues = configStore.get(id); for (auto it = configurationValues.constBegin(); it != configurationValues.constEnd(); it++) { object.setProperty(it.key(), it.value()); } //Populate the object with dummy values for non-available but requested properties. //This avoid a warning about non-existing properties in bufferadaptor.h if (!requestedProperties.isEmpty()) { for (const auto &requested: requestedProperties) { if (!object.hasProperty(requested)) { object.setProperty(requested, QVariant{}); } } } } template static typename DomainType::Ptr readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type, const QByteArrayList &requestedProperties) { auto object = DomainType::Ptr::create(id); applyConfig(configStore, id, *object, requestedProperties); return object; } template <> typename ApplicationDomain::SinkAccount::Ptr readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type, const QByteArrayList &requestedProperties) { auto object = ApplicationDomain::SinkAccount::Ptr::create(id); object->setProperty(ApplicationDomain::SinkAccount::AccountType::name, type); applyConfig(configStore, id, *object, requestedProperties); return object; } template <> typename ApplicationDomain::SinkResource::Ptr readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type, const QByteArrayList &requestedProperties) { auto object = ApplicationDomain::SinkResource::Ptr::create(id); object->setProperty(ApplicationDomain::SinkResource::ResourceType::name, type); //Apply the capabilities where we have capabilities if (!ApplicationDomain::isGlobalType(type)) { if (auto res = ResourceFactory::load(type)) { object->setCapabilities(res->capabilities()); } } applyConfig(configStore, id, *object, requestedProperties); return object; } -static bool matchesFilter(const QHash &filter, const ApplicationDomain::ApplicationDomainType &entity) +static bool matchesFilter(const QHash &filter, const ApplicationDomain::ApplicationDomainType &entity) { for (const auto &filterProperty : filter.keys()) { - if (filterProperty == ApplicationDomain::SinkResource::ResourceType::name) { + if (filterProperty[0] == ApplicationDomain::SinkResource::ResourceType::name) { continue; } - if (!filter.value(filterProperty).matches(entity.getProperty(filterProperty))) { + if (!filter.value(filterProperty).matches(entity.getProperty(filterProperty[0]))) { return false; } } return true; } template LocalStorageQueryRunner::LocalStorageQueryRunner(const Query &query, const QByteArray &identifier, const QByteArray &typeName, ConfigNotifier &configNotifier, const Sink::Log::Context &ctx) : mResultProvider(new ResultProvider), mConfigStore(identifier, typeName), mGuard(new QObject), mLogCtx(ctx.subContext("config")) { auto matchesTypeAndIds = [query, this] (const QByteArray &type, const QByteArray &id) { if (query.hasFilter(ApplicationDomain::SinkResource::ResourceType::name) && query.getFilter(ApplicationDomain::SinkResource::ResourceType::name).value.toByteArray() != type) { SinkTraceCtx(mLogCtx) << "Skipping due to type."; return false; } if (!query.ids().isEmpty() && !query.ids().contains(id)) { return false; } return true; }; QObject *guard = new QObject; mResultProvider->setFetcher([this, query, matchesTypeAndIds]() { const auto entries = mConfigStore.getEntries(); for (const auto &res : entries.keys()) { const auto type = entries.value(res); if (!matchesTypeAndIds(type, res)){ continue; } auto entity = readFromConfig(mConfigStore, res, type, query.requestedProperties); if (!matchesFilter(query.getBaseFilters(), *entity)){ SinkTraceCtx(mLogCtx) << "Skipping due to filter." << res; continue; } SinkTraceCtx(mLogCtx) << "Found match " << res; updateStatus(*entity); mResultProvider->add(entity); } // TODO initialResultSetComplete should be implicit mResultProvider->initialResultSetComplete(true); mResultProvider->complete(); }); if (query.liveQuery()) { { auto ret = QObject::connect(&configNotifier, &ConfigNotifier::added, guard, [this, query, matchesTypeAndIds](const ApplicationDomain::ApplicationDomainType::Ptr &entry, const QByteArray &type) { auto entity = entry.staticCast(); if (!matchesTypeAndIds(type, entity->identifier())){ return; } if (!matchesFilter(query.getBaseFilters(), *entity)){ return; } SinkTraceCtx(mLogCtx) << "A new resource has been added: " << entity->identifier(); updateStatus(*entity); mResultProvider->add(entity); }); Q_ASSERT(ret); } { auto ret = QObject::connect(&configNotifier, &ConfigNotifier::modified, guard, [this, query, matchesTypeAndIds](const ApplicationDomain::ApplicationDomainType::Ptr &entry, const QByteArray &type) { auto entity = entry.staticCast(); if (!matchesTypeAndIds(type, entity->identifier())){ return; } if (!matchesFilter(query.getBaseFilters(), *entity)){ return; } updateStatus(*entity); mResultProvider->modify(entity); }); Q_ASSERT(ret); } { auto ret = QObject::connect(&configNotifier, &ConfigNotifier::removed, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) { mResultProvider->remove(entry.staticCast()); }); Q_ASSERT(ret); } } mResultProvider->onDone([=]() { delete guard; delete this; }); } template QObject *LocalStorageQueryRunner::guard() const { return mGuard.get(); } template void LocalStorageQueryRunner::updateStatus(DomainType &entity) { if (mStatusUpdater) { mStatusUpdater(entity); } } template void LocalStorageQueryRunner::setStatusUpdater(const std::function &updater) { mStatusUpdater = updater; } template void LocalStorageQueryRunner::statusChanged(const QByteArray &identifier) { SinkTraceCtx(mLogCtx) << "Status changed " << identifier; auto entity = readFromConfig(mConfigStore, identifier, ApplicationDomain::getTypeName(), QByteArrayList{}); updateStatus(*entity); mResultProvider->modify(entity); } template typename Sink::ResultEmitter::Ptr LocalStorageQueryRunner::emitter() { return mResultProvider->emitter(); } template LocalStorageFacade::LocalStorageFacade(const QByteArray &identifier, const QByteArray &typeName) : StoreFacade(), mIdentifier(identifier), mTypeName(typeName) { } template LocalStorageFacade::~LocalStorageFacade() { } template KAsync::Job LocalStorageFacade::create(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; auto typeName = mTypeName; return KAsync::start([domainObject, configStoreIdentifier, typeName]() { const QByteArray type = domainObject.getProperty(typeName).toByteArray(); const QByteArray providedIdentifier = domainObject.identifier().isEmpty() ? domainObject.getProperty("identifier").toByteArray() : domainObject.identifier(); const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; auto configStore = ConfigStore(configStoreIdentifier, typeName); configStore.add(identifier, type); auto changedProperties = domainObject.changedProperties(); changedProperties.removeOne("identifier"); changedProperties.removeOne(typeName); if (!changedProperties.isEmpty()) { // We have some configuration values QMap configurationValues; for (const auto &property : changedProperties) { configurationValues.insert(property, domainObject.getProperty(property)); } configStore.modify(identifier, configurationValues); } sConfigNotifier.add(::readFromConfig(configStore, identifier, type, QByteArrayList{}), type); }); } template KAsync::Job LocalStorageFacade::modify(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; auto typeName = mTypeName; return KAsync::start([domainObject, configStoreIdentifier, typeName]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; return; } auto changedProperties = domainObject.changedProperties(); changedProperties.removeOne("identifier"); changedProperties.removeOne(typeName); auto configStore = ConfigStore(configStoreIdentifier, typeName); if (!changedProperties.isEmpty()) { // We have some configuration values QMap configurationValues; for (const auto &property : changedProperties) { configurationValues.insert(property, domainObject.getProperty(property)); } configStore.modify(identifier, configurationValues); } const auto type = configStore.getEntries().value(identifier); sConfigNotifier.modify(::readFromConfig(configStore, identifier, type, QByteArrayList{}), type); }); } template KAsync::Job LocalStorageFacade::move(const DomainType &, const QByteArray &) { return KAsync::error(1, "Resources and Accounts cannot be moved."); } template KAsync::Job LocalStorageFacade::copy(const DomainType &, const QByteArray &) { return KAsync::error(1, "Resources and Accounts cannot be copied."); } template KAsync::Job LocalStorageFacade::remove(const DomainType &domainObject) { auto configStoreIdentifier = mIdentifier; auto typeName = mTypeName; return KAsync::start([domainObject, configStoreIdentifier, typeName]() { const QByteArray identifier = domainObject.identifier(); if (identifier.isEmpty()) { SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; return; } SinkTrace() << "Removing: " << identifier; auto configStore = ConfigStore(configStoreIdentifier, typeName); configStore.remove(identifier); sConfigNotifier.remove(QSharedPointer::create(domainObject), typeName); }); } template QPair, typename ResultEmitter::Ptr> LocalStorageFacade::load(const Query &query, const Sink::Log::Context &parentCtx) { auto ctx = parentCtx.subContext(ApplicationDomain::getTypeName()); auto runner = new LocalStorageQueryRunner(query, mIdentifier, mTypeName, sConfigNotifier, ctx); return qMakePair(KAsync::null(), runner->emitter()); } ResourceFacade::ResourceFacade() : LocalStorageFacade("resources", Sink::ApplicationDomain::SinkResource::ResourceType::name) { } ResourceFacade::~ResourceFacade() { } KAsync::Job ResourceFacade::remove(const Sink::ApplicationDomain::SinkResource &resource) { const auto identifier = resource.identifier(); return Sink::Store::removeDataFromDisk(identifier).then(LocalStorageFacade::remove(resource)); } QPair, typename Sink::ResultEmitter::Ptr> ResourceFacade::load(const Sink::Query &query, const Sink::Log::Context &parentCtx) { auto ctx = parentCtx.subContext("resource"); auto runner = new LocalStorageQueryRunner(query, mIdentifier, mTypeName, sConfigNotifier, ctx); auto monitoredResources = QSharedPointer>::create(); runner->setStatusUpdater([runner, monitoredResources, ctx](ApplicationDomain::SinkResource &resource) { auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); if (!monitoredResources->contains(resource.identifier())) { auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess, ctx](const Notification ¬ification) { SinkTraceCtx(ctx) << "Received notification in facade: " << notification.type; if (notification.type == Notification::Status) { runner->statusChanged(resource.identifier()); } }); Q_ASSERT(ret); monitoredResources->insert(resource.identifier()); } resource.setStatusStatus(resourceAccess->getResourceStatus()); }); return qMakePair(KAsync::null(), runner->emitter()); } AccountFacade::AccountFacade() : LocalStorageFacade("accounts", ApplicationDomain::SinkAccount::AccountType::name) { } AccountFacade::~AccountFacade() { } QPair, typename Sink::ResultEmitter::Ptr> AccountFacade::load(const Sink::Query &query, const Sink::Log::Context &parentCtx) { auto ctx = parentCtx.subContext("accounts"); auto runner = new LocalStorageQueryRunner(query, mIdentifier, mTypeName, sConfigNotifier, ctx); auto monitoredResources = QSharedPointer>::create(); auto monitorResource = [monitoredResources, runner, ctx] (const QByteArray &accountIdentifier, const ApplicationDomain::SinkResource &resource, const ResourceAccess::Ptr &resourceAccess) { if (!monitoredResources->contains(resource.identifier())) { auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess, accountIdentifier, ctx](const Notification ¬ification) { SinkTraceCtx(ctx) << "Received notification in facade: " << notification.type; if (notification.type == Notification::Status) { runner->statusChanged(accountIdentifier); } }); Q_ASSERT(ret); monitoredResources->insert(resource.identifier()); } }; runner->setStatusUpdater([runner, monitoredResources, ctx, monitorResource](ApplicationDomain::SinkAccount &account) { Query query{Query::LiveQuery}; query.filter(account.identifier()); query.request() .request(); const auto resources = Store::read(query); SinkTraceCtx(ctx) << "Found resource belonging to the account " << account.identifier() << " : " << resources; auto accountIdentifier = account.identifier(); //Monitor for new resources so they can be monitored as well if (!runner->mResourceEmitter.contains(accountIdentifier)) { auto facade = Sink::FacadeFactory::instance().getFacade(); Q_ASSERT(facade); auto emitter = facade->load(query, ctx).second; emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { auto resourceAccess = Sink::ResourceAccessFactory::instance().getAccess(resource->identifier(), ResourceConfig::getResourceType(resource->identifier())); monitorResource(accountIdentifier, *resource, resourceAccess); }); emitter->fetch(); runner->mResourceEmitter[accountIdentifier] = emitter; } QList states; //Gather all resources and ensure they are monitored for (const auto &resource : resources) { auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); monitorResource(accountIdentifier, resource, resourceAccess); states << resourceAccess->getResourceStatus(); } const auto status = [&] { if (states.contains(ApplicationDomain::ErrorStatus)) { return ApplicationDomain::ErrorStatus; } if (states.contains(ApplicationDomain::BusyStatus)) { return ApplicationDomain::BusyStatus; } if (states.contains(ApplicationDomain::OfflineStatus)) { return ApplicationDomain::OfflineStatus; } if (states.contains(ApplicationDomain::ConnectedStatus)) { return ApplicationDomain::ConnectedStatus; } return ApplicationDomain::NoStatus; }(); account.setStatusStatus(status); }); return qMakePair(KAsync::null(), runner->emitter()); } KAsync::Job AccountFacade::remove(const Sink::ApplicationDomain::SinkAccount &account) { using namespace Sink::ApplicationDomain; auto job = KAsync::null(); //Remove all resources job = job.then(Store::fetch(Sink::Query{}.filter(account))) .each([] (const SinkResource::Ptr &resource) { return Store::remove(*resource); }); //Remove all identities job = job.then(Store::fetch(Sink::Query{}.filter(account))) .each([] (const Identity::Ptr &identity) { return Store::remove(*identity); }); - + return job.then(LocalStorageFacade::remove(account)); } IdentityFacade::IdentityFacade() : LocalStorageFacade("identities", "type") { } IdentityFacade::~IdentityFacade() { } #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_resourcefacade.cpp" #pragma clang diagnostic pop diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 230dbc75..4fe7e3be 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -1,660 +1,660 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "entitystore.h" #include #include #include "entitybuffer.h" #include "log.h" #include "typeindex.h" #include "definitions.h" #include "resourcecontext.h" #include "index.h" #include "bufferutils.h" #include "entity_generated.h" #include "applicationdomaintype_p.h" #include "typeimplementations.h" using namespace Sink; using namespace Sink::Storage; static QMap baseDbs() { return {{"revisionType", 0}, {"revisions", 0}, {"uids", 0}, {"default", 0}, {"__flagtable", 0}}; } template void mergeImpl(T &map, First f) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } } template void mergeImpl(T &map, First f, Tail ...maps) { for (auto it = f.constBegin(); it != f.constEnd(); it++) { map.insert(it.key(), it.value()); } mergeImpl(map, maps...); } template First merge(First f, Tail ...maps) { First map; mergeImpl(map, f, maps...); return map; } template struct DbLayoutHelper { void operator()(QMap map) const { mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); } }; static Sink::Storage::DbLayout dbLayout(const QByteArray &instanceId) { static auto databases = [] { QMap map; mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); mergeImpl(map, ApplicationDomain::TypeImplementation::typeDatabases()); return merge(baseDbs(), map); }(); return {instanceId, databases}; } class EntityStore::Private { public: Private(const ResourceContext &context, const Sink::Log::Context &ctx) : resourceContext(context), logCtx(ctx.subContext("entitystore")) { } ResourceContext resourceContext; DataStore::Transaction transaction; QHash > indexByType; Sink::Log::Context logCtx; bool exists() { return DataStore(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly).exists(); } DataStore::Transaction &getTransaction() { if (transaction) { return transaction; } DataStore store(Sink::storageLocation(), dbLayout(resourceContext.instanceId()), DataStore::ReadOnly); transaction = store.createTransaction(DataStore::ReadOnly); return transaction; } template struct ConfigureHelper { void operator()(TypeIndex &arg) const { ApplicationDomain::TypeImplementation::configure(arg); } }; TypeIndex &cachedIndex(const QByteArray &type) { if (indexByType.contains(type)) { return *indexByType.value(type); } auto index = QSharedPointer::create(type, logCtx); TypeHelper{type}.template operator()(*index); indexByType.insert(type, index); return *index; } TypeIndex &typeIndex(const QByteArray &type) { auto &index = cachedIndex(type); index.mTransaction = &transaction; return index; } ApplicationDomainType createApplicationDomainType(const QByteArray &type, const QByteArray &uid, qint64 revision, const EntityBuffer &buffer) { auto adaptor = resourceContext.adaptorFactory(type).createAdaptor(buffer.entity(), &typeIndex(type)); return ApplicationDomainType{resourceContext.instanceId(), uid, revision, adaptor}; } }; EntityStore::EntityStore(const ResourceContext &context, const Log::Context &ctx) : d(new EntityStore::Private{context, ctx}) { } void EntityStore::initialize() { //This function is only called in the resource code where we want to be able to write to the databse. //Check for the existience of the db without creating it or the envrionment. //This is required to be able to set the database version only in the case where we create a new database. if (!Storage::DataStore::exists(Sink::storageLocation(), d->resourceContext.instanceId())) { //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. startTransaction(DataStore::ReadWrite); //Create the database with the correct version if it wasn't existing before SinkLogCtx(d->logCtx) << "Creating resource database."; Storage::DataStore::setDatabaseVersion(d->transaction, Sink::latestDatabaseVersion()); } else { //The first time we open the environment we always want it to be read/write. Otherwise subsequent tries to open a write transaction will fail. startTransaction(DataStore::ReadWrite); } commitTransaction(); } void EntityStore::startTransaction(DataStore::AccessMode accessMode) { SinkTraceCtx(d->logCtx) << "Starting transaction: " << accessMode; Q_ASSERT(!d->transaction); d->transaction = DataStore(Sink::storageLocation(), dbLayout(d->resourceContext.instanceId()), accessMode).createTransaction(accessMode); } void EntityStore::commitTransaction() { SinkTraceCtx(d->logCtx) << "Committing transaction"; for (const auto &type : d->indexByType.keys()) { d->typeIndex(type).commitTransaction(); } Q_ASSERT(d->transaction); d->transaction.commit(); d->transaction = {}; } void EntityStore::abortTransaction() { SinkTraceCtx(d->logCtx) << "Aborting transaction"; d->transaction.abort(); d->transaction = {}; } bool EntityStore::hasTransaction() const { return d->transaction; } bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool replayToSource) { if (entity.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; return false; } SinkTraceCtx(d->logCtx) << "New entity " << entity; d->typeIndex(type).add(entity.identifier(), entity, d->transaction, d->resourceContext.instanceId()); //The maxRevision may have changed meanwhile if the entity created sub-entities const qint64 newRevision = maxRevision() + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Creation); metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); DataStore::mainDatabase(d->transaction, type) .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); DataStore::recordUid(d->transaction, entity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; return true; } ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const { auto newEntity = *ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; // Apply diff //SinkTrace() << "Applying changed properties: " << changeset; for (const auto &property : diff.changedProperties()) { const auto value = diff.getProperty(property); if (value.isValid()) { //SinkTrace() << "Setting property: " << property; newEntity.setProperty(property, value); } } // Remove deletions for (const auto &property : deletions) { //SinkTrace() << "Removing property: " << property; newEntity.setProperty(property, QVariant()); } return newEntity; } bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) { const auto current = readLatest(type, diff.identifier()); if (current.identifier().isEmpty()) { SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); return false; } auto newEntity = applyDiff(type, current, diff, deletions); return modify(type, current, newEntity, replayToSource); } bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource) { SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; d->typeIndex(type).remove(current.identifier(), current, d->transaction, d->resourceContext.instanceId()); d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction, d->resourceContext.instanceId()); const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; { //We add availableProperties to account for the properties that have been changed by the preprocessors auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Modification); metadataBuilder.add_replayToSource(replayToSource); metadataBuilder.add_modifiedProperties(modifiedProperties); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); } SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); newEntity.setChangedProperties(newEntity.availableProperties().toSet()); flatbuffers::FlatBufferBuilder fbb; d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); DataStore::mainDatabase(d->transaction, type) .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; return true; } bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource) { const auto uid = current.identifier(); if (!exists(type, uid)) { SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; return false; } d->typeIndex(type).remove(current.identifier(), current, d->transaction, d->resourceContext.instanceId()); SinkTraceCtx(d->logCtx) << "Removed entity " << current; const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); metadataBuilder.add_operation(Operation_Removal); metadataBuilder.add_replayToSource(replayToSource); auto metadataBuffer = metadataBuilder.Finish(); FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); DataStore::mainDatabase(d->transaction, type) .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, uid, type); DataStore::removeUid(d->transaction, uid, type); return true; } void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) { const auto uid = DataStore::getUidFromRevision(d->transaction, revision); const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); if (bufferType.isEmpty() || uid.isEmpty()) { SinkErrorCtx(d->logCtx) << "Failed to find revision during cleanup: " << revision; Q_ASSERT(false); return; } SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; DataStore::mainDatabase(d->transaction, bufferType) .scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; } else { const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); const qint64 rev = metadata->revision(); const auto isRemoval = metadata->operation() == Operation_Removal; // Remove old revisions, and the current if the entity has already been removed if (rev < revision || isRemoval) { DataStore::removeRevision(d->transaction, rev); DataStore::mainDatabase(d->transaction, bufferType).remove(key); } //Don't cleanup more than specified if (rev >= revision) { return false; } } return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); DataStore::setCleanedUpRevision(d->transaction, revision); } bool EntityStore::cleanupRevisions(qint64 revision) { Q_ASSERT(d->exists()); bool implicitTransaction = false; if (!d->transaction) { startTransaction(DataStore::ReadWrite); Q_ASSERT(d->transaction); implicitTransaction = true; } const auto lastCleanRevision = DataStore::cleanedUpRevision(d->transaction); const auto firstRevisionToCleanup = lastCleanRevision + 1; bool cleanupIsNecessary = firstRevisionToCleanup <= revision; if (cleanupIsNecessary) { SinkTraceCtx(d->logCtx) << "Cleaning up from " << firstRevisionToCleanup << " to " << revision; for (qint64 rev = firstRevisionToCleanup; rev <= revision; rev++) { cleanupEntityRevisionsUntil(rev); } } if (implicitTransaction) { commitTransaction(); } return cleanupIsNecessary; } QVector EntityStore::fullScan(const QByteArray &type) { SinkTraceCtx(d->logCtx) << "Looking for : " << type; if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return QVector(); } //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. QSet keys; DataStore::mainDatabase(d->getTransaction(), type) .scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { const auto uid = DataStore::uidFromKey(key); if (keys.contains(uid)) { //Not something that should persist if the replay works, so we keep a message for now. SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key; } keys << uid; return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; }); SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; return keys.toList().toVector(); } -QVector EntityStore::indexLookup(const QByteArray &type, const QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting) +QVector EntityStore::indexLookup(const QByteArray &type, const QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return QVector(); } return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction(), d->resourceContext.instanceId()); } QVector EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return QVector(); } return d->typeIndex(type).lookup(property, value, d->getTransaction()); } void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback) { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return; } auto list = d->typeIndex(type).lookup(property, value, d->getTransaction()); for (const auto &uid : list) { callback(uid); } /* Index index(type + ".index." + property, d->transaction); */ /* index.lookup(value, [&](const QByteArray &sinkId) { */ /* callback(sinkId); */ /* }, */ /* [&](const Index::Error &error) { */ /* SinkWarningCtx(d->logCtx) << "Error in index: " << error.message << property; */ /* }); */ } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { Q_ASSERT(d); Q_ASSERT(!uid.isEmpty()); auto db = DataStore::mainDatabase(d->getTransaction(), type); db.findLatest(uid, [=](const QByteArray &key, const QByteArray &value) { callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << uid; }); } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { //TODO cache max revision for the duration of the transaction. callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) { readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { //TODO cache max revision for the duration of the transaction. callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer), buffer.operation()); }); } ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid) { ApplicationDomainType dt; readLatest(type, uid, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function callback) { auto db = DataStore::mainDatabase(d->getTransaction(), type); db.scan(key, [=](const QByteArray &key, const QByteArray &value) -> bool { callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readEntity query: " << error.message << key; }); } void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback) { readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid) { ApplicationDomainType dt; readEntity(type, uid, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readAll(const QByteArray &type, const std::function &callback) { readAllUids(type, [&] (const QByteArray &uid) { readLatest(type, uid, callback); }); } void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function &callback) { qint64 revisionCounter = baseRevision; const qint64 topRevision = DataStore::maxRevision(d->getTransaction()); // Spit out the revision keys one by one. while (revisionCounter <= topRevision) { const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter); const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter); // SinkTrace() << "Revision" << *revisionCounter << type << uid; Q_ASSERT(!uid.isEmpty()); Q_ASSERT(!type.isEmpty()); if (type != expectedType) { // Skip revision revisionCounter++; continue; } const auto key = DataStore::assembleKey(uid, revisionCounter); revisionCounter++; callback(key); } } void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) { auto db = DataStore::mainDatabase(d->getTransaction(), type); qint64 latestRevision = 0; db.scan(uid, [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { const auto foundRevision = DataStore::revisionFromKey(key); if (foundRevision < revision && foundRevision > latestRevision) { latestRevision = foundRevision; } return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); readEntity(type, DataStore::assembleKey(uid, latestRevision), callback); } void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) { readPrevious(type, uid, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) { callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); }); } ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision) { ApplicationDomainType dt; readPrevious(type, uid, revision, [&](const ApplicationDomainType &entity) { dt = *ApplicationDomainType::getInMemoryRepresentation(entity, entity.availableProperties()); }); return dt; } void EntityStore::readAllUids(const QByteArray &type, const std::function callback) { DataStore::getUids(type, d->getTransaction(), callback); } bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) { return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); } bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) { bool found = false; bool alreadyRemoved = false; DataStore::mainDatabase(d->transaction, type) .findLatest(uid, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { auto entity = GetEntity(data.data()); if (entity && entity->metadata()) { auto metadata = GetMetadata(entity->metadata()->Data()); found = true; if (metadata->operation() == Operation_Removal) { alreadyRemoved = true; } } }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); if (!found) { SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; return false; } if (alreadyRemoved) { SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; return false; } return true; } qint64 EntityStore::maxRevision() { if (!d->exists()) { SinkTraceCtx(d->logCtx) << "Database is not existing."; return 0; } return DataStore::maxRevision(d->getTransaction()); } Sink::Log::Context EntityStore::logContext() const { return d->logCtx; } diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index d79a0b58..ffa70b94 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -1,140 +1,140 @@ /* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #pragma once #include "sink_export.h" #include #include "domaintypeadaptorfactoryinterface.h" #include "query.h" #include "storage.h" #include "resourcecontext.h" #include "metadata_generated.h" namespace Sink { class EntityBuffer; namespace Storage { class SINK_EXPORT EntityStore { public: typedef QSharedPointer Ptr; EntityStore(const ResourceContext &resourceContext, const Sink::Log::Context &); ~EntityStore() = default; using ApplicationDomainType = ApplicationDomain::ApplicationDomainType; void initialize(); //Only the pipeline may call the following functions outside of tests bool add(const QByteArray &type, ApplicationDomainType newEntity, bool replayToSource); bool modify(const QByteArray &type, const ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); bool cleanupRevisions(qint64 revision); ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; void startTransaction(Sink::Storage::DataStore::AccessMode); void commitTransaction(); void abortTransaction(); bool hasTransaction() const; QVector fullScan(const QByteArray &type); - QVector indexLookup(const QByteArray &type, const QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting); + QVector indexLookup(const QByteArray &type, const QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting); QVector indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value); void indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback); template void indexLookup(const QVariant &value, const std::function &callback) { return indexLookup(ApplicationDomain::getTypeName(), PropertyType::name, value, callback); } ///Returns the uid and buffer. Note that the memory only remains valid until the next operation or transaction end. void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); ///Returns an entity. Note that the memory only remains valid until the next operation or transaction end. void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); ///Returns an entity and operation. Note that the memory only remains valid until the next operation or transaction end. void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); ///Returns a copy ApplicationDomainType readLatest(const QByteArray &type, const QByteArray &uid); template T readLatest(const QByteArray &uid) { return T(readLatest(ApplicationDomain::getTypeName(), uid)); } ///Returns the uid and buffer. Note that the memory only remains valid until the next operation or transaction end. void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); ///Returns an entity. Note that the memory only remains valid until the next operation or transaction end. void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); ///Returns a copy ApplicationDomainType readEntity(const QByteArray &type, const QByteArray &key); template T readEntity(const QByteArray &key) { return T(readEntity(ApplicationDomain::getTypeName(), key)); } void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); ///Returns a copy ApplicationDomainType readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision); template T readPrevious(const QByteArray &uid, qint64 revision) { return T(readPrevious(ApplicationDomain::getTypeName(), uid, revision)); } void readAllUids(const QByteArray &type, const std::function callback); void readAll(const QByteArray &type, const std::function &callback); template void readAll(const std::function &callback) { return readAll(ApplicationDomain::getTypeName(), [&](const ApplicationDomainType &entity) { callback(T(entity)); }); } void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function &callback); ///Db contains entity (but may already be marked as removed bool contains(const QByteArray &type, const QByteArray &uid); ///Db contains entity and entity is not yet removed bool exists(const QByteArray &type, const QByteArray &uid); qint64 maxRevision(); Sink::Log::Context logContext() const; private: /* * Remove any old revisions of the same entity up until @param revision */ void cleanupEntityRevisionsUntil(qint64 revision); void copyBlobs(ApplicationDomainType &entity, qint64 newRevision); class Private; const QSharedPointer d; }; } } diff --git a/common/store.cpp b/common/store.cpp index be2488a0..0328c7f2 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -1,527 +1,529 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "store.h" #include #include #include #include #include "resourceaccess.h" #include "commands.h" #include "resourcefacade.h" #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" #include "modelresult.h" #include "storage.h" #include "log.h" #include "utils.h" #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast(A) == static_cast(B), "The enum values must match"); //Ensure the copied enum matches typedef ModelResult MailModelResult; ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole) ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole) ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole) ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole) ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole) ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole) Q_DECLARE_METATYPE(QSharedPointer>) Q_DECLARE_METATYPE(QSharedPointer); Q_DECLARE_METATYPE(std::shared_ptr); static bool sanityCheckQuery(const Sink::Query &query) { for (const auto &id : query.ids()) { if (id.isEmpty()) { SinkError() << "Empty id in query."; return false; } } return true; } namespace Sink { QString Store::storageLocation() { return Sink::storageLocation(); } template KAsync::Job queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) { auto ctx = ctx_.subContext(resourceInstanceIdentifier); auto facade = FacadeFactory::instance().getFacade(resourceType, resourceInstanceIdentifier); if (facade) { SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; auto result = facade->load(query, ctx); if (result.second) { aggregatingEmitter->addEmitter(result.second); } else { SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; } return result.first; } else { SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier; // Ignore the error and carry on return KAsync::null(); } } template QPair::Ptr, typename ResultEmitter::Ptr> getEmitter(Query query, const Log::Context &ctx) { query.setType(ApplicationDomain::getTypeName()); SinkTraceCtx(ctx) << "Query: " << query; // Query all resources and aggregate results auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { //For global types we don't need to query for the resources first. queryResource("", "", query, aggregatingEmitter, ctx).exec(); } else { auto resourceCtx = ctx.subContext("resourceQuery"); auto facade = FacadeFactory::instance().getFacade(); Q_ASSERT(facade); Sink::Query resourceQuery; resourceQuery.request(); if (query.liveQuery()) { SinkTraceCtx(ctx) << "Listening for new resources."; resourceQuery.setFlags(Query::LiveQuery); } //Filter resources by available content types (unless the query already specifies a capability filter) auto resourceFilter = query.getResourceFilter(); - if (!resourceFilter.propertyFilter.contains(ApplicationDomain::SinkResource::Capabilities::name)) { - resourceFilter.propertyFilter.insert(ApplicationDomain::SinkResource::Capabilities::name, Query::Comparator{ApplicationDomain::getTypeName(), Query::Comparator::Contains}); + if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) { + resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName(), Query::Comparator::Contains}); } resourceQuery.setFilter(resourceFilter); - resourceQuery.requestedProperties << resourceFilter.propertyFilter.keys(); + for (auto const &properties : resourceFilter.propertyFilter.keys()) { + resourceQuery.requestedProperties << properties; + } auto result = facade->load(resourceQuery, resourceCtx); auto emitter = result.second; emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier(); const auto resourceType = ResourceConfig::getResourceType(resource->identifier()); Q_ASSERT(!resourceType.isEmpty()); queryResource(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec(); }); emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { SinkTraceCtx(resourceCtx) << "Resource query complete"; }); return qMakePair(aggregatingEmitter, emitter); } return qMakePair(aggregatingEmitter, ResultEmitter::Ptr{}); } static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type) { if (!query.id().isEmpty()) { return Log::Context{"query." + type + "." + query.id()}; } return Log::Context{"query." + type}; } template QSharedPointer Store::loadModel(const Query &query) { Q_ASSERT(sanityCheckQuery(query)); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); auto model = QSharedPointer>::create(query, query.requestedProperties, ctx); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks //* The emitter needs to live or the duration of query (respectively, the model) //* The result provider needs to live for as long as results are provided (until the last thread exits). auto result = getEmitter(query, ctx); model->setEmitter(result.first); //Keep the emitter alive if (auto resourceEmitter = result.second) { model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries resourceEmitter->fetch(); } //Automatically populate the top-level model->fetchMore(QModelIndex()); return model; } template static std::shared_ptr> getFacade(const QByteArray &resourceInstanceIdentifier) { if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { if (auto facade = FacadeFactory::instance().getFacade()) { return facade; } } if (auto facade = FacadeFactory::instance().getFacade(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) { return facade; } return std::make_shared>(); } template KAsync::Job Store::create(const DomainType &domainObject) { SinkLog() << "Create: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; }); } template KAsync::Job Store::modify(const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; }); }); } return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template KAsync::Job Store::modify(const Query &query, const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << query << domainObject; return fetchAll(query) .each([=] (const typename DomainType::Ptr &entity) { auto copy = *entity; for (const auto &p : domainObject.changedProperties()) { copy.setProperty(p, domainObject.getProperty(p)); } return modify(copy); }); } template KAsync::Job Store::move(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Move: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); }); } return facade->move(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); } template KAsync::Job Store::copy(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Copy: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); }); } return facade->copy(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); } template KAsync::Job Store::remove(const DomainType &domainObject) { SinkLog() << "Remove: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); }); } return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); } template KAsync::Job Store::remove(const Sink::Query &query) { SinkLog() << "Remove: " << query; return fetchAll(query) .each([] (const typename DomainType::Ptr &entity) { return remove(*entity); }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { // All databases are going to become invalid, nuke the environments // TODO: all clients should react to a notification from the resource Sink::Storage::DataStore::clearEnv(); SinkTrace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown QObject::connect(resourceAccess.data(), &ResourceAccess::ready, [&future](bool ready) { if (!ready) { future.setFinished(); } }); } else { future.setFinished(); } }) .then([time]() { SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } static KAsync::Job upgrade(const QByteArray &resource) { auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) { return KAsync::value(Store::UpgradeResult{false}); } SinkLog() << "Upgrading " << resource; //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater}; return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during upgrade."; return KAsync::error(error); } SinkTrace() << "Upgrade of resource " << resource << " complete."; return KAsync::null(); }) .then(KAsync::value(Store::UpgradeResult{true})); } KAsync::Job Store::upgrade() { SinkLog() << "Upgrading..."; //Migrate from sink.dav to sink.carddav const auto resources = ResourceConfig::getResources(); for (auto it = resources.constBegin(); it != resources.constEnd(); it++) { if (it.value() == "sink.dav") { ResourceConfig::setResourceType(it.key(), "sink.carddav"); } } auto ret = QSharedPointer::create(false); return fetchAll({}) .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return Sink::upgrade(resource->identifier()) .then([ret](UpgradeResult returnValue) { if (returnValue.upgradeExecuted) { SinkLog() << "Upgrade executed."; *ret = true; } }); }) .then([ret] { if (*ret) { SinkLog() << "Upgrade complete."; } return Store::UpgradeResult{*ret}; }); } static KAsync::Job synchronize(const QByteArray &resource, const Sink::SyncScope &scope) { SinkLog() << "Synchronizing " << resource << scope; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); return resourceAccess->synchronizeResource(scope) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during sync."; return KAsync::error(error); } SinkTrace() << "Synchronization of resource " << resource << " complete."; return KAsync::null(); }); } KAsync::Job Store::synchronize(const Sink::Query &query) { return synchronize(Sink::SyncScope{query}); } KAsync::Job Store::synchronize(const Sink::SyncScope &scope) { auto resourceFilter = scope.getResourceFilter(); //Filter resources by type by default - if (!resourceFilter.propertyFilter.contains(ApplicationDomain::SinkResource::Capabilities::name) && !scope.type().isEmpty()) { - resourceFilter.propertyFilter.insert(ApplicationDomain::SinkResource::Capabilities::name, Query::Comparator{scope.type(), Query::Comparator::Contains}); + if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { + resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); } Sink::Query query; query.setFilter(resourceFilter); SinkLog() << "Synchronizing all resource matching: " << query; return fetchAll(query) .template each([scope](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return synchronize(resource->identifier(), scope); }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { return fetch(query, 1).template then>([](const QList &list) { return KAsync::value(*list.first()); }); } template KAsync::Job> Store::fetchAll(const Sink::Query &query) { return fetch(query); } template KAsync::Job> Store::fetch(const Sink::Query &query, int minimumAmount) { Q_ASSERT(sanityCheckQuery(query)); auto model = loadModel(query); auto list = QSharedPointer>::create(); auto context = QSharedPointer::create(); return KAsync::start>([model, list, context, minimumAmount](KAsync::Future> &future) { if (model->rowCount() >= 1) { for (int i = 0; i < model->rowCount(); i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } } else { QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) { for (int i = start; i <= end; i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } }); QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { if (roles.contains(ModelResult::ChildrenFetchedRole)) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); future.setFinished(); } } }); } if (model->data(QModelIndex(), ModelResult::ChildrenFetchedRole).toBool()) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); } future.setFinished(); } }); } template DomainType Store::readOne(const Sink::Query &query) { const auto list = read(query); if (!list.isEmpty()) { return list.first(); } SinkWarning() << "Tried to read value but no values are available."; return DomainType(); } template QList Store::read(const Sink::Query &query_) { Q_ASSERT(sanityCheckQuery(query_)); auto query = query_; query.setFlags(Query::SynchronousQuery); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); QList list; auto result = getEmitter(query, ctx); auto aggregatingEmitter = result.first; aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ SinkTraceCtx(ctx) << "Found value: " << value->identifier(); list << *value; }); if (auto resourceEmitter = result.second) { resourceEmitter->fetch(); } aggregatingEmitter->fetch(); return list; } #define REGISTER_TYPE(T) \ template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::remove(const Query &); \ template KAsync::Job Store::create(const T &domainObject); \ template KAsync::Job Store::modify(const T &domainObject); \ template KAsync::Job Store::modify(const Query &, const T &); \ template KAsync::Job Store::move(const T &domainObject, const QByteArray &newResource); \ template KAsync::Job Store::copy(const T &domainObject, const QByteArray &newResource); \ template QSharedPointer Store::loadModel(const Query &query); \ template KAsync::Job Store::fetchOne(const Query &); \ template KAsync::Job> Store::fetchAll(const Query &); \ template KAsync::Job> Store::fetch(const Query &, int); \ template T Store::readOne(const Query &); \ template QList Store::read(const Query &); SINK_REGISTER_TYPES() } // namespace Sink diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 41821cb2..6aa37966 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -1,440 +1,550 @@ /* Copyright (c) 2015 Christian Mollekopf This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "typeindex.h" #include "log.h" #include "index.h" #include "fulltextindex.h" #include #include #include using namespace Sink; static QByteArray getByteArray(const QVariant &value) { if (value.type() == QVariant::DateTime) { QByteArray result; QDataStream ds(&result, QIODevice::WriteOnly); ds << value.toDateTime(); return result; } if (value.type() == QVariant::Bool) { return value.toBool() ? "t" : "f"; } if (value.canConvert()) { const auto ba = value.value().value; if (!ba.isEmpty()) { return ba; } } if (value.isValid() && !value.toByteArray().isEmpty()) { return value.toByteArray(); } // LMDB can't handle empty keys, so use something different return "toplevel"; } +template +static QByteArray padNumber(T number) +{ + static T uint_num_digits = (T)std::log10(std::numeric_limits::max()) + 1; + return QByteArray::number(number).rightJustified(uint_num_digits, '0'); +} static QByteArray toSortableByteArrayImpl(const QDateTime &date) { // Sort invalid last if (!date.isValid()) { return QByteArray::number(std::numeric_limits::max()); } - static unsigned int uint_num_digits = (unsigned int)std::log10(std::numeric_limits::max()) + 1; - return QByteArray::number(std::numeric_limits::max() - date.toTime_t()).rightJustified(uint_num_digits, '0'); + return padNumber(std::numeric_limits::max() - date.toTime_t()); } static QByteArray toSortableByteArray(const QVariant &value) { if (!value.isValid()) { // FIXME: we don't know the type, so we don't know what to return // This mean we're fixing every sorted index keys to use unsigned int return QByteArray::number(std::numeric_limits::max()); } switch (value.type()) { case QMetaType::QDateTime: return toSortableByteArrayImpl(value.toDateTime()); default: SinkWarning() << "Not knowing how to convert a" << value.typeName() << "to a sortable key, falling back to default conversion"; return getByteArray(value); } } TypeIndex::TypeIndex(const QByteArray &type, const Sink::Log::Context &ctx) : mLogCtx(ctx), mType(type) { } QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &sortProperty) const { if (sortProperty.isEmpty()) { return mType + ".index." + property; } return mType + ".index." + property + ".sort." + sortProperty; } QByteArray TypeIndex::sortedIndexName(const QByteArray &property) const { return mType + ".index." + property + ".sorted"; } +QByteArray TypeIndex::sampledPeriodIndexName(const QByteArray &rangeBeginProperty, const QByteArray &rangeEndProperty) const +{ + return mType + ".index." + rangeBeginProperty + ".range." + rangeEndProperty; +} + +static unsigned int bucketOf(const QVariant &value) +{ + switch (value.type()) { + case QMetaType::QDateTime: + return value.value().date().toJulianDay() / 7; + default: + SinkError() << "Not knowing how to get the bucket of a" << value.typeName(); + return {}; + } +} + template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { // SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index." + property << value.toByteArray(); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { // SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index." + property << value.toByteArray(); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { //SinkTraceCtx(mLogCtx) << "Indexing " << mType + ".index." + property << getByteArray(value); if (add) { Index(indexName(property), transaction).add(getByteArray(value), identifier); } else { Index(indexName(property), transaction).remove(getByteArray(value), identifier); } }; mIndexer.insert(property, indexer); mProperties << property; } template <> void TypeIndex::addProperty(const QByteArray &property) { addProperty(property); } template <> void TypeIndex::addSortedProperty(const QByteArray &property) { auto indexer = [this, property](bool add, const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { const auto sortableDate = toSortableByteArray(value); if (add) { Index(sortedIndexName(property), transaction).add(sortableDate, identifier); } else { Index(sortedIndexName(property), transaction).remove(sortableDate, identifier); } }; mSortIndexer.insert(property, indexer); mSortedProperties << property; } template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { auto indexer = [=](bool add, const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::DataStore::Transaction &transaction) { const auto date = sortValue.toDateTime(); const auto propertyValue = getByteArray(value); if (add) { Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier); } else { Index(indexName(property, sortProperty), transaction).remove(propertyValue + toSortableByteArray(date), identifier); } }; mGroupedSortIndexer.insert(property + sortProperty, indexer); mGroupedSortedProperties.insert(property, sortProperty); } template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { addPropertyWithSorting(property, sortProperty); } +template <> +void TypeIndex::addSampledPeriodIndex( + const QByteArray &beginProperty, const QByteArray &endProperty) +{ + auto indexer = [=](bool add, const QByteArray &identifier, const QVariant &begin, + const QVariant &end, Sink::Storage::DataStore::Transaction &transaction) { + SinkTraceCtx(mLogCtx) << "Adding entity to sampled period index"; + const auto beginDate = begin.toDateTime(); + const auto endDate = end.toDateTime(); + + auto beginBucket = bucketOf(beginDate); + auto endBucket = bucketOf(endDate); + + if (beginBucket > endBucket) { + SinkError() << "End bucket greater than begin bucket"; + return; + } + + Index index(sampledPeriodIndexName(beginProperty, endProperty), transaction); + for (auto bucket = beginBucket; bucket <= endBucket; ++bucket) { + QByteArray bucketKey = padNumber(bucket); + if (add) { + SinkTraceCtx(mLogCtx) << "Adding entity to bucket:" << bucketKey; + index.add(bucketKey, identifier); + } else { + SinkTraceCtx(mLogCtx) << "Removing entity from bucket:" << bucketKey; + index.remove(bucketKey, identifier); + } + } + }; + + mSampledPeriodProperties.insert({ beginProperty, endProperty }); + mSampledPeriodIndexer.insert({ beginProperty, endProperty }, indexer); +} + void TypeIndex::updateIndex(bool add, const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { for (const auto &property : mProperties) { const auto value = entity.getProperty(property); auto indexer = mIndexer.value(property); indexer(add, identifier, value, transaction); } + for (const auto &properties : mSampledPeriodProperties) { + const auto beginValue = entity.getProperty(properties.first); + const auto endValue = entity.getProperty(properties.second); + auto indexer = mSampledPeriodIndexer.value(properties); + indexer(add, identifier, beginValue, endValue, transaction); + } for (const auto &property : mSortedProperties) { const auto value = entity.getProperty(property); auto indexer = mSortIndexer.value(property); indexer(add, identifier, value, transaction); } for (auto it = mGroupedSortedProperties.constBegin(); it != mGroupedSortedProperties.constEnd(); it++) { const auto value = entity.getProperty(it.key()); const auto sortValue = entity.getProperty(it.value()); auto indexer = mGroupedSortIndexer.value(it.key() + it.value()); indexer(add, identifier, value, sortValue, transaction); } for (const auto &indexer : mCustomIndexer) { indexer->setup(this, &transaction, resourceInstanceId); if (add) { indexer->add(entity); } else { indexer->remove(entity); } } } void TypeIndex::commitTransaction() { for (const auto &indexer : mCustomIndexer) { indexer->commitTransaction(); } } void TypeIndex::abortTransaction() { for (const auto &indexer : mCustomIndexer) { indexer->abortTransaction(); } } void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { updateIndex(true, identifier, entity, transaction, resourceInstanceId); } void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { updateIndex(false, identifier, entity, transaction, resourceInstanceId); } static QVector indexLookup(Index &index, QueryBase::Comparator filter, std::function valueToKey = getByteArray) { QVector keys; QByteArrayList lookupKeys; if (filter.comparator == Query::Comparator::Equals) { lookupKeys << valueToKey(filter.value); } else if (filter.comparator == Query::Comparator::In) { for(const QVariant &value : filter.value.value()) { lookupKeys << valueToKey(value); } } else { Q_ASSERT(false); } for (const auto &lookupKey : lookupKeys) { index.lookup(lookupKey, [&](const QByteArray &value) { keys << value; }, [lookupKey](const Index::Error &error) { SinkWarning() << "Lookup error in index: " << error.message << lookupKey; }, true); } return keys; } static QVector sortedIndexLookup(Index &index, QueryBase::Comparator filter) { if (filter.comparator == Query::Comparator::In || filter.comparator == Query::Comparator::Contains) { SinkWarning() << "In and Contains comparison not supported on sorted indexes"; } if (filter.comparator != Query::Comparator::Within) { return indexLookup(index, filter, toSortableByteArray); } QVector keys; QByteArray lowerBound, upperBound; auto bounds = filter.value.value(); if (bounds[0].canConvert()) { // Inverse the bounds because dates are stored newest first upperBound = toSortableByteArray(bounds[0].toDateTime()); lowerBound = toSortableByteArray(bounds[1].toDateTime()); } else { lowerBound = bounds[0].toByteArray(); upperBound = bounds[1].toByteArray(); } index.rangeLookup(lowerBound, upperBound, [&](const QByteArray &value) { keys << value; }, [bounds](const Index::Error &error) { SinkWarning() << "Lookup error in index:" << error.message << "with bounds:" << bounds[0] << bounds[1]; }); return keys; } -QVector TypeIndex::query(const Sink::QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) +static QVector sampledIndexLookup(Index &index, QueryBase::Comparator filter) +{ + if (filter.comparator != Query::Comparator::Overlap) { + SinkWarning() << "Comparisons other than Overlap not supported on sampled period indexes"; + return {}; + } + + QVector keys; + + auto bounds = filter.value.value(); + + QByteArray lowerBound = toSortableByteArray(bounds[0]); + QByteArray upperBound = toSortableByteArray(bounds[1]); + + QByteArray lowerBucket = padNumber(bucketOf(bounds[0])); + QByteArray upperBucket = padNumber(bucketOf(bounds[1])); + + SinkTrace() << "Looking up from bucket:" << lowerBucket << "to:" << upperBucket; + + index.rangeLookup(lowerBucket, upperBucket, + [&](const QByteArray &value) { + keys << value.data(); + }, + [bounds](const Index::Error &error) { + SinkWarning() << "Lookup error in index:" << error.message + << "with bounds:" << bounds[0] << bounds[1]; + }); + + return keys; +} + +QVector TypeIndex::query(const Sink::QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId) { const auto baseFilters = query.getBaseFilters(); for (auto it = baseFilters.constBegin(); it != baseFilters.constEnd(); it++) { if (it.value().comparator == QueryBase::Comparator::Fulltext) { FulltextIndex fulltextIndex{resourceInstanceId}; const auto keys = fulltextIndex.lookup(it.value().value.toString()); appliedFilters << it.key(); SinkTraceCtx(mLogCtx) << "Fulltext index lookup found " << keys.size() << " keys."; return keys; } } + for (auto it = baseFilters.constBegin(); it != baseFilters.constEnd(); it++) { + if (it.value().comparator == QueryBase::Comparator::Overlap) { + if (mSampledPeriodProperties.contains({it.key()[0], it.key()[1]})) { + Index index(sampledPeriodIndexName(it.key()[0], it.key()[1]), transaction); + const auto keys = sampledIndexLookup(index, query.getFilter(it.key())); + // The filter is not completely applied, we need post-filtering + // in the case the overlap period is not completely aligned + // with a week starting on monday + //appliedFilters << it.key(); + SinkTraceCtx(mLogCtx) << "Sampled period index lookup on" << it.key() << "found" << keys.size() << "keys."; + return keys; + } else { + SinkWarning() << "Overlap search without sampled period index"; + } + } + } + for (auto it = mGroupedSortedProperties.constBegin(); it != mGroupedSortedProperties.constEnd(); it++) { if (query.hasFilter(it.key()) && query.sortProperty() == it.value()) { Index index(indexName(it.key(), it.value()), transaction); const auto keys = indexLookup(index, query.getFilter(it.key())); - appliedFilters << it.key(); + appliedFilters.insert({it.key()}); appliedSorting = it.value(); SinkTraceCtx(mLogCtx) << "Grouped sorted index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys."; return keys; } } for (const auto &property : mSortedProperties) { if (query.hasFilter(property)) { Index index(sortedIndexName(property), transaction); const auto keys = sortedIndexLookup(index, query.getFilter(property)); - appliedFilters << property; + appliedFilters.insert({property}); SinkTraceCtx(mLogCtx) << "Sorted index lookup on " << property << " found " << keys.size() << " keys."; return keys; } } for (const auto &property : mProperties) { if (query.hasFilter(property)) { Index index(indexName(property), transaction); const auto keys = indexLookup(index, query.getFilter(property)); - appliedFilters << property; + appliedFilters.insert({property}); SinkTraceCtx(mLogCtx) << "Index lookup on " << property << " found " << keys.size() << " keys."; return keys; } } SinkTraceCtx(mLogCtx) << "No matching index"; return {}; } QVector TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { SinkTraceCtx(mLogCtx) << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties; if (mProperties.contains(property)) { QVector keys; Index index(indexName(property), transaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); SinkTraceCtx(mLogCtx) << "Index lookup on " << property << " found " << keys.size() << " keys."; return keys; } else if (mSecondaryProperties.contains(property)) { //Lookups on secondary indexes first lookup the key, and then lookup the results again to resolve to entity id's QVector keys; auto resultProperty = mSecondaryProperties.value(property); QVector secondaryKeys; Index index(indexName(property + resultProperty), transaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { secondaryKeys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); SinkTraceCtx(mLogCtx) << "Looked up secondary keys for the following lookup key: " << lookupKey << " => " << secondaryKeys; for (const auto &secondary : secondaryKeys) { keys += lookup(resultProperty, secondary, transaction); } return keys; } else { SinkWarning() << "Tried to lookup " << property << " but couldn't find value"; } return QVector(); } template <> void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::unindex(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).remove(getByteArray(leftValue), getByteArray(rightValue)); } template <> void TypeIndex::unindex(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).remove(getByteArray(leftValue), getByteArray(rightValue)); } template <> QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value) { QVector keys; Index index(indexName(leftName + rightName), *mTransaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { keys << QByteArray{value.constData(), value.size()}; }, [=](const Index::Error &error) { SinkWarning() << "Lookup error in secondary index: " << error.message << value << lookupKey; }); return keys; } template <> QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value) { QVector keys; Index index(indexName(leftName + rightName), *mTransaction); const auto lookupKey = getByteArray(value); index.lookup( lookupKey, [&](const QByteArray &value) { keys << QByteArray{value.constData(), value.size()}; }, [=](const Index::Error &error) { SinkWarning() << "Lookup error in secondary index: " << error.message << value << lookupKey; }); return keys; } diff --git a/common/typeindex.h b/common/typeindex.h index 793dc1e3..a8c0e10d 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -1,130 +1,142 @@ /* Copyright (c) 2015 Christian Mollekopf This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #pragma once #include "resultset.h" #include "storage.h" #include "query.h" #include "log.h" #include "indexer.h" #include namespace Sink { namespace Storage { class EntityStore; } } class TypeIndex { public: TypeIndex(const QByteArray &type, const Sink::Log::Context &); template void addProperty(const QByteArray &property); template void addSortedProperty(const QByteArray &property); template void addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty); template void addPropertyWithSorting() { addPropertyWithSorting(T::name, S::name); } template void addProperty() { addProperty(T::name); } template void addSortedProperty() { addSortedProperty(T::name); } template void addSecondaryProperty() { mSecondaryProperties.insert(Left::name, Right::name); } template void addSecondaryPropertyIndexer() { mCustomIndexer << CustomIndexer::Ptr::create(); } + template + void addSampledPeriodIndex(const QByteArray &beginProperty, const QByteArray &endProperty); + + template + void addSampledPeriodIndex() + { + addSampledPeriodIndex(Begin::name, End::name); + } + void add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId); void remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId); - QVector query(const Sink::QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId); + QVector query(const Sink::QueryBase &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId); QVector lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); template QVector secondaryLookup(const QVariant &value) { return secondaryLookup(Left::name, Right::name, value); } template QVector secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value); template void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { index(Left::name, Right::name, leftValue, rightValue, transaction); } template void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction); template void unindex(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { index(Left::name, Right::name, leftValue, rightValue, transaction); } template void unindex(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction); void commitTransaction(); void abortTransaction(); private: friend class Sink::Storage::EntityStore; void updateIndex(bool add, const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction, const QByteArray &resourceInstanceId); QByteArray indexName(const QByteArray &property, const QByteArray &sortProperty = QByteArray()) const; QByteArray sortedIndexName(const QByteArray &property) const; + QByteArray sampledPeriodIndexName(const QByteArray &rangeBeginProperty, const QByteArray &rangeEndProperty) const; Sink::Log::Context mLogCtx; QByteArray mType; QByteArrayList mProperties; QByteArrayList mSortedProperties; QMap mGroupedSortedProperties; // QMap mSecondaryProperties; + QSet> mSampledPeriodProperties; QList mCustomIndexer; Sink::Storage::DataStore::Transaction *mTransaction; QHash> mIndexer; QHash> mSortIndexer; QHash> mGroupedSortIndexer; + QHash, std::function> mSampledPeriodIndexer; }; diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 36b6e90e..b52ba96e 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp @@ -1,1623 +1,1780 @@ #include #include #include #include "resource.h" #include "store.h" #include "resourcecontrol.h" #include "commands.h" #include "resourceconfig.h" #include "log.h" #include "modelresult.h" #include "test.h" #include "testutils.h" #include "applicationdomaintype.h" #include "queryrunner.h" #include "adaptorfactoryregistry.h" #include using namespace Sink; using namespace Sink::ApplicationDomain; /** * Test of the query system using the dummy resource. * * This test requires the dummy resource installed. */ class QueryTest : public QObject { Q_OBJECT private slots: void initTestCase() { Sink::Test::initTest(); auto factory = Sink::ResourceFactory::load("sink.dummy"); QVERIFY(factory); ResourceConfig::addResource("sink.dummy.instance1", "sink.dummy"); VERIFYEXEC(Sink::Store::removeDataFromDisk(QByteArray("sink.dummy.instance1"))); } void cleanup() { VERIFYEXEC(Sink::Store::removeDataFromDisk(QByteArray("sink.dummy.instance1"))); } void init() { qDebug(); qDebug() << "-----------------------------------------"; qDebug(); } void testSerialization() { auto type = QByteArray("type"); auto sort = QByteArray("sort"); Sink::QueryBase::Filter filter; filter.ids << "id"; - filter.propertyFilter.insert("foo", QVariant::fromValue(QByteArray("bar"))); + filter.propertyFilter.insert({"foo"}, QVariant::fromValue(QByteArray("bar"))); Sink::Query query; query.setFilter(filter); query.setType(type); query.setSortProperty(sort); QByteArray data; { QDataStream stream(&data, QIODevice::WriteOnly); stream << query; } Sink::Query deserializedQuery; { QDataStream stream(&data, QIODevice::ReadOnly); stream >> deserializedQuery; } QCOMPARE(deserializedQuery.type(), type); QCOMPARE(deserializedQuery.sortProperty(), sort); QCOMPARE(deserializedQuery.getFilter().ids, filter.ids); QCOMPARE(deserializedQuery.getFilter().propertyFilter.keys(), filter.propertyFilter.keys()); QCOMPARE(deserializedQuery.getFilter().propertyFilter, filter.propertyFilter); } void testNoResources() { // Test Sink::Query query; query.resourceFilter("foobar"); query.setFlags(Query::LiveQuery); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 0); } void testSingle() { // Setup auto mail = Mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); VERIFYEXEC(Sink::Store::create(mail)); // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.setFlags(Query::LiveQuery); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); } void testSingleWithDelay() { // Setup auto mail = Mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); VERIFYEXEC(Sink::Store::create(mail)); // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); // We fetch after the data is available and don't rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } void testFilter() { // Setup { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); mail.setFolder("folder1"); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test2"); mail.setFolder("folder2"); VERIFYEXEC(Sink::Store::create(mail)); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.setFlags(Query::LiveQuery); query.filter("folder1"); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); auto mail = model->index(0, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).value(); { mail->setFolder("folder2"); VERIFYEXEC(Sink::Store::modify(*mail)); } QTRY_COMPARE(model->rowCount(), 0); { mail->setFolder("folder1"); VERIFYEXEC(Sink::Store::modify(*mail)); } QTRY_COMPARE(model->rowCount(), 1); } void testById() { QByteArray id; // Setup { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); VERIFYEXEC(Sink::Store::create(mail)); mail.setExtractedMessageId("test2"); VERIFYEXEC(Sink::Store::create(mail)); Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed Sink::Store::synchronize(query).exec().waitForFinished(); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QVERIFY(model->rowCount() >= 1); id = model->index(0, 0).data(Sink::Store::DomainObjectRole).value()->identifier(); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(id); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } void testFolder() { // Setup { Folder folder("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder)); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.setFlags(Query::LiveQuery); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); auto folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); } void testFolderTree() { // Setup { auto folder = ApplicationDomainType::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder)); auto subfolder = ApplicationDomainType::createEntity("sink.dummy.instance1"); subfolder.setParent(folder.identifier()); VERIFYEXEC(Sink::Store::create(subfolder)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.requestTree(); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); // We fetch after the data is available and don't rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); QCOMPARE(model->rowCount(model->index(0, 0)), 1); } void testIncrementalFolderTree() { // Setup auto folder = ApplicationDomainType::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); // Test Sink::Query query{Sink::Query::LiveQuery}; query.resourceFilter("sink.dummy.instance1"); query.requestTree(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); auto subfolder = ApplicationDomainType::createEntity("sink.dummy.instance1"); subfolder.setParent(folder.identifier()); VERIFYEXEC(Sink::Store::create(subfolder)); VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); //Ensure the folder appears QTRY_COMPARE(model->rowCount(model->index(0, 0)), 1); //...and dissapears again after removal VERIFYEXEC(Sink::Store::remove(subfolder)); VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(model->index(0, 0)), 0); } void testMailByMessageId() { // Setup { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); mail.setProperty("sender", "doe@example.org"); Sink::Store::create(mail).exec().waitForFinished(); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test2"); mail.setProperty("sender", "doe@example.org"); Sink::Store::create(mail).exec().waitForFinished(); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter("test1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } void testMailByFolder() { // Setup Folder::Ptr folderEntity; { Folder folder("sink.dummy.instance1"); Sink::Store::create(folder).exec().waitForFinished(); Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); mail.setFolder(folderEntity->identifier()); Sink::Store::create(mail).exec().waitForFinished(); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(*folderEntity); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } /* * Filter by two properties to make sure that we also use a non-index based filter. */ void testMailByMessageIdAndFolder() { // Setup Folder::Ptr folderEntity; { Folder folder("sink.dummy.instance1"); Sink::Store::create(folder).exec().waitForFinished(); Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); mail.setFolder(folderEntity->identifier()); Sink::Store::create(mail).exec().waitForFinished(); Mail mail1("sink.dummy.instance1"); mail1.setExtractedMessageId("test1"); mail1.setFolder("foobar"); Sink::Store::create(mail1).exec().waitForFinished(); Mail mail2("sink.dummy.instance1"); mail2.setExtractedMessageId("test2"); mail2.setFolder(folderEntity->identifier()); Sink::Store::create(mail2).exec().waitForFinished(); } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(*folderEntity); query.filter("test1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } void testMailByFolderSortedByDate() { // Setup Folder::Ptr folderEntity; const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); { Folder folder("sink.dummy.instance1"); Sink::Store::create(folder).exec().waitForFinished(); Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("testSecond"); mail.setFolder(folderEntity->identifier()); mail.setExtractedDate(date.addDays(-1)); Sink::Store::create(mail).exec().waitForFinished(); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("testLatest"); mail.setFolder(folderEntity->identifier()); mail.setExtractedDate(date); Sink::Store::create(mail).exec().waitForFinished(); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("testLast"); mail.setFolder(folderEntity->identifier()); mail.setExtractedDate(date.addDays(-2)); Sink::Store::create(mail).exec().waitForFinished(); } } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(*folderEntity); query.sort(); query.limit(1); query.setFlags(Query::LiveQuery); query.reduce(Query::Reduce::Selector::max()) .count("count") .collect("unreadCollected") .collect("importantCollected"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); // The model is not sorted, but the limited set is sorted, so we can only test for the latest result. QCOMPARE(model->rowCount(), 1); QCOMPARE(model->index(0, 0).data(Sink::Store::DomainObjectRole).value()->getProperty("messageId").toByteArray(), QByteArray("testLatest")); model->fetchMore(QModelIndex()); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 2); // We can't make any assumptions about the order of the indexes // QCOMPARE(model->index(1, 0).data(Sink::Store::DomainObjectRole).value()->getProperty("messageId").toByteArray(), QByteArray("testSecond")); //New revisions always go through { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("testInjected"); mail.setFolder(folderEntity->identifier()); mail.setExtractedDate(date.addDays(-2)); Sink::Store::create(mail).exec().waitForFinished(); } QTRY_COMPARE(model->rowCount(), 3); //Ensure we can continue fetching after the incremental update model->fetchMore(QModelIndex()); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 4); //Ensure we have fetched all model->fetchMore(QModelIndex()); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 4); } void testReactToNewResource() { Sink::Query query; query.setFlags(Query::LiveQuery); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(QModelIndex()), 0); auto res = DummyResource::create(""); VERIFYEXEC(Sink::Store::create(res)); auto folder = Folder::create(res.identifier()); VERIFYEXEC(Sink::Store::create(folder)); QTRY_COMPARE(model->rowCount(QModelIndex()), 1); VERIFYEXEC(Sink::Store::remove(res)); } void testAccountFilter() { using namespace Sink; using namespace Sink::ApplicationDomain; //Setup QString accountName("name"); QString accountIcon("icon"); auto account1 = ApplicationDomainType::createEntity(); account1.setAccountType("maildir"); account1.setName(accountName); account1.setIcon(accountIcon); VERIFYEXEC(Store::create(account1)); auto account2 = ApplicationDomainType::createEntity(); account2.setAccountType("maildir"); account2.setName(accountName); account2.setIcon(accountIcon); VERIFYEXEC(Store::create(account2)); auto resource1 = ApplicationDomainType::createEntity(); resource1.setResourceType("sink.dummy"); resource1.setAccount(account1); Store::create(resource1).exec().waitForFinished(); auto resource2 = ApplicationDomainType::createEntity(); resource2.setResourceType("sink.dummy"); resource2.setAccount(account2); Store::create(resource2).exec().waitForFinished(); { Folder folder1(resource1.identifier()); VERIFYEXEC(Sink::Store::create(folder1)); Folder folder2(resource2.identifier()); VERIFYEXEC(Sink::Store::create(folder2)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << resource1.identifier())); VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << resource2.identifier())); // Test Sink::Query query; query.resourceFilter(account1); auto folders = Sink::Store::read(query); QCOMPARE(folders.size(), 1); } void testSubquery() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); folder1.setSpecialPurpose(QByteArrayList() << "purpose1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); folder2.setSpecialPurpose(QByteArrayList() << "purpose2"); VERIFYEXEC(Sink::Store::create(folder2)); { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail1"); mail.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail)); } { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail2"); mail.setFolder(folder2); VERIFYEXEC(Sink::Store::create(mail)); } // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter. Query query; query.filter(Sink::Query().containsFilter("purpose1")); query.request(); auto mails = Sink::Store::read(query); QCOMPARE(mails.size(), 1); QCOMPARE(mails.first().getMessageId(), QByteArray("mail1")); } void testLiveSubquery() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); folder1.setSpecialPurpose(QByteArrayList() << "purpose1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); folder2.setSpecialPurpose(QByteArrayList() << "purpose2"); VERIFYEXEC(Sink::Store::create(folder2)); { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail1"); mail.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail)); } { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail2"); mail.setFolder(folder2); VERIFYEXEC(Sink::Store::create(mail)); } // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter. Query query; query.filter(Sink::Query().containsFilter("purpose1")); query.request(); query.setFlags(Query::LiveQuery); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); //This folder should not make it through the query { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail3"); mail.setFolder(folder2); VERIFYEXEC(Sink::Store::create(mail)); } //But this one should { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail4"); mail.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail)); } QTRY_COMPARE(model->rowCount(), 2); } void testResourceSubQuery() { using namespace Sink; using namespace Sink::ApplicationDomain; //Setup auto resource1 = ApplicationDomainType::createEntity(); resource1.setResourceType("sink.dummy"); resource1.setCapabilities(QByteArrayList() << "cap1"); VERIFYEXEC(Store::create(resource1)); auto resource2 = ApplicationDomainType::createEntity(); resource2.setCapabilities(QByteArrayList() << "cap2"); resource2.setResourceType("sink.dummy"); VERIFYEXEC(Store::create(resource2)); VERIFYEXEC(Sink::Store::create(Folder{resource1.identifier()})); VERIFYEXEC(Sink::Store::create(Folder{resource2.identifier()})); VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(resource1.identifier())); VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(resource2.identifier())); // We fetch before the data is available and rely on the live query mechanism to deliver the actual data auto folders = Sink::Store::read(Sink::Query{}.resourceContainsFilter("cap1")); QCOMPARE(folders.size(), 1); //TODO this should be part of the regular cleanup between tests VERIFYEXEC(Store::remove(resource1)); VERIFYEXEC(Store::remove(resource2)); } void testFilteredLiveResourceSubQuery() { using namespace Sink; using namespace Sink::ApplicationDomain; //Setup auto resource1 = ApplicationDomainType::createEntity(); resource1.setResourceType("sink.dummy"); resource1.setCapabilities(QByteArrayList() << "cap1"); VERIFYEXEC(Store::create(resource1)); VERIFYEXEC(Store::create(Folder{resource1.identifier()})); VERIFYEXEC(ResourceControl::flushMessageQueue(resource1.identifier())); auto model = Sink::Store::loadModel(Query{Query::LiveQuery}.resourceContainsFilter("cap1")); QTRY_COMPARE(model->rowCount(), 1); auto resource2 = ApplicationDomainType::createEntity(); resource2.setCapabilities(QByteArrayList() << "cap2"); resource2.setResourceType("sink.dummy"); VERIFYEXEC(Store::create(resource2)); VERIFYEXEC(Store::create(Folder{resource2.identifier()})); VERIFYEXEC(ResourceControl::flushMessageQueue(resource2.identifier())); //The new resource should be filtered and thus not make it in here QCOMPARE(model->rowCount(), 1); //TODO this should be part of the regular cleanup between tests VERIFYEXEC(Store::remove(resource1)); VERIFYEXEC(Store::remove(resource2)); } void testLivequeryUnmatchInThread() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter. Query query; query.setId("testLivequeryUnmatch"); query.filter(folder1); query.reduce(Query::Reduce::Selector::max()).count("count").collect("senders"); query.sort(); query.setFlags(Query::LiveQuery); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); //After the modifcation the mail should have vanished. { mail1.setFolder(folder2); VERIFYEXEC(Sink::Store::modify(mail1)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 0); } void testLivequeryRemoveOneInThread() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail1)); auto mail2 = Mail::createEntity("sink.dummy.instance1"); mail2.setExtractedMessageId("mail2"); mail2.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail2)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); //Setup two folders with a mail each, ensure we only get the mail from the folder that matches the folder filter. Query query; query.setId("testLivequeryUnmatch"); query.reduce(Query::Reduce::Selector::max()).count("count").collect("senders"); query.sort(); query.setFlags(Query::LiveQuery); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); QCOMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value()->getProperty("count").toInt(), 2); //After the removal, the thread size should be reduced by one { VERIFYEXEC(Sink::Store::remove(mail1)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); QTRY_COMPARE(model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value()->getProperty("count").toInt(), 1); //After the second removal, the thread should be gone { VERIFYEXEC(Sink::Store::remove(mail2)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 0); } void testDontUpdateNonLiveQuery() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); mail1.setUnread(false); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; //Not a live query query.setFlags(Query::Flags{}); query.setId("testNoLiveQuery"); query.filter(folder1); query.reduce(Query::Reduce::Selector::max()).count("count").collect("senders"); query.sort(); query.request(); QVERIFY(!query.liveQuery()); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); //After the modifcation the mail should have vanished. { mail1.setUnread(true); VERIFYEXEC(Sink::Store::modify(mail1)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QTest::qWait(100); QCOMPARE(mail->getUnread(), false); } void testLivequeryModifcationUpdateInThread() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); mail1.setUnread(false); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testLivequeryUnmatch"); query.filter(folder1); query.reduce(Query::Reduce::Selector::max()).count("count").collect("folders"); query.sort(); query.setFlags(Query::LiveQuery); query.request(); auto model = Sink::Store::loadModel(query); QTRY_COMPARE(model->rowCount(), 1); //After the modifcation the mail should have vanished. { mail1.setUnread(true); VERIFYEXEC(Sink::Store::modify(mail1)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QTRY_COMPARE(mail->getUnread(), true); QCOMPARE(mail->getProperty("count").toInt(), 1); QCOMPARE(mail->getProperty("folders").toList().size(), 1); } void testReductionUpdate() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}}; auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); mail1.setUnread(false); mail1.setExtractedDate(now); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testLivequeryUnmatch"); query.setFlags(Query::LiveQuery); query.filter(folder1); query.reduce(Query::Reduce::Selector::max()).count("count").collect("folders"); query.sort(); query.request(); query.request(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); QSignalSpy insertedSpy(model.data(), &QAbstractItemModel::rowsInserted); QSignalSpy removedSpy(model.data(), &QAbstractItemModel::rowsRemoved); QSignalSpy changedSpy(model.data(), &QAbstractItemModel::dataChanged); QSignalSpy layoutChangedSpy(model.data(), &QAbstractItemModel::layoutChanged); QSignalSpy resetSpy(model.data(), &QAbstractItemModel::modelReset); //The leader should change to mail2 after the modification { auto mail2 = Mail::createEntity("sink.dummy.instance1"); mail2.setExtractedMessageId("mail2"); mail2.setFolder(folder1); mail2.setUnread(false); mail2.setExtractedDate(later); VERIFYEXEC(Sink::Store::create(mail2)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail2"}); QCOMPARE(mail->getProperty("count").toInt(), 2); QCOMPARE(mail->getProperty("folders").toList().size(), 2); //This should eventually be just one modification instead of remove + add (See datastorequery reduce component) QCOMPARE(insertedSpy.size(), 1); QCOMPARE(removedSpy.size(), 1); QCOMPARE(changedSpy.size(), 0); QCOMPARE(layoutChangedSpy.size(), 0); QCOMPARE(resetSpy.size(), 0); } void testFilteredReductionUpdate() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testFilteredReductionUpdate"); query.setFlags(Query::LiveQuery); query.filter(folder1); query.reduce(Query::Reduce::Selector::max()).count("count").collect("folders"); query.sort(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 0); QSignalSpy insertedSpy(model.data(), &QAbstractItemModel::rowsInserted); QSignalSpy removedSpy(model.data(), &QAbstractItemModel::rowsRemoved); QSignalSpy changedSpy(model.data(), &QAbstractItemModel::dataChanged); QSignalSpy layoutChangedSpy(model.data(), &QAbstractItemModel::layoutChanged); QSignalSpy resetSpy(model.data(), &QAbstractItemModel::modelReset); //Ensure we don't end up with a mail in the thread that was filtered //This tests the case of an otherwise emtpy thread on purpose. { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("filtered"); mail.setFolder(folder2); mail.setExtractedDate(QDateTime{QDate{2017, 2, 3}, QTime{11, 0, 0}}); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QCOMPARE(model->rowCount(), 0); //Ensure the non-filtered still get through. { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("not-filtered"); mail.setFolder(folder1); mail.setExtractedDate(QDateTime{QDate{2017, 2, 3}, QTime{11, 0, 0}}); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); } void testBloom() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail2"); mail.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail)); } { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail3"); mail.setFolder(folder2); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.resourceFilter("sink.dummy.instance1"); query.setId("testFilterCreationInThread"); query.filter(mail1.identifier()); query.bloom(); query.request(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 2); } void testLivequeryFilterCreationInThread() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testFilterCreationInThread"); query.resourceFilter("sink.dummy.instance1"); query.filter(mail1.identifier()); query.bloom(); query.sort(); query.setFlags(Query::LiveQuery); query.request(); query.request(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); QSignalSpy insertedSpy(model.data(), &QAbstractItemModel::rowsInserted); QSignalSpy removedSpy(model.data(), &QAbstractItemModel::rowsRemoved); QSignalSpy changedSpy(model.data(), &QAbstractItemModel::dataChanged); QSignalSpy layoutChangedSpy(model.data(), &QAbstractItemModel::layoutChanged); QSignalSpy resetSpy(model.data(), &QAbstractItemModel::modelReset); //This modification should make it through { //This should not trigger an entity already in model warning mail1.setUnread(false); VERIFYEXEC(Sink::Store::modify(mail1)); } //This mail should make it through { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail2"); mail.setFolder(folder1); VERIFYEXEC(Sink::Store::create(mail)); } //This mail shouldn't make it through { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail3"); mail.setFolder(folder2); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 2); QTest::qWait(100); QCOMPARE(model->rowCount(), 2); //From mail2 QCOMPARE(insertedSpy.size(), 1); QCOMPARE(removedSpy.size(), 0); //From the modification QCOMPARE(changedSpy.size(), 1); QCOMPARE(layoutChangedSpy.size(), 0); QCOMPARE(resetSpy.size(), 0); } void testLivequeryThreadleaderChange() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); QDateTime earlier{QDate{2017, 2, 3}, QTime{9, 0, 0}}; QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}}; auto mail1 = Mail::createEntity("sink.dummy.instance1"); mail1.setExtractedMessageId("mail1"); mail1.setFolder(folder1); mail1.setExtractedDate(now); VERIFYEXEC(Sink::Store::create(mail1)); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testLivequeryThreadleaderChange"); query.setFlags(Query::LiveQuery); query.reduce(Query::Reduce::Selector::max()).count("count").collect("folders"); query.sort(); query.request(); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); QSignalSpy insertedSpy(model.data(), &QAbstractItemModel::rowsInserted); QSignalSpy removedSpy(model.data(), &QAbstractItemModel::rowsRemoved); QSignalSpy changedSpy(model.data(), &QAbstractItemModel::dataChanged); QSignalSpy layoutChangedSpy(model.data(), &QAbstractItemModel::layoutChanged); QSignalSpy resetSpy(model.data(), &QAbstractItemModel::modelReset); //The leader shouldn't change to mail2 after the modification { auto mail2 = Mail::createEntity("sink.dummy.instance1"); mail2.setExtractedMessageId("mail2"); mail2.setFolder(folder1); mail2.setExtractedDate(earlier); VERIFYEXEC(Sink::Store::create(mail2)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); { auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail1"}); QTRY_COMPARE(mail->getProperty("count").toInt(), 2); QCOMPARE(mail->getProperty("folders").toList().size(), 2); } QCOMPARE(insertedSpy.size(), 0); QCOMPARE(removedSpy.size(), 0); QCOMPARE(changedSpy.size(), 1); QCOMPARE(layoutChangedSpy.size(), 0); QCOMPARE(resetSpy.size(), 0); //The leader should change to mail3 after the modification { auto mail3 = Mail::createEntity("sink.dummy.instance1"); mail3.setExtractedMessageId("mail3"); mail3.setFolder(folder1); mail3.setExtractedDate(later); VERIFYEXEC(Sink::Store::create(mail3)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 1); { auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail3"}); QCOMPARE(mail->getProperty("count").toInt(), 3); QCOMPARE(mail->getProperty("folders").toList().size(), 3); } //This should eventually be just one modification instead of remove + add (See datastorequery reduce component) QCOMPARE(insertedSpy.size(), 1); QCOMPARE(removedSpy.size(), 1); QCOMPARE(changedSpy.size(), 1); QCOMPARE(layoutChangedSpy.size(), 0); QCOMPARE(resetSpy.size(), 0); //Nothing should change on third mail in separate folder { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId("mail4"); mail.setFolder(folder2); mail.setExtractedDate(now); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); QTRY_COMPARE(model->rowCount(), 2); //This should eventually be just one modification instead of remove + add (See datastorequery reduce component) QCOMPARE(insertedSpy.size(), 2); QCOMPARE(removedSpy.size(), 1); QCOMPARE(changedSpy.size(), 1); QCOMPARE(layoutChangedSpy.size(), 0); QCOMPARE(resetSpy.size(), 0); } /* * Ensure that we handle the situation properly if the thread-leader doesn't match a property filter. */ void testFilteredThreadLeader() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); auto folder2 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder2)); QDateTime earlier{QDate{2017, 2, 3}, QTime{9, 0, 0}}; QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}}; auto createMail = [] (const QByteArray &messageid, const Folder &folder, const QDateTime &date, bool important) { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId(messageid); mail.setFolder(folder); mail.setExtractedDate(date); mail.setImportant(important); return mail; }; VERIFYEXEC(Sink::Store::create(createMail("mail1", folder1, now, false))); VERIFYEXEC(Sink::Store::create(createMail("mail2", folder1, earlier, false))); VERIFYEXEC(Sink::Store::create(createMail("mail3", folder1, later, true))); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setId("testLivequeryThreadleaderChange"); query.setFlags(Query::LiveQuery); query.reduce(Query::Reduce::Selector::max()).count().collect(); query.sort(); query.request(); query.filter(false); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); { auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value(); QCOMPARE(mail->getMessageId(), QByteArray{"mail1"}); QCOMPARE(mail->count(), 2); QCOMPARE(mail->getCollectedProperty().size(), 2); } } void testQueryRunnerDontMissUpdates() { // Setup auto folder1 = Folder::createEntity("sink.dummy.instance1"); VERIFYEXEC(Sink::Store::create(folder1)); QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; auto createMail = [] (const QByteArray &messageid, const Folder &folder, const QDateTime &date, bool important) { auto mail = Mail::createEntity("sink.dummy.instance1"); mail.setExtractedMessageId(messageid); mail.setFolder(folder); mail.setExtractedDate(date); mail.setImportant(important); return mail; }; VERIFYEXEC(Sink::Store::create(createMail("mail1", folder1, now, false))); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); Query query; query.setFlags(Query::LiveQuery); Sink::ResourceContext resourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}; Sink::Log::Context logCtx; auto runner = new QueryRunner(query, resourceContext, ApplicationDomain::getTypeName(), logCtx); runner->delayNextQuery(); auto emitter = runner->emitter(); QList added; emitter->onAdded([&](Mail::Ptr mail) { added << mail; }); emitter->fetch(); VERIFYEXEC(Sink::Store::create(createMail("mail2", folder1, now, false))); QTRY_COMPARE(added.size(), 2); runner->delayNextQuery(); VERIFYEXEC(Sink::Store::create(createMail("mail3", folder1, now, false))); //The second revision update is supposed to come in while the initial revision update is still in the query. //So wait a bit to make sure the query is currently runnning. QTest::qWait(500); VERIFYEXEC(Sink::Store::create(createMail("mail4", folder1, now, false))); QTRY_COMPARE(added.size(), 4); } /* * This test is here to ensure we don't crash if we call removeFromDisk with a running query. */ void testRemoveFromDiskWithRunningQuery() { // FIXME: we currently crash QSKIP("Skipping because this produces a crash."); { // Setup Folder::Ptr folderEntity; const auto date = QDateTime(QDate(2015, 7, 7), QTime(12, 0)); { Folder folder("sink.dummy.instance1"); Sink::Store::create(folder).exec().waitForFinished(); Sink::Query query; query.resourceFilter("sink.dummy.instance1"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); folderEntity = model->index(0, 0).data(Sink::Store::DomainObjectRole).value(); QVERIFY(!folderEntity->identifier().isEmpty()); //Add enough data so the query takes long enough that we remove the data from disk whlie the query is ongoing. for (int i = 0; i < 100; i++) { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test" + QByteArray::number(i)); mail.setFolder(folderEntity->identifier()); mail.setExtractedDate(date.addDays(i)); Sink::Store::create(mail).exec().waitForFinished(); } } // Test Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(*folderEntity); query.sort(); query.setFlags(Query::LiveQuery); query.reduce(Query::Reduce::Selector::max()) .count("count") .collect("unreadCollected") .collect("importantCollected"); // Ensure all local data is processed VERIFYEXEC(Sink::ResourceControl::flushMessageQueue(QByteArrayList() << "sink.dummy.instance1")); auto model = Sink::Store::loadModel(query); } //FIXME: this will result in a crash in the above still running query. VERIFYEXEC(Sink::Store::removeDataFromDisk(QByteArray("sink.dummy.instance1"))); } void testMailFulltextSubject() { // Setup { auto msg = KMime::Message::Ptr::create(); msg->subject()->from7BitString("Subject To Search"); msg->setBody("This is the searchable body."); msg->from()->from7BitString("\"The Sender\""); msg->assemble(); { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test1"); mail.setFolder("folder1"); mail.setMimeMessage(msg->encodedContent()); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("test2"); mail.setFolder("folder2"); mail.setExtractedSubject("Stuff"); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); } // Test { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("Subject To Search"), QueryBase::Comparator::Fulltext)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("Subject"), QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Case-insensitive { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("Search"), QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Case-insensitive { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("search"), QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Wildcard match { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("sear*"), QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by body { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("searchable"), QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by folder { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("Subject"), QueryBase::Comparator::Fulltext)); query.filter("folder1"); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by folder { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QString("Subject"), QueryBase::Comparator::Fulltext)); query.filter("folder2"); QCOMPARE(Sink::Store::read(query).size(), 0); } //Filter by sender { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter({}, Sink::QueryBase::Comparator(QString("sender"), Sink::QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by sender { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter({}, Sink::QueryBase::Comparator(QString("Sender"), Sink::QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by sender { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter({}, Sink::QueryBase::Comparator(QString("sender@example"), Sink::QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } //Filter by sender { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter({}, Sink::QueryBase::Comparator(QString("The Sender"), Sink::QueryBase::Comparator::Fulltext)); QCOMPARE(Sink::Store::read(query).size(), 1); } } void mailsWithDates() { { Mail mail("sink.dummy.instance1"); mail.setExtractedDate(QDateTime::fromString("2018-05-23T13:49:41Z", Qt::ISODate)); mail.setExtractedMessageId("message1"); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedDate(QDateTime::fromString("2018-05-23T13:50:00Z", Qt::ISODate)); mail.setExtractedMessageId("message2"); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedDate(QDateTime::fromString("2018-05-27T13:50:00Z", Qt::ISODate)); mail.setExtractedMessageId("message3"); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedMessageId("message4"); VERIFYEXEC(Sink::Store::create(mail)); } { Mail mail("sink.dummy.instance1"); mail.setExtractedDate(QDateTime::fromString("2078-05-23T13:49:41Z", Qt::ISODate)); mail.setExtractedMessageId("message5"); VERIFYEXEC(Sink::Store::create(mail)); } VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); } void testMailDate() { mailsWithDates(); { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QDateTime::fromString("2018-05-23T13:49:41Z", Qt::ISODate)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QDateTime::fromString("2018-05-27T13:49:41Z", Qt::ISODate)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 0); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QDateTime::fromString("2018-05-27T13:50:00Z", Qt::ISODate)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } } void testMailRange() { mailsWithDates(); { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QVariantList{QDateTime::fromString("2018-05-23T13:49:41Z", Qt::ISODate), QDateTime::fromString("2018-05-23T13:49:41Z", Qt::ISODate)}, QueryBase::Comparator::Within)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 1); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QVariantList{QDateTime::fromString("2018-05-22T13:49:41Z", Qt::ISODate), QDateTime::fromString("2018-05-25T13:49:41Z", Qt::ISODate)}, QueryBase::Comparator::Within)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 2); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QVariantList{QDateTime::fromString("2018-05-22T13:49:41Z", Qt::ISODate), QDateTime::fromString("2018-05-30T13:49:41Z", Qt::ISODate)}, QueryBase::Comparator::Within)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 3); } { Sink::Query query; query.resourceFilter("sink.dummy.instance1"); query.filter(QueryBase::Comparator(QVariantList{QDateTime::fromString("2018-05-22T13:49:41Z", Qt::ISODate), QDateTime::fromString("2118-05-30T13:49:41Z", Qt::ISODate)}, QueryBase::Comparator::Within)); auto model = Sink::Store::loadModel(query); QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); QCOMPARE(model->rowCount(), 4); } } + + void eventsWithDates() + { + { + Event event("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-23T12:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-23T13:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + } + { + Event event("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-23T13:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-23T14:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + } + { + Event event("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-23T14:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-23T15:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + } + { + Event event("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-23T12:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-23T14:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + } + { + Event event("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-24T12:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-24T14:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + } + { + Event event("sink.dummy.instance1"); + VERIFYEXEC(Sink::Store::create(event)); + } + + VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); + } + + void testOverlap() + { + eventsWithDates(); + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-22T12:00:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-30T13:00:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 5); + } + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-22T12:30:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-22T12:31:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 0); + } + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-24T10:00:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-24T11:00:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 0); + } + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-23T12:30:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-23T12:31:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 2); + } + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-22T12:30:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-23T12:00:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 2); + } + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-23T14:30:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-23T16:00:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 1); + } + + } + + void testOverlapLive() + { + eventsWithDates(); + + { + Sink::Query query; + query.resourceFilter("sink.dummy.instance1"); + query.setFlags(Query::LiveQuery); + query.filter(QueryBase::Comparator( + QVariantList{ QDateTime::fromString("2018-05-22T12:00:00Z", Qt::ISODate), + QDateTime::fromString("2018-05-30T13:00:00Z", Qt::ISODate) }, + QueryBase::Comparator::Overlap)); + auto model = Sink::Store::loadModel(query); + QTRY_VERIFY(model->data(QModelIndex(), Sink::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(), 5); + + Event event = Event::createEntity("sink.dummy.instance1"); + event.setExtractedStartTime(QDateTime::fromString("2018-05-23T12:00:00Z", Qt::ISODate)); + event.setExtractedEndTime(QDateTime::fromString("2018-05-23T13:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event)); + + Event event2 = Event::createEntity("sink.dummy.instance1"); + event2.setExtractedStartTime(QDateTime::fromString("2018-05-33T12:00:00Z", Qt::ISODate)); + event2.setExtractedEndTime(QDateTime::fromString("2018-05-33T13:00:00Z", Qt::ISODate)); + VERIFYEXEC(Sink::Store::create(event2)); + + QTest::qWait(500); + QCOMPARE(model->rowCount(), 6); + + VERIFYEXEC(Sink::Store::remove(event)); + VERIFYEXEC(Sink::Store::remove(event2)); + + QTest::qWait(500); + QCOMPARE(model->rowCount(), 5); + } + + } + }; QTEST_MAIN(QueryTest) #include "querytest.moc"