diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2492982..a4e93b8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,62 +1,64 @@ set(kasync_SRCS async.cpp future.cpp debug.cpp ) set(kasync_priv_HEADERS continuations_p.h + execution_p.h + executor_p.h async_impl.h job_impl.h debug.h ) ecm_generate_headers(kasync_HEADERS HEADER_NAMES Async Future REQUIRED_HEADERS kasync_HEADERS ) add_library(KAsync ${kasync_SRCS}) generate_export_header(KAsync BASE_NAME kasync) target_include_directories(KAsync INTERFACE "$") target_include_directories(KAsync PUBLIC "$") target_link_libraries(KAsync PUBLIC Qt5::Core ) set_target_properties(KAsync PROPERTIES VERSION ${KASYNC_VERSION_STRING} SOVERSION ${KASYNC_SOVERSION} EXPORT_NAME KAsync ) ecm_generate_pri_file(BASE_NAME KAsync LIB_NAME KAsync FILENAME_VAR PRI_FILENAME ) install(TARGETS KAsync EXPORT KAsyncTargets ${KDE_INSTALL_TARGETS_DEFAULT_ARGS} ) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/kasync_export.h ${kasync_HEADERS} ${kasync_priv_HEADERS} DESTINATION ${KDE_INSTALL_INCLUDEDIR}/KAsync COMPONENT Devel ) install(FILES ${PRI_FILENAME} DESTINATION ${ECM_MKSPECS_INSTALL_DIR} ) diff --git a/src/async.cpp b/src/async.cpp index c762e2b..9cbf174 100644 --- a/src/async.cpp +++ b/src/async.cpp @@ -1,88 +1,63 @@ /* * Copyright 2014 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #include "async.h" #include #include #include #include using namespace KAsync; -Private::Execution::Execution(const Private::ExecutorBasePtr &executor) - : executor(executor) -{ -} - -Private::Execution::~Execution() -{ - if (resultBase) { - resultBase->releaseExecution(); - delete resultBase; - } - prevExecution.reset(); -} - -void Private::Execution::setFinished() -{ - tracer.reset(); -} - -void Private::Execution::releaseFuture() -{ - resultBase = nullptr; -} - - Job KAsync::doWhile(const Job &body) { return KAsync::start([body] (KAsync::Future &future) { auto job = body.then([&future, body](const KAsync::Error &error, ControlFlowFlag flag) { if (error) { future.setError(error); future.setFinished(); } else if (flag == ControlFlowFlag::Continue) { doWhile(body).then([&future](const KAsync::Error &error) { if (error) { future.setError(error); } future.setFinished(); }).exec(); } else { future.setFinished(); } }).exec(); }); } Job KAsync::doWhile(const JobContinuation &body) { return doWhile(KAsync::start([body] { return body(); })); } Job KAsync::wait(int delay) { return KAsync::start([delay](KAsync::Future &future) { QTimer::singleShot(delay, [&future]() { future.setFinished(); }); }); } diff --git a/src/async.h b/src/async.h index 2cb5ae0..105f538 100644 --- a/src/async.h +++ b/src/async.h @@ -1,701 +1,583 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2016 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_H #define KASYNC_H #include "kasync_export.h" #include -#include #include #include -#include -#include #include "future.h" #include "debug.h" + #include "async_impl.h" #include "continuations_p.h" +#include "executor_p.h" -#include -#include -#include -#include - -#include - +class QObject; /** * @mainpage KAsync * * @brief API to help write async code. * * This API is based around jobs that take lambdas to execute asynchronous tasks. * Each async operation can take a continuation that can then be used to execute * further async operations. That way it is possible to build async chains of * operations that can be stored and executed later on. Jobs can be composed, * similarly to functions. * * Relations between the components: * * Job: API wrapper around Executors chain. Can be destroyed while still running, * because the actual execution happens in the background * * Executor: Describes task to execute. Executors form a linked list matching the * order in which they will be executed. The Executor chain is destroyed when * the parent Job is destroyed. However if the Job is still running it is * guaranteed that the Executor chain will not be destroyed until the execution * is finished. * * Execution: The running execution of the task stored in Executor. Each call to * Job::exec() instantiates new Execution chain, which makes it possible for * the Job to be executed multiple times (even in parallel). * * Future: Representation of the result that is being calculated * * * TODO: Possibility to abort a job through future (perhaps optional?) * TODO: Support for timeout, specified during exec call, after which the error * handler gets called with a defined errorCode. */ namespace KAsync { template class Executor; class JobBase; template class Job; - -//@cond PRIVATE -namespace Private -{ - -class ExecutorBase; -typedef QSharedPointer ExecutorBasePtr; - -class ExecutionContext; - -struct KASYNC_EXPORT Execution { - explicit Execution(const ExecutorBasePtr &executor); - virtual ~Execution(); - void setFinished(); - - template - KAsync::Future* result() const - { - return static_cast*>(resultBase); - } - - void releaseFuture(); - - ExecutorBasePtr executor; - ExecutionPtr prevExecution; - std::unique_ptr tracer; - FutureBase *resultBase = nullptr; -}; - -typedef QSharedPointer ExecutionPtr; - -class KASYNC_EXPORT ExecutorBase -{ - template - friend class Executor; - - template - friend class KAsync::Job; - - friend struct Execution; - friend class KAsync::Tracer; - -public: - virtual ~ExecutorBase() = default; - virtual ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) = 0; - -protected: - ExecutorBase(const ExecutorBasePtr &parent) - : mPrev(parent) - {} - - template - KAsync::Future* createFuture(const ExecutionPtr &execution) const; - - ExecutorBasePtr mPrev; - - void prepend(const ExecutorBasePtr &e) - { - if (mPrev) { - mPrev->prepend(e); - } else { - mPrev = e; - } - } - - void addToContext(const QVariant &entry) - { - mContext << entry; - } - - void guard(const QObject *o) - { - mGuards.append(QPointer{o}); - } - - QString mExecutorName; - QVector mContext; - QVector> mGuards; -}; - -enum ExecutionFlag { - Always, - ErrorCase, - GoodCase -}; - -template -class Executor : public ExecutorBase -{ -protected: - - Executor(const Private::ExecutorBasePtr &parent, ExecutionFlag executionFlag) - : ExecutorBase(parent) - , executionFlag(executionFlag) - {} - - virtual ~Executor() {} - virtual void run(const ExecutionPtr &execution) = 0; - - ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) override; - - const ExecutionFlag executionFlag; - -private: - void runExecution(const KAsync::Future *prevFuture, const ExecutionPtr &execution, bool guardIsBroken); -}; - -} // namespace Private -//@endcond - - template Job startImpl(Private::ContinuationHolder &&); template Job syncStartImpl(SyncContinuation &&); /** * @relates Job * * Start an asynchronous job sequence. * * start() is your starting point to build a chain of jobs to be executed * asynchronously. * * @param func A continuation to be executed. */ ///Sync continuation without job: [] () -> T { ... } template auto start(F &&func) -> std::enable_if_t() ...))>::value, Job() ...)), In...>> { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return syncStartImpl(std::forward(func)); } ///continuation with job: [] () -> KAsync::Job<...> { ... } template auto start(F &&func) -> std::enable_if_t() ...))>::value, Job() ...))::OutType, In...>> { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return startImpl(Private::ContinuationHolder(JobContinuation(std::forward(func)))); } ///Handle continuation: [] (KAsync::Future, ...) { ... } template auto start(AsyncContinuation &&func) -> Job { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return startImpl(Private::ContinuationHolder(std::forward>(func))); } enum ControlFlowFlag { Break, Continue }; /** * @relates Job * * Async while loop. * * Loop continues while body returns ControlFlowFlag::Continue. */ KASYNC_EXPORT Job doWhile(const Job &body); /** * @relates Job * * Async while loop. * * Shorthand that takes a continuation. * * @see doWhile */ KASYNC_EXPORT Job doWhile(const JobContinuation &body); /** * @relates Job * * Async delay. */ KASYNC_EXPORT Job wait(int delay); /** * @relates Job * * A null job. * * An async noop. * */ template Job null(); /** * @relates Job * * Async value. */ template Job value(Out); /** * @relates Job * * Async foreach loop. * * This will execute a job for every value in the list. * Errors while not stop processing of other jobs but set an error on the wrapper job. */ template Job forEach(KAsync::Job job); /** * @relates Job * * Async foreach loop. * * Shorthand that takes a continuation. * * @see serialForEach */ template Job forEach(JobContinuation &&); /** * @relates Job * * Serial Async foreach loop. * * This will execute a job for every value in the list sequentially. * Errors while not stop processing of other jobs but set an error on the wrapper job. */ template Job serialForEach(KAsync::Job job); /** * @relates Job * * Serial Async foreach loop. * * Shorthand that takes a continuation. * * @see serialForEach */ template Job serialForEach(JobContinuation &&); /** * @relates Job * * An error job. * * An async error. * */ template Job error(int errorCode = 1, const QString &errorMessage = QString()); /** * @relates Job * * An error job. * * An async error. * */ template Job error(const char *); /** * @relates Job * * An error job. * * An async error. * */ template Job error(const Error &); //@cond PRIVATE class KASYNC_EXPORT JobBase { template friend class Job; public: explicit JobBase(const Private::ExecutorBasePtr &executor) : mExecutor(executor) {} virtual ~JobBase() = default; protected: Private::ExecutorBasePtr mExecutor; }; //@endcond /** * @brief An Asynchronous job * * A single instance of Job represents a single method that will be executed * asynchronously. The Job is started by exec(), which returns Future * immediatelly. The Future will be set to finished state once the asynchronous * task has finished. You can use Future::waitForFinished() to wait for * for the Future in blocking manner. * * It is possible to chain multiple Jobs one after another in different fashion * (sequential, parallel, etc.). Calling exec() will then return a pending * Future, and will execute the entire chain of jobs. * * @code * auto job = Job::start>( * [](KAsync::Future> &future) { * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(dynamic_cast(pu)->userIds()); * future->setFinished(); * }); * }) * .each, int>( * [](const int &userId, KAsync::Future> &future) { * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(Qlist() << dynamic_cast(pu)->user()); * future->setFinished(); * }); * }); * * KAsync::Future> usersFuture = job.exec(); * usersFuture.waitForFinished(); * QList users = usersFuture.value(); * @endcode * * In the example above, calling @p job.exec() will first invoke the first job, * which will retrieve a list of IDs and then will invoke the second function * for each single entry in the list returned by the first function. */ template class Job : public JobBase { //@cond PRIVATE template friend class Job; template friend Job startImpl(Private::ContinuationHolder &&); template friend Job syncStartImpl(SyncContinuation &&); template friend Job forEach(KAsync::Job job); template friend Job serialForEach(KAsync::Job job); // Used to disable implicit conversion of Job which triggers // comiler warning. struct IncompleteType; //@endcond public: typedef Out OutType; ///A continuation template Job then(const Job &job) const; ///Shorthands for a job that returns another job from it's continuation // //OutOther and InOther are only there fore backwards compatibility, but are otherwise ignored. //It should never be neccessary to specify any template arguments, as they are automatically deduced from the provided argument. // //We currently have to write a then overload for: //* One argument in the continuation //* No argument in the continuation //* One argument + error in the continuation //* No argument + error in the continuation //This is due to how we extract the return type with "decltype(func(std::declval()))". //Ideally we could conflate this into at least fewer overloads, but I didn't manage so far and this at least works as expected. ///Continuation returning job: [] (Arg) -> KAsync::Job<...> { ... } template auto then(F &&func) const -> std::enable_if_t()))>::value, Job()))::OutType, In...>> { using ResultJob = decltype(func(std::declval())); //Job return thenImpl( {JobContinuation(std::forward(func))}, Private::ExecutionFlag::GoodCase); } ///Void continuation with job: [] () -> KAsync::Job<...> { ... } template auto then(F &&func) const -> std::enable_if_t::value, Job> { using ResultJob = decltype(func()); //Job return thenImpl( {JobContinuation(std::forward(func))}, Private::ExecutionFlag::GoodCase); } ///Error continuation returning job: [] (KAsync::Error, Arg) -> KAsync::Job<...> { ... } template auto then(F &&func) const -> std::enable_if_t()))>::value, Job()))::OutType, In...>> { using ResultJob = decltype(func(KAsync::Error{}, std::declval())); //Job return thenImpl( {JobErrorContinuation(std::forward(func))}, Private::ExecutionFlag::Always); } ///Error void continuation returning job: [] (KAsync::Error) -> KAsync::Job<...> { ... } template auto then(F &&func) const -> std::enable_if_t::value, Job> { using ResultJob = decltype(func(KAsync::Error{})); return thenImpl( {JobErrorContinuation(std::forward(func))}, Private::ExecutionFlag::Always); } ///Sync continuation: [] (Arg) -> void { ... } template auto then(F &&func) const -> std::enable_if_t()))>::value, Job())), In...>> { using ResultType = decltype(func(std::declval())); //QString return thenImpl( {SyncContinuation(std::forward(func))}, Private::ExecutionFlag::GoodCase); } ///Sync void continuation: [] () -> void { ... } template auto then(F &&func) const -> std::enable_if_t::value, Job> { using ResultType = decltype(func()); //QString return thenImpl( {SyncContinuation(std::forward(func))}, Private::ExecutionFlag::GoodCase); } ///Sync error continuation: [] (KAsync::Error, Arg) -> void { ... } template auto then(F &&func) const -> std::enable_if_t()))>::value, Job())),In...>> { using ResultType = decltype(func(KAsync::Error{}, std::declval())); //QString return thenImpl( {SyncErrorContinuation(std::forward(func))}, Private::ExecutionFlag::Always); } ///Sync void error continuation: [] (KAsync::Error) -> void { ... } template auto then(F &&func) const -> std::enable_if_t::value, Job> { using ResultType = decltype(func(KAsync::Error{})); return thenImpl( {SyncErrorContinuation(std::forward(func))}, Private::ExecutionFlag::Always); } ///Shorthand for a job that receives the error and a handle template Job then(AsyncContinuation &&func) const { return thenImpl({std::forward>(func)}, Private::ExecutionFlag::GoodCase); } ///Shorthand for a job that receives the error and a handle template Job then(AsyncErrorContinuation &&func) const { return thenImpl({std::forward>(func)}, Private::ExecutionFlag::Always); } ///Shorthand for a job that receives the error only Job onError(SyncErrorContinuation &&errorFunc) const; /** * Shorthand for a forEach loop that automatically uses the return type of * this job to deduce the type exepected. */ template::value, int> = 0> Job each(JobContinuation &&func) const { eachInvariants(); return then(forEach(std::forward>(func))); } /** * Shorthand for a serialForEach loop that automatically uses the return type * of this job to deduce the type exepected. */ template::value, int> = 0> Job serialEach(JobContinuation &&func) const { eachInvariants(); return then(serialForEach(std::forward>(func))); } /** * Enable implicit conversion to Job. * * This is necessary in assignments that only use the return value (which is the normal case). * This avoids constructs like: * auto job = KAsync::start( ... ) * .then( ... ) * .then([](){}); //Necessary for the assignment without the implicit conversion */ template operator std::conditional_t::value, IncompleteType, Job>(); /** * Adds an unnamed value to the context. * The context is guaranteed to persist until the jobs execution has finished. * * Useful for setting smart pointer to manage lifetime of objects required * during the execution of the job. */ template Job &addToContext(const T &value) { assert(mExecutor); mExecutor->addToContext(QVariant::fromValue(value)); return *this; } /** * Adds a guard. * It is guaranteed that no callback is executed after the guard vanishes. * * Use this i.e. ensure you don't call-back into an already destroyed object. */ Job &guard(const QObject *o) { assert(mExecutor); mExecutor->guard(o); return *this; } /** * @brief Starts execution of the job chain. * * This will start the execution of the task chain, starting from the * first one. It is possible to call this function multiple times, each * invocation will start a new processing and provide a new Future to * watch its status. * * @param in Argument to be passed to the very first task * @return Future<Out> object which will contain result of the last * task once if finishes executing. See Future documentation for more details. * * @see exec(), Future */ template KAsync::Future exec(FirstIn in); /** * @brief Starts execution of the job chain. * * This will start the execution of the task chain, starting from the * first one. It is possible to call this function multiple times, each * invocation will start a new processing and provide a new Future to * watch its status. * * @return Future<Out> object which will contain result of the last * task once if finishes executing. See Future documentation for more details. * * @see exec(FirstIn in), Future */ KAsync::Future exec(); explicit Job(JobContinuation &&func); explicit Job(AsyncContinuation &&func); private: //@cond PRIVATE explicit Job(Private::ExecutorBasePtr executor); template Job thenImpl(Private::ContinuationHolder helper, Private::ExecutionFlag execFlag = Private::ExecutionFlag::GoodCase) const; template void thenInvariants() const; //Base case for an empty parameter pack template auto thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>; template void eachInvariants() const; //@endcond }; } // namespace KAsync // out-of-line definitions of Job methods #include "job_impl.h" #endif // KASYNC_H diff --git a/src/execution_p.h b/src/execution_p.h new file mode 100644 index 0000000..8b8d7c9 --- /dev/null +++ b/src/execution_p.h @@ -0,0 +1,116 @@ +/* + * Copyright 2014 - 2015 Daniel Vrátil + * Copyright 2016 Daniel Vrátil + * Copyright 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public License as + * published by the Free Software Foundation; either version 2 of + * the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public License + * along with this library. If not, see . + */ + +#ifndef KASYNC_EXECUTION_P_H_ +#define KASYNC_EXECUTION_P_H_ + +#include "debug.h" + +#include +#include +#include +#include + +#include + +namespace KAsync { + +class FutureBase; + +template +class Future; + +class Tracer; + +//@cond PRIVATE +namespace Private +{ + +class ExecutorBase; +using ExecutorBasePtr = QSharedPointer; + +struct Execution; +using ExecutionPtr = QSharedPointer; + +class ExecutionContext; + +enum ExecutionFlag { + Always, + ErrorCase, + GoodCase +}; + +struct Execution { + explicit Execution(const ExecutorBasePtr &executor) + : executor(executor) + {} + + virtual ~Execution() + { + if (resultBase) { + resultBase->releaseExecution(); + delete resultBase; + } + prevExecution.reset(); + } + + void setFinished() + { + tracer.reset(); + } + + template + KAsync::Future* result() const + { + return static_cast*>(resultBase); + } + + void releaseFuture() + { + resultBase = nullptr; + } + + ExecutorBasePtr executor; + ExecutionPtr prevExecution; + std::unique_ptr tracer; + FutureBase *resultBase = nullptr; +}; + +class ExecutionContext { +public: + using Ptr = QSharedPointer; + + QVector> guards; + bool guardIsBroken() const + { + for (const auto &g : guards) { + if (!g) { + return true; + } + } + return false; + } +}; + +} // namespace Private +//@endcond + +} // namespace KAsync + +#endif diff --git a/src/executor_p.h b/src/executor_p.h new file mode 100644 index 0000000..5cb250b --- /dev/null +++ b/src/executor_p.h @@ -0,0 +1,321 @@ +/* + * Copyright 2014 - 2015 Daniel Vrátil + * Copyright 2015 - 2019 Daniel Vrátil + * Copyright 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public License as + * published by the Free Software Foundation; either version 2 of + * the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public License + * along with this library. If not, see . + */ + +#ifndef KASYNC_EXECUTOR_P_H +#define KASYNC_EXECUTOR_P_H + +#include "async_impl.h" +#include "execution_p.h" +#include "continuations_p.h" +#include "debug.h" + +namespace KAsync { + +template +class Future; + +template +class FutureWatcher; + +template +class ContinuationHolder; + +template +class Job; + +class Tracer; + +namespace Private { + +class ExecutorBase; +using ExecutorBasePtr = QSharedPointer; + +class ExecutorBase +{ + template + friend class Executor; + + template + friend class KAsync::Job; + + friend struct Execution; + friend class KAsync::Tracer; + +public: + virtual ~ExecutorBase() = default; + + virtual ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) = 0; + +protected: + ExecutorBase(const ExecutorBasePtr &parent) + : mPrev(parent) + {} + + template + KAsync::Future* createFuture(const ExecutionPtr &execution) const + { + return new KAsync::Future(execution); + } + + void prepend(const ExecutorBasePtr &e) + { + if (mPrev) { + mPrev->prepend(e); + } else { + mPrev = e; + } + } + + void addToContext(const QVariant &entry) + { + mContext.push_back(entry); + } + + void guard(const QObject *o) + { + mGuards.push_back(QPointer{o}); + } + + QString mExecutorName; + QVector mContext; + QVector> mGuards; + ExecutorBasePtr mPrev; +}; + +template +class Executor : public ExecutorBase +{ + using PrevOut = typename detail::prevOut::type; + +public: + explicit Executor(ContinuationHolder &&workerHelper, const ExecutorBasePtr &parent = {}, + ExecutionFlag executionFlag = ExecutionFlag::GoodCase) + : ExecutorBase(parent) + , mContinuationHolder(std::move(workerHelper)) + , executionFlag(executionFlag) + { + STORE_EXECUTOR_NAME("Executor", Out, In ...); + } + + virtual ~Executor() = default; + + void run(const ExecutionPtr &execution) + { + KAsync::Future::type> *prevFuture = nullptr; + if (execution->prevExecution) { + prevFuture = execution->prevExecution->result::type>(); + assert(prevFuture->isFinished()); + } + + //Execute one of the available workers + KAsync::Future *future = execution->result(); + + const auto &continuation = Executor::mContinuationHolder; + if (continuationIs>(continuation)) { + continuationGet>(continuation)(prevFuture ? prevFuture->value() : In() ..., *future); + } else if (continuationIs>(continuation)) { + continuationGet>(continuation)( + prevFuture->hasError() ? prevFuture->errors().first() : Error(), + prevFuture ? prevFuture->value() : In() ..., *future); + } else if (continuationIs>(continuation)) { + callAndApply(prevFuture ? prevFuture->value() : In() ..., + continuationGet>(continuation), *future, std::is_void()); + future->setFinished(); + } else if (continuationIs>(continuation)) { + assert(prevFuture); + callAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), + prevFuture ? prevFuture->value() : In() ..., + continuationGet>(continuation), *future, std::is_void()); + future->setFinished(); + } else if (continuationIs>(continuation)) { + executeJobAndApply(prevFuture ? prevFuture->value() : In() ..., + continuationGet>(continuation), *future, std::is_void()); + } else if (continuationIs>(continuation)) { + executeJobAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), + prevFuture ? prevFuture->value() : In() ..., + continuationGet>(continuation), *future, std::is_void()); + } + + } + + ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) override + { + /* + * One executor per job, created with the construction of the Job object. + * One execution per job per exec(), created only once exec() is called. + * + * The executors make up the linked list that makes up the complete execution chain. + * + * The execution then tracks the execution of each executor. + */ + + // Passing 'self' to execution ensures that the Executor chain remains + // valid until the entire execution is finished + ExecutionPtr execution = ExecutionPtr::create(self); +#ifndef QT_NO_DEBUG + execution->tracer = std::make_unique(execution.data()); // owned by execution +#endif + + context->guards += mGuards; + + // chainup + execution->prevExecution = mPrev ? mPrev->exec(mPrev, context) : ExecutionPtr(); + + execution->resultBase = ExecutorBase::createFuture(execution); + //We watch our own future to finish the execution once we're done + auto fw = new KAsync::FutureWatcher(); + QObject::connect(fw, &KAsync::FutureWatcher::futureReady, + [fw, execution]() { + execution->setFinished(); + delete fw; + }); + fw->setFuture(*execution->result()); + + KAsync::Future *prevFuture = execution->prevExecution ? execution->prevExecution->result() + : nullptr; + if (!prevFuture || prevFuture->isFinished()) { //The previous job is already done + runExecution(prevFuture, execution, context->guardIsBroken()); + } else { //The previous job is still running and we have to wait for it's completion + auto prevFutureWatcher = new KAsync::FutureWatcher(); + QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher::futureReady, + [prevFutureWatcher, execution, this, context]() { + auto prevFuture = prevFutureWatcher->future(); + assert(prevFuture.isFinished()); + delete prevFutureWatcher; + runExecution(&prevFuture, execution, context->guardIsBroken()); + }); + + prevFutureWatcher->setFuture(*static_cast*>(prevFuture)); + } + + return execution; + } + +private: + void runExecution(const KAsync::Future *prevFuture, const ExecutionPtr &execution, bool guardIsBroken) + { + if (guardIsBroken) { + execution->resultBase->setFinished(); + return; + } + if (prevFuture) { + if (prevFuture->hasError() && executionFlag == ExecutionFlag::GoodCase) { + //Propagate the error to the outer Future + Q_ASSERT(prevFuture->errors().size() == 1); + execution->resultBase->setError(prevFuture->errors().first()); + return; + } + if (!prevFuture->hasError() && executionFlag == ExecutionFlag::ErrorCase) { + //Propagate the value to the outer Future + KAsync::detail::copyFutureValue(*prevFuture, *execution->result()); + execution->resultBase->setFinished(); + return; + } + } + run(execution); + } + + void executeJobAndApply(In && ... input, const JobContinuation &func, + Future &future, std::false_type) + { + func(std::forward(input) ...) + .template then([&future](const KAsync::Error &error, const Out &v, + KAsync::Future &f) { + if (error) { + future.setError(error); + } else { + future.setResult(v); + } + f.setFinished(); + }).exec(); + } + + void executeJobAndApply(In && ... input, const JobContinuation &func, + Future &future, std::true_type) + { + func(std::forward(input) ...) + .template then([&future](const KAsync::Error &error, KAsync::Future &f) { + if (error) { + future.setError(error); + } else { + future.setFinished(); + } + f.setFinished(); + }).exec(); + } + + void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, + Future &future, std::false_type) + { + func(error, std::forward(input) ...) + .template then([&future](const KAsync::Error &error, const Out &v, + KAsync::Future &f) { + if (error) { + future.setError(error); + } else { + future.setResult(v); + } + f.setFinished(); + }).exec(); + } + + void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, + Future &future, std::true_type) + { + func(error, std::forward(input) ...) + .template then([&future](const KAsync::Error &error, KAsync::Future &f) { + if (error) { + future.setError(error); + } else { + future.setFinished(); + } + f.setFinished(); + }).exec(); + } + + void callAndApply(In && ... input, const SyncContinuation &func, Future &future, std::false_type) + { + future.setValue(func(std::forward(input) ...)); + } + + void callAndApply(In && ... input, const SyncContinuation &func, Future &, std::true_type) + { + func(std::forward(input) ...); + } + + void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &future, std::false_type) + { + future.setValue(func(error, std::forward(input) ...)); + } + + void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &, std::true_type) + { + func(error, std::forward(input) ...); + } + +private: + ContinuationHolder mContinuationHolder; + const ExecutionFlag executionFlag; +}; + +} // namespace Private +} // nameapce KAsync + +#endif + diff --git a/src/job_impl.h b/src/job_impl.h index c0e4ec0..b44cae6 100644 --- a/src/job_impl.h +++ b/src/job_impl.h @@ -1,556 +1,317 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2015 - 2016 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_JOB_IMPL_H #define KASYNC_JOB_IMPL_H #include "async.h" //@cond PRIVATE namespace KAsync { -namespace Private { - -template -class ThenExecutor: public Executor::type, Out, In ...> -{ -public: - ThenExecutor(ContinuationHolder &&workerHelper, const ExecutorBasePtr &parent = {}, - ExecutionFlag executionFlag = ExecutionFlag::GoodCase) - : Executor::type, Out, In ...>(parent, executionFlag) - , mContinuationHolder(std::move(workerHelper)) - { - STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...); - } - - void run(const ExecutionPtr &execution) Q_DECL_OVERRIDE - { - KAsync::Future::type> *prevFuture = nullptr; - if (execution->prevExecution) { - prevFuture = execution->prevExecution->result::type>(); - assert(prevFuture->isFinished()); - } - - //Execute one of the available workers - KAsync::Future *future = execution->result(); - - const auto &continuation = ThenExecutor::mContinuationHolder; - if (continuationIs>(continuation)) { - continuationGet>(continuation)(prevFuture ? prevFuture->value() : In() ..., *future); - } else if (continuationIs>(continuation)) { - continuationGet>(continuation)( - prevFuture->hasError() ? prevFuture->errors().first() : Error(), - prevFuture ? prevFuture->value() : In() ..., *future); - } else if (continuationIs>(continuation)) { - callAndApply(prevFuture ? prevFuture->value() : In() ..., - continuationGet>(continuation), *future, std::is_void()); - future->setFinished(); - } else if (continuationIs>(continuation)) { - assert(prevFuture); - callAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), - prevFuture ? prevFuture->value() : In() ..., - continuationGet>(continuation), *future, std::is_void()); - future->setFinished(); - } else if (continuationIs>(continuation)) { - executeJobAndApply(prevFuture ? prevFuture->value() : In() ..., - continuationGet>(continuation), *future, std::is_void()); - } else if (continuationIs>(continuation)) { - executeJobAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), - prevFuture ? prevFuture->value() : In() ..., - continuationGet>(continuation), *future, std::is_void()); - } - } - -private: - - void executeJobAndApply(In && ... input, const JobContinuation &func, - Future &future, std::false_type) - { - func(std::forward(input) ...) - .template then([&future](const KAsync::Error &error, const Out &v, - KAsync::Future &f) { - if (error) { - future.setError(error); - } else { - future.setResult(v); - } - f.setFinished(); - }).exec(); - } - - void executeJobAndApply(In && ... input, const JobContinuation &func, - Future &future, std::true_type) - { - func(std::forward(input) ...) - .template then([&future](const KAsync::Error &error, KAsync::Future &f) { - if (error) { - future.setError(error); - } else { - future.setFinished(); - } - f.setFinished(); - }).exec(); - } - - void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, - Future &future, std::false_type) - { - func(error, std::forward(input) ...) - .template then([&future](const KAsync::Error &error, const Out &v, - KAsync::Future &f) { - if (error) { - future.setError(error); - } else { - future.setResult(v); - } - f.setFinished(); - }).exec(); - } - - void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, - Future &future, std::true_type) - { - func(error, std::forward(input) ...) - .template then([&future](const KAsync::Error &error, KAsync::Future &f) { - if (error) { - future.setError(error); - } else { - future.setFinished(); - } - f.setFinished(); - }).exec(); - } - - void callAndApply(In && ... input, const SyncContinuation &func, Future &future, std::false_type) - { - future.setValue(func(std::forward(input) ...)); - } - - void callAndApply(In && ... input, const SyncContinuation &func, Future &, std::true_type) - { - func(std::forward(input) ...); - } - - void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &future, std::false_type) - { - future.setValue(func(error, std::forward(input) ...)); - } - - void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &, std::true_type) - { - func(error, std::forward(input) ...); - } - - ContinuationHolder mContinuationHolder; -}; - -template -KAsync::Future* ExecutorBase::createFuture(const ExecutionPtr &execution) const -{ - return new KAsync::Future(execution); -} - -template -void Executor::runExecution(const KAsync::Future *prevFuture, - const ExecutionPtr &execution, bool guardIsBroken) -{ - if (guardIsBroken) { - execution->resultBase->setFinished(); - return; - } - if (prevFuture) { - if (prevFuture->hasError() && executionFlag == ExecutionFlag::GoodCase) { - //Propagate the error to the outer Future - Q_ASSERT(prevFuture->errors().size() == 1); - execution->resultBase->setError(prevFuture->errors().first()); - return; - } - if (!prevFuture->hasError() && executionFlag == ExecutionFlag::ErrorCase) { - //Propagate the value to the outer Future - KAsync::detail::copyFutureValue(*prevFuture, *execution->result()); - execution->resultBase->setFinished(); - return; - } - } - run(execution); -} - -class ExecutionContext { -public: - typedef QSharedPointer Ptr; - - QVector> guards; - bool guardIsBroken() const - { - for (const auto &g : guards) { - if (!g) { - return true; - } - } - return false; - } -}; - -template -ExecutionPtr Executor::exec(const ExecutorBasePtr &self, ExecutionContext::Ptr context) -{ - /* - * One executor per job, created with the construction of the Job object. - * One execution per job per exec(), created only once exec() is called. - * - * The executors make up the linked list that makes up the complete execution chain. - * - * The execution then tracks the execution of each executor. - */ - - // Passing 'self' to execution ensures that the Executor chain remains - // valid until the entire execution is finished - ExecutionPtr execution = ExecutionPtr::create(self); -#ifndef QT_NO_DEBUG - execution->tracer = std::make_unique(execution.data()); // owned by execution -#endif - - context->guards += mGuards; - - // chainup - execution->prevExecution = mPrev ? mPrev->exec(mPrev, context) : ExecutionPtr(); - - execution->resultBase = ExecutorBase::createFuture(execution); - //We watch our own future to finish the execution once we're done - auto fw = new KAsync::FutureWatcher(); - QObject::connect(fw, &KAsync::FutureWatcher::futureReady, - [fw, execution]() { - execution->setFinished(); - delete fw; - }); - fw->setFuture(*execution->result()); - - KAsync::Future *prevFuture = execution->prevExecution ? execution->prevExecution->result() - : nullptr; - if (!prevFuture || prevFuture->isFinished()) { //The previous job is already done - runExecution(prevFuture, execution, context->guardIsBroken()); - } else { //The previous job is still running and we have to wait for it's completion - auto prevFutureWatcher = new KAsync::FutureWatcher(); - QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher::futureReady, - [prevFutureWatcher, execution, this, context]() { - auto prevFuture = prevFutureWatcher->future(); - assert(prevFuture.isFinished()); - delete prevFutureWatcher; - runExecution(&prevFuture, execution, context->guardIsBroken()); - }); - - prevFutureWatcher->setFuture(*static_cast*>(prevFuture)); - } - - return execution; -} - -} // namespace Private - - template template Job::operator std::conditional_t::value, IncompleteType, Job> () { return thenImpl({JobContinuation([](InOther ...){ return KAsync::null(); })}, {}); } template template Job Job::thenImpl(Private::ContinuationHolder workHelper, Private::ExecutionFlag execFlag) const { thenInvariants(); - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( std::forward>(workHelper), mExecutor, execFlag)); } template template Job Job::then(const Job &job) const { thenInvariants(); auto executor = job.mExecutor; executor->prepend(mExecutor); return Job(executor); } template Job Job::onError(SyncErrorContinuation &&errorFunc) const { - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( // Extra indirection to allow propagating the result of a previous future when no // error occurs Private::ContinuationHolder([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) { errorFunc(error); return val; }), mExecutor, Private::ExecutionFlag::ErrorCase)); } template<> // Specialize for void jobs inline Job Job::onError(SyncErrorContinuation &&errorFunc) const { - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( Private::ContinuationHolder(std::forward>(errorFunc)), mExecutor, Private::ExecutionFlag::ErrorCase)); } template template KAsync::Future Job::exec(FirstIn in) { // Inject a fake sync executor that will return the initial value Private::ExecutorBasePtr first = mExecutor; while (first->mPrev) { first = first->mPrev; } - first->mPrev = QSharedPointer>::create( + first->mPrev = QSharedPointer>::create( Private::ContinuationHolder([val = std::move(in)](Future &future) { future.setResult(val); })); auto result = exec(); // Remove the injected executor first->mPrev.reset(); return result; } template KAsync::Future Job::exec() { Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create()); KAsync::Future result = *execution->result(); return result; } template Job::Job(Private::ExecutorBasePtr executor) : JobBase(executor) {} template Job::Job(JobContinuation &&func) - : JobBase(new Private::ThenExecutor(std::forward>(func), {})) + : JobBase(new Private::Executor(std::forward>(func), {})) { qWarning() << "Creating job job"; static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); } template template void Job::eachInvariants() const { static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or an array."); static_assert(std::is_void::value || detail::isIterable::value, "The result type of 'Each' task must be void, a list or an array."); } template template void Job::thenInvariants() const { static_assert(!std::is_void::value && (std::is_convertible::value || std::is_base_of::value), "The return type of previous task must be compatible with input type of this task"); } template template auto Job::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)> { } template Job startImpl(Private::ContinuationHolder &&helper) { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( std::forward>(helper), nullptr, Private::ExecutionFlag::GoodCase)); } template Job syncStartImpl(SyncContinuation &&func) { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( Private::ContinuationHolder(std::forward>(func)), nullptr, Private::ExecutionFlag::GoodCase)); } static inline KAsync::Job waitForCompletion(QList> &futures) { auto context = new QObject; return start([futures, context](KAsync::Future &future) { const auto total = futures.size(); auto count = QSharedPointer::create(); int i = 0; for (KAsync::Future subFuture : futures) { i++; if (subFuture.isFinished()) { *count += 1; continue; } // FIXME bind lifetime all watcher to future (repectively the main job auto watcher = QSharedPointer>::create(); QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future, context]() { *count += 1; if (*count == total) { delete context; future.setFinished(); } }); watcher->setFuture(subFuture); context->setProperty(QString::fromLatin1("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); } if (*count == total) { delete context; future.setFinished(); } }); // .finally([context]() { delete context; }); } template Job forEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); QList> list; for (const auto &v : values) { auto future = job .template then([error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } }) .exec(v); list << future; } return waitForCompletion(list) .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( Private::ContinuationHolder(JobContinuation(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); } template Job serialForEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); auto serialJob = KAsync::null(); for (const auto &value : values) { serialJob = serialJob.then([value, job, error](KAsync::Future &future) { job.template then([&future, error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } future.setFinished(); }) .exec(value); }); } return serialJob .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; - return Job(QSharedPointer>::create( + return Job(QSharedPointer>::create( Private::ContinuationHolder(JobContinuation(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); } template Job forEach(JobContinuation &&func) { return forEach(KAsync::start(std::forward>(func))); } template Job serialForEach(JobContinuation &&func) { return serialForEach(KAsync::start(std::forward>(func))); } template Job null() { return KAsync::start( [](KAsync::Future &future) { future.setFinished(); }); } template Job value(Out v) { return KAsync::start( [val = std::move(v)](KAsync::Future &future) { future.setResult(val); }); } template Job error(int errorCode, const QString &errorMessage) { return error({errorCode, errorMessage}); } template Job error(const char *message) { return error(Error(message)); } template Job error(const Error &error) { return KAsync::start( [error](KAsync::Future &future) { future.setError(error); }); } } // namespace KAsync //@endconf #endif // KASYNC_JOB_IMPL_H