Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/pixels-retina/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include)

# Source files
set(SOURCES
lib/EpochManager.cpp
lib/TileVisibility.cpp
lib/RGVisibility.cpp
lib/RGVisibilityJni.cpp
Expand All @@ -63,6 +64,7 @@ install(TARGETS pixels-retina
# Add the test executable
add_executable(tile_visibility_tests test/TileVisibilityTest.cpp)
add_executable(rg_visibility_tests test/RGVisibilityTest.cpp)
add_executable(visibility_perf_tests test/VisibilityPerformanceTest.cpp)

# Link the test executable with the library
target_link_libraries(tile_visibility_tests
Expand All @@ -73,10 +75,15 @@ target_link_libraries(rg_visibility_tests
pixels-retina
GTest::gtest_main
)
target_link_libraries(visibility_perf_tests
pixels-retina
GTest::gtest_main
)

include(GoogleTest)
gtest_discover_tests(tile_visibility_tests)
gtest_discover_tests(rg_visibility_tests)
gtest_discover_tests(visibility_perf_tests)

# Set build type to Debug if not specified
# if (NOT CMAKE_BUILD_TYPE)
Expand Down
111 changes: 111 additions & 0 deletions cpp/pixels-retina/include/EpochManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
#ifndef PIXELS_RETINA_EPOCH_MANAGER_H
#define PIXELS_RETINA_EPOCH_MANAGER_H

#include <atomic>
#include <thread>
#include <mutex>

/**
* EpochManager - A global epoch-based memory reclamation system
*
* This manager allows safe deferred deletion of objects in a concurrent
* environment using epoch-based reclamation. Threads announce their presence
* in an epoch, and objects can only be reclaimed when all threads have
* advanced past the epoch in which the object was retired.
*/
class EpochManager {
public:
static EpochManager& getInstance() {
static EpochManager instance;
return instance;
}

/**
* Enter the critical section. Thread announces it's participating in
* the current epoch.
*/
void enterEpoch();

/**
* Exit the critical section. Thread leaves the epoch.
*/
void exitEpoch();

/**
* Advance the global epoch counter. Should be called by GC operations.
* Returns the new epoch value.
*/
uint64_t advanceEpoch();

/**
* Check if an object retired at the given epoch can be safely reclaimed.
* An object can be reclaimed if all threads have advanced past its retire epoch.
*/
bool canReclaim(uint64_t retireEpoch) const;

/**
* Get the current global epoch.
*/
uint64_t getCurrentEpoch() const {
return globalEpoch.load(std::memory_order_acquire);
}

private:
EpochManager() : globalEpoch(1) {}
~EpochManager();

EpochManager(const EpochManager&) = delete;
EpochManager& operator=(const EpochManager&) = delete;

struct ThreadInfo {
std::atomic<uint64_t> localEpoch{0}; // 0 means not in critical section
std::atomic<bool> active{true};
std::thread::id threadId;
ThreadInfo* next{nullptr};
};

std::atomic<uint64_t> globalEpoch;
std::atomic<ThreadInfo*> threadListHead{nullptr};
std::mutex threadListMutex; // Protects thread list modifications

ThreadInfo* getOrCreateThreadInfo();
static thread_local ThreadInfo* tlsThreadInfo;
};

/**
* RAII helper for epoch protection
*/
class EpochGuard {
public:
EpochGuard() {
EpochManager::getInstance().enterEpoch();
}

~EpochGuard() {
EpochManager::getInstance().exitEpoch();
}

EpochGuard(const EpochGuard&) = delete;
EpochGuard& operator=(const EpochGuard&) = delete;
};

#endif // PIXELS_RETINA_EPOCH_MANAGER_H
8 changes: 0 additions & 8 deletions cpp/pixels-retina/include/RGVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,12 @@ class RGVisibility {

private:
static constexpr uint32_t VISIBILITY_RECORD_CAPACITY = 256;
static constexpr uint32_t MAX_ACCESS_COUNT = 0x007FFFFF;
static constexpr uint32_t GC_MASK = 0xFF000000;
static constexpr uint32_t ACCESS_MASK = 0x00FFFFFF;
static constexpr uint32_t ACCESS_INC = 0x00000001;
static constexpr uint32_t BITMAP_SIZE_PER_TILE_VISIBILITY = 4;
static constexpr uint32_t RG_READ_LEASE_MS = 100;

TileVisibility* tileVisibilities;
const uint64_t tileCount;
std::atomic<uint32_t> flag; // high 1 byte is the gc flag, low 3 bytes are the access count

TileVisibility* getTileVisibility(uint32_t rowId) const;
void beginRGAccess();
void endRGAccess();
};

#endif //RG_VISIBILITY_H
43 changes: 40 additions & 3 deletions cpp/pixels-retina/include/TileVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

inline uint64_t makeDeleteIndex(uint8_t rowId, uint64_t ts) {
return (static_cast<uint64_t>(rowId) << 56 | (ts & 0x00FFFFFFFFFFFFFFULL));
Expand All @@ -48,6 +49,41 @@ struct DeleteIndexBlock {
std::atomic<DeleteIndexBlock *> next{nullptr};
};

/**
* VersionedData - A versioned snapshot of the base state
* Used for Copy-on-Write during garbage collection
* IMPORTANT: head is part of the version to ensure atomic visibility
*/
struct VersionedData {
uint64_t baseBitmap[4];
uint64_t baseTimestamp;
DeleteIndexBlock* head; // Delete chain head, part of the version

VersionedData() : baseTimestamp(0), head(nullptr) {
baseBitmap[0] = baseBitmap[1] = baseBitmap[2] = baseBitmap[3] = 0;
}

VersionedData(uint64_t ts, const uint64_t bitmap[4], DeleteIndexBlock* h)
: baseTimestamp(ts), head(h) {
baseBitmap[0] = bitmap[0];
baseBitmap[1] = bitmap[1];
baseBitmap[2] = bitmap[2];
baseBitmap[3] = bitmap[3];
}
};

/**
* RetiredVersion - Tracks a retired version for epoch-based reclamation
*/
struct RetiredVersion {
VersionedData* data;
DeleteIndexBlock* blocksToDelete; // Head of the chain to delete
uint64_t retireEpoch;

RetiredVersion(VersionedData* d, DeleteIndexBlock* b, uint64_t e)
: data(d), blocksToDelete(b), retireEpoch(e) {}
};

class TileVisibility {
public:
TileVisibility();
Expand All @@ -61,11 +97,12 @@ class TileVisibility {
TileVisibility(const TileVisibility &) = delete;
TileVisibility &operator=(const TileVisibility &) = delete;

uint64_t baseBitmap[4];
uint64_t baseTimestamp;
std::atomic<DeleteIndexBlock *> head;
void reclaimRetiredVersions();

std::atomic<VersionedData*> currentVersion;
std::atomic<DeleteIndexBlock *> tail;
std::atomic<size_t> tailUsed;
std::vector<RetiredVersion> retired; // Protected by GC (single writer)
};

#endif // PIXELS_RETINA_TILE_VISIBILITY_H
92 changes: 92 additions & 0 deletions cpp/pixels-retina/lib/EpochManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/

#include "EpochManager.h"

// Thread-local storage for thread info
thread_local EpochManager::ThreadInfo* EpochManager::tlsThreadInfo = nullptr;

EpochManager::~EpochManager() {
ThreadInfo* current = threadListHead.load(std::memory_order_acquire);
while (current) {
ThreadInfo* next = current->next;
delete current;
current = next;
}
}

EpochManager::ThreadInfo* EpochManager::getOrCreateThreadInfo() {
if (tlsThreadInfo) {
return tlsThreadInfo;
}

// Create new thread info
ThreadInfo* newInfo = new ThreadInfo();
newInfo->threadId = std::this_thread::get_id();

// Add to global list
std::lock_guard<std::mutex> lock(threadListMutex);
newInfo->next = threadListHead.load(std::memory_order_relaxed);
threadListHead.store(newInfo, std::memory_order_release);

tlsThreadInfo = newInfo;
return newInfo;
}

void EpochManager::enterEpoch() {
ThreadInfo* info = getOrCreateThreadInfo();
uint64_t currentEpoch = globalEpoch.load(std::memory_order_acquire);
info->localEpoch.store(currentEpoch, std::memory_order_release);
}

void EpochManager::exitEpoch() {
if (tlsThreadInfo) {
tlsThreadInfo->localEpoch.store(0, std::memory_order_release);
}
}

uint64_t EpochManager::advanceEpoch() {
return globalEpoch.fetch_add(1, std::memory_order_acq_rel) + 1;
}

bool EpochManager::canReclaim(uint64_t retireEpoch) const {
// Scan all threads to find the minimum active epoch
ThreadInfo* current = threadListHead.load(std::memory_order_acquire);

while (current) {
if (!current->active.load(std::memory_order_acquire)) {
current = current->next;
continue;
}

uint64_t localEpoch = current->localEpoch.load(std::memory_order_acquire);

// localEpoch == 0 means the thread is not in critical section, skip it
if (localEpoch != 0 && localEpoch <= retireEpoch) {
// Found a thread still in or before the retire epoch
return false;
}

current = current->next;
}

// All threads have advanced past the retire epoch
return true;
}
Loading