diff --git a/events/eventdispatcher.cpp b/events/eventdispatcher.cpp index 3b6ccad..3231af6 100644 --- a/events/eventdispatcher.cpp +++ b/events/eventdispatcher.cpp @@ -1,324 +1,345 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "eventdispatcher.h" #include "eventdispatcher_p.h" #ifdef __linux__ #include "epolleventpoller.h" #elif _WIN32 #include "selecteventpoller_win32.h" #else #include "selecteventpoller_unix.h" #endif #include "event.h" #include "ieventpoller.h" #include "iioeventclient.h" #include "platformtime.h" #include "transceiver_p.h" #include "timer.h" #include #include #include //#define EVENTDISPATCHER_DEBUG using namespace std; EventDispatcher::EventDispatcher() : d(new EventDispatcherPrivate) { #ifdef __linux__ d->m_poller = new EpollEventPoller(this); #else // TODO high performance IO multiplexers for non-Linux platforms d->m_poller = new SelectEventPoller(this); #endif } EventDispatcherPrivate::~EventDispatcherPrivate() { for (const pair &fdCon : m_ioClients) { fdCon.second->setEventDispatcher(0); } for (const pair &dt : m_timers) { dt.second->m_eventDispatcher = nullptr; dt.second->m_isRunning = false; } delete m_poller; } EventDispatcher::~EventDispatcher() { delete d; d = nullptr; } bool EventDispatcher::poll(int timeout) { int nextDue = d->timeToFirstDueTimer(); if (timeout < 0) { timeout = nextDue; } else if (nextDue >= 0) { timeout = min(timeout, nextDue); } #ifdef EVENTDISPATCHER_DEBUG printf("EventDispatcher::poll(): timeout=%d, nextDue=%d.\n", timeout, nextDue); #endif IEventPoller::InterruptAction interrupAction = d->m_poller->poll(timeout); if (interrupAction == IEventPoller::Stop) { return false; } else if (interrupAction == IEventPoller::ProcessAuxEvents && d->m_transceiverToNotify) { d->processAuxEvents(); } d->triggerDueTimers(); return true; } void EventDispatcher::interrupt() { d->m_poller->interrupt(IEventPoller::Stop); } void EventDispatcherPrivate::wakeForEvents() { m_poller->interrupt(IEventPoller::ProcessAuxEvents); } bool EventDispatcherPrivate::addIoEventClient(IioEventClient *ioc) { pair::iterator, bool> insertResult; insertResult = m_ioClients.insert(make_pair(ioc->fileDescriptor(), ioc)); const bool ret = insertResult.second; if (ret) { m_poller->addIoEventClient(ioc); } return ret; } bool EventDispatcherPrivate::removeIoEventClient(IioEventClient *ioc) { const bool ret = m_ioClients.erase(ioc->fileDescriptor()); if (ret) { m_poller->removeIoEventClient(ioc); } return ret; } void EventDispatcherPrivate::setReadWriteInterest(IioEventClient *ioc, bool read, bool write) { m_poller->setReadWriteInterest(ioc, read, write); } void EventDispatcherPrivate::notifyClientForReading(FileDescriptor fd) { unordered_map::iterator it = m_ioClients.find(fd); if (it != m_ioClients.end()) { it->second->notifyRead(); } else { #ifdef IEVENTDISPATCHER_DEBUG // while interesting for debugging, this is not an error if a connection was in the epoll // set and disconnected in its notifyRead() or notifyWrite() implementation std::cerr << "EventDispatcherPrivate::notifyClientForReading(): unhandled file descriptor " << fd << ".\n"; #endif } } void EventDispatcherPrivate::notifyClientForWriting(FileDescriptor fd) { unordered_map::iterator it = m_ioClients.find(fd); if (it != m_ioClients.end()) { it->second->notifyWrite(); } else { #ifdef IEVENTDISPATCHER_DEBUG // while interesting for debugging, this is not an error if a connection was in the epoll // set and disconnected in its notifyRead() or notifyWrite() implementation std::cerr << "EventDispatcherPrivate::notifyClientForWriting(): unhandled file descriptor " << fd << ".\n"; #endif } } int EventDispatcherPrivate::timeToFirstDueTimer() const { - if (m_timers.empty()) { + multimap::const_iterator it = m_timers.cbegin(); + if (it == m_timers.cend()) { return -1; } - uint64 nextTimeout = (*m_timers.cbegin()).first >> 10; + if (it->second == nullptr) { + // this is the dead entry of the currently triggered, and meanwhile removed timer + if (++it == m_timers.cend()) { + return -1; + } + } + + uint64 nextTimeout = it->first >> 10; uint64 currentTime = PlatformTime::monotonicMsecs(); if (currentTime >= nextTimeout) { return 0; } return nextTimeout - currentTime; } uint EventDispatcherPrivate::nextTimerSerial() { if (++m_lastTimerSerial > s_maxTimerSerial) { m_lastTimerSerial = 0; } return m_lastTimerSerial; } void EventDispatcherPrivate::addTimer(Timer *timer) { - if (timer == m_triggeredTimer) { - m_isTriggeredTimerPendingRemoval = false; - return; - } if (timer->m_tag == 0) { timer->m_tag = nextTimerSerial(); } uint64 dueTime = PlatformTime::monotonicMsecs() + uint64(timer->m_interval); // ### When a timer is added from a timer callback, make sure it only runs in the *next* // iteration of the event loop. Otherwise, endless cascades of timers triggering, adding // more timers etc could occur without ever returning from triggerDueTimers(). // For the condition for this hazard, see "invariant:" in triggerDueTimers(): the only way // the new timer could trigger in this event loop iteration is when: // // m_triggerTime == currentTime(before call to trigger()) == timerAddedInTrigger().dueTime // // note: m_triggeredTimer.dueTime < m_triggerTime is well possible; if ==, the additional // condition applies that timerAddedInTrigger().serial >= m_triggeredTimer.serial; - // we ignore this and do it conservative and less complicated. + // we ignore this and do it conservatively and less complicated. // (the additional condition comes from serials as keys and that each "slot" in multimap with // the same keys is a list where new entries are back-inserted) // // As a countermeasure, tweak the new timer's timeout, putting it well before m_triggeredTimer's // iterator position in the multimap... because the new timer must have zero timeout in order for // its due time to occur within this triggerDueTimers() iteration, it is supposed to trigger ASAP // anyway. This disturbs the order of triggering a little compared to the usual, but all // timeouts are properly respected - the next event loop iteration is guaranteed to trigger - // timers at times strictly greater-equal than this iteration ;) - if (m_triggeredTimer && dueTime == m_triggerTime) { - dueTime = (m_triggeredTimer->m_tag >> 10) - 1; + // timers at times strictly greater-equal than this iteration (time goes only one way) ;) + if (m_triggerTime && dueTime == m_triggerTime) { + dueTime = m_triggerTime - 1; } timer->m_tag = (dueTime << 10) + (timer->m_tag & s_maxTimerSerial); m_timers.emplace(timer->m_tag, timer); } void EventDispatcherPrivate::removeTimer(Timer *timer) { assert(timer->m_tag != 0); - if (timer == m_triggeredTimer) { + // We cannot toggle m_isTriggeredTimerPendingRemoval back and forth, we can only set it once. + // Because after the timer has been removed once, the next time we see the same pointer value, + // it could be an entirely different timer. Consider this: + // delete timer1; // calls removeTimer() + // Timer *timer2 = new Timer(); // accidentally gets same memory address as timer1 + // timer2->start(...); + // timer2->stop(); // timer == m_triggeredTimer, uh oh + // The last line does not necessarily cause a problem, but just don't be excessively clever. + // On the other hand, not special-casing the currently triggered timer after it has been marked + // for removal once is fine. In case it is re-added, it gets a new map entry in addTimer() + // and from then on it can be handled like any other timer. + bool removingTriggeredTimer = false; + if (!m_isTriggeredTimerPendingRemoval && timer == m_triggeredTimer) { // using this variable, we can avoid dereferencing m_triggeredTimer should it have been // deleted while triggered m_isTriggeredTimerPendingRemoval = true; - return; + removingTriggeredTimer = true; } auto iterRange = m_timers.equal_range(timer->m_tag); for (; iterRange.first != iterRange.second; ++iterRange.first) { if (iterRange.first->second == timer) { - m_timers.erase(iterRange.first); + if (!removingTriggeredTimer) { + m_timers.erase(iterRange.first); + } else { + // mark it as dead for query methods such as timeToFirstDueTimer() + iterRange.first->second = nullptr; + } return; } } assert(false); // the timer should never request a remove when it has not been added } void EventDispatcherPrivate::triggerDueTimers() { m_triggerTime = PlatformTime::monotonicMsecs(); for (auto it = m_timers.begin(); it != m_timers.end();) { const uint64 timerTimeout = (it->first >> 10); if (timerTimeout > m_triggerTime) { break; } // careful here - protect against adding and removing any timer while inside its trigger()! // we do this by keeping the iterator at the current position (so changing any other timer // doesn't invalidate it) and blocking changes to the timer behind that iterator // (so we don't mess with its data should it have been deleted outright in the callback) m_triggeredTimer = it->second; Timer *const timer = m_triggeredTimer; m_isTriggeredTimerPendingRemoval = false; // invariant: // m_triggeredTimer.dueTime <= m_triggerTime <= currentTime(here) <= .dueTime timer->trigger(); m_triggeredTimer = nullptr; if (!m_isTriggeredTimerPendingRemoval && timer->m_isRunning) { // ### we are rescheduling timers based on triggerTime even though real time can be // much later - is this the desired behavior? I think so... if (timer->m_interval == 0) { // With the other branch we might iterate over this timer again in this invocation because // if there are several timers with the same tag, this entry will be back-inserted into the // list of values for the current tag / key slot. We only break out of the loop if // timerTimeout > m_triggerTime, so there would be an infinite loop. // Instead, we just leave the iterator alone, which does not put it in front of the current // iterator position. It's also good for performance. Win-win! ++it; } else { timer->m_tag = ((m_triggerTime + uint64(timer->m_interval)) << 10) + (timer->m_tag & s_maxTimerSerial); m_timers.erase(it++); m_timers.emplace(timer->m_tag, timer); } } else { m_timers.erase(it++); } } m_triggerTime = 0; } void EventDispatcherPrivate::queueEvent(std::unique_ptr evt) { // std::cerr << "EventDispatcherPrivate::queueEvent() " << evt->type << " " << this << std::endl; { SpinLocker locker(&m_queuedEventsLock); m_queuedEvents.emplace_back(std::move(evt)); } wakeForEvents(); } void EventDispatcherPrivate::processAuxEvents() { // std::cerr << "EventDispatcherPrivate::processAuxEvents() " << this << std::endl; // don't hog the lock while processing the events std::vector> events; { SpinLocker locker(&m_queuedEventsLock); std::swap(events, m_queuedEvents); } if (m_transceiverToNotify) { for (const std::unique_ptr &evt : events) { m_transceiverToNotify->processEvent(evt.get()); } } } diff --git a/tests/events/tst_timer_slow.cpp b/tests/events/tst_timer_slow.cpp index d012c40..6f30492 100644 --- a/tests/events/tst_timer_slow.cpp +++ b/tests/events/tst_timer_slow.cpp @@ -1,355 +1,450 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "eventdispatcher.h" #include "icompletionclient.h" #include "platformtime.h" #include "timer.h" #include "../testutil.h" +#include #include class BamPrinter : public ICompletionClient { public: BamPrinter(const char *customMessage, uint64 startTime) : m_customMessage(customMessage), m_startTime(startTime) {} void notifyCompletion(void *task) override { uint64 timeDiff = PlatformTime::monotonicMsecs() - m_startTime; std::cout << "BAM " << task << ' ' << timeDiff << ' ' << m_customMessage << " #" << m_counter++ << '\n'; } const char *m_customMessage; uint64 m_startTime; int m_counter = 0; }; // supposed to print some output to prove timers are working, and not crash :) static void testBasic() { EventDispatcher dispatcher; uint64 baseTime = PlatformTime::monotonicMsecs(); const char *customMessage1 = "Hello, world 1!"; BamPrinter printer1(customMessage1, baseTime); Timer t(&dispatcher); t.setCompletionClient(&printer1); t.setInterval(231); t.setRunning(true); const char *customMessage2 = "Hello, world 2!"; BamPrinter printer2(customMessage2, baseTime); Timer t2(&dispatcher); t2.setCompletionClient(&printer2); t2.setInterval(100); t2.setRunning(true); const char *customMessage3 = "Hello, other world!"; int booCounter = 0; CompletionFunc booPrinter([baseTime, customMessage3, &booCounter, &dispatcher, &t] (void *task) { uint64 timeDiff = PlatformTime::monotonicMsecs() - baseTime; std::cout << "boo " << task << ' ' << timeDiff << ' ' << customMessage3 << " #" << booCounter << " - Timer 1 remaining time: " << t.remainingTime() << '\n'; if (booCounter >= 4) { dispatcher.interrupt(); } booCounter++; }); Timer t3(&dispatcher); t3.setCompletionClient(&booPrinter); t3.setInterval(420); t3.setRunning(true); while (dispatcher.poll()) { } } class AccuracyTester : public ICompletionClient { public: AccuracyTester() : m_lastTriggerTime(PlatformTime::monotonicMsecs()) {} void notifyCompletion(void *task) override { Timer *timer = reinterpret_cast(task); uint64 currentTime = PlatformTime::monotonicMsecs(); int timeDiff = int64(currentTime) - int64(m_lastTriggerTime); m_lastTriggerTime = currentTime; std::cout << timer->interval() << ' ' << timeDiff << std::endl; TEST(std::abs(timeDiff - timer->interval()) < 5); m_count++; TEST(m_count < 26); // event loop should have stopped right at 25 if (m_count == 25) { timer->eventDispatcher()->interrupt(); } } uint64 m_lastTriggerTime; uint m_count = 0; }; static void testAccuracy() { // this test is likely to fail spuriously on a machine under load EventDispatcher dispatcher; AccuracyTester at1; Timer t1(&dispatcher); t1.setCompletionClient(&at1); t1.setInterval(225); t1.setRunning(true); AccuracyTester at2; Timer t2(&dispatcher); t2.setCompletionClient(&at2); t2.setInterval(42); t2.setRunning(true); while (dispatcher.poll()) { } } // this not only bounds how long the dispatcher runs, it also creates another timer to make the // situation more interesting class EventDispatcherInterruptor : public ICompletionClient { public: EventDispatcherInterruptor(EventDispatcher *ed, int timeout) : m_ttl(ed) { m_ttl.setInterval(timeout); m_ttl.setCompletionClient(this); m_ttl.setRunning(true); } void notifyCompletion(void * /*task*/) override { m_ttl.eventDispatcher()->interrupt(); m_ttl.setRunning(false); } Timer m_ttl; }; static void testDeleteInTrigger() { EventDispatcher dispatcher; bool alreadyCalled = false; CompletionFunc deleter([&alreadyCalled] (void *task) { TEST(!alreadyCalled); alreadyCalled = true; Timer *timer = reinterpret_cast(task); delete timer; }); Timer *t1 = new Timer(&dispatcher); t1->setCompletionClient(&deleter); t1->setRunning(true); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { } } static void testAddInTrigger() { // A timer added from the callback of another timer should not trigger in the same event loop // iteration, otherwise there could be an (accidental or intended) infinite cascade of zero interval // timers adding zero interval timers // since this test has a (small) false negative (note: negative == no problem found) rate - if // the current millisecond changes at certain points, it can mask a problem - just run it a couple // of times... for (int i = 0; i < 5; i++) { EventDispatcher dispatcher; int dispatchCounter = 0; int t2Counter = 0; CompletionFunc iterChecker([&dispatchCounter, &t2Counter] (void * /*task*/) { TEST(dispatchCounter > 0); t2Counter++; }); Timer t1(&dispatcher); Timer *t2 = nullptr; CompletionFunc adder([&dispatcher, &t2, &iterChecker] (void * /*task*/) { if (!t2) { t2 = new Timer(&dispatcher); t2->setCompletionClient(&iterChecker); t2->setRunning(true); // this could go wrong because we manipulate the due time in EventDispatcher::addTimer(), // but should be caught in Timer::remainingTime() TEST(t2->remainingTime() == 0); } }); t1.setInterval(10); t1.setRunning(true); t1.setCompletionClient(&adder); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { dispatchCounter++; } TEST(t2Counter > 1); delete t2; } } +static void testReAddInTrigger() +{ + // - Add a timer + // - Remove it + // - Remove it, then add it + // - Remove, add, remove + // - Remove, add, remove, add + // - Check timer's isRunning() considering whether last action was add or remove + // - Check if the timer triggers next time or not, consistent with previous point + + + // Repeat the tests that include re-adding with "pointer aliased" timers, i.e. add a new timer created + // at the same memory location as the old one. That tests whether a known difficulty of the chosen + // implementation is handled correctly. + + // Use the array to ensure we have pointer aliasing or no pointer aliasing + std::aligned_storage::type timerStorage[2]; + memset(timerStorage, 0, sizeof(timerStorage)); + + Timer *const timerArray = reinterpret_cast(timerStorage); + + for (int i = 0; i < 2; i++) { + const bool withAliasing = i == 1; + + for (int j = 0; j < 5; j++) { // j = number of add / remove ops + EventDispatcher dispatcher; + + Timer *t = &timerArray[0]; + bool removeTimer = false; + bool checkTrigger = false; + bool didTrigger = false; + + CompletionFunc addRemove([&] (void * /*task*/) { + if (checkTrigger) { + didTrigger = true; + return; + } + + for (int k = 0; k < j; k++) { + removeTimer = (k & 1) == 0; + if (removeTimer) { + TEST(t->isRunning()); + t->~Timer(); + memset(t, 0, sizeof(Timer)); // ensure that it can't trigger - of course if Timer + // relies on that we should find it in valgrind... + } else { + if (!withAliasing) { + if (t == &timerArray[0]) { + t = &timerArray[1]; + } else { + t = &timerArray[0]; + } + } + new(t) Timer(&dispatcher); + t->setCompletionClient(&addRemove); + t->start(0); + TEST(t->isRunning()); + } + } + }); + + + Timer dummy1(&dispatcher); + dummy1.start(0); + + new(t) Timer(&dispatcher); + t->start(0); + + Timer dummy2(&dispatcher); + dummy2.start(0); + + dispatcher.poll(); // this seems like a good idea for the test... + + // run and test the add / remove sequence + t->setCompletionClient(&addRemove); + dispatcher.poll(); + + // Test that the timer triggers when it should. Triggering when it should not will likely + // cause a segfault or other error because the Timer's memory has been cleared. + + checkTrigger = true; + dispatcher.poll(); + TEST(didTrigger != removeTimer); + + // clean up + if (!removeTimer) { + t->~Timer(); + } + memset(timerStorage, 0, sizeof(timerStorage)); + } + } +} + static void testTriggerOnlyOncePerDispatch() { EventDispatcher dispatcher; int dispatchCounter = 0; int noWorkCounter1 = 0; int noWorkCounter2 = 0; int hardWorkCounter = 0; Timer t1(&dispatcher); t1.setRunning(true); Timer t2(&dispatcher); t2.setRunning(true); Timer t3(&dispatcher); t3.setRunning(true); CompletionFunc noWorkCounter([&noWorkCounter1, &noWorkCounter2, &dispatchCounter, &t1, &t3] (void *task) { if (task == &t1) { TEST(noWorkCounter1 == dispatchCounter); noWorkCounter1++; } else { TEST(task == &t3); TEST(noWorkCounter2 == dispatchCounter); noWorkCounter2++; } }); t1.setCompletionClient(&noWorkCounter); t3.setCompletionClient(&noWorkCounter); CompletionFunc hardWorker([&hardWorkCounter, &dispatchCounter] (void * /*task*/) { TEST(hardWorkCounter == dispatchCounter); uint64 startTime = PlatformTime::monotonicMsecs(); // waste ten milliseconds, trying not to spend all time in PlatformTime::monotonicMsecs() do { for (volatile int i = 0; i < 20000; i++) {} } while (PlatformTime::monotonicMsecs() < startTime + 10); hardWorkCounter++; }); t2.setCompletionClient(&hardWorker); EventDispatcherInterruptor interruptor(&dispatcher, 200); while (dispatcher.poll()) { dispatchCounter++; } TEST(noWorkCounter1 == dispatchCounter || noWorkCounter1 == dispatchCounter - 1); TEST(noWorkCounter2 == dispatchCounter || noWorkCounter2 == dispatchCounter - 1); TEST(hardWorkCounter == dispatchCounter || hardWorkCounter == dispatchCounter - 1); } static void testReEnableNonRepeatingInTrigger() { EventDispatcher dispatcher; int slowCounter = 0; CompletionFunc slowReEnabler([&slowCounter] (void *task) { slowCounter++; Timer *timer = reinterpret_cast(task); TEST(!timer->isRunning()); timer->setRunning(true); TEST(timer->isRunning()); TEST(timer->interval() == 5); }); Timer slow(&dispatcher); slow.setCompletionClient(&slowReEnabler); slow.setRepeating(false); slow.setInterval(5); slow.setRunning(true); int fastCounter = 0; CompletionFunc fastReEnabler([&fastCounter] (void *task) { fastCounter++; Timer *timer = reinterpret_cast(task); TEST(!timer->isRunning()); timer->setRunning(true); TEST(timer->isRunning()); TEST(timer->interval() == 0); }); Timer fast(&dispatcher); fast.setCompletionClient(&fastReEnabler); fast.setRepeating(false); fast.setInterval(0); fast.setRunning(true); // also make sure that setRepeating(false) has any effect at all... int noRepeatCounter = 0; CompletionFunc noRepeatCheck([&noRepeatCounter] (void * /*task*/) { noRepeatCounter++; }); Timer noRepeat(&dispatcher); noRepeat.setCompletionClient(&noRepeatCheck); noRepeat.setRepeating(false); noRepeat.setInterval(10); noRepeat.setRunning(true); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { } TEST(noRepeatCounter == 1); TEST(slowCounter >= 8 && slowCounter <= 12); // std::cout << '\n' << fastCounter << ' ' << slowCounter <<'\n'; TEST(fastCounter >= 200); // ### hopefully low enough even for really slow machines and / or valgrind } int main(int, char *[]) { testBasic(); testAccuracy(); testDeleteInTrigger(); testAddInTrigger(); + testReAddInTrigger(); testTriggerOnlyOncePerDispatch(); testReEnableNonRepeatingInTrigger(); std::cout << "Passed!\n"; }