diff --git a/libs/image/tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h b/libs/image/tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h index cf57fde92c..f60844fec9 100644 --- a/libs/image/tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h +++ b/libs/image/tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h @@ -1,355 +1,348 @@ /*------------------------------------------------------------------------ Junction: Concurrent data structures in C++ Copyright (c) 2016 Jeff Preshing Distributed under the Simplified BSD License. Original location: https://github.com/preshing/junction This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the LICENSE file for more information. ------------------------------------------------------------------------*/ #ifndef CONCURRENTMAP_LEAPFROG_H #define CONCURRENTMAP_LEAPFROG_H #include "Leapfrog.h" template , class VT = DefaultValueTraits > class ConcurrentMap_Leapfrog { public: typedef K Key; typedef V Value; typedef KT KeyTraits; typedef VT ValueTraits; - typedef typename BestFit::Unsigned Hash; + typedef quint32 Hash; typedef Leapfrog Details; private: Atomic m_root; public: ConcurrentMap_Leapfrog(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) { } ~ConcurrentMap_Leapfrog() { typename Details::Table* table = m_root.loadNonatomic(); table->destroy(); } // publishTableMigration() is called by exactly one thread from Details::TableMigration::run() // after all the threads participating in the migration have completed their work. void publishTableMigration(typename Details::TableMigration* migration) { // There are no racing calls to this function. typename Details::Table* oldRoot = m_root.loadNonatomic(); m_root.store(migration->m_destination, Release); Q_ASSERT(oldRoot == migration->getSources()[0].table); // Caller will GC the TableMigration and the source table. } // A Mutator represents a known cell in the hash table. // It's meant for manipulations within a temporary function scope. // Obviously you must not call QSBR::Update while holding a Mutator. // Any operation that modifies the table (exchangeValue, eraseValue) // may be forced to follow a redirected cell, which changes the Mutator itself. // Note that even if the Mutator was constructed from an existing cell, // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted, // or if another thread deletes the key between the two steps. class Mutator { private: friend class ConcurrentMap_Leapfrog; ConcurrentMap_Leapfrog& m_map; typename Details::Table* m_table; typename Details::Cell* m_cell; Value m_value; // Constructor: Find existing cell Mutator(ConcurrentMap_Leapfrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) { Hash hash = KeyTraits::hash(key); for (;;) { m_table = m_map.m_root.load(Consume); m_cell = Details::find(hash, m_table); if (!m_cell) { return; } Value value = m_cell->value.load(Consume); if (value != Value(ValueTraits::Redirect)) { // Found an existing value m_value = value; return; } // We've encountered a Redirect value. Help finish the migration. m_table->jobCoordinator.participate(); // Try again using the latest root. 
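/*
 * Illustrative usage sketch (an assumption for clarity, not part of the patch):
 * how a worker thread is expected to drive this map together with QSBR. The map
 * instance and the loop are hypothetical; the point is that the Mutator goes out
 * of scope before QSBR::update() is called, per the rule stated above.
 *
 *     ConcurrentMap_Leapfrog<quint32, quint32> map;
 *     QSBR::Context ctx = QSBR::instance().createContext();   // once per thread
 *     for (quint32 key = 1; key <= 100000; ++key) {
 *         {
 *             auto mutator = map.insertOrFind(key);            // Mutator is alive here...
 *             mutator.assignValue(key + 1);
 *         }                                                    // ...and destroyed here
 *         if (key % 10000 == 0) {
 *             QSBR::instance().update(ctx);                    // safe: no Mutator is held
 *         }
 *     }
 *     QSBR::instance().destroyContext(ctx);
 */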
} } // Constructor: Insert or find cell Mutator(ConcurrentMap_Leapfrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) { Hash hash = KeyTraits::hash(key); for (;;) { m_table = m_map.m_root.load(Consume); quint64 overflowIdx; switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell case Details::InsertResult_InsertedNew: { // We've inserted a new cell. Don't load m_cell->value. return; } case Details::InsertResult_AlreadyFound: { // The hash was already found in the table. Value value = m_cell->value.load(Consume); if (value == Value(ValueTraits::Redirect)) { // We've encountered a Redirect value. break; // Help finish the migration. } // Found an existing value m_value = value; return; } case Details::InsertResult_Overflow: { // Unlike ConcurrentMap_Linear, we don't need to keep track of & pass a "mustDouble" flag. // Passing overflowIdx is sufficient to prevent an infinite loop here. // It defines the start of the range of cells to check while estimating total cells in use. // After the first migration, deleted keys are purged, so if we hit this line during the // second loop iteration, every cell in the range will be in use, thus the estimate will be 100%. // (Concurrent deletes could result in further iterations, but it will eventually settle.) Details::beginTableMigration(m_map, m_table, overflowIdx); break; } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); // Try again using the latest root. } } public: Value getValue() const { // Return previously loaded value. Don't load it again. return Value(m_value); } Value exchangeValue(Value desired) { Q_ASSERT(desired != Value(ValueTraits::NullValue)); Q_ASSERT(desired != Value(ValueTraits::Redirect)); Q_ASSERT(m_cell); // Cell must have been found or inserted for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, ConsumeRelease)) { // Exchange was successful. Return previous value. Value result = m_value; m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { // racing write inserted new value } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. return desired; } // We've encountered a Redirect value. Help finish the migration. Hash hash = m_cell->hash.load(Relaxed); for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); // Try again in the new table. m_table = m_map.m_root.load(Consume); m_value = Value(ValueTraits::NullValue); quint64 overflowIdx; switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(Consume); if (m_value == Value(ValueTraits::Redirect)) { break; } goto breakOuter; case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: Details::beginTableMigration(m_map, m_table, overflowIdx); break; } // We were redirected... again } breakOuter:; // Try again in the new table. 
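/*
 * The retry idiom used by every mutating operation above, distilled into a
 * sketch (doSomethingWith() is a placeholder, not a real function):
 *
 *     for (;;) {
 *         Value value = cell->value.load(Consume);
 *         if (value != Value(ValueTraits::Redirect)) {
 *             doSomethingWith(value);                  // normal fast path
 *             break;
 *         }
 *         table->jobCoordinator.participate();         // help finish the migration
 *         table = map.m_root.load(Consume);            // then retry against the new root;
 *         cell = Details::find(hash, table);           // the cell may have moved or vanished
 *     }
 */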
} } void assignValue(Value desired) { exchangeValue(desired); } Value eraseValue() { Q_ASSERT(m_cell); // Cell must have been found or inserted for (;;) { if (m_value == Value(ValueTraits::NullValue)) { return Value(m_value); } if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), Consume)) { // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. Q_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. Value result = m_value; m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. // Pretend we erased nothing, and just let the racing write win. return Value(ValueTraits::NullValue); } // We've been redirected to a new table. Hash hash = m_cell->hash.load(Relaxed); // Re-fetch hash for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); // Try again in the new table. m_table = m_map.m_root.load(Consume); m_cell = Details::find(hash, m_table); if (!m_cell) { m_value = Value(ValueTraits::NullValue); return m_value; } m_value = m_cell->value.load(Relaxed); if (m_value != Value(ValueTraits::Redirect)) { break; } } } } }; Mutator insertOrFind(Key key) { return Mutator(*this, key); } Mutator find(Key key) { return Mutator(*this, key, false); } // Lookup without creating a temporary Mutator. Value get(Key key) { Hash hash = KeyTraits::hash(key); for (;;) { typename Details::Table* table = m_root.load(Consume); typename Details::Cell* cell = Details::find(hash, table); if (!cell) { return Value(ValueTraits::NullValue); } Value value = cell->value.load(Consume); if (value != Value(ValueTraits::Redirect)) { return value; // Found an existing value } // We've been redirected to a new table. Help with the migration. table->jobCoordinator.participate(); // Try again in the new table. } } Value assign(Key key, Value desired) { Mutator iter(*this, key); return iter.exchangeValue(desired); } Value exchange(Key key, Value desired) { Mutator iter(*this, key); return iter.exchangeValue(desired); } Value erase(Key key) { Mutator iter(*this, key, false); return iter.eraseValue(); } // The easiest way to implement an Iterator is to prevent all Redirects. // The currrent Iterator does that by forbidding concurrent inserts. // To make it work with concurrent inserts, we'd need a way to block TableMigrations. class Iterator { private: typename Details::Table* m_table; quint64 m_idx; Key m_hash; Value m_value; public: Iterator(ConcurrentMap_Leapfrog& map) { // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: m_table = map.m_root.load(Consume); m_idx = -1; next(); } void next() { Q_ASSERT(m_table); Q_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. while (++m_idx <= m_table->sizeMask) { // Index still inside range of table. typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); typename Details::Cell* cell = group->cells + (m_idx & 3); m_hash = cell->hash.load(Relaxed); if (m_hash != KeyTraits::NullHash) { // Cell has been reserved. m_value = cell->value.load(Relaxed); Q_ASSERT(m_value != Value(ValueTraits::Redirect)); if (m_value != Value(ValueTraits::NullValue)) return; // Yield this cell. } } // That's the end of the map. 
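/*
 * Return-value conventions of the high-level calls above, using the default
 * traits (single-threaded sketch for clarity):
 *
 *     ConcurrentMap_Leapfrog<quint32, quint32> map;
 *     quint32 prev = map.assign(5, 42);    // prev == 0 (NullValue): key was absent
 *     quint32 v    = map.get(5);           // v == 42
 *     quint32 old  = map.exchange(5, 43);  // old == 42: the previous value is returned
 *     quint32 gone = map.erase(5);         // gone == 43: erase returns what it removed
 *     quint32 none = map.get(5);           // none == 0: the key is absent again
 *
 * Because 0 is NullValue and 1 is Redirect, callers must not store 0 or 1 as
 * values, and key 0 is reserved as NullKey.
 */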
m_hash = KeyTraits::NullHash; m_value = Value(ValueTraits::NullValue); } bool isValid() const { return m_value != Value(ValueTraits::NullValue); } - Key getKey() const - { - Q_ASSERT(isValid()); - // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: - return KeyTraits::dehash(m_hash); - } - Value getValue() const { Q_ASSERT(isValid()); return m_value; } }; }; #endif // CONCURRENTMAP_LEAPFROG_H diff --git a/libs/image/tiles3/LockFreeMap/Leapfrog.h b/libs/image/tiles3/LockFreeMap/Leapfrog.h index 24a6462833..34af163d3b 100644 --- a/libs/image/tiles3/LockFreeMap/Leapfrog.h +++ b/libs/image/tiles3/LockFreeMap/Leapfrog.h @@ -1,548 +1,547 @@ /*------------------------------------------------------------------------ Junction: Concurrent data structures in C++ Copyright (c) 2016 Jeff Preshing Distributed under the Simplified BSD License. Original location: https://github.com/preshing/junction This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the LICENSE file for more information. ------------------------------------------------------------------------*/ #ifndef LEAPFROG_H #define LEAPFROG_H -#include "Util.h" #include "MapTraits.h" #include "SimpleJobCoordinator.h" #include "QSBR.h" template struct Leapfrog { typedef typename Map::Hash Hash; typedef typename Map::Value Value; typedef typename Map::KeyTraits KeyTraits; typedef typename Map::ValueTraits ValueTraits; static const quint64 InitialSize = 8; static const quint64 TableMigrationUnitSize = 32; static const quint64 LinearSearchLimit = 128; static const quint64 CellsInUseSample = LinearSearchLimit; Q_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links Q_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain struct Cell { Atomic hash; Atomic value; }; struct CellGroup { // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain. // Each cell in the probe chain is located within the table itself. // "deltas" determines the index of the next cell in the probe chain. // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket. // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket. // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket. 
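/*
 * Concrete illustration of the indexing scheme described above (values chosen
 * for illustration only). With 4 cells per group, a table index idx maps to:
 *
 *     CellGroup* group = table->getCellGroups() + (idx >> 2);   // which group
 *     Cell* cell = group->cells + (idx & 3);                    // which cell within it
 *
 * e.g. idx == 13 lives in group 3, cell 1. The first hop of that cell's probe
 * chain is read from group->deltas[idx & 3], every later hop from
 * group->deltas[(idx & 3) + 4], and each delta is a quint8 -- which is why
 * LinearSearchLimit must stay below 256.
 */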
Atomic deltas[8]; Cell cells[4]; }; struct Table { const quint64 sizeMask; // a power of two minus one QMutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration Table(quint64 sizeMask) : sizeMask(sizeMask) { } static Table* create(quint64 tableSize) { Q_ASSERT(isPowerOf2(tableSize)); Q_ASSERT(tableSize >= 4); quint64 numGroups = tableSize >> 2; Table* table = (Table*) std::malloc(sizeof(Table) + sizeof(CellGroup) * numGroups); - new(table) Table(tableSize - 1); + new (table) Table(tableSize - 1); for (quint64 i = 0; i < numGroups; i++) { CellGroup* group = table->getCellGroups() + i; for (quint64 j = 0; j < 4; j++) { group->deltas[j].storeNonatomic(0); group->deltas[j + 4].storeNonatomic(0); group->cells[j].hash.storeNonatomic(KeyTraits::NullHash); group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue)); } } return table; } void destroy() { this->Table::~Table(); std::free(this); } CellGroup* getCellGroups() const { return (CellGroup*)(this + 1); } quint64 getNumMigrationUnits() const { return sizeMask / TableMigrationUnitSize + 1; } }; class TableMigration : public SimpleJobCoordinator::Job { public: struct Source { Table* table; Atomic sourceIndex; }; Map& m_map; Table* m_destination; Atomic m_workerStatus; // number of workers + end flag Atomic m_overflowed; Atomic m_unitsRemaining; quint64 m_numSources; TableMigration(Map& map) : m_map(map) { } static TableMigration* create(Map& map, quint64 numSources) { TableMigration* migration = (TableMigration*) std::malloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); - new(migration) TableMigration(map); + new (migration) TableMigration(map); migration->m_workerStatus.storeNonatomic(0); migration->m_overflowed.storeNonatomic(false); migration->m_unitsRemaining.storeNonatomic(0); migration->m_numSources = numSources; // Caller is responsible for filling in sources & destination return migration; } virtual ~TableMigration() override { } void destroy() { // Destroy all source tables. for (quint64 i = 0; i < m_numSources; i++) if (getSources()[i].table) getSources()[i].table->destroy(); // Delete the migration object itself. this->TableMigration::~TableMigration(); std::free(this); } Source* getSources() const { return (Source*)(this + 1); } bool migrateRange(Table* srcTable, quint64 startIdx); virtual void run() override; }; static Cell* find(Hash hash, Table* table) { Q_ASSERT(table); Q_ASSERT(hash != KeyTraits::NullHash); quint64 sizeMask = table->sizeMask; // Optimistically check hashed cell even though it might belong to another bucket quint64 idx = hash & sizeMask; CellGroup* group = table->getCellGroups() + (idx >> 2); Cell* cell = group->cells + (idx & 3); Hash probeHash = cell->hash.load(Relaxed); if (probeHash == hash) { return cell; } else if (probeHash == KeyTraits::NullHash) { return cell = NULL; } // Follow probe chain for our bucket quint8 delta = group->deltas[idx & 3].load(Relaxed); while (delta) { idx = (idx + delta) & sizeMask; group = table->getCellGroups() + (idx >> 2); cell = group->cells + (idx & 3); Hash probeHash = cell->hash.load(Relaxed); // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert, // but we don't check for it. We just follow the probe chain. 
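/*
 * Memory layout produced by Table::create() and TableMigration::create() above
 * (sketch): one malloc holds the fixed-size header followed immediately by its
 * variable-length array, and getCellGroups() / getSources() simply point one
 * object past the header.
 *
 *     [ Table ][ CellGroup 0 ][ CellGroup 1 ] ... [ CellGroup N-1 ]
 *               ^-- getCellGroups() == (CellGroup*)(this + 1)
 *
 *     quint64 tableSize = 64;                           // power of two, >= 4
 *     quint64 numGroups = tableSize >> 2;               // 4 cells per group
 *     void* raw = std::malloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
 *     Table* t = new (raw) Table(tableSize - 1);        // sizeMask = tableSize - 1
 */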
if (probeHash == hash) { return cell; } delta = group->deltas[(idx & 3) + 4].load(Relaxed); } // End of probe chain, not found return NULL; } // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, quint64& overflowIdx) { Q_ASSERT(table); Q_ASSERT(hash != KeyTraits::NullHash); quint64 sizeMask = table->sizeMask; quint64 idx = quint64(hash); // Check hashed cell first, though it may not even belong to the bucket. CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); cell = group->cells + (idx & 3); Hash probeHash = cell->hash.load(Relaxed); if (probeHash == KeyTraits::NullHash) { if (cell->hash.compareExchangeStrong(probeHash, hash, Relaxed)) { // There are no links to set. We're done. return InsertResult_InsertedNew; } else { // Fall through to check if it was the same hash... } } if (probeHash == hash) { return InsertResult_AlreadyFound; } // Follow the link chain for this bucket. quint64 maxIdx = idx + sizeMask; quint64 linkLevel = 0; Atomic* prevLink; for (;;) { followLink: prevLink = group->deltas + ((idx & 3) + linkLevel); linkLevel = 4; quint8 probeDelta = prevLink->load(Relaxed); if (probeDelta) { idx += probeDelta; // Check the hash for this cell. group = table->getCellGroups() + ((idx & sizeMask) >> 2); cell = group->cells + (idx & 3); probeHash = cell->hash.load(Relaxed); if (probeHash == KeyTraits::NullHash) { // Cell was linked, but hash is not visible yet. // We could avoid this case (and guarantee it's visible) using acquire & release, but instead, // just poll until it becomes visible. do { probeHash = cell->hash.load(Acquire); } while (probeHash == KeyTraits::NullHash); } Q_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked if (probeHash == hash) { return InsertResult_AlreadyFound; } } else { // Reached the end of the link chain for this bucket. // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket. quint64 prevLinkIdx = idx; Q_ASSERT(qint64(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. quint64 linearProbesRemaining = qMin(maxIdx - idx, quint64(LinearSearchLimit)); while (linearProbesRemaining-- > 0) { idx++; group = table->getCellGroups() + ((idx & sizeMask) >> 2); cell = group->cells + (idx & 3); probeHash = cell->hash.load(Relaxed); if (probeHash == KeyTraits::NullHash) { // It's an empty cell. Try to reserve it. if (cell->hash.compareExchangeStrong(probeHash, hash, Relaxed)) { // Success. We've reserved the cell. Link it to previous cell in same bucket. Q_ASSERT(probeDelta == 0); quint8 desiredDelta = idx - prevLinkIdx; prevLink->store(desiredDelta, Relaxed); return InsertResult_InsertedNew; } else { // Fall through to check if it's the same hash... } } Hash x = (probeHash ^ hash); // Check for same hash. if (!x) { return InsertResult_AlreadyFound; } // Check for same bucket. if ((x & sizeMask) == 0) { // Attempt to set the link on behalf of the late-arriving cell. // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here, // there's no guarantee that our own link chain will be well-formed by the time this function returns. // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.) 
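/*
 * Timeline for the case handled just below (illustrative sketch): threads A and
 * B insert different keys that hash to the same bucket.
 *
 *     A: follows the bucket's chain to its end, then starts linear probing
 *     B: reserves an empty cell at idx for its key, but has not yet stored the
 *        delta link that makes idx reachable from the chain
 *     A: probes onto idx, recognises B's hash as belonging to its own bucket,
 *        and stores the missing delta itself before re-following the chain --
 *        otherwise A could return with a chain that never reaches idx, and a
 *        later find() for B's key could fail.
 */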
quint8 desiredDelta = idx - prevLinkIdx; prevLink->store(desiredDelta, Relaxed); goto followLink; // Try to follow link chain for the bucket again. } // Continue linear search... } // Table is too full to insert. overflowIdx = idx + 1; return InsertResult_Overflow; } } } static void beginTableMigrationToSize(Map& map, Table* table, quint64 nextTableSize) { // Create new migration by DCLI. SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume(); if (job) { // new migration already exists } else { QMutexLocker guard(&table->mutex); job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. if (job) { // new migration already exists (double-checked) } else { // Create new migration. TableMigration* migration = TableMigration::create(map, 1); migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits()); migration->getSources()[0].table = table; migration->getSources()[0].sourceIndex.storeNonatomic(0); migration->m_destination = Table::create(nextTableSize); // Publish the new migration. table->jobCoordinator.storeRelease(migration); } } } static void beginTableMigration(Map& map, Table* table, quint64 overflowIdx) { // Estimate number of cells in use based on a small sample. quint64 sizeMask = table->sizeMask; quint64 idx = overflowIdx - CellsInUseSample; quint64 inUseCells = 0; for (quint64 linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); Cell* cell = group->cells + (idx & 3); Value value = cell->value.load(Relaxed); if (value == Value(ValueTraits::Redirect)) { // Another thread kicked off the jobCoordinator. The caller will participate upon return. return; } if (value != Value(ValueTraits::NullValue)) inUseCells++; idx++; } float inUseRatio = float(inUseCells) / CellsInUseSample; float estimatedInUse = (sizeMask + 1) * inUseRatio; quint64 nextTableSize = qMax(quint64(InitialSize), roundUpPowerOf2(quint64(estimatedInUse * 2))); beginTableMigrationToSize(map, table, nextTableSize); } }; // Leapfrog template bool Leapfrog::TableMigration::migrateRange(Table* srcTable, quint64 startIdx) { quint64 srcSizeMask = srcTable->sizeMask; quint64 endIdx = qMin(startIdx + TableMigrationUnitSize, srcSizeMask + 1); // Iterate over source range. for (quint64 srcIdx = startIdx; srcIdx < endIdx; srcIdx++) { CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2); Cell* srcCell = srcGroup->cells + (srcIdx & 3); Hash srcHash; Value srcValue; // Fetch the srcHash and srcValue. for (;;) { srcHash = srcCell->hash.load(Relaxed); if (srcHash == KeyTraits::NullHash) { // An unused cell. Try to put a Redirect marker in its value. srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. break; } if (srcValue == Value(ValueTraits::NullValue)) { break; // Redirect has been placed. Break inner loop, continue outer loop. } // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. srcValue = srcCell->value.load(Relaxed); if (srcValue == Value(ValueTraits::NullValue)) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), Relaxed)) { break; // Redirect has been placed. Break inner loop, continue outer loop. 
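/*
 * Worked example of the resize estimate in beginTableMigration() above
 * (illustrative numbers): suppose the table has sizeMask + 1 == 1024 cells and
 * 96 of the CellsInUseSample == 128 sampled cells hold live values.
 *
 *     float inUseRatio = 96.0f / 128;                           // 0.75
 *     float estimatedInUse = 1024 * inUseRatio;                 // ~768 live cells
 *     quint64 nextTableSize = qMax(quint64(InitialSize),
 *                                  roundUpPowerOf2(quint64(estimatedInUse * 2)));
 *     // roundUpPowerOf2(1536) == 2048, so the map migrates into a 2048-cell table.
 */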
if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert break; } } else if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. break; } // We've got a key/value pair to migrate. // Reserve a destination cell in the destination. Q_ASSERT(srcHash != KeyTraits::NullHash); Q_ASSERT(srcValue != Value(ValueTraits::NullValue)); Q_ASSERT(srcValue != Value(ValueTraits::Redirect)); Cell* dstCell; quint64 overflowIdx; InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx); // During migration, a hash can only exist in one place among all the source tables, // and it is only migrated by one thread. Therefore, the hash will never already exist // in the destination table: Q_ASSERT(result != InsertResult_AlreadyFound); if (result == InsertResult_Overflow) { // Destination overflow. // This can happen for several reasons. For example, the source table could have // consisted entirely of deleted cells when it overflowed, resulting in a small destination // table size, but then another thread could re-insert all the same hashes // before the migration completed. // Caller will cancel the current migration and begin a new one. return false; } // Migrate the old value to the new cell. for (;;) { // Copy srcValue to the destination. dstCell->value.store(srcValue, Relaxed); // Try to place a Redirect marker in srcValue. Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), Relaxed); Q_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. if (doubleCheckedSrcValue == srcValue) { // No racing writes to the src. We've successfully placed the Redirect marker. // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL // by a late-arriving erase. if (srcValue == Value(ValueTraits::NullValue)) { // The racing update was an erase. } break; } // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. srcValue = doubleCheckedSrcValue; } // Cell successfully migrated. Proceed to next source cell. break; } } } // Range has been migrated successfully. return true; } template <class Map> void Leapfrog<Map>::TableMigration::run() { // Conditionally increment the shared # of workers. quint64 probeStatus = m_workerStatus.load(Relaxed); do { if (probeStatus & 1) { // End flag is already set, so do nothing. return; } } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, Relaxed, Relaxed)); // # of workers has been incremented, and the end flag is clear. Q_ASSERT((probeStatus & 1) == 0); // Iterate over all source tables. for (quint64 s = 0; s < m_numSources; s++) { Source& source = getSources()[s]; // Loop over all migration units in this source table. for (;;) { if (m_workerStatus.load(Relaxed) & 1) { goto endMigration; } quint64 startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, Relaxed); if (startIdx >= source.table->sizeMask + 1) break; // No more migration units in this table. Try next source table. bool overflowed = !migrateRange(source.table, startIdx); if (overflowed) { // *** FAILED MIGRATION *** // TableMigration failed due to destination table overflow. // No other thread can declare the migration successful at this point, because *this* unit will never complete, // hence m_unitsRemaining won't reach zero.
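/*
 * How m_workerStatus is encoded in run() above (sketch): bit 0 is the "end"
 * flag, the remaining bits count active workers in steps of 2. A worker joins
 * by adding 2 (only while bit 0 is clear) and leaves by subtracting 2, so the
 * value returned by the final fetchSub() is 3: end flag plus the last worker.
 *
 *     // two workers join:           status == 4
 *     // one of them sets the end:   status == 5
 *     // first worker leaves:        fetchSub(2) returns 5 (>= 4, so it just returns)
 *     // last worker leaves:         fetchSub(2) returns 3 (it runs the post-migration step)
 */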
// However, multiple threads can independently detect a failed migration at the same time. // The reason we store overflowed in a shared variable is because we must flush all the worker threads before // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from // the thread that deals with it. bool oldOverflowed = m_overflowed.exchange(overflowed, Relaxed); if (oldOverflowed) { // race to set m_overflowed } m_workerStatus.fetchOr(1, Relaxed); goto endMigration; } qint64 prevRemaining = m_unitsRemaining.fetchSub(1, Relaxed); Q_ASSERT(prevRemaining > 0); if (prevRemaining == 1) { // *** SUCCESSFUL MIGRATION *** // That was the last chunk to migrate. m_workerStatus.fetchOr(1, Relaxed); goto endMigration; } } } endMigration: // Decrement the shared # of workers. probeStatus = m_workerStatus.fetchSub(2, AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. return; } // We're the very last worker thread. // Perform the appropriate post-migration step depending on whether the migration succeeded or failed. Q_ASSERT(probeStatus == 3); bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point if (!overflowed) { // The migration succeeded. This is the most likely outcome. Publish the new table. m_map.publishTableMigration(this); // End the jobCoordinator. getSources()[0].table->jobCoordinator.end(); } else { // The migration failed due to the overflow of the destination table. Table* origTable = getSources()[0].table; QMutexLocker guard(&origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { // a new TableMigration was already started } else { TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); // Double the destination table size. migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2); // Transfer source tables to the new migration. for (quint64 i = 0; i < m_numSources; i++) { migration->getSources()[i].table = getSources()[i].table; getSources()[i].table = NULL; migration->getSources()[i].sourceIndex.storeNonatomic(0); } migration->getSources()[m_numSources].table = m_destination; migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0); // Calculate total number of migration units to move. quint64 unitsRemaining = 0; for (quint64 s = 0; s < migration->m_numSources; s++) { unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits(); } migration->m_unitsRemaining.storeNonatomic(unitsRemaining); // Publish the new migration. origTable->jobCoordinator.storeRelease(migration); } } // We're done with this TableMigration. Queue it for GC. QSBR::instance().enqueue(&TableMigration::destroy, this); } #endif // LEAPFROG_H diff --git a/libs/image/tiles3/LockFreeMap/MapTraits.h b/libs/image/tiles3/LockFreeMap/MapTraits.h index 857923025b..ac984151d9 100644 --- a/libs/image/tiles3/LockFreeMap/MapTraits.h +++ b/libs/image/tiles3/LockFreeMap/MapTraits.h @@ -1,42 +1,56 @@ /*------------------------------------------------------------------------ Junction: Concurrent data structures in C++ Copyright (c) 2016 Jeff Preshing Distributed under the Simplified BSD License.
Original location: https://github.com/preshing/junction This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the LICENSE file for more information. ------------------------------------------------------------------------*/ #ifndef MAPTRAITS_H #define MAPTRAITS_H -#include "Util.h" +#include <QtGlobal> +#include <functional> + +inline quint64 roundUpPowerOf2(quint64 v) +{ + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + v++; + return v; +} + +inline bool isPowerOf2(quint64 v) +{ + return (v & (v - 1)) == 0; +} template <typename T> struct DefaultKeyTraits { typedef T Key; - typedef typename BestFit<T>::Unsigned Hash; + typedef quint32 Hash; static const Key NullKey = Key(0); static const Hash NullHash = Hash(0); static Hash hash(T key) { - return avalanche(Hash(key)); - } - - static Key dehash(Hash hash) - { - return (T) deavalanche(hash); + return std::hash<Hash>()(Hash(key)); } }; template <typename T> struct DefaultValueTraits { typedef T Value; - typedef typename BestFit<T>::Unsigned IntType; + typedef quint32 IntType; static const IntType NullValue = 0; static const IntType Redirect = 1; }; #endif // MAPTRAITS_H diff --git a/libs/image/tiles3/LockFreeMap/QSBR.h b/libs/image/tiles3/LockFreeMap/QSBR.h index cae6bd13e4..484e91c93d 100644 --- a/libs/image/tiles3/LockFreeMap/QSBR.h +++ b/libs/image/tiles3/LockFreeMap/QSBR.h @@ -1,198 +1,198 @@ /*------------------------------------------------------------------------ Junction: Concurrent data structures in C++ Copyright (c) 2016 Jeff Preshing Distributed under the Simplified BSD License. Original location: https://github.com/preshing/junction This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the LICENSE file for more information. ------------------------------------------------------------------------*/ #ifndef QSBR_H #define QSBR_H #include <QMutex> #include <QMutexLocker> #include <QVector> #define CALL_MEMBER(obj, pmf) ((obj).*(pmf)) class QSBR { private: struct Action { void (*func)(void*); quint64 param[4]; // Size limit found experimentally. Verified by assert below. Action() = default; Action(void (*f)(void*), void* p, quint64 paramSize) : func(f) { Q_ASSERT(paramSize <= sizeof(param)); // Verify size limit.
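/*
 * Why param[4] is large enough (sketch): enqueue() below packs a Closure made
 * of a member-function pointer plus an object pointer into this buffer, so the
 * assert above only needs to hold for closures of that shape. For the
 * TableMigration cleanup that Leapfrog enqueues, that is roughly:
 *
 *     struct Closure {
 *         void (TableMigration::*pmf)();   // pointer to member function
 *         TableMigration* target;          // object to call it on
 *     };
 *     Q_STATIC_ASSERT(sizeof(Closure) <= 4 * sizeof(quint64));  // fits in param[4]
 */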
memcpy(¶m, p, paramSize); } void operator()() { func(¶m); } }; struct Status { qint16 inUse : 1; qint16 wasIdle : 1; qint16 nextFree : 14; Status() : inUse(1), wasIdle(0), nextFree(0) { } }; QMutex m_mutex; QVector m_status; qint64 m_freeIndex; qint64 m_numContexts; qint64 m_remaining; QVector m_deferredActions; QVector m_pendingActions; void onAllQuiescentStatesPassed(QVector& actions) { // m_mutex must be held actions.swap(m_pendingActions); m_pendingActions.swap(m_deferredActions); m_remaining = m_numContexts; - for (quint64 i = 0; i < m_status.size(); i++) { + for (qint32 i = 0; i < m_status.size(); i++) { m_status[i].wasIdle = 0; } } QSBR() : m_freeIndex(-1), m_numContexts(0), m_remaining(0) { } public: typedef qint16 Context; static QSBR &instance() { static QSBR m_instance; return m_instance; } Context createContext() { QMutexLocker guard(&m_mutex); m_numContexts++; m_remaining++; Q_ASSERT(m_numContexts < (1 << 14)); qint64 context = m_freeIndex; if (context >= 0) { Q_ASSERT(context < (qint64) m_status.size()); Q_ASSERT(!m_status[context].inUse); m_freeIndex = m_status[context].nextFree; m_status[context] = Status(); } else { context = m_status.size(); m_status.append(Status()); } return context; } void destroyContext(QSBR::Context context) { QVector actions; { QMutexLocker guard(&m_mutex); Q_ASSERT(context < m_status.size()); if (m_status[context].inUse && !m_status[context].wasIdle) { Q_ASSERT(m_remaining > 0); --m_remaining; } m_status[context].inUse = 0; m_status[context].nextFree = m_freeIndex; m_freeIndex = context; m_numContexts--; if (m_remaining == 0) { onAllQuiescentStatesPassed(actions); } } - for (quint64 i = 0; i < actions.size(); i++) { + for (qint32 i = 0; i < actions.size(); i++) { actions[i](); } } template void enqueue(void (T::*pmf)(), T* target) { struct Closure { void (T::*pmf)(); T* target; static void thunk(void* param) { Closure* self = (Closure*) param; CALL_MEMBER(*self->target, self->pmf)(); } }; Closure closure = {pmf, target}; QMutexLocker guard(&m_mutex); m_deferredActions.append(Action(Closure::thunk, &closure, sizeof(closure))); } void update(QSBR::Context context) { QVector actions; { QMutexLocker guard(&m_mutex); Q_ASSERT(context < m_status.size()); Status& status = m_status[context]; Q_ASSERT(status.inUse); if (status.wasIdle) { return; } status.wasIdle = 1; Q_ASSERT(m_remaining > 0); if (--m_remaining > 0) { return; } onAllQuiescentStatesPassed(actions); } - for (quint64 i = 0; i < actions.size(); i++) { + for (qint32 i = 0; i < actions.size(); i++) { actions[i](); } } void flush() { // This is like saying that all contexts are quiescent, // so we can issue all actions at once. // No lock is taken. - for (quint64 i = 0; i < m_pendingActions.size(); i++) { + for (qint32 i = 0; i < m_pendingActions.size(); i++) { m_pendingActions[i](); } m_pendingActions.clear(); - for (quint64 i = 0; i < m_deferredActions.size(); i++) { + for (qint32 i = 0; i < m_deferredActions.size(); i++) { m_deferredActions[i](); } m_deferredActions.clear(); m_remaining = m_numContexts; } }; #endif // QSBR_H diff --git a/libs/image/tiles3/LockFreeMap/Util.h b/libs/image/tiles3/LockFreeMap/Util.h deleted file mode 100644 index eb9338067e..0000000000 --- a/libs/image/tiles3/LockFreeMap/Util.h +++ /dev/null @@ -1,131 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - Distributed under the Simplified BSD License. 
- Original location: https://github.com/preshing/junction - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#ifndef UTIL_H -#define UTIL_H - -#include - -template -struct BestFit; - -template <> -struct BestFit { - typedef quint32 Unsigned; - typedef qint32 Signed; -}; - -template <> -struct BestFit { - typedef quint32 Unsigned; - typedef qint32 Signed; -}; - -template <> -struct BestFit { - typedef quint64 Unsigned; - typedef qint64 Signed; -}; - -template <> -struct BestFit { - typedef quint64 Unsigned; - typedef qint64 Signed; -}; - -template -struct BestFit { - typedef quint64 Unsigned; - typedef qint64 Signed; -}; - -inline quint32 roundUpPowerOf2(quint32 v) -{ - v--; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v++; - return v; -} - -inline qint32 roundUpPowerOf2(qint32 v) -{ - return (qint32) roundUpPowerOf2((quint32) v); -} - -inline quint64 roundUpPowerOf2(quint64 v) -{ - v--; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v |= v >> 32; - v++; - return v; -} - -inline qint64 roundUpPowerOf2(qint64 v) -{ - return (qint64) roundUpPowerOf2((quint64) v); -} - -inline bool isPowerOf2(quint64 v) -{ - return (v & (v - 1)) == 0; -} - -// from code.google.com/p/smhasher/wiki/MurmurHash3 -inline quint32 avalanche(quint32 h) -{ - h ^= h >> 16; - h *= 0x85ebca6b; - h ^= h >> 13; - h *= 0xc2b2ae35; - h ^= h >> 16; - return h; -} - -inline quint32 deavalanche(quint32 h) -{ - h ^= h >> 16; - h *= 0x7ed1b41d; - h ^= (h ^ (h >> 13)) >> 13; - h *= 0xa5cb9243; - h ^= h >> 16; - return h; -} - -// from code.google.com/p/smhasher/wiki/MurmurHash3 -inline quint64 avalanche(quint64 h) -{ - h ^= h >> 33; - h *= 0xff51afd7ed558ccd; - h ^= h >> 33; - h *= 0xc4ceb9fe1a85ec53; - h ^= h >> 33; - return h; -} - -inline quint64 deavalanche(quint64 h) -{ - h ^= h >> 33; - h *= 0x9cb4b2f8129337db; - h ^= h >> 33; - h *= 0x4f74430c22a54005; - h ^= h >> 33; - return h; -} - -#endif // UTIL_H diff --git a/libs/image/tiles3/tests/kis_lock_free_map_test.cpp b/libs/image/tiles3/tests/kis_lock_free_map_test.cpp index 4067c54f6c..c309c01b0e 100644 --- a/libs/image/tiles3/tests/kis_lock_free_map_test.cpp +++ b/libs/image/tiles3/tests/kis_lock_free_map_test.cpp @@ -1,123 +1,142 @@ #include "kis_lock_free_map_test.h" #include #include "kis_debug.h" #include "tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h" #define NUM_TYPES 2 // high-concurrency #define NUM_CYCLES 50000 #define NUM_THREADS 10 -typedef ConcurrentMap_Leapfrog ConcurrentMap; +typedef ConcurrentMap_Leapfrog ConcurrentMap; class StressJobLockless : public QRunnable { public: StressJobLockless(ConcurrentMap &map) : m_map(map), m_insertSum(0), m_eraseSum(0) { } qint64 insertSum() { return m_insertSum; } qint64 eraseSum() { return m_eraseSum; } protected: void run() override { QSBR::Context context = QSBR::instance().createContext(); for (int i = 1; i < NUM_CYCLES + 1; i++) { auto type = i % NUM_TYPES; switch (type) { case 0: m_eraseSum += m_map.erase(i); break; case 1: m_eraseSum += m_map.assign(i + 1, i + 1); m_insertSum += i + 1; break; } if (i % 10000 == 0) { QSBR::instance().update(context); } } QSBR::instance().destroyContext(context); } private: ConcurrentMap &m_map; qint64 m_insertSum; qint64 m_eraseSum; }; void LockfreeMapTest::testOperations() { ConcurrentMap map; qint64 totalSum = 
0; for (auto i = 1; i < NUM_CYCLES + 1; i++) { totalSum += i + 1; map.assign(i, i + 1); } for (auto i = 1; i < NUM_CYCLES + 1; i++) { ConcurrentMap::Value result = map.erase(i); totalSum -= result; QVERIFY(result); QCOMPARE(i, result - 1); } QVERIFY(totalSum == 0); } void LockfreeMapTest::stressTestLockless() { QList jobsList; ConcurrentMap map; for (auto i = 0; i < NUM_THREADS; ++i) { StressJobLockless *task = new StressJobLockless(map); task->setAutoDelete(false); jobsList.append(task); } QThreadPool pool; pool.setMaxThreadCount(NUM_THREADS); QBENCHMARK { for (auto &job : jobsList) { pool.start(job); } pool.waitForDone(); } qint64 totalSum = 0; for (auto i = 0; i < NUM_THREADS; i++) { StressJobLockless *job = jobsList.takeLast(); totalSum += job->insertSum(); totalSum -= job->eraseSum(); delete job; } QVERIFY(totalSum == 0); } +void LockfreeMapTest::iteratorTest() +{ + ConcurrentMap map; + qint32 sum = 0; + for (qint32 i = 2; i < 100; ++i) { + map.assign(i, i); + sum += i; + } + + ConcurrentMap::Iterator iter(map); + qint32 testSum = 0; + while (iter.isValid()) { + testSum += iter.getValue(); + iter.next(); + } + + QVERIFY(sum == testSum); +} + QTEST_GUILESS_MAIN(LockfreeMapTest) diff --git a/libs/image/tiles3/tests/kis_lock_free_map_test.h b/libs/image/tiles3/tests/kis_lock_free_map_test.h index 8584441768..50ff9b79d6 100644 --- a/libs/image/tiles3/tests/kis_lock_free_map_test.h +++ b/libs/image/tiles3/tests/kis_lock_free_map_test.h @@ -1,15 +1,16 @@ #ifndef KIS_LOCK_FREE_MAP_TEST_H #define KIS_LOCK_FREE_MAP_TEST_H #include class LockfreeMapTest : public QObject { Q_OBJECT private Q_SLOTS: void testOperations(); void stressTestLockless(); + void iteratorTest(); }; #endif // KIS_LOCK_FREE_MAP_TEST_H