Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ jobs:
###########################################################################
Run-Checks:
runs-on: ubuntu-22.04
container: ghcr.io/open-s4c/vsyncer-ci:sha-25de355efbbb7bf9d85523aeb50864b118755392
container: ghcr.io/open-s4c/vsyncer-ci:sha-76569f0d2ee9a688da95b8025116b3a202e67321
strategy:
matrix:
test-dir: [ {p: "test", c: "spinlock"}, {p: "test", c: "quack"}, {p: "test", c: "queue"},
{p: "verify", c: "unbounded_queue"}, {p: "verify", c: "listset"} ,
{p: "verify", c: "stack"}, {p: "verify", c: "thread"}, {p: "verify", c: "simpleht"},
{p: "verify", c: "bitmap"}, {p: "verify", c: "treeset"}, {p: "verify", c: "pool"},
{p: "verify", c: "cachedq"}, {p: "verify", c: "chaselev"} , {p: "verify", c: "priority_queue"},
{p: "verify", c: "skiplist"}, {p: "verify", c: "smr"}
{p: "verify", c: "skiplist"}, {p: "verify", c: "smr"}, {p: "verify", c: "bbq"},
{p: "verify", c: "mpsc"}
]
steps:
- name: Print vsyncer version
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ is not guaranteed to result in increment of major version.
Please note that the version correlates to the internal libvsync, which is a superset of
what exists in open-s4c libvsync.

## [4.3.0]

### Added

- mpsc queue
- bbq_mpmc and bbq_spsc

## [4.2.2]

### Changed
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.16)
project(
libvsync
LANGUAGES C
VERSION 4.2.2
VERSION 4.3.0
DESCRIPTION
"Verified library of atomics, synchronization primitives and concurrent data structures"
)
Expand Down
2 changes: 1 addition & 1 deletion cmake/check.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function(add_vsyncer_check)
--timeout
${TIMEOUT}s)
add_test(NAME ${TEST_NAME} COMMAND ${VSYNCER_CMD} ${VSYNCER_CHECK_LL})
set_property(TEST ${TEST_NAME} PROPERTY SKIP_RETURN_CODE 1)
# set_property(TEST ${TEST_NAME} PROPERTY SKIP_RETURN_CODE 1)
math(EXPR CTEST_TIMEOUT "${TIMEOUT} + 5")
set_tests_properties(${TEST_NAME} PROPERTIES TIMEOUT ${CTEST_TIMEOUT})
endforeach()
Expand Down
1 change: 1 addition & 0 deletions doc/api/vsync/GROUP_linearizable.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ _Group of algorithms linearizable algorithms._
| [vsync/map/treeset_bst_fine.h](map/treeset_bst_fine.h.md)|This implementation of treeset uses unbalanced binary search tree (BST) and fine-grained locking. | ✔ | ❌ | ❌ | ❌ |
| [vsync/map/treeset_rb_coarse.h](map/treeset_rb_coarse.h.md)|This implementation of treeset uses balanced red-black tree (RB) and coarse-grained locking. | ✔ | ❌ | ❌ | ❌ |
| [vsync/map/treeset_rb_fine.h](map/treeset_rb_fine.h.md)|This implementation of treeset uses balanced red-black tree (RB) and fine-grained locking. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/bbq_spsc.h](queue/bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ |
| [vsync/queue/bounded_locked.h](queue/bounded_locked.h.md)|Multi-producer, multi-consumer bounded queue protected by a spinlock. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/bounded_mpmc.h](queue/bounded_mpmc.h.md)|Lockless, multi-producer, multi-consumer bounded queue. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/bounded_spsc.h](queue/bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ |
Expand Down
1 change: 1 addition & 0 deletions doc/api/vsync/GROUP_lock_free.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ _Group of algorithms with lock-free progress condition._
| [vsync/map/listset_lf.h](map/listset_lf.h.md)|Lock-free implementation of listset. | ✔ | ✔ | ✔ | ❌ |
| [vsync/map/simpleht.h](map/simpleht.h.md)|Simple lock-free hashtable. | ✔ | ✔ | ❌ | ❌ |
| [vsync/map/skiplist_lf.h](map/skiplist_lf.h.md)|Lock-free concurrent skiplist. | ✔ | ✔ | ✔ | ❌ |
| [vsync/queue/bbq_spsc.h](queue/bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ |
| [vsync/queue/bounded_spsc.h](queue/bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ |
| [vsync/queue/chaselev.h](queue/chaselev.h.md)|Chase-Lev Work-Stealing bounded deque. | ❌ | ✔ | ❌ | ❌ |
| [vsync/queue/unbounded_queue_lf.h](queue/unbounded_queue_lf.h.md)|Lock-free unbounded queue. | ✔ | ✔ | ✔ | ✔ |
Expand Down
4 changes: 2 additions & 2 deletions doc/api/vsync/pool/cached_pool.h.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ _Calculate the needed memory space for creating a pool._

**Parameters:**

- `thread_num`: maxinum thread number
- `thread_num`: maximum thread number
- `entry_num`: minimal number of entires
- `entry_size`: size of each entry

Expand Down Expand Up @@ -129,7 +129,7 @@ Make sure the buffer has enough size (calculated by cached_pool_memsize)
**Parameters:**

- `buf`: pointer to the buffer
- `thread_num`: maxinum thread number
- `thread_num`: maximum thread number
- `entry_num`: minimal number of entires
- `entry_size`: size of each entry

Expand Down
1 change: 1 addition & 0 deletions doc/api/vsync/queue/GROUP_unbounded_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ These queues have no capacity limit and thus enqueue operations shall always suc

| File|Description|Linearizable|Lock-free|SMR-required|Unbounded-Queue|
| --- | --- | --- | --- | --- | --- |
| [vsync/queue/mpsc.h](mpsc.h.md)|Multi-producer single-consumer queue. | ❌ | ❌ | ❌ | ✔ |
| [vsync/queue/unbounded_queue_lf.h](unbounded_queue_lf.h.md)|Lock-free unbounded queue. | ✔ | ✔ | ✔ | ✔ |
| [vsync/queue/unbounded_queue_lf_recycle.h](unbounded_queue_lf_recycle.h.md)|Lock-free recycle unbounded queue. | ✔ | ✔ | ❌ | ✔ |
| [vsync/queue/unbounded_queue_total.h](unbounded_queue_total.h.md)|Unbounded blocking total queue. | ✔ | ❌ | ❌ | ✔ |
Expand Down
3 changes: 3 additions & 0 deletions doc/api/vsync/queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ _Queues, priority queues and ringbuffers._

| File|Description|Linearizable|Lock-free|SMR-required|Unbounded-Queue|
| --- | --- | --- | --- | --- | --- |
| [vsync/queue/bbq_mpmc.h](bbq_mpmc.h.md)|Block-based Bounded Queue multi-producer/multi-consumer. | ❌ | ❌ | ❌ | ❌ |
| [vsync/queue/bbq_spsc.h](bbq_spsc.h.md)|Block-based Bounded Queue single-producer/single-consumer. | ✔ | ✔ | ❌ | ❌ |
| [vsync/queue/bounded_locked.h](bounded_locked.h.md)|Multi-producer, multi-consumer bounded queue protected by a spinlock. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/bounded_mpmc.h](bounded_mpmc.h.md)|Lockless, multi-producer, multi-consumer bounded queue. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/bounded_spsc.h](bounded_spsc.h.md)|Single-producer, single-consumer, wait-free bounded queue. | ✔ | ✔ | ❌ | ❌ |
| [vsync/queue/cachedq.h](cachedq.h.md)|Lockless, multi-producer, multi-consumer queue. | ✔ | ❌ | ❌ | ❌ |
| [vsync/queue/chaselev.h](chaselev.h.md)|Chase-Lev Work-Stealing bounded deque. | ❌ | ✔ | ❌ | ❌ |
| [vsync/queue/mpsc.h](mpsc.h.md)|Multi-producer single-consumer queue. | ❌ | ❌ | ❌ | ✔ |
| [vsync/queue/unbounded_queue_lf.h](unbounded_queue_lf.h.md)|Lock-free unbounded queue. | ✔ | ✔ | ✔ | ✔ |
| [vsync/queue/unbounded_queue_lf_recycle.h](unbounded_queue_lf_recycle.h.md)|Lock-free recycle unbounded queue. | ✔ | ✔ | ❌ | ✔ |
| [vsync/queue/unbounded_queue_total.h](unbounded_queue_total.h.md)|Unbounded blocking total queue. | ✔ | ❌ | ❌ | ✔ |
Expand Down
239 changes: 239 additions & 0 deletions doc/api/vsync/queue/bbq_mpmc.h.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
# [vsync](../README.md) / [queue](README.md) / bbq_mpmc.h
_Block-based Bounded Queue multi-producer/multi-consumer._

A highly performant bounded queue that splits the buffer in multiple blocks.

### Remarks:

In this implementation, values have the fixed size (pointer size). This implementation does not support `DROP_OLD` mode as described in the original paper.


### References:
[BBQ: A Block-based Bounded Queue for Exchanging Data and Profiling](https://www.usenix.org/conference/atc22/presentation/wang-jiawei)


### Example:



```c
#include <vsync/queue/bbq_mpmc.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

#define BUFFER_ENTRY_NUM 4096U
#define NUM 10U
#define ENQUEUE_BATCH 5UL
#define DEQUEUE_BATCH 4UL
#define NUM_WRITER 4U
#define NUM_READER 5U

#define NON_BLOCKING false
#define BLOCKING true

bbq_mpmc_t *g_bbq_mpmc = NULL;

void *
writer(void *arg)
{
vuintptr_t buf[ENQUEUE_BATCH] = {0};
vuint64_t ptr = 0;
vuint64_t rest = NUM * NUM_READER;

while (rest) {
vuint32_t count = rest < ENQUEUE_BATCH ? rest : ENQUEUE_BATCH;
for (vuint32_t i = 0; i < count; i++) {
buf[i] = ++ptr;
}
rest -= bbq_mpmc_enqueue(g_bbq_mpmc, buf, count, BLOCKING);
}

(void)arg;
return NULL;
}

void *
reader(void *arg)
{
vuintptr_t buf[DEQUEUE_BATCH] = {0};
vuint64_t rest = NUM * NUM_WRITER;

while (rest) {
vuint32_t batch_size = rest < DEQUEUE_BATCH ? rest : DEQUEUE_BATCH;
vuint32_t count =
bbq_mpmc_dequeue(g_bbq_mpmc, buf, batch_size, NON_BLOCKING);

for (vuint32_t i = 0; i < count; i++) {
ASSERT(buf[i] > 0);
printf("dequeue item %lu\n", buf[i]);
}
rest -= count;
}
(void)arg;
return NULL;
}

int
main(void)
{
pthread_t t_writer[NUM_WRITER];
pthread_t t_reader[NUM_READER];

// allocate
vsize_t sz = bbq_mpmc_memsize(BUFFER_ENTRY_NUM);
g_bbq_mpmc = (bbq_mpmc_t *)malloc(sz);
ASSERT(g_bbq_mpmc);

// init
vbool_t success = bbq_mpmc_init(g_bbq_mpmc, sz);
ASSERT(success);

for (vsize_t i = 0; i < NUM_WRITER; i++) {
pthread_create(&t_writer[i], NULL, writer, NULL);
}
for (vsize_t i = 0; i < NUM_READER; i++) {
pthread_create(&t_reader[i], NULL, reader, NULL);
}
for (vsize_t i = 0; i < NUM_WRITER; i++) {
pthread_join(t_writer[i], NULL);
}
for (vsize_t i = 0; i < NUM_READER; i++) {
pthread_join(t_reader[i], NULL);
}

// deallocate
free(g_bbq_mpmc);
return 0;
}
```



---
# Macros

| Macro | Description |
|---|---|
| [BBQ_BLOCK_NUM_LOG](bbq_mpmc.h.md#macro-bbq_block_num_log) | Define this macro with `-DBBQ_BLOCK_NUM_LOG=N` to define the total number of blocks equals to `2^N` |
| [BBQ_ENTRY_SIZE_LOG](bbq_mpmc.h.md#macro-bbq_entry_size_log) | Define this macro with `-BBQ_ENTRY_SIZE_LOG=N` to define an entry size equals to `2^N` |

## Macro `BBQ_BLOCK_NUM_LOG`


_Define this macro with_ `-DBBQ_BLOCK_NUM_LOG=N` _to define the total number of blocks equals to_ `2^N` __


> **Note:** default value is `3U` -> (`8` blocks)


## Macro `BBQ_ENTRY_SIZE_LOG`


_Define this macro with_ `-BBQ_ENTRY_SIZE_LOG=N` _to define an entry size equals to_ `2^N` __


> **Note:** default value is `log2(sizeof(vuintptr_t))`


---
# Functions

| Function | Description |
|---|---|
| [bbq_mpmc_enqueue](bbq_mpmc.h.md#function-bbq_mpmc_enqueue) | Enqueues one or more entries. |
| [bbq_mpmc_dequeue](bbq_mpmc.h.md#function-bbq_mpmc_dequeue) | Dequeues one or more entries. |
| [bbq_mpmc_memsize](bbq_mpmc.h.md#function-bbq_mpmc_memsize) | Calculates the size of the bbq queue. |
| [bbq_mpmc_init](bbq_mpmc.h.md#function-bbq_mpmc_init) | Initializes a bbq data structure. |

## Function `bbq_mpmc_enqueue`

```c
static vuint32_t bbq_mpmc_enqueue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait)
```
_Enqueues one or more entries._


Multiple entries can be enqueued if `src` points to an array. Use `count` to indicate how many entries should be enqueued, starting from `src`.



**Parameters:**

- `q`: address of `bbq_mpmc_t` object.
- `src`: pointer to first entry.
- `count`: number of entries to enqueue.
- `wait`: should wait for space to be available.


**Returns:** number of enqueued entries.



## Function `bbq_mpmc_dequeue`

```c
static vuint32_t bbq_mpmc_dequeue(bbq_mpmc_t *q, vuintptr_t *buf, vuint32_t count, vbool_t wait)
```
_Dequeues one or more entries._


Multiple entries can be dequeued if `src` points to an array. Use `count` to indicate how many entries should be dequeued.



**Parameters:**

- `q`: address of `bbq_mpmc_t` object.
- `src`: pointer to preallocated memory for the first entry.
- `count`: number of entries to dequeue.
- `wait`: should wait for entries to be available.


**Returns:** number of dequeued entries.



## Function `bbq_mpmc_memsize`

```c
static vsize_t bbq_mpmc_memsize(vsize_t capacity)
```
_Calculates the size of the bbq queue._




**Parameters:**

- `capacity`: maximum number of entries that can fit in the queue.


**Returns:** size to be allocated in bytes.



## Function `bbq_mpmc_init`

```c
static vbool_t bbq_mpmc_init(bbq_mpmc_t *q, vsize_t size)
```
_Initializes a bbq data structure._




**Parameters:**

- `q`: pointer to bbq data structure.
- `size`: number of bytes allocated for bbq data structure.


**Returns:** true initialization succeeded.

**Returns:** false initialization failed.




---
Loading