diff --git a/libs/image/kis_update_scheduler.cpp b/libs/image/kis_update_scheduler.cpp --- a/libs/image/kis_update_scheduler.cpp +++ b/libs/image/kis_update_scheduler.cpp @@ -55,15 +55,19 @@ KisSimpleUpdateQueue updatesQueue; KisStrokesQueue strokesQueue; - KisUpdaterContext updaterContext; bool processingBlocked = false; qreal balancingRatio = 1.0; // updates-queue-size/strokes-queue-size KisProjectionUpdateListener *projectionUpdateListener; KisQueuesProgressUpdater *progressUpdater = 0; QAtomicInt updatesLockCounter; QReadWriteLock updatesStartLock; KisLazyWaitCondition updatesFinishedCondition; + + // KisUpdaterContext can emit signals to KisUpdateScheduler in the dtor, so it + // must to be deleted before anything else. + // That means updaterContext must be declared last. + KisUpdaterContext updaterContext; }; KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener) @@ -163,9 +167,11 @@ Q_ASSERT(m_d->updaterContext.isJobAllowed(walker)); m_d->updaterContext.addMergeJob(walker); - m_d->updaterContext.waitForDone(); m_d->updaterContext.unlock(); + + m_d->updaterContext.waitForDone(); + if(needLock) unlock(true); } @@ -408,7 +414,9 @@ QWriteLocker locker(&m_d->updatesStartLock); qint32 numMergeJobs, numStrokeJobs; + m_d->updaterContext.lock(); m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs); + m_d->updaterContext.unlock(); return numMergeJobs; } diff --git a/libs/image/kis_updater_context.h b/libs/image/kis_updater_context.h --- a/libs/image/kis_updater_context.h +++ b/libs/image/kis_updater_context.h @@ -23,11 +23,16 @@ #include #include #include +#include #include "kis_base_rects_walker.h" #include "kis_async_merger.h" #include "kis_lock_free_lod_counter.h" +// TODO: uncomment ifndef for release on 3.0.1 +// #ifndef QT_NO_DEBUG +#define SANITY_CHECK_CONTEXT_LOCKING +// #endif // QT_NO_DEBUG class KisUpdateJobItem; class KisSpontaneousJob; @@ -128,6 +133,9 @@ protected: static bool walkerIntersectsJob(KisBaseRectsWalkerSP walker, const KisUpdateJobItem* job); + + qint32 defaultThreadCount() const; + qint32 findSpareThread(); protected: @@ -141,8 +149,14 @@ QMutex m_lock; QVector m_jobs; + QWaitCondition m_waitAllCond; QThreadPool m_threadPool; KisLockFreeLodCounter m_lodCounter; + +#ifdef SANITY_CHECK_CONTEXT_LOCKING + // Thread ID of the owner or -1 if not locked + volatile Qt::HANDLE m_lockedBy; +#endif }; class KRITAIMAGE_EXPORT KisTestableUpdaterContext : public KisUpdaterContext diff --git a/libs/image/kis_updater_context.cpp b/libs/image/kis_updater_context.cpp --- a/libs/image/kis_updater_context.cpp +++ b/libs/image/kis_updater_context.cpp @@ -21,18 +21,14 @@ #include #include +#include "kis_safe_read_list.h" #include "kis_update_job_item.h" #include "kis_stroke_job.h" -KisUpdaterContext::KisUpdaterContext(qint32 threadCount) +KisUpdaterContext::KisUpdaterContext(qint32 threadCount): + m_jobs(threadCount > 0 ? threadCount : defaultThreadCount()) { - if(threadCount <= 0) { - threadCount = QThread::idealThreadCount(); - threadCount = threadCount > 0 ? threadCount : 1; - } - - m_jobs.resize(threadCount); for(qint32 i = 0; i < m_jobs.size(); i++) { m_jobs[i] = new KisUpdateJobItem(&m_exclusiveJobLock); connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), @@ -45,6 +41,10 @@ connect(m_jobs[i], SIGNAL(sigJobFinished()), SLOT(slotJobFinished()), Qt::DirectConnection); } + +#ifdef SANITY_CHECK_CONTEXT_LOCKING + m_lockedBy = (Qt::HANDLE) -1; +#endif } KisUpdaterContext::~KisUpdaterContext() @@ -54,9 +54,19 @@ delete m_jobs[i]; } +qint32 KisUpdaterContext::defaultThreadCount() const +{ + int threadCount = QThread::idealThreadCount(); + return threadCount > 0 ? threadCount : 1; +} + void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, qint32 &numStrokeJobs) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + numMergeJobs = 0; numStrokeJobs = 0; @@ -91,6 +101,10 @@ bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + int lod = this->currentLevelOfDetail(); if (lod >= 0 && walker->levelOfDetail() != lod) return false; @@ -116,9 +130,13 @@ */ void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + m_lodCounter.addLod(walker->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setWalker(walker); m_threadPool.start(m_jobs[jobIndex]); @@ -129,19 +147,27 @@ */ void KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + m_lodCounter.addLod(walker->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setWalker(walker); // HINT: Not calling start() here } void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + m_lodCounter.addLod(strokeJob->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setStrokeJob(strokeJob); m_threadPool.start(m_jobs[jobIndex]); @@ -152,19 +178,27 @@ */ void KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + m_lodCounter.addLod(strokeJob->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setStrokeJob(strokeJob); // HINT: Not calling start() here } void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); +#endif + m_lodCounter.addLod(spontaneousJob->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); m_threadPool.start(m_jobs[jobIndex]); @@ -177,15 +211,43 @@ { m_lodCounter.addLod(spontaneousJob->levelOfDetail()); qint32 jobIndex = findSpareThread(); - Q_ASSERT(jobIndex >= 0); + KIS_ASSERT(jobIndex >= 0); m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); // HINT: Not calling start() here } void KisUpdaterContext::waitForDone() { - m_threadPool.waitForDone(); + lock(); + + while(true) { + bool allDone = true; + + QVector::const_iterator iter; + FOREACH_SAFE(iter, m_jobs) { + if ((*iter)->isRunning()) { + allDone = false; + break; + } + } + + if (!allDone) { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + m_lockedBy = (Qt::HANDLE) -1; +#endif + + m_waitAllCond.wait(&m_lock); + +#ifdef SANITY_CHECK_CONTEXT_LOCKING + m_lockedBy = QThread::currentThreadId(); +#endif + } else { + break; + } + } + + unlock(); } bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, @@ -208,17 +270,30 @@ { m_lodCounter.removeLod(); + m_waitAllCond.wakeOne(); // Be careful. This slot can be called asynchronously without locks. emit sigSpareThreadAppeared(); } void KisUpdaterContext::lock() { m_lock.lock(); + +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT_X(m_lockedBy == (Qt::HANDLE) -1, "KisUpdaterContext", + "context is already locked"); + + m_lockedBy = QThread::currentThreadId(); +#endif } void KisUpdaterContext::unlock() { +#ifdef SANITY_CHECK_CONTEXT_LOCKING + KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); + m_lockedBy = (Qt::HANDLE) -1; +#endif + m_lock.unlock(); } diff --git a/libs/image/tests/kis_strokes_queue_test.cpp b/libs/image/tests/kis_strokes_queue_test.cpp --- a/libs/image/tests/kis_strokes_queue_test.cpp +++ b/libs/image/tests/kis_strokes_queue_test.cpp @@ -114,7 +114,9 @@ KisTestableUpdaterContext context(2); QVector jobs; + context.lock(); context.addMergeJob(walker); + context.unlock(); queue.processQueue(context, false); jobs = context.getJobs(); @@ -139,7 +141,9 @@ QCOMPARE(queue.needsExclusiveAccess(), true); context.clear(); + context.lock(); context.addMergeJob(walker); + context.unlock(); queue.processQueue(context, false); COMPARE_WALKER(jobs[0], walker); @@ -201,7 +205,9 @@ VERIFY_EMPTY(jobs[2]); // Now some updates has come... + context.lock(); context.addMergeJob(walker); + context.unlock(); jobs = context.getJobs(); COMPARE_NAME(jobs[0], "nor_dab"); @@ -239,7 +245,9 @@ VERIFY_EMPTY(jobs[2]); // Process the last update... + context.lock(); context.addMergeJob(walker); + context.unlock(); externalJobsPending = false; // Yep, the queue is still waiting @@ -416,7 +424,9 @@ KisTestableUpdaterContext context(2); QVector jobs; + context.lock(); context.addMergeJob(walker); + context.unlock(); queue.processQueue(context, false); jobs = context.getJobs(); @@ -439,10 +449,14 @@ QCOMPARE(queue.needsExclusiveAccess(), false); // walker of a different LOD must not be allowed + context.lock(); QCOMPARE(context.isJobAllowed(walker), false); + context.unlock(); context.clear(); + context.lock(); context.addMergeJob(walker); + context.unlock(); queue.processQueue(context, false); jobs = context.getJobs(); diff --git a/libs/image/tests/kis_updater_context_test.cpp b/libs/image/tests/kis_updater_context_test.cpp --- a/libs/image/tests/kis_updater_context_test.cpp +++ b/libs/image/tests/kis_updater_context_test.cpp @@ -223,7 +223,9 @@ KisStrokeJobStrategy *strategy = new ExclusivenessCheckerStrategy(counter, hadConcurrency); + context.lock(); context.addStrokeJob(new KisStrokeJob(strategy, data, 0, true)); + context.unlock(); } else { QTest::qSleep(CHECK_DELAY);