diff --git a/libs/image/kis_simple_update_queue.cpp b/libs/image/kis_simple_update_queue.cpp index 2e2d6bde13..4e0d28c019 100644 --- a/libs/image/kis_simple_update_queue.cpp +++ b/libs/image/kis_simple_update_queue.cpp @@ -1,407 +1,394 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_simple_update_queue.h" #include #include #include "kis_image_config.h" #include "kis_full_refresh_walker.h" #include "kis_spontaneous_job.h" //#define ENABLE_DEBUG_JOIN //#define ENABLE_ACCUMULATOR #ifdef ENABLE_DEBUG_JOIN #define DEBUG_JOIN(baseRect, newRect, alpha) \ dbgKrita << "Two rects were joined:\t" \ << (baseRect) << "+" << (newRect) << "->" \ << ((baseRect) | (newRect)) << "(" << alpha << ")" #else #define DEBUG_JOIN(baseRect, newRect, alpha) #endif /* ENABLE_DEBUG_JOIN */ #ifdef ENABLE_ACCUMULATOR #define DECLARE_ACCUMULATOR() static qreal _baseAmount=0, _newAmount=0 #define ACCUMULATOR_ADD(baseAmount, newAmount) \ do {_baseAmount += baseAmount; _newAmount += newAmount;} while (0) #define ACCUMULATOR_DEBUG() \ dbgKrita << "Accumulated alpha:" << _newAmount / _baseAmount #else #define DECLARE_ACCUMULATOR() #define ACCUMULATOR_ADD(baseAmount, newAmount) #define ACCUMULATOR_DEBUG() #endif /* ENABLE_ACCUMULATOR */ KisSimpleUpdateQueue::KisSimpleUpdateQueue() : m_overrideLevelOfDetail(-1) { updateSettings(); } KisSimpleUpdateQueue::~KisSimpleUpdateQueue() { - QWriteLocker locker(&m_rwLock); + QMutexLocker locker(&m_mutex); while (!m_spontaneousJobsList.isEmpty()) { delete m_spontaneousJobsList.takeLast(); } } void KisSimpleUpdateQueue::updateSettings() { - QWriteLocker locker(&m_rwLock); + QMutexLocker locker(&m_mutex); KisImageConfig config(true); m_patchWidth = config.updatePatchWidth(); m_patchHeight = config.updatePatchHeight(); m_maxCollectAlpha = config.maxCollectAlpha(); m_maxMergeAlpha = config.maxMergeAlpha(); m_maxMergeCollectAlpha = config.maxMergeCollectAlpha(); } int KisSimpleUpdateQueue::overrideLevelOfDetail() const { return m_overrideLevelOfDetail; } void KisSimpleUpdateQueue::processQueue(KisUpdaterContext &updaterContext) { - updaterContext.lock(); - m_rwLock.lockForWrite(); + m_mutex.lock(); while(updaterContext.hasSpareThread() && processOneJob(updaterContext)); - m_rwLock.unlock(); - updaterContext.unlock(); + m_mutex.unlock(); } bool KisSimpleUpdateQueue::processOneJob(KisUpdaterContext &updaterContext) { KisBaseRectsWalkerSP item; KisMutableWalkersListIterator iter(m_updatesList); -// QVector walkers; bool jobAdded = false; int currentLevelOfDetail = updaterContext.currentLevelOfDetail(); while(iter.hasNext()) { item = iter.next(); if ((currentLevelOfDetail < 0 || currentLevelOfDetail == item->levelOfDetail()) && !item->checksumValid()) { m_overrideLevelOfDetail = item->levelOfDetail(); item->recalculate(item->requestedRect()); m_overrideLevelOfDetail = -1; } if ((currentLevelOfDetail < 0 || currentLevelOfDetail == item->levelOfDetail()) && updaterContext.isJobAllowed(item)) { -// walkers.append(item); while (!updaterContext.addMergeJob(item)); - iter.remove(); - jobAdded = true; break; } } -// if (!walkers.isEmpty()) { -// while (!updaterContext.addMergeJobs(walkers)); -// jobAdded = true; -// } - if (jobAdded) return true; if (!m_spontaneousJobsList.isEmpty()) { /** * WARNING: Please note that this still doesn't guarantee that * the spontaneous jobs are exclusive, since updates and/or * strokes can be added after them. The only thing it * guarantees that two spontaneous jobs will not be executed * in parallel. * * Right now it works as it is. Probably will need to be fixed * in the future. */ qint32 numMergeJobs; qint32 numStrokeJobs; updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs); if (!numMergeJobs && !numStrokeJobs) { KisSpontaneousJob *job = m_spontaneousJobsList.takeFirst(); while (!updaterContext.addSpontaneousJob(job)); jobAdded = true; } } return jobAdded; } void KisSimpleUpdateQueue::addUpdateJob(KisNodeSP node, const QVector &rects, const QRect& cropRect, int levelOfDetail) { addJob(node, rects, cropRect, levelOfDetail, KisBaseRectsWalker::UPDATE); } void KisSimpleUpdateQueue::addUpdateJob(KisNodeSP node, const QRect &rc, const QRect& cropRect, int levelOfDetail) { addJob(node, {rc}, cropRect, levelOfDetail, KisBaseRectsWalker::UPDATE); } void KisSimpleUpdateQueue::addUpdateNoFilthyJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail) { addJob(node, {rc}, cropRect, levelOfDetail, KisBaseRectsWalker::UPDATE_NO_FILTHY); } void KisSimpleUpdateQueue::addFullRefreshJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail) { addJob(node, {rc}, cropRect, levelOfDetail, KisBaseRectsWalker::FULL_REFRESH); } void KisSimpleUpdateQueue::addJob(KisNodeSP node, const QVector &rects, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type) { QList walkers; Q_FOREACH (const QRect &rc, rects) { if (rc.isEmpty()) continue; KisBaseRectsWalkerSP walker; if(trySplitJob(node, rc, cropRect, levelOfDetail, type)) continue; if(tryMergeJob(node, rc, cropRect, levelOfDetail, type)) continue; if (type == KisBaseRectsWalker::UPDATE) { walker = new KisMergeWalker(cropRect, KisMergeWalker::DEFAULT); } else if (type == KisBaseRectsWalker::FULL_REFRESH) { walker = new KisFullRefreshWalker(cropRect); } else if (type == KisBaseRectsWalker::UPDATE_NO_FILTHY) { walker = new KisMergeWalker(cropRect, KisMergeWalker::NO_FILTHY); } /* else if(type == KisBaseRectsWalker::UNSUPPORTED) fatalKrita; */ walker->collectRects(node, rc); walkers.append(walker); } if (!walkers.isEmpty()) { - m_rwLock.lockForWrite(); + m_mutex.lock(); m_updatesList.append(walkers); - m_rwLock.unlock(); + m_mutex.unlock(); } } void KisSimpleUpdateQueue::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { - QWriteLocker locker(&m_rwLock); + QMutexLocker locker(&m_mutex); KisSpontaneousJob *item; KisMutableSpontaneousJobsListIterator iter(m_spontaneousJobsList); iter.toBack(); while(iter.hasPrevious()) { item = iter.previous(); if (spontaneousJob->overrides(item)) { iter.remove(); delete item; } } m_spontaneousJobsList.append(spontaneousJob); } bool KisSimpleUpdateQueue::isEmpty() const { -// QReadLocker locker(&m_rwLock); return m_updatesList.isEmpty() && m_spontaneousJobsList.isEmpty(); } qint32 KisSimpleUpdateQueue::sizeMetric() const { -// QReadLocker locker(&m_rwLock); return m_updatesList.size() + m_spontaneousJobsList.size(); } bool KisSimpleUpdateQueue::trySplitJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type) { if(rc.width() <= m_patchWidth || rc.height() <= m_patchHeight) return false; // a bit of recursive splitting... qint32 firstCol = rc.x() / m_patchWidth; qint32 firstRow = rc.y() / m_patchHeight; qint32 lastCol = (rc.x() + rc.width()) / m_patchWidth; qint32 lastRow = (rc.y() + rc.height()) / m_patchHeight; QVector splitRects; for(qint32 i = firstRow; i <= lastRow; i++) { for(qint32 j = firstCol; j <= lastCol; j++) { QRect maxPatchRect(j * m_patchWidth, i * m_patchHeight, m_patchWidth, m_patchHeight); QRect patchRect = rc & maxPatchRect; splitRects.append(patchRect); } } KIS_SAFE_ASSERT_RECOVER_NOOP(!splitRects.isEmpty()); addJob(node, splitRects, cropRect, levelOfDetail, type); return true; } bool KisSimpleUpdateQueue::tryMergeJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type) { - QWriteLocker locker(&m_rwLock); + QMutexLocker locker(&m_mutex); QRect baseRect = rc; KisBaseRectsWalkerSP goodCandidate; KisBaseRectsWalkerSP item; KisWalkersListIterator iter(m_updatesList); /** * We add new jobs to the tail of the list, * so it's more probable to find a good candidate here. */ iter.toBack(); while(iter.hasPrevious()) { item = iter.previous(); if(item->startNode() != node) continue; if(item->type() != type) continue; if(item->cropRect() != cropRect) continue; if(item->levelOfDetail() != levelOfDetail) continue; if(joinRects(baseRect, item->requestedRect(), m_maxMergeAlpha)) { goodCandidate = item; break; } } if(goodCandidate) collectJobs(goodCandidate, baseRect, m_maxMergeCollectAlpha); return (bool)goodCandidate; } void KisSimpleUpdateQueue::optimize() { - QWriteLocker locker(&m_rwLock); + QMutexLocker locker(&m_mutex); if(m_updatesList.size() <= 1) { return; } KisBaseRectsWalkerSP baseWalker = m_updatesList.first(); QRect baseRect = baseWalker->requestedRect(); collectJobs(baseWalker, baseRect, m_maxCollectAlpha); } void KisSimpleUpdateQueue::collectJobs(KisBaseRectsWalkerSP &baseWalker, QRect baseRect, const qreal maxAlpha) { KisBaseRectsWalkerSP item; KisMutableWalkersListIterator iter(m_updatesList); while(iter.hasNext()) { item = iter.next(); if(item == baseWalker) continue; if(item->type() != baseWalker->type()) continue; if(item->startNode() != baseWalker->startNode()) continue; if(item->cropRect() != baseWalker->cropRect()) continue; if(item->levelOfDetail() != baseWalker->levelOfDetail()) continue; if(joinRects(baseRect, item->requestedRect(), maxAlpha)) { iter.remove(); } } if(baseWalker->requestedRect() != baseRect) { baseWalker->collectRects(baseWalker->startNode(), baseRect); } } bool KisSimpleUpdateQueue::joinRects(QRect& baseRect, const QRect& newRect, qreal maxAlpha) { QRect unitedRect = baseRect | newRect; if(unitedRect.width() > m_patchWidth || unitedRect.height() > m_patchHeight) return false; bool result = false; qint64 baseWork = baseRect.width() * baseRect.height() + newRect.width() * newRect.height(); qint64 newWork = unitedRect.width() * unitedRect.height(); qreal alpha = qreal(newWork) / baseWork; if(alpha < maxAlpha) { DEBUG_JOIN(baseRect, newRect, alpha); DECLARE_ACCUMULATOR(); ACCUMULATOR_ADD(baseWork, newWork); ACCUMULATOR_DEBUG(); baseRect = unitedRect; result = true; } return result; } KisWalkersList& KisTestableSimpleUpdateQueue::getWalkersList() { return m_updatesList; } KisSpontaneousJobsList& KisTestableSimpleUpdateQueue::getSpontaneousJobsList() { return m_spontaneousJobsList; } diff --git a/libs/image/kis_simple_update_queue.h b/libs/image/kis_simple_update_queue.h index 113adca935..6314a098ce 100644 --- a/libs/image/kis_simple_update_queue.h +++ b/libs/image/kis_simple_update_queue.h @@ -1,116 +1,116 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_SIMPLE_UPDATE_QUEUE_H #define __KIS_SIMPLE_UPDATE_QUEUE_H #include #include "kis_updater_context.h" typedef QList KisWalkersList; typedef QListIterator KisWalkersListIterator; typedef QMutableListIterator KisMutableWalkersListIterator; typedef QList KisSpontaneousJobsList; typedef QListIterator KisSpontaneousJobsListIterator; typedef QMutableListIterator KisMutableSpontaneousJobsListIterator; class KRITAIMAGE_EXPORT KisSimpleUpdateQueue { public: KisSimpleUpdateQueue(); virtual ~KisSimpleUpdateQueue(); void processQueue(KisUpdaterContext &updaterContext); void addUpdateJob(KisNodeSP node, const QVector &rects, const QRect& cropRect, int levelOfDetail); void addUpdateJob(KisNodeSP node, const QRect &rc, const QRect& cropRect, int levelOfDetail); void addUpdateNoFilthyJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail); void addFullRefreshJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail); void addSpontaneousJob(KisSpontaneousJob *spontaneousJob); void optimize(); bool isEmpty() const; qint32 sizeMetric() const; void updateSettings(); int overrideLevelOfDetail() const; protected: void addJob(KisNodeSP node, const QVector &rects, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type); bool processOneJob(KisUpdaterContext &updaterContext); bool trySplitJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type); bool tryMergeJob(KisNodeSP node, const QRect& rc, const QRect& cropRect, int levelOfDetail, KisBaseRectsWalker::UpdateType type); void collectJobs(KisBaseRectsWalkerSP &baseWalker, QRect baseRect, const qreal maxAlpha); bool joinRects(QRect& baseRect, const QRect& newRect, qreal maxAlpha); protected: - mutable QReadWriteLock m_rwLock; + mutable QMutex m_mutex; KisWalkersList m_updatesList; KisSpontaneousJobsList m_spontaneousJobsList; /** * Parameters of optimization * (loaded from a configuration file) */ /** * Big update areas are split into a set of smaller * ones, m_patchWidth and m_patchHeight represent the * size of these areas. */ qint32 m_patchWidth; qint32 m_patchHeight; /** * Maximum coefficient of work while regular optimization() */ qreal m_maxCollectAlpha; /** * Maximum coefficient of work when to rects are considered * similar and are merged in tryMergeJob() */ qreal m_maxMergeAlpha; /** * The coefficient of work used while collecting phase of tryToMerge() */ qreal m_maxMergeCollectAlpha; int m_overrideLevelOfDetail; }; class KRITAIMAGE_EXPORT KisTestableSimpleUpdateQueue : public KisSimpleUpdateQueue { public: KisWalkersList& getWalkersList(); KisSpontaneousJobsList& getSpontaneousJobsList(); }; #endif /* __KIS_SIMPLE_UPDATE_QUEUE_H */ diff --git a/libs/image/kis_strokes_queue.cpp b/libs/image/kis_strokes_queue.cpp index ee4ab7ba93..cb29606dd5 100644 --- a/libs/image/kis_strokes_queue.cpp +++ b/libs/image/kis_strokes_queue.cpp @@ -1,852 +1,827 @@ /* * Copyright (c) 2011 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_strokes_queue.h" #include #include #include #include "kis_stroke.h" #include "kis_updater_context.h" #include "kis_stroke_job_strategy.h" #include "kis_stroke_strategy.h" #include "kis_undo_stores.h" #include "kis_post_execution_undo_adapter.h" typedef QQueue StrokesQueue; typedef QQueue::iterator StrokesQueueIterator; #include "kis_image_interfaces.h" class KisStrokesQueue::LodNUndoStrokesFacade : public KisStrokesFacade { public: LodNUndoStrokesFacade(KisStrokesQueue *_q) : q(_q) {} KisStrokeId startStroke(KisStrokeStrategy *strokeStrategy) override { return q->startLodNUndoStroke(strokeStrategy); } void addJob(KisStrokeId id, KisStrokeJobData *data) override { KisStrokeSP stroke = id.toStrongRef(); KIS_SAFE_ASSERT_RECOVER_NOOP(stroke); KIS_SAFE_ASSERT_RECOVER_NOOP(!stroke->lodBuddy()); KIS_SAFE_ASSERT_RECOVER_NOOP(stroke->type() == KisStroke::LODN); q->addJob(id, data); } void endStroke(KisStrokeId id) override { KisStrokeSP stroke = id.toStrongRef(); KIS_SAFE_ASSERT_RECOVER_NOOP(stroke); KIS_SAFE_ASSERT_RECOVER_NOOP(!stroke->lodBuddy()); KIS_SAFE_ASSERT_RECOVER_NOOP(stroke->type() == KisStroke::LODN); q->endStroke(id); } bool cancelStroke(KisStrokeId id) override { Q_UNUSED(id); qFatal("Not implemented"); return false; } private: KisStrokesQueue *q; }; struct Q_DECL_HIDDEN KisStrokesQueue::Private { Private(KisStrokesQueue *_q) : q(_q), openedStrokesCounter(0), needsExclusiveAccess(false), wrapAroundModeSupported(false), balancingRatioOverride(-1.0), currentStrokeLoaded(false), lodNNeedsSynchronization(true), desiredLevelOfDetail(0), nextDesiredLevelOfDetail(0), lodNStrokesFacade(_q), lodNPostExecutionUndoAdapter(&lodNUndoStore, &lodNStrokesFacade) {} KisStrokesQueue *q; StrokesQueue strokesQueue; int openedStrokesCounter; bool needsExclusiveAccess; bool wrapAroundModeSupported; qreal balancingRatioOverride; bool currentStrokeLoaded; bool lodNNeedsSynchronization; int desiredLevelOfDetail; int nextDesiredLevelOfDetail; -// QMutex mutex; - QReadWriteLock rwLock; + QMutex mutex; KisLodSyncStrokeStrategyFactory lod0ToNStrokeStrategyFactory; KisSuspendResumeStrategyFactory suspendUpdatesStrokeStrategyFactory; KisSuspendResumeStrategyFactory resumeUpdatesStrokeStrategyFactory; KisSurrogateUndoStore lodNUndoStore; LodNUndoStrokesFacade lodNStrokesFacade; KisPostExecutionUndoAdapter lodNPostExecutionUndoAdapter; void cancelForgettableStrokes(); void startLod0ToNStroke(int levelOfDetail, bool forgettable); bool canUseLodN() const; StrokesQueueIterator findNewLod0Pos(); StrokesQueueIterator findNewLodNPos(KisStrokeSP lodN); bool shouldWrapInSuspendUpdatesStroke() const; void switchDesiredLevelOfDetail(bool forced); bool hasUnfinishedStrokes() const; void tryClearUndoOnStrokeCompletion(KisStrokeSP finishingStroke); }; KisStrokesQueue::KisStrokesQueue() : m_d(new Private(this)) { } KisStrokesQueue::~KisStrokesQueue() { Q_FOREACH (KisStrokeSP stroke, m_d->strokesQueue) { stroke->cancelStroke(); } delete m_d; } template typename StrokesQueue::iterator executeStrokePair(const StrokePair &pair, StrokesQueue &queue, typename StrokesQueue::iterator it, KisStroke::Type type, int levelOfDetail, KisStrokesQueueMutatedJobInterface *mutatedJobsInterface) { KisStrokeStrategy *strategy = pair.first; QList jobsData = pair.second; KisStrokeSP stroke(new KisStroke(strategy, type, levelOfDetail)); strategy->setCancelStrokeId(stroke); strategy->setMutatedJobsInterface(mutatedJobsInterface); it = queue.insert(it, stroke); Q_FOREACH (KisStrokeJobData *jobData, jobsData) { stroke->addJob(jobData); } stroke->endStroke(); return it; } void KisStrokesQueue::Private::startLod0ToNStroke(int levelOfDetail, bool forgettable) { // precondition: lock held! // precondition: lod > 0 KIS_ASSERT_RECOVER_RETURN(levelOfDetail); if (!this->lod0ToNStrokeStrategyFactory) return; KisLodSyncPair syncPair = this->lod0ToNStrokeStrategyFactory(forgettable); executeStrokePair(syncPair, this->strokesQueue, this->strokesQueue.end(), KisStroke::LODN, levelOfDetail, q); this->lodNNeedsSynchronization = false; } void KisStrokesQueue::Private::cancelForgettableStrokes() { if (!strokesQueue.isEmpty() && !hasUnfinishedStrokes()) { Q_FOREACH (KisStrokeSP stroke, strokesQueue) { KIS_ASSERT_RECOVER_NOOP(stroke->isEnded()); if (stroke->canForgetAboutMe()) { stroke->cancelStroke(); } } } } bool KisStrokesQueue::Private::canUseLodN() const { Q_FOREACH (KisStrokeSP stroke, strokesQueue) { if (stroke->type() == KisStroke::LEGACY) { return false; } } return true; } bool KisStrokesQueue::Private::shouldWrapInSuspendUpdatesStroke() const { Q_FOREACH (KisStrokeSP stroke, strokesQueue) { if (stroke->isCancelled()) continue; if (stroke->type() == KisStroke::RESUME) { /** * Resuming process is long-running and consists of * multiple actions, therefore, if it has already started, * we cannot use it to guard our new stroke, so just skip it. * see https://phabricator.kde.org/T2542 */ if (stroke->isInitialized()) continue; return false; } } return true; } StrokesQueueIterator KisStrokesQueue::Private::findNewLod0Pos() { StrokesQueueIterator it = strokesQueue.begin(); StrokesQueueIterator end = strokesQueue.end(); for (; it != end; ++it) { if ((*it)->isCancelled()) continue; if ((*it)->type() == KisStroke::RESUME) { // \see a comment in shouldWrapInSuspendUpdatesStroke() if ((*it)->isInitialized()) continue; return it; } } return it; } StrokesQueueIterator KisStrokesQueue::Private::findNewLodNPos(KisStrokeSP lodN) { StrokesQueueIterator it = strokesQueue.begin(); StrokesQueueIterator end = strokesQueue.end(); for (; it != end; ++it) { if ((*it)->isCancelled()) continue; if (((*it)->type() == KisStroke::SUSPEND || (*it)->type() == KisStroke::RESUME) && (*it)->isInitialized()) { // \see a comment in shouldWrapInSuspendUpdatesStroke() continue; } if ((*it)->type() == KisStroke::LOD0 || (*it)->type() == KisStroke::SUSPEND || (*it)->type() == KisStroke::RESUME) { if (it != end && it == strokesQueue.begin()) { KisStrokeSP head = *it; if (head->supportsSuspension()) { head->suspendStroke(lodN); } } return it; } } return it; } KisStrokeId KisStrokesQueue::startLodNUndoStroke(KisStrokeStrategy *strokeStrategy) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KIS_SAFE_ASSERT_RECOVER_NOOP(!m_d->lodNNeedsSynchronization); KIS_SAFE_ASSERT_RECOVER_NOOP(m_d->desiredLevelOfDetail > 0); KisStrokeSP buddy(new KisStroke(strokeStrategy, KisStroke::LODN, m_d->desiredLevelOfDetail)); strokeStrategy->setCancelStrokeId(buddy); strokeStrategy->setMutatedJobsInterface(this); m_d->strokesQueue.insert(m_d->findNewLodNPos(buddy), buddy); KisStrokeId id(buddy); m_d->openedStrokesCounter++; return id; } KisStrokeId KisStrokesQueue::startStroke(KisStrokeStrategy *strokeStrategy) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KisStrokeSP stroke; KisStrokeStrategy* lodBuddyStrategy; m_d->cancelForgettableStrokes(); if (m_d->desiredLevelOfDetail && m_d->canUseLodN() && (lodBuddyStrategy = strokeStrategy->createLodClone(m_d->desiredLevelOfDetail))) { if (m_d->lodNNeedsSynchronization) { m_d->startLod0ToNStroke(m_d->desiredLevelOfDetail, false); } stroke = KisStrokeSP(new KisStroke(strokeStrategy, KisStroke::LOD0, 0)); KisStrokeSP buddy(new KisStroke(lodBuddyStrategy, KisStroke::LODN, m_d->desiredLevelOfDetail)); lodBuddyStrategy->setCancelStrokeId(buddy); lodBuddyStrategy->setMutatedJobsInterface(this); stroke->setLodBuddy(buddy); m_d->strokesQueue.insert(m_d->findNewLodNPos(buddy), buddy); if (m_d->shouldWrapInSuspendUpdatesStroke()) { KisSuspendResumePair suspendPair = m_d->suspendUpdatesStrokeStrategyFactory(); KisSuspendResumePair resumePair = m_d->resumeUpdatesStrokeStrategyFactory(); StrokesQueueIterator it = m_d->findNewLod0Pos(); it = executeStrokePair(resumePair, m_d->strokesQueue, it, KisStroke::RESUME, 0, this); it = m_d->strokesQueue.insert(it, stroke); it = executeStrokePair(suspendPair, m_d->strokesQueue, it, KisStroke::SUSPEND, 0, this); } else { m_d->strokesQueue.insert(m_d->findNewLod0Pos(), stroke); } } else { stroke = KisStrokeSP(new KisStroke(strokeStrategy, KisStroke::LEGACY, 0)); m_d->strokesQueue.enqueue(stroke); } KisStrokeId id(stroke); strokeStrategy->setCancelStrokeId(id); strokeStrategy->setMutatedJobsInterface(this); m_d->openedStrokesCounter++; if (stroke->type() == KisStroke::LEGACY) { m_d->lodNNeedsSynchronization = true; } return id; } void KisStrokesQueue::addJob(KisStrokeId id, KisStrokeJobData *data) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KisStrokeSP stroke = id.toStrongRef(); KIS_SAFE_ASSERT_RECOVER_RETURN(stroke); KisStrokeSP buddy = stroke->lodBuddy(); if (buddy) { KisStrokeJobData *clonedData = data->createLodClone(buddy->worksOnLevelOfDetail()); KIS_ASSERT_RECOVER_RETURN(clonedData); buddy->addJob(clonedData); } stroke->addJob(data); } void KisStrokesQueue::addMutatedJobs(KisStrokeId id, const QVector list) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KisStrokeSP stroke = id.toStrongRef(); KIS_SAFE_ASSERT_RECOVER_RETURN(stroke); stroke->addMutatedJobs(list); } void KisStrokesQueue::endStroke(KisStrokeId id) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KisStrokeSP stroke = id.toStrongRef(); KIS_SAFE_ASSERT_RECOVER_RETURN(stroke); stroke->endStroke(); m_d->openedStrokesCounter--; KisStrokeSP buddy = stroke->lodBuddy(); if (buddy) { buddy->endStroke(); } } bool KisStrokesQueue::cancelStroke(KisStrokeId id) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); KisStrokeSP stroke = id.toStrongRef(); if(stroke) { stroke->cancelStroke(); m_d->openedStrokesCounter--; KisStrokeSP buddy = stroke->lodBuddy(); if (buddy) { buddy->cancelStroke(); } } return stroke; } bool KisStrokesQueue::Private::hasUnfinishedStrokes() const { Q_FOREACH (KisStrokeSP stroke, strokesQueue) { if (!stroke->isEnded()) { return true; } } return false; } bool KisStrokesQueue::tryCancelCurrentStrokeAsync() { bool anythingCanceled = false; -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); /** * We cancel only ended strokes. This is done to avoid * handling dangling pointers problem (KisStrokeId). The owner * of a stroke will cancel the stroke itself if needed. */ if (!m_d->strokesQueue.isEmpty() && !m_d->hasUnfinishedStrokes()) { anythingCanceled = true; Q_FOREACH (KisStrokeSP currentStroke, m_d->strokesQueue) { KIS_ASSERT_RECOVER_NOOP(currentStroke->isEnded()); currentStroke->cancelStroke(); // we shouldn't cancel buddies... if (currentStroke->type() == KisStroke::LOD0) { /** * If the buddy has already finished, we cannot undo it because * it doesn't store any undo data. Therefore we just regenerate * the LOD caches. */ m_d->lodNNeedsSynchronization = true; } } } /** * NOTE: We do not touch the openedStrokesCounter here since * we work with closed id's only here */ return anythingCanceled; } UndoResult KisStrokesQueue::tryUndoLastStrokeAsync() { UndoResult result = UNDO_FAIL; -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); std::reverse_iterator it(m_d->strokesQueue.constEnd()); std::reverse_iterator end(m_d->strokesQueue.constBegin()); KisStrokeSP lastStroke; KisStrokeSP lastBuddy; bool buddyFound = false; for (; it != end; ++it) { if ((*it)->type() == KisStroke::LEGACY) { break; } if (!lastStroke && (*it)->type() == KisStroke::LOD0 && !(*it)->isCancelled()) { lastStroke = *it; lastBuddy = lastStroke->lodBuddy(); KIS_SAFE_ASSERT_RECOVER(lastBuddy) { lastStroke.clear(); lastBuddy.clear(); break; } } KIS_SAFE_ASSERT_RECOVER(!lastStroke || *it == lastBuddy || (*it)->type() != KisStroke::LODN) { lastStroke.clear(); lastBuddy.clear(); break; } if (lastStroke && *it == lastBuddy) { KIS_SAFE_ASSERT_RECOVER(lastBuddy->type() == KisStroke::LODN) { lastStroke.clear(); lastBuddy.clear(); break; } buddyFound = true; break; } } if (!lastStroke) return UNDO_FAIL; if (!lastStroke->isEnded()) return UNDO_FAIL; if (lastStroke->isCancelled()) return UNDO_FAIL; KIS_SAFE_ASSERT_RECOVER_NOOP(!buddyFound || lastStroke->isCancelled() == lastBuddy->isCancelled()); KIS_SAFE_ASSERT_RECOVER_NOOP(lastBuddy->isEnded()); if (!lastStroke->canCancel()) { return UNDO_WAIT; } lastStroke->cancelStroke(); if (buddyFound && lastBuddy->canCancel()) { lastBuddy->cancelStroke(); } else { // TODO: assert that checks that there is no other lodn strokes locker.unlock(); m_d->lodNUndoStore.undo(); m_d->lodNUndoStore.purgeRedoState(); locker.relock(); } result = UNDO_OK; return result; } void KisStrokesQueue::Private::tryClearUndoOnStrokeCompletion(KisStrokeSP finishingStroke) { if (finishingStroke->type() != KisStroke::RESUME) return; bool hasResumeStrokes = false; bool hasLod0Strokes = false; Q_FOREACH (KisStrokeSP stroke, strokesQueue) { if (stroke == finishingStroke) continue; hasLod0Strokes |= stroke->type() == KisStroke::LOD0; hasResumeStrokes |= stroke->type() == KisStroke::RESUME; } KIS_SAFE_ASSERT_RECOVER_NOOP(!hasLod0Strokes || hasResumeStrokes); if (!hasResumeStrokes && !hasLod0Strokes) { lodNUndoStore.clear(); } } void KisStrokesQueue::processQueue(KisUpdaterContext &updaterContext, bool externalJobsPending) { - updaterContext.lock(); -// m_d->mutex.lock(); - m_d->rwLock.lockForWrite(); + m_d->mutex.lock(); while(updaterContext.hasSpareThread() && processOneJob(updaterContext, externalJobsPending)); - m_d->rwLock.unlock(); -// m_d->mutex.unlock(); - updaterContext.unlock(); + m_d->mutex.unlock(); } bool KisStrokesQueue::needsExclusiveAccess() const { return m_d->needsExclusiveAccess; } bool KisStrokesQueue::wrapAroundModeSupported() const { return m_d->wrapAroundModeSupported; } qreal KisStrokesQueue::balancingRatioOverride() const { return m_d->balancingRatioOverride; } bool KisStrokesQueue::isEmpty() const { -// QMutexLocker locker(&m_d->mutex); -// QReadLocker locker(&m_d->rwLock); return m_d->strokesQueue.isEmpty(); } qint32 KisStrokesQueue::sizeMetric() const { -// QMutexLocker locker(&m_d->mutex); -// QReadLocker locker(&m_d->rwLock); if(m_d->strokesQueue.isEmpty()) return 0; // just a rough approximation return qMax(1, m_d->strokesQueue.head()->numJobs()) * m_d->strokesQueue.size(); } void KisStrokesQueue::Private::switchDesiredLevelOfDetail(bool forced) { if (forced || nextDesiredLevelOfDetail != desiredLevelOfDetail) { Q_FOREACH (KisStrokeSP stroke, strokesQueue) { if (stroke->type() != KisStroke::LEGACY) return; } const bool forgettable = forced && !lodNNeedsSynchronization && desiredLevelOfDetail == nextDesiredLevelOfDetail; desiredLevelOfDetail = nextDesiredLevelOfDetail; lodNNeedsSynchronization |= !forgettable; if (desiredLevelOfDetail) { startLod0ToNStroke(desiredLevelOfDetail, forgettable); } } } void KisStrokesQueue::explicitRegenerateLevelOfDetail() { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); m_d->switchDesiredLevelOfDetail(true); } void KisStrokesQueue::setDesiredLevelOfDetail(int lod) { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); if (lod == m_d->nextDesiredLevelOfDetail) return; m_d->nextDesiredLevelOfDetail = lod; m_d->switchDesiredLevelOfDetail(false); } void KisStrokesQueue::notifyUFOChangedImage() { -// QMutexLocker locker(&m_d->mutex); - QWriteLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); m_d->lodNNeedsSynchronization = true; } void KisStrokesQueue::debugDumpAllStrokes() { -// QMutexLocker locker(&m_d->mutex); - QReadLocker locker(&m_d->rwLock); + QMutexLocker locker(&m_d->mutex); dbgImage <<"==="; Q_FOREACH (KisStrokeSP stroke, m_d->strokesQueue) { dbgImage << ppVar(stroke->name()) << ppVar(stroke->type()) << ppVar(stroke->numJobs()) << ppVar(stroke->isInitialized()) << ppVar(stroke->isCancelled()); } dbgImage <<"==="; } void KisStrokesQueue::setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory) { m_d->lod0ToNStrokeStrategyFactory = factory; } void KisStrokesQueue::setSuspendUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->suspendUpdatesStrokeStrategyFactory = factory; } void KisStrokesQueue::setResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->resumeUpdatesStrokeStrategyFactory = factory; } KisPostExecutionUndoAdapter *KisStrokesQueue::lodNPostExecutionUndoAdapter() const { return &m_d->lodNPostExecutionUndoAdapter; } KUndo2MagicString KisStrokesQueue::currentStrokeName() const { -// QMutexLocker locker(&m_d->mutex); -// QReadLocker locker(&m_d->rwLock); if(m_d->strokesQueue.isEmpty()) return KUndo2MagicString(); return m_d->strokesQueue.head()->name(); } bool KisStrokesQueue::hasOpenedStrokes() const { -// QMutexLocker locker(&m_d->mutex); -// QReadLocker locker(&m_d->rwLock); return m_d->openedStrokesCounter; } bool KisStrokesQueue::processOneJob(KisUpdaterContext &updaterContext, bool externalJobsPending) { if(m_d->strokesQueue.isEmpty()) return false; bool result = false; const int levelOfDetail = updaterContext.currentLevelOfDetail(); const KisUpdaterContextSnapshotEx snapshot = updaterContext.getContextSnapshotEx(); const bool hasStrokeJobs = !(snapshot == ContextEmpty || snapshot == HasMergeJob); const bool hasMergeJobs = snapshot & HasMergeJob; if(checkStrokeState(hasStrokeJobs, levelOfDetail) && checkExclusiveProperty(hasMergeJobs, hasStrokeJobs) && checkSequentialProperty(snapshot, externalJobsPending)) { KisStrokeJob *job = m_d->strokesQueue.head()->popOneJob(); while (!updaterContext.addStrokeJob(job)) {} result = true; } return result; } bool KisStrokesQueue::checkStrokeState(bool hasStrokeJobsRunning, int runningLevelOfDetail) { KisStrokeSP stroke = m_d->strokesQueue.head(); bool result = false; /** * We cannot start/continue a stroke if its LOD differs from * the one that is running on CPU */ bool hasLodCompatibility = checkLevelOfDetailProperty(runningLevelOfDetail); bool hasJobs = stroke->hasJobs(); /** * The stroke may be cancelled very fast. In this case it will * end up in the state: * * !stroke->isInitialized() && stroke->isEnded() && !stroke->hasJobs() * * This means that !isInitialised() doesn't imply there are any * jobs present. */ if(!stroke->isInitialized() && hasJobs && hasLodCompatibility) { /** * It might happen that the stroke got initialized, but its job was not * started due to some other reasons like exclusivity. Therefore the * stroke might end up in loaded, but uninitialized state. */ if (!m_d->currentStrokeLoaded) { m_d->needsExclusiveAccess = stroke->isExclusive(); m_d->wrapAroundModeSupported = stroke->supportsWrapAroundMode(); m_d->balancingRatioOverride = stroke->balancingRatioOverride(); m_d->currentStrokeLoaded = true; } result = true; } else if(hasJobs && hasLodCompatibility) { /** * If the stroke has no initialization phase, then it can * arrive here unloaded. */ if (!m_d->currentStrokeLoaded) { m_d->needsExclusiveAccess = stroke->isExclusive(); m_d->wrapAroundModeSupported = stroke->supportsWrapAroundMode(); m_d->balancingRatioOverride = stroke->balancingRatioOverride(); m_d->currentStrokeLoaded = true; } result = true; } else if(stroke->isEnded() && !hasJobs && !hasStrokeJobsRunning) { m_d->tryClearUndoOnStrokeCompletion(stroke); m_d->strokesQueue.dequeue(); // deleted by shared pointer m_d->needsExclusiveAccess = false; m_d->wrapAroundModeSupported = false; m_d->balancingRatioOverride = -1.0; m_d->currentStrokeLoaded = false; m_d->switchDesiredLevelOfDetail(false); if(!m_d->strokesQueue.isEmpty()) { result = checkStrokeState(false, runningLevelOfDetail); } } return result; } bool KisStrokesQueue::checkExclusiveProperty(bool hasMergeJobs, bool hasStrokeJobs) { Q_UNUSED(hasStrokeJobs); if(!m_d->strokesQueue.head()->isExclusive()) return true; return hasMergeJobs == 0; } bool KisStrokesQueue::checkSequentialProperty(KisUpdaterContextSnapshotEx snapshot, bool externalJobsPending) { KisStrokeSP stroke = m_d->strokesQueue.head(); if (snapshot & HasSequentialJob || snapshot & HasBarrierJob) { return false; } KisStrokeJobData::Sequentiality nextSequentiality = stroke->nextJobSequentiality(); if (nextSequentiality == KisStrokeJobData::UNIQUELY_CONCURRENT && snapshot & HasUniquelyConcurrentJob) { return false; } if (nextSequentiality == KisStrokeJobData::SEQUENTIAL && (snapshot & HasUniquelyConcurrentJob || snapshot & HasConcurrentJob)) { return false; } if (nextSequentiality == KisStrokeJobData::BARRIER && (snapshot & HasUniquelyConcurrentJob || snapshot & HasConcurrentJob || snapshot & HasMergeJob || externalJobsPending)) { return false; } return true; } bool KisStrokesQueue::checkLevelOfDetailProperty(int runningLevelOfDetail) { KisStrokeSP stroke = m_d->strokesQueue.head(); return runningLevelOfDetail < 0 || stroke->worksOnLevelOfDetail() == runningLevelOfDetail; } diff --git a/libs/image/kis_update_job_item.h b/libs/image/kis_update_job_item.h index 9d7354f967..ebf13a0baa 100644 --- a/libs/image/kis_update_job_item.h +++ b/libs/image/kis_update_job_item.h @@ -1,293 +1,266 @@ /* * Copyright (c) 2011 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATE_JOB_ITEM_H #define __KIS_UPDATE_JOB_ITEM_H #include #include #include #include "kis_stroke_job.h" #include "kis_spontaneous_job.h" #include "kis_base_rects_walker.h" #include "kis_async_merger.h" #include "kis_debug.h" #include "kis_updater_context.h" class KisUpdateJobItem : public QObject, public QRunnable { Q_OBJECT public: enum class Type : int { EMPTY = 0, WAITING, MERGE, STROKE, SPONTANEOUS }; public: KisUpdateJobItem(KisUpdaterContext *updaterContext, QReadWriteLock *exclusiveJobLock, int index) : m_updaterContext(updaterContext), m_exclusiveJobLock(exclusiveJobLock), m_index(index), m_atomicType(Type::EMPTY), m_runnableJob(0) { setAutoDelete(false); KIS_SAFE_ASSERT_RECOVER_NOOP(m_atomicType.is_lock_free()); } ~KisUpdateJobItem() override { delete m_runnableJob; } void run() override { if (!isRunning()) return; /** * Here we break the idea of QThreadPool a bit. Ideally, we should split the * jobs into distinct QRunnable objects and pass all of them to QThreadPool. * That is a nice idea, but it doesn't work well when the jobs are small enough * and the number of available cores is high (>4 cores). It this case the * threads just tend to execute the job very quickly and go to sleep, which is * an expensive operation. * * To overcome this problem we try to bulk-process the jobs. In sigJobFinished() * signal (which is DirectConnection), the context may add the job to ourselves(!!!), * so we switch from "done" state into "running" again. */ while (1) { KIS_SAFE_ASSERT_RECOVER_RETURN(isRunning()); if(m_exclusive) { m_exclusiveJobLock->lockForWrite(); } else { m_exclusiveJobLock->lockForRead(); } if(m_atomicType == Type::MERGE) { runMergeJob(); } else { KIS_ASSERT(m_atomicType == Type::STROKE || m_atomicType == Type::SPONTANEOUS); m_runnableJob->run(); } setDone(); -// emit sigDoSomeUsefulWork(); m_updaterContext->doSomeUsefulWork(); // may flip the current state from Waiting -> Running again -// emit sigJobFinished(m_index); m_updaterContext->jobFinished(m_index); m_exclusiveJobLock->unlock(); // try to exit the loop. Please note, that no one can flip the state from // WAITING to EMPTY except ourselves! Type expectedValue = Type::WAITING; if (m_atomicType.compare_exchange_strong(expectedValue, Type::EMPTY)) { break; } } } inline void runMergeJob() { KIS_SAFE_ASSERT_RECOVER_RETURN(m_atomicType == Type::MERGE); -// KIS_SAFE_ASSERT_RECOVER_RETURN(m_walker); + KIS_SAFE_ASSERT_RECOVER_RETURN(m_walker); // dbgKrita << "Executing merge job" << m_walker->changeRect() // << "on thread" << QThread::currentThreadId(); - QRect changeRect; + m_merger.startMerge(*m_walker); - for (auto walker : m_walkers) { - m_merger.startMerge(*walker); - changeRect |= walker->changeRect(); - } - -// emit sigContinueUpdate(changeRect); + QRect changeRect = m_walker->changeRect(); m_updaterContext->continueUpdate(changeRect); } // return true if the thread should actually be started inline bool setWalker(KisBaseRectsWalkerSP walker) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_accessRect = walker->accessRect(); m_changeRect = walker->changeRect(); m_walker = walker; m_exclusive = false; m_runnableJob = 0; const Type oldState = m_atomicType.exchange(Type::MERGE); return oldState == Type::EMPTY; } - inline bool setWalkers(QVector &walkers) { - KIS_ASSERT(m_atomicType <= Type::WAITING); - - m_accessRect = QRect(); - m_changeRect = QRect(); - - m_walkers.swap(walkers); - - m_exclusive = false; - m_runnableJob = 0; - - for (auto walker : m_walkers) { - m_accessRect |= walker->accessRect(); - m_changeRect |= walker->changeRect(); - } - - const Type oldState = m_atomicType.exchange(Type::MERGE); - return oldState == Type::EMPTY; - } - // return true if the thread should actually be started inline bool setStrokeJob(KisStrokeJob *strokeJob) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_runnableJob = strokeJob; m_strokeJobSequentiality = strokeJob->sequentiality(); m_exclusive = strokeJob->isExclusive(); - m_walkers.clear(); m_walker = 0; m_accessRect = m_changeRect = QRect(); const Type oldState = m_atomicType.exchange(Type::STROKE); return oldState == Type::EMPTY; } // return true if the thread should actually be started inline bool setSpontaneousJob(KisSpontaneousJob *spontaneousJob) { KIS_ASSERT(m_atomicType <= Type::WAITING); m_runnableJob = spontaneousJob; m_exclusive = spontaneousJob->isExclusive(); - m_walkers.clear(); m_walker = 0; m_accessRect = m_changeRect = QRect(); const Type oldState = m_atomicType.exchange(Type::SPONTANEOUS); return oldState == Type::EMPTY; } inline void setDone() { - m_walkers.clear(); m_walker = 0; delete m_runnableJob; m_runnableJob = 0; m_atomicType = Type::WAITING; } inline bool isRunning() const { return m_atomicType >= Type::MERGE; } inline Type type() const { return m_atomicType; } inline const QRect& accessRect() const { return m_accessRect; } inline const QRect& changeRect() const { return m_changeRect; } inline KisStrokeJobData::Sequentiality strokeJobSequentiality() const { return m_strokeJobSequentiality; } + inline int index() const { + return m_index; + } + Q_SIGNALS: void sigContinueUpdate(const QRect& rc); void sigDoSomeUsefulWork(); void sigJobFinished(int index); private: /** * Open walker and stroke job for the testing suite. * Please, do not use it in production code. */ friend class KisSimpleUpdateQueueTest; friend class KisStrokesQueueTest; friend class KisUpdateSchedulerTest; friend class KisTestableUpdaterContext; inline KisBaseRectsWalkerSP walker() const { return m_walker; } inline KisStrokeJob* strokeJob() const { KisStrokeJob *job = dynamic_cast(m_runnableJob); Q_ASSERT(job); return job; } inline void testingSetDone() { setDone(); } private: /** * \see KisUpdaterContext::m_exclusiveJobLock */ KisUpdaterContext *m_updaterContext; QReadWriteLock *m_exclusiveJobLock; const int m_index; bool m_exclusive; std::atomic m_atomicType; volatile KisStrokeJobData::Sequentiality m_strokeJobSequentiality; /** * Runnable jobs part * The job is owned by the context and deleted after completion */ KisRunnable *m_runnableJob; /** * Merge jobs part */ KisBaseRectsWalkerSP m_walker; - QVector m_walkers; KisAsyncMerger m_merger; /** * These rects cache actual values from the walker * to eliminate concurrent access to a walker structure */ QRect m_accessRect; QRect m_changeRect; }; #endif /* __KIS_UPDATE_JOB_ITEM_H */ diff --git a/libs/image/kis_update_scheduler.cpp b/libs/image/kis_update_scheduler.cpp index b5f8112bcb..2526b4dc71 100644 --- a/libs/image/kis_update_scheduler.cpp +++ b/libs/image/kis_update_scheduler.cpp @@ -1,503 +1,488 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_update_scheduler.h" #include "klocalizedstring.h" #include "kis_image_config.h" #include "kis_merge_walker.h" #include "kis_full_refresh_walker.h" #include "kis_updater_context.h" #include "kis_simple_update_queue.h" #include "kis_strokes_queue.h" #include "kis_queues_progress_updater.h" #include "KisImageConfigNotifier.h" #include #include "kis_lazy_wait_condition.h" #include //#define DEBUG_BALANCING #ifdef DEBUG_BALANCING #define DEBUG_BALANCING_METRICS(decidedFirst, excl) \ dbgKrita << "Balance decision:" << decidedFirst \ << "(" << excl << ")" \ << "updates:" << m_d->updatesQueue.sizeMetric() \ << "strokes:" << m_d->strokesQueue.sizeMetric() #else #define DEBUG_BALANCING_METRICS(decidedFirst, excl) #endif struct Q_DECL_HIDDEN KisUpdateScheduler::Private { Private(KisUpdateScheduler *_q, KisProjectionUpdateListener *p) : q(_q) , updaterContext(KisImageConfig(true).maxNumberOfThreads(), q) , projectionUpdateListener(p) {} KisUpdateScheduler *q; KisSimpleUpdateQueue updatesQueue; KisStrokesQueue strokesQueue; KisUpdaterContext updaterContext; bool processingBlocked = false; qreal defaultBalancingRatio = 1.0; // desired strokes-queue-size / updates-queue-size KisProjectionUpdateListener *projectionUpdateListener; KisQueuesProgressUpdater *progressUpdater = 0; QAtomicInt updatesLockCounter; QReadWriteLock updatesStartLock; KisLazyWaitCondition updatesFinishedCondition; qreal balancingRatio() const { const qreal strokeRatioOverride = strokesQueue.balancingRatioOverride(); return strokeRatioOverride > 0 ? strokeRatioOverride : defaultBalancingRatio; } }; KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent) : QObject(parent), m_d(new Private(this, projectionUpdateListener)) { updateSettings(); connectSignals(); } KisUpdateScheduler::KisUpdateScheduler() : m_d(new Private(this, 0)) { } KisUpdateScheduler::~KisUpdateScheduler() { delete m_d->progressUpdater; delete m_d; } void KisUpdateScheduler::setThreadsLimit(int value) { KIS_SAFE_ASSERT_RECOVER_RETURN(!m_d->processingBlocked); /** * Thread limit can be changed without the full-featured barrier * lock, we can avoid waiting for all the jobs to complete. We * should just ensure there is no more jobs in the updater context. */ lock(); - m_d->updaterContext.lock(); m_d->updaterContext.setThreadsLimit(value); - m_d->updaterContext.unlock(); unlock(false); } int KisUpdateScheduler::threadsLimit() const { - std::lock_guard l(m_d->updaterContext); return m_d->updaterContext.threadsLimit(); } void KisUpdateScheduler::connectSignals() { -// connect(&m_d->updaterContext, SIGNAL(sigContinueUpdate(const QRect&)), -// SLOT(continueUpdate(const QRect&)), -// Qt::DirectConnection); - -// connect(&m_d->updaterContext, SIGNAL(sigDoSomeUsefulWork()), -// SLOT(doSomeUsefulWork()), Qt::DirectConnection); - -// connect(&m_d->updaterContext, SIGNAL(sigSpareThreadAppeared()), -// SLOT(spareThreadAppeared()), Qt::DirectConnection); - connect(KisImageConfigNotifier::instance(), SIGNAL(configChanged()), SLOT(updateSettings())); } void KisUpdateScheduler::setProgressProxy(KoProgressProxy *progressProxy) { delete m_d->progressUpdater; m_d->progressUpdater = progressProxy ? new KisQueuesProgressUpdater(progressProxy, this) : 0; } void KisUpdateScheduler::progressUpdate() { if (!m_d->progressUpdater) return; if(!m_d->strokesQueue.hasOpenedStrokes()) { QString jobName = m_d->strokesQueue.currentStrokeName().toString(); if(jobName.isEmpty()) { jobName = i18n("Updating..."); } int sizeMetric = m_d->strokesQueue.sizeMetric(); if (!sizeMetric) { sizeMetric = m_d->updatesQueue.sizeMetric(); } m_d->progressUpdater->updateProgress(sizeMetric, jobName); } else { m_d->progressUpdater->hide(); } } void KisUpdateScheduler::updateProjection(KisNodeSP node, const QVector &rects, const QRect &cropRect) { m_d->updatesQueue.addUpdateJob(node, rects, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect) { m_d->updatesQueue.addUpdateJob(node, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect) { m_d->updatesQueue.addUpdateNoFilthyJob(node, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::fullRefreshAsync(KisNodeSP root, const QRect& rc, const QRect &cropRect) { m_d->updatesQueue.addFullRefreshJob(root, rc, cropRect, currentLevelOfDetail()); processQueues(); } void KisUpdateScheduler::fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect) { KisBaseRectsWalkerSP walker = new KisFullRefreshWalker(cropRect); walker->collectRects(root, rc); bool needLock = true; if(m_d->processingBlocked) { warnImage << "WARNING: Calling synchronous fullRefresh under a scheduler lock held"; warnImage << "We will not assert for now, but please port caller's to strokes"; warnImage << "to avoid this warning"; needLock = false; } if(needLock) lock(); - m_d->updaterContext.lock(); Q_ASSERT(m_d->updaterContext.isJobAllowed(walker)); m_d->updaterContext.addMergeJob(walker); m_d->updaterContext.waitForDone(); - m_d->updaterContext.unlock(); if(needLock) unlock(true); } void KisUpdateScheduler::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { m_d->updatesQueue.addSpontaneousJob(spontaneousJob); processQueues(); } KisStrokeId KisUpdateScheduler::startStroke(KisStrokeStrategy *strokeStrategy) { KisStrokeId id = m_d->strokesQueue.startStroke(strokeStrategy); processQueues(); return id; } void KisUpdateScheduler::addJob(KisStrokeId id, KisStrokeJobData *data) { m_d->strokesQueue.addJob(id, data); processQueues(); } void KisUpdateScheduler::endStroke(KisStrokeId id) { m_d->strokesQueue.endStroke(id); processQueues(); } bool KisUpdateScheduler::cancelStroke(KisStrokeId id) { bool result = m_d->strokesQueue.cancelStroke(id); processQueues(); return result; } bool KisUpdateScheduler::tryCancelCurrentStrokeAsync() { return m_d->strokesQueue.tryCancelCurrentStrokeAsync(); } UndoResult KisUpdateScheduler::tryUndoLastStrokeAsync() { return m_d->strokesQueue.tryUndoLastStrokeAsync(); } bool KisUpdateScheduler::wrapAroundModeSupported() const { return m_d->strokesQueue.wrapAroundModeSupported(); } void KisUpdateScheduler::setDesiredLevelOfDetail(int lod) { m_d->strokesQueue.setDesiredLevelOfDetail(lod); /** * The queue might have started an internal stroke for * cache synchronization. Process the queues to execute * it if needed. */ processQueues(); } void KisUpdateScheduler::explicitRegenerateLevelOfDetail() { m_d->strokesQueue.explicitRegenerateLevelOfDetail(); // \see a comment in setDesiredLevelOfDetail() processQueues(); } int KisUpdateScheduler::currentLevelOfDetail() const { int levelOfDetail = m_d->updaterContext.currentLevelOfDetail(); if (levelOfDetail < 0) { levelOfDetail = m_d->updatesQueue.overrideLevelOfDetail(); } if (levelOfDetail < 0) { levelOfDetail = 0; } return levelOfDetail; } void KisUpdateScheduler::setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory) { m_d->strokesQueue.setLod0ToNStrokeStrategyFactory(factory); } void KisUpdateScheduler::setSuspendUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->strokesQueue.setSuspendUpdatesStrokeStrategyFactory(factory); } void KisUpdateScheduler::setResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory) { m_d->strokesQueue.setResumeUpdatesStrokeStrategyFactory(factory); } KisPostExecutionUndoAdapter *KisUpdateScheduler::lodNPostExecutionUndoAdapter() const { return m_d->strokesQueue.lodNPostExecutionUndoAdapter(); } void KisUpdateScheduler::updateSettings() { m_d->updatesQueue.updateSettings(); KisImageConfig config(true); m_d->defaultBalancingRatio = config.schedulerBalancingRatio(); setThreadsLimit(config.maxNumberOfThreads()); } void KisUpdateScheduler::lock() { m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); } void KisUpdateScheduler::unlock(bool resetLodLevels) { if (resetLodLevels) { /** * Legacy strokes may have changed the image while we didn't * control it. Notify the queue to take it into account. */ m_d->strokesQueue.notifyUFOChangedImage(); } m_d->processingBlocked = false; processQueues(); } bool KisUpdateScheduler::isIdle() { bool result = false; if (tryBarrierLock()) { result = true; unlock(false); } return result; } void KisUpdateScheduler::waitForDone() { do { processQueues(); m_d->updaterContext.waitForDone(); } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); } bool KisUpdateScheduler::tryBarrierLock() { if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { return false; } m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); if(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()) { m_d->processingBlocked = false; processQueues(); return false; } return true; } void KisUpdateScheduler::barrierLock() { do { m_d->processingBlocked = false; processQueues(); m_d->processingBlocked = true; m_d->updaterContext.waitForDone(); } while(!m_d->updatesQueue.isEmpty() || !m_d->strokesQueue.isEmpty()); } void KisUpdateScheduler::processQueues() { wakeUpWaitingThreads(); if(m_d->processingBlocked) return; if(m_d->strokesQueue.needsExclusiveAccess()) { DEBUG_BALANCING_METRICS("STROKES", "X"); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); if(!m_d->strokesQueue.needsExclusiveAccess()) { tryProcessUpdatesQueue(); } } else if(m_d->balancingRatio() * m_d->strokesQueue.sizeMetric() > m_d->updatesQueue.sizeMetric()) { DEBUG_BALANCING_METRICS("STROKES", "N"); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); tryProcessUpdatesQueue(); } else { DEBUG_BALANCING_METRICS("UPDATES", "N"); tryProcessUpdatesQueue(); m_d->strokesQueue.processQueue(m_d->updaterContext, !m_d->updatesQueue.isEmpty()); } progressUpdate(); } void KisUpdateScheduler::blockUpdates() { m_d->updatesFinishedCondition.initWaiting(); m_d->updatesLockCounter.ref(); while(haveUpdatesRunning()) { m_d->updatesFinishedCondition.wait(); } m_d->updatesFinishedCondition.endWaiting(); } void KisUpdateScheduler::unblockUpdates() { m_d->updatesLockCounter.deref(); processQueues(); } void KisUpdateScheduler::wakeUpWaitingThreads() { if(m_d->updatesLockCounter && !haveUpdatesRunning()) { m_d->updatesFinishedCondition.wakeAll(); } } void KisUpdateScheduler::tryProcessUpdatesQueue() { QReadLocker locker(&m_d->updatesStartLock); if(m_d->updatesLockCounter) return; m_d->updatesQueue.processQueue(m_d->updaterContext); } bool KisUpdateScheduler::haveUpdatesRunning() { QWriteLocker locker(&m_d->updatesStartLock); qint32 numMergeJobs, numStrokeJobs; m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs); return numMergeJobs; } void KisUpdateScheduler::continueUpdate(const QRect &rect) { Q_ASSERT(m_d->projectionUpdateListener); m_d->projectionUpdateListener->notifyProjectionUpdated(rect); } void KisUpdateScheduler::doSomeUsefulWork() { m_d->updatesQueue.optimize(); } void KisUpdateScheduler::spareThreadAppeared() { processQueues(); } KisTestableUpdateScheduler::KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, qint32 threadCount) { Q_UNUSED(threadCount); updateSettings(); m_d->projectionUpdateListener = projectionUpdateListener; // The queue will update settings in a constructor itself // m_d->updatesQueue = new KisTestableSimpleUpdateQueue(); // m_d->strokesQueue = new KisStrokesQueue(); // m_d->updaterContext = new KisTestableUpdaterContext(threadCount); connectSignals(); } KisTestableUpdaterContext* KisTestableUpdateScheduler::updaterContext() { return dynamic_cast(&m_d->updaterContext); } KisTestableSimpleUpdateQueue* KisTestableUpdateScheduler::updateQueue() { return dynamic_cast(&m_d->updatesQueue); } diff --git a/libs/image/kis_update_scheduler.h b/libs/image/kis_update_scheduler.h index c36e7d9dd4..5b08e619c3 100644 --- a/libs/image/kis_update_scheduler.h +++ b/libs/image/kis_update_scheduler.h @@ -1,257 +1,252 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATE_SCHEDULER_H #define __KIS_UPDATE_SCHEDULER_H #include #include "kritaimage_export.h" #include "kis_types.h" #include "kis_image_interfaces.h" #include "kis_stroke_strategy_factory.h" #include "kis_strokes_queue_undo_result.h" class QRect; class KoProgressProxy; class KisProjectionUpdateListener; class KisSpontaneousJob; class KisPostExecutionUndoAdapter; class KRITAIMAGE_EXPORT KisUpdateScheduler : public QObject, public KisStrokesFacade { Q_OBJECT public: KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, QObject *parent = 0); ~KisUpdateScheduler() override; /** * Set the number of threads used by the scheduler */ void setThreadsLimit(int value); /** * Return the number of threads available to the scheduler */ int threadsLimit() const; /** * Sets the proxy that is going to be notified about the progress * of processing of the queues. If you want to switch the proxy * on runtime, you should do it under the lock held. * * \see lock(), unlock() */ void setProgressProxy(KoProgressProxy *progressProxy); /** * Blocks processing of the queues. * The function will wait until all the executing jobs * are finished. * NOTE: you may add new jobs while the block held, but they * will be delayed until unlock() is called. * * \see unlock() */ void lock(); /** * Unblocks the process and calls processQueues() * * \see processQueues() */ void unlock(bool resetLodLevels = true); /** * Waits until all the running jobs are finished. * * If some other thread adds jobs in parallel, then you may * wait forever. If you you don't want it, consider lock() instead. * * \see lock() */ void waitForDone(); /** * Waits until the queues become empty, then blocks the processing. * To unblock processing you should use unlock(). * * If some other thread adds jobs in parallel, then you may * wait forever. If you you don't want it, consider lock() instead. * * \see unlock(), lock() */ void barrierLock(); /** * Works like barrier lock, but returns false immediately if barrierLock * can't be acquired. * * \see barrierLock() */ bool tryBarrierLock(); /** * Tells if there are no strokes or updates are running at the * moment. Internally calls to tryBarrierLock(), so it is not O(1). */ bool isIdle(); /** * Blocks all the updates from execution. It doesn't affect * strokes execution in any way. This type of lock is supposed * to be held by the strokes themselves when they need a short * access to some parts of the projection of the image. * * From all the other places you should use usual lock()/unlock() * methods * * \see lock(), unlock() */ void blockUpdates(); /** * Unblocks updates from execution previously locked by blockUpdates() * * \see blockUpdates() */ void unblockUpdates(); void updateProjection(KisNodeSP node, const QVector &rects, const QRect &cropRect); void updateProjection(KisNodeSP node, const QRect &rc, const QRect &cropRect); void updateProjectionNoFilthy(KisNodeSP node, const QRect& rc, const QRect &cropRect); void fullRefreshAsync(KisNodeSP root, const QRect& rc, const QRect &cropRect); void fullRefresh(KisNodeSP root, const QRect& rc, const QRect &cropRect); void addSpontaneousJob(KisSpontaneousJob *spontaneousJob); KisStrokeId startStroke(KisStrokeStrategy *strokeStrategy) override; void addJob(KisStrokeId id, KisStrokeJobData *data) override; void endStroke(KisStrokeId id) override; bool cancelStroke(KisStrokeId id) override; /** * Sets the desired level of detail on which the strokes should * work. Please note that this configuration will be applied * starting from the next stroke. Please also note that this value * is not guaranteed to coincide with the one returned by * currentLevelOfDetail() */ void setDesiredLevelOfDetail(int lod); /** * Explicitly start regeneration of LoD planes of all the devices * in the image. This call should be performed when the user is idle, * just to make the quality of image updates better. */ void explicitRegenerateLevelOfDetail(); /** * Install a factory of a stroke strategy, that will be started * every time when the scheduler needs to synchronize LOD caches * of all the paint devices of the image. */ void setLod0ToNStrokeStrategyFactory(const KisLodSyncStrokeStrategyFactory &factory); /** * Install a factory of a stroke strategy, that will be started * every time when the scheduler needs to postpone all the updates * of the *LOD0* strokes. */ void setSuspendUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory); /** * \see setSuspendUpdatesStrokeStrategyFactory() */ void setResumeUpdatesStrokeStrategyFactory(const KisSuspendResumeStrategyFactory &factory); KisPostExecutionUndoAdapter* lodNPostExecutionUndoAdapter() const; /** * tryCancelCurrentStrokeAsync() checks whether there is a * *running* stroke (which is being executed at this very moment) * which is not still open by the owner (endStroke() or * cancelStroke() have already been called) and cancels it. * * \return true if some stroke has been found and cancelled * * \note This method is *not* part of KisStrokesFacade! It is too * low level for KisImage. In KisImage it is combined with * more high level requestStrokeCancellation(). */ bool tryCancelCurrentStrokeAsync(); UndoResult tryUndoLastStrokeAsync(); bool wrapAroundModeSupported() const; int currentLevelOfDetail() const; void continueUpdate(const QRect &rect); void doSomeUsefulWork(); void spareThreadAppeared(); protected: // Trivial constructor for testing support KisUpdateScheduler(); void connectSignals(); void processQueues(); protected Q_SLOTS: /** * Called when it is necessary to reread configuration */ void updateSettings(); -//private Q_SLOTS: -// void continueUpdate(const QRect &rect); -// void doSomeUsefulWork(); -// void spareThreadAppeared(); - private: friend class UpdatesBlockTester; bool haveUpdatesRunning(); void tryProcessUpdatesQueue(); void wakeUpWaitingThreads(); void progressUpdate(); protected: struct Private; Private * const m_d; }; class KisTestableUpdaterContext; class KisTestableSimpleUpdateQueue; class KRITAIMAGE_EXPORT KisTestableUpdateScheduler : public KisUpdateScheduler { public: KisTestableUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener, qint32 threadCount); KisTestableUpdaterContext* updaterContext(); KisTestableSimpleUpdateQueue* updateQueue(); using KisUpdateScheduler::processQueues; }; #endif /* __KIS_UPDATE_SCHEDULER_H */ diff --git a/libs/image/kis_updater_context.cpp b/libs/image/kis_updater_context.cpp index a7ac3a36be..80ea1c15fd 100644 --- a/libs/image/kis_updater_context.cpp +++ b/libs/image/kis_updater_context.cpp @@ -1,404 +1,342 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_updater_context.h" #include #include #include "kis_update_job_item.h" #include "kis_stroke_job.h" const int KisUpdaterContext::useIdealThreadCountTag = -1; KisUpdaterContext::KisUpdaterContext(qint32 threadCount, QObject *parent) : QObject(parent), m_scheduler(qobject_cast(parent)) { if (threadCount <= 0) { threadCount = QThread::idealThreadCount(); threadCount = threadCount > 0 ? threadCount : 1; } setThreadsLimit(threadCount); } KisUpdaterContext::~KisUpdaterContext() { m_threadPool.waitForDone(); for (qint32 i = 0; i < m_jobs.size(); i++) delete m_jobs[i]; } void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, qint32 &numStrokeJobs) { QReadLocker locker(&m_rwLock); numMergeJobs = 0; numStrokeJobs = 0; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->type() == KisUpdateJobItem::Type::MERGE || item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { numMergeJobs++; } else if (item->type() == KisUpdateJobItem::Type::STROKE) { numStrokeJobs++; } } } KisUpdaterContextSnapshotEx KisUpdaterContext::getContextSnapshotEx() const { QReadLocker locker(&m_rwLock); KisUpdaterContextSnapshotEx state = ContextEmpty; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->type() == KisUpdateJobItem::Type::MERGE || item->type() == KisUpdateJobItem::Type::SPONTANEOUS) { state |= HasMergeJob; } else if (item->type() == KisUpdateJobItem::Type::STROKE) { switch (item->strokeJobSequentiality()) { case KisStrokeJobData::SEQUENTIAL: state |= HasSequentialJob; break; case KisStrokeJobData::CONCURRENT: state |= HasConcurrentJob; break; case KisStrokeJobData::BARRIER: state |= HasBarrierJob; break; case KisStrokeJobData::UNIQUELY_CONCURRENT: state |= HasUniquelyConcurrentJob; break; } } } return state; } int KisUpdaterContext::currentLevelOfDetail() const { return m_lodCounter.readLod(); } bool KisUpdaterContext::hasSpareThread() { return !m_spareThreadsIndexes.isEmpty(); -// QReadLocker locker(&m_rwLock); -// bool found = false; - -// Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { -// if (!item->isRunning()) { -// found = true; -// break; -// } -// } -// return found; } bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) { int lod = this->currentLevelOfDetail(); if (lod >= 0 && walker->levelOfDetail() != lod) return false; QReadLocker locker(&m_rwLock); bool intersects = false; Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { if (item->isRunning() && walkerIntersectsJob(walker, item)) { intersects = true; break; } } return !intersects; } /** * NOTE: In theory, isJobAllowed() and addMergeJob() should be merged into * one atomic method like `bool push()`, because this implementation * of KisUpdaterContext will not work in case of multiple * producers. But currently we have only one producer (one thread * in a time), that is guaranteed by the lock()/unlock() pair in * KisAbstractUpdateQueue::processQueue. */ bool KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(walker->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } -bool KisUpdaterContext::addMergeJobs(QVector &walkers) -{ - qint32 jobIndex = findSpareThread(); - if (jobIndex < 0) return false; - - m_lodCounter.addLod(walkers[0]->levelOfDetail()); - - m_rwLock.lockForWrite(); - const bool shouldStartThread = m_jobs[jobIndex]->setWalkers(walkers); - m_rwLock.unlock(); - - // it might happen that we call this function from within - // the thread itself, right when it finished its work - if (shouldStartThread) { - m_threadPool.start(m_jobs[jobIndex]); - } - - return true; -} - /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(walker->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setWalker(walker); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } bool KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(strokeJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(strokeJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setStrokeJob(strokeJob); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } bool KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(spontaneousJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); m_rwLock.unlock(); // it might happen that we call this function from within // the thread itself, right when it finished its work if (shouldStartThread) { m_threadPool.start(m_jobs[jobIndex]); } return true; } /** * This variant is for use in a testing suite only */ bool KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { qint32 jobIndex = findSpareThread(); if (jobIndex < 0) return false; m_lodCounter.addLod(spontaneousJob->levelOfDetail()); m_rwLock.lockForWrite(); const bool shouldStartThread = m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); m_rwLock.unlock(); // HINT: Not calling start() here Q_UNUSED(shouldStartThread); return true; } void KisUpdaterContext::waitForDone() { m_threadPool.waitForDone(); } bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, const KisUpdateJobItem* job) { return (walker->accessRect().intersects(job->changeRect())) || (job->accessRect().intersects(walker->changeRect())); } qint32 KisUpdaterContext::findSpareThread() { int index = -1; m_spareThreadsIndexes.pop(index); return index; -// for (qint32 i = 0; i < m_jobs.size(); i++) -// if (!m_jobs[i]->isRunning()) -// return i; - -// return -1; -} - -void KisUpdaterContext::slotJobFinished(int index) -{ - m_lodCounter.removeLod(); - m_spareThreadsIndexes.push(index); - - // Be careful. This slot can be called asynchronously without locks. - emit sigSpareThreadAppeared(); -} - -void KisUpdaterContext::lock() -{ -// m_lock.lock(); -} - -void KisUpdaterContext::unlock() -{ -// m_lock.unlock(); } void KisUpdaterContext::setThreadsLimit(int value) { QWriteLocker locker(&m_rwLock); m_threadPool.setMaxThreadCount(value); for (int i = 0; i < m_jobs.size(); i++) { KIS_SAFE_ASSERT_RECOVER_RETURN(!m_jobs[i]->isRunning()); // don't delete the jobs until all of them are checked! } for (int i = 0; i < m_jobs.size(); i++) { delete m_jobs[i]; } m_spareThreadsIndexes.clear(); m_jobs.resize(value); for (qint32 i = 0; i < m_jobs.size(); i++) { m_jobs[i] = new KisUpdateJobItem(this, &m_exclusiveJobLock, i); m_spareThreadsIndexes.push(i); -// connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), -// SIGNAL(sigContinueUpdate(const QRect&)), -// Qt::DirectConnection); - -// connect(m_jobs[i], SIGNAL(sigDoSomeUsefulWork()), -// SIGNAL(sigDoSomeUsefulWork()), Qt::DirectConnection); - -// connect(m_jobs[i], SIGNAL(sigJobFinished(int)), -// SLOT(slotJobFinished(int)), Qt::DirectConnection); } } int KisUpdaterContext::threadsLimit() const { QReadLocker locker(&m_rwLock); KIS_SAFE_ASSERT_RECOVER_NOOP(m_jobs.size() == m_threadPool.maxThreadCount()); return m_jobs.size(); } void KisUpdaterContext::jobFinished(int index) { m_lodCounter.removeLod(); m_spareThreadsIndexes.push(index); // Be careful. This slot can be called asynchronously without locks. -// emit sigSpareThreadAppeared(); - m_scheduler->spareThreadAppeared(); + if (m_scheduler) m_scheduler->spareThreadAppeared(); } void KisUpdaterContext::continueUpdate(const QRect &rc) { -// emit sigContinueUpdate(rc); - m_scheduler->continueUpdate(rc); + if (m_scheduler) m_scheduler->continueUpdate(rc); } void KisUpdaterContext::doSomeUsefulWork() { -// emit doSomeUsefulWork(); - m_scheduler->doSomeUsefulWork(); + if (m_scheduler) m_scheduler->doSomeUsefulWork(); } KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) : KisUpdaterContext(threadCount) { } KisTestableUpdaterContext::~KisTestableUpdaterContext() { clear(); } const QVector KisTestableUpdaterContext::getJobs() { + QReadLocker locker(&m_rwLock); return m_jobs; } void KisTestableUpdaterContext::clear() { + QWriteLocker locker(&m_rwLock); + Q_FOREACH (KisUpdateJobItem *item, m_jobs) { item->testingSetDone(); + m_spareThreadsIndexes.push(item->index()); } m_lodCounter.testingClear(); } diff --git a/libs/image/kis_updater_context.h b/libs/image/kis_updater_context.h index 1cb4c1d713..f4e15e1fa0 100644 --- a/libs/image/kis_updater_context.h +++ b/libs/image/kis_updater_context.h @@ -1,205 +1,195 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef __KIS_UPDATER_CONTEXT_H #define __KIS_UPDATER_CONTEXT_H #include #include #include #include #include "kis_base_rects_walker.h" #include "kis_async_merger.h" #include "kis_lock_free_lod_counter.h" #include "KisUpdaterContextSnapshotEx.h" #include "tiles3/kis_lockless_stack.h" #include "kis_update_scheduler.h" class KisUpdateJobItem; class KisSpontaneousJob; class KisStrokeJob; class KRITAIMAGE_EXPORT KisUpdaterContext : public QObject { Q_OBJECT public: static const int useIdealThreadCountTag; public: KisUpdaterContext(qint32 threadCount = useIdealThreadCountTag, QObject *parent = 0); ~KisUpdaterContext() override; /** * Returns the number of currently running jobs of each type. * To use this information you should lock the context beforehand. * * \see lock() */ void getJobsSnapshot(qint32 &numMergeJobs, qint32 &numStrokeJobs); KisUpdaterContextSnapshotEx getContextSnapshotEx() const; /** * Returns the current level of detail of all the running jobs in the * context. If there are no jobs, returns -1. */ int currentLevelOfDetail() const; /** * Check whether there is a spare thread for running * one more job */ bool hasSpareThread(); /** * Checks whether the walker intersects with any * of currently executing walkers. If it does, * it is not allowed to go in. It should be called * with the lock held. * * \see lock() */ bool isJobAllowed(KisBaseRectsWalkerSP walker); /** * Registers the job and starts executing it. * The caller must ensure that the context is locked * with lock(), job is allowed with isWalkerAllowed() and * there is a spare thread for running it with hasSpareThread() * * \see lock() * \see isWalkerAllowed() * \see hasSpareThread() */ virtual bool addMergeJob(KisBaseRectsWalkerSP walker); - bool addMergeJobs(QVector &walkers); /** * Adds a stroke job to the context. The prerequisites are * the same as for addMergeJob() * \see addMergeJob() */ virtual bool addStrokeJob(KisStrokeJob *strokeJob); /** * Adds a spontaneous job to the context. The prerequisites are * the same as for addMergeJob() * \see addMergeJob() */ virtual bool addSpontaneousJob(KisSpontaneousJob *spontaneousJob); /** * Block execution of the caller until all the jobs are finished */ void waitForDone(); /** * Locks the context to guarantee an exclusive access * to the context */ void lock(); /** * Unlocks the context * * \see lock() */ void unlock(); /** * Set the number of threads available for this updater context * WARNING: one cannot change the number of threads if there is * at least one job running in the context! So before * calling this method make sure you do two things: * 1) barrierLock() the update scheduler * 2) lock() the context */ void setThreadsLimit(int value); /** * Return the number of available threads in the context. Make sure you * lock the context before calling this function! */ int threadsLimit() const; + void jobFinished(int index); void continueUpdate(const QRect& rc); void doSomeUsefulWork(); - -Q_SIGNALS: - void sigContinueUpdate(const QRect& rc); - void sigDoSomeUsefulWork(); - void sigSpareThreadAppeared(); - -protected Q_SLOTS: - void slotJobFinished(int index); - protected: static bool walkerIntersectsJob(KisBaseRectsWalkerSP walker, const KisUpdateJobItem* job); qint32 findSpareThread(); protected: /** * The lock is shared by all the child update job items. * When an item wants to run a usual (non-exclusive) job, * it locks the lock for read access. When an exclusive * access is requested, it locks it for write */ QReadWriteLock m_exclusiveJobLock; -// QMutex m_lock; mutable QReadWriteLock m_rwLock; QVector m_jobs; QThreadPool m_threadPool; KisLockFreeLodCounter m_lodCounter; KisLocklessStack m_spareThreadsIndexes; KisUpdateScheduler *m_scheduler; }; class KRITAIMAGE_EXPORT KisTestableUpdaterContext : public KisUpdaterContext { public: /** * Creates an explicit number of threads */ KisTestableUpdaterContext(qint32 threadCount); ~KisTestableUpdaterContext() override; /** * The only difference - it doesn't start execution * of the job */ bool addMergeJob(KisBaseRectsWalkerSP walker) override; bool addStrokeJob(KisStrokeJob *strokeJob) override; bool addSpontaneousJob(KisSpontaneousJob *spontaneousJob) override; const QVector getJobs(); void clear(); }; #endif /* __KIS_UPDATER_CONTEXT_H */ diff --git a/libs/image/tests/kis_updater_context_test.cpp b/libs/image/tests/kis_updater_context_test.cpp index d5d68d33dd..f380858ef5 100644 --- a/libs/image/tests/kis_updater_context_test.cpp +++ b/libs/image/tests/kis_updater_context_test.cpp @@ -1,241 +1,226 @@ /* * Copyright (c) 2010 Dmitry Kazakov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "kis_updater_context_test.h" #include #include #include #include #include "kis_paint_layer.h" #include "kis_merge_walker.h" #include "kis_updater_context.h" #include "kis_image.h" #include "scheduler_utils.h" #include "lod_override.h" void KisUpdaterContextTest::testJobInterference() { KisTestableUpdaterContext context(3); QRect imageRect(0,0,100,100); const KoColorSpace * cs = KoColorSpaceRegistry::instance()->rgb8(); KisImageSP image = new KisImage(0, imageRect.width(), imageRect.height(), cs, "merge test"); KisPaintLayerSP paintLayer = new KisPaintLayer(image, "test", OPACITY_OPAQUE_U8); image->lock(); image->addNode(paintLayer); image->unlock(); QRect dirtyRect1(0,0,50,100); KisBaseRectsWalkerSP walker1 = new KisMergeWalker(imageRect); walker1->collectRects(paintLayer, dirtyRect1); - context.lock(); context.addMergeJob(walker1); - context.unlock(); // overlapping job --- forbidden { QRect dirtyRect(30,0,100,100); KisBaseRectsWalkerSP walker = new KisMergeWalker(imageRect); walker->collectRects(paintLayer, dirtyRect); - context.lock(); QVERIFY(!context.isJobAllowed(walker)); - context.unlock(); } // not overlapping job --- allowed { QRect dirtyRect(60,0,100,100); KisBaseRectsWalkerSP walker = new KisMergeWalker(imageRect); walker->collectRects(paintLayer, dirtyRect); - context.lock(); QVERIFY(context.isJobAllowed(walker)); - context.unlock(); } // not overlapping job, conflicting LOD --- forbidden { TestUtil::LodOverride l(1, image); QCOMPARE(paintLayer->paintDevice()->defaultBounds()->currentLevelOfDetail(), 1); QRect dirtyRect(60,0,100,100); KisBaseRectsWalkerSP walker = new KisMergeWalker(imageRect); walker->collectRects(paintLayer, dirtyRect); - context.lock(); QVERIFY(!context.isJobAllowed(walker)); - context.unlock(); } } void KisUpdaterContextTest::testSnapshot() { KisTestableUpdaterContext context(3); QRect imageRect(0,0,100,100); KisBaseRectsWalkerSP walker1 = new KisMergeWalker(imageRect); qint32 numMergeJobs = -777; qint32 numStrokeJobs = -777; - context.lock(); - context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 0); QCOMPARE(numStrokeJobs, 0); QCOMPARE(context.currentLevelOfDetail(), -1); context.addMergeJob(walker1); context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 1); QCOMPARE(numStrokeJobs, 0); QCOMPARE(context.currentLevelOfDetail(), 0); KisStrokeJobData *data = new KisStrokeJobData(KisStrokeJobData::SEQUENTIAL, KisStrokeJobData::NORMAL); QScopedPointer strategy( new KisNoopDabStrategy("test")); context.addStrokeJob(new KisStrokeJob(strategy.data(), data, 0, true)); context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 1); QCOMPARE(numStrokeJobs, 1); QCOMPARE(context.currentLevelOfDetail(), 0); context.addSpontaneousJob(new KisNoopSpontaneousJob()); context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 2); QCOMPARE(numStrokeJobs, 1); QCOMPARE(context.currentLevelOfDetail(), 0); - context.unlock(); - { - context.lock(); context.clear(); context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 0); QCOMPARE(numStrokeJobs, 0); QCOMPARE(context.currentLevelOfDetail(), -1); data = new KisStrokeJobData(KisStrokeJobData::SEQUENTIAL, KisStrokeJobData::NORMAL); context.addStrokeJob(new KisStrokeJob(strategy.data(), data, 2, true)); context.getJobsSnapshot(numMergeJobs, numStrokeJobs); QCOMPARE(numMergeJobs, 0); QCOMPARE(numStrokeJobs, 1); QCOMPARE(context.currentLevelOfDetail(), 2); - - context.unlock(); } } #define NUM_THREADS 10 #define NUM_JOBS 6000 #define EXCLUSIVE_NTH 3 #define NUM_CHECKS 10 #define CHECK_DELAY 3 // ms class ExclusivenessCheckerStrategy : public KisStrokeJobStrategy { public: ExclusivenessCheckerStrategy(QAtomicInt &counter, QAtomicInt &hadConcurrency) : m_counter(counter), m_hadConcurrency(hadConcurrency) { } void run(KisStrokeJobData *data) override { Q_UNUSED(data); m_counter.ref(); for(int i = 0; i < NUM_CHECKS; i++) { if(data->isExclusive()) { Q_ASSERT(m_counter == 1); } else if (m_counter > 1) { m_hadConcurrency.ref(); } QTest::qSleep(CHECK_DELAY); } m_counter.deref(); } private: QAtomicInt &m_counter; QAtomicInt &m_hadConcurrency; }; void KisUpdaterContextTest::stressTestExclusiveJobs() { KisUpdaterContext context(NUM_THREADS); QAtomicInt counter; QAtomicInt hadConcurrency; for(int i = 0; i < NUM_JOBS; i++) { if(context.hasSpareThread()) { bool isExclusive = i % EXCLUSIVE_NTH == 0; KisStrokeJobData *data = new KisStrokeJobData(KisStrokeJobData::SEQUENTIAL, isExclusive ? KisStrokeJobData::EXCLUSIVE : KisStrokeJobData::NORMAL); KisStrokeJobStrategy *strategy = new ExclusivenessCheckerStrategy(counter, hadConcurrency); context.addStrokeJob(new KisStrokeJob(strategy, data, 0, true)); } else { QTest::qSleep(CHECK_DELAY); } } context.waitForDone(); QVERIFY(!counter); dbgKrita << "Concurrency observed:" << hadConcurrency << "/" << NUM_CHECKS * NUM_JOBS; } QTEST_MAIN(KisUpdaterContextTest)