Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions src/ailego/buffer/buffer_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include <zvec/ailego/buffer/buffer_pool.h>
#include <zvec/core/framework/index_logger.h>

namespace zvec {
namespace ailego {

int LRUCache::init(size_t block_size) {
block_size_ = block_size;
for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) {
queues_.push_back(ConcurrentQueue(block_size));
}
return 0;
}

bool LRUCache::evict_single_block(BlockType &item) {
bool found = false;
for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) {
found = queues_[i].try_dequeue(item);
if (found) {
break;
}
}
return found;
}

bool LRUCache::add_single_block(const LPMap *lp_map, const BlockType &block,
int block_type) {
bool ok = queues_[block_type].try_enqueue(block);
evict_queue_insertions_.fetch_add(1, std::memory_order_relaxed);
if (evict_queue_insertions_ % block_size_ == 0) {
this->clear_dead_node(lp_map);
}
return ok;
}

void LRUCache::clear_dead_node(const LPMap *lp_map) {
for (int i = 0; i < CATCH_QUEUE_NUM; i++) {
int clear_size = block_size_ * 2;
if (queues_[i].size_approx() < clear_size * 4) {
continue;
}
int clear_count = 0;
ConcurrentQueue tmp(block_size_);
BlockType item;
while (queues_[i].try_dequeue(item) && (clear_count++ < clear_size)) {
if (!lp_map->isDeadBlock(item)) {
tmp.try_enqueue(item);
}
}
while (tmp.try_dequeue(item)) {
if (!lp_map->isDeadBlock(item)) {
queues_[i].try_enqueue(item);
}
}
}
}

void LPMap::init(size_t entry_num) {
if (entries_) {
delete[] entries_;
}
entry_num_ = entry_num;
entries_ = new Entry[entry_num_];
for (size_t i = 0; i < entry_num_; i++) {
entries_[i].ref_count.store(std::numeric_limits<int>::min());
entries_[i].load_count.store(0);
entries_[i].buffer = nullptr;
}
cache_.init(entry_num * 4);
}

char *LPMap::acquire_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
if (entry.ref_count.load(std::memory_order_relaxed) == 0) {
entry.load_count.fetch_add(1, std::memory_order_relaxed);
}
entry.ref_count.fetch_add(1, std::memory_order_relaxed);
if (entry.ref_count.load(std::memory_order_relaxed) < 0) {
return nullptr;
}
return entry.buffer;
Comment on lines +75 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

race condition: ref_count is checked at line 75, incremented at 78, then checked again at 79 - another thread could evict the block between these operations, making entry.buffer invalid by line 82

}

void LPMap::release_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];

if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
LRUCache::BlockType block;
block.first = block_id;
block.second = entry.load_count.load();
cache_.add_single_block(this, block, 0);
}
}

char *LPMap::evict_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
int expected = 0;
if (entry.ref_count.compare_exchange_strong(
expected, std::numeric_limits<int>::min())) {
char *buffer = entry.buffer;
entry.buffer = nullptr;
return buffer;
} else {
return nullptr;
}
}

char *LPMap::set_block_acquired(block_id_t block_id, char *buffer) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
if (entry.ref_count.load(std::memory_order_relaxed) >= 0) {
entry.ref_count.fetch_add(1, std::memory_order_relaxed);
return entry.buffer;
}
entry.buffer = buffer;
entry.ref_count.store(1, std::memory_order_relaxed);
entry.load_count.fetch_add(1, std::memory_order_relaxed);
return buffer;
}

void LPMap::recycle(moodycamel::ConcurrentQueue<char *> &free_buffers) {
LRUCache::BlockType block;
do {
bool ok = cache_.evict_single_block(block);
if (!ok) {
return;
}
} while (isDeadBlock(block));
char *buffer = evict_block(block.first);
if (buffer) {
free_buffers.try_enqueue(buffer);
}
}

VecBufferPool::VecBufferPool(const std::string &filename) {
fd_ = open(filename.c_str(), O_RDONLY);
if (fd_ < 0) {
throw std::runtime_error("Failed to open file: " + filename);
}
struct stat st;
if (fstat(fd_, &st) < 0) {
throw std::runtime_error("Failed to stat file: " + filename);
}
file_size_ = st.st_size;
}

int VecBufferPool::init(size_t pool_capacity, size_t block_size) {
pool_capacity_ = pool_capacity;
size_t buffer_num = pool_capacity_ / block_size + 10;
size_t block_num = file_size_ / block_size + 10;
lp_map_.init(block_num);
for (size_t i = 0; i < buffer_num; i++) {
char *buffer = (char *)aligned_alloc(64, block_size);
if (buffer != nullptr) {
bool ok = free_buffers_.try_enqueue(buffer);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused variable ok

Suggested change
bool ok = free_buffers_.try_enqueue(buffer);
free_buffers_.try_enqueue(buffer);

}
Comment on lines +157 to +160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak when aligned_alloc fails - allocated buffers are not freed on failure, and no error is returned

}
LOG_DEBUG("Buffer pool num: %zu, entry num: %zu", buffer_num,
lp_map_.entry_num());
return 0;
}

VecBufferPoolHandle VecBufferPool::get_handle() {
return VecBufferPoolHandle(*this);
}

char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset,
size_t size, int retry) {
char *buffer = lp_map_.acquire_block(block_id);
if (buffer) {
return buffer;
}
{
bool found = free_buffers_.try_dequeue(buffer);
if (!found) {
for (int i = 0; i < retry; i++) {
lp_map_.recycle(free_buffers_);
found = free_buffers_.try_dequeue(buffer);
if (found) {
break;
}
}
}
if (!found) {
LOG_ERROR("Buffer pool failed to get free buffer");
return nullptr;
}
}

ssize_t read_bytes = pread(fd_, buffer, size, offset);
if (read_bytes != static_cast<ssize_t>(size)) {
LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
return nullptr;
}
Comment on lines +194 to +198
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak when pread fails - buffer is not returned to free_buffers_, causing the pool to permanently lose this buffer

char *placed_buffer = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
placed_buffer = lp_map_.set_block_acquired(block_id, buffer);
}
if (placed_buffer != buffer) {
// another thread has set the block
free_buffers_.try_enqueue(buffer);
}
return placed_buffer;
}

int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
ssize_t read_bytes = pread(fd_, buffer, length, offset);
if (read_bytes != static_cast<ssize_t>(length)) {
LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
return -1;
}
return 0;
}

char *VecBufferPoolHandle::get_block(size_t offset, size_t size,
size_t block_id) {
char *buffer = pool.acquire_buffer(block_id, offset, size, 5);
return buffer;
}

int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) {
return pool.get_meta(offset, length, buffer);
}

void VecBufferPoolHandle::release_one(block_id_t block_id) {
pool.lp_map_.release_block(block_id);
}

void VecBufferPoolHandle::acquire_one(block_id_t block_id) {
pool.lp_map_.acquire_block(block_id);
}

} // namespace ailego
} // namespace zvec
10 changes: 9 additions & 1 deletion src/core/algorithm/flat/flat_streamer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,18 @@ class FlatStreamerContext : public IndexStreamer::Context {
group_topk_heaps_.clear();
}

void reset() override {}
void reset() override {
for (auto &it : results_) {
it.clear();
}
for (auto &it : group_results_) {
it.clear();
}
}

//! Reset the context
void reset(const FlatStreamer<BATCH_SIZE> *owner) {
this->reset();
magic_ = owner->magic();
feature_size_ = owner->meta().element_size();

Expand Down
4 changes: 4 additions & 0 deletions src/core/algorithm/hnsw/hnsw_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class HnswContext : public IndexContext {

//! Reset context
void reset(void) override {
this->clear();
set_filter(nullptr);
reset_threshold();
set_fetch_vector(false);
Expand Down Expand Up @@ -422,6 +423,9 @@ class HnswContext : public IndexContext {
for (auto &it : results_) {
it.clear();
}
for (auto &it : group_results_) {
it.clear();
}
}

uint32_t *mutable_stats_get_neighbors() {
Expand Down
3 changes: 1 addition & 2 deletions src/core/algorithm/hnsw/hnsw_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ struct Neighbors {
Neighbors(uint32_t cnt_in, const node_id_t *data_in)
: cnt{cnt_in}, data{data_in} {}

Neighbors(IndexStorage::MemoryBlock &&mem_block)
: neighbor_block{std::move(mem_block)} {
Neighbors(IndexStorage::MemoryBlock &mem_block) : neighbor_block{mem_block} {
auto hd = reinterpret_cast<const NeighborsHeader *>(neighbor_block.data());
cnt = hd->neighbor_cnt;
data = hd->neighbors;
Expand Down
2 changes: 1 addition & 1 deletion src/core/algorithm/hnsw/hnsw_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const Neighbors HnswStreamerEntity::get_neighbors(level_t level,
LOG_ERROR("Read neighbor header failed, ret=%zu", size);
return Neighbors();
}
return Neighbors(std::move(neighbor_block));
return Neighbors(neighbor_block);
}

//! Get vector data by key
Expand Down
7 changes: 5 additions & 2 deletions src/core/interface/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,9 @@ int Index::Search(const VectorData &vector_data,
}

// dense support refiner, but sparse doesn't
int ret = 0;
if (search_param->refiner_param == nullptr) {
return _dense_search(vector_data, search_param, result, context);
ret = _dense_search(vector_data, search_param, result, context);
} else {
auto &reference_index = search_param->refiner_param->reference_index;
if (reference_index == nullptr) {
Expand Down Expand Up @@ -441,8 +442,10 @@ int Index::Search(const VectorData &vector_data,
// TODO: should copy other params?
flat_search_param->bf_pks = std::make_shared<std::vector<uint64_t>>(keys);

return reference_index->Search(vector_data, flat_search_param, result);
ret = reference_index->Search(vector_data, flat_search_param, result);
}
context->reset();
return ret;
}


Expand Down
Loading