diff --git a/libs/image/3rdparty/lock_free_map/leapfrog.h b/libs/image/3rdparty/lock_free_map/leapfrog.h
index 7bc95a8253..660186ccb1 100644
--- a/libs/image/3rdparty/lock_free_map/leapfrog.h
+++ b/libs/image/3rdparty/lock_free_map/leapfrog.h
@@ -1,573 +1,578 @@
/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing
  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction
  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/

#ifndef LEAPFROG_H
#define LEAPFROG_H

#include "map_traits.h"
#include "simple_job_coordinator.h"
#include "kis_assert.h"

#define SANITY_CHECK

template <class Map>
struct Leapfrog {
    typedef typename Map::Hash Hash;
    typedef typename Map::Value Value;
    typedef typename Map::KeyTraits KeyTraits;
    typedef typename Map::ValueTraits ValueTraits;

    static const quint64 InitialSize = 8;
    static const quint64 TableMigrationUnitSize = 32;
    static const quint64 LinearSearchLimit = 128;
    static const quint64 CellsInUseSample = LinearSearchLimit;

    Q_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
    Q_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain

    struct Cell {
        Atomic<Hash> hash;
        Atomic<Value> value;
    };

    struct CellGroup {
        // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
        // Each cell in the probe chain is located within the table itself.
        // "deltas" determines the index of the next cell in the probe chain.
        // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
        // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
        // All subsequent cells in the chain are given by deltas 4 - 7. Also guaranteed to belong in the bucket.
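        //
        // For example (an illustration, not from the original comments): with sizeMask = 63, a hash
        // of 0x2A lands on idx = 42, i.e. cell 2 (idx & 3) of group 10 (idx >> 2). If deltas[2] of
        // that group holds 5, the next cell of the bucket is at idx 47 (cell 3 of group 11), and
        // that cell's own link is deltas[3 + 4] of group 11.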
        Atomic<quint8> deltas[8];
        Cell cells[4];
    };

    struct Table {
        const quint64 sizeMask;              // a power of two minus one
        QMutex mutex;                        // to DCLI the TableMigration (stored in the jobCoordinator)
        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration

        Table(quint64 sizeMask) : sizeMask(sizeMask)
        {
        }

        static Table* create(quint64 tableSize)
        {
#ifdef SANITY_CHECK
            KIS_ASSERT_RECOVER_NOOP(isPowerOf2(tableSize));
            KIS_ASSERT_RECOVER_NOOP(tableSize >= 4);
#endif // SANITY_CHECK
            quint64 numGroups = tableSize >> 2;
            Table* table = (Table*) std::malloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
            new (table) Table(tableSize - 1);

            for (quint64 i = 0; i < numGroups; i++) {
                CellGroup* group = table->getCellGroups() + i;

                for (quint64 j = 0; j < 4; j++) {
                    group->deltas[j].storeNonatomic(0);
                    group->deltas[j + 4].storeNonatomic(0);
                    group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
                    group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
                }
            }

            return table;
        }

        void destroy()
        {
            this->Table::~Table();
            std::free(this);
        }

        CellGroup* getCellGroups() const
        {
            return (CellGroup*)(this + 1);
        }

        quint64 getNumMigrationUnits() const
        {
            return sizeMask / TableMigrationUnitSize + 1;
        }
    };

    class TableMigration : public SimpleJobCoordinator::Job
    {
    public:
        struct Source {
            Table* table;
            Atomic<quint64> sourceIndex;
        };

        Map& m_map;
        Table* m_destination;
        Atomic<quint64> m_workerStatus; // number of workers + end flag
        Atomic<bool> m_overflowed;
        Atomic<qint64> m_unitsRemaining;
        quint64 m_numSources;

        TableMigration(Map& map) : m_map(map)
        {
        }

        static TableMigration* create(Map& map, quint64 numSources)
        {
            TableMigration* migration = (TableMigration*) std::malloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
            new (migration) TableMigration(map);

            migration->m_workerStatus.storeNonatomic(0);
            migration->m_overflowed.storeNonatomic(false);
            migration->m_unitsRemaining.storeNonatomic(0);
            migration->m_numSources = numSources;
            // Caller is responsible for filling in sources & destination
            return migration;
        }

        virtual ~TableMigration() override
        {
        }

        void destroy()
        {
            // Destroy all source tables.
            for (quint64 i = 0; i < m_numSources; i++)
                if (getSources()[i].table)
                    getSources()[i].table->destroy();

            // Delete the migration object itself.
            this->TableMigration::~TableMigration();
            std::free(this);
        }

        Source* getSources() const
        {
            return (Source*)(this + 1);
        }

        bool migrateRange(Table* srcTable, quint64 startIdx);
        virtual void run() override;
    };

    static Cell* find(Hash hash, Table* table)
    {
#ifdef SANITY_CHECK
        KIS_ASSERT_RECOVER_NOOP(table);
        KIS_ASSERT_RECOVER_NOOP(hash != KeyTraits::NullHash);
#endif // SANITY_CHECK
        quint64 sizeMask = table->sizeMask;

        // Optimistically check hashed cell even though it might belong to another bucket
        quint64 idx = hash & sizeMask;
        CellGroup* group = table->getCellGroups() + (idx >> 2);
        Cell* cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(Relaxed);

        if (probeHash == hash) {
            return cell;
        } else if (probeHash == KeyTraits::NullHash) {
            return cell = NULL;
        }

        // Follow probe chain for our bucket
        quint8 delta = group->deltas[idx & 3].load(Relaxed);

        while (delta) {
            idx = (idx + delta) & sizeMask;
            group = table->getCellGroups() + (idx >> 2);
            cell = group->cells + (idx & 3);
            Hash probeHash = cell->hash.load(Relaxed);
            // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
            // but we don't check for it. We just follow the probe chain.
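            // (A NULL probeHash simply fails the comparison below and we move on to the next link.)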
            if (probeHash == hash) {
                return cell;
            }

            delta = group->deltas[(idx & 3) + 4].load(Relaxed);
        }

        // End of probe chain, not found
        return NULL;
    }

    // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
    enum InsertResult {
        InsertResult_AlreadyFound,
        InsertResult_InsertedNew,
        InsertResult_Overflow
    };

    static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, quint64& overflowIdx)
    {
#ifdef SANITY_CHECK
        KIS_ASSERT_RECOVER_NOOP(table);
        KIS_ASSERT_RECOVER_NOOP(hash != KeyTraits::NullHash);
#endif // SANITY_CHECK
        quint64 sizeMask = table->sizeMask;
        quint64 idx = quint64(hash);

        // Check hashed cell first, though it may not even belong to the bucket.
        CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
        cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(Relaxed);

        if (probeHash == KeyTraits::NullHash) {
            if (cell->hash.compareExchangeStrong(probeHash, hash, Relaxed)) {
                // There are no links to set. We're done.
                return InsertResult_InsertedNew;
            } else {
                // Fall through to check if it was the same hash...
            }
        }

        if (probeHash == hash) {
            return InsertResult_AlreadyFound;
        }

        // Follow the link chain for this bucket.
        quint64 maxIdx = idx + sizeMask;
        quint64 linkLevel = 0;
        Atomic<quint8>* prevLink;

        for (;;) {
        followLink:
            prevLink = group->deltas + ((idx & 3) + linkLevel);
            linkLevel = 4;
            quint8 probeDelta = prevLink->load(Relaxed);

            if (probeDelta) {
                idx += probeDelta;

                // Check the hash for this cell.
                group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                cell = group->cells + (idx & 3);
                probeHash = cell->hash.load(Relaxed);

                if (probeHash == KeyTraits::NullHash) {
                    // Cell was linked, but hash is not visible yet.
                    // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
                    // just poll until it becomes visible.
                    do {
                        probeHash = cell->hash.load(Acquire);
                    } while (probeHash == KeyTraits::NullHash);
                }

#ifdef SANITY_CHECK
                KIS_ASSERT_RECOVER_NOOP(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
#endif // SANITY_CHECK
                if (probeHash == hash) {
                    return InsertResult_AlreadyFound;
                }
            } else {
                // Reached the end of the link chain for this bucket.
                // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
                quint64 prevLinkIdx = idx;
#ifdef SANITY_CHECK
                KIS_ASSERT_RECOVER_NOOP(qint64(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
#endif // SANITY_CHECK
                quint64 linearProbesRemaining = qMin(maxIdx - idx, quint64(LinearSearchLimit));

                while (linearProbesRemaining-- > 0) {
                    idx++;
                    group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                    cell = group->cells + (idx & 3);
                    probeHash = cell->hash.load(Relaxed);

                    if (probeHash == KeyTraits::NullHash) {
                        // It's an empty cell. Try to reserve it.
                        if (cell->hash.compareExchangeStrong(probeHash, hash, Relaxed)) {
                            // Success. We've reserved the cell. Link it to previous cell in same bucket.
#ifdef SANITY_CHECK
                            KIS_ASSERT_RECOVER_NOOP(probeDelta == 0);
#endif // SANITY_CHECK
                            quint8 desiredDelta = idx - prevLinkIdx;
                            prevLink->store(desiredDelta, Relaxed);
                            return InsertResult_InsertedNew;
                        } else {
                            // Fall through to check if it's the same hash...
                        }
                    }

                    Hash x = (probeHash ^ hash);

                    // Check for same hash.
                    if (!x) {
                        return InsertResult_AlreadyFound;
                    }

                    // Check for same bucket.
                    if ((x & sizeMask) == 0) {
                        // Attempt to set the link on behalf of the late-arriving cell.
                        // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
                        // there's no guarantee that our own link chain will be well-formed by the time this function returns.
                        // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
                        quint8 desiredDelta = idx - prevLinkIdx;
                        prevLink->store(desiredDelta, Relaxed);
                        goto followLink; // Try to follow link chain for the bucket again.
                    }

                    // Continue linear search...
                }

                // Table is too full to insert.
                overflowIdx = idx + 1;
                return InsertResult_Overflow;
            }
        }
    }

    static void beginTableMigrationToSize(Map& map, Table* table, quint64 nextTableSize)
    {
        // Create new migration by DCLI.
        SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();

        if (job) {
            // new migration already exists
        } else {
            QMutexLocker guard(&table->mutex);
            job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.

            if (job) {
                // new migration already exists (double-checked)
            } else {
                // Create new migration.
                TableMigration* migration = TableMigration::create(map, 1);
                migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
                migration->getSources()[0].table = table;
                migration->getSources()[0].sourceIndex.storeNonatomic(0);
                migration->m_destination = Table::create(nextTableSize);
                // Publish the new migration.
                table->jobCoordinator.storeRelease(migration);
            }
        }
    }

    static void beginTableMigration(Map& map, Table* table, quint64 overflowIdx)
    {
        // Estimate number of cells in use based on a small sample.
        quint64 sizeMask = table->sizeMask;
        quint64 idx = overflowIdx - CellsInUseSample;
        quint64 inUseCells = 0;

        for (quint64 linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
            CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
            Cell* cell = group->cells + (idx & 3);
            Value value = cell->value.load(Relaxed);

            if (value == Value(ValueTraits::Redirect)) {
                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
                return;
            }

            if (value != Value(ValueTraits::NullValue))
                inUseCells++;

            idx++;
        }

        float inUseRatio = float(inUseCells) / CellsInUseSample;
        float estimatedInUse = (sizeMask + 1) * inUseRatio;
        quint64 nextTableSize = qMax(quint64(InitialSize), roundUpPowerOf2(quint64(estimatedInUse * 2)));
        beginTableMigrationToSize(map, table, nextTableSize);
    }
}; // Leapfrog

template <class Map>
bool Leapfrog<Map>::TableMigration::migrateRange(Table* srcTable, quint64 startIdx)
{
    quint64 srcSizeMask = srcTable->sizeMask;
    quint64 endIdx = qMin(startIdx + TableMigrationUnitSize, srcSizeMask + 1);

    // Iterate over source range.
    for (quint64 srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
        CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
        Cell* srcCell = srcGroup->cells + (srcIdx & 3);
        Hash srcHash;
        Value srcValue;

        // Fetch the srcHash and srcValue.
        for (;;) {
            srcHash = srcCell->hash.load(Relaxed);

            if (srcHash == KeyTraits::NullHash) {
                // An unused cell. Try to put a Redirect marker in its value.
                srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), Relaxed);

                if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    break;
                }

                if (srcValue == Value(ValueTraits::NullValue)) {
                    break; // Redirect has been placed. Break inner loop, continue outer loop.
                }

                // Otherwise, somebody just claimed the cell. Read srcHash again...
            } else {
                // Check for deleted/uninitialized value.
                srcValue = srcCell->value.load(Relaxed);

                if (srcValue == Value(ValueTraits::NullValue)) {
                    // Try to put a Redirect marker.
                    if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), Relaxed)) {
                        break; // Redirect has been placed. Break inner loop, continue outer loop.
                    }

                    if (srcValue == Value(ValueTraits::Redirect)) {
                        // FIXME: I don't think this will happen. Investigate & change to assert
                        break;
                    }
                } else if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    break;
                }

                // We've got a key/value pair to migrate.
                // Reserve a destination cell in the destination.
#ifdef SANITY_CHECK
                KIS_ASSERT_RECOVER_NOOP(srcHash != KeyTraits::NullHash);
                KIS_ASSERT_RECOVER_NOOP(srcValue != Value(ValueTraits::NullValue));
                KIS_ASSERT_RECOVER_NOOP(srcValue != Value(ValueTraits::Redirect));
#endif // SANITY_CHECK
                Cell* dstCell;
                quint64 overflowIdx;
                InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
                // During migration, a hash can only exist in one place among all the source tables,
                // and it is only migrated by one thread. Therefore, the hash will never already exist
                // in the destination table:
#ifdef SANITY_CHECK
                KIS_ASSERT_RECOVER_NOOP(result != InsertResult_AlreadyFound);
#endif // SANITY_CHECK
                if (result == InsertResult_Overflow) {
                    // Destination overflow.
                    // This can happen for several reasons. For example, the source table could have
                    // consisted of all deleted cells when it overflowed, resulting in a small destination
                    // table size, but then another thread could re-insert all the same hashes
                    // before the migration completed.
                    // Caller will cancel the current migration and begin a new one.
                    return false;
                }

                // Migrate the old value to the new cell.
                for (;;) {
                    // Copy srcValue to the destination.
                    dstCell->value.store(srcValue, Relaxed);
                    // Try to place a Redirect marker in srcValue.
                    Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), Relaxed);
#ifdef SANITY_CHECK
                    KIS_ASSERT_RECOVER_NOOP(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
#endif // SANITY_CHECK
                    if (doubleCheckedSrcValue == srcValue) {
                        // No racing writes to the src. We've successfully placed the Redirect marker.
                        // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                        // by a late-arriving erase.
                        if (srcValue == Value(ValueTraits::NullValue)) {
                            // the racing update was an erase
                        }
                        break;
                    }

                    // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
                    srcValue = doubleCheckedSrcValue;
                }

                // Cell successfully migrated. Proceed to next source cell.
                break;
            }
        }
    }

    // Range has been migrated successfully.
    return true;
}

template <class Map>
void Leapfrog<Map>::TableMigration::run()
{
+#ifdef SANITY_CHECK
+    KIS_ASSERT_RECOVER_NOOP(m_map.getGC().sanityRawPointerAccessLocked());
+#endif // SANITY_CHECK
+
+    // Conditionally increment the shared # of workers.
    quint64 probeStatus = m_workerStatus.load(Relaxed);

    do {
        if (probeStatus & 1) {
            // End flag is already set, so do nothing.
            return;
        }
    } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, Relaxed, Relaxed));

    // # of workers has been incremented, and the end flag is clear.
#ifdef SANITY_CHECK
    KIS_ASSERT_RECOVER_NOOP((probeStatus & 1) == 0);
#endif // SANITY_CHECK
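
    // Note on the m_workerStatus encoding (implied by the operations on it rather than spelled out
    // in the original comments): bit 0 is the "end" flag and the remaining bits count the active
    // workers in steps of 2. Workers join with +2 and leave with -2, so the very last worker sees
    // the value 3 (one worker + end flag) returned by the fetchSub() below before it runs the
    // post-migration step.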

    // Iterate over all source tables.
    for (quint64 s = 0; s < m_numSources; s++) {
        Source& source = getSources()[s];

        // Loop over all migration units in this source table.
        for (;;) {
            if (m_workerStatus.load(Relaxed) & 1) {
                goto endMigration;
            }

            quint64 startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, Relaxed);

            if (startIdx >= source.table->sizeMask + 1)
                break; // No more migration units in this table. Try next source table.

            bool overflowed = !migrateRange(source.table, startIdx);

            if (overflowed) {
                // *** FAILED MIGRATION ***
                // TableMigration failed due to destination table overflow.
                // No other thread can declare the migration successful at this point, because *this* unit
                // will never complete, hence m_unitsRemaining won't reach zero.
                // However, multiple threads can independently detect a failed migration at the same time.
                // The reason we store overflowed in a shared variable is because we must flush all the worker
                // threads before we can safely deal with the overflow. Therefore, the thread that detects the
                // failure is often different from the thread that deals with it.
                bool oldOverflowed = m_overflowed.exchange(overflowed, Relaxed);

                if (oldOverflowed) {
                    // another thread won the race to set m_overflowed
                }

                m_workerStatus.fetchOr(1, Relaxed);
                goto endMigration;
            }

            qint64 prevRemaining = m_unitsRemaining.fetchSub(1, Relaxed);
#ifdef SANITY_CHECK
            KIS_ASSERT_RECOVER_NOOP(prevRemaining > 0);
#endif // SANITY_CHECK
            if (prevRemaining == 1) {
                // *** SUCCESSFUL MIGRATION ***
                // That was the last chunk to migrate.
                m_workerStatus.fetchOr(1, Relaxed);
                goto endMigration;
            }
        }
    }

endMigration:

    // Decrement the shared # of workers.
    probeStatus = m_workerStatus.fetchSub(2, AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.

    if (probeStatus >= 4) {
        // There are other workers remaining. Return here so that only the very last worker will proceed.
        return;
    }

    // We're the very last worker thread.
    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
#ifdef SANITY_CHECK
    KIS_ASSERT_RECOVER_NOOP(probeStatus == 3);
#endif // SANITY_CHECK
    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point

    if (!overflowed) {
        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
        m_map.publishTableMigration(this);
        // End the jobCoordinator.
        getSources()[0].table->jobCoordinator.end();
    } else {
        // The migration failed due to the overflow of the destination table.
        Table* origTable = getSources()[0].table;
        QMutexLocker guard(&origTable->mutex);
        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();

        if (checkedJob != this) {
            // a new TableMigration was already started
        } else {
            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
            // Double the destination table size.
            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);

            // Transfer source tables to the new migration.
            for (quint64 i = 0; i < m_numSources; i++) {
                migration->getSources()[i].table = getSources()[i].table;
                getSources()[i].table = NULL;
                migration->getSources()[i].sourceIndex.storeNonatomic(0);
            }

            migration->getSources()[m_numSources].table = m_destination;
            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);

            // Calculate total number of migration units to move.
            quint64 unitsRemaining = 0;

            for (quint64 s = 0; s < migration->m_numSources; s++) {
                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
            }

            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
            // Publish the new migration.
            origTable->jobCoordinator.storeRelease(migration);
        }
    }

    // We're done with this TableMigration. Queue it for GC.
    m_map.getGC().enqueue(&TableMigration::destroy, this, true);
}

#endif // LEAPFROG_H
diff --git a/libs/image/3rdparty/lock_free_map/qsbr.h b/libs/image/3rdparty/lock_free_map/qsbr.h
index eae7aaaf8f..912f88227e 100644
--- a/libs/image/3rdparty/lock_free_map/qsbr.h
+++ b/libs/image/3rdparty/lock_free_map/qsbr.h
@@ -1,122 +1,123 @@
/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing
  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction
  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/

#ifndef QSBR_H
#define QSBR_H

#include
#include
#include
#include

#define CALL_MEMBER(obj, pmf) ((obj).*(pmf))

class QSBR
{
private:
    struct Action {
        void (*func)(void*);
        quint64 param[4]; // Size limit found experimentally. Verified by assert below.

        Action() = default;

        Action(void (*f)(void*), void* p, quint64 paramSize) : func(f)
        {
            KIS_ASSERT(paramSize <= sizeof(param)); // Verify size limit.
            memcpy(&param, p, paramSize);
        }

        void operator()()
        {
            func(&param);
        }
    };

    QAtomicInt m_rawPointerUsers;
    KisLocklessStack<Action> m_pendingActions;
    KisLocklessStack<Action> m_migrationReclaimActions;

    void releasePoolSafely(KisLocklessStack<Action> *pool, bool force = false)
    {
        KisLocklessStack<Action> tmp;
        tmp.mergeFrom(*pool);
        if (tmp.isEmpty()) return;

        if (force || tmp.size() > 4096) {
            while (m_rawPointerUsers.loadAcquire());

            Action action;
            while (tmp.pop(action)) {
                action();
            }
        } else {
            if (!m_rawPointerUsers.loadAcquire()) {
                Action action;
                while (tmp.pop(action)) {
                    action();
                }
            } else {
                // push elements back to the source
                pool->mergeFrom(tmp);
            }
        }
    }

public:

    template <typename T>
    void enqueue(void (T::*pmf)(), T* target, bool migration = false)
    {
        struct Closure {
            void (T::*pmf)();
            T* target;

            static void thunk(void* param)
            {
                Closure* self = (Closure*) param;
                CALL_MEMBER(*self->target, self->pmf)();
            }
        };

        Closure closure = {pmf, target};

        if (migration) {
            m_migrationReclaimActions.push(Action(Closure::thunk, &closure, sizeof(closure)));
        } else {
            m_pendingActions.push(Action(Closure::thunk, &closure, sizeof(closure)));
        }
    }

-    void update(bool migrationInProgress)
+    void update()
    {
        releasePoolSafely(&m_pendingActions);
-
-        if (!migrationInProgress) {
-            releasePoolSafely(&m_migrationReclaimActions);
-        }
+        releasePoolSafely(&m_migrationReclaimActions);
    }

    void flush()
    {
        releasePoolSafely(&m_pendingActions, true);
        releasePoolSafely(&m_migrationReclaimActions, true);
    }

    void lockRawPointerAccess()
    {
        m_rawPointerUsers.ref();
    }

    void unlockRawPointerAccess()
    {
        m_rawPointerUsers.deref();
    }
+
+    bool sanityRawPointerAccessLocked() const {
+        return m_rawPointerUsers.loadAcquire();
+    }
};

#endif // QSBR_H
diff --git a/libs/image/tiles3/kis_tile_hash_table2.h b/libs/image/tiles3/kis_tile_hash_table2.h
index dbe1d08584..d2e119eb5c 100644
--- a/libs/image/tiles3/kis_tile_hash_table2.h
+++ b/libs/image/tiles3/kis_tile_hash_table2.h
@@ -1,482 +1,482 @@
/*
 *  Copyright (c) 2018 Andrey Kamakin
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#ifndef KIS_TILEHASHTABLE_2_H
#define KIS_TILEHASHTABLE_2_H

#include "kis_shared.h"
#include "kis_shared_ptr.h"
#include "3rdparty/lock_free_map/concurrent_map.h"
#include "kis_tile.h"
#include "kis_debug.h"

#define SANITY_CHECK

/**
 * This is a template for a hash table that stores tiles (or some other
 * objects resembling tiles). Actually, this object should only have
 * col()/row() methods and be able to answer notifyDetachedFromDataManager()
 * requests to be stored here. It is used in KisTiledDataManager and
 * KisMementoManager.
 *
 * How to use:
 *   1) each hash must be unique, otherwise tiles would overwrite each other
 *   2) 0 key is reserved, so can't be used
 *   3) col and row must be less than 0x7FFF to guarantee uniqueness of hash for each pair
 */

template <class T>
class KisTileHashTableIteratorTraits2;

template <class T>
class KisTileHashTableTraits2
{
    static constexpr bool isInherited = std::is_convertible<T*, KisShared*>::value;
    Q_STATIC_ASSERT_X(isInherited, "Template must inherit KisShared");

public:
    typedef T TileType;
    typedef KisSharedPtr<T> TileTypeSP;
    typedef KisWeakSharedPtr<T> TileTypeWSP;

    KisTileHashTableTraits2(KisMementoManager *mm);
    KisTileHashTableTraits2(const KisTileHashTableTraits2 &ht, KisMementoManager *mm);
    ~KisTileHashTableTraits2();

    bool isEmpty()
    {
        return !m_numTiles.load();
    }

    bool tileExists(qint32 col, qint32 row);

    /**
     * Returns a tile in position (col,row). If no tile exists,
     * returns null.
     * \param col column of the tile
     * \param row row of the tile
     */
    TileTypeSP getExistingTile(qint32 col, qint32 row);

    /**
     * Returns a tile in position (col,row). If no tile exists,
     * creates a new one, attaches it to the list and returns.
     * \param col column of the tile
     * \param row row of the tile
     * \param newTile out-parameter, returns true if a new tile
     *                was created
     */
    TileTypeSP getTileLazy(qint32 col, qint32 row, bool& newTile);

    /**
     * Returns a tile in position (col,row). If no tile exists,
     * creates nothing, but returns shared default tile object
     * of the table. Be careful, this object has column and row
     * parameters set to (qint32_MIN, qint32_MIN).
     * \param col column of the tile
     * \param row row of the tile
     * \param existingTile returns true if the tile actually exists in the table
     *                     and it is not a lazily created default wrapper tile
     */
    TileTypeSP getReadOnlyTileLazy(qint32 col, qint32 row, bool &existingTile);

    void addTile(TileTypeSP tile);

    bool deleteTile(TileTypeSP tile);
    bool deleteTile(qint32 col, qint32 row);

    void clear();

    void setDefaultTileData(KisTileData *defaultTileData);
    KisTileData* defaultTileData();

    qint32 numTiles()
    {
        return m_numTiles.load();
    }

    void debugPrintInfo();
    void debugMaxListLength(qint32 &min, qint32 &max);

    friend class KisTileHashTableIteratorTraits2<T>;

private:
    struct MemoryReclaimer {
        MemoryReclaimer(TileType *data) : d(data) {}

        void destroy()
        {
            TileTypeSP::deref(reinterpret_cast<TileTypeSP*>(this), d);
            delete this;
        }

    private:
        TileType *d;
    };

    inline quint32 calculateHash(qint32 col, qint32 row)
    {
#ifdef SANITY_CHECK
        KIS_ASSERT_RECOVER_NOOP(row < 0x7FFF && col < 0x7FFF);
#endif // SANITY_CHECK

        if (col == 0 && row == 0) {
            col = 0x7FFF;
            row = 0x7FFF;
        }

        return ((static_cast<quint32>(row) << 16) | (static_cast<quint32>(col) & 0xFFFF));
    }

    inline void insert(quint32 idx, TileTypeSP item)
    {
        TileTypeSP::ref(&item, item.data());
        TileType *tile = 0;

        {
            QReadLocker locker(&m_iteratorLock);
            m_map.getGC().lockRawPointerAccess();
            tile = m_map.assign(idx, item.data());
        }

        if (tile) {
            tile->notifyDeadWithoutDetaching();
            m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(tile));
        } else {
            m_numTiles.fetchAndAddRelaxed(1);
        }

        m_map.getGC().unlockRawPointerAccess();

-        m_map.getGC().update(m_map.migrationInProcess());
+        m_map.getGC().update();
    }

    inline bool erase(quint32 idx)
    {
        m_map.getGC().lockRawPointerAccess();

        bool wasDeleted = false;
        TileType *tile = m_map.erase(idx);

        if (tile) {
            tile->notifyDetachedFromDataManager();

            wasDeleted = true;
            m_numTiles.fetchAndSubRelaxed(1);
            m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(tile));
        }

        m_map.getGC().unlockRawPointerAccess();

-        m_map.getGC().update(m_map.migrationInProcess());
+        m_map.getGC().update();

        return wasDeleted;
    }

private:
    typedef ConcurrentMap<quint32, TileType*> LockFreeTileMap;
    typedef typename LockFreeTileMap::Mutator LockFreeTileMapMutator;
    mutable LockFreeTileMap m_map;

    /**
     * We still need something to guard changes in m_defaultTileData,
     * otherwise there will be concurrent read/writes, resulting in broken memory.
     */
    QReadWriteLock m_defaultPixelDataLock;
    mutable QReadWriteLock m_iteratorLock;

    QAtomicInt m_numTiles;
    KisTileData *m_defaultTileData;
    KisMementoManager *m_mementoManager;
};

template <class T>
class KisTileHashTableIteratorTraits2
{
public:
    typedef T TileType;
    typedef KisSharedPtr<T> TileTypeSP;
    typedef typename ConcurrentMap<quint32, TileType*>::Iterator Iterator;

    KisTileHashTableIteratorTraits2(KisTileHashTableTraits2<T> *ht) : m_ht(ht)
    {
        m_ht->m_iteratorLock.lockForWrite();
        m_iter.setMap(m_ht->m_map);
    }

    ~KisTileHashTableIteratorTraits2()
    {
        m_ht->m_iteratorLock.unlock();
    }

    void next()
    {
        m_iter.next();
    }

    TileTypeSP tile() const
    {
        return TileTypeSP(m_iter.getValue());
    }

    bool isDone() const
    {
        return !m_iter.isValid();
    }

    void deleteCurrent()
    {
        m_ht->erase(m_iter.getKey());
        next();
    }

    void moveCurrentToHashTable(KisTileHashTableTraits2<T> *newHashTable)
    {
        TileTypeSP tile = m_iter.getValue();
        next();

        quint32 idx = m_ht->calculateHash(tile->col(), tile->row());
        m_ht->erase(idx);
        newHashTable->insert(idx, tile);
    }

private:
    KisTileHashTableTraits2<T> *m_ht;
    Iterator m_iter;
};

template <class T>
KisTileHashTableTraits2<T>::KisTileHashTableTraits2(KisMementoManager *mm)
    : m_numTiles(0),
      m_defaultTileData(0),
      m_mementoManager(mm)
{
}

template <class T>
KisTileHashTableTraits2<T>::KisTileHashTableTraits2(const KisTileHashTableTraits2 &ht, KisMementoManager *mm)
    : KisTileHashTableTraits2(mm)
{
    setDefaultTileData(ht.m_defaultTileData);

    QWriteLocker locker(&ht.m_iteratorLock);
    typename ConcurrentMap<quint32, TileType*>::Iterator iter(ht.m_map);

    while (iter.isValid()) {
        TileTypeSP tile = new TileType(*iter.getValue(), m_mementoManager);
        insert(iter.getKey(), tile);
        iter.next();
    }
}

template <class T>
KisTileHashTableTraits2<T>::~KisTileHashTableTraits2()
{
    clear();
    setDefaultTileData(0);
}

template <class T>
bool KisTileHashTableTraits2<T>::tileExists(qint32 col, qint32 row)
{
    return getExistingTile(col, row);
}

template <class T>
typename KisTileHashTableTraits2<T>::TileTypeSP
KisTileHashTableTraits2<T>::getExistingTile(qint32 col, qint32 row)
{
    quint32 idx = calculateHash(col, row);

    m_map.getGC().lockRawPointerAccess();
    TileTypeSP tile = m_map.get(idx);
    m_map.getGC().unlockRawPointerAccess();

-    m_map.getGC().update(m_map.migrationInProcess());
+    m_map.getGC().update();
    return tile;
}

template <class T>
typename KisTileHashTableTraits2<T>::TileTypeSP
KisTileHashTableTraits2<T>::getTileLazy(qint32 col, qint32 row, bool &newTile)
{
    newTile = false;
    quint32 idx = calculateHash(col, row);

    // we are going to assign a raw-pointer tile from the table
    // to a shared pointer...
    m_map.getGC().lockRawPointerAccess();

    TileTypeSP tile = m_map.get(idx);

    while (!tile) {
        // we shouldn't try to acquire **any** lock with
        // raw-pointer lock held
        m_map.getGC().unlockRawPointerAccess();

        {
            QReadLocker locker(&m_defaultPixelDataLock);
            tile = new TileType(col, row, m_defaultTileData, 0);
        }

        TileTypeSP::ref(&tile, tile.data());
        TileType *discardedTile = 0;

        // iterator lock should be taken **before**
        // the pointers are locked
        m_iteratorLock.lockForRead();

        // and now lock raw-pointers again
        m_map.getGC().lockRawPointerAccess();

        // mutator might have become invalidated when
        // we released raw pointers, so we need to reinitialize it
        LockFreeTileMapMutator mutator = m_map.insertOrFind(idx);

        if (!mutator.getValue()) {
            discardedTile = mutator.exchangeValue(tile.data());
        } else {
            discardedTile = tile.data();
        }

        m_iteratorLock.unlock();

        if (discardedTile) {
            // we've got our tile back, it didn't manage to
            // get into the table. Now release the allocated
            // tile and push TO/GA switch.
            tile = 0;

            discardedTile->notifyDeadWithoutDetaching();
            m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(discardedTile));

            tile = m_map.get(idx);
            continue;
        } else {
            newTile = true;
            m_numTiles.fetchAndAddRelaxed(1);

            tile->notifyAttachedToDataManager(m_mementoManager);
        }
    }
    m_map.getGC().unlockRawPointerAccess();

-    m_map.getGC().update(m_map.migrationInProcess());
+    m_map.getGC().update();
    return tile;
}

template <class T>
typename KisTileHashTableTraits2<T>::TileTypeSP
KisTileHashTableTraits2<T>::getReadOnlyTileLazy(qint32 col, qint32 row, bool &existingTile)
{
    quint32 idx = calculateHash(col, row);

    m_map.getGC().lockRawPointerAccess();
    TileTypeSP tile = m_map.get(idx);
    m_map.getGC().unlockRawPointerAccess();

    existingTile = tile;

    if (!existingTile) {
        QReadLocker locker(&m_defaultPixelDataLock);
        tile = new TileType(col, row, m_defaultTileData, 0);
    }

-    m_map.getGC().update(m_map.migrationInProcess());
+    m_map.getGC().update();
    return tile;
}

template <class T>
void KisTileHashTableTraits2<T>::addTile(TileTypeSP tile)
{
    quint32 idx = calculateHash(tile->col(), tile->row());
    insert(idx, tile);
}

template <class T>
bool KisTileHashTableTraits2<T>::deleteTile(TileTypeSP tile)
{
    return deleteTile(tile->col(), tile->row());
}

template <class T>
bool KisTileHashTableTraits2<T>::deleteTile(qint32 col, qint32 row)
{
    quint32 idx = calculateHash(col, row);
    return erase(idx);
}

template <class T>
void KisTileHashTableTraits2<T>::clear()
{
    {
        QWriteLocker locker(&m_iteratorLock);

        typename ConcurrentMap<quint32, TileType*>::Iterator iter(m_map);
        TileType *tile = 0;

        while (iter.isValid()) {
            m_map.getGC().lockRawPointerAccess();
            tile = m_map.erase(iter.getKey());

            if (tile) {
                tile->notifyDetachedFromDataManager();
                m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(tile));
            }
            m_map.getGC().unlockRawPointerAccess();

            iter.next();
        }

        m_numTiles.store(0);
    }

    // garbage collection must **not** be run with locks held
-    m_map.getGC().update(false);
+    m_map.getGC().update();
}

template <class T>
inline void KisTileHashTableTraits2<T>::setDefaultTileData(KisTileData *defaultTileData)
{
    QWriteLocker locker(&m_defaultPixelDataLock);

    if (m_defaultTileData) {
        m_defaultTileData->release();
        m_defaultTileData = 0;
    }

    if (defaultTileData) {
        defaultTileData->acquire();
        m_defaultTileData = defaultTileData;
    }
}

template <class T>
inline KisTileData* KisTileHashTableTraits2<T>::defaultTileData()
{
    QReadLocker locker(&m_defaultPixelDataLock);
    return m_defaultTileData;
}

template <class T>
void KisTileHashTableTraits2<T>::debugPrintInfo()
{
}

template <class T>
void KisTileHashTableTraits2<T>::debugMaxListLength(qint32 &/*min*/, qint32 &/*max*/)
{
}

typedef KisTileHashTableTraits2<KisTile> KisTileHashTable;
typedef KisTileHashTableIteratorTraits2<KisTile> KisTileHashTableIterator;
typedef KisTileHashTableIteratorTraits2<KisTile> KisTileHashTableConstIterator;

#endif // KIS_TILEHASHTABLE_2_H