diff --git a/src/async.h b/src/async.h index 15cc6cc..008b425 100644 --- a/src/async.h +++ b/src/async.h @@ -1,750 +1,739 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2016 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_H #define KASYNC_H #include "kasync_export.h" #include #include #include #include #include #include #include "future.h" #include "debug.h" #include "async_impl.h" #include #include #include #include #include /** * @mainpage KAsync * * @brief API to help write async code. * * This API is based around jobs that take lambdas to execute asynchronous tasks. * Each async operation can take a continuation that can then be used to execute * further async operations. That way it is possible to build async chains of * operations that can be stored and executed later on. Jobs can be composed, * similarly to functions. * * Relations between the components: * * Job: API wrapper around Executors chain. Can be destroyed while still running, * because the actual execution happens in the background * * Executor: Describes task to execute. Executors form a linked list matching the * order in which they will be executed. The Executor chain is destroyed when * the parent Job is destroyed. However if the Job is still running it is * guaranteed that the Executor chain will not be destroyed until the execution * is finished. * * Execution: The running execution of the task stored in Executor. Each call to * Job::exec() instantiates new Execution chain, which makes it possible for * the Job to be executed multiple times (even in parallel). * * Future: Representation of the result that is being calculated * * * TODO: Possibility to abort a job through future (perhaps optional?) * TODO: Support for timeout, specified during exec call, after which the error * handler gets called with a defined errorCode. */ namespace KAsync { template class Executor; class JobBase; template class Job; template using HandleContinuation = typename detail::identity&)>>::type; template using HandleErrorContinuation = typename detail::identity&)>>::type; template using SyncContinuation = typename detail::identity>::type; template using SyncErrorContinuation = typename detail::identity>::type; template using JobContinuation = typename detail::identity(In ...)>>::type; template using JobErrorContinuation = typename detail::identity(const KAsync::Error &, In ...)>>::type; //@cond PRIVATE namespace Private { class ExecutorBase; typedef QSharedPointer ExecutorBasePtr; class ExecutionContext; struct KASYNC_EXPORT Execution { explicit Execution(const ExecutorBasePtr &executor); virtual ~Execution(); void setFinished(); template KAsync::Future* result() const { return static_cast*>(resultBase); } void releaseFuture(); ExecutorBasePtr executor; ExecutionPtr prevExecution; std::unique_ptr tracer; FutureBase *resultBase = nullptr; }; template struct ContinuationHelper { ContinuationHelper(HandleContinuation &&func) : handleContinuation(std::move(func)) {}; ContinuationHelper(HandleErrorContinuation &&func) : handleErrorContinuation(std::move(func)) {}; ContinuationHelper(JobContinuation &&func) : jobContinuation(std::move(func)) {}; ContinuationHelper(JobErrorContinuation &&func) : jobErrorContinuation(std::move(func)) {}; HandleContinuation handleContinuation; HandleErrorContinuation handleErrorContinuation; JobContinuation jobContinuation; JobErrorContinuation jobErrorContinuation; }; typedef QSharedPointer ExecutionPtr; class KASYNC_EXPORT ExecutorBase { template friend class Executor; template friend class KAsync::Job; friend struct Execution; friend class KAsync::Tracer; public: virtual ~ExecutorBase() = default; virtual ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) = 0; protected: ExecutorBase(const ExecutorBasePtr &parent) : mPrev(parent) {} template KAsync::Future* createFuture(const ExecutionPtr &execution) const; ExecutorBasePtr mPrev; void prepend(const ExecutorBasePtr &e) { if (mPrev) { mPrev->prepend(e); } else { mPrev = e; } } void addToContext(const QVariant &entry) { mContext << entry; } void guard(const QObject *o) { mGuards.append(QPointer{o}); } QString mExecutorName; QVector mContext; QVector> mGuards; }; enum ExecutionFlag { Always, ErrorCase, GoodCase }; template class Executor : public ExecutorBase { protected: Executor(const Private::ExecutorBasePtr &parent, ExecutionFlag executionFlag) : ExecutorBase(parent) , executionFlag(executionFlag) {} virtual ~Executor() {} virtual void run(const ExecutionPtr &execution) = 0; ExecutionPtr exec(const ExecutorBasePtr &self, QSharedPointer context) override; const ExecutionFlag executionFlag; private: void runExecution(const KAsync::Future *prevFuture, const ExecutionPtr &execution, bool guardIsBroken); }; } // namespace Private //@endcond template Job startImpl(Private::ContinuationHelper &&); template Job syncStartImpl(SyncContinuation &&); /** * @relates Job * * Start an asynchronous job sequence. * * start() is your starting point to build a chain of jobs to be executed * asynchronously. * * @param func A continuation to be executed. */ ///Sync continuation without job: [] () -> T { ... } template -auto start(F &&func) -> typename std::enable_if() ...))>::value, - Job() ...)), In...> - >::type +auto start(F &&func) -> std::enable_if_t() ...))>::value, + Job() ...)), In...>> { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return syncStartImpl(std::forward(func)); } ///continuation with job: [] () -> KAsync::Job<...> { ... } template -auto start(F &&func) -> typename std::enable_if() ...))>::value, - Job() ...))::OutType, In...> - >::type +auto start(F &&func) -> std::enable_if_t() ...))>::value, + Job() ...))::OutType, In...>> { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return startImpl(Private::ContinuationHelper(std::forward(func))); } ///Handle continuation: [] (KAsync::Future, ...) { ... } template auto start(HandleContinuation &&func) -> Job { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return startImpl(Private::ContinuationHelper(std::forward>(func))); } enum ControlFlowFlag { Break, Continue }; /** * @relates Job * * Async while loop. * * Loop continues while body returns ControlFlowFlag::Continue. */ KASYNC_EXPORT Job doWhile(const Job &body); /** * @relates Job * * Async while loop. * * Shorthand that takes a continuation. * * @see doWhile */ KASYNC_EXPORT Job doWhile(const JobContinuation &body); /** * @relates Job * * Async delay. */ KASYNC_EXPORT Job wait(int delay); /** * @relates Job * * A null job. * * An async noop. * */ template Job null(); /** * @relates Job * * Async value. */ template Job value(Out); /** * @relates Job * * Async foreach loop. * * This will execute a job for every value in the list. * Errors while not stop processing of other jobs but set an error on the wrapper job. */ template Job forEach(KAsync::Job job); /** * @relates Job * * Async foreach loop. * * Shorthand that takes a continuation. * * @see serialForEach */ template Job forEach(JobContinuation &&); /** * @relates Job * * Serial Async foreach loop. * * This will execute a job for every value in the list sequentially. * Errors while not stop processing of other jobs but set an error on the wrapper job. */ template Job serialForEach(KAsync::Job job); /** * @relates Job * * Serial Async foreach loop. * * Shorthand that takes a continuation. * * @see serialForEach */ template Job serialForEach(JobContinuation &&); /** * @relates Job * * An error job. * * An async error. * */ template Job error(int errorCode = 1, const QString &errorMessage = QString()); /** * @relates Job * * An error job. * * An async error. * */ template Job error(const char *); /** * @relates Job * * An error job. * * An async error. * */ template Job error(const Error &); //@cond PRIVATE class KASYNC_EXPORT JobBase { template friend class Job; public: explicit JobBase(const Private::ExecutorBasePtr &executor) : mExecutor(executor) {} virtual ~JobBase() = default; protected: Private::ExecutorBasePtr mExecutor; }; //@endcond /** * @brief An Asynchronous job * * A single instance of Job represents a single method that will be executed * asynchronously. The Job is started by exec(), which returns Future * immediatelly. The Future will be set to finished state once the asynchronous * task has finished. You can use Future::waitForFinished() to wait for * for the Future in blocking manner. * * It is possible to chain multiple Jobs one after another in different fashion * (sequential, parallel, etc.). Calling exec() will then return a pending * Future, and will execute the entire chain of jobs. * * @code * auto job = Job::start>( * [](KAsync::Future> &future) { * MyREST::PendingUsers *pu = MyREST::requestListOfUsers(); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(dynamic_cast(pu)->userIds()); * future->setFinished(); * }); * }) * .each, int>( * [](const int &userId, KAsync::Future> &future) { * MyREST::PendingUser *pu = MyREST::requestUserDetails(userId); * QObject::connect(pu, &PendingOperation::finished, * [&](PendingOperation *pu) { * future->setValue(Qlist() << dynamic_cast(pu)->user()); * future->setFinished(); * }); * }); * * KAsync::Future> usersFuture = job.exec(); * usersFuture.waitForFinished(); * QList users = usersFuture.value(); * @endcode * * In the example above, calling @p job.exec() will first invoke the first job, * which will retrieve a list of IDs and then will invoke the second function * for each single entry in the list returned by the first function. */ template class Job : public JobBase { //@cond PRIVATE template friend class Job; template friend Job startImpl(Private::ContinuationHelper &&); template friend Job syncStartImpl(SyncContinuation &&); template friend Job forEach(KAsync::Job job); template friend Job serialForEach(KAsync::Job job); // Used to disable implicit conversion of Job which triggers // comiler warning. struct IncompleteType; //@endcond public: typedef Out OutType; ///A continuation template Job then(const Job &job) const; ///Shorthands for a job that returns another job from it's continuation // //OutOther and InOther are only there fore backwards compatibility, but are otherwise ignored. //It should never be neccessary to specify any template arguments, as they are automatically deduced from the provided argument. // //We currently have to write a then overload for: //* One argument in the continuation //* No argument in the continuation //* One argument + error in the continuation //* No argument + error in the continuation //This is due to how we extract the return type with "decltype(func(std::declval()))". //Ideally we could conflate this into at least fewer overloads, but I didn't manage so far and this at least works as expected. ///Continuation returning job: [] (Arg) -> KAsync::Job<...> { ... } template - auto then(F &&func) const -> typename std::enable_if()))>::value, - Job()))::OutType, In...> - >::type + auto then(F &&func) const -> std::enable_if_t()))>::value, + Job()))::OutType, In...>> { using ResultJob = decltype(func(std::declval())); //Job return thenImpl({std::forward(func)}, Private::ExecutionFlag::GoodCase); } ///Void continuation with job: [] () -> KAsync::Job<...> { ... } template - auto then(F &&func) const -> typename std::enable_if::value, - Job - >::type + auto then(F &&func) const -> std::enable_if_t::value, + Job> { using ResultJob = decltype(func()); //Job return thenImpl({std::forward(func)}, Private::ExecutionFlag::GoodCase); } ///Error continuation returning job: [] (KAsync::Error, Arg) -> KAsync::Job<...> { ... } template - auto then(F &&func) const -> typename std::enable_if()))>::value, - Job()))::OutType, In...> - >::type + auto then(F &&func) const -> std::enable_if_t()))>::value, + Job()))::OutType, In...>> { using ResultJob = decltype(func(KAsync::Error{}, std::declval())); //Job return thenImpl({std::forward(func)}, Private::ExecutionFlag::Always); } ///Error void continuation returning job: [] (KAsync::Error) -> KAsync::Job<...> { ... } template - auto then(F &&func) const -> typename std::enable_if::value, - Job - >::type + auto then(F &&func) const -> std::enable_if_t::value, + Job> { using ResultJob = decltype(func(KAsync::Error{})); return thenImpl({std::forward(func)}, Private::ExecutionFlag::Always); } ///Sync continuation: [] (Arg) -> void { ... } template - auto then(F &&func) const -> typename std::enable_if()))>::value, - Job())), In...> - >::type + auto then(F &&func) const -> std::enable_if_t()))>::value, + Job())), In...>> { using ResultType = decltype(func(std::declval())); //QString return syncThenImpl({std::forward(func)}, Private::ExecutionFlag::GoodCase); } ///Sync void continuation: [] () -> void { ... } template - auto then(F &&func) const -> typename std::enable_if::value, - Job - >::type + auto then(F &&func) const -> std::enable_if_t::value, + Job> { using ResultType = decltype(func()); //QString return syncThenImpl({std::forward(func)}, Private::ExecutionFlag::GoodCase); } ///Sync error continuation: [] (KAsync::Error, Arg) -> void { ... } template - auto then(F &&func) const -> typename std::enable_if()))>::value, - Job())),In...> - >::type + auto then(F &&func) const -> std::enable_if_t()))>::value, + Job())),In...>> { using ResultType = decltype(func(KAsync::Error{}, std::declval())); //QString return syncThenImpl({std::forward(func)}, Private::ExecutionFlag::Always); } ///Sync void error continuation: [] (KAsync::Error) -> void { ... } template - auto then(F &&func) const -> typename std::enable_if::value, - Job - >::type + auto then(F &&func) const -> std::enable_if_t::value, + Job> { using ResultType = decltype(func(KAsync::Error{})); return syncThenImpl({std::forward(func)}, Private::ExecutionFlag::Always); } ///Shorthand for a job that receives the error and a handle template Job then(HandleContinuation &&func) const { return thenImpl({std::forward>(func)}, Private::ExecutionFlag::GoodCase); } ///Shorthand for a job that receives the error and a handle template Job then(HandleErrorContinuation &&func) const { return thenImpl({std::forward>(func)}, Private::ExecutionFlag::Always); } ///Shorthand for a job that receives the error only Job onError(const SyncErrorContinuation &errorFunc) const; /** * Shorthand for a forEach loop that automatically uses the return type of * this job to deduce the type exepected. */ - template::value, int>::type = 0> + template::value, int> = 0> Job each(JobContinuation &&func) const { eachInvariants(); return then(forEach(std::forward>(func))); } /** * Shorthand for a serialForEach loop that automatically uses the return type * of this job to deduce the type exepected. */ - template::value, int>::type = 0> + template::value, int> = 0> Job serialEach(JobContinuation &&func) const { eachInvariants(); return then(serialForEach(std::forward>(func))); } /** * Enable implicit conversion to Job. * * This is necessary in assignments that only use the return value (which is the normal case). * This avoids constructs like: * auto job = KAsync::start( ... ) * .then( ... ) * .then([](){}); //Necessary for the assignment without the implicit conversion */ template - operator typename std::conditional::value, IncompleteType, Job>::type (); + operator std::conditional_t::value, IncompleteType, Job>(); /** * Adds an unnamed value to the context. * The context is guaranteed to persist until the jobs execution has finished. * * Useful for setting smart pointer to manage lifetime of objects required * during the execution of the job. */ template Job &addToContext(const T &value) { assert(mExecutor); mExecutor->addToContext(QVariant::fromValue(value)); return *this; } /** * Adds a guard. * It is guaranteed that no callback is executed after the guard vanishes. * * Use this i.e. ensure you don't call-back into an already destroyed object. */ Job &guard(const QObject *o) { assert(mExecutor); mExecutor->guard(o); return *this; } /** * @brief Starts execution of the job chain. * * This will start the execution of the task chain, starting from the * first one. It is possible to call this function multiple times, each * invocation will start a new processing and provide a new Future to * watch its status. * * @param in Argument to be passed to the very first task * @return Future<Out> object which will contain result of the last * task once if finishes executing. See Future documentation for more details. * * @see exec(), Future */ template KAsync::Future exec(FirstIn in); /** * @brief Starts execution of the job chain. * * This will start the execution of the task chain, starting from the * first one. It is possible to call this function multiple times, each * invocation will start a new processing and provide a new Future to * watch its status. * * @return Future<Out> object which will contain result of the last * task once if finishes executing. See Future documentation for more details. * * @see exec(FirstIn in), Future */ KAsync::Future exec(); explicit Job(JobContinuation &&func); explicit Job(HandleContinuation &&func); private: //@cond PRIVATE explicit Job(Private::ExecutorBasePtr executor); template Job thenImpl(Private::ContinuationHelper helper, Private::ExecutionFlag execFlag = Private::ExecutionFlag::GoodCase) const; template Job syncThenImpl(SyncContinuation &&func, Private::ExecutionFlag execFlag = Private::ExecutionFlag::GoodCase) const; template Job syncThenImpl(SyncErrorContinuation &&func, Private::ExecutionFlag execFlag = Private::ExecutionFlag::Always) const; template void thenInvariants() const; //Base case for an empty parameter pack template - typename std::enable_if<(sizeof...(InOther) == 0)>::type - thenInvariants() const; + auto thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>; template void eachInvariants() const; //@endcond }; } // namespace KAsync // out-of-line definitions of Job methods #include "job_impl.h" #endif // KASYNC_H diff --git a/src/async_impl.h b/src/async_impl.h index 4ad246f..9abfd85 100644 --- a/src/async_impl.h +++ b/src/async_impl.h @@ -1,97 +1,97 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2016 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_IMPL_H #define KASYNC_IMPL_H #include "async.h" #include //@cond PRIVATE namespace KAsync { namespace detail { template struct identity { typedef T type; }; template struct isIterable { enum { value = 0 }; }; template -struct isIterable::type> { +struct isIterable> { enum { value = 1 }; }; template struct prevOut { - using type = typename std::tuple_element<0, std::tuple>::type; + using type = std::tuple_element_t<0, std::tuple>; }; template struct funcHelper { using type = void(T::*)(In ..., KAsync::Future &); }; template struct syncFuncHelper { using type = Out(T::*)(In ...); }; template -inline typename std::enable_if::value, void>::type +inline std::enable_if_t::value, void> copyFutureValue(const KAsync::Future &in, KAsync::Future &out) { out.setValue(in.value()); } template -inline typename std::enable_if::value, void>::type +inline std::enable_if_t::value, void> copyFutureValue(const KAsync::Future &/* in */, KAsync::Future &/* out */) { // noop } template -inline typename std::enable_if::value, void>::type +inline std::enable_if_t::value, void> aggregateFutureValue(const KAsync::Future &in, KAsync::Future &out) { out.setValue(out.value() + in.value()); } template -inline typename std::enable_if::value, void>::type +inline std::enable_if_t::value, void> aggregateFutureValue(const KAsync::Future & /*in */, KAsync::Future & /*out */) { // noop } } // namespace Detail } // namespace KAsync //@endcond #endif // KASYNC_IMPL_H diff --git a/src/debug.h b/src/debug.h index b343083..5195c19 100644 --- a/src/debug.h +++ b/src/debug.h @@ -1,84 +1,84 @@ /* * Copyright 2015 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_DEBUG_H #define KASYNC_DEBUG_H //krazy:excludeall=dpointer #include "kasync_export.h" #include #include #ifndef QT_NO_DEBUG #include #endif namespace KAsync { Q_DECLARE_LOGGING_CATEGORY(Debug) Q_DECLARE_LOGGING_CATEGORY(Trace) KASYNC_EXPORT QString demangleName(const char *name); namespace Private { struct Execution; } class KASYNC_EXPORT Tracer { public: explicit Tracer(Private::Execution *execution); ~Tracer(); private: enum MsgType { Start, End }; void msg(MsgType); int mId; Private::Execution *mExecution; static int lastId; }; } #ifndef QT_NO_DEBUG template QString storeExecutorNameExpanded() { return KAsync::demangleName(typeid(T).name()); } template - typename std::enable_if::type - storeExecutorNameExpanded() { + auto storeExecutorNameExpanded() -> std::enable_if_t + { return storeExecutorNameExpanded() % QStringLiteral(", ") % storeExecutorNameExpanded(); } #define STORE_EXECUTOR_NAME(name, ...) \ ExecutorBase::mExecutorName = QStringLiteral(name) % QStringLiteral("<") % storeExecutorNameExpanded<__VA_ARGS__>() % QStringLiteral(">") #else #define STORE_EXECUTOR_NAME(...) #endif #endif // KASYNC_DEBUG_H diff --git a/src/future.h b/src/future.h index afc3a4d..533cfec 100644 --- a/src/future.h +++ b/src/future.h @@ -1,527 +1,526 @@ /* * Copyright 2014 Daniel Vrátil * Copyright 2016 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef FUTURE_H #define FUTURE_H #include "kasync_export.h" class QEventLoop; #include #include #include #include #include namespace KAsync { //@cond PRIVATE class FutureWatcherBase; template class FutureWatcher; namespace Private { struct Execution; class ExecutorBase; typedef QSharedPointer ExecutionPtr; } // namespace Private struct KASYNC_EXPORT Error { Error() : errorCode(0) {}; explicit Error(const char *message) : errorCode(1), errorMessage(QString::fromLatin1(message)) {} Error(int code, const char *message) : errorCode(code), errorMessage(QString::fromLatin1(message)) {} Error(int code, const QString &message) : errorCode(code), errorMessage(message) {} bool operator ==(const Error &other) const { return (errorCode == other.errorCode) && (errorMessage == other.errorMessage); } bool operator !=(const Error &other) const { return !(*this == other); } operator bool() const { return (errorCode != 0); } int errorCode; QString errorMessage; private: //Disable all implicit conversions except to bool, to avoid accidentally implicitly casting an error to a continuation argument. //This becomes an issue if you forget to specify all template arguments, as the template argument deduction may employ a nonsensical implicit conversion from i.e. error to int. So as long as the Error object is used in the Job::then overload resolution no implicit conversions here. //Of course this "solution" still breaks if you forget the template argument with a boolean parameter.... template operator T() const; }; class KASYNC_EXPORT FutureBase { friend struct KAsync::Private::Execution; friend class FutureWatcherBase; public: virtual ~FutureBase(); void setFinished(); bool isFinished() const; void setError(int code = 1, const QString &message = QString()); void setError(const Error &error); void addError(const Error &error); void clearErrors(); bool hasError() const; int errorCode() const; QString errorMessage() const; QVector errors() const; void setProgress(qreal progress); void setProgress(int processed, int total); protected: class KASYNC_EXPORT PrivateBase : public QSharedData { public: explicit PrivateBase(const KAsync::Private::ExecutionPtr &execution); virtual ~PrivateBase(); void releaseExecution(); bool finished; QVector errors; QVector> watchers; private: QWeakPointer mExecution; }; explicit FutureBase(); explicit FutureBase(FutureBase::PrivateBase *dd); FutureBase(const FutureBase &other); void addWatcher(KAsync::FutureWatcherBase *watcher); void releaseExecution(); protected: QExplicitlySharedDataPointer d; }; template class FutureWatcher; template class Future; template class FutureGeneric : public FutureBase { friend class FutureWatcher; public: void waitForFinished() const { if (isFinished()) { return; } FutureWatcher watcher; QEventLoop eventLoop; QObject::connect(&watcher, &KAsync::FutureWatcher::futureReady, &eventLoop, &QEventLoop::quit); watcher.setFuture(*static_cast*>(this)); eventLoop.exec(); } protected: //@cond PRIVATE explicit FutureGeneric(const KAsync::Private::ExecutionPtr &execution) : FutureBase(new Private(execution)) {} FutureGeneric(const FutureGeneric &other) : FutureBase(other) {} protected: class Private : public FutureBase::PrivateBase { public: explicit Private(const KAsync::Private::ExecutionPtr &execution) : FutureBase::PrivateBase(execution) {} - typename std::conditional::value, int /* dummy */, T>::type - value; + std::conditional_t::value, int /* dummy */, T> value; }; }; //@endcond /** * @ingroup Future * * @brief Future is a promise that is used by Job to deliver result * of an asynchronous execution. * * The Future is passed internally to each executed task, and the task can use * it to report its progress, result and notify when it is finished. * * Users use Future they receive from calling Job::exec() to get access * to the overall result of the execution. FutureWatcher<T> can be used * to wait for the Future to finish in non-blocking manner. * * @see Future */ template class Future : public FutureGeneric { //@cond PRIVATE friend class KAsync::Private::ExecutorBase; template friend class KAsync::FutureWatcher; //@endcond public: /** * @brief Constructor */ explicit Future() : FutureGeneric(KAsync::Private::ExecutionPtr()) {} /** * @brief Copy constructor */ Future(const Future &other) : FutureGeneric(other) {} /** * Set the result of the Future. This method is called by the task upon * calculating the result. After setting the value, the caller must also * call setFinished() to notify users that the result * is available. * * @warning This method must only be called by the tasks inside Job, * never by outside users. * * @param value The result value */ void setValue(const T &value) { dataImpl()->value = value; } /** * Retrieve the result of the Future. Calling this method when the future has * not yet finished (i.e. isFinished() returns false) * returns undefined result. */ T value() const { return dataImpl()->value; } T *operator->() { return &(dataImpl()->value); } const T *operator->() const { return &(dataImpl()->value); } T &operator*() { return dataImpl()->value; } const T &operator*() const { return dataImpl()->value; } #ifdef ONLY_DOXYGEN /** * Will block until the Future has finished. * * @note Internally this method is using a nested QEventLoop, which can * in some situation cause problems and deadlocks. It is recommended to use * FutureWatcher. * * @see isFinished() */ void waitForFinished() const; /** * Marks the future as finished. This will cause all FutureWatcher<T> * objects watching this particular instance to emit FutureWatcher::futureReady() * signal, and will cause all callers currently blocked in Future::waitForFinished() * method of this particular instance to resume. * * @warning This method must only be called by the tasks inside Job, * never by outside users. * * @see isFinished() */ void setFinished(); /** * Query whether the Future has already finished. * * @see setFinished() */ bool isFinished() const; /** * Used by tasks to report an error that happened during execution. If an * error handler was provided to the task, it will be executed with the * given arguments. Otherwise the error will be propagated to next task * that has an error handler, or all the way up to user. * * This method also internally calls setFinished() * * @warning This method must only be called by the tasks inside Job, * never by outside users. * * @param code Optional error code * @param message Optional error message * * @see errorCode(), errorMessage() */ void setError(int code = 1, const QString &message = QString()); /** * Returns error code set via setError() or 0 if no * error has occurred. * * @see setError(), errorMessage() */ int errorCode() const; /** * Returns error message set via setError() or empty * string if no error occurred. * * @see setError(), errorCode() */ QString errorMessage() const; /** * Sets progress of the task. All FutureWatcher instances watching * this particular future will then emit FutureWatcher::futureProgress() * signal. * * @param processed Already processed amount * @param total Total amount to process */ void setProgress(int processed, int total); /** * Sets progress of the task. * * @param progress Progress */ void setProgress(qreal progress); #endif // ONLY_DOXYGEN void setResult(const T &value) { dataImpl()->value = value; FutureBase::setFinished(); } protected: //@cond PRIVATE Future(const KAsync::Private::ExecutionPtr &execution) : FutureGeneric(execution) {} //@endcond private: inline auto dataImpl() { return static_cast::Private*>(this->d.data()); } inline auto dataImpl() const { return static_cast::Private*>(this->d.data()); } }; /** * @ingroup Future * * @brief A specialization of Future<T> for tasks that have no (void) * result. * * Unlike the generic Future<T> this specialization does not have * setValue() and value() methods to set/retrieve result. * * @see Future */ template<> class Future : public FutureGeneric { friend class KAsync::Private::ExecutorBase; public: /** * @brief Constructor */ Future() : FutureGeneric(KAsync::Private::ExecutionPtr()) {} /** * @brief Copy constructor */ Future(const Future &other) : FutureGeneric(other) {} protected: //@cond PRIVATE Future(const KAsync::Private::ExecutionPtr &execution) : FutureGeneric(execution) {} //@endcond }; //@cond PRIVATE class KASYNC_EXPORT FutureWatcherBase : public QObject { Q_OBJECT friend class FutureBase; Q_SIGNALS: void futureReady(); void futureProgress(qreal progress); protected: FutureWatcherBase(QObject *parent = nullptr); virtual ~FutureWatcherBase(); void futureReadyCallback(); void futureProgressCallback(qreal progress); void setFutureImpl(const KAsync::FutureBase &future); protected: class Private { public: KAsync::FutureBase future; }; Private * const d; private: Q_DISABLE_COPY(FutureWatcherBase) }; //@endcond /** * @ingroup Future * * @brief The FutureWatcher allows monitoring of Job results using * signals and slots. * * FutureWatcher is returned by Job upon execution. User can then * connect to its futureReady() and futureProgress() signals to be notified * about progress of the asynchronous job. When futureReady() signal is emitted, * the result of the job is available in Future::value(). */ template class FutureWatcher : public FutureWatcherBase { //@cond PRIVATE friend class KAsync::FutureGeneric; //@endcond public: /** * Constructs a new FutureWatcher that can watch for status of Future<T> */ FutureWatcher(QObject *parent = nullptr) : FutureWatcherBase(parent) {} ~FutureWatcher() {} /** * Set future to watch. * * @param future Future object to watch */ void setFuture(const KAsync::Future &future) { setFutureImpl(*static_cast(&future)); } /** * Returns currently watched future. */ KAsync::Future future() const { return *static_cast*>(&d->future); } #ifdef ONLY_DOXYGEN Q_SIGNALS: /** * The signal is emitted when the execution has finished and the result * can be collected. * * @see Future::setFinished(), Future::setError() */ void futureReady(); /** * The signal is emitted when progress of the execution changes. This has * to be explicitly supported by the job being executed, otherwise the * signal is not emitted. * * @see Future::setProgress() */ void futureProgress(qreal progress); #endif private: Q_DISABLE_COPY(FutureWatcher) }; } // namespace Async KASYNC_EXPORT QDebug& operator<<(QDebug &dbg, const KAsync::Error &error); #endif // FUTURE_H diff --git a/src/job_impl.h b/src/job_impl.h index 8726fb0..2f5c004 100644 --- a/src/job_impl.h +++ b/src/job_impl.h @@ -1,647 +1,646 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2015 - 2016 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_JOB_IMPL_H #define KASYNC_JOB_IMPL_H #include "async.h" //@cond PRIVATE namespace KAsync { namespace Private { template class ThenExecutor: public Executor::type, Out, In ...> { public: ThenExecutor(ContinuationHelper &&workerHelper, const ExecutorBasePtr &parent = {}, ExecutionFlag executionFlag = ExecutionFlag::GoodCase) : Executor::type, Out, In ...>(parent, executionFlag) , mContinuationHelper(std::move(workerHelper)) { STORE_EXECUTOR_NAME("ThenExecutor", Out, In ...); } void run(const ExecutionPtr &execution) Q_DECL_OVERRIDE { KAsync::Future::type> *prevFuture = nullptr; if (execution->prevExecution) { prevFuture = execution->prevExecution->result::type>(); assert(prevFuture->isFinished()); } //Execute one of the available workers KAsync::Future *future = execution->result(); const auto &helper = ThenExecutor::mContinuationHelper; if (helper.handleContinuation) { helper.handleContinuation(prevFuture ? prevFuture->value() : In() ..., *future); } else if (helper.handleErrorContinuation) { helper.handleErrorContinuation(prevFuture->hasError() ? prevFuture->errors().first() : Error(), prevFuture ? prevFuture->value() : In() ..., *future); } else if (helper.jobContinuation) { executeJobAndApply(prevFuture ? prevFuture->value() : In() ..., helper.jobContinuation, *future, std::is_void()); } else if (helper.jobErrorContinuation) { executeJobAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), prevFuture ? prevFuture->value() : In() ..., helper.jobErrorContinuation, *future, std::is_void()); } } private: void executeJobAndApply(In && ... input, const JobContinuation &func, Future &future, std::false_type) { func(std::forward(input) ...) .template then([&future](const KAsync::Error &error, const Out &v, KAsync::Future &f) { if (error) { future.setError(error); } else { future.setResult(v); } f.setFinished(); }).exec(); } void executeJobAndApply(In && ... input, const JobContinuation &func, Future &future, std::true_type) { func(std::forward(input) ...) .template then([&future](const KAsync::Error &error, KAsync::Future &f) { if (error) { future.setError(error); } else { future.setFinished(); } f.setFinished(); }).exec(); } void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, Future &future, std::false_type) { func(error, std::forward(input) ...) .template then([&future](const KAsync::Error &error, const Out &v, KAsync::Future &f) { if (error) { future.setError(error); } else { future.setResult(v); } f.setFinished(); }).exec(); } void executeJobAndApply(const Error &error, In && ... input, const JobErrorContinuation &func, Future &future, std::true_type) { func(error, std::forward(input) ...) .template then([&future](const KAsync::Error &error, KAsync::Future &f) { if (error) { future.setError(error); } else { future.setFinished(); } f.setFinished(); }).exec(); } ContinuationHelper mContinuationHelper; }; template class SyncThenExecutor: public Executor::type, Out, In ...> { private: void callAndApply(In && ... input, const SyncContinuation &func, Future &future, std::false_type) { future.setValue(func(std::forward(input) ...)); } void callAndApply(In && ... input, const SyncContinuation &func, Future &, std::true_type) { func(std::forward(input) ...); } void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &future, std::false_type) { future.setValue(func(error, std::forward(input) ...)); } void callAndApply(const Error &error, In && ... input, const SyncErrorContinuation &func, Future &, std::true_type) { func(error, std::forward(input) ...); } const SyncContinuation mContinuation; const SyncErrorContinuation mErrorContinuation; public: SyncThenExecutor(SyncContinuation &&worker, const ExecutorBasePtr &parent = ExecutorBasePtr(), ExecutionFlag executionFlag = Always) : Executor::type, Out, In ...>(parent, executionFlag) , mContinuation(std::move(worker)) { } SyncThenExecutor(SyncErrorContinuation &&worker, const ExecutorBasePtr &parent = ExecutorBasePtr(), ExecutionFlag executionFlag = Always) : Executor::type, Out, In ...>(parent, executionFlag) , mErrorContinuation(std::move(worker)) { } void run(const ExecutionPtr &execution) Q_DECL_OVERRIDE { KAsync::Future::type> *prevFuture = nullptr; if (execution->prevExecution) { prevFuture = execution->prevExecution->result::type>(); assert(prevFuture->isFinished()); } KAsync::Future *future = execution->result(); if (SyncThenExecutor::mContinuation) { callAndApply(prevFuture ? prevFuture->value() : In() ..., SyncThenExecutor::mContinuation, *future, std::is_void()); } if (SyncThenExecutor::mErrorContinuation) { assert(prevFuture); callAndApply(prevFuture->hasError() ? prevFuture->errors().first() : Error(), prevFuture ? prevFuture->value() : In() ..., SyncThenExecutor::mErrorContinuation, *future, std::is_void()); } future->setFinished(); } }; template class SyncErrorExecutor: public Executor::type, Out, In ...> { private: const SyncErrorContinuation mContinuation; public: SyncErrorExecutor(SyncErrorContinuation &&worker, const ExecutorBasePtr &parent = ExecutorBasePtr(), ExecutionFlag executionFlag = Always) : Executor::type, Out, In ...>(parent, executionFlag) , mContinuation(std::move(worker)) { } void run(const ExecutionPtr &execution) Q_DECL_OVERRIDE { KAsync::Future::type> *prevFuture = nullptr; if (execution->prevExecution) { prevFuture = execution->prevExecution->result::type>(); assert(prevFuture->isFinished()); } KAsync::Future *future = execution->result(); assert(prevFuture->hasError()); mContinuation(prevFuture->errors().first()); future->setError(prevFuture->errors().first()); } }; template KAsync::Future* ExecutorBase::createFuture(const ExecutionPtr &execution) const { return new KAsync::Future(execution); } template void Executor::runExecution(const KAsync::Future *prevFuture, const ExecutionPtr &execution, bool guardIsBroken) { if (guardIsBroken) { execution->resultBase->setFinished(); return; } if (prevFuture) { if (prevFuture->hasError() && executionFlag == ExecutionFlag::GoodCase) { //Propagate the error to the outer Future Q_ASSERT(prevFuture->errors().size() == 1); execution->resultBase->setError(prevFuture->errors().first()); return; } if (!prevFuture->hasError() && executionFlag == ExecutionFlag::ErrorCase) { //Propagate the value to the outer Future KAsync::detail::copyFutureValue(*prevFuture, *execution->result()); execution->resultBase->setFinished(); return; } } run(execution); } class ExecutionContext { public: typedef QSharedPointer Ptr; QVector> guards; bool guardIsBroken() const { for (const auto &g : guards) { if (!g) { return true; } } return false; } }; template ExecutionPtr Executor::exec(const ExecutorBasePtr &self, ExecutionContext::Ptr context) { /* * One executor per job, created with the construction of the Job object. * One execution per job per exec(), created only once exec() is called. * * The executors make up the linked list that makes up the complete execution chain. * * The execution then tracks the execution of each executor. */ // Passing 'self' to execution ensures that the Executor chain remains // valid until the entire execution is finished ExecutionPtr execution = ExecutionPtr::create(self); #ifndef QT_NO_DEBUG execution->tracer = std::make_unique(execution.data()); // owned by execution #endif context->guards += mGuards; // chainup execution->prevExecution = mPrev ? mPrev->exec(mPrev, context) : ExecutionPtr(); execution->resultBase = ExecutorBase::createFuture(execution); //We watch our own future to finish the execution once we're done auto fw = new KAsync::FutureWatcher(); QObject::connect(fw, &KAsync::FutureWatcher::futureReady, [fw, execution]() { execution->setFinished(); delete fw; }); fw->setFuture(*execution->result()); KAsync::Future *prevFuture = execution->prevExecution ? execution->prevExecution->result() : nullptr; if (!prevFuture || prevFuture->isFinished()) { //The previous job is already done runExecution(prevFuture, execution, context->guardIsBroken()); } else { //The previous job is still running and we have to wait for it's completion auto prevFutureWatcher = new KAsync::FutureWatcher(); QObject::connect(prevFutureWatcher, &KAsync::FutureWatcher::futureReady, [prevFutureWatcher, execution, this, context]() { auto prevFuture = prevFutureWatcher->future(); assert(prevFuture.isFinished()); delete prevFutureWatcher; runExecution(&prevFuture, execution, context->guardIsBroken()); }); prevFutureWatcher->setFuture(*static_cast*>(prevFuture)); } return execution; } } // namespace Private template template -Job::operator typename std::conditional::value, IncompleteType, Job>::type () +Job::operator std::conditional_t::value, IncompleteType, Job> () { return thenImpl({[](InOther ...){ return KAsync::null(); }}, {}); } template template Job Job::thenImpl(Private::ContinuationHelper workHelper, Private::ExecutionFlag execFlag) const { thenInvariants(); return Job(QSharedPointer>::create( std::forward>(workHelper), mExecutor, execFlag)); } template template Job Job::then(const Job &job) const { thenInvariants(); auto executor = job.mExecutor; executor->prepend(mExecutor); return Job(executor); } template template Job Job::syncThenImpl(SyncContinuation &&func, Private::ExecutionFlag execFlag) const { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); thenInvariants(); return Job(QSharedPointer>::create( std::forward>(func), mExecutor, execFlag)); } template template Job Job::syncThenImpl(SyncErrorContinuation &&func, Private::ExecutionFlag execFlag) const { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); thenInvariants(); return Job(QSharedPointer>::create( std::forward>(func), mExecutor, execFlag)); } template Job Job::onError(const SyncErrorContinuation &errorFunc) const { return Job(QSharedPointer>::create( [=](const Error &error) { errorFunc(error); }, mExecutor, Private::ExecutionFlag::ErrorCase)); } template template KAsync::Future Job::exec(FirstIn in) { // Inject a fake sync executor that will return the initial value Private::ExecutorBasePtr first = mExecutor; while (first->mPrev) { first = first->mPrev; } first->mPrev = QSharedPointer>::create( Private::ContinuationHelper([val = std::move(in)](Future &future) { future.setResult(val); })); auto result = exec(); // Remove the injected executor first->mPrev.reset(); return result; } template KAsync::Future Job::exec() { Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create()); KAsync::Future result = *execution->result(); return result; } template Job::Job(Private::ExecutorBasePtr executor) : JobBase(executor) {} template Job::Job(JobContinuation &&func) : JobBase(new Private::ThenExecutor(std::forward>(func), {})) { qWarning() << "Creating job job"; static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); } template template void Job::eachInvariants() const { static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or an array."); static_assert(std::is_void::value || detail::isIterable::value, "The result type of 'Each' task must be void, a list or an array."); } template template void Job::thenInvariants() const { static_assert(!std::is_void::value && (std::is_convertible::value || std::is_base_of::value), "The return type of previous task must be compatible with input type of this task"); } template template -typename std::enable_if<(sizeof...(InOther) == 0)>::type -Job::thenInvariants() const +auto Job::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)> { } template Job startImpl(Private::ContinuationHelper &&helper) { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return Job(QSharedPointer>::create( std::forward>(helper), nullptr, Private::ExecutionFlag::GoodCase)); } template Job syncStartImpl(SyncContinuation &&func) { static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); return Job(QSharedPointer>::create( std::forward>(func))); } static inline KAsync::Job waitForCompletion(QList> &futures) { auto context = new QObject; return start([futures, context](KAsync::Future &future) { const auto total = futures.size(); auto count = QSharedPointer::create(); int i = 0; for (KAsync::Future subFuture : futures) { i++; if (subFuture.isFinished()) { *count += 1; continue; } // FIXME bind lifetime all watcher to future (repectively the main job auto watcher = QSharedPointer>::create(); QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future, context]() { *count += 1; if (*count == total) { delete context; future.setFinished(); } }); watcher->setFuture(subFuture); context->setProperty(QString::fromLatin1("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); } if (*count == total) { delete context; future.setFinished(); } }); // .finally([context]() { delete context; }); } template Job forEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); QList> list; for (const auto &v : values) { auto future = job .template then([error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } }) .exec(v); list << future; } return waitForCompletion(list) .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; return Job(QSharedPointer>::create( Private::ContinuationHelper(std::move(cont)), nullptr, Private::ExecutionFlag::GoodCase)); } template Job serialForEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); auto serialJob = KAsync::null(); for (const auto &value : values) { serialJob = serialJob.then([value, job, error](KAsync::Future &future) { job.template then([&future, error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } future.setFinished(); }) .exec(value); }); } return serialJob .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; return Job(QSharedPointer>::create( Private::ContinuationHelper(std::move(cont)), nullptr, Private::ExecutionFlag::GoodCase)); } template Job forEach(JobContinuation &&func) { return forEach(KAsync::start(std::forward>(func))); } template Job serialForEach(JobContinuation &&func) { return serialForEach(KAsync::start(std::forward>(func))); } template Job null() { return KAsync::start( [](KAsync::Future &future) { future.setFinished(); }); } template Job value(Out v) { return KAsync::start( [val = std::move(v)](KAsync::Future &future) { future.setResult(val); }); } template Job error(int errorCode, const QString &errorMessage) { return error({errorCode, errorMessage}); } template Job error(const char *message) { return error(Error(message)); } template Job error(const Error &error) { return KAsync::start( [error](KAsync::Future &future) { future.setError(error); }); } } // namespace KAsync //@endconf #endif // KASYNC_JOB_IMPL_H