Changeset View
Changeset View
Standalone View
Standalone View
libs/image/kis_updater_context.cpp
Show All 15 Lines | |||||
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | 16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | ||
17 | */ | 17 | */ | ||
18 | 18 | | |||
19 | #include "kis_updater_context.h" | 19 | #include "kis_updater_context.h" | ||
20 | 20 | | |||
21 | #include <QThread> | 21 | #include <QThread> | ||
22 | #include <QThreadPool> | 22 | #include <QThreadPool> | ||
23 | 23 | | |||
24 | #include "kis_safe_read_list.h" | ||||
24 | #include "kis_update_job_item.h" | 25 | #include "kis_update_job_item.h" | ||
25 | #include "kis_stroke_job.h" | 26 | #include "kis_stroke_job.h" | ||
26 | 27 | | |||
27 | 28 | | |||
28 | KisUpdaterContext::KisUpdaterContext(qint32 threadCount) | 29 | KisUpdaterContext::KisUpdaterContext(qint32 threadCount): | ||
30 | m_jobs(threadCount > 0 ? threadCount : defaultThreadCount()) | ||||
29 | { | 31 | { | ||
30 | if(threadCount <= 0) { | | |||
31 | threadCount = QThread::idealThreadCount(); | | |||
32 | threadCount = threadCount > 0 ? threadCount : 1; | | |||
33 | } | | |||
34 | | ||||
35 | m_jobs.resize(threadCount); | | |||
36 | for(qint32 i = 0; i < m_jobs.size(); i++) { | 32 | for(qint32 i = 0; i < m_jobs.size(); i++) { | ||
37 | m_jobs[i] = new KisUpdateJobItem(&m_exclusiveJobLock); | 33 | m_jobs[i] = new KisUpdateJobItem(&m_exclusiveJobLock); | ||
38 | connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), | 34 | connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)), | ||
39 | SIGNAL(sigContinueUpdate(const QRect&)), | 35 | SIGNAL(sigContinueUpdate(const QRect&)), | ||
40 | Qt::DirectConnection); | 36 | Qt::DirectConnection); | ||
41 | 37 | | |||
42 | connect(m_jobs[i], SIGNAL(sigDoSomeUsefulWork()), | 38 | connect(m_jobs[i], SIGNAL(sigDoSomeUsefulWork()), | ||
43 | SIGNAL(sigDoSomeUsefulWork()), Qt::DirectConnection); | 39 | SIGNAL(sigDoSomeUsefulWork()), Qt::DirectConnection); | ||
44 | 40 | | |||
45 | connect(m_jobs[i], SIGNAL(sigJobFinished()), | 41 | connect(m_jobs[i], SIGNAL(sigJobFinished()), | ||
46 | SLOT(slotJobFinished()), Qt::DirectConnection); | 42 | SLOT(slotJobFinished()), Qt::DirectConnection); | ||
47 | } | 43 | } | ||
44 | | ||||
45 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
46 | m_lockedBy = (Qt::HANDLE) -1; | ||||
47 | #endif | ||||
48 | } | 48 | } | ||
49 | 49 | | |||
50 | KisUpdaterContext::~KisUpdaterContext() | 50 | KisUpdaterContext::~KisUpdaterContext() | ||
51 | { | 51 | { | ||
52 | m_threadPool.waitForDone(); | 52 | m_threadPool.waitForDone(); | ||
53 | for(qint32 i = 0; i < m_jobs.size(); i++) | 53 | for(qint32 i = 0; i < m_jobs.size(); i++) | ||
54 | delete m_jobs[i]; | 54 | delete m_jobs[i]; | ||
55 | } | 55 | } | ||
56 | 56 | | |||
57 | qint32 KisUpdaterContext::defaultThreadCount() const | ||||
58 | { | ||||
59 | int threadCount = QThread::idealThreadCount(); | ||||
60 | return threadCount > 0 ? threadCount : 1; | ||||
61 | } | ||||
62 | | ||||
57 | void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, | 63 | void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs, | ||
58 | qint32 &numStrokeJobs) | 64 | qint32 &numStrokeJobs) | ||
59 | { | 65 | { | ||
66 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
67 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
68 | #endif | ||||
69 | | ||||
60 | numMergeJobs = 0; | 70 | numMergeJobs = 0; | ||
61 | numStrokeJobs = 0; | 71 | numStrokeJobs = 0; | ||
62 | 72 | | |||
63 | Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { | 73 | Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { | ||
64 | if(item->type() == KisUpdateJobItem::MERGE || | 74 | if(item->type() == KisUpdateJobItem::MERGE || | ||
65 | item->type() == KisUpdateJobItem::SPONTANEOUS) { | 75 | item->type() == KisUpdateJobItem::SPONTANEOUS) { | ||
66 | numMergeJobs++; | 76 | numMergeJobs++; | ||
67 | } | 77 | } | ||
Show All 18 Lines | 94 | if(!item->isRunning()) { | |||
86 | break; | 96 | break; | ||
87 | } | 97 | } | ||
88 | } | 98 | } | ||
89 | return found; | 99 | return found; | ||
90 | } | 100 | } | ||
91 | 101 | | |||
92 | bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) | 102 | bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker) | ||
93 | { | 103 | { | ||
104 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
105 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
106 | #endif | ||||
107 | | ||||
94 | int lod = this->currentLevelOfDetail(); | 108 | int lod = this->currentLevelOfDetail(); | ||
95 | if (lod >= 0 && walker->levelOfDetail() != lod) return false; | 109 | if (lod >= 0 && walker->levelOfDetail() != lod) return false; | ||
96 | 110 | | |||
97 | bool intersects = false; | 111 | bool intersects = false; | ||
98 | 112 | | |||
99 | Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { | 113 | Q_FOREACH (const KisUpdateJobItem *item, m_jobs) { | ||
100 | if(item->isRunning() && walkerIntersectsJob(walker, item)) { | 114 | if(item->isRunning() && walkerIntersectsJob(walker, item)) { | ||
101 | intersects = true; | 115 | intersects = true; | ||
Show All 9 Lines | |||||
111 | * one atomic method like `bool push()`, because this implementation | 125 | * one atomic method like `bool push()`, because this implementation | ||
112 | * of KisUpdaterContext will not work in case of multiple | 126 | * of KisUpdaterContext will not work in case of multiple | ||
113 | * producers. But currently we have only one producer (one thread | 127 | * producers. But currently we have only one producer (one thread | ||
114 | * in a time), that is guaranteed by the lock()/unlock() pair in | 128 | * in a time), that is guaranteed by the lock()/unlock() pair in | ||
115 | * KisAbstractUpdateQueue::processQueue. | 129 | * KisAbstractUpdateQueue::processQueue. | ||
116 | */ | 130 | */ | ||
117 | void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) | 131 | void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) | ||
118 | { | 132 | { | ||
133 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
134 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
135 | #endif | ||||
136 | | ||||
119 | m_lodCounter.addLod(walker->levelOfDetail()); | 137 | m_lodCounter.addLod(walker->levelOfDetail()); | ||
120 | qint32 jobIndex = findSpareThread(); | 138 | qint32 jobIndex = findSpareThread(); | ||
121 | Q_ASSERT(jobIndex >= 0); | 139 | KIS_ASSERT(jobIndex >= 0); | ||
122 | 140 | | |||
123 | m_jobs[jobIndex]->setWalker(walker); | 141 | m_jobs[jobIndex]->setWalker(walker); | ||
124 | m_threadPool.start(m_jobs[jobIndex]); | 142 | m_threadPool.start(m_jobs[jobIndex]); | ||
125 | } | 143 | } | ||
126 | 144 | | |||
127 | /** | 145 | /** | ||
128 | * This variant is for use in a testing suite only | 146 | * This variant is for use in a testing suite only | ||
129 | */ | 147 | */ | ||
130 | void KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) | 148 | void KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker) | ||
131 | { | 149 | { | ||
150 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
151 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
152 | #endif | ||||
153 | | ||||
132 | m_lodCounter.addLod(walker->levelOfDetail()); | 154 | m_lodCounter.addLod(walker->levelOfDetail()); | ||
133 | qint32 jobIndex = findSpareThread(); | 155 | qint32 jobIndex = findSpareThread(); | ||
134 | Q_ASSERT(jobIndex >= 0); | 156 | KIS_ASSERT(jobIndex >= 0); | ||
135 | 157 | | |||
136 | m_jobs[jobIndex]->setWalker(walker); | 158 | m_jobs[jobIndex]->setWalker(walker); | ||
137 | // HINT: Not calling start() here | 159 | // HINT: Not calling start() here | ||
138 | } | 160 | } | ||
139 | 161 | | |||
140 | void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) | 162 | void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) | ||
141 | { | 163 | { | ||
164 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
165 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
166 | #endif | ||||
167 | | ||||
142 | m_lodCounter.addLod(strokeJob->levelOfDetail()); | 168 | m_lodCounter.addLod(strokeJob->levelOfDetail()); | ||
143 | qint32 jobIndex = findSpareThread(); | 169 | qint32 jobIndex = findSpareThread(); | ||
144 | Q_ASSERT(jobIndex >= 0); | 170 | KIS_ASSERT(jobIndex >= 0); | ||
145 | 171 | | |||
146 | m_jobs[jobIndex]->setStrokeJob(strokeJob); | 172 | m_jobs[jobIndex]->setStrokeJob(strokeJob); | ||
147 | m_threadPool.start(m_jobs[jobIndex]); | 173 | m_threadPool.start(m_jobs[jobIndex]); | ||
148 | } | 174 | } | ||
149 | 175 | | |||
150 | /** | 176 | /** | ||
151 | * This variant is for use in a testing suite only | 177 | * This variant is for use in a testing suite only | ||
152 | */ | 178 | */ | ||
153 | void KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) | 179 | void KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob) | ||
154 | { | 180 | { | ||
181 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
182 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
183 | #endif | ||||
184 | | ||||
155 | m_lodCounter.addLod(strokeJob->levelOfDetail()); | 185 | m_lodCounter.addLod(strokeJob->levelOfDetail()); | ||
156 | qint32 jobIndex = findSpareThread(); | 186 | qint32 jobIndex = findSpareThread(); | ||
157 | Q_ASSERT(jobIndex >= 0); | 187 | KIS_ASSERT(jobIndex >= 0); | ||
158 | 188 | | |||
159 | m_jobs[jobIndex]->setStrokeJob(strokeJob); | 189 | m_jobs[jobIndex]->setStrokeJob(strokeJob); | ||
160 | // HINT: Not calling start() here | 190 | // HINT: Not calling start() here | ||
161 | } | 191 | } | ||
162 | 192 | | |||
163 | void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) | 193 | void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) | ||
164 | { | 194 | { | ||
195 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
196 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
197 | #endif | ||||
198 | | ||||
165 | m_lodCounter.addLod(spontaneousJob->levelOfDetail()); | 199 | m_lodCounter.addLod(spontaneousJob->levelOfDetail()); | ||
166 | qint32 jobIndex = findSpareThread(); | 200 | qint32 jobIndex = findSpareThread(); | ||
167 | Q_ASSERT(jobIndex >= 0); | 201 | KIS_ASSERT(jobIndex >= 0); | ||
168 | 202 | | |||
169 | m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); | 203 | m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); | ||
170 | m_threadPool.start(m_jobs[jobIndex]); | 204 | m_threadPool.start(m_jobs[jobIndex]); | ||
171 | } | 205 | } | ||
172 | 206 | | |||
173 | /** | 207 | /** | ||
174 | * This variant is for use in a testing suite only | 208 | * This variant is for use in a testing suite only | ||
175 | */ | 209 | */ | ||
176 | void KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) | 210 | void KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob) | ||
177 | { | 211 | { | ||
178 | m_lodCounter.addLod(spontaneousJob->levelOfDetail()); | 212 | m_lodCounter.addLod(spontaneousJob->levelOfDetail()); | ||
179 | qint32 jobIndex = findSpareThread(); | 213 | qint32 jobIndex = findSpareThread(); | ||
180 | Q_ASSERT(jobIndex >= 0); | 214 | KIS_ASSERT(jobIndex >= 0); | ||
181 | 215 | | |||
182 | m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); | 216 | m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob); | ||
183 | // HINT: Not calling start() here | 217 | // HINT: Not calling start() here | ||
184 | } | 218 | } | ||
185 | 219 | | |||
186 | void KisUpdaterContext::waitForDone() | 220 | void KisUpdaterContext::waitForDone() | ||
187 | { | 221 | { | ||
188 | m_threadPool.waitForDone(); | 222 | lock(); | ||
223 | | ||||
224 | while(true) { | ||||
225 | bool allDone = true; | ||||
226 | | ||||
227 | QVector<KisUpdateJobItem*>::const_iterator iter; | ||||
dkazakov: (This comment deprecated due to my comment below in the review section)
To have safe access to… | |||||
228 | FOREACH_SAFE(iter, m_jobs) { | ||||
229 | if ((*iter)->isRunning()) { | ||||
230 | allDone = false; | ||||
231 | break; | ||||
232 | } | ||||
233 | } | ||||
234 | | ||||
235 | if (!allDone) { | ||||
236 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
237 | m_lockedBy = (Qt::HANDLE) -1; | ||||
238 | #endif | ||||
239 | | ||||
240 | m_waitAllCond.wait(&m_lock); | ||||
241 | | ||||
242 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
243 | m_lockedBy = QThread::currentThreadId(); | ||||
244 | #endif | ||||
245 | } else { | ||||
246 | break; | ||||
247 | } | ||||
248 | } | ||||
249 | | ||||
250 | unlock(); | ||||
189 | } | 251 | } | ||
190 | 252 | | |||
191 | bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, | 253 | bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker, | ||
192 | const KisUpdateJobItem* job) | 254 | const KisUpdateJobItem* job) | ||
193 | { | 255 | { | ||
194 | return (walker->accessRect().intersects(job->changeRect())) || | 256 | return (walker->accessRect().intersects(job->changeRect())) || | ||
195 | (job->accessRect().intersects(walker->changeRect())); | 257 | (job->accessRect().intersects(walker->changeRect())); | ||
196 | } | 258 | } | ||
197 | 259 | | |||
198 | qint32 KisUpdaterContext::findSpareThread() | 260 | qint32 KisUpdaterContext::findSpareThread() | ||
199 | { | 261 | { | ||
200 | for(qint32 i=0; i < m_jobs.size(); i++) | 262 | for(qint32 i=0; i < m_jobs.size(); i++) | ||
201 | if(!m_jobs[i]->isRunning()) | 263 | if(!m_jobs[i]->isRunning()) | ||
202 | return i; | 264 | return i; | ||
203 | 265 | | |||
204 | return -1; | 266 | return -1; | ||
205 | } | 267 | } | ||
206 | 268 | | |||
207 | void KisUpdaterContext::slotJobFinished() | 269 | void KisUpdaterContext::slotJobFinished() | ||
208 | { | 270 | { | ||
209 | m_lodCounter.removeLod(); | 271 | m_lodCounter.removeLod(); | ||
210 | 272 | | |||
273 | m_waitAllCond.wakeOne(); | ||||
211 | // Be careful. This slot can be called asynchronously without locks. | 274 | // Be careful. This slot can be called asynchronously without locks. | ||
212 | emit sigSpareThreadAppeared(); | 275 | emit sigSpareThreadAppeared(); | ||
213 | } | 276 | } | ||
214 | 277 | | |||
215 | void KisUpdaterContext::lock() | 278 | void KisUpdaterContext::lock() | ||
216 | { | 279 | { | ||
217 | m_lock.lock(); | 280 | m_lock.lock(); | ||
281 | | ||||
282 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
283 | KIS_ASSERT_X(m_lockedBy == (Qt::HANDLE) -1, "KisUpdaterContext", | ||||
284 | "context is already locked"); | ||||
285 | | ||||
286 | m_lockedBy = QThread::currentThreadId(); | ||||
287 | #endif | ||||
218 | } | 288 | } | ||
219 | 289 | | |||
220 | void KisUpdaterContext::unlock() | 290 | void KisUpdaterContext::unlock() | ||
221 | { | 291 | { | ||
292 | #ifdef SANITY_CHECK_CONTEXT_LOCKING | ||||
293 | KIS_ASSERT(m_lockedBy == QThread::currentThreadId()); | ||||
294 | m_lockedBy = (Qt::HANDLE) -1; | ||||
295 | #endif | ||||
296 | | ||||
222 | m_lock.unlock(); | 297 | m_lock.unlock(); | ||
223 | } | 298 | } | ||
224 | 299 | | |||
225 | KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) | 300 | KisTestableUpdaterContext::KisTestableUpdaterContext(qint32 threadCount) | ||
226 | : KisUpdaterContext(threadCount) | 301 | : KisUpdaterContext(threadCount) | ||
227 | { | 302 | { | ||
228 | } | 303 | } | ||
229 | 304 | | |||
Show All 18 Lines |
(This comment deprecated due to my comment below in the review section)
To have safe access to m_jobs the context itself should also be locked while doing this iteration. Otherwise the code in KisStrokesQueue::processQueue() and KisSimpleUpdatesQueue::processQueue() will overwrite the contents of m_jobs.
And using Q_FOREACH without locking will cause a crash in a case when there is more than three threads accessing the m_jobs object. It happens because of implicit sharing in a temporary object (created inside Q_FOREACH), which will be detached when some other thread changed the original vector. You can check implementation of KisSafeReadNodeList and FOREACH_SAFE in KisNode to find more details about this problem.
Basically, I would suggest just to add the locking, then using Q_FOREACH will become safe.
But take care about the order of locking to avoid deadlocks. Ideally you'd better avoid having both locks locked at the same time. I'm not sure but it seems like moving m_condWaitLock.lock() below the loop should be ok.