From 70d8ddf8b2dadf79d5b90901ef0bc4fbe424e2da Mon Sep 17 00:00:00 2001 From: dongyang geng Date: Sat, 27 Dec 2025 20:17:25 +0800 Subject: [PATCH 1/3] feat: epoch+cow+lockFreeList --- cpp/pixels-retina/CMakeLists.txt | 1 + cpp/pixels-retina/include/EpochManager.h | 111 ++++++++++++ cpp/pixels-retina/include/RGVisibility.h | 8 - cpp/pixels-retina/include/TileVisibility.h | 43 ++++- cpp/pixels-retina/lib/EpochManager.cpp | 92 ++++++++++ cpp/pixels-retina/lib/RGVisibility.cpp | 82 +-------- cpp/pixels-retina/lib/TileVisibility.cpp | 194 ++++++++++++++++----- 7 files changed, 399 insertions(+), 132 deletions(-) create mode 100644 cpp/pixels-retina/include/EpochManager.h create mode 100644 cpp/pixels-retina/lib/EpochManager.cpp diff --git a/cpp/pixels-retina/CMakeLists.txt b/cpp/pixels-retina/CMakeLists.txt index 7065d0e6b2..7438eaab2c 100644 --- a/cpp/pixels-retina/CMakeLists.txt +++ b/cpp/pixels-retina/CMakeLists.txt @@ -38,6 +38,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include) # Source files set(SOURCES + lib/EpochManager.cpp lib/TileVisibility.cpp lib/RGVisibility.cpp lib/RGVisibilityJni.cpp diff --git a/cpp/pixels-retina/include/EpochManager.h b/cpp/pixels-retina/include/EpochManager.h new file mode 100644 index 0000000000..9a701599f0 --- /dev/null +++ b/cpp/pixels-retina/include/EpochManager.h @@ -0,0 +1,111 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +#ifndef PIXELS_RETINA_EPOCH_MANAGER_H +#define PIXELS_RETINA_EPOCH_MANAGER_H + +#include +#include +#include + +/** + * EpochManager - A global epoch-based memory reclamation system + * + * This manager allows safe deferred deletion of objects in a concurrent + * environment using epoch-based reclamation. Threads announce their presence + * in an epoch, and objects can only be reclaimed when all threads have + * advanced past the epoch in which the object was retired. + */ +class EpochManager { +public: + static EpochManager& getInstance() { + static EpochManager instance; + return instance; + } + + /** + * Enter the critical section. Thread announces it's participating in + * the current epoch. + */ + void enterEpoch(); + + /** + * Exit the critical section. Thread leaves the epoch. + */ + void exitEpoch(); + + /** + * Advance the global epoch counter. Should be called by GC operations. + * Returns the new epoch value. + */ + uint64_t advanceEpoch(); + + /** + * Check if an object retired at the given epoch can be safely reclaimed. + * An object can be reclaimed if all threads have advanced past its retire epoch. + */ + bool canReclaim(uint64_t retireEpoch) const; + + /** + * Get the current global epoch. + */ + uint64_t getCurrentEpoch() const { + return globalEpoch.load(std::memory_order_acquire); + } + +private: + EpochManager() : globalEpoch(1) {} + ~EpochManager(); + + EpochManager(const EpochManager&) = delete; + EpochManager& operator=(const EpochManager&) = delete; + + struct ThreadInfo { + std::atomic localEpoch{0}; // 0 means not in critical section + std::atomic active{true}; + std::thread::id threadId; + ThreadInfo* next{nullptr}; + }; + + std::atomic globalEpoch; + std::atomic threadListHead{nullptr}; + std::mutex threadListMutex; // Protects thread list modifications + + ThreadInfo* getOrCreateThreadInfo(); + static thread_local ThreadInfo* tlsThreadInfo; +}; + +/** + * RAII helper for epoch protection + */ +class EpochGuard { +public: + EpochGuard() { + EpochManager::getInstance().enterEpoch(); + } + + ~EpochGuard() { + EpochManager::getInstance().exitEpoch(); + } + + EpochGuard(const EpochGuard&) = delete; + EpochGuard& operator=(const EpochGuard&) = delete; +}; + +#endif // PIXELS_RETINA_EPOCH_MANAGER_H diff --git a/cpp/pixels-retina/include/RGVisibility.h b/cpp/pixels-retina/include/RGVisibility.h index af6e9080c7..88867c52ec 100644 --- a/cpp/pixels-retina/include/RGVisibility.h +++ b/cpp/pixels-retina/include/RGVisibility.h @@ -40,20 +40,12 @@ class RGVisibility { private: static constexpr uint32_t VISIBILITY_RECORD_CAPACITY = 256; - static constexpr uint32_t MAX_ACCESS_COUNT = 0x007FFFFF; - static constexpr uint32_t GC_MASK = 0xFF000000; - static constexpr uint32_t ACCESS_MASK = 0x00FFFFFF; - static constexpr uint32_t ACCESS_INC = 0x00000001; static constexpr uint32_t BITMAP_SIZE_PER_TILE_VISIBILITY = 4; - static constexpr uint32_t RG_READ_LEASE_MS = 100; TileVisibility* tileVisibilities; const uint64_t tileCount; - std::atomic flag; // high 1 byte is the gc flag, low 3 bytes are the access count TileVisibility* getTileVisibility(uint32_t rowId) const; - void beginRGAccess(); - void endRGAccess(); }; #endif //RG_VISIBILITY_H diff --git a/cpp/pixels-retina/include/TileVisibility.h b/cpp/pixels-retina/include/TileVisibility.h index a6af5bee47..2d9b71db46 100644 --- a/cpp/pixels-retina/include/TileVisibility.h +++ b/cpp/pixels-retina/include/TileVisibility.h @@ -29,6 +29,7 @@ #include #include #include +#include inline uint64_t makeDeleteIndex(uint8_t rowId, uint64_t ts) { return (static_cast(rowId) << 56 | (ts & 0x00FFFFFFFFFFFFFFULL)); @@ -48,6 +49,41 @@ struct DeleteIndexBlock { std::atomic next{nullptr}; }; +/** + * VersionedData - A versioned snapshot of the base state + * Used for Copy-on-Write during garbage collection + * IMPORTANT: head is part of the version to ensure atomic visibility + */ +struct VersionedData { + uint64_t baseBitmap[4]; + uint64_t baseTimestamp; + DeleteIndexBlock* head; // Delete chain head, part of the version + + VersionedData() : baseTimestamp(0), head(nullptr) { + baseBitmap[0] = baseBitmap[1] = baseBitmap[2] = baseBitmap[3] = 0; + } + + VersionedData(uint64_t ts, const uint64_t bitmap[4], DeleteIndexBlock* h) + : baseTimestamp(ts), head(h) { + baseBitmap[0] = bitmap[0]; + baseBitmap[1] = bitmap[1]; + baseBitmap[2] = bitmap[2]; + baseBitmap[3] = bitmap[3]; + } +}; + +/** + * RetiredVersion - Tracks a retired version for epoch-based reclamation + */ +struct RetiredVersion { + VersionedData* data; + DeleteIndexBlock* blocksToDelete; // Head of the chain to delete + uint64_t retireEpoch; + + RetiredVersion(VersionedData* d, DeleteIndexBlock* b, uint64_t e) + : data(d), blocksToDelete(b), retireEpoch(e) {} +}; + class TileVisibility { public: TileVisibility(); @@ -61,11 +97,12 @@ class TileVisibility { TileVisibility(const TileVisibility &) = delete; TileVisibility &operator=(const TileVisibility &) = delete; - uint64_t baseBitmap[4]; - uint64_t baseTimestamp; - std::atomic head; + void reclaimRetiredVersions(); + + std::atomic currentVersion; std::atomic tail; std::atomic tailUsed; + std::vector retired; // Protected by GC (single writer) }; #endif // PIXELS_RETINA_TILE_VISIBILITY_H diff --git a/cpp/pixels-retina/lib/EpochManager.cpp b/cpp/pixels-retina/lib/EpochManager.cpp new file mode 100644 index 0000000000..bfe583f467 --- /dev/null +++ b/cpp/pixels-retina/lib/EpochManager.cpp @@ -0,0 +1,92 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + +#include "EpochManager.h" + +// Thread-local storage for thread info +thread_local EpochManager::ThreadInfo* EpochManager::tlsThreadInfo = nullptr; + +EpochManager::~EpochManager() { + ThreadInfo* current = threadListHead.load(std::memory_order_acquire); + while (current) { + ThreadInfo* next = current->next; + delete current; + current = next; + } +} + +EpochManager::ThreadInfo* EpochManager::getOrCreateThreadInfo() { + if (tlsThreadInfo) { + return tlsThreadInfo; + } + + // Create new thread info + ThreadInfo* newInfo = new ThreadInfo(); + newInfo->threadId = std::this_thread::get_id(); + + // Add to global list + std::lock_guard lock(threadListMutex); + newInfo->next = threadListHead.load(std::memory_order_relaxed); + threadListHead.store(newInfo, std::memory_order_release); + + tlsThreadInfo = newInfo; + return newInfo; +} + +void EpochManager::enterEpoch() { + ThreadInfo* info = getOrCreateThreadInfo(); + uint64_t currentEpoch = globalEpoch.load(std::memory_order_acquire); + info->localEpoch.store(currentEpoch, std::memory_order_release); +} + +void EpochManager::exitEpoch() { + if (tlsThreadInfo) { + tlsThreadInfo->localEpoch.store(0, std::memory_order_release); + } +} + +uint64_t EpochManager::advanceEpoch() { + return globalEpoch.fetch_add(1, std::memory_order_acq_rel) + 1; +} + +bool EpochManager::canReclaim(uint64_t retireEpoch) const { + // Scan all threads to find the minimum active epoch + ThreadInfo* current = threadListHead.load(std::memory_order_acquire); + + while (current) { + if (!current->active.load(std::memory_order_acquire)) { + current = current->next; + continue; + } + + uint64_t localEpoch = current->localEpoch.load(std::memory_order_acquire); + + // localEpoch == 0 means the thread is not in critical section, skip it + if (localEpoch != 0 && localEpoch <= retireEpoch) { + // Found a thread still in or before the retire epoch + return false; + } + + current = current->next; + } + + // All threads have advanced past the retire epoch + return true; +} diff --git a/cpp/pixels-retina/lib/RGVisibility.cpp b/cpp/pixels-retina/lib/RGVisibility.cpp index 8ce8db0099..84925c0040 100644 --- a/cpp/pixels-retina/lib/RGVisibility.cpp +++ b/cpp/pixels-retina/lib/RGVisibility.cpp @@ -24,7 +24,6 @@ RGVisibility::RGVisibility(uint64_t rgRecordNum) : tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) { - flag.store(0, std::memory_order_relaxed); void* rawMemory = operator new[](tileCount * sizeof(TileVisibility)); tileVisibilities = static_cast(rawMemory); for (uint64_t i = 0; i < tileCount; ++i) { @@ -34,7 +33,6 @@ RGVisibility::RGVisibility(uint64_t rgRecordNum) RGVisibility::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector& initialBitmap) : tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) { - flag.store(0, std::memory_order_relaxed); void* rawMemory = operator new[](tileCount * sizeof(TileVisibility)); tileVisibilities = static_cast(rawMemory); @@ -59,61 +57,11 @@ RGVisibility::~RGVisibility() { operator delete[](tileVisibilities); } -void RGVisibility::beginRGAccess() { - while (true) { - uint32_t v = flag.load(std::memory_order_acquire); - uint32_t accessCount = v & ACCESS_MASK; - - if (accessCount >= MAX_ACCESS_COUNT) { - throw std::runtime_error("Reaches the max concurrent access count."); - } - - if ((v & GC_MASK) > 0 || - !flag.compare_exchange_strong(v, v + ACCESS_INC, std::memory_order_acq_rel)) { - // We failed to get gc lock or increase access count. - if ((v & GC_MASK) > 0) { - // if there is an existing gc, sleep for 10ms. - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - continue; - } - break; - } -} - -void RGVisibility::endRGAccess() { - uint32_t v = flag.load(std::memory_order_acquire); - while((v & ACCESS_MASK) > 0) { - if (flag.compare_exchange_strong(v, v - ACCESS_INC, std::memory_order_acq_rel)) { - break; - } - v = flag.load(std::memory_order_acquire); - } -} - void RGVisibility::collectRGGarbage(uint64_t timestamp) { - // Set the gc flag. - flag.store(flag.load(std::memory_order_acquire) | GC_MASK, std::memory_order_release); - - // Wait for all access to end. - while (true) { - uint32_t v = flag.load(std::memory_order_acquire); - if ((v & ACCESS_MASK) == 0) { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - - assert((flag.load(std::memory_order_acquire) & GC_MASK) > 0); - assert((flag.load(std::memory_order_acquire) & ACCESS_MASK) == 0); - - // Garbage collect. + // TileVisibility::collectTileGarbage uses COW + Epoch, so it's safe to call concurrently for (uint64_t i = 0; i < tileCount; i++) { tileVisibilities[i].collectTileGarbage(timestamp); } - - // Clear the gc flag. - flag.store(flag.load(std::memory_order_acquire) & ~GC_MASK, std::memory_order_release); } TileVisibility* RGVisibility::getTileVisibility(uint32_t rowId) const { @@ -125,34 +73,20 @@ TileVisibility* RGVisibility::getTileVisibility(uint32_t rowId) const { } void RGVisibility::deleteRGRecord(uint32_t rowId, uint64_t timestamp) { - try { - beginRGAccess(); - TileVisibility* tileVisibility = getTileVisibility(rowId); - tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp); - endRGAccess(); - } - catch (const std::runtime_error& e) { - endRGAccess(); - throw std::runtime_error("Failed to delete record: " + std::string(e.what())); - } + // TileVisibility::deleteTileRecord is lock-free and concurrent-safe + TileVisibility* tileVisibility = getTileVisibility(rowId); + tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp); } uint64_t* RGVisibility::getRGVisibilityBitmap(uint64_t timestamp) { - beginRGAccess(); + // TileVisibility::getTileVisibilityBitmap uses Epoch protection internally uint64_t* bitmap = new uint64_t[tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY]; memset(bitmap, 0, tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY * sizeof(uint64_t)); - try { - for (uint64_t i = 0; i < tileCount; i++) { - tileVisibilities[i].getTileVisibilityBitmap(timestamp, bitmap + i * BITMAP_SIZE_PER_TILE_VISIBILITY); - } - endRGAccess(); - return bitmap; - } catch (const std::runtime_error& e) { - delete[] bitmap; - endRGAccess(); - throw std::runtime_error("Failed to get visibility bitmap: " + std::string(e.what())); + for (uint64_t i = 0; i < tileCount; i++) { + tileVisibilities[i].getTileVisibilityBitmap(timestamp, bitmap + i * BITMAP_SIZE_PER_TILE_VISIBILITY); } + return bitmap; } uint64_t RGVisibility::getBitmapSize() const { diff --git a/cpp/pixels-retina/lib/TileVisibility.cpp b/cpp/pixels-retina/lib/TileVisibility.cpp index a351314876..5b2ac7491b 100644 --- a/cpp/pixels-retina/lib/TileVisibility.cpp +++ b/cpp/pixels-retina/lib/TileVisibility.cpp @@ -19,32 +19,51 @@ */ #include "TileVisibility.h" +#include "EpochManager.h" #include #include #include -TileVisibility::TileVisibility() : baseTimestamp(0UL) { - memset(baseBitmap, 0, 4 * sizeof(uint64_t)); - head.store(nullptr, std::memory_order_release); +TileVisibility::TileVisibility() { + VersionedData* initialVersion = new VersionedData(); + currentVersion.store(initialVersion, std::memory_order_release); tail.store(nullptr, std::memory_order_release); tailUsed.store(0, std::memory_order_release); } -TileVisibility::TileVisibility(uint64_t ts, const uint64_t bitmap[4]) - : baseTimestamp(ts) { - memcpy(baseBitmap, bitmap, 4 * sizeof(uint64_t)); - head.store(nullptr, std::memory_order_release); +TileVisibility::TileVisibility(uint64_t ts, const uint64_t bitmap[4]) { + VersionedData* initialVersion = new VersionedData(ts, bitmap, nullptr); + currentVersion.store(initialVersion, std::memory_order_release); tail.store(nullptr, std::memory_order_release); + tailUsed.store(0, std::memory_order_release); } TileVisibility::~TileVisibility() { - DeleteIndexBlock *blk = head.load(std::memory_order_acquire); - while (blk) { - DeleteIndexBlock *next = blk->next.load(std::memory_order_acquire); - delete blk; - blk = next; + // Clean up current version and its delete chain + VersionedData* ver = currentVersion.load(std::memory_order_acquire); + if (ver) { + DeleteIndexBlock *blk = ver->head; + while (blk) { + DeleteIndexBlock *next = blk->next.load(std::memory_order_acquire); + delete blk; + blk = next; + } + delete ver; + } + + // Clean up retired versions and their delete chains + for (auto& retired : this->retired) { + if (retired.data) { + delete retired.data; + } + DeleteIndexBlock* blk = retired.blocksToDelete; + while (blk) { + DeleteIndexBlock* next = blk->next.load(std::memory_order_acquire); + delete blk; + blk = next; + } } } @@ -52,12 +71,7 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) { uint64_t item = makeDeleteIndex(rowId, ts); while (true) { DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire); - if (!curTail) { // empty list - /** - * Issue: There is a delay in reading. - * Reads are judged from the head, and if the head pointer is - * not changed in time, the latest data cannot be read. - */ + if (!curTail) { // empty list - need to create first block and update version DeleteIndexBlock *newBlk = new DeleteIndexBlock(); newBlk->items[0] = item; DeleteIndexBlock *expectedTail = nullptr; @@ -67,9 +81,23 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) { delete newBlk; continue; } - head.store(newBlk, std::memory_order_release); - tailUsed.store(1, std::memory_order_release); - return; + + // COW: Create new version with the new head + VersionedData* oldVer = currentVersion.load(std::memory_order_acquire); + VersionedData* newVer = new VersionedData(oldVer->baseTimestamp, oldVer->baseBitmap, newBlk); + + if (currentVersion.compare_exchange_strong(oldVer, newVer, std::memory_order_acq_rel)) { + // Success: retire old version (no chain to delete since head was nullptr) + delete oldVer; + tailUsed.store(1, std::memory_order_release); + return; + } else { + // CAS failed, retry from beginning + delete newVer; + tail.store(nullptr, std::memory_order_release); + delete newBlk; + continue; + } } else { size_t pos = tailUsed.load(std::memory_order_acquire); if (pos < DeleteIndexBlock::BLOCK_CAPACITY) { @@ -136,15 +164,21 @@ inline void process_bitmap_block_256(const DeleteIndexBlock *blk, } void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4]) const { - if (ts < baseTimestamp) { + // Enter epoch protection + EpochGuard guard; + + // Load current version under epoch protection + VersionedData* ver = currentVersion.load(std::memory_order_acquire); + + if (ts < ver->baseTimestamp) { throw std::runtime_error("need to read checkpoint from disk"); } - std::memcpy(outBitmap, baseBitmap, 4 * sizeof(uint64_t)); - if (ts == baseTimestamp) { + std::memcpy(outBitmap, ver->baseBitmap, 4 * sizeof(uint64_t)); + if (ts == ver->baseTimestamp) { return; } - DeleteIndexBlock *blk = head.load(std::memory_order_acquire); + DeleteIndexBlock *blk = ver->head; #ifdef RETINA_SIMD const __m256i signBit = _mm256_set1_epi64x(0x8000000000000000ULL); const __m256i vThrFlip = _mm256_xor_si256(_mm256_set1_epi64x(ts), signBit); @@ -194,16 +228,17 @@ void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4]) } void TileVisibility::collectTileGarbage(uint64_t ts) { - // The upper layers have ensured that there are no reads or writes at this point - // so we can safely delete the records - - if (ts <= baseTimestamp) { + // Load old version + VersionedData* oldVer = currentVersion.load(std::memory_order_acquire); + + if (ts <= oldVer->baseTimestamp) { return; } - DeleteIndexBlock *blk = head.load(std::memory_order_acquire); + // Find the last block that should be compacted + DeleteIndexBlock *blk = oldVer->head; DeleteIndexBlock *lastFullBlk = nullptr; - uint64_t newBaseTimestamp = baseTimestamp; + uint64_t newBaseTimestamp = oldVer->baseTimestamp; while (blk) { size_t count = (blk == tail.load(std::memory_order_acquire)) @@ -225,26 +260,91 @@ void TileVisibility::collectTileGarbage(uint64_t ts) { blk = blk->next.load(std::memory_order_acquire); } - if (lastFullBlk) { - getTileVisibilityBitmap(ts, baseBitmap); - baseTimestamp = newBaseTimestamp; - - DeleteIndexBlock *current = head.load(std::memory_order_acquire); - DeleteIndexBlock *newHead = - lastFullBlk->next.load(std::memory_order_acquire); - - head.store(newHead, std::memory_order_release); + if (!lastFullBlk) { + // Nothing to compact + return; + } - DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire); + // Create new version with Copy-on-Write + // Manually compute the new base bitmap from oldVer + uint64_t newBaseBitmap[4]; + std::memcpy(newBaseBitmap, oldVer->baseBitmap, 4 * sizeof(uint64_t)); + + // Apply deletes from oldVer->head up to lastFullBlk + blk = oldVer->head; + while (blk) { + size_t count = (blk == lastFullBlk) + ? ((blk == tail.load(std::memory_order_acquire)) + ? tailUsed.load(std::memory_order_acquire) + : DeleteIndexBlock::BLOCK_CAPACITY) + : DeleteIndexBlock::BLOCK_CAPACITY; + + for (size_t i = 0; i < count; i++) { + uint64_t item = blk->items[i]; + uint64_t delTs = extractTimestamp(item); + if (delTs <= ts) { + SET_BITMAP_BIT(newBaseBitmap, extractRowId(item)); + } + } + + if (blk == lastFullBlk) { + break; + } + blk = blk->next.load(std::memory_order_acquire); + } + + // Get new head and break the chain to avoid double-free + DeleteIndexBlock* newHead = lastFullBlk->next.load(std::memory_order_acquire); + lastFullBlk->next.store(nullptr, std::memory_order_release); + + // Create new version with new head - this is the atomic COW update + VersionedData* newVer = new VersionedData(newBaseTimestamp, newBaseBitmap, newHead); + + // CAS to install new version atomically + if (currentVersion.compare_exchange_strong(oldVer, newVer, + std::memory_order_acq_rel)) { + // Successfully updated + // Retire old version and its delete chain + uint64_t retireEpoch = EpochManager::getInstance().advanceEpoch(); + retired.emplace_back(oldVer, oldVer->head, retireEpoch); + + // Update tail if needed (if all blocks were compacted) if (!newHead) { - tail.store(newHead, std::memory_order_release); + tail.store(nullptr, std::memory_order_release); + tailUsed.store(0, std::memory_order_release); } + + // Try to reclaim retired versions + reclaimRetiredVersions(); + } else { + // CAS failed, another GC happened concurrently + // Restore the chain link + lastFullBlk->next.store(newHead, std::memory_order_release); + delete newVer; + } +} - while (current != lastFullBlk->next.load(std::memory_order_acquire)) { - DeleteIndexBlock *next = current->next.load( - std::memory_order_acquire); - delete current; - current = next; +void TileVisibility::reclaimRetiredVersions() { + // Remove retired versions that can be safely reclaimed + auto it = retired.begin(); + while (it != retired.end()) { + if (EpochManager::getInstance().canReclaim(it->retireEpoch)) { + // Safe to delete + if (it->data) { + delete it->data; + } + + // Delete the chain of blocks + DeleteIndexBlock* blk = it->blocksToDelete; + while (blk) { + DeleteIndexBlock* next = blk->next.load(std::memory_order_acquire); + delete blk; + blk = next; + } + + it = retired.erase(it); + } else { + ++it; } } } From dc2b1380c58a0395a2aef9b728e9963f1f853c34 Mon Sep 17 00:00:00 2001 From: dongyang geng Date: Sat, 27 Dec 2025 20:29:20 +0800 Subject: [PATCH 2/3] feat: opt cas --- cpp/pixels-retina/lib/TileVisibility.cpp | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/cpp/pixels-retina/lib/TileVisibility.cpp b/cpp/pixels-retina/lib/TileVisibility.cpp index 5b2ac7491b..e58671f2d1 100644 --- a/cpp/pixels-retina/lib/TileVisibility.cpp +++ b/cpp/pixels-retina/lib/TileVisibility.cpp @@ -77,7 +77,8 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) { DeleteIndexBlock *expectedTail = nullptr; if (!tail.compare_exchange_strong(expectedTail, newBlk, - std::memory_order_acq_rel)) { + std::memory_order_release, + std::memory_order_relaxed)) { delete newBlk; continue; } @@ -101,7 +102,9 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) { } else { size_t pos = tailUsed.load(std::memory_order_acquire); if (pos < DeleteIndexBlock::BLOCK_CAPACITY) { - if (tailUsed.compare_exchange_strong(pos, pos + 1, std::memory_order_acq_rel)) { + if (tailUsed.compare_exchange_strong(pos, pos + 1, + std::memory_order_relaxed, + std::memory_order_relaxed)) { curTail->items[pos] = item; return; } @@ -117,13 +120,16 @@ void TileVisibility::deleteTileRecord(uint8_t rowId, uint64_t ts) { DeleteIndexBlock *expectedNext = nullptr; if (!curTail->next.compare_exchange_strong( - expectedNext, newBlk, std::memory_order_acq_rel)) { + expectedNext, newBlk, + std::memory_order_release, + std::memory_order_relaxed)) { delete newBlk; continue; } tail.compare_exchange_strong(curTail, newBlk, - std::memory_order_acq_rel); + std::memory_order_release, + std::memory_order_relaxed); tailUsed.store(1, std::memory_order_release); return; } @@ -186,8 +192,8 @@ void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4]) #endif while (blk) { - DeleteIndexBlock *currentTail = tail.load(std::memory_order_acquire); - size_t currentTailUsed = tailUsed.load(std::memory_order_acquire); + DeleteIndexBlock *currentTail = tail.load(std::memory_order_relaxed); + size_t currentTailUsed = tailUsed.load(std::memory_order_relaxed); size_t count = (blk == currentTail) ? currentTailUsed : DeleteIndexBlock::BLOCK_CAPACITY; @@ -223,7 +229,7 @@ void TileVisibility::getTileVisibilityBitmap(uint64_t ts, uint64_t outBitmap[4]) } } - blk = blk->next.load(std::memory_order_acquire); + blk = blk->next.load(std::memory_order_relaxed); } } From c6ab5405895f37d18679d2e081e219203a0d970d Mon Sep 17 00:00:00 2001 From: dongyang geng Date: Sat, 27 Dec 2025 21:11:19 +0800 Subject: [PATCH 3/3] feat: add perf test --- cpp/pixels-retina/CMakeLists.txt | 6 + .../test/VisibilityPerformanceTest.cpp | 583 ++++++++++++++++++ 2 files changed, 589 insertions(+) create mode 100644 cpp/pixels-retina/test/VisibilityPerformanceTest.cpp diff --git a/cpp/pixels-retina/CMakeLists.txt b/cpp/pixels-retina/CMakeLists.txt index 7438eaab2c..f5fcad58fb 100644 --- a/cpp/pixels-retina/CMakeLists.txt +++ b/cpp/pixels-retina/CMakeLists.txt @@ -64,6 +64,7 @@ install(TARGETS pixels-retina # Add the test executable add_executable(tile_visibility_tests test/TileVisibilityTest.cpp) add_executable(rg_visibility_tests test/RGVisibilityTest.cpp) +add_executable(visibility_perf_tests test/VisibilityPerformanceTest.cpp) # Link the test executable with the library target_link_libraries(tile_visibility_tests @@ -74,10 +75,15 @@ target_link_libraries(rg_visibility_tests pixels-retina GTest::gtest_main ) +target_link_libraries(visibility_perf_tests + pixels-retina + GTest::gtest_main +) include(GoogleTest) gtest_discover_tests(tile_visibility_tests) gtest_discover_tests(rg_visibility_tests) +gtest_discover_tests(visibility_perf_tests) # Set build type to Debug if not specified # if (NOT CMAKE_BUILD_TYPE) diff --git a/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp b/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp new file mode 100644 index 0000000000..381cbf9168 --- /dev/null +++ b/cpp/pixels-retina/test/VisibilityPerformanceTest.cpp @@ -0,0 +1,583 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + +#include "gtest/gtest.h" +#include "RGVisibility.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Configurable test parameters +#ifndef PERF_SMALL_DATASET +#define PERF_SMALL_DATASET 1024 +#endif + +#ifndef PERF_MEDIUM_DATASET +#define PERF_MEDIUM_DATASET 102400 +#endif + +#ifndef PERF_LARGE_DATASET +#define PERF_LARGE_DATASET 1024000 +#endif + +#ifndef PERF_NUM_OPERATIONS +#define PERF_NUM_OPERATIONS 10000 +#endif + +#ifndef PERF_NUM_THREADS +#define PERF_NUM_THREADS 8 +#endif + +using namespace std::chrono; + +// Performance statistics helper class +class PerfStats { +public: + void addSample(double microseconds) { + samples.push_back(microseconds); + } + + void calculate() { + if (samples.empty()) return; + + std::sort(samples.begin(), samples.end()); + + min = samples.front(); + max = samples.back(); + avg = std::accumulate(samples.begin(), samples.end(), 0.0) / samples.size(); + + p50 = samples[samples.size() * 50 / 100]; + p95 = samples[samples.size() * 95 / 100]; + p99 = samples[samples.size() * 99 / 100]; + } + + void print(const std::string& name) const { + std::cout << "\n=== " << name << " ===" << std::endl; + std::cout << std::fixed << std::setprecision(2); + std::cout << " Count: " << samples.size() << std::endl; + std::cout << " Min: " << min << " μs" << std::endl; + std::cout << " Max: " << max << " μs" << std::endl; + std::cout << " Average: " << avg << " μs" << std::endl; + std::cout << " P50: " << p50 << " μs" << std::endl; + std::cout << " P95: " << p95 << " μs" << std::endl; + std::cout << " P99: " << p99 << " μs" << std::endl; + + if (!samples.empty()) { + double totalSeconds = std::accumulate(samples.begin(), samples.end(), 0.0) / 1000000.0; + double throughput = samples.size() / totalSeconds; + std::cout << " Throughput: " << std::fixed << std::setprecision(0) + << throughput << " ops/sec" << std::endl; + } + } + + double getAvg() const { return avg; } + size_t getCount() const { return samples.size(); } + +private: + std::vector samples; + double min = 0, max = 0, avg = 0, p50 = 0, p95 = 0, p99 = 0; +}; + +// Performance test base class +class VisibilityPerfTest : public ::testing::Test { +protected: + void SetUp() override { + std::cout << "\n" << std::string(80, '=') << std::endl; + } + + void TearDown() override { + std::cout << std::string(80, '=') << std::endl; + } + + // Timing helper function + template + double measureMicroseconds(Func&& func) { + auto start = high_resolution_clock::now(); + func(); + auto end = high_resolution_clock::now(); + return duration_cast(end - start).count(); + } +}; + +// ============================================================================ +// Single-threaded performance tests +// ============================================================================ + +TEST_F(VisibilityPerfTest, DeleteRecordPerformance_SmallDataset) { + const uint64_t ROW_COUNT = PERF_SMALL_DATASET; + const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT); + + std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + PerfStats stats; + + for (uint64_t i = 0; i < NUM_OPS; i++) { + uint64_t timestamp = i + 1; + uint32_t rowId = i % ROW_COUNT; + + double elapsed = measureMicroseconds([&]() { + rgVisibility->deleteRGRecord(rowId, timestamp); + }); + + stats.addSample(elapsed); + } + + stats.calculate(); + stats.print("deleteRGRecord() - Small Dataset"); + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, DeleteRecordPerformance_MediumDataset) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT); + + std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + PerfStats stats; + + for (uint64_t i = 0; i < NUM_OPS; i++) { + uint64_t timestamp = i + 1; + uint32_t rowId = i % ROW_COUNT; + + double elapsed = measureMicroseconds([&]() { + rgVisibility->deleteRGRecord(rowId, timestamp); + }); + + stats.addSample(elapsed); + } + + stats.calculate(); + stats.print("deleteRGRecord() - Medium Dataset"); + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, DeleteRecordPerformance_LargeDataset) { + const uint64_t ROW_COUNT = PERF_LARGE_DATASET; + const uint64_t NUM_OPS = std::min((uint64_t)PERF_NUM_OPERATIONS, ROW_COUNT); + + std::cout << "Testing deleteRGRecord() performance with " << ROW_COUNT << " rows" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + PerfStats stats; + + for (uint64_t i = 0; i < NUM_OPS; i++) { + uint64_t timestamp = i + 1; + uint32_t rowId = i % ROW_COUNT; + + double elapsed = measureMicroseconds([&]() { + rgVisibility->deleteRGRecord(rowId, timestamp); + }); + + stats.addSample(elapsed); + } + + stats.calculate(); + stats.print("deleteRGRecord() - Large Dataset"); + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, GetBitmapPerformance) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const uint64_t NUM_DELETES = 1000; + + std::cout << "Testing getRGVisibilityBitmap() performance" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + + // Perform some delete operations first + for (uint64_t i = 0; i < NUM_DELETES; i++) { + rgVisibility->deleteRGRecord(i, i + 1); + } + + PerfStats stats; + + for (int i = 0; i < PERF_NUM_OPERATIONS; i++) { + uint64_t timestamp = (i % NUM_DELETES) + 1; + + double elapsed = measureMicroseconds([&]() { + uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(timestamp); + delete[] bitmap; + }); + + stats.addSample(elapsed); + } + + stats.calculate(); + stats.print("getRGVisibilityBitmap()"); + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, CollectGarbagePerformance) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const uint64_t NUM_DELETES = 10000; + + std::cout << "Testing collectRGGarbage() performance" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + + // Perform many delete operations first + for (uint64_t i = 0; i < NUM_DELETES; i++) { + rgVisibility->deleteRGRecord(i % ROW_COUNT, i + 1); + } + + PerfStats stats; + + for (int i = 0; i < 100; i++) { + uint64_t gcTimestamp = (i + 1) * 100; + + double elapsed = measureMicroseconds([&]() { + rgVisibility->collectRGGarbage(gcTimestamp); + }); + + stats.addSample(elapsed); + } + + stats.calculate(); + stats.print("collectRGGarbage()"); + + delete rgVisibility; +} + +// ============================================================================ +// Multi-threaded performance tests +// ============================================================================ + +TEST_F(VisibilityPerfTest, ConcurrentDeletePerformance) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const int NUM_THREADS = PERF_NUM_THREADS; + const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / NUM_THREADS; + + std::cout << "Testing concurrent deleteRGRecord() with " + << NUM_THREADS << " threads" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + std::vector threadStats(NUM_THREADS); + std::atomic timestamp{0}; + + auto start = high_resolution_clock::now(); + + std::vector threads; + for (int t = 0; t < NUM_THREADS; t++) { + threads.emplace_back([&, t]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(0, ROW_COUNT - 1); + + for (int i = 0; i < OPS_PER_THREAD; i++) { + uint64_t ts = timestamp.fetch_add(1) + 1; + uint32_t rowId = dist(gen); + + auto opStart = high_resolution_clock::now(); + rgVisibility->deleteRGRecord(rowId, ts); + auto opEnd = high_resolution_clock::now(); + + double elapsed = duration_cast(opEnd - opStart).count(); + threadStats[t].addSample(elapsed); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto end = high_resolution_clock::now(); + double totalTime = duration_cast(end - start).count(); + + // Calculate statistics for all threads + PerfStats combinedStats; + for (auto& stats : threadStats) { + stats.calculate(); + } + + std::cout << "\nConcurrent Delete Performance:" << std::endl; + std::cout << " Threads: " << NUM_THREADS << std::endl; + std::cout << " Total Ops: " << NUM_THREADS * OPS_PER_THREAD << std::endl; + std::cout << " Total Time: " << std::fixed << std::setprecision(2) + << totalTime / 1000.0 << " ms" << std::endl; + std::cout << " Throughput: " << std::fixed << std::setprecision(0) + << (NUM_THREADS * OPS_PER_THREAD) / (totalTime / 1000000.0) + << " ops/sec" << std::endl; + + for (int t = 0; t < NUM_THREADS; t++) { + std::cout << "\n Thread " << t << " - Avg latency: " + << std::fixed << std::setprecision(2) + << threadStats[t].getAvg() << " μs" << std::endl; + } + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, ConcurrentReadPerformance) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const int NUM_THREADS = PERF_NUM_THREADS; + const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / NUM_THREADS; + + std::cout << "Testing concurrent getRGVisibilityBitmap() with " + << NUM_THREADS << " threads" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + + // Perform some delete operations in advance + const uint64_t NUM_DELETES = 1000; + for (uint64_t i = 0; i < NUM_DELETES; i++) { + rgVisibility->deleteRGRecord(i % ROW_COUNT, i + 1); + } + + std::vector threadStats(NUM_THREADS); + + auto start = high_resolution_clock::now(); + + std::vector threads; + for (int t = 0; t < NUM_THREADS; t++) { + threads.emplace_back([&, t]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(1, NUM_DELETES); + + for (int i = 0; i < OPS_PER_THREAD; i++) { + uint64_t ts = dist(gen); + + auto opStart = high_resolution_clock::now(); + uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(ts); + delete[] bitmap; + auto opEnd = high_resolution_clock::now(); + + double elapsed = duration_cast(opEnd - opStart).count(); + threadStats[t].addSample(elapsed); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto end = high_resolution_clock::now(); + double totalTime = duration_cast(end - start).count(); + + for (auto& stats : threadStats) { + stats.calculate(); + } + + std::cout << "\nConcurrent Read Performance:" << std::endl; + std::cout << " Threads: " << NUM_THREADS << std::endl; + std::cout << " Total Ops: " << NUM_THREADS * OPS_PER_THREAD << std::endl; + std::cout << " Total Time: " << std::fixed << std::setprecision(2) + << totalTime / 1000.0 << " ms" << std::endl; + std::cout << " Throughput: " << std::fixed << std::setprecision(0) + << (NUM_THREADS * OPS_PER_THREAD) / (totalTime / 1000000.0) + << " ops/sec" << std::endl; + + for (int t = 0; t < NUM_THREADS; t++) { + std::cout << "\n Thread " << t << " - Avg latency: " + << std::fixed << std::setprecision(2) + << threadStats[t].getAvg() << " μs" << std::endl; + } + + delete rgVisibility; +} + +TEST_F(VisibilityPerfTest, MixedWorkloadPerformance) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const int NUM_WRITE_THREADS = PERF_NUM_THREADS / 2; + const int NUM_READ_THREADS = PERF_NUM_THREADS / 2; + const int OPS_PER_THREAD = PERF_NUM_OPERATIONS / PERF_NUM_THREADS; + + std::cout << "Testing mixed read/write workload with " + << NUM_WRITE_THREADS << " writers and " + << NUM_READ_THREADS << " readers" << std::endl; + + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + std::atomic timestamp{0}; + std::atomic running{true}; + + std::vector writeStats(NUM_WRITE_THREADS); + std::vector readStats(NUM_READ_THREADS); + + auto start = high_resolution_clock::now(); + + // Start write threads + std::vector threads; + for (int t = 0; t < NUM_WRITE_THREADS; t++) { + threads.emplace_back([&, t]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(0, ROW_COUNT - 1); + + for (int i = 0; i < OPS_PER_THREAD; i++) { + uint64_t ts = timestamp.fetch_add(1) + 1; + uint32_t rowId = dist(gen); + + auto opStart = high_resolution_clock::now(); + rgVisibility->deleteRGRecord(rowId, ts); + auto opEnd = high_resolution_clock::now(); + + double elapsed = duration_cast(opEnd - opStart).count(); + writeStats[t].addSample(elapsed); + } + }); + } + + // Start read threads + for (int t = 0; t < NUM_READ_THREADS; t++) { + threads.emplace_back([&, t]() { + std::random_device rd; + std::mt19937 gen(rd()); + + for (int i = 0; i < OPS_PER_THREAD; i++) { + uint64_t maxTs = timestamp.load(); + if (maxTs == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(10)); + continue; + } + + std::uniform_int_distribution dist(1, maxTs); + uint64_t ts = dist(gen); + + auto opStart = high_resolution_clock::now(); + uint64_t* bitmap = rgVisibility->getRGVisibilityBitmap(ts); + delete[] bitmap; + auto opEnd = high_resolution_clock::now(); + + double elapsed = duration_cast(opEnd - opStart).count(); + readStats[t].addSample(elapsed); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto end = high_resolution_clock::now(); + double totalTime = duration_cast(end - start).count(); + + // Calculate statistics + for (auto& stats : writeStats) { + stats.calculate(); + } + for (auto& stats : readStats) { + stats.calculate(); + } + + std::cout << "\nMixed Workload Performance:" << std::endl; + std::cout << " Write Threads: " << NUM_WRITE_THREADS << std::endl; + std::cout << " Read Threads: " << NUM_READ_THREADS << std::endl; + std::cout << " Total Time: " << std::fixed << std::setprecision(2) + << totalTime / 1000.0 << " ms" << std::endl; + + size_t totalWrites = 0, totalReads = 0; + for (auto& stats : writeStats) { + totalWrites += stats.getCount(); + } + for (auto& stats : readStats) { + totalReads += stats.getCount(); + } + + std::cout << " Total Writes: " << totalWrites << std::endl; + std::cout << " Total Reads: " << totalReads << std::endl; + std::cout << " Write Throughput: " << std::fixed << std::setprecision(0) + << totalWrites / (totalTime / 1000000.0) << " ops/sec" << std::endl; + std::cout << " Read Throughput: " << std::fixed << std::setprecision(0) + << totalReads / (totalTime / 1000000.0) << " ops/sec" << std::endl; + + delete rgVisibility; +} + +// ============================================================================ +// Scalability tests - Testing scalability with different thread counts +// ============================================================================ + +TEST_F(VisibilityPerfTest, ScalabilityTest) { + const uint64_t ROW_COUNT = PERF_MEDIUM_DATASET; + const int TOTAL_OPS = 50000; + + std::cout << "Testing scalability with different thread counts" << std::endl; + std::cout << std::string(60, '-') << std::endl; + std::cout << std::setw(10) << "Threads" + << std::setw(15) << "Time (ms)" + << std::setw(20) << "Throughput (ops/s)" + << std::setw(15) << "Speedup" << std::endl; + std::cout << std::string(60, '-') << std::endl; + + double baselineTime = 0; + + for (int numThreads : {1, 2, 4, 8, 16}) { + RGVisibility* rgVisibility = new RGVisibility(ROW_COUNT); + std::atomic timestamp{0}; + + int opsPerThread = TOTAL_OPS / numThreads; + + auto start = high_resolution_clock::now(); + + std::vector threads; + for (int t = 0; t < numThreads; t++) { + threads.emplace_back([&]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(0, ROW_COUNT - 1); + + for (int i = 0; i < opsPerThread; i++) { + uint64_t ts = timestamp.fetch_add(1) + 1; + uint32_t rowId = dist(gen); + rgVisibility->deleteRGRecord(rowId, ts); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + auto end = high_resolution_clock::now(); + double totalTime = duration_cast(end - start).count(); + double throughput = (numThreads * opsPerThread) / (totalTime / 1000000.0); + + if (numThreads == 1) { + baselineTime = totalTime; + } + + double speedup = baselineTime / totalTime; + + std::cout << std::setw(10) << numThreads + << std::setw(15) << std::fixed << std::setprecision(2) << totalTime / 1000.0 + << std::setw(20) << std::fixed << std::setprecision(0) << throughput + << std::setw(15) << std::fixed << std::setprecision(2) << speedup << "x" + << std::endl; + + delete rgVisibility; + } + + std::cout << std::string(60, '-') << std::endl; +}