From 1a1c39d6f0aaf75e18031234ad2f4b5997349b74 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 16 Jan 2026 09:48:07 +0000 Subject: [PATCH 1/3] Add bbq spsc and bbq mpmc with verification code Signed-off-by: Diogo Behrens --- bbq/CMakeLists.txt | 21 + bbq/include/vsync/queue/bbq_mpmc.h | 365 +++++++++++++++++ bbq/include/vsync/queue/bbq_spsc.h | 377 +++++++++++++++++ bbq/include/vsync/queue/internal/bbq/common.h | 118 ++++++ bbq/test/CMakeLists.txt | 15 + bbq/test/align_test.c | 88 ++++ bbq/test/bbq_mpmc.c | 102 +++++ bbq/test/bbq_spsc.c | 84 ++++ bbq/test/include/test/bbq/bbq_test_helper.h | 29 ++ bbq/test/include/test/bbq/debug.h | 190 +++++++++ include/vsync/queue/bbq_mpmc.h | 365 +++++++++++++++++ include/vsync/queue/bbq_spsc.h | 378 ++++++++++++++++++ include/vsync/queue/internal/bbq/common.h | 118 ++++++ test/include/test/bbq/bbq_test_helper.h | 29 ++ test/include/test/bbq/debug.h | 190 +++++++++ test/queue/bbq_mpmc.c | 102 +++++ test/queue/bbq_spsc.c | 85 ++++ verify/CMakeLists.txt | 1 + verify/bbq/CMakeLists.txt | 31 ++ verify/bbq/bbq_mpsc.c | 101 +++++ verify/bbq/bbq_spmc.c | 94 +++++ verify/bbq/bbq_spsc.c | 82 ++++ 22 files changed, 2965 insertions(+) create mode 100644 bbq/CMakeLists.txt create mode 100644 bbq/include/vsync/queue/bbq_mpmc.h create mode 100644 bbq/include/vsync/queue/bbq_spsc.h create mode 100644 bbq/include/vsync/queue/internal/bbq/common.h create mode 100644 bbq/test/CMakeLists.txt create mode 100644 bbq/test/align_test.c create mode 100644 bbq/test/bbq_mpmc.c create mode 100644 bbq/test/bbq_spsc.c create mode 100644 bbq/test/include/test/bbq/bbq_test_helper.h create mode 100644 bbq/test/include/test/bbq/debug.h create mode 100644 include/vsync/queue/bbq_mpmc.h create mode 100644 include/vsync/queue/bbq_spsc.h create mode 100644 include/vsync/queue/internal/bbq/common.h create mode 100644 test/include/test/bbq/bbq_test_helper.h create mode 100644 test/include/test/bbq/debug.h create mode 100644 test/queue/bbq_mpmc.c create mode 100644 test/queue/bbq_spsc.c create mode 100755 verify/bbq/CMakeLists.txt create mode 100644 verify/bbq/bbq_mpsc.c create mode 100644 verify/bbq/bbq_spmc.c create mode 100644 verify/bbq/bbq_spsc.c diff --git a/bbq/CMakeLists.txt b/bbq/CMakeLists.txt new file mode 100644 index 00000000..53093310 --- /dev/null +++ b/bbq/CMakeLists.txt @@ -0,0 +1,21 @@ +target_include_directories( + vsync INTERFACE "$" + "$") + +install(DIRECTORY include/vsync DESTINATION include) + +vsync_install( + DIRECTORY include/vsync/queue + DESTINATION include/vsync + COMPONENTS STABLE OPEN_DISTRO) + +vsync_install( + DIRECTORY test/ + DESTINATION test/queue + COMPONENTS STABLE_TEST OPEN_DISTRO + EXTRA_ARGS EXCLUDE_FROM_ALL) + +if(${LIBVSYNC_BUILD_TESTS}) + add_subdirectory(test) + add_subdirectory(verify) +endif() diff --git a/bbq/include/vsync/queue/bbq_mpmc.h b/bbq/include/vsync/queue/bbq_mpmc.h new file mode 100644 index 00000000..18af006a --- /dev/null +++ b/bbq/include/vsync/queue/bbq_mpmc.h @@ -0,0 +1,365 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_MPMC_H +#define VSYNC_BBQ_MPMC_H +/******************************************************************************* + * @file bbq_mpmc.h + * @brief Block-based Bounded Queue multi-producer/multi-consumer + * + * A highly performant bounded queue that splits the buffer in multiple blocks. + * + * ### Remarks: + * + * In this implementations, values have the fixed size (pointer size). + * This implementation does not support DROP_OLD mode as described in the + * original paper. + * + * @cite [BBQ: A Block-based Bounded Queue for Exchanging Data and + * Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + * + * @example + * @include eg_bbq_mpmc.c + * + ******************************************************************************/ + +#include +#include +#include +#include +#include + +/** + * @def BBQ_BLOCK_NUM_LOG + * + * Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` + * to define the total number of blocks equals to `2^N` + * + * @note default value is `3U` -> (`8` blocks) + * + */ +#ifndef BBQ_BLOCK_NUM_LOG + #define BBQ_BLOCK_NUM_LOG 3U +#endif + +/** + * @def BBQ_ENTRY_SIZE_LOG + * + * Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to + * define an entry size equals to `2^N` + * + * @note default value is `log2(sizeof(vuintptr_t))` + * + */ +#ifndef BBQ_ENTRY_SIZE_LOG + #define BBQ_ENTRY_SIZE_LOG v_log2(sizeof(vuintptr_t)) +#endif + +#include + +typedef struct bbq_mpmc_block_s { + vatomic64_t allocated VSYNC_CACHEALIGN; + vatomic64_t committed VSYNC_CACHEALIGN; + vatomic64_t reserved VSYNC_CACHEALIGN; + vatomic64_t consumed VSYNC_CACHEALIGN; + vuint8_t entry[] VSYNC_CACHEALIGN; +} bbq_mpmc_block_t; + +typedef struct bbq_mpmc_s { + bbq_config_t config VSYNC_CACHEALIGN; + vatomic64_t widx VSYNC_CACHEALIGN; + vatomic64_t ridx VSYNC_CACHEALIGN; + vuint8_t blk[] VSYNC_CACHEALIGN; +} bbq_mpmc_t; + +/* block cursor init value */ +#define BBQ_BLOCK_MPMC_INIT_VALUE BBQ_BLOCK_INIT_VALUE(bbq_mpmc_block_t) + +/* Note:The following macros are used inside bbq/common.h in BBQ_COUNT + * definition */ +#define BBQ_MPMC_WRITE_PROD(k, v) (vatomic64_write(&(k), v)) +#define BBQ_MPMC_WRITE_CONS(k, v) (vatomic64_write(&(k), v)) +#define BBQ_MPMC_READ_PROD(k) (vatomic64_read(&(k))) +#define BBQ_MPMC_READ_CONS(k) (vatomic64_read(&(k))) +#define BBQ_MPMC_COUNT(q) BBQ_COUNT(q, mpmc, MPMC) + +static inline vbool_t _bbq_mpmc_enqueue(struct bbq_mpmc_s *q, vuintptr_t **buf, + vuint32_t *count); +static inline vbool_t _bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline void _bbq_mpmc_block_init(bbq_mpmc_block_t *blk, vsize_t idx, + vuint16_t block_size); + +/** + * Enqueues one or more entries. + * + * Multiple entries can be enqueued if `src` points to an array. Use `count` to + * indicate how many entries should be enqueueed, starting from `src`. + * + * @param q pointer to bbq_mpmc data structure + * @param src pointer to first entry + * @param count number of entries to enqueue + * @param wait should wait for space to be available + * + * @return number of enqueued entries + */ +static inline vuint32_t +bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* the following is equivalent to + * while(bbq_mpmc_enqueue_internal(q, &rest_buf, &rest) || (wait && rest)); + */ + do { + retry = _bbq_mpmc_enqueue(q, &rest_buf, &rest); + + /* Help model checker in case the queue is empty + * The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) + retry = _bbq_mpmc_enqueue(q, &rest_buf, &rest); + } while (retry); + + return count - rest; +} + +/** + * Dequeues one or more entries. + * + * Multiple entries can be dequeued if `src` points to an array. Use `count` to + * indicate how many entries should be dequeued. + * + * @param q pointer to bbq_mpmc data structure + * @param src pointer to preallocated memory for the first entry + * @param count number of entries to dequeue + * @param wait should wait for entries to be available + * + * @return number of dequeued entries + */ +static inline vuint32_t +bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* the following is equivalent to + * while(bbq_mpmc_dequeue_internal(q, &rest_buf, &rest) || (wait && rest)); + */ + do { + retry = _bbq_mpmc_dequeue(q, &rest_buf, &rest); + + /* Help model checker in case the queue is empty + * The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) + retry = _bbq_mpmc_dequeue(q, &rest_buf, &rest); + } while (retry); + + return count - rest; +} + +/** + * Calculates the size of the bbq queue. + * + * @param capacity maximum number of entries that can fit in the queue + * @return size to be allocated in bytes + */ +static inline vsize_t +bbq_mpmc_memsize(vsize_t capacity) +{ + vsize_t cnt_each_blk = (capacity) >> BBQ_BLOCK_NUM_LOG; + if (cnt_each_blk == 0) { + cnt_each_blk = 1; + } + vsize_t mem_each_blk = + sizeof(bbq_mpmc_block_t) + (cnt_each_blk << BBQ_ENTRY_SIZE_LOG); + vsize_t mem_each_blk_log = + v_pow2_round_down(mem_each_blk * 2 - 1); /* align up */ + vsize_t mem_buf = + sizeof(bbq_mpmc_t) + (mem_each_blk_log << BBQ_BLOCK_NUM_LOG); + return mem_buf; +} +/** + * Initializes a bbq data structure. + * + * @param buf pointer to bbq data structure + * @param size number of bytes allocated for bbq data structure + * @return true initialization succeeded. + * @return false initialization failed. + */ +static inline vbool_t +bbq_mpmc_init(bbq_mpmc_t *rb, vsize_t size) +{ + if (unlikely(rb == NULL) || unlikely(BBQ_ENTRY_SIZE < BBQ_MIN_ENTRY_SIZE) || + unlikely(BBQ_ENTRY_SIZE > BBQ_MAX_ENTRY_SIZE) || + unlikely(BBQ_BLOCK_NUM_LOG < BBQ_MIN_BLOCK_NUM_LOG) || + unlikely(BBQ_BLOCK_NUM_LOG > BBQ_MAX_BLOCK_NUM_LOG)) { + return false; + } + + vsize_t blks_total_size = (size) - sizeof(bbq_mpmc_t); + vsize_t blk_size = v_pow2_round_down(blks_total_size >> BBQ_BLOCK_NUM_LOG); + if (unlikely(blk_size <= BBQ_BLOCK_INIT_VALUE(bbq_mpmc_block_t))) { + return false; + } + vsize_t blk_size_log = v_log2(blk_size); + BBQ_BLK_SZ_VERIFICATION(mpmc); + if (unlikely(blk_size_log < BBQ_MIN_BLOCK_SIZE_LOG) || + unlikely(blk_size_log >= BBQ_MAX_BLOCK_SIZE_LOG)) { + return false; + } + (rb)->config.blk_size = blk_size; + (rb)->config.blk_size_log = blk_size_log; + BBQ_MPMC_WRITE_PROD((rb)->widx, 0); + BBQ_MPMC_WRITE_CONS((rb)->ridx, 0); + for (vsize_t i = 0; i < (1UL << BBQ_BLOCK_NUM_LOG); i++) { + _bbq_mpmc_block_init( + (bbq_mpmc_block_t *)((rb)->blk + (i << blk_size_log)), i, blk_size); + } + return true; +} + +static inline void +_bbq_mpmc_block_init(bbq_mpmc_block_t *blk, vsize_t idx, vuint16_t block_size) +{ + /* if it is not the first block, set to invalid state */ + vuint16_t init_value = likely(idx) ? block_size : BBQ_BLOCK_MPMC_INIT_VALUE; + vatomic64_write(&blk->allocated, init_value); + vatomic64_write(&blk->committed, init_value); + vatomic64_write(&blk->reserved, init_value); + vatomic64_write(&blk->consumed, init_value); +} + +/* return means retry */ +static inline vbool_t +_bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + /* get the address of the alloc block */ + vuint64_t widx = vatomic64_read(&q->widx); + vuint16_t block_idx = BBQ_GLOBAL_IDX(widx); + bbq_mpmc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* precheck once */ + vuint16_t block_size = q->config.blk_size; + vuint64_t allocated = vatomic64_read(&blk->allocated); + vuint64_t allocated_space = BBQ_LOCAL_IDX(allocated); + vsize_t entry_total_size = (*count) << BBQ_ENTRY_SIZE_LOG; + /* if out of bound, we don't add the space, but help to move the block */ + if (likely(allocated_space < block_size)) { + /* update the allocated index using FAA */ + vuint64_t old_allocated = + vatomic64_get_add(&blk->allocated, entry_total_size); + /* we have some space */ + vuint64_t old_local_space = BBQ_LOCAL_IDX(old_allocated); + if (likely(old_local_space < block_size)) { + vuint16_t space = + VMIN(entry_total_size, block_size - old_local_space); + void *entry = BBQ_GET_ENTRY(blk, old_local_space); + int r = memcpy_s(entry, space, *buf, space); + BUG_ON(r != 0); + vatomic64_add(&blk->committed, space); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + } + /* slow path, all writers help to move to next block */ + bbq_mpmc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(widx); + if (unlikely( + !BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(nblk, block_size, global_vsn))) { + return false; + } + /* reset cursor and advance block */ + bbq_reset_block_cursor_heavy(&nblk->committed, global_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + bbq_reset_block_cursor_heavy(&nblk->allocated, global_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + BBQ_ADVANCE_HEAD(&q->widx, widx, widx + 1); + return true; +} +/* return means retry */ +static inline vbool_t +_bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + /* get the address of the occupy block */ + vuint64_t ridx = vatomic64_read(&q->ridx); + vuint16_t block_idx = BBQ_GLOBAL_IDX(ridx); + bbq_mpmc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* check if the block is fully reserved */ + vuint16_t block_size = q->config.blk_size; + vuint64_t reserved = vatomic64_read(&blk->reserved); + vuint64_t reserved_space = BBQ_LOCAL_IDX(reserved); + if (likely(reserved_space < block_size)) { + vuint64_t committed = vatomic64_read(&blk->committed); + /* check if we have an entry to occupy */ + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + if (unlikely(reserved_space >= committed_space)) { + /* Note: the version is strictly monotic and may not wrap. */ + ASSERT(reserved <= committed && "reserved must be <= committed"); + return false; + } + vuint16_t entry_total_size = VMIN((*count) << BBQ_ENTRY_SIZE_LOG, + committed_space - reserved_space); + if (unlikely(committed_space != block_size)) { + vuint64_t allocated = vatomic64_read(&blk->allocated); + vuint64_t allocated_space = BBQ_LOCAL_IDX(allocated); + if (likely(allocated_space != committed_space)) { + return false; + } + } + if (vatomic64_cmpxchg(&blk->reserved, reserved, + reserved + entry_total_size) != reserved) { + return true; + } + /* we got the entry */ + void *entry = BBQ_GET_ENTRY(blk, BBQ_LOCAL_IDX(reserved)); + int r = memcpy_s(*buf, entry_total_size, entry, entry_total_size); + BUG_ON(r != 0); + /* consume after copy the data back */ + vatomic64_add(&blk->consumed, entry_total_size); + vuint16_t offset = entry_total_size >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + /* need to advance the block */ + bbq_mpmc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + /* r_head never pass the w_head and r_tail */ + vuint64_t next_consumer_vsn = BBQ_LOCAL_VSN(reserved) - (block_idx != 0); + vuint64_t next_producer_vsn = + BBQ_LOCAL_VSN(vatomic64_read(&nblk->committed)); + if (next_producer_vsn != next_consumer_vsn + 1) { + return false; + } + /* reset the cursor */ + bbq_reset_block_cursor_heavy(&nblk->consumed, next_consumer_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + bbq_reset_block_cursor_heavy(&nblk->reserved, next_consumer_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + BBQ_ADVANCE_HEAD(&q->ridx, ridx, ridx + 1); + return true; +} +#endif diff --git a/bbq/include/vsync/queue/bbq_spsc.h b/bbq/include/vsync/queue/bbq_spsc.h new file mode 100644 index 00000000..28ab939f --- /dev/null +++ b/bbq/include/vsync/queue/bbq_spsc.h @@ -0,0 +1,377 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ +#ifndef VSYNC_BBQ_SPSC_H +#define VSYNC_BBQ_SPSC_H + +/******************************************************************************* + * @file bbq_spsc.h + * @brief Block-based Bounded Queue single-producer/single-consumer + * @ingroup linearizable lock_free + * + * A highly performant bounded queue that splits the buffer in multiple blocks. + * + * + * ### Remarks: + * + * In this implementations, values have a fixed size equal to pointer size. + * This implementation does not support `DROP_OLD` mode as described in the + * original paper. + * + * @cite [BBQ: A Block-based Bounded Queue for Exchanging Data and + * Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + * + * @example + * ### Multi-threaded + * @include eg_bbq_spsc.c + * ### Multi-process + * @include eg_bbq_spsc_m_proc.c + * + ******************************************************************************/ + +#include +#include +#include +#include +#include +#include + +/** + * @def BBQ_BLOCK_NUM_LOG + * + * Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` + * to define the total number of blocks equals to `2^N` + * + * @note default value is `3U` -> (`8` blocks) + * + */ +#ifndef BBQ_BLOCK_NUM_LOG + #define BBQ_BLOCK_NUM_LOG 3U +#endif + +/** + * @def BBQ_ENTRY_SIZE_LOG + * + * Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to + * define an entry size equals to `2^N` + * + * @note default value is `log2(sizeof(vuintptr_t))` + * + */ +#ifndef BBQ_ENTRY_SIZE_LOG + #define BBQ_ENTRY_SIZE_LOG v_log2(sizeof(vuintptr_t)) +#endif + +#include + +typedef struct bbq_spsc_block_s { + vatomic64_t committed VSYNC_CACHEALIGN; + vatomic64_t consumed VSYNC_CACHEALIGN; + vuint8_t entry[] VSYNC_CACHEALIGN; +} bbq_spsc_block_t; + +typedef struct bbq_spsc_s { + bbq_config_t config VSYNC_CACHEALIGN; + vuint64_t widx VSYNC_CACHEALIGN; + vuint64_t ridx VSYNC_CACHEALIGN; + vuint8_t blk[] VSYNC_CACHEALIGN; +} bbq_spsc_t; + +/* block cursor init value */ +#define BBQ_BLOCK_SPSC_INIT_VALUE BBQ_BLOCK_INIT_VALUE(struct bbq_spsc_block_s) +/* Note:The following macros are used inside bbq/common.h in BBQ_COUNT + * definition */ +#define BBQ_SPSC_WRITE_PROD(k, v) ((k) = v) +#define BBQ_SPSC_WRITE_CONS(k, v) ((k) = v) +#define BBQ_SPSC_READ_PROD(k) (k) +#define BBQ_SPSC_READ_CONS(k) (k) +#define BBQ_SPSC_COUNT(q) BBQ_COUNT(q, spsc, SPSC) + +/* prototypes of internal functions */ +static inline vbool_t _bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline vbool_t _bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline void _bbq_spsc_block_init(bbq_spsc_block_t *blk, vsize_t idx, + vuint16_t block_size); +/** + * Enqueues one or more entries. + * + * Multiple entries can be enqueued if `src` points to an array. Use `count` to + * indicate how many entries should be enqueueed, starting from `src`. + * + * @param q address of bbq_spsc_t object. + * @param buf address of the first entry. + * @param count number of entries to enqueue. + * @param wait true/false when set to true it waits (blocks) till space becomes + * available. Otherwise, it quits retrying. + * + * @return number of enqueued entries. + */ +static inline vuint32_t +bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* The following two are equivalent. + * Which one is better depends on the model checker. + */ +#if defined(VSYNC_VERIFICATION_DAT3M) + while (_bbq_spsc_enqueue(q, &rest_buf, &rest) || (wait && rest)) + ; +#else + do { + retry = _bbq_spsc_enqueue(q, &rest_buf, &rest); + + /* The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) { + retry = _bbq_spsc_enqueue(q, &rest_buf, &rest); + } + } while (retry); +#endif + + return count - rest; +} + +/** + * Dequeues one or more entries. + * + * Multiple entries can be dequeued if `buf` points to an array. Use `count` to + * indicate how many entries should be dequeued. + * + * @param q address of bbq_spsc_t object. + * @param buf address of the first entry of the preallocated memory. + * @param count number of entries to dequeue. + * @param wait true/false. When set to true the API waits/blocks for entries to + * be available + * + * @return number of dequeued entries. + */ +static inline vuint32_t +bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* The following two are equivalent. + * Which one is better depends on the model checker. + */ +#if defined(VSYNC_VERIFICATION_DAT3M) + while (_bbq_spsc_dequeue(q, &rest_buf, &rest) || (wait && rest)) + ; +#else + do { + retry = _bbq_spsc_dequeue(q, &rest_buf, &rest); + + /* The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) { + retry = _bbq_spsc_dequeue(q, &rest_buf, &rest); + } + } while (retry); +#endif + + return count - rest; +} +/** + * Calculates the size of bbq_spsc_t object based on the given capacity. + * + * @param capacity maximum number of entries that can fit in the queue. + * @return size of bbq_spsc_t object with the given capacity. + */ +static inline vsize_t +bbq_spsc_memsize(vsize_t capacity) +{ + vsize_t cnt_each_blk = (capacity) >> BBQ_BLOCK_NUM_LOG; + if (cnt_each_blk == 0) { + cnt_each_blk = 1; + } + vsize_t mem_each_blk = + sizeof(bbq_spsc_block_t) + (cnt_each_blk << BBQ_ENTRY_SIZE_LOG); + + vsize_t mem_each_blk_log = + v_pow2_round_down(mem_each_blk * 2 - 1); /* align up */ + + vsize_t mem_buf = + sizeof(bbq_spsc_t) + (mem_each_blk_log << BBQ_BLOCK_NUM_LOG); + + return mem_buf; +} + +/** + * Initializes a bbq data structure. + * + * @param rb address of bbq_spsc_t object. + * @param size size of the given bbq_spsc_t object `rb`. + * + * @return true initialization succeeded. + * @return false initialization failed. + */ +static inline vbool_t +bbq_spsc_init(bbq_spsc_t *rb, vsize_t size) +{ + // we shift vuint16_t by BBQ_ENTRY_SIZE_LOG, we need to make sure the + // behavior is defined + ASSERT( + BBQ_ENTRY_SIZE_LOG < 16U && + "must have width less than vuint16_t because to be able to shift it"); + + if (unlikely(rb == NULL) || unlikely(BBQ_ENTRY_SIZE < BBQ_MIN_ENTRY_SIZE) || + unlikely(BBQ_ENTRY_SIZE > BBQ_MAX_ENTRY_SIZE) || + unlikely(BBQ_BLOCK_NUM_LOG < BBQ_MIN_BLOCK_NUM_LOG) || + unlikely(BBQ_BLOCK_NUM_LOG > BBQ_MAX_BLOCK_NUM_LOG)) { + return false; + } + + vsize_t blks_total_size = (size) - sizeof(bbq_spsc_t); + vsize_t blk_size = v_pow2_round_down(blks_total_size >> BBQ_BLOCK_NUM_LOG); + if (unlikely(blk_size <= BBQ_BLOCK_INIT_VALUE(struct bbq_spsc_block_s))) { + return false; + } + vsize_t blk_size_log = v_log2(blk_size); + BBQ_BLK_SZ_VERIFICATION(spsc); + if (unlikely(blk_size_log < BBQ_MIN_BLOCK_SIZE_LOG) || + unlikely(blk_size_log >= BBQ_MAX_BLOCK_SIZE_LOG)) { + return false; + } + (rb)->config.blk_size = blk_size; + (rb)->config.blk_size_log = blk_size_log; + BBQ_SPSC_WRITE_PROD((rb)->widx, 0); + BBQ_SPSC_WRITE_CONS((rb)->ridx, 0); + + for (vsize_t i = 0; i < (1UL << BBQ_BLOCK_NUM_LOG); i++) { + _bbq_spsc_block_init( + (bbq_spsc_block_t *)((rb)->blk + (i << blk_size_log)), i, blk_size); + } + return true; +} + +static inline void +_bbq_spsc_block_init(bbq_spsc_block_t *blk, vsize_t idx, vuint16_t block_size) +{ + /* if it is not the first block, set to invalid state */ + vuint16_t init_value = likely(idx) ? block_size : BBQ_BLOCK_SPSC_INIT_VALUE; + vatomic64_write(&blk->committed, init_value); + vatomic64_write(&blk->consumed, init_value); +} + +/* return means retry */ +static inline vbool_t +_bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + + /* get the address of the alloc block */ + vuint64_t widx = q->widx; + vuint16_t block_idx = BBQ_GLOBAL_IDX(widx); + bbq_spsc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* precheck once */ + vuint16_t block_size = q->config.blk_size; + vuint64_t committed = vatomic64_read_rlx(&blk->committed); + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + vsize_t entry_total_size = (*count) << BBQ_ENTRY_SIZE_LOG; + /* if out of bound, we don't add the space, but help to move the block */ + if (likely(committed_space < block_size)) { + vuint16_t space = VMIN(entry_total_size, block_size - committed_space); + void *entry = BBQ_GET_ENTRY(blk, committed_space); + int r = memcpy_s(entry, space, *buf, space); + BUG_ON(r != 0); + vuint64_t new_committed = BBQ_LOCAL_COMPOSE(BBQ_LOCAL_VSN(committed), + committed_space + space); + vatomic64_write_rel(&blk->committed, new_committed); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + + /* slow path, all writers help to move to next block */ + bbq_spsc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(widx); + if (unlikely( + !BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(nblk, block_size, global_vsn))) { + return false; + } + + /* reset cursor and advance block */ + BBQ_RESET_BLOCK_CURSOR_LIGHT(&nblk->committed, global_vsn + 1, + BBQ_BLOCK_SPSC_INIT_VALUE); + q->widx++; + return true; +} + +/* return means retry */ +static inline vbool_t +_bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + + /* get the address of the occupy block */ + vuint64_t ridx = q->ridx; + vuint16_t block_idx = BBQ_GLOBAL_IDX(ridx); + bbq_spsc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + + /* check if the block is fully reserved */ + vuint16_t block_size = q->config.blk_size; + vuint64_t consumed = vatomic64_read_rlx(&blk->consumed); + vuint64_t consumed_space = BBQ_LOCAL_IDX(consumed); + if (likely(consumed_space < block_size)) { + vuint64_t committed = vatomic64_read_acq(&blk->committed); + /* check if we have an entry to occupy */ + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + if (unlikely(consumed_space >= committed_space)) { + ASSERT(consumed_space == committed_space && + "Consumed should be <= committed"); + return false; + } + + /* we got the entry */ + vuint16_t space = VMIN((*count) << BBQ_ENTRY_SIZE_LOG, + committed_space - consumed_space); + void *entry = BBQ_GET_ENTRY(blk, consumed_space); + int r = memcpy_s(*buf, space, entry, space); + BUG_ON(r != 0); + vuint64_t new_consumed = consumed + space; + vatomic64_write_rel(&blk->consumed, new_consumed); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + + /* need to advance the block */ + bbq_spsc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(ridx); + + /* r_head never pass the w_head and r_tail */ + vuint64_t next_block_vsn = + BBQ_LOCAL_VSN(vatomic64_read_rlx(&nblk->committed)); + if (unlikely(next_block_vsn != global_vsn + 1)) { + return false; + } + /* reset the cursor */ + BBQ_RESET_BLOCK_CURSOR_LIGHT(&nblk->consumed, global_vsn + 1, + BBQ_BLOCK_SPSC_INIT_VALUE); + q->ridx++; + return true; +} +#endif diff --git a/bbq/include/vsync/queue/internal/bbq/common.h b/bbq/include/vsync/queue/internal/bbq/common.h new file mode 100644 index 00000000..6cecdfa2 --- /dev/null +++ b/bbq/include/vsync/queue/internal/bbq/common.h @@ -0,0 +1,118 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2023-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_COMMON_H +#define VSYNC_BBQ_COMMON_H + +#include +#include +#include +#include + +/* utils */ + +/* size related */ +/* mininum and maxinum entry size, block size and block number */ +#define BBQ_ENTRY_SIZE (1ULL << BBQ_ENTRY_SIZE_LOG) +#define BBQ_MIN_BLOCK_NUM_LOG 0U +#define BBQ_MAX_BLOCK_NUM_LOG 16U +#define BBQ_MIN_ENTRY_SIZE 4U +#define BBQ_MAX_ENTRY_SIZE (1ULL << (BBQ_MAX_BLOCK_SIZE_LOG - 1)) +#define BBQ_MIN_BLOCK_SIZE_LOG 4U +#define BBQ_MAX_BLOCK_SIZE_LOG 16U + +/* offset and mask */ +/* global var related */ +#define BBQ_GLOBA_VERSION_BIT 44U /* 64 - 20 */ +#define BBQ_GLOBA_VERSION_MASK ((1ULL << BBQ_GLOBA_VERSION_BIT) - 1) +#define BBQ_GLOBAL_IDX(v) ((v) & ((1ULL << BBQ_BLOCK_NUM_LOG) - 1)) +#define BBQ_GLOBAL_VSN(v) (((v) >> BBQ_BLOCK_NUM_LOG) & BBQ_GLOBA_VERSION_MASK) +#define BBQ_GLOBAL_COMPOSE(h, l) (((h) << BBQ_BLOCK_NUM_LOG) | (l)) + +/* local var related */ +#define BBQ_LOCAL_SPACE_BIT 20U /* >16 to provent the FAA overflow */ +#define BBQ_LOCAL_SPACE_MASK ((1ULL << BBQ_LOCAL_SPACE_BIT) - 1) +#define BBQ_LOCAL_IDX(v) ((v)&BBQ_LOCAL_SPACE_MASK) +#define BBQ_LOCAL_VSN(v) ((v) >> BBQ_LOCAL_SPACE_BIT) +#define BBQ_LOCAL_COMPOSE(h, l) (((h) << BBQ_LOCAL_SPACE_BIT) | (l)) + +typedef struct bbq_config_s { + vuint16_t blk_size_log; /* total size of each block (in log) */ + vuint16_t blk_size; /* total size of each block */ +} bbq_config_t; + +#define BBQ_BLOCK_INIT_VALUE(S) \ + ((sizeof(S) / BBQ_ENTRY_SIZE + 1) * BBQ_ENTRY_SIZE) + +#ifdef VSYNC_VERIFICATION +static vuint32_t g_verify_bbq_count = 0; + #define BBQ_VERIFY_BLK_COUNT(count) g_verify_bbq_count = (count) + #define BBQ_BLK_SZ_VERIFICATION(name) \ + do { \ + ASSERT(g_verify_bbq_count != 0 && "must set BBQ_COUNT"); \ + vuint32_t count_per_blk = \ + (g_verify_bbq_count) >> BBQ_BLOCK_NUM_LOG; \ + blk_size = BBQ_BLOCK_INIT_VALUE(struct bbq_##name##_block_s) + \ + (count_per_blk << BBQ_ENTRY_SIZE_LOG); \ + } while (0) +#else + #define BBQ_VERIFY_BLK_COUNT(count) \ + do { \ + } while (0) + #define BBQ_BLK_SZ_VERIFICATION(name) \ + do { \ + } while (0) +#endif + +#define BBQ_GET_BLOCK(rb, idx) \ + (void *)((rb)->blk + ((idx) << (rb)->config.blk_size_log)) +#define BBQ_GET_NEXT_BLOCK(rb, idx) \ + (BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX((idx) + 1))) +#define BBQ_GET_ENTRY(blk, offset) \ + ((void *)(((vuintptr_t)(blk)) + (vuintptr_t)(offset))) + +#define BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(blk, sz, vsn) \ + ({ \ + vuint64_t consumed = vatomic64_read_acq(&(blk)->consumed); \ + (BBQ_LOCAL_IDX(consumed) == (sz) && \ + BBQ_LOCAL_VSN(consumed) == (vsn)) || \ + BBQ_LOCAL_VSN(consumed) > (vsn); \ + }) + +#define BBQ_RESET_BLOCK_CURSOR_LIGHT(v, new_vsn, init_v) \ + vatomic64_write_rlx(v, BBQ_LOCAL_COMPOSE(new_vsn, init_v)) + +static inline void +bbq_reset_block_cursor_heavy(vatomic64_t *v, vuint64_t new_vsn, + vuintptr_t init_v) +{ + vuint64_t new_cursor = BBQ_LOCAL_COMPOSE(new_vsn, init_v); + vatomic64_max(v, new_cursor); +} + +#define BBQ_ADVANCE_HEAD(v, old, new) vatomic64_max(v, new) + +#define BBQ_COUNT(rb, name, name_uc) \ + ({ \ + vuint64_t ridx = BBQ_##name_uc##_READ_CONS((rb)->ridx); \ + struct bbq_##name##_block_s *rblk = \ + BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX(ridx)); \ + vuintptr_t consumed_index = \ + BBQ_LOCAL_IDX(vatomic64_read_rlx(&rblk->consumed)); \ + \ + vuint64_t widx = BBQ_##name_uc##_READ_PROD((rb)->widx); \ + struct bbq_##name##_block_s *wblk = \ + BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX(widx)); \ + vuintptr_t committed_index = \ + BBQ_LOCAL_IDX(vatomic64_read_rlx(&wblk->committed)); \ + \ + vuintptr_t block_size = (rb)->config.blk_size; \ + ((((widx - ridx) * \ + (block_size - BBQ_BLOCK_INIT_VALUE(struct bbq_##name##_block_s))) + \ + (committed_index - consumed_index)) >> \ + BBQ_ENTRY_SIZE_LOG); \ + }) + +#endif /* VSYNC_BBQ_COMMON_H */ diff --git a/bbq/test/CMakeLists.txt b/bbq/test/CMakeLists.txt new file mode 100644 index 00000000..3085ecde --- /dev/null +++ b/bbq/test/CMakeLists.txt @@ -0,0 +1,15 @@ +target_include_directories( + vsync INTERFACE $ + $) +file(GLOB SRCS *.c) +foreach(SRC ${SRCS}) + + get_filename_component(TEST ${SRC} NAME_WE) + set(TEST test_${TEST}) + + add_executable(${TEST} ${SRC}) + target_link_libraries(${TEST} vsync pthread) + target_compile_options(${TEST} PRIVATE -O3) + v_add_bin_test(NAME ${TEST} COMMAND ${TEST}) + set_tests_properties(${TEST} PROPERTIES COST 300) +endforeach() diff --git a/bbq/test/align_test.c b/bbq/test/align_test.c new file mode 100644 index 00000000..b30db56c --- /dev/null +++ b/bbq/test/align_test.c @@ -0,0 +1,88 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include + +uint32_t +log2_of(vuint32_t v) +{ +#if 0 + return ((unsigned)(8 * sizeof(unsigned long long) - __builtin_clzll((v)) - 1)); +#else + + const unsigned int b[] = {0x2, 0xC, 0xF0, 0xFF00, 0xFFFF0000}; + const unsigned int S[] = {1, 2, 4, 8, 16}; + int i; + + register unsigned int r = 0; // result of log2(v) will go here + for (i = 4; i >= 0; i--) // unroll for speed... + { + if (v & b[i]) { + v >>= S[i]; + r |= S[i]; + } + } + return r; +#endif +} + +uint32_t +pow2_round_up(vuint32_t v) +{ +#if 1 + return v == 1 ? 1 : 1 << (32 - __builtin_clz(v - 1)); +#else + v--; + for (vuint32_t shift_by = 1; shift_by <= 16; shift_by *= 2) { + v |= v >> shift_by; + } + return v; +#endif +} + +uint32_t +pow2_round_down(vuint32_t v) +{ +#if 0 + return (1U << (32U - __builtin_clz((vuint32_t)(v)) - 1U)); +#else + for (vuint32_t shift_by = 1; shift_by <= 16; shift_by *= 2) { + v |= v >> shift_by; + } + return v - (v >> 1); +#endif +} + +#define bbq_align_down_with_power2(a) (pow2_round_down(a)) + +#define bbq_log2(X) (log2_of(X)) + +int +main(void) +{ + // 2^f2(x) = f1(x),f1 is the align down and f2 is log2 + // for example, f2(1023) = 9, f2(1024) = 10 + // Jiawei Wang(84201116)2022-12-15 08:11 + // f1(1023) = 512, f1(1024) = 1024 + + ASSERT(bbq_align_down_with_power2(1023) == 512); + ASSERT(pow2_round_up(1023) == 1024); + + ASSERT(bbq_align_down_with_power2(1024) == 1024); + ASSERT(pow2_round_up(1024) == 1024); + + ASSERT(bbq_align_down_with_power2(1023) == 512); + ASSERT(pow2_round_up(1023) == 1024); + + ASSERT(bbq_align_down_with_power2(1023) == 512); + ASSERT(pow2_round_up(1023) == 1024); + + ASSERT(bbq_log2(1023) == 9); + ASSERT(bbq_log2(1024) == 10); + + return 0; +} diff --git a/bbq/test/bbq_mpmc.c b/bbq/test/bbq_mpmc.c new file mode 100644 index 00000000..319fb4ac --- /dev/null +++ b/bbq/test/bbq_mpmc.c @@ -0,0 +1,102 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM 2000000UL +#define NUM_WRITER 4 +#define NUM_READER 5 + +#define BUFFER_ENTRY_NUM 4096 +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#include + +struct bbq_mpmc_s *rb; + +void * +writer(void *arg) +{ + vuint64_t *id = (vuint64_t *)arg; + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM * NUM_READER; + while (rest) { + vuint32_t count = VMIN(rest, ENQUEUE_BATCH); + ASSERT(count <= ENQUEUE_BATCH); + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ((*id) << 30) | (ptr++); + } + rest -= bbq_mpmc_enqueue(rb, buf, count, true); + } + free(arg); + return NULL; +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t last[NUM_WRITER] = {0}; + vuint64_t rest = NUM * NUM_WRITER; + while (rest) { + vuint32_t count = VMIN(rest, DEQUEUE_BATCH); + count = bbq_mpmc_dequeue(rb, buf, count, false); + for (vuint32_t i = 0; i < count; i++) { + vuint64_t id = buf[i] >> 30; + vuint64_t data = buf[i] & ((1ULL << 30) - 1); + ASSERT(id < NUM_WRITER); + ASSERT(last[id] <= data); + last[id] = data; + } + rest -= count; + } + return NULL; +} + +int +main(void) +{ + vsize_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + rb = (bbq_mpmc_t *)malloc(sz); + if (rb == NULL) { + perror("fail to create the ring buffer"); + abort(); + } + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + vbool_t success = bbq_mpmc_init(rb, sz); + ASSERT(success); + ASSERT(BBQ_MPMC_COUNT(rb) == 0); + + pthread_t t1[NUM_WRITER], t2[NUM_READER]; + for (vsize_t i = 0; i < NUM_WRITER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t1[i], NULL, writer, arg); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_create(&t2[i], NULL, reader, NULL); + } + + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_join(t1[i], NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_join(t2[i], NULL); + } + + ASSERT(BBQ_MPMC_COUNT(rb) == 0); + free(rb); + return 0; +} diff --git a/bbq/test/bbq_spsc.c b/bbq/test/bbq_spsc.c new file mode 100644 index 00000000..933a6359 --- /dev/null +++ b/bbq/test/bbq_spsc.c @@ -0,0 +1,84 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM 200000000UL +#define BUFFER_ENTRY_NUM 4096 +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#include + +bbq_spsc_t *rb; + +void * +writer(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + while (rest) { + vuint32_t count = VMIN(rest, ENQUEUE_BATCH); + ASSERT(count <= ENQUEUE_BATCH); + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ptr++; + } + rest -= bbq_spsc_enqueue(rb, buf, count, true); + } + return NULL; +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + while (rest) { + vuint32_t count = bbq_spsc_dequeue(rb, buf, DEQUEUE_BATCH, false); + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] == ptr++); + } + rest -= count; + } + return NULL; +} + +int +main(void) +{ + vuint32_t sz = bbq_spsc_memsize(BUFFER_ENTRY_NUM); + rb = (struct bbq_spsc_s *)malloc(sz); + if (rb == NULL) { + perror("fail to create the ring buffer"); + abort(); + } + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + + vbool_t success = bbq_spsc_init(rb, sz); + ASSERT(success); + ASSERT(BBQ_SPSC_COUNT(rb) == 0); + + pthread_t t1, t2; + pthread_create(&t1, NULL, writer, NULL); + pthread_create(&t2, NULL, reader, NULL); + + pthread_join(t1, NULL); + pthread_join(t2, NULL); + + ASSERT(BBQ_SPSC_COUNT(rb) == 0); + free(rb); + return 0; +} diff --git a/bbq/test/include/test/bbq/bbq_test_helper.h b/bbq/test/include/test/bbq/bbq_test_helper.h new file mode 100644 index 00000000..b6e64446 --- /dev/null +++ b/bbq/test/include/test/bbq/bbq_test_helper.h @@ -0,0 +1,29 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_TEST_HELPER +#define VSYNC_BBQ_TEST_HELPER + +#if defined(VSYNC_VERIFICATION) + + #include + +/* TODO: linters will complain about this even if VSYNC_VERIFICATION is not + * defined! */ + +static inline int +memcpy_s(void *dst, vsize_t dstsz, void *src, vsize_t srcsz) +{ + ASSERT(srcsz <= dstsz); + vuintptr_t *d = (vuintptr_t *)dst; + vuintptr_t *s = (vuintptr_t *)src; + for (vuint64_t i = 0; i < dstsz / sizeof(vuintptr_t); i++) { + d[i] = s[i]; + } + return 0; +} +#endif + +#endif diff --git a/bbq/test/include/test/bbq/debug.h b/bbq/test/include/test/bbq/debug.h new file mode 100644 index 00000000..8e199e0d --- /dev/null +++ b/bbq/test/include/test/bbq/debug.h @@ -0,0 +1,190 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2023-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_DEBUG_H +#define VSYNC_BBQ_DEBUG_H + +/* debug function */ +static inline void +bbq_spsc_debug(struct bbq_spsc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_SPSC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(rb->widx); + vuint64_t write_version = BBQ_GLOBAL_VSN(rb->widx); + vuint64_t read_index = BBQ_GLOBAL_IDX(rb->ridx); + vuint64_t read_version = BBQ_GLOBAL_VSN(rb->ridx); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_SPSC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_spsc_block_s *blk = (struct bbq_spsc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("committed: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", committed_version, committed_space); + printf("consumed: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_mpsc_debug(struct bbq_mpsc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_MPSC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->widx)); + vuint64_t write_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->widx)); + vuint64_t read_index = BBQ_GLOBAL_IDX(rb->ridx); + vuint64_t read_version = BBQ_GLOBAL_VSN(rb->ridx); + + DBG("------------------------------------------------------------------------\n"); + DBG("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + DBG("Block number: %" VUINT16_FORMAT "\n", block_number); + DBG("Block init value: %" VUINT16_FORMAT "\n", BBQ_BLOCK_MPSC_INIT_VALUE); + DBG("Block size: %" VUINT16_FORMAT "\n", block_size); + DBG("Entry size: %" VUINT64_FORMAT "\n", (vuint64_t) BBQ_ENTRY_SIZE); + DBG("Data space: %" VUINT16_FORMAT "\n", data_space); + DBG("widx: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", write_version, write_index); + DBG("ridx: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_mpsc_block_s *blk = (struct bbq_mpsc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t allocated_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->allocated)); + vuint64_t allocated_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->allocated)); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("allocated: (%ld,\t%ld)\n", allocated_version, allocated_space); + printf("committed: (%ld,\t%ld)\n", committed_version, committed_space); + printf("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_spmc_debug(struct bbq_spmc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_SPMC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(rb->widx); + vuint64_t write_version = BBQ_GLOBAL_VSN(rb->widx); + vuint64_t read_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->ridx)); + vuint64_t read_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->ridx)); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_SPMC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_spmc_block_s *blk = (struct bbq_spmc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t reserved_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->reserved)); + vuint64_t reserved_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->reserved)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("committed: (%ld,\t%ld)\n", committed_version, committed_space); + printf("reserved: (%ld,\t%ld)\n", reserved_version, reserved_space); + printf("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_mpmc_debug(struct bbq_mpmc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_MPMC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->widx)); + vuint64_t write_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->widx)); + vuint64_t read_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->ridx)); + vuint64_t read_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->ridx)); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_MPMC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_mpmc_block_s *blk = (struct bbq_mpmc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t allocated_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->allocated)); + vuint64_t allocated_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->allocated)); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t reserved_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->reserved)); + vuint64_t reserved_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->reserved)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + DBG("allocated: (%ld,\t%ld)\n", allocated_version, allocated_space); + DBG("committed: (%ld,\t%ld)\n", committed_version, committed_space); + DBG("reserved: (%ld,\t%ld)\n", reserved_version, reserved_space); + DBG("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +#endif diff --git a/include/vsync/queue/bbq_mpmc.h b/include/vsync/queue/bbq_mpmc.h new file mode 100644 index 00000000..3e6dbf74 --- /dev/null +++ b/include/vsync/queue/bbq_mpmc.h @@ -0,0 +1,365 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_MPMC_H +#define VSYNC_BBQ_MPMC_H +/******************************************************************************* + * @file bbq_mpmc.h + * @brief Block-based Bounded Queue multi-producer/multi-consumer + * + * A highly performant bounded queue that splits the buffer in multiple blocks. + * + * ### Remarks: + * + * In this implementations, values have the fixed size (pointer size). + * This implementation does not support DROP_OLD mode as described in the + * original paper. + * + * @cite [BBQ: A Block-based Bounded Queue for Exchanging Data and + * Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + * + * @example + * @include eg_bbq_mpmc.c + * + ******************************************************************************/ + +#include +#include +#include +#include +#include + +/** + * @def BBQ_BLOCK_NUM_LOG + * + * Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` + * to define the total number of blocks equals to `2^N` + * + * @note default value is `3U` -> (`8` blocks) + * + */ +#ifndef BBQ_BLOCK_NUM_LOG + #define BBQ_BLOCK_NUM_LOG 3U +#endif + +/** + * @def BBQ_ENTRY_SIZE_LOG + * + * Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to + * define an entry size equals to `2^N` + * + * @note default value is `log2(sizeof(vuintptr_t))` + * + */ +#ifndef BBQ_ENTRY_SIZE_LOG + #define BBQ_ENTRY_SIZE_LOG v_log2(sizeof(vuintptr_t)) +#endif + +#include + +typedef struct bbq_mpmc_block_s { + vatomic64_t allocated VSYNC_CACHEALIGN; + vatomic64_t committed VSYNC_CACHEALIGN; + vatomic64_t reserved VSYNC_CACHEALIGN; + vatomic64_t consumed VSYNC_CACHEALIGN; + vuint8_t entry[] VSYNC_CACHEALIGN; +} bbq_mpmc_block_t; + +typedef struct bbq_mpmc_s { + bbq_config_t config VSYNC_CACHEALIGN; + vatomic64_t widx VSYNC_CACHEALIGN; + vatomic64_t ridx VSYNC_CACHEALIGN; + vuint8_t blk[] VSYNC_CACHEALIGN; +} bbq_mpmc_t; + +/* block cursor init value */ +#define BBQ_BLOCK_MPMC_INIT_VALUE BBQ_BLOCK_INIT_VALUE(bbq_mpmc_block_t) + +/* Note:The following macros are used inside bbq/common.h in BBQ_COUNT + * definition */ +#define BBQ_MPMC_WRITE_PROD(k, v) (vatomic64_write(&(k), v)) +#define BBQ_MPMC_WRITE_CONS(k, v) (vatomic64_write(&(k), v)) +#define BBQ_MPMC_READ_PROD(k) (vatomic64_read(&(k))) +#define BBQ_MPMC_READ_CONS(k) (vatomic64_read(&(k))) +#define BBQ_MPMC_COUNT(q) BBQ_COUNT(q, mpmc, MPMC) + +static inline vbool_t _bbq_mpmc_enqueue(struct bbq_mpmc_s *q, vuintptr_t **buf, + vuint32_t *count); +static inline vbool_t _bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline void _bbq_mpmc_block_init(bbq_mpmc_block_t *blk, vsize_t idx, + vuint16_t block_size); + +/** + * Enqueues one or more entries. + * + * Multiple entries can be enqueued if `src` points to an array. Use `count` to + * indicate how many entries should be enqueueed, starting from `src`. + * + * @param q pointer to bbq_mpmc data structure + * @param src pointer to first entry + * @param count number of entries to enqueue + * @param wait should wait for space to be available + * + * @return number of enqueued entries + */ +static inline vuint32_t +bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* the following is equivalent to + * while(bbq_mpmc_enqueue_internal(q, &rest_buf, &rest) || (wait && rest)); + */ + do { + retry = _bbq_mpmc_enqueue(q, &rest_buf, &rest); + + /* Help model checker in case the queue is empty + * The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) + retry = _bbq_mpmc_enqueue(q, &rest_buf, &rest); + } while (retry); + + return count - rest; +} + +/** + * Dequeues one or more entries. + * + * Multiple entries can be dequeued if `src` points to an array. Use `count` to + * indicate how many entries should be dequeued. + * + * @param q pointer to bbq_mpmc data structure + * @param src pointer to preallocated memory for the first entry + * @param count number of entries to dequeue + * @param wait should wait for entries to be available + * + * @return number of dequeued entries + */ +static inline vuint32_t +bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* the following is equivalent to + * while(bbq_mpmc_dequeue_internal(q, &rest_buf, &rest) || (wait && rest)); + */ + do { + retry = _bbq_mpmc_dequeue(q, &rest_buf, &rest); + + /* Help model checker in case the queue is empty + * The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) + retry = _bbq_mpmc_dequeue(q, &rest_buf, &rest); + } while (retry); + + return count - rest; +} + +/** + * Calculates the size of the bbq queue. + * + * @param capacity maximum number of entries that can fit in the queue + * @return size to be allocated in bytes + */ +static inline vsize_t +bbq_mpmc_memsize(vsize_t capacity) +{ + vsize_t cnt_each_blk = (capacity) >> BBQ_BLOCK_NUM_LOG; + if (cnt_each_blk == 0) { + cnt_each_blk = 1; + } + vsize_t mem_each_blk = + sizeof(bbq_mpmc_block_t) + (cnt_each_blk << BBQ_ENTRY_SIZE_LOG); + vsize_t mem_each_blk_log = + v_pow2_round_down(mem_each_blk * 2 - 1); /* align up */ + vsize_t mem_buf = + sizeof(bbq_mpmc_t) + (mem_each_blk_log << BBQ_BLOCK_NUM_LOG); + return mem_buf; +} +/** + * Initializes a bbq data structure. + * + * @param buf pointer to bbq data structure + * @param size number of bytes allocated for bbq data structure + * @return true initialization succeeded. + * @return false initialization failed. + */ +static inline vbool_t +bbq_mpmc_init(bbq_mpmc_t *rb, vsize_t size) +{ + if (unlikely(rb == NULL) || unlikely(BBQ_ENTRY_SIZE < BBQ_MIN_ENTRY_SIZE) || + unlikely(BBQ_ENTRY_SIZE > BBQ_MAX_ENTRY_SIZE) || + unlikely(BBQ_BLOCK_NUM_LOG < BBQ_MIN_BLOCK_NUM_LOG) || + unlikely(BBQ_BLOCK_NUM_LOG > BBQ_MAX_BLOCK_NUM_LOG)) { + return false; + } + + vsize_t blks_total_size = (size) - sizeof(bbq_mpmc_t); + vsize_t blk_size = v_pow2_round_down(blks_total_size >> BBQ_BLOCK_NUM_LOG); + if (unlikely(blk_size <= BBQ_BLOCK_INIT_VALUE(bbq_mpmc_block_t))) { + return false; + } + vsize_t blk_size_log = v_log2(blk_size); + BBQ_BLK_SZ_VERIFICATION(mpmc); + if (unlikely(blk_size_log < BBQ_MIN_BLOCK_SIZE_LOG) || + unlikely(blk_size_log >= BBQ_MAX_BLOCK_SIZE_LOG)) { + return false; + } + (rb)->config.blk_size = blk_size; + (rb)->config.blk_size_log = blk_size_log; + BBQ_MPMC_WRITE_PROD((rb)->widx, 0); + BBQ_MPMC_WRITE_CONS((rb)->ridx, 0); + for (vsize_t i = 0; i < (1UL << BBQ_BLOCK_NUM_LOG); i++) { + _bbq_mpmc_block_init( + (bbq_mpmc_block_t *)((rb)->blk + (i << blk_size_log)), i, blk_size); + } + return true; +} + +static inline void +_bbq_mpmc_block_init(bbq_mpmc_block_t *blk, vsize_t idx, vuint16_t block_size) +{ + /* if it is not the first block, set to invalid state */ + vuint16_t init_value = likely(idx) ? block_size : BBQ_BLOCK_MPMC_INIT_VALUE; + vatomic64_write(&blk->allocated, init_value); + vatomic64_write(&blk->committed, init_value); + vatomic64_write(&blk->reserved, init_value); + vatomic64_write(&blk->consumed, init_value); +} + +/* return means retry */ +static inline vbool_t +_bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + /* get the address of the alloc block */ + vuint64_t widx = vatomic64_read(&q->widx); + vuint16_t block_idx = BBQ_GLOBAL_IDX(widx); + bbq_mpmc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* precheck once */ + vuint16_t block_size = q->config.blk_size; + vuint64_t allocated = vatomic64_read(&blk->allocated); + vuint64_t allocated_space = BBQ_LOCAL_IDX(allocated); + vsize_t entry_total_size = (*count) << BBQ_ENTRY_SIZE_LOG; + /* if out of bound, we don't add the space, but help to move the block */ + if (likely(allocated_space < block_size)) { + /* update the allocated index using FAA */ + vuint64_t old_allocated = + vatomic64_get_add(&blk->allocated, entry_total_size); + /* we have some space */ + vuint64_t old_local_space = BBQ_LOCAL_IDX(old_allocated); + if (likely(old_local_space < block_size)) { + vuint16_t space = + VMIN(entry_total_size, block_size - old_local_space); + void *entry = BBQ_GET_ENTRY(blk, old_local_space); + int r = memcpy_s(entry, space, *buf, space); + BUG_ON(r != 0); + vatomic64_add(&blk->committed, space); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + } + /* slow path, all writers help to move to next block */ + bbq_mpmc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(widx); + if (unlikely( + !BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(nblk, block_size, global_vsn))) { + return false; + } + /* reset cursor and advance block */ + bbq_reset_block_cursor_heavy(&nblk->committed, global_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + bbq_reset_block_cursor_heavy(&nblk->allocated, global_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + BBQ_ADVANCE_HEAD(&q->widx, widx, widx + 1); + return true; +} +/* return means retry */ +static inline vbool_t +_bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + /* get the address of the occupy block */ + vuint64_t ridx = vatomic64_read(&q->ridx); + vuint16_t block_idx = BBQ_GLOBAL_IDX(ridx); + bbq_mpmc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* check if the block is fully reserved */ + vuint16_t block_size = q->config.blk_size; + vuint64_t reserved = vatomic64_read(&blk->reserved); + vuint64_t reserved_space = BBQ_LOCAL_IDX(reserved); + if (likely(reserved_space < block_size)) { + vuint64_t committed = vatomic64_read(&blk->committed); + /* check if we have an entry to occupy */ + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + if (unlikely(reserved_space >= committed_space)) { + /* Note: the version is strictly monotic and may not wrap. */ + ASSERT(reserved <= committed && "reserved must be <= committed"); + return false; + } + vuint16_t entry_total_size = VMIN((*count) << BBQ_ENTRY_SIZE_LOG, + committed_space - reserved_space); + if (unlikely(committed_space != block_size)) { + vuint64_t allocated = vatomic64_read(&blk->allocated); + vuint64_t allocated_space = BBQ_LOCAL_IDX(allocated); + if (likely(allocated_space != committed_space)) { + return false; + } + } + if (vatomic64_cmpxchg(&blk->reserved, reserved, + reserved + entry_total_size) != reserved) { + return true; + } + /* we got the entry */ + void *entry = BBQ_GET_ENTRY(blk, BBQ_LOCAL_IDX(reserved)); + int r = memcpy_s(*buf, entry_total_size, entry, entry_total_size); + BUG_ON(r != 0); + /* consume after copy the data back */ + vatomic64_add(&blk->consumed, entry_total_size); + vuint16_t offset = entry_total_size >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + /* need to advance the block */ + bbq_mpmc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + /* r_head never pass the w_head and r_tail */ + vuint64_t next_consumer_vsn = BBQ_LOCAL_VSN(reserved) - (block_idx != 0); + vuint64_t next_producer_vsn = + BBQ_LOCAL_VSN(vatomic64_read(&nblk->committed)); + if (next_producer_vsn != next_consumer_vsn + 1) { + return false; + } + /* reset the cursor */ + bbq_reset_block_cursor_heavy(&nblk->consumed, next_consumer_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + bbq_reset_block_cursor_heavy(&nblk->reserved, next_consumer_vsn + 1, + BBQ_BLOCK_MPMC_INIT_VALUE); + BBQ_ADVANCE_HEAD(&q->ridx, ridx, ridx + 1); + return true; +} +#endif diff --git a/include/vsync/queue/bbq_spsc.h b/include/vsync/queue/bbq_spsc.h new file mode 100644 index 00000000..3a68eb1a --- /dev/null +++ b/include/vsync/queue/bbq_spsc.h @@ -0,0 +1,378 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_SPSC_H +#define VSYNC_BBQ_SPSC_H + +/******************************************************************************* + * @file bbq_spsc.h + * @brief Block-based Bounded Queue single-producer/single-consumer + * @ingroup linearizable lock_free + * + * A highly performant bounded queue that splits the buffer in multiple blocks. + * + * + * ### Remarks: + * + * In this implementations, values have a fixed size equal to pointer size. + * This implementation does not support `DROP_OLD` mode as described in the + * original paper. + * + * @cite [BBQ: A Block-based Bounded Queue for Exchanging Data and + * Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + * + * @example + * ### Multi-threaded + * @include eg_bbq_spsc.c + * ### Multi-process + * @include eg_bbq_spsc_m_proc.c + * + ******************************************************************************/ + +#include +#include +#include +#include +#include +#include + +/** + * @def BBQ_BLOCK_NUM_LOG + * + * Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` + * to define the total number of blocks equals to `2^N` + * + * @note default value is `3U` -> (`8` blocks) + * + */ +#ifndef BBQ_BLOCK_NUM_LOG + #define BBQ_BLOCK_NUM_LOG 3U +#endif + +/** + * @def BBQ_ENTRY_SIZE_LOG + * + * Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to + * define an entry size equals to `2^N` + * + * @note default value is `log2(sizeof(vuintptr_t))` + * + */ +#ifndef BBQ_ENTRY_SIZE_LOG + #define BBQ_ENTRY_SIZE_LOG v_log2(sizeof(vuintptr_t)) +#endif + +#include + +typedef struct bbq_spsc_block_s { + vatomic64_t committed VSYNC_CACHEALIGN; + vatomic64_t consumed VSYNC_CACHEALIGN; + vuint8_t entry[] VSYNC_CACHEALIGN; +} bbq_spsc_block_t; + +typedef struct bbq_spsc_s { + bbq_config_t config VSYNC_CACHEALIGN; + vuint64_t widx VSYNC_CACHEALIGN; + vuint64_t ridx VSYNC_CACHEALIGN; + vuint8_t blk[] VSYNC_CACHEALIGN; +} bbq_spsc_t; + +/* block cursor init value */ +#define BBQ_BLOCK_SPSC_INIT_VALUE BBQ_BLOCK_INIT_VALUE(struct bbq_spsc_block_s) +/* Note:The following macros are used inside bbq/common.h in BBQ_COUNT + * definition */ +#define BBQ_SPSC_WRITE_PROD(k, v) ((k) = v) +#define BBQ_SPSC_WRITE_CONS(k, v) ((k) = v) +#define BBQ_SPSC_READ_PROD(k) (k) +#define BBQ_SPSC_READ_CONS(k) (k) +#define BBQ_SPSC_COUNT(q) BBQ_COUNT(q, spsc, SPSC) + +/* prototypes of internal functions */ +static inline vbool_t _bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline vbool_t _bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t **buf, + vuint32_t *count); +static inline void _bbq_spsc_block_init(bbq_spsc_block_t *blk, vsize_t idx, + vuint16_t block_size); +/** + * Enqueues one or more entries. + * + * Multiple entries can be enqueued if `src` points to an array. Use `count` to + * indicate how many entries should be enqueueed, starting from `src`. + * + * @param q address of bbq_spsc_t object. + * @param buf address of the first entry. + * @param count number of entries to enqueue. + * @param wait true/false when set to true it waits (blocks) till space becomes + * available. Otherwise, it quits retrying. + * + * @return number of enqueued entries. + */ +static inline vuint32_t +bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* The following two are equivalent. + * Which one is better depends on the model checker. + */ +#if defined(VSYNC_VERIFICATION_DAT3M) + while (_bbq_spsc_enqueue(q, &rest_buf, &rest) || (wait && rest)) + ; +#else + do { + retry = _bbq_spsc_enqueue(q, &rest_buf, &rest); + + /* The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) { + retry = _bbq_spsc_enqueue(q, &rest_buf, &rest); + } + } while (retry); +#endif + + return count - rest; +} + +/** + * Dequeues one or more entries. + * + * Multiple entries can be dequeued if `buf` points to an array. Use `count` to + * indicate how many entries should be dequeued. + * + * @param q address of bbq_spsc_t object. + * @param buf address of the first entry of the preallocated memory. + * @param count number of entries to dequeue. + * @param wait true/false. When set to true the API waits/blocks for entries to + * be available + * + * @return number of dequeued entries. + */ +static inline vuint32_t +bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +{ + vuint32_t rest = count; + vuintptr_t *rest_buf = buf; + vbool_t retry; + + /* The following two are equivalent. + * Which one is better depends on the model checker. + */ +#if defined(VSYNC_VERIFICATION_DAT3M) + while (_bbq_spsc_dequeue(q, &rest_buf, &rest) || (wait && rest)) + ; +#else + do { + retry = _bbq_spsc_dequeue(q, &rest_buf, &rest); + + /* The condition to leave the loop is + * retry' == true or rest' == 0 + * rest' == 0 => rest # 0 /\ retry' = true + * + * It is sufficient to observe retry only + * + */ + await_while (!retry && wait && rest) { + retry = _bbq_spsc_dequeue(q, &rest_buf, &rest); + } + } while (retry); +#endif + + return count - rest; +} +/** + * Calculates the size of bbq_spsc_t object based on the given capacity. + * + * @param capacity maximum number of entries that can fit in the queue. + * @return size of bbq_spsc_t object with the given capacity. + */ +static inline vsize_t +bbq_spsc_memsize(vsize_t capacity) +{ + vsize_t cnt_each_blk = (capacity) >> BBQ_BLOCK_NUM_LOG; + if (cnt_each_blk == 0) { + cnt_each_blk = 1; + } + vsize_t mem_each_blk = + sizeof(bbq_spsc_block_t) + (cnt_each_blk << BBQ_ENTRY_SIZE_LOG); + + vsize_t mem_each_blk_log = + v_pow2_round_down(mem_each_blk * 2 - 1); /* align up */ + + vsize_t mem_buf = + sizeof(bbq_spsc_t) + (mem_each_blk_log << BBQ_BLOCK_NUM_LOG); + + return mem_buf; +} + +/** + * Initializes a bbq data structure. + * + * @param rb address of bbq_spsc_t object. + * @param size size of the given bbq_spsc_t object `rb`. + * + * @return true initialization succeeded. + * @return false initialization failed. + */ +static inline vbool_t +bbq_spsc_init(bbq_spsc_t *rb, vsize_t size) +{ + // we shift vuint16_t by BBQ_ENTRY_SIZE_LOG, we need to make sure the + // behavior is defined + ASSERT( + BBQ_ENTRY_SIZE_LOG < 16U && + "must have width less than vuint16_t because to be able to shift it"); + + if (unlikely(rb == NULL) || unlikely(BBQ_ENTRY_SIZE < BBQ_MIN_ENTRY_SIZE) || + unlikely(BBQ_ENTRY_SIZE > BBQ_MAX_ENTRY_SIZE) || + unlikely(BBQ_BLOCK_NUM_LOG < BBQ_MIN_BLOCK_NUM_LOG) || + unlikely(BBQ_BLOCK_NUM_LOG > BBQ_MAX_BLOCK_NUM_LOG)) { + return false; + } + + vsize_t blks_total_size = (size) - sizeof(bbq_spsc_t); + vsize_t blk_size = v_pow2_round_down(blks_total_size >> BBQ_BLOCK_NUM_LOG); + if (unlikely(blk_size <= BBQ_BLOCK_INIT_VALUE(struct bbq_spsc_block_s))) { + return false; + } + vsize_t blk_size_log = v_log2(blk_size); + BBQ_BLK_SZ_VERIFICATION(spsc); + if (unlikely(blk_size_log < BBQ_MIN_BLOCK_SIZE_LOG) || + unlikely(blk_size_log >= BBQ_MAX_BLOCK_SIZE_LOG)) { + return false; + } + (rb)->config.blk_size = blk_size; + (rb)->config.blk_size_log = blk_size_log; + BBQ_SPSC_WRITE_PROD((rb)->widx, 0); + BBQ_SPSC_WRITE_CONS((rb)->ridx, 0); + + for (vsize_t i = 0; i < (1UL << BBQ_BLOCK_NUM_LOG); i++) { + _bbq_spsc_block_init( + (bbq_spsc_block_t *)((rb)->blk + (i << blk_size_log)), i, blk_size); + } + return true; +} + +static inline void +_bbq_spsc_block_init(bbq_spsc_block_t *blk, vsize_t idx, vuint16_t block_size) +{ + /* if it is not the first block, set to invalid state */ + vuint16_t init_value = likely(idx) ? block_size : BBQ_BLOCK_SPSC_INIT_VALUE; + vatomic64_write(&blk->committed, init_value); + vatomic64_write(&blk->consumed, init_value); +} + +/* return means retry */ +static inline vbool_t +_bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + + /* get the address of the alloc block */ + vuint64_t widx = q->widx; + vuint16_t block_idx = BBQ_GLOBAL_IDX(widx); + bbq_spsc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + /* precheck once */ + vuint16_t block_size = q->config.blk_size; + vuint64_t committed = vatomic64_read_rlx(&blk->committed); + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + vsize_t entry_total_size = (*count) << BBQ_ENTRY_SIZE_LOG; + /* if out of bound, we don't add the space, but help to move the block */ + if (likely(committed_space < block_size)) { + vuint16_t space = VMIN(entry_total_size, block_size - committed_space); + void *entry = BBQ_GET_ENTRY(blk, committed_space); + int r = memcpy_s(entry, space, *buf, space); + BUG_ON(r != 0); + vuint64_t new_committed = BBQ_LOCAL_COMPOSE(BBQ_LOCAL_VSN(committed), + committed_space + space); + vatomic64_write_rel(&blk->committed, new_committed); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + + /* slow path, all writers help to move to next block */ + bbq_spsc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(widx); + if (unlikely( + !BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(nblk, block_size, global_vsn))) { + return false; + } + + /* reset cursor and advance block */ + BBQ_RESET_BLOCK_CURSOR_LIGHT(&nblk->committed, global_vsn + 1, + BBQ_BLOCK_SPSC_INIT_VALUE); + q->widx++; + return true; +} + +/* return means retry */ +static inline vbool_t +_bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t **buf, vuint32_t *count) +{ + if (*count == 0) { + return false; + } + + /* get the address of the occupy block */ + vuint64_t ridx = q->ridx; + vuint16_t block_idx = BBQ_GLOBAL_IDX(ridx); + bbq_spsc_block_t *blk = BBQ_GET_BLOCK(q, block_idx); + + /* check if the block is fully reserved */ + vuint16_t block_size = q->config.blk_size; + vuint64_t consumed = vatomic64_read_rlx(&blk->consumed); + vuint64_t consumed_space = BBQ_LOCAL_IDX(consumed); + if (likely(consumed_space < block_size)) { + vuint64_t committed = vatomic64_read_acq(&blk->committed); + /* check if we have an entry to occupy */ + vuint64_t committed_space = BBQ_LOCAL_IDX(committed); + if (unlikely(consumed_space >= committed_space)) { + ASSERT(consumed_space == committed_space && + "Consumed should be <= committed"); + return false; + } + + /* we got the entry */ + vuint16_t space = VMIN((*count) << BBQ_ENTRY_SIZE_LOG, + committed_space - consumed_space); + void *entry = BBQ_GET_ENTRY(blk, consumed_space); + int r = memcpy_s(*buf, space, entry, space); + BUG_ON(r != 0); + vuint64_t new_consumed = consumed + space; + vatomic64_write_rel(&blk->consumed, new_consumed); + vuint16_t offset = space >> BBQ_ENTRY_SIZE_LOG; + *buf += offset; + *count -= offset; + return true; + } + + /* need to advance the block */ + bbq_spsc_block_t *nblk = BBQ_GET_NEXT_BLOCK(q, block_idx); + vuint64_t global_vsn = BBQ_GLOBAL_VSN(ridx); + + /* r_head never pass the w_head and r_tail */ + vuint64_t next_block_vsn = + BBQ_LOCAL_VSN(vatomic64_read_rlx(&nblk->committed)); + if (unlikely(next_block_vsn != global_vsn + 1)) { + return false; + } + /* reset the cursor */ + BBQ_RESET_BLOCK_CURSOR_LIGHT(&nblk->consumed, global_vsn + 1, + BBQ_BLOCK_SPSC_INIT_VALUE); + q->ridx++; + return true; +} +#endif diff --git a/include/vsync/queue/internal/bbq/common.h b/include/vsync/queue/internal/bbq/common.h new file mode 100644 index 00000000..d96ac45d --- /dev/null +++ b/include/vsync/queue/internal/bbq/common.h @@ -0,0 +1,118 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_COMMON_H +#define VSYNC_BBQ_COMMON_H + +#include +#include +#include +#include + +/* utils */ + +/* size related */ +/* mininum and maxinum entry size, block size and block number */ +#define BBQ_ENTRY_SIZE (1ULL << BBQ_ENTRY_SIZE_LOG) +#define BBQ_MIN_BLOCK_NUM_LOG 0U +#define BBQ_MAX_BLOCK_NUM_LOG 16U +#define BBQ_MIN_ENTRY_SIZE 4U +#define BBQ_MAX_ENTRY_SIZE (1ULL << (BBQ_MAX_BLOCK_SIZE_LOG - 1)) +#define BBQ_MIN_BLOCK_SIZE_LOG 4U +#define BBQ_MAX_BLOCK_SIZE_LOG 16U + +/* offset and mask */ +/* global var related */ +#define BBQ_GLOBA_VERSION_BIT 44U /* 64 - 20 */ +#define BBQ_GLOBA_VERSION_MASK ((1ULL << BBQ_GLOBA_VERSION_BIT) - 1) +#define BBQ_GLOBAL_IDX(v) ((v) & ((1ULL << BBQ_BLOCK_NUM_LOG) - 1)) +#define BBQ_GLOBAL_VSN(v) (((v) >> BBQ_BLOCK_NUM_LOG) & BBQ_GLOBA_VERSION_MASK) +#define BBQ_GLOBAL_COMPOSE(h, l) (((h) << BBQ_BLOCK_NUM_LOG) | (l)) + +/* local var related */ +#define BBQ_LOCAL_SPACE_BIT 20U /* >16 to provent the FAA overflow */ +#define BBQ_LOCAL_SPACE_MASK ((1ULL << BBQ_LOCAL_SPACE_BIT) - 1) +#define BBQ_LOCAL_IDX(v) ((v)&BBQ_LOCAL_SPACE_MASK) +#define BBQ_LOCAL_VSN(v) ((v) >> BBQ_LOCAL_SPACE_BIT) +#define BBQ_LOCAL_COMPOSE(h, l) (((h) << BBQ_LOCAL_SPACE_BIT) | (l)) + +typedef struct bbq_config_s { + vuint16_t blk_size_log; /* total size of each block (in log) */ + vuint16_t blk_size; /* total size of each block */ +} bbq_config_t; + +#define BBQ_BLOCK_INIT_VALUE(S) \ + ((sizeof(S) / BBQ_ENTRY_SIZE + 1) * BBQ_ENTRY_SIZE) + +#ifdef VSYNC_VERIFICATION +static vuint32_t g_verify_bbq_count = 0; + #define BBQ_VERIFY_BLK_COUNT(count) g_verify_bbq_count = (count) + #define BBQ_BLK_SZ_VERIFICATION(name) \ + do { \ + ASSERT(g_verify_bbq_count != 0 && "must set BBQ_COUNT"); \ + vuint32_t count_per_blk = \ + (g_verify_bbq_count) >> BBQ_BLOCK_NUM_LOG; \ + blk_size = BBQ_BLOCK_INIT_VALUE(struct bbq_##name##_block_s) + \ + (count_per_blk << BBQ_ENTRY_SIZE_LOG); \ + } while (0) +#else + #define BBQ_VERIFY_BLK_COUNT(count) \ + do { \ + } while (0) + #define BBQ_BLK_SZ_VERIFICATION(name) \ + do { \ + } while (0) +#endif + +#define BBQ_GET_BLOCK(rb, idx) \ + (void *)((rb)->blk + ((idx) << (rb)->config.blk_size_log)) +#define BBQ_GET_NEXT_BLOCK(rb, idx) \ + (BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX((idx) + 1))) +#define BBQ_GET_ENTRY(blk, offset) \ + ((void *)(((vuintptr_t)(blk)) + (vuintptr_t)(offset))) + +#define BBQ_BLOCK_FULLY_CONSUMED_WITH_VSN(blk, sz, vsn) \ + ({ \ + vuint64_t consumed = vatomic64_read_acq(&(blk)->consumed); \ + (BBQ_LOCAL_IDX(consumed) == (sz) && \ + BBQ_LOCAL_VSN(consumed) == (vsn)) || \ + BBQ_LOCAL_VSN(consumed) > (vsn); \ + }) + +#define BBQ_RESET_BLOCK_CURSOR_LIGHT(v, new_vsn, init_v) \ + vatomic64_write_rlx(v, BBQ_LOCAL_COMPOSE(new_vsn, init_v)) + +static inline void +bbq_reset_block_cursor_heavy(vatomic64_t *v, vuint64_t new_vsn, + vuintptr_t init_v) +{ + vuint64_t new_cursor = BBQ_LOCAL_COMPOSE(new_vsn, init_v); + vatomic64_max(v, new_cursor); +} + +#define BBQ_ADVANCE_HEAD(v, old, new) vatomic64_max(v, new) + +#define BBQ_COUNT(rb, name, name_uc) \ + ({ \ + vuint64_t ridx = BBQ_##name_uc##_READ_CONS((rb)->ridx); \ + struct bbq_##name##_block_s *rblk = \ + BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX(ridx)); \ + vuintptr_t consumed_index = \ + BBQ_LOCAL_IDX(vatomic64_read_rlx(&rblk->consumed)); \ + \ + vuint64_t widx = BBQ_##name_uc##_READ_PROD((rb)->widx); \ + struct bbq_##name##_block_s *wblk = \ + BBQ_GET_BLOCK(rb, BBQ_GLOBAL_IDX(widx)); \ + vuintptr_t committed_index = \ + BBQ_LOCAL_IDX(vatomic64_read_rlx(&wblk->committed)); \ + \ + vuintptr_t block_size = (rb)->config.blk_size; \ + ((((widx - ridx) * \ + (block_size - BBQ_BLOCK_INIT_VALUE(struct bbq_##name##_block_s))) + \ + (committed_index - consumed_index)) >> \ + BBQ_ENTRY_SIZE_LOG); \ + }) + +#endif /* VSYNC_BBQ_COMMON_H */ diff --git a/test/include/test/bbq/bbq_test_helper.h b/test/include/test/bbq/bbq_test_helper.h new file mode 100644 index 00000000..b6e64446 --- /dev/null +++ b/test/include/test/bbq/bbq_test_helper.h @@ -0,0 +1,29 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_TEST_HELPER +#define VSYNC_BBQ_TEST_HELPER + +#if defined(VSYNC_VERIFICATION) + + #include + +/* TODO: linters will complain about this even if VSYNC_VERIFICATION is not + * defined! */ + +static inline int +memcpy_s(void *dst, vsize_t dstsz, void *src, vsize_t srcsz) +{ + ASSERT(srcsz <= dstsz); + vuintptr_t *d = (vuintptr_t *)dst; + vuintptr_t *s = (vuintptr_t *)src; + for (vuint64_t i = 0; i < dstsz / sizeof(vuintptr_t); i++) { + d[i] = s[i]; + } + return 0; +} +#endif + +#endif diff --git a/test/include/test/bbq/debug.h b/test/include/test/bbq/debug.h new file mode 100644 index 00000000..e9c04999 --- /dev/null +++ b/test/include/test/bbq/debug.h @@ -0,0 +1,190 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#ifndef VSYNC_BBQ_DEBUG_H +#define VSYNC_BBQ_DEBUG_H + +/* debug function */ +static inline void +bbq_spsc_debug(struct bbq_spsc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_SPSC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(rb->widx); + vuint64_t write_version = BBQ_GLOBAL_VSN(rb->widx); + vuint64_t read_index = BBQ_GLOBAL_IDX(rb->ridx); + vuint64_t read_version = BBQ_GLOBAL_VSN(rb->ridx); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_SPSC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_spsc_block_s *blk = (struct bbq_spsc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("committed: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", committed_version, committed_space); + printf("consumed: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_mpsc_debug(struct bbq_mpsc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_MPSC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->widx)); + vuint64_t write_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->widx)); + vuint64_t read_index = BBQ_GLOBAL_IDX(rb->ridx); + vuint64_t read_version = BBQ_GLOBAL_VSN(rb->ridx); + + DBG("------------------------------------------------------------------------\n"); + DBG("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + DBG("Block number: %" VUINT16_FORMAT "\n", block_number); + DBG("Block init value: %" VUINT16_FORMAT "\n", BBQ_BLOCK_MPSC_INIT_VALUE); + DBG("Block size: %" VUINT16_FORMAT "\n", block_size); + DBG("Entry size: %" VUINT64_FORMAT "\n", (vuint64_t) BBQ_ENTRY_SIZE); + DBG("Data space: %" VUINT16_FORMAT "\n", data_space); + DBG("widx: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", write_version, write_index); + DBG("ridx: (%" VUINT64_FORMAT ",\t%" VUINT64_FORMAT ")\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_mpsc_block_s *blk = (struct bbq_mpsc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t allocated_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->allocated)); + vuint64_t allocated_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->allocated)); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("allocated: (%ld,\t%ld)\n", allocated_version, allocated_space); + printf("committed: (%ld,\t%ld)\n", committed_version, committed_space); + printf("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_spmc_debug(struct bbq_spmc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_SPMC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(rb->widx); + vuint64_t write_version = BBQ_GLOBAL_VSN(rb->widx); + vuint64_t read_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->ridx)); + vuint64_t read_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->ridx)); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_SPMC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_spmc_block_s *blk = (struct bbq_spmc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t reserved_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->reserved)); + vuint64_t reserved_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->reserved)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + printf("committed: (%ld,\t%ld)\n", committed_version, committed_space); + printf("reserved: (%ld,\t%ld)\n", reserved_version, reserved_space); + printf("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +static inline void +bbq_mpmc_debug(struct bbq_mpmc_s *rb) +{ + (void)rb; +#if 0 + vuint16_t block_number = 1U << BBQ_BLOCK_NUM_LOG; + vuint16_t block_size = rb->config.blk_size; + vuint16_t data_space = block_size - BBQ_BLOCK_MPMC_INIT_VALUE; + vuint64_t write_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->widx)); + vuint64_t write_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->widx)); + vuint64_t read_index = BBQ_GLOBAL_IDX(vatomic64_read(&rb->ridx)); + vuint64_t read_version = BBQ_GLOBAL_VSN(vatomic64_read(&rb->ridx)); + + printf("------------------------------------------------------------------------\n"); + printf("Drop old mode: %d\n", BBQ_MODE == BBQ_DROP_OLD); + printf("Block number: %d\n", block_number); + printf("Block init value: %ld\n", BBQ_BLOCK_MPMC_INIT_VALUE); + printf("Block size: %d\n", block_size); + printf("Entry size: %ld\n", BBQ_ENTRY_SIZE); + printf("Data space: %d\n", data_space); + printf("widx: (%ld,\t%ld)\n", write_version, write_index); + printf("ridx: (%ld,\t%ld)\n", read_version, read_index); + + for (int i = 0; i < block_number; i++) { + printf("------------------------------------------------\n"); + + struct bbq_mpmc_block_s *blk = (struct bbq_mpmc_block_s *) (rb->blk + i * block_size); + printf("[Block %d] %p \n", i, (void *) blk); + + vuint64_t allocated_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->allocated)); + vuint64_t allocated_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->allocated)); + + vuint64_t committed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->committed)); + vuint64_t committed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->committed)); + + vuint64_t reserved_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->reserved)); + vuint64_t reserved_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->reserved)); + + vuint64_t consumed_space = BBQ_LOCAL_IDX(vatomic64_read(&blk->consumed)); + vuint64_t consumed_version = BBQ_LOCAL_VSN(vatomic64_read(&blk->consumed)); + + DBG("allocated: (%ld,\t%ld)\n", allocated_version, allocated_space); + DBG("committed: (%ld,\t%ld)\n", committed_version, committed_space); + DBG("reserved: (%ld,\t%ld)\n", reserved_version, reserved_space); + DBG("consumed: (%ld,\t%ld)\n", consumed_version, consumed_space); + } +#endif +} + +#endif diff --git a/test/queue/bbq_mpmc.c b/test/queue/bbq_mpmc.c new file mode 100644 index 00000000..319fb4ac --- /dev/null +++ b/test/queue/bbq_mpmc.c @@ -0,0 +1,102 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM 2000000UL +#define NUM_WRITER 4 +#define NUM_READER 5 + +#define BUFFER_ENTRY_NUM 4096 +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#include + +struct bbq_mpmc_s *rb; + +void * +writer(void *arg) +{ + vuint64_t *id = (vuint64_t *)arg; + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM * NUM_READER; + while (rest) { + vuint32_t count = VMIN(rest, ENQUEUE_BATCH); + ASSERT(count <= ENQUEUE_BATCH); + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ((*id) << 30) | (ptr++); + } + rest -= bbq_mpmc_enqueue(rb, buf, count, true); + } + free(arg); + return NULL; +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t last[NUM_WRITER] = {0}; + vuint64_t rest = NUM * NUM_WRITER; + while (rest) { + vuint32_t count = VMIN(rest, DEQUEUE_BATCH); + count = bbq_mpmc_dequeue(rb, buf, count, false); + for (vuint32_t i = 0; i < count; i++) { + vuint64_t id = buf[i] >> 30; + vuint64_t data = buf[i] & ((1ULL << 30) - 1); + ASSERT(id < NUM_WRITER); + ASSERT(last[id] <= data); + last[id] = data; + } + rest -= count; + } + return NULL; +} + +int +main(void) +{ + vsize_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + rb = (bbq_mpmc_t *)malloc(sz); + if (rb == NULL) { + perror("fail to create the ring buffer"); + abort(); + } + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + vbool_t success = bbq_mpmc_init(rb, sz); + ASSERT(success); + ASSERT(BBQ_MPMC_COUNT(rb) == 0); + + pthread_t t1[NUM_WRITER], t2[NUM_READER]; + for (vsize_t i = 0; i < NUM_WRITER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t1[i], NULL, writer, arg); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_create(&t2[i], NULL, reader, NULL); + } + + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_join(t1[i], NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_join(t2[i], NULL); + } + + ASSERT(BBQ_MPMC_COUNT(rb) == 0); + free(rb); + return 0; +} diff --git a/test/queue/bbq_spsc.c b/test/queue/bbq_spsc.c new file mode 100644 index 00000000..fee9e673 --- /dev/null +++ b/test/queue/bbq_spsc.c @@ -0,0 +1,85 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + + +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM 200000000UL +#define BUFFER_ENTRY_NUM 4096 +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#include + +bbq_spsc_t *rb; + +void * +writer(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + while (rest) { + vuint32_t count = VMIN(rest, ENQUEUE_BATCH); + ASSERT(count <= ENQUEUE_BATCH); + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ptr++; + } + rest -= bbq_spsc_enqueue(rb, buf, count, true); + } + return NULL; +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + while (rest) { + vuint32_t count = bbq_spsc_dequeue(rb, buf, DEQUEUE_BATCH, false); + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] == ptr++); + } + rest -= count; + } + return NULL; +} + +int +main(void) +{ + vuint32_t sz = bbq_spsc_memsize(BUFFER_ENTRY_NUM); + rb = (struct bbq_spsc_s *)malloc(sz); + if (rb == NULL) { + perror("fail to create the ring buffer"); + abort(); + } + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + + vbool_t success = bbq_spsc_init(rb, sz); + ASSERT(success); + ASSERT(BBQ_SPSC_COUNT(rb) == 0); + + pthread_t t1, t2; + pthread_create(&t1, NULL, writer, NULL); + pthread_create(&t2, NULL, reader, NULL); + + pthread_join(t1, NULL); + pthread_join(t2, NULL); + + ASSERT(BBQ_SPSC_COUNT(rb) == 0); + free(rb); + return 0; +} diff --git a/verify/CMakeLists.txt b/verify/CMakeLists.txt index 530ee012..17641588 100644 --- a/verify/CMakeLists.txt +++ b/verify/CMakeLists.txt @@ -1,3 +1,4 @@ +include_directories(../test/include) target_include_directories( vsync INTERFACE $ $) diff --git a/verify/bbq/CMakeLists.txt b/verify/bbq/CMakeLists.txt new file mode 100755 index 00000000..cb7a3ea6 --- /dev/null +++ b/verify/bbq/CMakeLists.txt @@ -0,0 +1,31 @@ +file(GLOB SRCS *.c) +foreach(SRC ${SRCS}) + get_filename_component(TEST ${SRC} NAME_WE) + + add_executable(${TEST}-test ${SRC}) + target_link_libraries(${TEST}-test vsync pthread) + v_add_bin_test(NAME ${TEST}-test COMMAND ${TEST}-test) + + add_vsyncer_check( + TARGET check_${TEST} + SOURCE ${SRC} + TIMEOUT 1200 + DEPENDENCIES vsync) +endforeach() + +# Enable some tests with VMM and set extra options for Dartagnan +set(DAT3M_BOUND_bbq_spsc 2) + +foreach(SRC ${SRCS}) + get_filename_component(TEST ${SRC} NAME_WE) + + if(${DAT3M_BOUND_${TEST}}) + add_vsyncer_check( + TARGET check_${TEST} + SOURCE ${SRC} + MEMORY_MODELS vmm USE_DAT3M DARTAGNAN_OPTIONS + --bound=${DAT3M_BOUND_${TEST}} + TIMEOUT 700 + DEPENDENCIES vsync) + endif() +endforeach() diff --git a/verify/bbq/bbq_mpsc.c b/verify/bbq/bbq_mpsc.c new file mode 100644 index 00000000..9efd6c99 --- /dev/null +++ b/verify/bbq/bbq_mpsc.c @@ -0,0 +1,101 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +/* small enough such that producer may block with queue full */ +#define BBQ_BLOCK_NUM_LOG 1U +#define BUFFER_ENTRY_NUM 2 +#define NUM_WRITER 2 +#define NUM_READER 1 + +#include + +struct bbq_mpmc_s *q; + +static void * +writer(void *arg) +{ + vuint64_t *id = (vuint64_t *)arg; + vuint64_t count = (*id) + 1; + vuintptr_t ptr = 1; + while (count) { + vuint32_t r = bbq_mpmc_enqueue(q, &ptr, 1, true); + if (r) { + count--; + ptr++; + } + } + free(arg); + return NULL; +} + +static void * +reader(void *arg) +{ + vuint64_t count = 0; + vuintptr_t exp = 1; + + while (count < 1) { + vuintptr_t ptr; + vuint32_t r = bbq_mpmc_dequeue(q, &ptr, 1, true); + if (r) { + ASSERT(ptr == 1); + count++; + exp++; + } + } + free(arg); + return NULL; +} + +#define SIZE \ + ((sizeof(struct bbq_mpmc_s)) + \ + (BUFFER_ENTRY_NUM * \ + (sizeof(struct bbq_mpmc_block_s) + sizeof(vuintptr_t)))) + +int +main(void) +{ + ASSERT((SIZE) <= 2048); + + vuint32_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + q = (struct bbq_mpmc_s *)malloc(sz); + ASSERT(q != NULL && "failed to create the ring buffer"); + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + vbool_t success = bbq_mpmc_init(q, sz); + ASSERT(success); + ASSERT(BBQ_MPMC_COUNT(q) == 0); + + pthread_t t1[NUM_WRITER], t2[NUM_READER]; + + for (int i = 0; i < NUM_WRITER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t1[i], NULL, writer, arg); + } + + for (int i = 0; i < NUM_READER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t2[i], NULL, reader, arg); + } + + for (int i = 0; i < NUM_WRITER; i++) + pthread_join(t1[i], NULL); + for (int i = 0; i < NUM_READER; i++) + pthread_join(t2[i], NULL); + + free(q); + + return 0; +} diff --git a/verify/bbq/bbq_spmc.c b/verify/bbq/bbq_spmc.c new file mode 100644 index 00000000..263972d5 --- /dev/null +++ b/verify/bbq/bbq_spmc.c @@ -0,0 +1,94 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include + +/* small enough such that producer may block with queue full */ +#define BBQ_BLOCK_NUM_LOG 1U +#define BUFFER_ENTRY_NUM 2 +#define NUM_WRITER 1 +#define NUM_READER 2 + +#include + +struct bbq_mpmc_s *q; + +static void * +writer(void *arg) +{ + vuint64_t count = 3; + vuintptr_t ptr = 1; + while (count) { + vuint32_t r = bbq_mpmc_enqueue(q, &ptr, 1, true); + if (r) { + count--; + ptr++; + } + } + free(arg); + return NULL; +} + +static void * +reader(void *arg) +{ + vuint64_t count = 1; + while (count) { + vuintptr_t ptr; + vuint32_t r = bbq_mpmc_dequeue(q, &ptr, 1, true); + if (r) { + ASSERT(ptr == 1 || ptr == 2); + count--; + } + } + free(arg); + return NULL; +} + +#define SIZE \ + ((sizeof(struct bbq_mpmc_s)) + \ + (BUFFER_ENTRY_NUM * \ + (sizeof(struct bbq_mpmc_block_s) + sizeof(vuintptr_t)))) + +int +main(void) +{ + ASSERT((SIZE) <= 2048); + vuint32_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + q = (struct bbq_mpmc_s *)malloc(sz); + ASSERT(q != NULL && "failed to create the ring buffer"); + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + vbool_t success = bbq_mpmc_init(q, sz); + ASSERT(success); + ASSERT(BBQ_MPMC_COUNT(q) == 0); + + pthread_t t1[NUM_WRITER], t2[NUM_READER]; + + for (int i = 0; i < NUM_WRITER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t1[i], NULL, writer, arg); + } + + for (int i = 0; i < NUM_READER; i++) { + vuint64_t *arg = malloc(sizeof(*arg)); + *arg = i; + pthread_create(&t2[i], NULL, reader, arg); + } + + for (int i = 0; i < NUM_WRITER; i++) + pthread_join(t1[i], NULL); + for (int i = 0; i < NUM_READER; i++) + pthread_join(t2[i], NULL); + + free(q); + + return 0; +} diff --git a/verify/bbq/bbq_spsc.c b/verify/bbq/bbq_spsc.c new file mode 100644 index 00000000..a6e25b3a --- /dev/null +++ b/verify/bbq/bbq_spsc.c @@ -0,0 +1,82 @@ +/* + * Copyright (C) Huawei Technologies Co., Ltd. 2020-2026. All rights reserved. + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include + +/* small enough such that producer may block with queue full */ +#define BBQ_BLOCK_NUM_LOG 1U +#define BUFFER_ENTRY_NUM 2 + +/* large enough such that blocks are reused at least once */ +#define NUM 5UL + +#include +#include +struct bbq_spsc_s *q; +uintptr_t total_enq = 0; +uintptr_t total_deq = 0; + +static void * +writer(void *arg) +{ + V_UNUSED(arg); + for (vuintptr_t i = 0; i < NUM; i++) { + if (bbq_spsc_enqueue(q, &total_enq, 1, true)) { + total_enq++; + } + } + return NULL; +} + +static void * +reader(void *arg) +{ + V_UNUSED(arg); + vuintptr_t buf; + for (vuintptr_t i = 0; i < NUM; i++) { + if (bbq_spsc_dequeue(q, &buf, 1, true)) { + ASSERT(buf == total_deq); + total_deq++; + } + } + return NULL; +} + +#define SIZE \ + ((sizeof(struct bbq_spsc_s)) + \ + (BUFFER_ENTRY_NUM * \ + (sizeof(struct bbq_spsc_block_s) + sizeof(vuintptr_t)))) + +int +main(void) +{ + ASSERT((SIZE) <= 1024); + + vsize_t sz = bbq_spsc_memsize(BUFFER_ENTRY_NUM); + q = (struct bbq_spsc_s *)malloc(sz); + ASSERT(q != NULL && "failed to create the ring buffer"); + BBQ_VERIFY_BLK_COUNT(BUFFER_ENTRY_NUM); + vbool_t success = bbq_spsc_init(q, sz); + ASSERT(success); + ASSERT(BBQ_SPSC_COUNT(q) == 0); + + pthread_t t1, t2; + + pthread_create(&t1, NULL, writer, NULL); + pthread_create(&t2, NULL, reader, NULL); + + pthread_join(t1, NULL); + pthread_join(t2, NULL); + + ASSERT(BBQ_SPSC_COUNT(q) == total_enq - total_deq); + + free(q); + + return 0; +} From 2a66f1ae325f776f766351c9682dc16a8e3d35a8 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 16 Jan 2026 13:46:35 +0000 Subject: [PATCH 2/3] add documentation Signed-off-by: Diogo Behrens --- doc/api/vsync/GROUP_linearizable.md | 1 + doc/api/vsync/GROUP_lock_free.md | 1 + doc/api/vsync/queue/README.md | 2 + doc/api/vsync/queue/bbq_mpmc.h.md | 150 +++++++++++++++++++++++++ doc/api/vsync/queue/bbq_spsc.h.md | 163 ++++++++++++++++++++++++++++ 5 files changed, 317 insertions(+) create mode 100644 doc/api/vsync/queue/bbq_mpmc.h.md create mode 100644 doc/api/vsync/queue/bbq_spsc.h.md diff --git a/doc/api/vsync/GROUP_linearizable.md b/doc/api/vsync/GROUP_linearizable.md index 9463e130..838e4f6f 100644 --- a/doc/api/vsync/GROUP_linearizable.md +++ b/doc/api/vsync/GROUP_linearizable.md @@ -19,6 +19,7 @@ _Group of algorithms linearizable algorithms._ | [vsync/map/treeset_bst_fine.h](map/treeset_bst_fine.h.md)|This implementation of treeset uses unbalanced binary search tree (BST) and fine-grained locking. | ✔ | ❌ | ❌ | ❌ | | [vsync/map/treeset_rb_coarse.h](map/treeset_rb_coarse.h.md)|This implementation of treeset uses balanced red-black tree (RB) and coarse-grained locking. | ✔ | ❌ | ❌ | ❌ | | [vsync/map/treeset_rb_fine.h](map/treeset_rb_fine.h.md)|This implementation of treeset uses balanced red-black tree (RB) and fine-grained locking. | ✔ | ❌ | ❌ | ❌ | +| [vsync/queue/bbq_spsc.h](queue/bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ | | [vsync/queue/bounded_locked.h](queue/bounded_locked.h.md)|Multi-producer, multi-consumer bounded queue protected by a spinlock. | ✔ | ❌ | ❌ | ❌ | | [vsync/queue/bounded_mpmc.h](queue/bounded_mpmc.h.md)|Lockless, multi-producer, multi-consumer bounded queue. | ✔ | ❌ | ❌ | ❌ | | [vsync/queue/bounded_spsc.h](queue/bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ | diff --git a/doc/api/vsync/GROUP_lock_free.md b/doc/api/vsync/GROUP_lock_free.md index 02463814..dd0d153d 100644 --- a/doc/api/vsync/GROUP_lock_free.md +++ b/doc/api/vsync/GROUP_lock_free.md @@ -11,6 +11,7 @@ _Group of algorithms with lock-free progress condition._ | [vsync/map/listset_lf.h](map/listset_lf.h.md)|Lock-free implementation of listset. | ✔ | ✔ | ✔ | ❌ | | [vsync/map/simpleht.h](map/simpleht.h.md)|Simple lock-free hashtable. | ✔ | ✔ | ❌ | ❌ | | [vsync/map/skiplist_lf.h](map/skiplist_lf.h.md)|Lock-free concurrent skiplist. | ✔ | ✔ | ✔ | ❌ | +| [vsync/queue/bbq_spsc.h](queue/bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ | | [vsync/queue/bounded_spsc.h](queue/bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ | | [vsync/queue/chaselev.h](queue/chaselev.h.md)|Chase-Lev Work-Stealing bounded deque. | ❌ | ✔ | ❌ | ❌ | | [vsync/queue/unbounded_queue_lf.h](queue/unbounded_queue_lf.h.md)|Lock-free unbounded queue. | ✔ | ✔ | ✔ | ✔ | diff --git a/doc/api/vsync/queue/README.md b/doc/api/vsync/queue/README.md index da4fcb2f..6d6b594f 100644 --- a/doc/api/vsync/queue/README.md +++ b/doc/api/vsync/queue/README.md @@ -7,6 +7,8 @@ _Queues, priority queues and ringbuffers._ | File|Description|Linearizable|Lock-free|SMR-required|Unbounded-Queue| | --- | --- | --- | --- | --- | --- | +| [vsync/queue/bbq_mpmc.h](bbq_mpmc.h.md)|Block-based Bounded Queue multi-producer/multi-consumer. | ❌ | ❌ | ❌ | ❌ | +| [vsync/queue/bbq_spsc.h](bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ | | [vsync/queue/bounded_locked.h](bounded_locked.h.md)|Multi-producer, multi-consumer bounded queue protected by a spinlock. | ✔ | ❌ | ❌ | ❌ | | [vsync/queue/bounded_mpmc.h](bounded_mpmc.h.md)|Lockless, multi-producer, multi-consumer bounded queue. | ✔ | ❌ | ❌ | ❌ | | [vsync/queue/bounded_spsc.h](bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ | diff --git a/doc/api/vsync/queue/bbq_mpmc.h.md b/doc/api/vsync/queue/bbq_mpmc.h.md new file mode 100644 index 00000000..7ba71452 --- /dev/null +++ b/doc/api/vsync/queue/bbq_mpmc.h.md @@ -0,0 +1,150 @@ +# [vsync](../README.md) / [queue](README.md) / bbq_mpmc.h +_Block-based Bounded Queue multi-producer/multi-consumer._ + +A highly performant bounded queue that splits the buffer in multiple blocks. + +### Remarks: + +In this implementations, values have the fixed size (pointer size). This implementation does not support DROP_OLD mode as described in the original paper. + + +### References: + [BBQ: A Block-based Bounded Queue for Exchanging Data and Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + + +### Example: + + + +```c +``` + + + +--- +# Macros + +| Macro | Description | +|---|---| +| [BBQ_BLOCK_NUM_LOG](bbq_mpmc.h.md#macro-bbq_block_num_log) | Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` to define the total number of blocks equals to `2^N` | +| [BBQ_ENTRY_SIZE_LOG](bbq_mpmc.h.md#macro-bbq_entry_size_log) | Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to define an entry size equals to `2^N` | + +## Macro `BBQ_BLOCK_NUM_LOG` + + +_Define this macro with_ `-DBBQ_BLOCK_NUM_LOG=N` _to define the total number of blocks equals to_ `2^N` __ + + +> **Note:** default value is `3U` -> (`8` blocks) + + +## Macro `BBQ_ENTRY_SIZE_LOG` + + +_Define this macro with_ `-BBQ_ENTRY_SIZE_LOG=N` _to define an entry size equals to_ `2^N` __ + + +> **Note:** default value is `log2(sizeof(vuintptr_t))` + + +--- +# Functions + +| Function | Description | +|---|---| +| [bbq_mpmc_enqueue](bbq_mpmc.h.md#function-bbq_mpmc_enqueue) | Enqueues one or more entries. | +| [bbq_mpmc_dequeue](bbq_mpmc.h.md#function-bbq_mpmc_dequeue) | Dequeues one or more entries. | +| [bbq_mpmc_memsize](bbq_mpmc.h.md#function-bbq_mpmc_memsize) | Calculates the size of the bbq queue. | +| [bbq_mpmc_init](bbq_mpmc.h.md#function-bbq_mpmc_init) | Initializes a bbq data structure. | + +## Function `bbq_mpmc_enqueue` + +```c +static vuint32_t bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +``` +_Enqueues one or more entries._ + + +Multiple entries can be enqueued if `src` points to an array. Use `count` to indicate how many entries should be enqueueed, starting from `src`. + + + +**Parameters:** + +- `q`: pointer to bbq_mpmc data structure +- `src`: pointer to first entry +- `count`: number of entries to enqueue +- `wait`: should wait for space to be available + + +**Returns:** number of enqueued entries + + + +## Function `bbq_mpmc_dequeue` + +```c +static vuint32_t bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +``` +_Dequeues one or more entries._ + + +Multiple entries can be dequeued if `src` points to an array. Use `count` to indicate how many entries should be dequeued. + + + +**Parameters:** + +- `q`: pointer to bbq_mpmc data structure +- `src`: pointer to preallocated memory for the first entry +- `count`: number of entries to dequeue +- `wait`: should wait for entries to be available + + +**Returns:** number of dequeued entries + + + +## Function `bbq_mpmc_memsize` + +```c +static vsize_t bbq_mpmc_memsize(vsize_t capacity) +``` +_Calculates the size of the bbq queue._ + + + + +**Parameters:** + +- `capacity`: maximum number of entries that can fit in the queue + + +**Returns:** size to be allocated in bytes + + + +## Function `bbq_mpmc_init` + +```c +static vbool_t bbq_mpmc_init(bbq_mpmc_t *rb, vsize_t size) +``` +_Initializes a bbq data structure._ + + + + +**Parameters:** + +- `buf`: pointer to bbq data structure +- `size`: number of bytes allocated for bbq data structure + + +**Returns:** true initialization succeeded. + +**Returns:** false initialization failed. + + + + +--- diff --git a/doc/api/vsync/queue/bbq_spsc.h.md b/doc/api/vsync/queue/bbq_spsc.h.md new file mode 100644 index 00000000..3434c308 --- /dev/null +++ b/doc/api/vsync/queue/bbq_spsc.h.md @@ -0,0 +1,163 @@ +# [vsync](../README.md) / [queue](README.md) / bbq_spsc.h +_Block-based Bounded Queue single-producer/single-consumer._ + +**Groups:** [Linearizable](../GROUP_linearizable.md), [Lock-free](../GROUP_lock_free.md) + +A highly performant bounded queue that splits the buffer in multiple blocks. + +### Remarks: + +In this implementations, values have a fixed size equal to pointer size. This implementation does not support `DROP_OLD` mode as described in the original paper. + + +### References: + [BBQ: A Block-based Bounded Queue for Exchanging Data and Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei) + + +### Example: + +### Multi-threaded + + + +```c +``` + + + +### Multi-process + + + +```c +``` + + + +--- +# Macros + +| Macro | Description | +|---|---| +| [BBQ_BLOCK_NUM_LOG](bbq_spsc.h.md#macro-bbq_block_num_log) | Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` to define the total number of blocks equals to `2^N` | +| [BBQ_ENTRY_SIZE_LOG](bbq_spsc.h.md#macro-bbq_entry_size_log) | Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to define an entry size equals to `2^N` | + +## Macro `BBQ_BLOCK_NUM_LOG` + + +_Define this macro with_ `-DBBQ_BLOCK_NUM_LOG=N` _to define the total number of blocks equals to_ `2^N` __ + + +> **Note:** default value is `3U` -> (`8` blocks) + + +## Macro `BBQ_ENTRY_SIZE_LOG` + + +_Define this macro with_ `-BBQ_ENTRY_SIZE_LOG=N` _to define an entry size equals to_ `2^N` __ + + +> **Note:** default value is `log2(sizeof(vuintptr_t))` + + +--- +# Functions + +| Function | Description | +|---|---| +| [bbq_spsc_enqueue](bbq_spsc.h.md#function-bbq_spsc_enqueue) | Enqueues one or more entries. | +| [bbq_spsc_dequeue](bbq_spsc.h.md#function-bbq_spsc_dequeue) | Dequeues one or more entries. | +| [bbq_spsc_memsize](bbq_spsc.h.md#function-bbq_spsc_memsize) | Calculates the size of bbq_spsc_t object based on the given capacity. | +| [bbq_spsc_init](bbq_spsc.h.md#function-bbq_spsc_init) | Initializes a bbq data structure. | + +## Function `bbq_spsc_enqueue` + +```c +static vuint32_t bbq_spsc_enqueue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +``` +_Enqueues one or more entries._ + + +Multiple entries can be enqueued if `src` points to an array. Use `count` to indicate how many entries should be enqueueed, starting from `src`. + + + +**Parameters:** + +- `q`: address of bbq_spsc_t object. +- `buf`: address of the first entry. +- `count`: number of entries to enqueue. +- `wait`: true/false when set to true it waits (blocks) till space becomes available. Otherwise, it quits retrying. + + +**Returns:** number of enqueued entries. + + + +## Function `bbq_spsc_dequeue` + +```c +static vuint32_t bbq_spsc_dequeue(bbq_spsc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait) +``` +_Dequeues one or more entries._ + + +Multiple entries can be dequeued if `buf` points to an array. Use `count` to indicate how many entries should be dequeued. + + + +**Parameters:** + +- `q`: address of bbq_spsc_t object. +- `buf`: address of the first entry of the preallocated memory. +- `count`: number of entries to dequeue. +- `wait`: true/false. When set to true the API waits/blocks for entries to be available + + +**Returns:** number of dequeued entries. + + + +## Function `bbq_spsc_memsize` + +```c +static vsize_t bbq_spsc_memsize(vsize_t capacity) +``` +_Calculates the size of bbq_spsc_t object based on the given capacity._ + + + + +**Parameters:** + +- `capacity`: maximum number of entries that can fit in the queue. + + +**Returns:** size of bbq_spsc_t object with the given capacity. + + + +## Function `bbq_spsc_init` + +```c +static vbool_t bbq_spsc_init(bbq_spsc_t *rb, vsize_t size) +``` +_Initializes a bbq data structure._ + + + + +**Parameters:** + +- `rb`: address of bbq_spsc_t object. +- `size`: size of the given bbq_spsc_t object `rb`. + + +**Returns:** true initialization succeeded. + +**Returns:** false initialization failed. + + + + +--- From 3d05c52eea0580b8fa2891e8292be51275f35c1e Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 16 Jan 2026 13:47:03 +0000 Subject: [PATCH 3/3] add examples --- doc/api/vsync/queue/bbq_mpmc.h.md | 89 +++++++++++++++ doc/api/vsync/queue/bbq_spsc.h.md | 180 ++++++++++++++++++++++++++++++ examples/eg_bbq_mpmc.c | 89 +++++++++++++++ examples/eg_bbq_spsc.c | 79 +++++++++++++ examples/eg_bbq_spsc_m_proc.c | 101 +++++++++++++++++ 5 files changed, 538 insertions(+) create mode 100644 examples/eg_bbq_mpmc.c create mode 100644 examples/eg_bbq_spsc.c create mode 100644 examples/eg_bbq_spsc_m_proc.c diff --git a/doc/api/vsync/queue/bbq_mpmc.h.md b/doc/api/vsync/queue/bbq_mpmc.h.md index 7ba71452..12939e40 100644 --- a/doc/api/vsync/queue/bbq_mpmc.h.md +++ b/doc/api/vsync/queue/bbq_mpmc.h.md @@ -17,6 +17,95 @@ In this implementations, values have the fixed size (pointer size). This impleme ```c +#include +#include +#include +#include + +#define BUFFER_ENTRY_NUM 4096U +#define NUM 10U +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL +#define NUM_WRITER 4U +#define NUM_READER 5U + +#define NON_BLOCKING false +#define BLOCKING true + +bbq_mpmc_t *g_bbq_mpmc = NULL; + +void * +writer(void *arg) +{ + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM * NUM_READER; + + while (rest) { + vuint32_t count = rest < ENQUEUE_BATCH ? rest : ENQUEUE_BATCH; + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ++ptr; + } + rest -= bbq_mpmc_enqueue(g_bbq_mpmc, buf, count, BLOCKING); + } + + (void)arg; + return NULL; +} + +void * +reader(void *arg) +{ + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t rest = NUM * NUM_WRITER; + + while (rest) { + vuint32_t batch_size = rest < DEQUEUE_BATCH ? rest : DEQUEUE_BATCH; + vuint32_t count = + bbq_mpmc_dequeue(g_bbq_mpmc, buf, batch_size, NON_BLOCKING); + + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] > 0); + printf("dequeue item %lu\n", buf[i]); + } + rest -= count; + } + (void)arg; + return NULL; +} + +int +main(void) +{ + pthread_t t_writer[NUM_WRITER]; + pthread_t t_reader[NUM_READER]; + + // allocate + vsize_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + g_bbq_mpmc = (bbq_mpmc_t *)malloc(sz); + ASSERT(g_bbq_mpmc); + + // init + vbool_t success = bbq_mpmc_init(g_bbq_mpmc, sz); + ASSERT(success); + + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_create(&t_writer[i], NULL, writer, NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_create(&t_reader[i], NULL, reader, NULL); + } + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_join(t_writer[i], NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_join(t_reader[i], NULL); + } + + // deallocate + free(g_bbq_mpmc); + return 0; +} ``` diff --git a/doc/api/vsync/queue/bbq_spsc.h.md b/doc/api/vsync/queue/bbq_spsc.h.md index 3434c308..50363ce3 100644 --- a/doc/api/vsync/queue/bbq_spsc.h.md +++ b/doc/api/vsync/queue/bbq_spsc.h.md @@ -21,6 +21,85 @@ In this implementations, values have a fixed size equal to pointer size. This im ```c +#include +#include +#include +#include + +#define BUFFER_ENTRY_NUM 4096U +#define NUM 10U +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#define NON_BLOCKING false +#define BLOCKING true + +bbq_spsc_t *g_bbq_spsc = NULL; + +void * +writer(void *arg) +{ + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + + while (rest) { + vuint32_t count = rest < ENQUEUE_BATCH ? rest : ENQUEUE_BATCH; + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ptr++; + } + rest -= bbq_spsc_enqueue(g_bbq_spsc, buf, count, BLOCKING); + } + + (void)arg; + return NULL; +} + +void * +reader(void *arg) +{ + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + + while (rest) { + vuint32_t count = + bbq_spsc_dequeue(g_bbq_spsc, buf, DEQUEUE_BATCH, NON_BLOCKING); + + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] == ptr++); + printf("dequeue item %lu\n", buf[i]); + } + rest -= count; + } + (void)arg; + return NULL; +} + +int +main(void) +{ + pthread_t t_writer; + pthread_t t_reader; + + // allocate + vsize_t sz = bbq_spsc_memsize(BUFFER_ENTRY_NUM); + g_bbq_spsc = (bbq_spsc_t *)malloc(sz); + ASSERT(g_bbq_spsc); + + // init + vbool_t success = bbq_spsc_init(g_bbq_spsc, sz); + ASSERT(success); + + pthread_create(&t_writer, NULL, writer, NULL); + pthread_create(&t_reader, NULL, reader, NULL); + pthread_join(t_reader, NULL); + pthread_join(t_writer, NULL); + + // deallocate + free(g_bbq_spsc); + return 0; +} ``` @@ -30,6 +109,107 @@ In this implementations, values have a fixed size equal to pointer size. This im ```c +#include +#include +#include +#include +#include +#include +#include +#include + +#define QUEUE_SIZE 4095U +#define EPOCH_SIZE 100U +#define NUM_ENTRIES 1U +#define SLEEP_MICRO_SEC 10 +#define NON_BLOCKING false +#define BLOCKING true + +#include +#include +#include + +bbq_spsc_t *g_bbq; + +void * +create_shared_memory(vsize_t size) +{ + int protection = PROT_READ | PROT_WRITE; + int visibility = MAP_SHARED | MAP_ANONYMOUS; + return mmap(NULL, size, protection, visibility, -1, 0); +} + +void +check_space(void) +{ + vuint64_t data = 0; + vuint64_t count = 0; + + while (bbq_spsc_dequeue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + + while (bbq_spsc_enqueue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) { + count++; + } + while (bbq_spsc_dequeue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + + printf( + "Current size of the queue is %ld after %d crashes, shouldn't be " + "zero.\n", + count, EPOCH_SIZE); + assert(count != 0); +} + +void * +writer(void *arg) +{ + V_UNUSED(arg); + vuint64_t data = 0; + while (true) { + data = (data + 1) % (QUEUE_SIZE - 1); + while (!bbq_spsc_enqueue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + usleep(SLEEP_MICRO_SEC); + } +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuint64_t data; + while (true) { + while (!bbq_spsc_dequeue(g_bbq, &data, 1, NON_BLOCKING)) {} + usleep(SLEEP_MICRO_SEC); + assert(data < QUEUE_SIZE - 1); + } +} + +int +main(void) +{ + srand(time(NULL)); + vuint64_t sz = bbq_spsc_memsize(QUEUE_SIZE); + g_bbq = create_shared_memory(sz); + assert(g_bbq); + vbool_t success = bbq_spsc_init(g_bbq, sz); + ASSERT(success && "BBQ init failed"); + for (vuint32_t i = 0; i < EPOCH_SIZE; i++) { + pid_t pid = fork(); + assert(pid >= 0); + if (pid != 0) { + usleep(rand() % 10000); + while (kill(pid, SIGTERM) < 0) {} + } else { + pthread_t t_writer, t_reader; + pthread_create(&t_writer, NULL, writer, NULL); + pthread_create(&t_reader, NULL, reader, NULL); + pthread_join(t_writer, NULL); + pthread_join(t_reader, NULL); + return 0; + } + } + check_space(); + return 0; +} ``` diff --git a/examples/eg_bbq_mpmc.c b/examples/eg_bbq_mpmc.c new file mode 100644 index 00000000..9ed3b32b --- /dev/null +++ b/examples/eg_bbq_mpmc.c @@ -0,0 +1,89 @@ +#include +#include +#include +#include + +#define BUFFER_ENTRY_NUM 4096U +#define NUM 10U +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL +#define NUM_WRITER 4U +#define NUM_READER 5U + +#define NON_BLOCKING false +#define BLOCKING true + +bbq_mpmc_t *g_bbq_mpmc = NULL; + +void * +writer(void *arg) +{ + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM * NUM_READER; + + while (rest) { + vuint32_t count = rest < ENQUEUE_BATCH ? rest : ENQUEUE_BATCH; + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ++ptr; + } + rest -= bbq_mpmc_enqueue(g_bbq_mpmc, buf, count, BLOCKING); + } + + (void)arg; + return NULL; +} + +void * +reader(void *arg) +{ + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t rest = NUM * NUM_WRITER; + + while (rest) { + vuint32_t batch_size = rest < DEQUEUE_BATCH ? rest : DEQUEUE_BATCH; + vuint32_t count = + bbq_mpmc_dequeue(g_bbq_mpmc, buf, batch_size, NON_BLOCKING); + + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] > 0); + printf("dequeue item %lu\n", buf[i]); + } + rest -= count; + } + (void)arg; + return NULL; +} + +int +main(void) +{ + pthread_t t_writer[NUM_WRITER]; + pthread_t t_reader[NUM_READER]; + + // allocate + vsize_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM); + g_bbq_mpmc = (bbq_mpmc_t *)malloc(sz); + ASSERT(g_bbq_mpmc); + + // init + vbool_t success = bbq_mpmc_init(g_bbq_mpmc, sz); + ASSERT(success); + + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_create(&t_writer[i], NULL, writer, NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_create(&t_reader[i], NULL, reader, NULL); + } + for (vsize_t i = 0; i < NUM_WRITER; i++) { + pthread_join(t_writer[i], NULL); + } + for (vsize_t i = 0; i < NUM_READER; i++) { + pthread_join(t_reader[i], NULL); + } + + // deallocate + free(g_bbq_mpmc); + return 0; +} diff --git a/examples/eg_bbq_spsc.c b/examples/eg_bbq_spsc.c new file mode 100644 index 00000000..f27077b9 --- /dev/null +++ b/examples/eg_bbq_spsc.c @@ -0,0 +1,79 @@ +#include +#include +#include +#include + +#define BUFFER_ENTRY_NUM 4096U +#define NUM 10U +#define ENQUEUE_BATCH 5UL +#define DEQUEUE_BATCH 4UL + +#define NON_BLOCKING false +#define BLOCKING true + +bbq_spsc_t *g_bbq_spsc = NULL; + +void * +writer(void *arg) +{ + vuintptr_t buf[ENQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + + while (rest) { + vuint32_t count = rest < ENQUEUE_BATCH ? rest : ENQUEUE_BATCH; + for (vuint32_t i = 0; i < count; i++) { + buf[i] = ptr++; + } + rest -= bbq_spsc_enqueue(g_bbq_spsc, buf, count, BLOCKING); + } + + (void)arg; + return NULL; +} + +void * +reader(void *arg) +{ + vuintptr_t buf[DEQUEUE_BATCH] = {0}; + vuint64_t ptr = 0; + vuint64_t rest = NUM; + + while (rest) { + vuint32_t count = + bbq_spsc_dequeue(g_bbq_spsc, buf, DEQUEUE_BATCH, NON_BLOCKING); + + for (vuint32_t i = 0; i < count; i++) { + ASSERT(buf[i] == ptr++); + printf("dequeue item %lu\n", buf[i]); + } + rest -= count; + } + (void)arg; + return NULL; +} + +int +main(void) +{ + pthread_t t_writer; + pthread_t t_reader; + + // allocate + vsize_t sz = bbq_spsc_memsize(BUFFER_ENTRY_NUM); + g_bbq_spsc = (bbq_spsc_t *)malloc(sz); + ASSERT(g_bbq_spsc); + + // init + vbool_t success = bbq_spsc_init(g_bbq_spsc, sz); + ASSERT(success); + + pthread_create(&t_writer, NULL, writer, NULL); + pthread_create(&t_reader, NULL, reader, NULL); + pthread_join(t_reader, NULL); + pthread_join(t_writer, NULL); + + // deallocate + free(g_bbq_spsc); + return 0; +} diff --git a/examples/eg_bbq_spsc_m_proc.c b/examples/eg_bbq_spsc_m_proc.c new file mode 100644 index 00000000..7c6a666c --- /dev/null +++ b/examples/eg_bbq_spsc_m_proc.c @@ -0,0 +1,101 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#define QUEUE_SIZE 4095U +#define EPOCH_SIZE 100U +#define NUM_ENTRIES 1U +#define SLEEP_MICRO_SEC 10 +#define NON_BLOCKING false +#define BLOCKING true + +#include +#include +#include + +bbq_spsc_t *g_bbq; + +void * +create_shared_memory(vsize_t size) +{ + int protection = PROT_READ | PROT_WRITE; + int visibility = MAP_SHARED | MAP_ANONYMOUS; + return mmap(NULL, size, protection, visibility, -1, 0); +} + +void +check_space(void) +{ + vuint64_t data = 0; + vuint64_t count = 0; + + while (bbq_spsc_dequeue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + + while (bbq_spsc_enqueue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) { + count++; + } + while (bbq_spsc_dequeue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + + printf( + "Current size of the queue is %ld after %d crashes, shouldn't be " + "zero.\n", + count, EPOCH_SIZE); + assert(count != 0); +} + +void * +writer(void *arg) +{ + V_UNUSED(arg); + vuint64_t data = 0; + while (true) { + data = (data + 1) % (QUEUE_SIZE - 1); + while (!bbq_spsc_enqueue(g_bbq, &data, NUM_ENTRIES, NON_BLOCKING)) {} + usleep(SLEEP_MICRO_SEC); + } +} + +void * +reader(void *arg) +{ + V_UNUSED(arg); + vuint64_t data; + while (true) { + while (!bbq_spsc_dequeue(g_bbq, &data, 1, NON_BLOCKING)) {} + usleep(SLEEP_MICRO_SEC); + assert(data < QUEUE_SIZE - 1); + } +} + +int +main(void) +{ + srand(time(NULL)); + vuint64_t sz = bbq_spsc_memsize(QUEUE_SIZE); + g_bbq = create_shared_memory(sz); + assert(g_bbq); + vbool_t success = bbq_spsc_init(g_bbq, sz); + ASSERT(success && "BBQ init failed"); + for (vuint32_t i = 0; i < EPOCH_SIZE; i++) { + pid_t pid = fork(); + assert(pid >= 0); + if (pid != 0) { + usleep(rand() % 10000); + while (kill(pid, SIGTERM) < 0) {} + } else { + pthread_t t_writer, t_reader; + pthread_create(&t_writer, NULL, writer, NULL); + pthread_create(&t_reader, NULL, reader, NULL); + pthread_join(t_writer, NULL); + pthread_join(t_reader, NULL); + return 0; + } + } + check_space(); + return 0; +}