diff --git a/libs/image/kis_update_job_item.h b/libs/image/kis_update_job_item.h index aabd4876a0..9d7354f967 100644 --- a/libs/image/kis_update_job_item.h +++ b/libs/image/kis_update_job_item.h @@ -1,288 +1,293 @@ /* * Copyright (c) 2011 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATE_JOB_ITEM_H #define __KIS_UPDATE_JOB_ITEM_H #include #include #include #include "kis_stroke_job.h" #include "kis_spontaneous_job.h" #include "kis_base_rects_walker.h" #include "kis_async_merger.h" #include "kis_debug.h" +#include "kis_updater_context.h" class KisUpdateJobItem : public QObject, public QRunnable { Q_OBJECT public: enum class Type : int { EMPTY = 0, WAITING, MERGE, STROKE, SPONTANEOUS }; public: - KisUpdateJobItem(QReadWriteLock *exclusiveJobLock, int index) - : m_exclusiveJobLock(exclusiveJobLock), m_index(index), + KisUpdateJobItem(KisUpdaterContext *updaterContext, QReadWriteLock *exclusiveJobLock, int index) + : m_updaterContext(updaterContext), m_exclusiveJobLock(exclusiveJobLock), m_index(index), m_atomicType(Type::EMPTY), m_runnableJob(0) { setAutoDelete(false); KIS_SAFE_ASSERT_RECOVER_NOOP(m_atomicType.is_lock_free()); } ~KisUpdateJobItem() override { delete m_runnableJob; } void run() override { if (!isRunning()) return; /** * Here we break the idea of QThreadPool a bit. Ideally, we should split the * jobs into distinct QRunnable objects and pass all of them to QThreadPool. * That is a nice idea, but it doesn't work well when the jobs are small enough * and the number of available cores is high (>4 cores). It this case the * threads just tend to execute the job very quickly and go to sleep, which is * an expensive operation. * * To overcome this problem we try to bulk-process the jobs. In sigJobFinished() * signal (which is DirectConnection), the context may add the job to ourselves(!!!), * so we switch from "done" state into "running" again. */ while (1) { KIS_SAFE_ASSERT_RECOVER_RETURN(isRunning()); if(m_exclusive) { m_exclusiveJobLock->lockForWrite(); } else { m_exclusiveJobLock->lockForRead(); } if(m_atomicType == Type::MERGE) { runMergeJob(); } else { KIS_ASSERT(m_atomicType == Type::STROKE || m_atomicType == Type::SPONTANEOUS); m_runnableJob->run(); } setDone(); - emit sigDoSomeUsefulWork(); +// emit sigDoSomeUsefulWork(); + m_updaterContext->doSomeUsefulWork(); // may flip the current state from Waiting -> Running again - emit sigJobFinished(m_index); +// emit sigJobFinished(m_index); + m_updaterContext->jobFinished(m_index); m_exclusiveJobLock->unlock(); // try to exit the loop. Please note, that no one can flip the state from // WAITING to EMPTY except ourselves! Type expectedValue = Type::WAITING; if (m_atomicType.compare_exchange_strong(expectedValue, Type::EMPTY)) { break; } } } inline void runMergeJob() { KIS_SAFE_ASSERT_RECOVER_RETURN(m_atomicType == Type::MERGE); // KIS_SAFE_ASSERT_RECOVER_RETURN(m_walker); // dbgKrita << "Executing merge job" << m_walker->changeRect() // << "on thread" << QThread::currentThreadId(); QRect changeRect; for (auto walker : m_walkers) { m_merger.startMerge(*walker); changeRect |= walker->changeRect(); } - emit sigContinueUpdate(changeRect); +// emit sigContinueUpdate(changeRect); + m_updaterContext->continueUpdate(changeRect); } // return true if the thread should actually be started inline bool setWalker(KisBaseRectsWalkerSP walker) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_accessRect = walker->accessRect(); m_changeRect = walker->changeRect(); m_walker = walker; m_exclusive = false; m_runnableJob = 0; const Type oldState = m_atomicType.exchange(Type::MERGE); return oldState == Type::EMPTY; } inline bool setWalkers(QVector &walkers) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_accessRect = QRect(); m_changeRect = QRect(); m_walkers.swap(walkers); m_exclusive = false; m_runnableJob = 0; for (auto walker : m_walkers) { m_accessRect |= walker->accessRect(); m_changeRect |= walker->changeRect(); } const Type oldState = m_atomicType.exchange(Type::MERGE); return oldState == Type::EMPTY; } // return true if the thread should actually be started inline bool setStrokeJob(KisStrokeJob *strokeJob) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_runnableJob = strokeJob; m_strokeJobSequentiality = strokeJob->sequentiality(); m_exclusive = strokeJob->isExclusive(); m_walkers.clear(); m_walker = 0; m_accessRect = m_changeRect = QRect(); const Type oldState = m_atomicType.exchange(Type::STROKE); return oldState == Type::EMPTY; } // return true if the thread should actually be started inline bool setSpontaneousJob(KisSpontaneousJob *spontaneousJob) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_runnableJob = spontaneousJob; m_exclusive = spontaneousJob->isExclusive(); m_walkers.clear(); m_walker = 0; m_accessRect = m_changeRect = QRect(); const Type oldState = m_atomicType.exchange(Type::SPONTANEOUS); return oldState == Type::EMPTY; } inline void setDone() { m_walkers.clear(); m_walker = 0; delete m_runnableJob; m_runnableJob = 0; m_atomicType = Type::WAITING; } inline bool isRunning() const { return m_atomicType >= Type::MERGE; } inline Type type() const { return m_atomicType; } inline const QRect& accessRect() const { return m_accessRect; } inline const QRect& changeRect() const { return m_changeRect; } inline KisStrokeJobData::Sequentiality strokeJobSequentiality() const { return m_strokeJobSequentiality; } Q_SIGNALS: void sigContinueUpdate(const QRect& rc); void sigDoSomeUsefulWork(); void sigJobFinished(int index); private: /** * Open walker and stroke job for the testing suite. * Please, do not use it in production code. */ friend class KisSimpleUpdateQueueTest; friend class KisStrokesQueueTest; friend class KisUpdateSchedulerTest; friend class KisTestableUpdaterContext; inline KisBaseRectsWalkerSP walker() const { return m_walker; } inline KisStrokeJob* strokeJob() const { KisStrokeJob *job = dynamic_cast(m_runnableJob); Q_ASSERT(job); return job; } inline void testingSetDone() { setDone(); } private: /** * \see KisUpdaterContext::m_exclusiveJobLock */ + KisUpdaterContext *m_updaterContext; QReadWriteLock *m_exclusiveJobLock; const int m_index; bool m_exclusive; std::atomic m_atomicType; volatile KisStrokeJobData::Sequentiality m_strokeJobSequentiality; /** * Runnable jobs part * The job is owned by the context and deleted after completion */ KisRunnable *m_runnableJob; /** * Merge jobs part */ KisBaseRectsWalkerSP m_walker; QVector m_walkers; KisAsyncMerger m_merger; /** * These rects cache actual values from the walker * to eliminate concurrent access to a walker structure */ QRect m_accessRect; QRect m_changeRect; }; #endif /* __KIS_UPDATE_JOB_ITEM_H */ diff --git a/libs/image/kis_update_scheduler.cpp b/libs/image/kis_update_scheduler.cpp index 68b5568ff7..b5f8112bcb 100644 --- a/libs/image/kis_update_scheduler.cpp +++ b/libs/image/kis_update_scheduler.cpp @@ -1,503 +1,503 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_update_scheduler.h" #include "klocalizedstring.h" #include "kis_image_config.h" #include "kis_merge_walker.h" #include "kis_full_refresh_walker.h" #include "kis_updater_context.h" #include "kis_simple_update_queue.h" #include "kis_strokes_queue.h" #include "kis_queues_progress_updater.h" #include "KisImageConfigNotifier.h" #include #include "kis_lazy_wait_condition.h" #include //#define DEBUG_BALANCING #ifdef DEBUG_BALANCING #define DEBUG_BALANCING_METRICS(decidedFirst, excl) \ dbgKrita << "Balance decision:" << decidedFirst \ << "(" << excl << ")" \ << "updates:" << m_d->updatesQueue.sizeMetric() \ << "strokes:" << m_d->strokesQueue.sizeMetric() #else #define DEBUG_BALANCING_METRICS(decidedFirst, excl) #endif struct Q_DECL_HIDDEN KisUpdateScheduler::Private { Private(KisUpdateScheduler *_q, KisProjectionUpdateListener *p) : q(_q) , updaterContext(KisImageConfig(true).maxNumberOfThreads(), q) , projectionUpdateListener(p) {} KisUpdateScheduler *q; KisSimpleUpdateQueue updatesQueue; KisStrokesQueue strokesQueue; KisUpdaterContext updaterContext; bool processingBlocked = false; qreal defaultBalancingRatio = 1.0; // desired strokes-queue-size / updates-queue-size KisProjectionUpdateListener *projectionUpdateListener; KisQueuesProgressUpdater *progressUpdater = 0; QAtomicInt updatesLockCounter; QReadWriteLock updatesStartLock; KisLazyWaitCondition updatesFinishedCondition; qreal balancingRatio() const { const qreal strokeRatioOverride = strokesQueue.balancingRatioOverride(); return strokeRatioOverride > 0 ? strokeRatioOverride : defaultBalancingRatio; } }; KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent) : QObject(parent), m_d(new Private(this, projectionUpdateListener)) { updateSettings(); connectSignals(); } KisUpdateScheduler::KisUpdateScheduler() : m_d(new Private(this, 0)) { } KisUpdateScheduler::~KisUpdateScheduler() { delete m_d->progressUpdater; delete m_d; } void KisUpdateScheduler::setThreadsLimit(int value) { KIS_SAFE_ASSERT_RECOVER_RETURN(!m_d->processingBlocked); /** * Thread limit can be changed without the full-featured barrier * lock, we can avoid waiting for all the jobs to complete. We * should just ensure there is no more jobs in the updater context. */ lock(); m_d->updaterContext.lock(); m_d->updaterContext.setThreadsLimit(value); m_d->updaterContext.unlock(); unlock(false); } int KisUpdateScheduler::threadsLimit() const { std::lock_guard l(m_d->updaterContext); return m_d->updaterContext.threadsLimit(); } void KisUpdateScheduler::connectSignals() { - connect(&m_d->updaterContext, SIGNAL(sigContinueUpdate(const QRect&)), - SLOT(continueUpdate(const QRect&)), - Qt::DirectConnection); +// connect(&m_d->updaterContext, SIGNAL(sigContinueUpdate(const QRect&)), +// SLOT(continueUpdate(const QRect&)), +// Qt::DirectConnection); - connect(&m_d->updaterContext, SIGNAL(sigDoSomeUsefulWork()), - SLOT(doSomeUsefulWork()), Qt::DirectConnection); +// connect(&m_d->updaterContext, SIGNAL(sigDoSomeUsefulWork()), +// SLOT(doSomeUsefulWork()), Qt::DirectConnection); - connect(&m_d->updaterContext, SIGNAL(sigSpareThreadAppeared()), - SLOT(spareThreadAppeared()), Qt::DirectConnection); +// connect(&m_d->updaterContext, SIGNAL(sigSpareThreadAppeared()), +// SLOT(spareThreadAppeared()), Qt::DirectConnection); connect(KisImageConfigNotifier::instance(), SIGNAL(configChanged()), SLOT(updateSettings())); } void KisUpdateScheduler::setProgressProxy(KoProgressProxy *progressProxy) { delete m_d->progressUpdater; m_d->progressUpdater = progressProxy ? new KisQueuesProgressUpdater(progressProxy, this) : 0; } void KisUpdateScheduler::progressUpdate() { if (!m_d->progressUpdater) return; if(!m_d->strokesQueue.hasOpenedStrokes()) { QString jobName = m_d->strokesQueue.currentStrokeName().toString(); if(jobName.isEmpty()) { jobName = i18n("Updating..."); } int sizeMetric = m_d->strokesQueue.sizeMetric(); if (!sizeMetric) { sizeMetric = m_d->updatesQueue.sizeMetric(); } m_d->progressUpdater->updateProgress(sizeMetric, jobName); } else { m_d->progressUpdater->hide(); } } void KisUpdateScheduler::updateProjection(KisNodeSP node, const QVector &rects, const QRect &cropRect) { m_d->updatesQueue.addUpdateJob(node, rects, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect) { m_d->updatesQueue.addUpdateJob(node, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect) { m_d->updatesQueue.addUpdateNoFilthyJob(node, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::fullRefreshAsync(KisNodeSP root, const QRect& rc, const QRect &cropRect) { m_d->updatesQueue.addFullRefreshJob(root, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect) { KisBaseRectsWalkerSP walker = new KisFullRefreshWalker(cropRect); walker->collectRects(root, rc); bool needLock = true; if(m_d->processingBlocked) { warnImage << "WARNING: Calling synchronous fullRefresh under a scheduler lock held"; warnImage << "We will not assert for now, but please port caller's to strokes"; warnImage << "to avoid this warning"; needLock = false; } if(needLock) lock(); m_d->updaterContext.lock(); Q_ASSERT(m_d->updaterContext.isJobAllowed(walker)); m_d->updaterContext.addMergeJob(walker); m_d->updaterContext.waitForDone(); m_d->updaterContext.unlock(); if(needLock) unlock(true); } void KisUpdateScheduler::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { m_d->updatesQueue.addSpontaneousJob(spontaneousJob); processQueues(); } KisStrokeId KisUpdateScheduler::startStroke(KisStrokeStrategy *strokeStrategy) { KisStrokeId id = m_d->strokesQueue.startStroke(strokeStrategy); processQueues(); return id; } void KisUpdateScheduler::addJob(KisStrokeId id, KisStrokeJobData *data) { m_d->strokesQueue.addJob(id, data); processQueues(); } void KisUpdateScheduler::endStroke(KisStrokeId id) { m_d->strokesQueue.endStroke(id); processQueues(); } bool KisUpdateScheduler::cancelStroke(KisStrokeId id) { bool result = m_d->strokesQueue.cancelStroke(id); processQueues(); return result; } bool KisUpdateScheduler::tryCancelCurrentStrokeAsync() { return m_d->strokesQueue.tryCancelCurrentStrokeAsync(); } UndoResult KisUpdateScheduler::tryUndoLastStrokeAsync() { return m_d->strokesQueue.tryUndoLastStrokeAsync(); } bool KisUpdateScheduler::wrapAroundModeSupported() const { return m_d->strokesQueue.wrapAroundModeSupported(); } void KisUpdateScheduler::setDesiredLevelOfDetail(int lod) { m_d->strokesQueue.setDesiredLevelOfDetail(lod); /** * The queue might have started an internal stroke for * cache synchronization. Process the queues to execute * it if needed. */ processQueues(); } void KisUpdateScheduler::explicitRegenerateLevelOfDetail() { m_d->strokesQueue.explicitRegenerateLevelOfDetail(); // \see a comment in setDesiredLevelOfDetail() processQueues(); } int KisUpdateScheduler::currentLevelOfDetail() const { int levelOfDetail = m_d->updaterContext.currentLevelOfDetail(); if (levelOfDetail < 0) { levelOfDetail = m_d->updatesQueue.overrideLevelOfDetail(); } if (levelOfDetail < 0) { levelOfDetail = 0; } return levelOfDetail; } void KisUpdateScheduler::setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory) { m_d->strokesQueue.setLod0ToNStrokeStrategyFactory(factory); } void KisUpdateScheduler::setSuspendUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->strokesQueue.setSuspendUpdatesStrokeStrategyFactory(factory); } void KisUpdateScheduler::setResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->strokesQueue.setResumeUpdatesStrokeStrategyFactory(factory); } KisPostExecutionUndoAdapter *KisUpdateScheduler::lodNPostExecutionUndoAdapter() const { return m_d->strokesQueue.lodNPostExecutionUndoAdapter(); } void KisUpdateScheduler::updateSettings() { m_d->updatesQueue.updateSettings(); KisImageConfig config(true); m_d->defaultBalancingRatio = config.schedulerBalancingRatio(); setThreadsLimit(config.maxNumberOfThreads()); } void KisUpdateScheduler::lock() { m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); } void KisUpdateScheduler::unlock(bool resetLodLevels) { if (resetLodLevels) { /** * Legacy strokes may have changed the image while we didn't * control it. Notify the queue to take it into account. */ m_d->strokesQueue.notifyUFOChangedImage(); } m_d->processingBlocked = false; processQueues(); } bool KisUpdateScheduler::isIdle() { bool result = false; if (tryBarrierLock()) { result = true; unlock(false); } return result; } void KisUpdateScheduler::waitForDone() { do { processQueues(); m_d->updaterContext.waitForDone(); } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); } bool KisUpdateScheduler::tryBarrierLock() { if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { return false; } m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { m_d->processingBlocked = false; processQueues(); return false; } return true; } void KisUpdateScheduler::barrierLock() { do { m_d->processingBlocked = false; processQueues(); m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); } void KisUpdateScheduler::processQueues() { wakeUpWaitingThreads(); if(m_d->processingBlocked) return; if(m_d->strokesQueue.needsExclusiveAccess()) { DEBUG_BALANCING_METRICS("STROKES", "X"); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); if(!m_d->strokesQueue.needsExclusiveAccess()) { tryProcessUpdatesQueue(); } } else if(m_d->balancingRatio() * m_d->strokesQueue.sizeMetric() > m_d->updatesQueue.sizeMetric()) { DEBUG_BALANCING_METRICS("STROKES", "N"); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); tryProcessUpdatesQueue(); } else { DEBUG_BALANCING_METRICS("UPDATES", "N"); tryProcessUpdatesQueue(); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); } progressUpdate(); } void KisUpdateScheduler::blockUpdates() { m_d->updatesFinishedCondition.initWaiting(); m_d->updatesLockCounter.ref(); while(haveUpdatesRunning()) { m_d->updatesFinishedCondition.wait(); } m_d->updatesFinishedCondition.endWaiting(); } void KisUpdateScheduler::unblockUpdates() { m_d->updatesLockCounter.deref(); processQueues(); } void KisUpdateScheduler::wakeUpWaitingThreads() { if(m_d->updatesLockCounter && !haveUpdatesRunning()) { m_d->updatesFinishedCondition.wakeAll(); } } void KisUpdateScheduler::tryProcessUpdatesQueue() { QReadLocker locker(&m_d->updatesStartLock); if(m_d->updatesLockCounter) return; m_d->updatesQueue.processQueue(m_d->updaterContext); } bool KisUpdateScheduler::haveUpdatesRunning() { QWriteLocker locker(&m_d->updatesStartLock); qint32 numMergeJobs, numStrokeJobs; m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs); return numMergeJobs; } void KisUpdateScheduler::continueUpdate(const QRect &rect) { Q_ASSERT(m_d->projectionUpdateListener); m_d->projectionUpdateListener->notifyProjectionUpdated(rect); } void KisUpdateScheduler::doSomeUsefulWork() { m_d->updatesQueue.optimize(); } void KisUpdateScheduler::spareThreadAppeared() { processQueues(); } KisTestableUpdateScheduler::KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, qint32 threadCount) { Q_UNUSED(threadCount); updateSettings(); m_d->projectionUpdateListener = projectionUpdateListener; // The queue will update settings in a constructor itself // m_d->updatesQueue = new KisTestableSimpleUpdateQueue(); // m_d->strokesQueue = new KisStrokesQueue(); // m_d->updaterContext = new KisTestableUpdaterContext(threadCount); connectSignals(); } KisTestableUpdaterContext* KisTestableUpdateScheduler::updaterContext() { return dynamic_cast(&m_d->updaterContext); } KisTestableSimpleUpdateQueue* KisTestableUpdateScheduler::updateQueue() { return dynamic_cast(&m_d->updatesQueue); } diff --git a/libs/image/kis_update_scheduler.h b/libs/image/kis_update_scheduler.h index 85fcc204a5..c36e7d9dd4 100644 --- a/libs/image/kis_update_scheduler.h +++ b/libs/image/kis_update_scheduler.h @@ -1,253 +1,257 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATE_SCHEDULER_H #define __KIS_UPDATE_SCHEDULER_H #include #include "kritaimage_export.h" #include "kis_types.h" #include "kis_image_interfaces.h" #include "kis_stroke_strategy_factory.h" #include "kis_strokes_queue_undo_result.h" class QRect; class KoProgressProxy; class KisProjectionUpdateListener; class KisSpontaneousJob; class KisPostExecutionUndoAdapter; class KRITAIMAGE_EXPORT KisUpdateScheduler : public QObject, public KisStrokesFacade { Q_OBJECT public: KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent = 0); ~KisUpdateScheduler() override; /** * Set the number of threads used by the scheduler */ void setThreadsLimit(int value); /** * Return the number of threads available to the scheduler */ int threadsLimit() const; /** * Sets the proxy that is going to be notified about the progress * of processing of the queues. If you want to switch the proxy * on runtime, you should do it under the lock held. * * \see lock(), unlock() */ void setProgressProxy(KoProgressProxy *progressProxy); /** * Blocks processing of the queues. * The function will wait until all the executing jobs * are finished. * NOTE: you may add new jobs while the block held, but they * will be delayed until unlock() is called. * * \see unlock() */ void lock(); /** * Unblocks the process and calls processQueues() * * \see processQueues() */ void unlock(bool resetLodLevels = true); /** * Waits until all the running jobs are finished. * * If some other thread adds jobs in parallel, then you may * wait forever. If you you don't want it, consider lock() instead. * * \see lock() */ void waitForDone(); /** * Waits until the queues become empty, then blocks the processing. * To unblock processing you should use unlock(). * * If some other thread adds jobs in parallel, then you may * wait forever. If you you don't want it, consider lock() instead. * * \see unlock(), lock() */ void barrierLock(); /** * Works like barrier lock, but returns false immediately if barrierLock * can't be acquired. * * \see barrierLock() */ bool tryBarrierLock(); /** * Tells if there are no strokes or updates are running at the * moment. Internally calls to tryBarrierLock(), so it is not O(1). */ bool isIdle(); /** * Blocks all the updates from execution. It doesn't affect * strokes execution in any way. This type of lock is supposed * to be held by the strokes themselves when they need a short * access to some parts of the projection of the image. * * From all the other places you should use usual lock()/unlock() * methods * * \see lock(), unlock() */ void blockUpdates(); /** * Unblocks updates from execution previously locked by blockUpdates() * * \see blockUpdates() */ void unblockUpdates(); void updateProjection(KisNodeSP node, const QVector &rects, const QRect &cropRect); void updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect); void updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect); void fullRefreshAsync(KisNodeSP root, const QRect& rc, const QRect &cropRect); void fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect); void addSpontaneousJob(KisSpontaneousJob *spontaneousJob); KisStrokeId startStroke(KisStrokeStrategy *strokeStrategy) override; void addJob(KisStrokeId id, KisStrokeJobData *data) override; void endStroke(KisStrokeId id) override; bool cancelStroke(KisStrokeId id) override; /** * Sets the desired level of detail on which the strokes should * work. Please note that this configuration will be applied * starting from the next stroke. Please also note that this value * is not guaranteed to coincide with the one returned by * currentLevelOfDetail() */ void setDesiredLevelOfDetail(int lod); /** * Explicitly start regeneration of LoD planes of all the devices * in the image. This call should be performed when the user is idle, * just to make the quality of image updates better. */ void explicitRegenerateLevelOfDetail(); /** * Install a factory of a stroke strategy, that will be started * every time when the scheduler needs to synchronize LOD caches * of all the paint devices of the image. */ void setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory); /** * Install a factory of a stroke strategy, that will be started * every time when the scheduler needs to postpone all the updates * of the *LOD0* strokes. */ void setSuspendUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory); /** * \see setSuspendUpdatesStrokeStrategyFactory() */ void setResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory); KisPostExecutionUndoAdapter* lodNPostExecutionUndoAdapter() const; /** * tryCancelCurrentStrokeAsync() checks whether there is a * *running* stroke (which is being executed at this very moment) * which is not still open by the owner (endStroke() or * cancelStroke() have already been called) and cancels it. * * \return true if some stroke has been found and cancelled * * \note This method is *not* part of KisStrokesFacade! It is too * low level for KisImage. In KisImage it is combined with * more high level requestStrokeCancellation(). */ bool tryCancelCurrentStrokeAsync(); UndoResult tryUndoLastStrokeAsync(); bool wrapAroundModeSupported() const; int currentLevelOfDetail() const; + void continueUpdate(const QRect &rect); + void doSomeUsefulWork(); + void spareThreadAppeared(); + protected: // Trivial constructor for testing support KisUpdateScheduler(); void connectSignals(); void processQueues(); protected Q_SLOTS: /** * Called when it is necessary to reread configuration */ void updateSettings(); -private Q_SLOTS: - void continueUpdate(const QRect &rect); - void doSomeUsefulWork(); - void spareThreadAppeared(); +//private Q_SLOTS: +// void continueUpdate(const QRect &rect); +// void doSomeUsefulWork(); +// void spareThreadAppeared(); private: friend class UpdatesBlockTester; bool haveUpdatesRunning(); void tryProcessUpdatesQueue(); void wakeUpWaitingThreads(); void progressUpdate(); protected: struct Private; Private * const m_d; }; class KisTestableUpdaterContext; class KisTestableSimpleUpdateQueue; class KRITAIMAGE_EXPORT KisTestableUpdateScheduler : public KisUpdateScheduler { public: KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, qint32 threadCount); KisTestableUpdaterContext* updaterContext(); KisTestableSimpleUpdateQueue* updateQueue(); using KisUpdateScheduler::processQueues; }; #endif /* __KIS_UPDATE_SCHEDULER_H */ diff --git a/libs/image/kis_updater_context.cpp b/libs/image/kis_updater_context.cpp index a27544839f..a7ac3a36be 100644 --- a/libs/image/kis_updater_context.cpp +++ b/libs/image/kis_updater_context.cpp @@ -1,382 +1,404 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_updater_context.h" #include #include #include "kis_update_job_item.h" #include "kis_stroke_job.h" const int KisUpdaterContext::useIdealThreadCountTag = -1; KisUpdaterContext::KisUpdaterContext(qint32 threadCount, QObject *parent) - : QObject(parent) + : QObject(parent), m_scheduler(qobject_cast(parent)) { if (threadCount <= 0) { threadCount = QThread::idealThreadCount(); threadCount = threadCount > 0 ? threadCount : 1; } setThreadsLimit(threadCount); } KisUpdaterContext::~KisUpdaterContext() { m_threadPool.waitForDone(); for (qint32 i = 0; i < m_jobs.size(); i++) delete m_jobs[i]; } void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, qint32 &numStrokeJobs) { QReadLocker locker(&m_rwLock); numMergeJobs = 0; numStrokeJobs = 0; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->type() == KisUpdateJobItem::Type::MERGE || item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { numMergeJobs++; } else if (item->type() == KisUpdateJobItem::Type::STROKE) { numStrokeJobs++; } } } KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const { QReadLocker locker(&m_rwLock); KisUpdaterContextSnapshotEx state = ContextEmpty; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->type() == KisUpdateJobItem::Type::MERGE || item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { state |= HasMergeJob; } else if (item->type() == KisUpdateJobItem::Type::STROKE) { switch (item->strokeJobSequentiality()) { case KisStrokeJobData::SEQUENTIAL: state |= HasSequentialJob; break; case KisStrokeJobData::CONCURRENT: state |= HasConcurrentJob; break; case KisStrokeJobData::BARRIER: state |= HasBarrierJob; break; case KisStrokeJobData::UNIQUELY_CONCURRENT: state |= HasUniquelyConcurrentJob; break; } } } return state; } int KisUpdaterContext::currentLevelOfDetail() const { return m_lodCounter.readLod(); } bool KisUpdaterContext::hasSpareThread() { return !m_spareThreadsIndexes.isEmpty(); // QReadLocker locker(&m_rwLock); // bool found = false; // Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { // if (!item->isRunning()) { // found = true; // break; // } // } // return found; } bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) { int lod = this->currentLevelOfDetail(); if (lod >= 0 && walker->levelOfDetail() != lod) return false; QReadLocker locker(&m_rwLock); bool intersects = false; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->isRunning() && walkerIntersectsJob(walker, item)) { intersects = true; break; } } return !intersects; } /** * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into * one atomic method like `bool push()`, because this implementation * of KisUpdaterContext will not work in case of multiple * producers. But currently we have only one producer (one thread * in a time), that is guaranteed by the lock()/unlock() pair in * KisAbstractUpdateQueue::processQueue. */ bool KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(walker->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } bool KisUpdaterContext::addMergeJobs(QVector &walkers) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(walkers[0]->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setWalkers(walkers); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(walker->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } bool KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(strokeJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(strokeJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } bool KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(spontaneousJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(spontaneousJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } void KisUpdaterContext::waitForDone() { m_threadPool.waitForDone(); } bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, const KisUpdateJobItem* job) { return (walker->accessRect().intersects(job->changeRect())) || (job->accessRect().intersects(walker->changeRect())); } qint32 KisUpdaterContext::findSpareThread() { int index = -1; m_spareThreadsIndexes.pop(index); return index; // for (qint32 i = 0; i < m_jobs.size(); i++) // if (!m_jobs[i]->isRunning()) // return i; // return -1; } void KisUpdaterContext::slotJobFinished(int index) { m_lodCounter.removeLod(); m_spareThreadsIndexes.push(index); // Be careful. This slot can be called asynchronously without locks. emit sigSpareThreadAppeared(); } void KisUpdaterContext::lock() { // m_lock.lock(); } void KisUpdaterContext::unlock() { // m_lock.unlock(); } void KisUpdaterContext::setThreadsLimit(int value) { QWriteLocker locker(&m_rwLock); m_threadPool.setMaxThreadCount(value); for (int i = 0; i < m_jobs.size(); i++) { KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning()); // don't delete the jobs until all of them are checked! } for (int i = 0; i < m_jobs.size(); i++) { delete m_jobs[i]; } m_spareThreadsIndexes.clear(); m_jobs.resize(value); for (qint32 i = 0; i < m_jobs.size(); i++) { - m_jobs[i] = new KisUpdateJobItem(&m_exclusiveJobLock, i); + m_jobs[i] = new KisUpdateJobItem(this, &m_exclusiveJobLock, i); m_spareThreadsIndexes.push(i); - connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), - SIGNAL(sigContinueUpdate(const QRect&)), - Qt::DirectConnection); +// connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), +// SIGNAL(sigContinueUpdate(const QRect&)), +// Qt::DirectConnection); - connect(m_jobs[i], SIGNAL(sigDoSomeUsefulWork()), - SIGNAL(sigDoSomeUsefulWork()), Qt::DirectConnection); +// connect(m_jobs[i], SIGNAL(sigDoSomeUsefulWork()), +// SIGNAL(sigDoSomeUsefulWork()), Qt::DirectConnection); - connect(m_jobs[i], SIGNAL(sigJobFinished(int)), - SLOT(slotJobFinished(int)), Qt::DirectConnection); +// connect(m_jobs[i], SIGNAL(sigJobFinished(int)), +// SLOT(slotJobFinished(int)), Qt::DirectConnection); } } int KisUpdaterContext::threadsLimit() const { QReadLocker locker(&m_rwLock); KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount()); return m_jobs.size(); } +void KisUpdaterContext::jobFinished(int index) +{ + m_lodCounter.removeLod(); + m_spareThreadsIndexes.push(index); + + // Be careful. This slot can be called asynchronously without locks. +// emit sigSpareThreadAppeared(); + m_scheduler->spareThreadAppeared(); +} + +void KisUpdaterContext::continueUpdate(const QRect &rc) +{ +// emit sigContinueUpdate(rc); + m_scheduler->continueUpdate(rc); +} + +void KisUpdaterContext::doSomeUsefulWork() +{ +// emit doSomeUsefulWork(); + m_scheduler->doSomeUsefulWork(); +} + KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) : KisUpdaterContext(threadCount) { } KisTestableUpdaterContext::~KisTestableUpdaterContext() { clear(); } const QVector KisTestableUpdaterContext::getJobs() { return m_jobs; } void KisTestableUpdaterContext::clear() { Q_FOREACH (KisUpdateJobItem *item, m_jobs) { item->testingSetDone(); } m_lodCounter.testingClear(); } diff --git a/libs/image/kis_updater_context.h b/libs/image/kis_updater_context.h index 8ee5a75e07..1cb4c1d713 100644 --- a/libs/image/kis_updater_context.h +++ b/libs/image/kis_updater_context.h @@ -1,199 +1,205 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATER_CONTEXT_H #define __KIS_UPDATER_CONTEXT_H #include #include #include #include #include "kis_base_rects_walker.h" #include "kis_async_merger.h" #include "kis_lock_free_lod_counter.h" #include "KisUpdaterContextSnapshotEx.h" #include "tiles3/kis_lockless_stack.h" +#include "kis_update_scheduler.h" class KisUpdateJobItem; class KisSpontaneousJob; class KisStrokeJob; class KRITAIMAGE_EXPORT KisUpdaterContext : public QObject { Q_OBJECT public: static const int useIdealThreadCountTag; public: KisUpdaterContext(qint32 threadCount = useIdealThreadCountTag, QObject *parent = 0); ~KisUpdaterContext() override; /** * Returns the number of currently running jobs of each type. * To use this information you should lock the context beforehand. * * \see lock() */ void getJobsSnapshot(qint32 &numMergeJobs, qint32 &numStrokeJobs); KisUpdaterContextSnapshotEx getContextSnapshotEx() const; /** * Returns the current level of detail of all the running jobs in the * context. If there are no jobs, returns -1. */ int currentLevelOfDetail() const; /** * Check whether there is a spare thread for running * one more job */ bool hasSpareThread(); /** * Checks whether the walker intersects with any * of currently executing walkers. If it does, * it is not allowed to go in. It should be called * with the lock held. * * \see lock() */ bool isJobAllowed(KisBaseRectsWalkerSP walker); /** * Registers the job and starts executing it. * The caller must ensure that the context is locked * with lock(), job is allowed with isWalkerAllowed() and * there is a spare thread for running it with hasSpareThread() * * \see lock() * \see isWalkerAllowed() * \see hasSpareThread() */ virtual bool addMergeJob(KisBaseRectsWalkerSP walker); bool addMergeJobs(QVector &walkers); /** * Adds a stroke job to the context. The prerequisites are * the same as for addMergeJob() * \see addMergeJob() */ virtual bool addStrokeJob(KisStrokeJob *strokeJob); /** * Adds a spontaneous job to the context. The prerequisites are * the same as for addMergeJob() * \see addMergeJob() */ virtual bool addSpontaneousJob(KisSpontaneousJob *spontaneousJob); /** * Block execution of the caller until all the jobs are finished */ void waitForDone(); /** * Locks the context to guarantee an exclusive access * to the context */ void lock(); /** * Unlocks the context * * \see lock() */ void unlock(); /** * Set the number of threads available for this updater context * WARNING: one cannot change the number of threads if there is * at least one job running in the context! So before * calling this method make sure you do two things: * 1) barrierLock() the update scheduler * 2) lock() the context */ void setThreadsLimit(int value); /** * Return the number of available threads in the context. Make sure you * lock the context before calling this function! */ int threadsLimit() const; + void jobFinished(int index); + void continueUpdate(const QRect& rc); + void doSomeUsefulWork(); + Q_SIGNALS: void sigContinueUpdate(const QRect& rc); void sigDoSomeUsefulWork(); void sigSpareThreadAppeared(); protected Q_SLOTS: void slotJobFinished(int index); protected: static bool walkerIntersectsJob(KisBaseRectsWalkerSP walker, const KisUpdateJobItem* job); qint32 findSpareThread(); protected: /** * The lock is shared by all the child update job items. * When an item wants to run a usual (non-exclusive) job, * it locks the lock for read access. When an exclusive * access is requested, it locks it for write */ QReadWriteLock m_exclusiveJobLock; // QMutex m_lock; mutable QReadWriteLock m_rwLock; QVector m_jobs; QThreadPool m_threadPool; KisLockFreeLodCounter m_lodCounter; KisLocklessStack m_spareThreadsIndexes; + KisUpdateScheduler *m_scheduler; }; class KRITAIMAGE_EXPORT KisTestableUpdaterContext : public KisUpdaterContext { public: /** * Creates an explicit number of threads */ KisTestableUpdaterContext(qint32 threadCount); ~KisTestableUpdaterContext() override; /** * The only difference - it doesn't start execution * of the job */ bool addMergeJob(KisBaseRectsWalkerSP walker) override; bool addStrokeJob(KisStrokeJob *strokeJob) override; bool addSpontaneousJob(KisSpontaneousJob *spontaneousJob) override; const QVector getJobs(); void clear(); }; #endif /* __KIS_UPDATER_CONTEXT_H */