From 1a60a1b33332fb41bc1b5116ef0f292d33bedf63 Mon Sep 17 00:00:00 2001 From: HuRuilizhen <122090168@link.cuhk.edu.cn> Date: Sun, 7 Sep 2025 22:05:11 +0800 Subject: [PATCH 1/3] feat(test): complete mpmc and mpsc unit test --- tests/mpmc_ring_buffer_test.cc | 51 ++++++++++++++++++++++++++++++++++ tests/mpsc_ring_buffer_test.cc | 35 +++++++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/tests/mpmc_ring_buffer_test.cc b/tests/mpmc_ring_buffer_test.cc index 2070ebc..bb11617 100644 --- a/tests/mpmc_ring_buffer_test.cc +++ b/tests/mpmc_ring_buffer_test.cc @@ -119,6 +119,57 @@ TEST(MPMCRingBufferTest, SPSC) { } } +TEST(MPSCRingBufferTest, MPSC) { + constexpr int PRODUCERS = 4; + constexpr int ITEMS_PER_PRODUCER = 500; + constexpr int TOTAL_ITEMS = PRODUCERS * ITEMS_PER_PRODUCER; + constexpr int CAPACITY = 128; + + RingBuffer::MPMCRingBuffer buffer(CAPACITY); + std::atomic produced{0}; + std::atomic consumed{0}; + + std::vector producer_threads; + for (int p = 0; p < PRODUCERS; ++p) { + producer_threads.emplace_back([p, &buffer, &produced]() { + for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) { + int value = p * ITEMS_PER_PRODUCER + i; + while (true) { + if (buffer.tryPush(value)) { + produced.fetch_add(1, std::memory_order_relaxed); + break; + } else { + std::this_thread::yield(); + } + } + } + }); + } + + std::vector results; + results.reserve(TOTAL_ITEMS); + std::thread consumer_thread([&]() { + while (consumed.load(std::memory_order_relaxed) < TOTAL_ITEMS) { + int value; + if (buffer.tryPop(value)) { + results.push_back(value); + consumed.fetch_add(1, std::memory_order_relaxed); + } else { + std::this_thread::yield(); + } + } + }); + + for (auto& t : producer_threads) t.join(); + consumer_thread.join(); + + EXPECT_EQ(results.size(), static_cast(TOTAL_ITEMS)); + std::sort(results.begin(), results.end()); + for (int i = 0; i < TOTAL_ITEMS; ++i) { + EXPECT_EQ(results[i], i); + } +} + TEST(MPMCRingBufferTest, MPMC) { constexpr int PRODUCERS = 4; constexpr int ITEMS_PER_PRODUCER = 250; diff --git a/tests/mpsc_ring_buffer_test.cc b/tests/mpsc_ring_buffer_test.cc index a667024..d98ada9 100644 --- a/tests/mpsc_ring_buffer_test.cc +++ b/tests/mpsc_ring_buffer_test.cc @@ -82,6 +82,41 @@ TEST(MPSCRingBufferTest, WrapAround) { EXPECT_FALSE(buffer.tryPop(value)); } +TEST(SPSCRingBufferTest, SPSC) { + constexpr int ITERATIONS = 10000; + constexpr int CAPACITY = 128; + RingBuffer::MPSCRingBuffer buffer(CAPACITY); + + std::thread producer([&]() { + for (int i = 0; i < ITERATIONS; ++i) { + while (true) { + if (buffer.tryPush(i)) break; + std::this_thread::yield(); + } + } + }); + + std::vector results; + results.reserve(ITERATIONS); + std::thread consumer([&]() { + int value; + for (int i = 0; i < ITERATIONS; ++i) { + while (true) { + if (buffer.tryPop(value)) break; + std::this_thread::yield(); + } + results.push_back(value); + } + }); + + producer.join(); + consumer.join(); + + for (int i = 0; i < ITERATIONS; ++i) { + EXPECT_EQ(results[i], i); + } +} + TEST(MPSCRingBufferTest, MPSC) { constexpr int PRODUCERS = 4; constexpr int ITEMS_PER_PRODUCER = 500; From de2073d2eb931025740fb4a69349c880116a63f4 Mon Sep 17 00:00:00 2001 From: HuRuilizhen <122090168@link.cuhk.edu.cn> Date: Mon, 8 Sep 2025 03:43:44 +0800 Subject: [PATCH 2/3] #13 refactor(mpmc): implement mpmc ring buffer using slots and atomic --- include/ring_buffer/mpmc_ring_buffer.h | 143 +++++++++++++++++-------- tests/mpmc_ring_buffer_test.cc | 7 +- 2 files changed, 102 insertions(+), 48 deletions(-) diff --git a/include/ring_buffer/mpmc_ring_buffer.h b/include/ring_buffer/mpmc_ring_buffer.h index f707dad..9ddf6f4 100644 --- a/include/ring_buffer/mpmc_ring_buffer.h +++ b/include/ring_buffer/mpmc_ring_buffer.h @@ -1,8 +1,8 @@ #pragma once +#include #include #include -#include #include #include "internal/common.h" @@ -12,21 +12,26 @@ namespace RingBuffer { // interface template class alignas(Common::CACHELINE_SIZE) MPMCRingBuffer { private: - std::unique_ptr buffer; + struct Slot { + alignas(Common::CACHELINE_SIZE) std::atomic seq; + alignas(Common::CACHELINE_SIZE) T data; + }; + + std::unique_ptr buffer; const size_t capacity; + const std::size_t alignment; - alignas(Common::CACHELINE_SIZE) mutable std::mutex mtx; - alignas(Common::CACHELINE_SIZE) size_t head = 0; - alignas(Common::CACHELINE_SIZE) size_t tail = 0; + alignas(Common::CACHELINE_SIZE) std::atomic tail = 0; + alignas(Common::CACHELINE_SIZE) std::atomic head = 0; public: explicit MPMCRingBuffer(size_t cap, std::size_t align = Common::CACHELINE_SIZE); - bool tryPush(const T& value); - bool tryPush(T&& value); + bool tryPush(const T &value); + bool tryPush(T &&value); template - bool tryEmplace(Args&&... args); - bool tryPop(T& value); + bool tryEmplace(Args &&...args); + bool tryPop(T &value); }; } // namespace RingBuffer @@ -35,56 +40,102 @@ namespace RingBuffer { // implementation template MPMCRingBuffer::MPMCRingBuffer(size_t cap, std::size_t align) - : capacity(cap + 1), buffer(nullptr, Common::AlignedDeleter{align}) { - if (cap == 0) - throw std::invalid_argument( - "capacity of ring buffer must be greater than 0"); - - T* ptr = static_cast( - ::operator new[](capacity * sizeof(T), std::align_val_t(align))); - buffer.reset(ptr); + : capacity(cap), + alignment(align), + buffer(nullptr, Common::AlignedDeleter{align}) { + if (cap == 0) throw std::invalid_argument("Capacity must be greater than 0"); + + Slot *raw = static_cast( + ::operator new[](capacity * sizeof(Slot), std::align_val_t(align))); + for (size_t i = 0; i < capacity; ++i) { + new (&raw[i]) Slot{.seq = i}; // placement new + } + buffer.reset(raw); } template -bool MPMCRingBuffer::tryPush(const T& value) { - std::lock_guard lock(mtx); - if (((tail + 1) % capacity) == head) return false; - buffer[tail] = value; - tail = (tail + 1) % capacity; - return true; +bool MPMCRingBuffer::tryPush(const T &value) { + size_t pos = tail.load(std::memory_order_relaxed); + + while (true) { + Slot &slot = buffer[pos % capacity]; + size_t expected = pos; + + if (slot.seq.load(std::memory_order_acquire) != expected) { + return false; + } + + if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { + slot.data = value; + slot.seq.store(expected + 1, std::memory_order_release); + return true; + } + } } template -bool MPMCRingBuffer::tryPush(T&& value) { - std::lock_guard lock(mtx); - if (((tail + 1) % capacity) == head) return false; - buffer[tail] = std::move(value); - tail = (tail + 1) % capacity; - return true; +bool MPMCRingBuffer::tryPush(T &&value) { + size_t pos = tail.load(std::memory_order_relaxed); + + while (true) { + Slot &slot = buffer[pos % capacity]; + size_t expected = pos; + + if (slot.seq.load(std::memory_order_acquire) != expected) { + return false; + } + + if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { + slot.data = std::move(value); + slot.seq.store(expected + 1, std::memory_order_release); + return true; + } + } } template template -bool MPMCRingBuffer::tryEmplace(Args&&... args) { - std::lock_guard lock(mtx); - if (((tail + 1) % capacity) == head) return false; - new (&buffer[tail]) T(std::forward(args)...); - tail = (tail + 1) % capacity; - return true; +bool MPMCRingBuffer::tryEmplace(Args &&...args) { + size_t pos = tail.load(std::memory_order_relaxed); + + while (true) { + Slot &slot = buffer[pos % capacity]; + size_t expected = pos; + + if (slot.seq.load(std::memory_order_acquire) != expected) { + return false; + } + + if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { + new (&slot.data) T(std::forward(args)...); + slot.seq.store(expected + 1, std::memory_order_release); + return true; + } + } } template -bool MPMCRingBuffer::tryPop(T& value) { - std::lock_guard lock(mtx); - if (head == tail) return false; - - T* elem = &buffer[head]; - value.~T(); - new (&value) T(std::move(*elem)); - elem->~T(); - - head = (head + 1) % capacity; - return true; +bool MPMCRingBuffer::tryPop(T &value) { + size_t pos = head.load(std::memory_order_relaxed); + + while (true) { + Slot &slot = buffer[pos % capacity]; + size_t expected = pos + 1; + + if (slot.seq.load(std::memory_order_acquire) != expected) { + return false; + } + + if (head.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { + T *elem = &(slot.data); + value.~T(); + new (&value) T(std::move(*elem)); + elem->~T(); + + slot.seq.store(pos + capacity, std::memory_order_release); + return true; + } + } } } // namespace RingBuffer diff --git a/tests/mpmc_ring_buffer_test.cc b/tests/mpmc_ring_buffer_test.cc index bb11617..e21c125 100644 --- a/tests/mpmc_ring_buffer_test.cc +++ b/tests/mpmc_ring_buffer_test.cc @@ -260,9 +260,11 @@ TEST(MPMCRingBufferTest, EmplaceString) { } TEST(MPMCRingBufferTest, CounterLifecycle) { + const int CAPACITY = 2; Counter::reset(); + { - RingBuffer::MPMCRingBuffer buffer(2); + RingBuffer::MPMCRingBuffer buffer(CAPACITY); EXPECT_TRUE(buffer.tryEmplace(1)); EXPECT_TRUE(buffer.tryEmplace(2)); @@ -274,5 +276,6 @@ TEST(MPMCRingBufferTest, CounterLifecycle) { EXPECT_FALSE(buffer.tryPop(tmp)); } - EXPECT_EQ(Counter::constructed, Counter::destructed); + // the init phase should be cleaned up + EXPECT_EQ(Counter::constructed - CAPACITY, Counter::destructed); } From e14ccd54957934d408b01327e5f1b73afea39add Mon Sep 17 00:00:00 2001 From: HuRuilizhen <122090168@link.cuhk.edu.cn> Date: Mon, 8 Sep 2025 03:47:20 +0800 Subject: [PATCH 3/3] #13 docs(mpmc): update readme --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 48b7ee7..4c6bb6c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ A header-only C++20 ring buffer library providing several synchronization strate ## Features - Basic, SPSC, MPSC and MPMC variants -- Implementations using mutexes, atomics and semi-atomic slots +- Line cache friendly memory layout +- Implementations using atomics and slots - Simple integration through CMake's `find_package` ## Requirements @@ -111,7 +112,7 @@ target_link_libraries( | `BasicRingBuffer` | Basic | None (single-thread only) | | `SPSCRingBuffer` | SPSC | atomics | | `MPSCRingBuffer` | MPSC | atomics + slots | -| `MPMCRingBuffer` | MPMC | mutex | +| `MPMCRingBuffer` | MPMC | atomics + slots | **Common API** (all variants):