Changeset View
Changeset View
Standalone View
Standalone View
src/server/collectionscheduler.cpp
Show First 20 Lines • Show All 42 Lines • ▼ Show 20 Line(s) | |||||
43 | public: | 43 | public: | ||
44 | PauseableTimer(QObject *parent = nullptr) | 44 | PauseableTimer(QObject *parent = nullptr) | ||
45 | : QTimer(parent) | 45 | : QTimer(parent) | ||
46 | { | 46 | { | ||
47 | } | 47 | } | ||
48 | 48 | | |||
49 | void start(int interval) | 49 | void start(int interval) | ||
50 | { | 50 | { | ||
51 | mStarted = QDateTime::currentDateTime(); | 51 | mStarted = QDateTime::currentDateTimeUtc(); | ||
52 | mPaused = QDateTime(); | 52 | mPaused = QDateTime(); | ||
53 | setInterval(interval); | 53 | setInterval(interval); | ||
54 | QTimer::start(interval); | 54 | QTimer::start(interval); | ||
55 | } | 55 | } | ||
56 | 56 | | |||
57 | void start() | 57 | void start() | ||
58 | { | 58 | { | ||
59 | start(interval()); | 59 | start(interval()); | ||
60 | } | 60 | } | ||
61 | 61 | | |||
62 | void stop() | 62 | void stop() | ||
63 | { | 63 | { | ||
64 | mStarted = QDateTime(); | 64 | mStarted = QDateTime(); | ||
65 | mPaused = QDateTime(); | 65 | mPaused = QDateTime(); | ||
66 | QTimer::stop(); | 66 | QTimer::stop(); | ||
67 | } | 67 | } | ||
68 | 68 | | |||
69 | Q_INVOKABLE void pause() | 69 | Q_INVOKABLE void pause() | ||
70 | { | 70 | { | ||
71 | if (!isActive() || isPaused()) { | 71 | if (!isActive() || isPaused()) { | ||
72 | return; | 72 | return; | ||
73 | } | 73 | } | ||
74 | 74 | | |||
75 | mPaused = QDateTime::currentDateTime(); | 75 | mPaused = QDateTime::currentDateTimeUtc(); | ||
76 | QTimer::stop(); | 76 | QTimer::stop(); | ||
77 | } | 77 | } | ||
78 | 78 | | |||
79 | Q_INVOKABLE void resume() | 79 | Q_INVOKABLE void resume() | ||
80 | { | 80 | { | ||
81 | if (!isPaused()) { | 81 | if (!isPaused()) { | ||
82 | return; | 82 | return; | ||
83 | } | 83 | } | ||
84 | 84 | | |||
85 | const int remainder = interval() - (mStarted.secsTo(mPaused) * 1000); | 85 | const int remainder = interval() - (mStarted.secsTo(mPaused) * 1000); | ||
86 | start(qMax(0, remainder)); | 86 | start(qMax(0, remainder)); | ||
87 | mPaused = QDateTime(); | 87 | mPaused = QDateTime(); | ||
88 | // Update mStarted so that pause() can be called repeatedly | 88 | // Update mStarted so that pause() can be called repeatedly | ||
89 | mStarted = QDateTime::currentDateTime(); | 89 | mStarted = QDateTime::currentDateTimeUtc(); | ||
90 | } | 90 | } | ||
91 | 91 | | |||
92 | bool isPaused() const | 92 | bool isPaused() const | ||
93 | { | 93 | { | ||
94 | return mPaused.isValid(); | 94 | return mPaused.isValid(); | ||
95 | } | 95 | } | ||
96 | 96 | | |||
97 | private: | 97 | private: | ||
98 | QDateTime mStarted; | 98 | QDateTime mStarted; | ||
99 | QDateTime mPaused; | 99 | QDateTime mPaused; | ||
100 | }; | 100 | }; | ||
101 | 101 | | |||
102 | } // namespace Server | 102 | } // namespace Server | ||
103 | } // namespace Akonadi | 103 | } // namespace Akonadi | ||
104 | 104 | | |||
105 | using namespace Akonadi::Server; | 105 | using namespace Akonadi::Server; | ||
106 | 106 | | |||
107 | CollectionScheduler::CollectionScheduler(const QString &threadName, QThread::Priority priority, QObject *parent) | 107 | CollectionScheduler::CollectionScheduler(const QString &threadName, QThread::Priority priority, QObject *parent) | ||
108 | : AkThread(threadName, priority, parent) | 108 | : AkThread(threadName, priority, parent) | ||
109 | , mScheduler(nullptr) | | |||
110 | , mMinInterval(5) | | |||
111 | { | 109 | { | ||
112 | } | 110 | } | ||
113 | 111 | | |||
114 | CollectionScheduler::~CollectionScheduler() | 112 | CollectionScheduler::~CollectionScheduler() | ||
115 | { | 113 | { | ||
116 | } | 114 | } | ||
117 | 115 | | |||
116 | // Called in secondary thread | ||||
118 | void CollectionScheduler::quit() | 117 | void CollectionScheduler::quit() | ||
119 | { | 118 | { | ||
120 | delete mScheduler; | 119 | delete mScheduler; | ||
121 | mScheduler = nullptr; | 120 | mScheduler = nullptr; | ||
122 | 121 | | |||
123 | AkThread::quit(); | 122 | AkThread::quit(); | ||
124 | } | 123 | } | ||
125 | 124 | | |||
126 | void CollectionScheduler::inhibit(bool inhibit) | 125 | void CollectionScheduler::inhibit(bool inhibit) | ||
127 | { | 126 | { | ||
128 | if (inhibit) { | 127 | if (inhibit) { | ||
129 | const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::pause, Qt::QueuedConnection); | 128 | const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::pause, Qt::QueuedConnection); | ||
130 | Q_ASSERT(success); Q_UNUSED(success); | 129 | Q_ASSERT(success); Q_UNUSED(success); | ||
131 | } else { | 130 | } else { | ||
132 | const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::resume, Qt::QueuedConnection); | 131 | const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::resume, Qt::QueuedConnection); | ||
133 | Q_ASSERT(success); Q_UNUSED(success); | 132 | Q_ASSERT(success); Q_UNUSED(success); | ||
134 | } | 133 | } | ||
135 | } | 134 | } | ||
136 | 135 | | |||
137 | int CollectionScheduler::minimumInterval() const | 136 | int CollectionScheduler::minimumInterval() const | ||
138 | { | 137 | { | ||
139 | return mMinInterval; | 138 | return mMinInterval; | ||
140 | } | 139 | } | ||
141 | 140 | | |||
141 | uint CollectionScheduler::nextScheduledTime(qint64 collectionId) const | ||||
142 | { | ||||
143 | QMutexLocker locker(&mScheduleLock); | ||||
144 | const auto i = constFind(collectionId); | ||||
145 | if (i != mSchedule.cend()) { | ||||
146 | return i.key(); | ||||
147 | } | ||||
148 | return 0; | ||||
149 | } | ||||
150 | | ||||
142 | void CollectionScheduler::setMinimumInterval(int intervalMinutes) | 151 | void CollectionScheduler::setMinimumInterval(int intervalMinutes) | ||
143 | { | 152 | { | ||
153 | // No mutex -- you can only call this before starting the thread | ||||
144 | mMinInterval = intervalMinutes; | 154 | mMinInterval = intervalMinutes; | ||
145 | } | 155 | } | ||
146 | 156 | | |||
147 | void CollectionScheduler::collectionAdded(qint64 collectionId) | 157 | void CollectionScheduler::collectionAdded(qint64 collectionId) | ||
148 | { | 158 | { | ||
149 | Collection collection = Collection::retrieveById(collectionId); | 159 | Collection collection = Collection::retrieveById(collectionId); | ||
150 | DataStore::self()->activeCachePolicy(collection); | 160 | DataStore::self()->activeCachePolicy(collection); | ||
151 | if (shouldScheduleCollection(collection)) { | 161 | if (shouldScheduleCollection(collection)) { | ||
152 | QMetaObject::invokeMethod(this, [this, collection]() {scheduleCollection(collection);}, Qt::QueuedConnection); | 162 | QMetaObject::invokeMethod(this, [this, collection]() {scheduleCollection(collection);}, Qt::QueuedConnection); | ||
153 | } | 163 | } | ||
154 | } | 164 | } | ||
155 | 165 | | |||
156 | void CollectionScheduler::collectionChanged(qint64 collectionId) | 166 | void CollectionScheduler::collectionChanged(qint64 collectionId) | ||
157 | { | 167 | { | ||
158 | QMutexLocker locker(&mScheduleLock); | 168 | QMutexLocker locker(&mScheduleLock); | ||
159 | for (const Collection &collection : qAsConst(mSchedule)) { | 169 | const auto it = constFind(collectionId); | ||
160 | if (collection.id() == collectionId) { | 170 | if (it != mSchedule.cend()) { | ||
171 | const Collection oldCollection = it.value(); | ||||
161 | Collection changed = Collection::retrieveById(collectionId); | 172 | Collection changed = Collection::retrieveById(collectionId); | ||
162 | DataStore::self()->activeCachePolicy(changed); | 173 | DataStore::self()->activeCachePolicy(changed); | ||
163 | if (hasChanged(collection, changed)) { | 174 | if (hasChanged(oldCollection, changed)) { | ||
164 | if (shouldScheduleCollection(changed)) { | 175 | if (shouldScheduleCollection(changed)) { | ||
165 | locker.unlock(); | 176 | locker.unlock(); | ||
166 | // Scheduling the changed collection will automatically remove the old one | 177 | // Scheduling the changed collection will automatically remove the old one | ||
167 | scheduleCollection(changed); | 178 | QMetaObject::invokeMethod(this, [this, changed]() {scheduleCollection(changed);}, Qt::QueuedConnection); | ||
168 | } else { | 179 | } else { | ||
169 | locker.unlock(); | 180 | locker.unlock(); | ||
170 | // If the collection should no longer be scheduled then remove it | 181 | // If the collection should no longer be scheduled then remove it | ||
171 | collectionRemoved(collectionId); | 182 | collectionRemoved(collectionId); | ||
172 | } | 183 | } | ||
173 | } | 184 | } | ||
174 | 185 | } else { | |||
175 | return; | | |||
176 | } | | |||
177 | } | | |||
178 | | ||||
179 | // We don't know the collection yet, but maybe now it can be scheduled | 186 | // We don't know the collection yet, but maybe now it can be scheduled | ||
180 | collectionAdded(collectionId); | 187 | collectionAdded(collectionId); | ||
181 | } | 188 | } | ||
189 | } | ||||
182 | 190 | | |||
183 | void CollectionScheduler::collectionRemoved(qint64 collectionId) | 191 | void CollectionScheduler::collectionRemoved(qint64 collectionId) | ||
184 | { | 192 | { | ||
185 | QMutexLocker locker(&mScheduleLock); | 193 | QMutexLocker locker(&mScheduleLock); | ||
186 | for (const Collection &collection : qAsConst(mSchedule)) { | 194 | auto it = find(collectionId); | ||
187 | if (collection.id() == collectionId) { | 195 | if (it != mSchedule.end()) { | ||
188 | const uint key = mSchedule.key(collection); | 196 | const bool reschedule = it == mSchedule.begin(); | ||
189 | const bool reschedule = (key == mSchedule.constBegin().key()); | 197 | mSchedule.erase(it); | ||
190 | mSchedule.remove(key); | | |||
191 | locker.unlock(); | | |||
192 | 198 | | |||
193 | // If we just remove currently scheduled collection, schedule the next one | 199 | // If we just remove currently scheduled collection, schedule the next one | ||
194 | if (reschedule) { | 200 | if (reschedule) { | ||
195 | startScheduler(); | 201 | QMetaObject::invokeMethod(this, &CollectionScheduler::startScheduler, Qt::QueuedConnection); | ||
196 | } | | |||
197 | | ||||
198 | return; | | |||
199 | } | 202 | } | ||
200 | } | 203 | } | ||
201 | } | 204 | } | ||
202 | 205 | | |||
206 | // Called in secondary thread | ||||
203 | void CollectionScheduler::startScheduler() | 207 | void CollectionScheduler::startScheduler() | ||
204 | { | 208 | { | ||
209 | QMutexLocker locker(&mScheduleLock); | ||||
205 | // Don't restart timer if we are paused. | 210 | // Don't restart timer if we are paused. | ||
206 | if (mScheduler->isPaused()) { | 211 | if (mScheduler->isPaused()) { | ||
207 | return; | 212 | return; | ||
208 | } | 213 | } | ||
209 | 214 | | |||
210 | QMutexLocker locker(&mScheduleLock); | | |||
211 | if (mSchedule.isEmpty()) { | 215 | if (mSchedule.isEmpty()) { | ||
212 | // Stop the timer. It will be started again once some collection is scheduled | 216 | // Stop the timer. It will be started again once some collection is scheduled | ||
213 | mScheduler->stop(); | 217 | mScheduler->stop(); | ||
214 | return; | 218 | return; | ||
215 | } | 219 | } | ||
216 | 220 | | |||
217 | // Get next collection to expire and start the timer | 221 | // Get next collection to expire and start the timer | ||
218 | const uint next = mSchedule.constBegin().key(); | 222 | const uint next = mSchedule.constBegin().key(); | ||
219 | // cast next - now() to int, so that we get negative result when next is in the past | 223 | // cast next - now() to int, so that we get negative result when next is in the past | ||
220 | mScheduler->start(qMax(0, (int)(next - QDateTime::currentDateTimeUtc().toTime_t()) * 1000)); | 224 | mScheduler->start(qMax(0, (int)(next - QDateTime::currentDateTimeUtc().toTime_t()) * 1000)); | ||
221 | } | 225 | } | ||
222 | 226 | | |||
227 | // Called in secondary thread | ||||
223 | void CollectionScheduler::scheduleCollection(Collection collection, bool shouldStartScheduler) | 228 | void CollectionScheduler::scheduleCollection(Collection collection, bool shouldStartScheduler) | ||
224 | { | 229 | { | ||
225 | QMutexLocker locker(&mScheduleLock); | 230 | QMutexLocker locker(&mScheduleLock); | ||
226 | auto i = std::find(mSchedule.cbegin(), mSchedule.cend(), collection); | 231 | auto i = find(collection.id()); | ||
227 | if (i != mSchedule.cend()) { | 232 | if (i != mSchedule.end()) { | ||
228 | mSchedule.remove(i.key(), i.value()); | 233 | mSchedule.erase(i); | ||
229 | } | 234 | } | ||
230 | 235 | | |||
231 | DataStore::self()->activeCachePolicy(collection); | 236 | DataStore::self()->activeCachePolicy(collection); | ||
232 | 237 | | |||
233 | if (!shouldScheduleCollection(collection)) { | 238 | if (!shouldScheduleCollection(collection)) { | ||
234 | return; | 239 | return; | ||
235 | } | 240 | } | ||
236 | 241 | | |||
237 | const int expireMinutes = qMax(mMinInterval, collectionScheduleInterval(collection)); | 242 | const int expireMinutes = qMax(mMinInterval, collectionScheduleInterval(collection)); | ||
243 | // TODO: port to qint64 and toSecsSinceEpoch | ||||
238 | uint nextCheck = QDateTime::currentDateTimeUtc().toTime_t() + (expireMinutes * 60); | 244 | uint nextCheck = QDateTime::currentDateTimeUtc().toTime_t() + (expireMinutes * 60); | ||
239 | 245 | | |||
240 | // Check whether there's another check scheduled within a minute after this one. | 246 | // Check whether there's another check scheduled within a minute after this one. | ||
241 | // If yes, then delay this check so that it's scheduled together with the others | 247 | // If yes, then delay this check so that it's scheduled together with the others | ||
242 | // This is a minor optimization to reduce wakeups and SQL queries | 248 | // This is a minor optimization to reduce wakeups and SQL queries | ||
243 | QMap<uint, Collection>::iterator it = mSchedule.lowerBound(nextCheck); | 249 | auto it = constLowerBound(nextCheck); | ||
244 | if (it != mSchedule.end() && it.key() - nextCheck < 60) { | 250 | if (it != mSchedule.cend() && it.key() - nextCheck < 60) { | ||
245 | nextCheck = it.key(); | 251 | nextCheck = it.key(); | ||
246 | 252 | | |||
247 | // Also check whether there's another checked scheduled within a minute before | 253 | // Also check whether there's another checked scheduled within a minute before | ||
248 | // this one. | 254 | // this one. | ||
249 | } else if (it != mSchedule.begin()) { | 255 | } else if (it != mSchedule.cbegin()) { | ||
250 | --it; | 256 | --it; | ||
251 | if (nextCheck - it.key() < 60) { | 257 | if (nextCheck - it.key() < 60) { | ||
252 | nextCheck = it.key(); | 258 | nextCheck = it.key(); | ||
253 | } | 259 | } | ||
254 | } | 260 | } | ||
255 | 261 | | |||
256 | mSchedule.insert(nextCheck, collection); | 262 | mSchedule.insert(nextCheck, collection); | ||
257 | if (shouldStartScheduler && !mScheduler->isActive()) { | 263 | if (shouldStartScheduler && !mScheduler->isActive()) { | ||
258 | locker.unlock(); | 264 | locker.unlock(); | ||
259 | startScheduler(); | 265 | startScheduler(); | ||
260 | } | 266 | } | ||
261 | } | 267 | } | ||
262 | 268 | | |||
269 | CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constFind(qint64 collectionId) const | ||||
270 | { | ||||
271 | return std::find_if(mSchedule.cbegin(), mSchedule.cend(), [collectionId](const Collection &c) { return c.id() == collectionId; }); | ||||
272 | } | ||||
273 | | ||||
274 | CollectionScheduler::ScheduleMap::iterator CollectionScheduler::find(qint64 collectionId) | ||||
275 | { | ||||
276 | return std::find_if(mSchedule.begin(), mSchedule.end(), [collectionId](const Collection &c) { return c.id() == collectionId; }); | ||||
277 | } | ||||
278 | | ||||
279 | // separate method so we call the const version of QMap::lowerBound | ||||
280 | CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constLowerBound(qint64 collectionId) const | ||||
281 | { | ||||
282 | return mSchedule.lowerBound(collectionId); | ||||
283 | } | ||||
284 | | ||||
285 | // Called in secondary thread | ||||
263 | void CollectionScheduler::init() | 286 | void CollectionScheduler::init() | ||
264 | { | 287 | { | ||
265 | AkThread::init(); | 288 | AkThread::init(); | ||
266 | 289 | | |||
267 | mScheduler = new PauseableTimer(); | 290 | mScheduler = new PauseableTimer(); | ||
268 | mScheduler->setSingleShot(true); | 291 | mScheduler->setSingleShot(true); | ||
269 | connect(mScheduler, &QTimer::timeout, | 292 | connect(mScheduler, &QTimer::timeout, | ||
270 | this, &CollectionScheduler::schedulerTimeout); | 293 | this, &CollectionScheduler::schedulerTimeout); | ||
Show All 17 Lines | |||||
288 | const Collection::List collections = qb.result(); | 311 | const Collection::List collections = qb.result(); | ||
289 | for (const Collection &collection : collections) { | 312 | for (const Collection &collection : collections) { | ||
290 | scheduleCollection(collection); | 313 | scheduleCollection(collection); | ||
291 | } | 314 | } | ||
292 | 315 | | |||
293 | startScheduler(); | 316 | startScheduler(); | ||
294 | } | 317 | } | ||
295 | 318 | | |||
319 | // Called in secondary thread | ||||
296 | void CollectionScheduler::schedulerTimeout() | 320 | void CollectionScheduler::schedulerTimeout() | ||
297 | { | 321 | { | ||
322 | QMutexLocker locker(&mScheduleLock); | ||||
323 | | ||||
298 | // Call stop() explicitly to reset the timer | 324 | // Call stop() explicitly to reset the timer | ||
299 | mScheduler->stop(); | 325 | mScheduler->stop(); | ||
300 | 326 | | |||
301 | mScheduleLock.lock(); | | |||
302 | const uint timestamp = mSchedule.constBegin().key(); | 327 | const uint timestamp = mSchedule.constBegin().key(); | ||
dvratil: Could you please also fix this to use `QMutexLocker` and move it to the top of the method? | |||||
303 | const QList<Collection> collections = mSchedule.values(timestamp); | 328 | const QList<Collection> collections = mSchedule.values(timestamp); | ||
304 | mSchedule.remove(timestamp); | 329 | mSchedule.remove(timestamp); | ||
305 | mScheduleLock.unlock(); | 330 | locker.unlock(); | ||
This is also broken, as below we are iterating over un-protected collections list... dvratil: This is also broken, as below we are iterating over un-protected `collections` list... | |||||
dfaure: It's local copy on stack, seems fine to me. | |||||
306 | 331 | | |||
307 | for (const Collection &collection : collections) { | 332 | for (const Collection &collection : collections) { | ||
308 | collectionExpired(collection); | 333 | collectionExpired(collection); | ||
309 | scheduleCollection(collection, false); | 334 | scheduleCollection(collection, false); | ||
310 | } | 335 | } | ||
311 | 336 | | |||
312 | startScheduler(); | 337 | startScheduler(); | ||
313 | } | 338 | } | ||
314 | 339 | | |||
315 | #include "collectionscheduler.moc" | 340 | #include "collectionscheduler.moc" |
Could you please also fix this to use QMutexLocker and move it to the top of the method?