diff --git a/CMakeLists.txt b/CMakeLists.txt index 035108d3..d5e1a2a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,160 +1,160 @@ cmake_minimum_required(VERSION 3.0) cmake_policy(SET CMP0048 NEW) cmake_policy(SET CMP0028 NEW) cmake_policy(SET CMP0063 NEW) project(sink VERSION 0.8) option(BUILD_MAILDIR "BUILD_MAILDIR" ON) option(BUILD_DAV "BUILD_DAV" ON) option(CATCH_ERRORS "CATCH_ERRORS" OFF) option(ENABLE_MEMCHECK "Build valgrind tests" OFF) option(ENABLE_ASAN "Enable the address sanitizer" OFF) option(ENABLE_TSAN "Enable the thread sanitizer" OFF) # ECM setup find_package(ECM 1.0.0 REQUIRED NO_MODULE) set(CMAKE_MODULE_PATH ${ECM_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules ${CMAKE_CURRENT_SOURCE_DIR}/tests ${CMAKE_MODULE_PATH}) include(FeatureSummary) include(GenerateExportHeader) include(CMakePackageConfigHelpers) include(ECMSetupVersion) include(KDEInstallDirs) #Avoid building appstreamtest set(KDE_SKIP_TEST_SETTINGS true) #Pick up rpath settings include(KDECMakeSettings NO_POLICY_SCOPE) #We only have console applications here set(CMAKE_MACOSX_BUNDLE OFF) set(CMAKE_WIN32_EXECUTABLE OFF) ecm_setup_version(PROJECT SOVERSION sink_VERSION_MAJOR VERSION_HEADER sink_version.h ) find_package(Qt5 COMPONENTS REQUIRED Core Concurrent Network Gui Test) find_package(KF5 COMPONENTS REQUIRED Mime Contacts CalendarCore) find_package(FlatBuffers REQUIRED 1.4.0) find_package(KAsync REQUIRED 0.1.2) find_package(LMDB REQUIRED 0.9) find_package(Xapian REQUIRED 1.4) if (${ENABLE_MEMCHECK}) message("Enabled memcheck") find_program(MEMORYCHECK_COMMAND valgrind) if(NOT MEMORYCHECK_COMMAND) message(FATAL_ERROR "valgrind not found!") endif() set(MEMORYCHECK_COMMAND_OPTIONS "--trace-children=yes --leak-check=full") endif() if (${ENABLE_ASAN}) message("Enabled ASAN") set(SINK_ASAN_FLAG "-fsanitize=address -fPIE -fno-omit-frame-pointer -O1 ") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SINK_ASAN_FLAG}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SINK_ASAN_FLAG}") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${SINK_ASAN_FLAG}") set(CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} ${SINK_ASAN_FLAG}") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${SINK_ASAN_FLAG}") endif() if (${ENABLE_TSAN}) message("Enabled TSAN") set(SINK_TSAN_FLAG "-fsanitize=thread -fPIE -fno-omit-frame-pointer -O1 ") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SINK_TSAN_FLAG}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SINK_TSAN_FLAG}") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${SINK_TSAN_FLAG}") set(CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} ${SINK_TSAN_FLAG}") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${SINK_TSAN_FLAG}") endif() #Clang-format support add_custom_command( OUTPUT format.dummy WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} COMMAND clang-format -i ${CMAKE_SOURCE_DIR}/**.{cpp,h} ) add_custom_target(format DEPENDS format.dummy) function(generate_flatbuffers _target) foreach(fbs ${ARGN}) #Necessary because we can get relative paths as name, e.g. commands/create_entity get_filename_component(filename ${fbs} NAME) #We first generate into a temporary directory to avoid changing the timestamp of the actual dependency unnecessarily. #Otherwise we'd end up unnecessarily rebuilding the target. add_custom_command( OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${fbs}_generated.h COMMAND ${FLATBUFFERS_FLATC_EXECUTABLE} -c -b -o ${CMAKE_CURRENT_BINARY_DIR}/flatbufferstmp ${CMAKE_CURRENT_SOURCE_DIR}/${fbs}.fbs COMMAND ${CMAKE_COMMAND} -E copy_if_different ${CMAKE_CURRENT_BINARY_DIR}/flatbufferstmp/${filename}_generated.h ${CMAKE_CURRENT_BINARY_DIR}/${filename}_generated.h DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${fbs}.fbs ) target_sources(${_target} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/${fbs}_generated.h) set_property(SOURCE ${fbs}_generated.h PROPERTY SKIP_AUTOMOC ON) endforeach(fbs) endfunction(generate_flatbuffers) #Clang-analyze support add_custom_target(analyze) function(add_clang_static_analysis target) get_target_property(SRCs ${target} SOURCES) get_target_property(INCLUDEs ${target} INCLUDE_DIRECTORIES) add_library(${target}_analyze OBJECT EXCLUDE_FROM_ALL ${SRCs}) set_target_properties(${target}_analyze PROPERTIES COMPILE_OPTIONS "--analyze" EXCLUDE_FROM_DEFAULT_BUILD true INCLUDE_DIRECTORIES "${INCLUDEs};${KDE_INSTALL_FULL_INCLUDEDIR}/KF5/" # Had to hardcode include directory to find KAsync includes #COMPILE_FLAGS is deprecated, but the only way that -Xanalyzer isn't erronously deduplicated COMPILE_FLAGS "-Xanalyzer -analyzer-eagerly-assume -Xanalyzer -analyzer-opt-analyze-nested-blocks" ) target_compile_options(${target}_analyze PRIVATE ${Qt5Core_EXECUTABLE_COMPILE_FLAGS})# Necessary to get options such as fPIC add_dependencies(analyze ${target}_analyze) endfunction() set(CMAKE_AUTOMOC ON) if (${CATCH_ERRORS}) - add_definitions("-Werror -Wall -Weverything -Wno-unused-function -Wno-cast-align -Wno-used-but-marked-unused -Wno-shadow -Wno-weak-vtables -Wno-global-constructors -Wno-deprecated -Wno-weak-template-vtables -Wno-exit-time-destructors -Wno-covered-switch-default -Wno-shorten-64-to-32 -Wno-documentation -Wno-old-style-cast -Wno-extra-semi -Wno-unused-parameter -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-noreturn -Wno-missing-prototypes -Wno-documentation-unknown-command -Wno-sign-conversion -Wno-gnu-zero-variadic-macro-arguments -Wno-disabled-macro-expansion -Wno-vla-extension -Wno-vla -Wno-undefined-func-template -Wno-#warnings -Wno-unused-template -Wno-inconsistent-missing-destructor-override -Wno-zero-as-null-pointer-constant -Wno-unused-lambda-capture -Wno-switch-enum -Wno-redundant-parens") + add_definitions("-Werror -Wall -Weverything -Wno-unused-function -Wno-cast-align -Wno-used-but-marked-unused -Wno-shadow -Wno-weak-vtables -Wno-global-constructors -Wno-deprecated -Wno-weak-template-vtables -Wno-exit-time-destructors -Wno-covered-switch-default -Wno-shorten-64-to-32 -Wno-documentation -Wno-old-style-cast -Wno-extra-semi -Wno-unused-parameter -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-noreturn -Wno-missing-prototypes -Wno-documentation-unknown-command -Wno-sign-conversion -Wno-gnu-zero-variadic-macro-arguments -Wno-disabled-macro-expansion -Wno-vla-extension -Wno-vla -Wno-undefined-func-template -Wno-#warnings -Wno-unused-template -Wno-inconsistent-missing-destructor-override -Wno-zero-as-null-pointer-constant -Wno-unused-lambda-capture -Wno-switch-enum -Wno-redundant-parens -Wno-extra-semi-stmt") endif() set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) # Workaround for older cmake versions if (MSVC) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /std:c++17") endif() include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${FLATBUFFERS_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/common 3rdparty) include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/common ${CMAKE_SOURCE_DIR}/common/domain) configure_file(hawd.conf hawd.conf) enable_testing() set(SINK_RESOURCE_PLUGINS_PATH ${QT_PLUGIN_INSTALL_DIR}/sink/resources) # common, eventually a lib but right now just the command buffers add_subdirectory(common) # the synchronizer add_subdirectory(synchronizer) # example implementations add_subdirectory(examples) # some tests add_subdirectory(tests) # cli add_subdirectory(sinksh) feature_summary(WHAT ALL FATAL_ON_MISSING_REQUIRED_PACKAGES) diff --git a/common/domainadaptor.h b/common/domainadaptor.h index a5a0ade5..dcacf401 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -1,228 +1,228 @@ /* * Copyright (C) 2014 Christian Mollekopf * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #pragma once #include #include #include #include "domaintypeadaptorfactoryinterface.h" #include "domain/applicationdomaintype.h" #include "domain/typeimplementations.h" #include "bufferadaptor.h" #include "entity_generated.h" #include "entitybuffer.h" #include "propertymapper.h" #include "log.h" /** * Create a buffer from a domain object using the provided mappings */ template flatbuffers::Offset createBufferPart(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, const PropertyMapper &mapper) { // First create a primitives such as strings using the mappings QList> propertiesToAddToResource; for (const auto &property : domainObject.changedProperties()) { // SinkTrace() << "copying property " << property; const auto value = domainObject.getProperty(property); if (mapper.hasMapping(property)) { mapper.setProperty(property, domainObject.getProperty(property), propertiesToAddToResource, fbb); } else { // SinkTrace() << "no mapping for property available " << property; } } // Then create all porperties using the above generated builderCalls Builder builder(fbb); for (auto propertyBuilder : propertiesToAddToResource) { propertyBuilder(&builder); } return builder.Finish(); } /** * Create the buffer and finish the FlatBufferBuilder. * * After this the buffer can be extracted from the FlatBufferBuilder object. */ template static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, PropertyMapper &mapper) { auto pos = createBufferPart(domainObject, fbb, mapper); // Because we cannot template the following call // Sink::ApplicationDomain::Buffer::FinishEventBuffer(fbb, pos); // FIXME: This means all buffers in here must have the AKFB identifier fbb.Finish(pos, "AKFB"); flatbuffers::Verifier verifier(fbb.GetBufferPointer(), fbb.GetSize()); if (!verifier.VerifyBuffer(nullptr)) { SinkWarning_(0, "bufferadaptor") << "Created invalid uffer"; } } class IndexPropertyMapper { public: typedef std::function Accessor; virtual ~IndexPropertyMapper(){}; virtual QVariant getProperty(const QByteArray &key, TypeIndex &index, const Sink::ApplicationDomain::BufferAdaptor &adaptor) const { auto accessor = mReadAccessors.value(key); Q_ASSERT(accessor); if (!accessor) { return QVariant(); } return accessor(index, adaptor); } bool hasMapping(const QByteArray &key) const { return mReadAccessors.contains(key); } QList availableProperties() const { return mReadAccessors.keys(); } template void addIndexLookupProperty(const Accessor &accessor) { mReadAccessors.insert(Property::name, accessor); } private: QHash mReadAccessors; }; /** * A generic adaptor implementation that uses a property mapper to read/write values. */ class DatastoreBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor { public: DatastoreBufferAdaptor() : BufferAdaptor() { } virtual void setProperty(const QByteArray &key, const QVariant &value) Q_DECL_OVERRIDE { SinkWarning() << "Can't set property " << key; Q_ASSERT(false); } virtual QVariant getProperty(const QByteArray &key) const Q_DECL_OVERRIDE { if (mLocalBuffer && mLocalMapper->hasMapping(key)) { return mLocalMapper->getProperty(key, mLocalBuffer); } else if (mIndex && mIndexMapper->hasMapping(key)) { return mIndexMapper->getProperty(key, *mIndex, *this); } return QVariant(); } /** * Returns all available properties for which a mapping exists (no matter what the buffer contains) */ virtual QList availableProperties() const Q_DECL_OVERRIDE { return mLocalMapper->availableProperties() + mIndexMapper->availableProperties(); } void const *mLocalBuffer; QSharedPointer mLocalMapper; QSharedPointer mIndexMapper; TypeIndex *mIndex; }; /** * The factory should define how to go from an entitybuffer (local buffer), to a domain type adapter. * It defines how values are split accross local and resource buffer. * This is required by the facade the read the value, and by the pipeline preprocessors to access the domain values in a generic way. */ template class DomainTypeAdaptorFactory : public DomainTypeAdaptorFactoryInterface { typedef typename Sink::ApplicationDomain::TypeImplementation::Buffer LocalBuffer; typedef typename Sink::ApplicationDomain::TypeImplementation::BufferBuilder LocalBuilder; public: DomainTypeAdaptorFactory() : mPropertyMapper(QSharedPointer::create()), mIndexMapper(QSharedPointer::create()) { Sink::ApplicationDomain::TypeImplementation::configure(*mPropertyMapper); Sink::ApplicationDomain::TypeImplementation::configure(*mIndexMapper); } virtual ~DomainTypeAdaptorFactory(){}; /** * Creates an adaptor for the given domain types. * * This returns by default a DatastoreBufferAdaptor initialized with the corresponding property mappers. */ virtual QSharedPointer createAdaptor(const Sink::Entity &entity, TypeIndex *index = nullptr) Q_DECL_OVERRIDE { auto adaptor = QSharedPointer::create(); adaptor->mLocalBuffer = Sink::EntityBuffer::readBuffer(entity.local()); adaptor->mLocalMapper = mPropertyMapper; adaptor->mIndexMapper = mIndexMapper; adaptor->mIndex = index; - return adaptor; + return std::move(adaptor); } virtual bool createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE { flatbuffers::FlatBufferBuilder localFbb; createBufferPartBuffer(domainObject, localFbb, *mPropertyMapper); Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, 0, 0, localFbb.GetBufferPointer(), localFbb.GetSize()); return true; } virtual bool createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = nullptr, size_t metadataSize = 0) Q_DECL_OVERRIDE { //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor); //Serialize all properties newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet()); return createBuffer(newObject, fbb, metadataData, metadataSize); } protected: QSharedPointer mPropertyMapper; QSharedPointer mIndexMapper; }; /** * A default adaptorfactory implemenation that simply instantiates a generic resource */ template class DefaultAdaptorFactory : public DomainTypeAdaptorFactory { public: DefaultAdaptorFactory() : DomainTypeAdaptorFactory() {} virtual ~DefaultAdaptorFactory(){} }; diff --git a/common/log.cpp b/common/log.cpp index 7450f3fb..40fdb365 100644 --- a/common/log.cpp +++ b/common/log.cpp @@ -1,470 +1,470 @@ #include "log.h" #include #include #include #include #include #include #include #include #include #ifdef Q_OS_WIN #include #include #include #else #include #endif #include #include #include #include using namespace Sink::Log; const char *getComponentName() { return nullptr; } static QThreadStorage> sSettings; static QSettings &config() { if (!sSettings.hasLocalData()) { sSettings.setLocalData(QSharedPointer::create(Sink::configLocation() + "/log.ini", QSettings::IniFormat)); } return *sSettings.localData(); } Q_GLOBAL_STATIC(QByteArray, sPrimaryComponent); void Sink::Log::setPrimaryComponent(const QString &component) { if (!sPrimaryComponent.isDestroyed()) { *sPrimaryComponent = component.toUtf8(); } } class DebugStream : public QIODevice { public: QString m_location; DebugStream() : QIODevice() { open(WriteOnly); } virtual ~DebugStream(); bool isSequential() const { return true; } qint64 readData(char *, qint64) { return 0; /* eof */ } qint64 readLineData(char *, qint64) { return 0; /* eof */ } qint64 writeData(const char *data, qint64 len) { #ifdef Q_OS_WIN const auto string = QString::fromUtf8(data, len); OutputDebugStringW(reinterpret_cast(string.utf16())); #else std::cout << data << std::endl; #endif return len; } private: Q_DISABLE_COPY(DebugStream) }; // Virtual method anchor DebugStream::~DebugStream() { } class NullStream : public QIODevice { public: NullStream() : QIODevice() { open(WriteOnly); } virtual ~NullStream(); bool isSequential() const { return true; } qint64 readData(char *, qint64) { return 0; /* eof */ } qint64 readLineData(char *, qint64) { return 0; /* eof */ } qint64 writeData(const char *data, qint64 len) { return len; } private: Q_DISABLE_COPY(NullStream) }; // Virtual method anchor NullStream::~NullStream() { } /* * ANSI color codes: * 0: reset colors/style * 1: bold * 4: underline * 30 - 37: black, red, green, yellow, blue, magenta, cyan, and white text * 40 - 47: black, red, green, yellow, blue, magenta, cyan, and white background */ enum ANSI_Colors { DoNothing = -1, Reset = 0, Bold = 1, Underline = 4, Black = 30, Red = 31, Green = 32, Yellow = 33, Blue = 34 }; static QString colorCommand(int colorCode) { return QString("\x1b[%1m").arg(colorCode); } static QString colorCommand(QList colorCodes) { colorCodes.removeAll(ANSI_Colors::DoNothing); if (colorCodes.isEmpty()) { return QString(); } QString string("\x1b["); for (int code : colorCodes) { string += QString("%1;").arg(code); } string.chop(1); string += "m"; return string; } QByteArray Sink::Log::debugLevelName(DebugLevel debugLevel) { switch (debugLevel) { case DebugLevel::Trace: return "Trace"; case DebugLevel::Log: return "Log"; case DebugLevel::Warning: return "Warning"; case DebugLevel::Error: return "Error"; default: break; - }; + } Q_ASSERT(false); return QByteArray(); } DebugLevel Sink::Log::debugLevelFromName(const QByteArray &name) { const QByteArray lowercaseName = name.toLower(); if (lowercaseName == "trace") return DebugLevel::Trace; if (lowercaseName == "log") return DebugLevel::Log; if (lowercaseName == "warning") return DebugLevel::Warning; if (lowercaseName == "error") return DebugLevel::Error; return DebugLevel::Log; } void Sink::Log::setDebugOutputLevel(DebugLevel debugLevel) { config().setValue("level", debugLevel); } Sink::Log::DebugLevel Sink::Log::debugOutputLevel() { return static_cast(config().value("level", Sink::Log::Log).toInt()); } void Sink::Log::setDebugOutputFilter(FilterType type, const QByteArrayList &filter) { switch (type) { case ApplicationName: config().setValue("applicationfilter", QVariant::fromValue(filter)); break; case Area: config().setValue("areafilter", QVariant::fromValue(filter)); break; } } QByteArrayList Sink::Log::debugOutputFilter(FilterType type) { switch (type) { case ApplicationName: return config().value("applicationfilter").value(); case Area: return config().value("areafilter").value(); default: return QByteArrayList(); } } void Sink::Log::setDebugOutputFields(const QByteArrayList &output) { config().setValue("outputfields", QVariant::fromValue(output)); } QByteArrayList Sink::Log::debugOutputFields() { return config().value("outputfields").value(); } static QByteArray getProgramName() { if (QCoreApplication::instance()) { return QCoreApplication::instance()->applicationName().toLocal8Bit(); } else { return ""; } } static QSharedPointer debugAreasConfig() { return QSharedPointer::create(Sink::dataLocation() + "/debugAreas.ini", QSettings::IniFormat); } class DebugAreaCollector { public: DebugAreaCollector() { //This call can potentially print a log message (if we fail to remove the qsettings lockfile), which would result in a deadlock if we locked over all of it. const auto areas = debugAreasConfig()->value("areas").value().split(';').toSet(); { QMutexLocker locker(&mutex); mDebugAreas = areas; } } ~DebugAreaCollector() { //This call can potentially print a log message (if we fail to remove the qsettings lockfile), which would result in a deadlock if we locked over all of it. const auto areas = debugAreasConfig()->value("areas").value().split(';').toSet(); { QMutexLocker locker(&mutex); mDebugAreas += areas; } debugAreasConfig()->setValue("areas", QVariant::fromValue(mDebugAreas.toList().join(';'))); } void add(const QString &area) { QMutexLocker locker(&mutex); mDebugAreas << area; } QSet debugAreas() { QMutexLocker locker(&mutex); return mDebugAreas; } QMutex mutex; QSet mDebugAreas; }; Q_GLOBAL_STATIC(DebugAreaCollector, sDebugAreaCollector); QSet Sink::Log::debugAreas() { if (!sDebugAreaCollector.isDestroyed()) { return sDebugAreaCollector->debugAreas(); } return {}; } static void collectDebugArea(const QString &debugArea) { if (!sDebugAreaCollector.isDestroyed()) { sDebugAreaCollector->add(debugArea); } } static bool containsItemStartingWith(const QByteArray &pattern, const QByteArrayList &list) { for (const auto &item : list) { int start = 0; int end = item.size(); if (item.startsWith('*')) { start++; } if (item.endsWith('*')) { end--; } if (pattern.contains(item.mid(start, end - start))) { return true; } } return false; } static bool caseInsensitiveContains(const QByteArray &pattern, const QByteArrayList &list) { for (const auto &item : list) { if (item.toLower() == pattern) { return true; } } return false; } static QByteArray getFileName(const char *file) { static char sep = QDir::separator().toLatin1(); auto filename = QByteArray(file).split(sep).last(); return filename.split('.').first(); } static QString assembleDebugArea(const char *debugArea, const char *debugComponent, const char *file) { if (!sPrimaryComponent.isDestroyed() && sPrimaryComponent->isEmpty()) { *sPrimaryComponent = getProgramName(); } if (!sPrimaryComponent.isDestroyed()) { //Using stringbuilder for fewer allocations return QLatin1String{*sPrimaryComponent} % QLatin1String{"."} % (debugComponent ? (QLatin1String{debugComponent} + QLatin1String{"."}) : QLatin1String{""}) % (debugArea ? QLatin1String{debugArea} : QLatin1String{getFileName(file)}); } else { return {}; } } static bool isFiltered(DebugLevel debugLevel, const QByteArray &fullDebugArea) { if (debugLevel < debugOutputLevel()) { return true; } const auto areas = debugOutputFilter(Sink::Log::Area); if ((debugLevel <= Sink::Log::Trace) && !areas.isEmpty()) { if (!containsItemStartingWith(fullDebugArea, areas)) { return true; } } return false; } bool Sink::Log::isFiltered(DebugLevel debugLevel, const char *debugArea, const char *debugComponent, const char *file) { //Avoid assembleDebugArea if we can, because it's fairly expensive. if (debugLevel < debugOutputLevel()) { return true; } return isFiltered(debugLevel, assembleDebugArea(debugArea, debugComponent, file).toLatin1()); } Q_GLOBAL_STATIC(NullStream, sNullStream); Q_GLOBAL_STATIC(DebugStream, sDebugStream); QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea, const char *debugComponent) { const auto fullDebugArea = assembleDebugArea(debugArea, debugComponent, file); collectDebugArea(fullDebugArea); if (isFiltered(debugLevel, fullDebugArea.toLatin1())) { if (!sNullStream.isDestroyed()) { return QDebug(sNullStream); } return QDebug{QtDebugMsg}; } QString prefix; int prefixColorCode = ANSI_Colors::DoNothing; switch (debugLevel) { case DebugLevel::Trace: prefix = "Trace: "; break; case DebugLevel::Log: prefix = "Log: "; prefixColorCode = ANSI_Colors::Green; break; case DebugLevel::Warning: prefix = "Warning:"; prefixColorCode = ANSI_Colors::Red; break; case DebugLevel::Error: prefix = "Error: "; prefixColorCode = ANSI_Colors::Red; break; - }; + } auto debugOutput = debugOutputFields(); bool showLocation = debugOutput.isEmpty() ? false : caseInsensitiveContains("location", debugOutput); bool showFunction = debugOutput.isEmpty() ? false : caseInsensitiveContains("function", debugOutput); bool showProgram = debugOutput.isEmpty() ? false : caseInsensitiveContains("application", debugOutput); #ifdef Q_OS_WIN bool useColor = false; #else bool useColor = true; #endif bool multiline = false; const QString resetColor = colorCommand(ANSI_Colors::Reset); QString output; if (useColor) { output += colorCommand(QList() << ANSI_Colors::Bold << prefixColorCode); } output += prefix; if (useColor) { output += resetColor; } if (showProgram) { int width = 10; output += QString(" %1(%2)").arg(QString::fromLatin1(getProgramName()).leftJustified(width, ' ', true)).arg(unsigned(getpid())).rightJustified(width + 8, ' '); } if (useColor) { output += colorCommand(QList() << ANSI_Colors::Bold << prefixColorCode); } static std::atomic maxDebugAreaSize{25}; maxDebugAreaSize = qMax(fullDebugArea.size(), maxDebugAreaSize.load()); output += QString(" %1 ").arg(fullDebugArea.leftJustified(maxDebugAreaSize, ' ', false)); if (useColor) { output += resetColor; } if (showFunction) { output += QString(" %3").arg(fullDebugArea.leftJustified(25, ' ', true)); } if (showLocation) { const auto filename = QString::fromLatin1(file).split('/').last(); output += QString(" %1:%2").arg(filename.right(25)).arg(QString::number(line).leftJustified(4, ' ')).leftJustified(30, ' ', true); } if (multiline) { output += "\n "; } output += ":"; if (sDebugStream.isDestroyed()) { return QDebug{QtDebugMsg}; } QDebug debug(sDebugStream); debug.noquote().nospace() << output; return debug.space().quote(); } diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 57f5ce84..69523ea2 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -1,440 +1,440 @@ /* * Copyright (C) 2014 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "modelresult.h" #include #include #include #include "log.h" #include "notifier.h" #include "notification.h" using namespace Sink; static uint getInternalIdentifer(const QByteArray &resourceId, const QByteArray &entityId) { return qHash(resourceId + entityId); } static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type) { Q_ASSERT(!type.identifier().isEmpty()); return getInternalIdentifer(type.resourceInstanceIdentifier(), type.identifier()); } static qint64 getIdentifier(const QModelIndex &idx) { if (!idx.isValid()) { return 0; } return idx.internalId(); } template ModelResult::ModelResult(const Sink::Query &query, const QList &propertyColumns, const Sink::Log::Context &ctx) : QAbstractItemModel(), mLogCtx(ctx.subContext("modelresult")), mPropertyColumns(propertyColumns), mQuery(query) { if (query.flags().testFlag(Sink::Query::UpdateStatus)) { Sink::Query resourceQuery; resourceQuery.setFilter(query.getResourceFilter()); mNotifier.reset(new Sink::Notifier{resourceQuery}); mNotifier->registerHandler([this](const Notification ¬ification) { switch (notification.type) { case Notification::Status: case Notification::Warning: case Notification::Error: case Notification::Info: case Notification::Progress: //These are the notifications we care about break; default: //We're not interested return; - }; + } if (notification.resource.isEmpty() || notification.entities.isEmpty()) { return; } QVector idList; for (const auto &entity : notification.entities) { auto id = getInternalIdentifer(notification.resource, entity); if (mEntities.contains(id)) { idList << id; } } if (idList.isEmpty()) { //We don't have this entity in our model return; } const int newStatus = [&] { if (notification.type == Notification::Warning || notification.type == Notification::Error) { return ApplicationDomain::SyncStatus::SyncError; } if (notification.type == Notification::Info) { switch (notification.code) { case ApplicationDomain::SyncInProgress: return ApplicationDomain::SyncInProgress; case ApplicationDomain::SyncSuccess: return ApplicationDomain::SyncSuccess; case ApplicationDomain::SyncError: return ApplicationDomain::SyncError; case ApplicationDomain::NoSyncStatus: break; } return ApplicationDomain::NoSyncStatus; } if (notification.type == Notification::Progress) { return ApplicationDomain::SyncStatus::SyncInProgress; } return ApplicationDomain::NoSyncStatus; }(); for (const auto id : idList) { const auto oldStatus = mEntityStatus.value(id); QVector changedRoles; if (oldStatus != newStatus) { SinkTraceCtx(mLogCtx) << "Status changed for entity:" << newStatus << ", id: " << id; mEntityStatus.insert(id, newStatus); changedRoles << StatusRole; } if (notification.type == Notification::Progress) { changedRoles << ProgressRole; } else if (notification.type == Notification::Warning || notification.type == Notification::Error) { changedRoles << WarningRole; } if (!changedRoles.isEmpty()) { const auto idx = createIndexFromId(id); SinkTraceCtx(mLogCtx) << "Index changed:" << idx << changedRoles; //We don't emit the changedRoles because the consuming model likely remaps the role anyways and would then need to translate dataChanged signals as well. emit dataChanged(idx, idx); } } }); } } template ModelResult::~ModelResult() { if (mEmitter) { mEmitter->waitForMethodExecutionEnd(); } } template qint64 ModelResult::parentId(const Ptr &value) { if (!mQuery.parentProperty().isEmpty()) { const auto identifier = value->getProperty(mQuery.parentProperty()).toByteArray(); if (!identifier.isEmpty()) { return getInternalIdentifer(value->resourceInstanceIdentifier(), identifier); } } return 0; } template int ModelResult::rowCount(const QModelIndex &parent) const { Q_ASSERT(QThread::currentThread() == this->thread()); return mTree[getIdentifier(parent)].size(); } template int ModelResult::columnCount(const QModelIndex &parent) const { Q_ASSERT(QThread::currentThread() == this->thread()); return mPropertyColumns.size(); } template QVariant ModelResult::headerData(int section, Qt::Orientation orientation, int role) const { if (role == Qt::DisplayRole) { if (section < mPropertyColumns.size()) { return mPropertyColumns.at(section); } } return QVariant(); } template QVariant ModelResult::data(const QModelIndex &index, int role) const { Q_ASSERT(QThread::currentThread() == this->thread()); if (role == DomainObjectRole && index.isValid()) { Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); } if (role == DomainObjectBaseRole && index.isValid()) { Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId()).template staticCast()); } if (role == ChildrenFetchedRole) { return childrenFetched(index); } if (role == StatusRole) { auto it = mEntityStatus.constFind(index.internalId()); if (it != mEntityStatus.constEnd()) { return *it; } return {}; } if (role == Qt::DisplayRole && index.isValid()) { if (index.column() < mPropertyColumns.size()) { Q_ASSERT(mEntities.contains(index.internalId())); auto entity = mEntities.value(index.internalId()); return entity->getProperty(mPropertyColumns.at(index.column())).toString(); } else { return "No data available"; } } return QVariant(); } template QModelIndex ModelResult::index(int row, int column, const QModelIndex &parent) const { Q_ASSERT(QThread::currentThread() == this->thread()); const auto id = getIdentifier(parent); const auto list = mTree.value(id); if (list.size() > row) { const auto childId = list.at(row); return createIndex(row, column, childId); } SinkWarningCtx(mLogCtx) << "Index not available " << row << column << parent; return QModelIndex(); } template QModelIndex ModelResult::createIndexFromId(const qint64 &id) const { Q_ASSERT(QThread::currentThread() == this->thread()); if (id == 0) { return QModelIndex(); } auto grandParentId = mParents.value(id, 0); auto row = mTree.value(grandParentId).indexOf(id); Q_ASSERT(row >= 0); return createIndex(row, 0, id); } template QModelIndex ModelResult::parent(const QModelIndex &index) const { auto id = getIdentifier(index); auto parentId = mParents.value(id); return createIndexFromId(parentId); } template bool ModelResult::hasChildren(const QModelIndex &parent) const { if (mQuery.parentProperty().isEmpty() && parent.isValid()) { return false; } return QAbstractItemModel::hasChildren(parent); } template bool ModelResult::canFetchMore(const QModelIndex &parent) const { //We fetch trees immediately so can never fetch more if (parent.isValid() || mFetchedAll) { return false; } return true; } template void ModelResult::fetchMore(const QModelIndex &parent) { SinkTraceCtx(mLogCtx) << "Fetching more: " << parent; Q_ASSERT(QThread::currentThread() == this->thread()); //We only suppor fetchMore for flat lists if (parent.isValid()) { return; } //There is already a fetch in progress, don't fetch again. if (mFetchInProgress) { SinkTraceCtx(mLogCtx) << "A fetch is already in progress."; return; } mFetchInProgress = true; mFetchComplete = false; SinkTraceCtx(mLogCtx) << "Fetching more."; if (loadEntities) { loadEntities(); } else { SinkWarningCtx(mLogCtx) << "No way to fetch entities"; } } template bool ModelResult::allParentsAvailable(qint64 id) const { auto p = id; while (p) { if (!mEntities.contains(p)) { return false; } p = mParents.value(p, 0); } return true; } template void ModelResult::add(const Ptr &value) { const auto childId = qHash(*value); const auto pId = parentId(value); if (mEntities.contains(childId)) { SinkWarningCtx(mLogCtx) << "Entity already in model: " << value->identifier(); return; } const auto keys = mTree[pId]; int idx = 0; for (; idx < keys.size(); idx++) { if (childId < keys.at(idx)) { break; } } bool parentIsVisible = allParentsAvailable(pId); // SinkTraceCtx(mLogCtx) << "Inserting rows " << index << parent; if (parentIsVisible) { auto parent = createIndexFromId(pId); beginInsertRows(parent, idx, idx); } mEntities.insert(childId, value); mTree[pId].insert(idx, childId); mParents.insert(childId, pId); if (parentIsVisible) { endInsertRows(); } // SinkTraceCtx(mLogCtx) << "Inserted rows " << mTree[id].size(); } template void ModelResult::remove(const Ptr &value) { auto childId = qHash(*value); if (!mEntities.contains(childId)) { return; } //The removed entity will have no properties, but we at least need the parent property. auto actualEntity = mEntities.value(childId); auto id = parentId(actualEntity); auto parent = createIndexFromId(id); SinkTraceCtx(mLogCtx) << "Removed entity" << childId; auto index = mTree[id].indexOf(childId); if (index >= 0) { beginRemoveRows(parent, index, index); mEntities.remove(childId); mTree[id].removeAll(childId); mParents.remove(childId); // TODO remove children endRemoveRows(); } } template void ModelResult::setFetcher(const std::function &fetcher) { SinkTraceCtx(mLogCtx) << "Setting fetcher"; loadEntities = fetcher; } template void ModelResult::setEmitter(const typename Sink::ResultEmitter::Ptr &emitter) { setFetcher([this]() { mEmitter->fetch(); }); QPointer guard(this); emitter->onAdded([this, guard](const Ptr &value) { SinkTraceCtx(mLogCtx) << "Received addition: " << value->identifier(); Q_ASSERT(guard); threadBoundary.callInMainThread([this, value, guard]() { Q_ASSERT(guard); add(value); }); }); emitter->onModified([this, guard](const Ptr &value) { SinkTraceCtx(mLogCtx) << "Received modification: " << value->identifier(); Q_ASSERT(guard); threadBoundary.callInMainThread([this, value]() { modify(value); }); }); emitter->onRemoved([this, guard](const Ptr &value) { SinkTraceCtx(mLogCtx) << "Received removal: " << value->identifier(); Q_ASSERT(guard); threadBoundary.callInMainThread([this, value]() { remove(value); }); }); emitter->onInitialResultSetComplete([this, guard](bool fetchedAll) { SinkTraceCtx(mLogCtx) << "Initial result set complete. Fetched all: " << fetchedAll; Q_ASSERT(guard); Q_ASSERT(QThread::currentThread() == this->thread()); mFetchInProgress = false; mFetchedAll = fetchedAll; mFetchComplete = true; emit dataChanged({}, {}, QVector() << ChildrenFetchedRole); }); mEmitter = emitter; } template bool ModelResult::childrenFetched(const QModelIndex &index) const { return mFetchComplete; } template void ModelResult::modify(const Ptr &value) { auto childId = qHash(*value); if (!mEntities.contains(childId)) { //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. SinkTraceCtx(mLogCtx) << "Tried to modify a value that is not yet part of the model"; add(value); return; } auto id = parentId(value); auto parent = createIndexFromId(id); SinkTraceCtx(mLogCtx) << "Modified entity:" << value->identifier() << ", id: " << childId; auto i = mTree[id].indexOf(childId); Q_ASSERT(i >= 0); mEntities.remove(childId); mEntities.insert(childId, value); // TODO check for change of parents auto idx = index(i, 0, parent); emit dataChanged(idx, idx); } #define REGISTER_TYPE(T) \ template class ModelResult; \ SINK_REGISTER_TYPES() diff --git a/common/store.cpp b/common/store.cpp index 84a52fed..b3a9888a 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -1,553 +1,553 @@ /* * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "store.h" #include #include #include #include #include "resourceaccess.h" #include "commands.h" #include "resourcefacade.h" #include "definitions.h" #include "resourceconfig.h" #include "facadefactory.h" #include "modelresult.h" #include "storage.h" #include "log.h" #include "utils.h" #define ASSERT_ENUMS_MATCH(A, B) Q_STATIC_ASSERT_X(static_cast(A) == static_cast(B), "The enum values must match"); //Ensure the copied enum matches typedef ModelResult MailModelResult; ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectBaseRole, MailModelResult::DomainObjectBaseRole) ASSERT_ENUMS_MATCH(Sink::Store::ChildrenFetchedRole, MailModelResult::ChildrenFetchedRole) ASSERT_ENUMS_MATCH(Sink::Store::DomainObjectRole, MailModelResult::DomainObjectRole) ASSERT_ENUMS_MATCH(Sink::Store::StatusRole, MailModelResult::StatusRole) ASSERT_ENUMS_MATCH(Sink::Store::WarningRole, MailModelResult::WarningRole) ASSERT_ENUMS_MATCH(Sink::Store::ProgressRole, MailModelResult::ProgressRole) Q_DECLARE_METATYPE(QSharedPointer>) Q_DECLARE_METATYPE(QSharedPointer); Q_DECLARE_METATYPE(std::shared_ptr); static bool sanityCheckQuery(const Sink::Query &query) { for (const auto &id : query.ids()) { if (id.isEmpty()) { SinkError() << "Empty id in query."; return false; } } return true; } static KAsync::Job forEachResource(const Sink::SyncScope &scope, std::function(const Sink::ApplicationDomain::SinkResource::Ptr &resource)> callback) { using namespace Sink; auto resourceFilter = scope.getResourceFilter(); //Filter resources by type by default if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name}) && !scope.type().isEmpty()) { resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{scope.type(), Query::Comparator::Contains}); } Sink::Query query; query.setFilter(resourceFilter); return Store::fetchAll(query) .template each(callback); } namespace Sink { QString Store::storageLocation() { return Sink::storageLocation(); } template KAsync::Job queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) { auto ctx = ctx_.subContext(resourceInstanceIdentifier); auto facade = FacadeFactory::instance().getFacade(resourceType, resourceInstanceIdentifier); if (facade) { SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; auto result = facade->load(query, ctx); if (result.second) { aggregatingEmitter->addEmitter(result.second); } else { SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; } return result.first; } else { SinkTraceCtx(ctx) << "Couldn' find a facade for " << resourceInstanceIdentifier; // Ignore the error and carry on return KAsync::null(); } } template QPair::Ptr, typename ResultEmitter::Ptr> getEmitter(Query query, const Log::Context &ctx) { query.setType(ApplicationDomain::getTypeName()); SinkTraceCtx(ctx) << "Query: " << query; // Query all resources and aggregate results auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { //For global types we don't need to query for the resources first. queryResource("", "", query, aggregatingEmitter, ctx).exec(); } else { auto resourceCtx = ctx.subContext("resourceQuery"); auto facade = FacadeFactory::instance().getFacade(); Q_ASSERT(facade); Sink::Query resourceQuery; resourceQuery.request(); if (query.liveQuery()) { SinkTraceCtx(ctx) << "Listening for new resources."; resourceQuery.setFlags(Query::LiveQuery); } //Filter resources by available content types (unless the query already specifies a capability filter) auto resourceFilter = query.getResourceFilter(); if (!resourceFilter.propertyFilter.contains({ApplicationDomain::SinkResource::Capabilities::name})) { resourceFilter.propertyFilter.insert({ApplicationDomain::SinkResource::Capabilities::name}, Query::Comparator{ApplicationDomain::getTypeName(), Query::Comparator::Contains}); } resourceQuery.setFilter(resourceFilter); for (auto const &properties : resourceFilter.propertyFilter.keys()) { resourceQuery.requestedProperties << properties; } auto result = facade->load(resourceQuery, resourceCtx); auto emitter = result.second; emitter->onAdded([=](const ApplicationDomain::SinkResource::Ptr &resource) { SinkTraceCtx(resourceCtx) << "Found new resources: " << resource->identifier(); const auto resourceType = ResourceConfig::getResourceType(resource->identifier()); Q_ASSERT(!resourceType.isEmpty()); queryResource(resourceType, resource->identifier(), query, aggregatingEmitter, ctx).exec(); }); emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { SinkTraceCtx(resourceCtx) << "Resource query complete"; }); return qMakePair(aggregatingEmitter, emitter); } return qMakePair(aggregatingEmitter, ResultEmitter::Ptr{}); } static Log::Context getQueryContext(const Sink::Query &query, const QByteArray &type) { if (!query.id().isEmpty()) { return Log::Context{"query." + type + "." + query.id()}; } return Log::Context{"query." + type}; } template QSharedPointer Store::loadModel(const Query &query) { Q_ASSERT(sanityCheckQuery(query)); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); auto model = QSharedPointer>::create(query, query.requestedProperties, ctx); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks //* The emitter needs to live or the duration of query (respectively, the model) //* The result provider needs to live for as long as results are provided (until the last thread exits). auto result = getEmitter(query, ctx); model->setEmitter(result.first); //Keep the emitter alive if (auto resourceEmitter = result.second) { model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries resourceEmitter->fetch(); } //Automatically populate the top-level model->fetchMore(QModelIndex()); - return model; + return std::move(model); } template static std::shared_ptr> getFacade(const QByteArray &resourceInstanceIdentifier) { if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName())) { if (auto facade = FacadeFactory::instance().getFacade()) { return facade; } } if (auto facade = FacadeFactory::instance().getFacade(ResourceConfig::getResourceType(resourceInstanceIdentifier), resourceInstanceIdentifier)) { return facade; } return std::make_shared>(); } template KAsync::Job Store::create(const DomainType &domainObject) { SinkLog() << "Create: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); return facade->create(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create " << error; }); } template KAsync::Job Store::modify(const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->modify(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify " << error; }); }); } return facade->modify(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); } template KAsync::Job Store::modify(const Query &query, const DomainType &domainObject) { if (domainObject.changedProperties().isEmpty()) { SinkLog() << "Nothing to modify: " << domainObject.identifier(); return KAsync::null(); } SinkLog() << "Modify: " << query << domainObject; return fetchAll(query) .each([=] (const typename DomainType::Ptr &entity) { auto copy = *entity; for (const auto &p : domainObject.changedProperties()) { copy.setProperty(p, domainObject.getProperty(p)); } return modify(copy); }); } template KAsync::Job Store::move(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Move: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->move(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); }); } return facade->move(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to move " << error; }); } template KAsync::Job Store::copy(const DomainType &domainObject, const QByteArray &newResource) { SinkLog() << "Copy: " << domainObject << newResource; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->copy(object, newResource).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); }); } return facade->copy(domainObject, newResource).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to copy " << error; }); } template KAsync::Job Store::remove(const DomainType &domainObject) { SinkLog() << "Remove: " << domainObject; auto facade = getFacade(domainObject.resourceInstanceIdentifier()); if (domainObject.isAggregate()) { return KAsync::value(domainObject.aggregatedIds()) .addToContext(std::shared_ptr(facade)) .each([=] (const QByteArray &id) { auto object = Sink::ApplicationDomain::ApplicationDomainType::createCopy(id, domainObject); return facade->remove(object).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); }); } return facade->remove(domainObject).addToContext(std::shared_ptr(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove " << error; }); } template KAsync::Job Store::remove(const Sink::Query &query) { SinkLog() << "Remove: " << query; return fetchAll(query) .each([] (const typename DomainType::Ptr &entity) { return remove(*entity); }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { // All databases are going to become invalid, nuke the environments // TODO: all clients should react to a notification from the resource Sink::Storage::DataStore::clearEnv(); SinkTrace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); resourceAccess->open(); return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) .addToContext(resourceAccess) .then([resourceAccess](KAsync::Future &future) { if (resourceAccess->isReady()) { //Wait for the resource shutdown auto guard = new QObject; QObject::connect(resourceAccess.data(), &ResourceAccess::ready, guard, [&future, guard](bool ready) { if (!ready) { //We don't disconnect if ResourceAccess get's recycled, so ready can fire multiple times, which can result in a crash if the future is no longer valid. delete guard; future.setFinished(); } }); } else { future.setFinished(); } }) .then([time]() { SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } static KAsync::Job upgrade(const QByteArray &resource) { auto store = Sink::Storage::DataStore(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); if (!store.exists() || Storage::DataStore::databaseVersion(store.createTransaction(Storage::DataStore::ReadOnly)) == Sink::latestDatabaseVersion()) { return KAsync::value(Store::UpgradeResult{false}); } SinkLog() << "Upgrading " << resource; //We're not using the factory to avoid getting a cached resourceaccess with the wrong resourceType auto resourceAccess = Sink::ResourceAccess::Ptr{new Sink::ResourceAccess(resource, ResourceConfig::getResourceType(resource)), &QObject::deleteLater}; return resourceAccess->sendCommand(Sink::Commands::UpgradeCommand) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during upgrade."; return KAsync::error(error); } SinkTrace() << "Upgrade of resource " << resource << " complete."; return KAsync::null(); }) .then(KAsync::value(Store::UpgradeResult{true})); } KAsync::Job Store::upgrade() { SinkLog() << "Upgrading..."; //Migrate from sink.dav to sink.carddav const auto resources = ResourceConfig::getResources(); for (auto it = resources.constBegin(); it != resources.constEnd(); it++) { if (it.value() == "sink.dav") { ResourceConfig::setResourceType(it.key(), "sink.carddav"); } } auto ret = QSharedPointer::create(false); return fetchAll({}) .template each([ret](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job { return Sink::upgrade(resource->identifier()) .then([ret](UpgradeResult returnValue) { if (returnValue.upgradeExecuted) { SinkLog() << "Upgrade executed."; *ret = true; } }); }) .then([ret] { if (*ret) { SinkLog() << "Upgrade complete."; } return Store::UpgradeResult{*ret}; }); } static KAsync::Job synchronize(const QByteArray &resource, const Sink::SyncScope &scope) { SinkLog() << "Synchronizing " << resource << scope; auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); return resourceAccess->synchronizeResource(scope) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error during sync."; return KAsync::error(error); } SinkTrace() << "Synchronization of resource " << resource << " complete."; return KAsync::null(); }); } KAsync::Job Store::synchronize(const Sink::Query &query) { return synchronize(Sink::SyncScope{query}); } KAsync::Job Store::synchronize(const Sink::SyncScope &scope) { SinkLog() << "Synchronizing all resource matching: " << scope; return forEachResource(scope, [=] (const auto &resource) { return synchronize(resource->identifier(), scope); }); } KAsync::Job Store::abortSynchronization(const Sink::SyncScope &scope) { return forEachResource(scope, [] (const auto &resource) { auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource->identifier(), ResourceConfig::getResourceType(resource->identifier())); return resourceAccess->sendCommand(Sink::Commands::AbortSynchronizationCommand) .addToContext(resourceAccess) .then([=](const KAsync::Error &error) { if (error) { SinkWarning() << "Error aborting synchronization."; return KAsync::error(error); } return KAsync::null(); }); }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { return fetch(query, 1).template then>([](const QList &list) { return KAsync::value(*list.first()); }); } template KAsync::Job> Store::fetchAll(const Sink::Query &query) { return fetch(query); } template KAsync::Job> Store::fetch(const Sink::Query &query, int minimumAmount) { Q_ASSERT(sanityCheckQuery(query)); auto model = loadModel(query); auto list = QSharedPointer>::create(); auto context = QSharedPointer::create(); return KAsync::start>([model, list, context, minimumAmount](KAsync::Future> &future) { if (model->rowCount() >= 1) { for (int i = 0; i < model->rowCount(); i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } } else { QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, list](const QModelIndex &index, int start, int end) { for (int i = start; i <= end; i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } }); QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { if (roles.contains(ModelResult::ChildrenFetchedRole)) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); future.setFinished(); } } }); } if (model->data(QModelIndex(), ModelResult::ChildrenFetchedRole).toBool()) { if (list->size() < minimumAmount) { future.setError(1, "Not enough values."); } else { future.setValue(*list); } future.setFinished(); } }); } template DomainType Store::readOne(const Sink::Query &query) { const auto list = read(query); if (!list.isEmpty()) { return list.first(); } SinkWarning() << "Tried to read value but no values are available."; return DomainType(); } template QList Store::read(const Sink::Query &query_) { Q_ASSERT(sanityCheckQuery(query_)); auto query = query_; query.setFlags(Query::SynchronousQuery); auto ctx = getQueryContext(query, ApplicationDomain::getTypeName()); QList list; auto result = getEmitter(query, ctx); auto aggregatingEmitter = result.first; aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ SinkTraceCtx(ctx) << "Found value: " << value->identifier(); list << *value; }); if (auto resourceEmitter = result.second) { resourceEmitter->fetch(); } aggregatingEmitter->fetch(); return list; } #define REGISTER_TYPE(T) \ template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::remove(const Query &); \ template KAsync::Job Store::create(const T &domainObject); \ template KAsync::Job Store::modify(const T &domainObject); \ template KAsync::Job Store::modify(const Query &, const T &); \ template KAsync::Job Store::move(const T &domainObject, const QByteArray &newResource); \ template KAsync::Job Store::copy(const T &domainObject, const QByteArray &newResource); \ template QSharedPointer Store::loadModel(const Query &query); \ template KAsync::Job Store::fetchOne(const Query &); \ template KAsync::Job> Store::fetchAll(const Query &); \ template KAsync::Job> Store::fetch(const Query &, int); \ template T Store::readOne(const Query &); \ template QList Store::read(const Query &); SINK_REGISTER_TYPES() } // namespace Sink