diff --git a/src/job_impl.h b/src/job_impl.h index 96ac79e..bf5cafe 100644 --- a/src/job_impl.h +++ b/src/job_impl.h @@ -1,334 +1,342 @@ /* * Copyright 2014 - 2015 Daniel Vrátil * Copyright 2015 - 2016 Daniel Vrátil * Copyright 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public License as * published by the Free Software Foundation; either version 2 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Library General Public License for more details. * * You should have received a copy of the GNU Library General Public License * along with this library. If not, see . */ #ifndef KASYNC_JOB_IMPL_H #define KASYNC_JOB_IMPL_H #include "async.h" #include //@cond PRIVATE namespace KAsync { template template Job::operator std::conditional_t::value, IncompleteType, Job> () { return thenImpl({JobContinuation([](InOther ...){ return KAsync::null(); })}, {}); } template template Job Job::thenImpl(Private::ContinuationHolder workHelper, Private::ExecutionFlag execFlag) const { thenInvariants(); return Job(QSharedPointer>::create( std::forward>(workHelper), mExecutor, execFlag)); } template template Job Job::then(const Job &job) const { thenInvariants(); auto executor = job.mExecutor; executor->prepend(mExecutor); return Job(executor); } template Job Job::onError(SyncErrorContinuation &&errorFunc) const { return Job(QSharedPointer>::create( // Extra indirection to allow propagating the result of a previous future when no // error occurs Private::ContinuationHolder([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) { errorFunc(error); return val; }), mExecutor, Private::ExecutionFlag::ErrorCase)); } template<> // Specialize for void jobs inline Job Job::onError(SyncErrorContinuation &&errorFunc) const { return Job(QSharedPointer>::create( Private::ContinuationHolder(std::forward>(errorFunc)), mExecutor, Private::ExecutionFlag::ErrorCase)); } template template KAsync::Future Job::exec(FirstIn in) { // Inject a fake sync executor that will return the initial value Private::ExecutorBasePtr first = mExecutor; while (first->mPrev) { first = first->mPrev; } first->mPrev = QSharedPointer>::create( Private::ContinuationHolder([val = std::move(in)](Future &future) { future.setResult(val); })); auto result = exec(); // Remove the injected executor first->mPrev.reset(); return result; } template KAsync::Future Job::exec() { Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create()); KAsync::Future result = *execution->result(); return result; } template Job::Job(Private::ExecutorBasePtr executor) : JobBase(executor) {} template Job::Job(JobContinuation &&func) : JobBase(new Private::Executor(std::forward>(func), {})) { qWarning() << "Creating job job"; static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed."); } template template void Job::eachInvariants() const { static_assert(detail::isIterable::value, "The 'Each' task can only be connected to a job that returns a list or an array."); static_assert(std::is_void::value || detail::isIterable::value, "The result type of 'Each' task must be void, a list or an array."); } template template void Job::thenInvariants() const { static_assert(!std::is_void::value && (std::is_convertible::value || std::is_base_of::value), "The return type of previous task must be compatible with input type of this task"); } template template auto Job::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)> { } -static inline KAsync::Job waitForCompletion(QList> &futures) +inline KAsync::Job waitForCompletion(QVector> &futures) { - auto context = new QObject; - return start([futures, context](KAsync::Future &future) { - const auto total = futures.size(); - auto count = QSharedPointer::create(); - int i = 0; + struct Context { + void removeWatcher(KAsync::FutureWatcher *w) + { + pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) { + return w == watcher.get(); + })); + } + + std::vector>> pending; + }; + + return start([]() { + return new Context(); + }) + .then([futures](Context *context, KAsync::Future &future) { for (KAsync::Future subFuture : futures) { - i++; if (subFuture.isFinished()) { - *count += 1; continue; } // FIXME bind lifetime all watcher to future (repectively the main job - auto watcher = QSharedPointer>::create(); - QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, - [count, total, &future, context]() { - *count += 1; - if (*count == total) { - delete context; - future.setFinished(); + auto watcher = std::make_unique>(); + QObject::connect(watcher.get(), &KAsync::FutureWatcher::futureReady, + [&future, watcher = watcher.get(), context]() { + context->removeWatcher(watcher); + if (context->pending.empty()) { + future.setResult(context); } }); watcher->setFuture(subFuture); - context->setProperty(QString::fromLatin1("future%1").arg(i).toLatin1().data(), - QVariant::fromValue(watcher)); + context->pending.push_back(std::move(watcher)); } - if (*count == total) { - delete context; - future.setFinished(); + if (context->pending.empty()) { + future.setResult(context); } + }) + .then([](Context *context) { + delete context; }); // .finally([context]() { delete context; }); } template Job forEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); - QList> list; + QVector> list; for (const auto &v : values) { auto future = job .template then([error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } }) .exec(v); - list << future; + list.push_back(future); } return waitForCompletion(list) .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; return Job(QSharedPointer>::create( Private::ContinuationHolder(JobContinuation(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); } template Job serialForEach(KAsync::Job job) { auto cont = [job] (const List &values) mutable { auto error = QSharedPointer::create(); auto serialJob = KAsync::null(); for (const auto &value : values) { serialJob = serialJob.then([value, job, error](KAsync::Future &future) { job.template then([&future, error] (const KAsync::Error &e) { if (e && !*error) { //TODO ideally we would aggregate the errors instead of just using the first one *error = e; } future.setFinished(); }) .exec(value); }); } return serialJob .then([error](KAsync::Future &future) { if (*error) { future.setError(*error); } else { future.setFinished(); } }); }; return Job(QSharedPointer>::create( Private::ContinuationHolder(JobContinuation(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase)); } template Job forEach(JobContinuation &&func) { return forEach(KAsync::start(std::forward>(func))); } template Job serialForEach(JobContinuation &&func) { return serialForEach(KAsync::start(std::forward>(func))); } template Job null() { return KAsync::start( [](KAsync::Future &future) { future.setFinished(); }); } template Job value(Out v) { return KAsync::start( [val = std::move(v)](KAsync::Future &future) { future.setResult(val); }); } template Job error(int errorCode, const QString &errorMessage) { return error({errorCode, errorMessage}); } template Job error(const char *message) { return error(Error(message)); } template Job error(const Error &error) { return KAsync::start( [error](KAsync::Future &future) { future.setError(error); }); } inline Job doWhile(const Job &body) { return KAsync::start([body] (KAsync::Future &future) { auto job = body.then([&future, body](const KAsync::Error &error, ControlFlowFlag flag) { if (error) { future.setError(error); future.setFinished(); } else if (flag == ControlFlowFlag::Continue) { doWhile(body).then([&future](const KAsync::Error &error) { if (error) { future.setError(error); } future.setFinished(); }).exec(); } else { future.setFinished(); } }).exec(); }); } inline Job doWhile(const JobContinuation &body) { return doWhile(KAsync::start([body] { return body(); })); } inline Job wait(int delay) { return KAsync::start([delay](KAsync::Future &future) { QTimer::singleShot(delay, [&future]() { future.setFinished(); }); }); } } // namespace KAsync //@endcond #endif // KASYNC_JOB_IMPL_H