Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ A header-only C++20 ring buffer library providing several synchronization strate
## Features

- Basic, SPSC, MPSC and MPMC variants
- Implementations using mutexes, atomics and semi-atomic slots
- Line cache friendly memory layout
- Implementations using atomics and slots
- Simple integration through CMake's `find_package`

## Requirements
Expand Down Expand Up @@ -111,7 +112,7 @@ target_link_libraries(
| `BasicRingBuffer<T>` | Basic | None (single-thread only) |
| `SPSCRingBuffer<T>` | SPSC | atomics |
| `MPSCRingBuffer<T>` | MPSC | atomics + slots |
| `MPMCRingBuffer<T>` | MPMC | mutex |
| `MPMCRingBuffer<T>` | MPMC | atomics + slots |

**Common API** (all variants):

Expand Down
143 changes: 97 additions & 46 deletions include/ring_buffer/mpmc_ring_buffer.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <stdexcept>

#include "internal/common.h"
Expand All @@ -12,21 +12,26 @@ namespace RingBuffer { // interface
template <typename T>
class alignas(Common::CACHELINE_SIZE) MPMCRingBuffer {
private:
std::unique_ptr<T[], Common::AlignedDeleter> buffer;
struct Slot {
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> seq;
alignas(Common::CACHELINE_SIZE) T data;
};

std::unique_ptr<Slot[], Common::AlignedDeleter> buffer;
const size_t capacity;
const std::size_t alignment;

alignas(Common::CACHELINE_SIZE) mutable std::mutex mtx;
alignas(Common::CACHELINE_SIZE) size_t head = 0;
alignas(Common::CACHELINE_SIZE) size_t tail = 0;
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> tail = 0;
alignas(Common::CACHELINE_SIZE) std::atomic<size_t> head = 0;

public:
explicit MPMCRingBuffer(size_t cap,
std::size_t align = Common::CACHELINE_SIZE);
bool tryPush(const T& value);
bool tryPush(T&& value);
bool tryPush(const T &value);
bool tryPush(T &&value);
template <typename... Args>
bool tryEmplace(Args&&... args);
bool tryPop(T& value);
bool tryEmplace(Args &&...args);
bool tryPop(T &value);
};

} // namespace RingBuffer
Expand All @@ -35,56 +40,102 @@ namespace RingBuffer { // implementation

template <typename T>
MPMCRingBuffer<T>::MPMCRingBuffer(size_t cap, std::size_t align)
: capacity(cap + 1), buffer(nullptr, Common::AlignedDeleter{align}) {
if (cap == 0)
throw std::invalid_argument(
"capacity of ring buffer must be greater than 0");

T* ptr = static_cast<T*>(
::operator new[](capacity * sizeof(T), std::align_val_t(align)));
buffer.reset(ptr);
: capacity(cap),
alignment(align),
buffer(nullptr, Common::AlignedDeleter{align}) {
if (cap == 0) throw std::invalid_argument("Capacity must be greater than 0");

Slot *raw = static_cast<Slot *>(
::operator new[](capacity * sizeof(Slot), std::align_val_t(align)));
for (size_t i = 0; i < capacity; ++i) {
new (&raw[i]) Slot{.seq = i}; // placement new
}
buffer.reset(raw);
}

template <typename T>
bool MPMCRingBuffer<T>::tryPush(const T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (((tail + 1) % capacity) == head) return false;
buffer[tail] = value;
tail = (tail + 1) % capacity;
return true;
bool MPMCRingBuffer<T>::tryPush(const T &value) {
size_t pos = tail.load(std::memory_order_relaxed);

while (true) {
Slot &slot = buffer[pos % capacity];
size_t expected = pos;

if (slot.seq.load(std::memory_order_acquire) != expected) {
return false;
}

if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
slot.data = value;
slot.seq.store(expected + 1, std::memory_order_release);
return true;
}
}
}

template <typename T>
bool MPMCRingBuffer<T>::tryPush(T&& value) {
std::lock_guard<std::mutex> lock(mtx);
if (((tail + 1) % capacity) == head) return false;
buffer[tail] = std::move(value);
tail = (tail + 1) % capacity;
return true;
bool MPMCRingBuffer<T>::tryPush(T &&value) {
size_t pos = tail.load(std::memory_order_relaxed);

while (true) {
Slot &slot = buffer[pos % capacity];
size_t expected = pos;

if (slot.seq.load(std::memory_order_acquire) != expected) {
return false;
}

if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
slot.data = std::move(value);
slot.seq.store(expected + 1, std::memory_order_release);
return true;
}
}
}

template <typename T>
template <typename... Args>
bool MPMCRingBuffer<T>::tryEmplace(Args&&... args) {
std::lock_guard<std::mutex> lock(mtx);
if (((tail + 1) % capacity) == head) return false;
new (&buffer[tail]) T(std::forward<Args>(args)...);
tail = (tail + 1) % capacity;
return true;
bool MPMCRingBuffer<T>::tryEmplace(Args &&...args) {
size_t pos = tail.load(std::memory_order_relaxed);

while (true) {
Slot &slot = buffer[pos % capacity];
size_t expected = pos;

if (slot.seq.load(std::memory_order_acquire) != expected) {
return false;
}

if (tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
new (&slot.data) T(std::forward<Args>(args)...);
slot.seq.store(expected + 1, std::memory_order_release);
return true;
}
}
}

template <typename T>
bool MPMCRingBuffer<T>::tryPop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (head == tail) return false;

T* elem = &buffer[head];
value.~T();
new (&value) T(std::move(*elem));
elem->~T();

head = (head + 1) % capacity;
return true;
bool MPMCRingBuffer<T>::tryPop(T &value) {
size_t pos = head.load(std::memory_order_relaxed);

while (true) {
Slot &slot = buffer[pos % capacity];
size_t expected = pos + 1;

if (slot.seq.load(std::memory_order_acquire) != expected) {
return false;
}

if (head.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
T *elem = &(slot.data);
value.~T();
new (&value) T(std::move(*elem));
elem->~T();

slot.seq.store(pos + capacity, std::memory_order_release);
return true;
}
}
}

} // namespace RingBuffer
58 changes: 56 additions & 2 deletions tests/mpmc_ring_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,57 @@ TEST(MPMCRingBufferTest, SPSC) {
}
}

TEST(MPSCRingBufferTest, MPSC) {
constexpr int PRODUCERS = 4;
constexpr int ITEMS_PER_PRODUCER = 500;
constexpr int TOTAL_ITEMS = PRODUCERS * ITEMS_PER_PRODUCER;
constexpr int CAPACITY = 128;

RingBuffer::MPMCRingBuffer<int> buffer(CAPACITY);
std::atomic<int> produced{0};
std::atomic<int> consumed{0};

std::vector<std::thread> producer_threads;
for (int p = 0; p < PRODUCERS; ++p) {
producer_threads.emplace_back([p, &buffer, &produced]() {
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
int value = p * ITEMS_PER_PRODUCER + i;
while (true) {
if (buffer.tryPush(value)) {
produced.fetch_add(1, std::memory_order_relaxed);
break;
} else {
std::this_thread::yield();
}
}
}
});
}

std::vector<int> results;
results.reserve(TOTAL_ITEMS);
std::thread consumer_thread([&]() {
while (consumed.load(std::memory_order_relaxed) < TOTAL_ITEMS) {
int value;
if (buffer.tryPop(value)) {
results.push_back(value);
consumed.fetch_add(1, std::memory_order_relaxed);
} else {
std::this_thread::yield();
}
}
});

for (auto& t : producer_threads) t.join();
consumer_thread.join();

EXPECT_EQ(results.size(), static_cast<size_t>(TOTAL_ITEMS));
std::sort(results.begin(), results.end());
for (int i = 0; i < TOTAL_ITEMS; ++i) {
EXPECT_EQ(results[i], i);
}
}

TEST(MPMCRingBufferTest, MPMC) {
constexpr int PRODUCERS = 4;
constexpr int ITEMS_PER_PRODUCER = 250;
Expand Down Expand Up @@ -209,9 +260,11 @@ TEST(MPMCRingBufferTest, EmplaceString) {
}

TEST(MPMCRingBufferTest, CounterLifecycle) {
const int CAPACITY = 2;
Counter::reset();

{
RingBuffer::MPMCRingBuffer<Counter> buffer(2);
RingBuffer::MPMCRingBuffer<Counter> buffer(CAPACITY);

EXPECT_TRUE(buffer.tryEmplace(1));
EXPECT_TRUE(buffer.tryEmplace(2));
Expand All @@ -223,5 +276,6 @@ TEST(MPMCRingBufferTest, CounterLifecycle) {
EXPECT_FALSE(buffer.tryPop(tmp));
}

EXPECT_EQ(Counter::constructed, Counter::destructed);
// the init phase should be cleaned up
EXPECT_EQ(Counter::constructed - CAPACITY, Counter::destructed);
}
35 changes: 35 additions & 0 deletions tests/mpsc_ring_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,41 @@ TEST(MPSCRingBufferTest, WrapAround) {
EXPECT_FALSE(buffer.tryPop(value));
}

TEST(SPSCRingBufferTest, SPSC) {
constexpr int ITERATIONS = 10000;
constexpr int CAPACITY = 128;
RingBuffer::MPSCRingBuffer<int> buffer(CAPACITY);

std::thread producer([&]() {
for (int i = 0; i < ITERATIONS; ++i) {
while (true) {
if (buffer.tryPush(i)) break;
std::this_thread::yield();
}
}
});

std::vector<int> results;
results.reserve(ITERATIONS);
std::thread consumer([&]() {
int value;
for (int i = 0; i < ITERATIONS; ++i) {
while (true) {
if (buffer.tryPop(value)) break;
std::this_thread::yield();
}
results.push_back(value);
}
});

producer.join();
consumer.join();

for (int i = 0; i < ITERATIONS; ++i) {
EXPECT_EQ(results[i], i);
}
}

TEST(MPSCRingBufferTest, MPSC) {
constexpr int PRODUCERS = 4;
constexpr int ITEMS_PER_PRODUCER = 500;
Expand Down
Loading