From 8cd49d27e5e5fff0a12d05ffda3a2e31102b4281 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Sun, 11 Jan 2026 23:33:13 -0800 Subject: [PATCH 1/2] reduce allocations in task system Signed-off-by: Rob Johnson --- src/core.c | 6 ++--- src/core.h | 1 + src/splinterdb.c | 16 ++++++------ src/task.c | 43 +++++++++----------------------- src/task.h | 29 ++++++++++----------- src/trunk.c | 40 +++++++++++++++-------------- tests/functional/btree_test.c | 8 +++--- tests/functional/cache_test.c | 18 ++++++------- tests/functional/filter_test.c | 6 ++--- tests/functional/io_apis_test.c | 8 +++--- tests/functional/log_test.c | 12 ++++----- tests/functional/splinter_test.c | 22 ++++++++-------- tests/functional/test.h | 10 ++++---- tests/functional/ycsb_test.c | 12 ++++----- tests/unit/btree_stress_test.c | 7 +++--- tests/unit/splinter_test.c | 13 +++++----- tests/unit/task_system_test.c | 24 +++++++++--------- 17 files changed, 129 insertions(+), 146 deletions(-) diff --git a/src/core.c b/src/core.c index 3dead0ba..b8f96e96 100644 --- a/src/core.c +++ b/src/core.c @@ -630,9 +630,9 @@ core_memtable_flush_internal(core_handle *spl, uint64 generation) } static void -core_memtable_flush_internal_virtual(void *arg) +core_memtable_flush_internal_virtual(task *arg) { - core_memtable_args *mt_args = arg; + core_memtable_args *mt_args = container_of(arg, core_memtable_args, tsk); core_memtable_flush_internal(mt_args->spl, mt_args->generation); } @@ -653,8 +653,8 @@ core_memtable_flush(core_handle *spl, uint64 generation) cmt->mt_args.generation = generation; task_enqueue(spl->ts, TASK_TYPE_MEMTABLE, + &cmt->mt_args.tsk, core_memtable_flush_internal_virtual, - &cmt->mt_args, FALSE); } diff --git a/src/core.h b/src/core.h index f756b433..65e2a066 100644 --- a/src/core.h +++ b/src/core.h @@ -87,6 +87,7 @@ typedef struct core_handle core_handle; typedef struct core_memtable_args { core_handle *spl; uint64 generation; + task tsk; } core_memtable_args; typedef struct core_compacted_memtable 
{ diff --git a/src/splinterdb.c b/src/splinterdb.c index efa2ea54..1e12b5df 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -38,7 +38,7 @@ splinterdb_get_version() } typedef struct splinterdb { - task_system *task_sys; + task_system task_sys; io_config io_cfg; io_handle *io_handle; allocator_config allocator_cfg; @@ -311,7 +311,7 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN goto io_handle_create_failed; } - status = task_system_create(kvs->heap_id, &kvs->task_sys, &kvs->task_cfg); + status = task_system_init(&kvs->task_sys, kvs->heap_id, &kvs->task_cfg); if (!SUCCESS(status)) { platform_error_log( "Failed to initialize SplinterDB task system state: %s\n", @@ -357,14 +357,14 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN kvs->spl = core_mount(&kvs->trunk_cfg, (allocator *)&kvs->allocator_handle, (cache *)&kvs->cache_handle, - kvs->task_sys, + &kvs->task_sys, kvs->trunk_id, kvs->heap_id); } else { kvs->spl = core_create(&kvs->trunk_cfg, (allocator *)&kvs->allocator_handle, (cache *)&kvs->cache_handle, - kvs->task_sys, + &kvs->task_sys, kvs->trunk_id, kvs->heap_id); } @@ -385,7 +385,7 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN deinit_allocator: rc_allocator_unmount(&kvs->allocator_handle); deinit_system: - task_system_destroy(kvs->heap_id, &kvs->task_sys); + task_system_deinit(&kvs->task_sys); deinit_iohandle: io_handle_destroy(kvs->io_handle); io_handle_create_failed: @@ -458,7 +458,7 @@ splinterdb_close(splinterdb **kvs_in) // IN core_unmount(&kvs->spl); clockcache_deinit(&kvs->cache_handle); rc_allocator_unmount(&kvs->allocator_handle); - task_system_destroy(kvs->heap_id, &kvs->task_sys); + task_system_deinit(&kvs->task_sys); io_handle_destroy(kvs->io_handle); // Free resources carefully to avoid ASAN-test failures @@ -755,7 +755,7 @@ splinterdb_stats_reset(splinterdb *kvs) static void splinterdb_close_print_stats(splinterdb *kvs) { - task_print_stats(kvs->task_sys); + 
task_print_stats(&kvs->task_sys); splinterdb_stats_print_insertion(kvs); } @@ -780,7 +780,7 @@ splinterdb_get_heap_id(const splinterdb *kvs) const task_system * splinterdb_get_task_system_handle(const splinterdb *kvs) { - return kvs->task_sys; + return &kvs->task_sys; } const io_handle * diff --git a/src/task.c b/src/task.c index c55b1818..1e4c4609 100644 --- a/src/task.c +++ b/src/task.c @@ -80,13 +80,14 @@ task_group_run_task(task_group *group, task *assigned_task) } } - assigned_task->func(assigned_task->arg); + task_fn func = assigned_task->func; + func(assigned_task); if (group->use_stats) { current = platform_timestamp_elapsed(current); if (current > group->stats[tid].max_runtime_ns) { group->stats[tid].max_runtime_ns = current; - group->stats[tid].max_runtime_func = assigned_task->func; + group->stats[tid].max_runtime_func = func; } } @@ -119,7 +120,6 @@ task_worker_thread(void *arg) const threadid tid = platform_get_tid(); group->stats[tid].total_bg_task_executions++; task_group_run_task(group, task_to_run); - platform_free(group->ts->heap_id, task_to_run); rc = task_group_lock(group); platform_assert(SUCCESS(rc)); __sync_fetch_and_sub(&group->current_executing_tasks, 1); @@ -210,16 +210,11 @@ task_group_init(task_group *group, platform_status task_enqueue(task_system *ts, task_type type, + task *new_task, task_fn func, - void *arg, bool32 at_head) { - task *new_task = TYPED_ZALLOC(ts->heap_id, new_task); - if (new_task == NULL) { - return STATUS_NO_MEMORY; - } new_task->func = func; - new_task->arg = arg; new_task->ts = ts; task_group *group = &ts->group[type]; @@ -228,7 +223,6 @@ task_enqueue(task_system *ts, rc = task_group_lock(group); if (!SUCCESS(rc)) { - platform_free(ts->heap_id, new_task); return rc; } @@ -309,7 +303,6 @@ task_group_perform_one(task_group *group, uint64 queue_scale_percent) group->stats[tid].total_fg_task_executions++; task_group_run_task(group, assigned_task); __sync_fetch_and_sub(&group->current_executing_tasks, 1); - 
platform_free(group->ts->heap_id, assigned_task); } else { rc = STATUS_TIMEDOUT; } @@ -446,20 +439,16 @@ task_system_config_init(task_system_config *task_cfg, * ----------------------------------------------------------------------------- */ platform_status -task_system_create(platform_heap_id hid, - task_system **system, - const task_system_config *cfg) +task_system_init(task_system *ts, + platform_heap_id hid, + const task_system_config *cfg) { platform_status rc = task_config_valid(cfg->num_background_threads); if (!SUCCESS(rc)) { return rc; } - task_system *ts = TYPED_ZALLOC(hid, ts); - if (ts == NULL) { - *system = NULL; - return STATUS_NO_MEMORY; - } + ZERO_CONTENTS(ts); ts->cfg = cfg; ts->heap_id = hid; @@ -469,8 +458,7 @@ task_system_create(platform_heap_id hid, cfg->use_stats, cfg->num_background_threads[type]); if (!SUCCESS(rc)) { - task_system_destroy(hid, &ts); - *system = NULL; + task_system_deinit(ts); return rc; } uint64 nbg_threads = cfg->num_background_threads[type]; @@ -482,29 +470,22 @@ task_system_create(platform_heap_id hid, task_type_name[type]); } } - debug_assert((*system == NULL), - "Task system handle, %p, is expected to be NULL.\n", - *system); - *system = ts; return STATUS_OK; } /* * ----------------------------------------------------------------------------- - * task_system_destroy() : Task system de-initializer. + * task_system_deinit() : Task system de-initializer. * - * Tear down task system structures, free allocated memory. + * Tear down task system structures. 
* ----------------------------------------------------------------------------- */ void -task_system_destroy(platform_heap_id hid, task_system **ts_in) +task_system_deinit(task_system *ts) { - task_system *ts = *ts_in; for (task_type type = TASK_TYPE_FIRST; type != NUM_TASK_TYPES; type++) { task_group_deinit(&ts->group[type]); } - platform_free(hid, ts); - *ts_in = (task_system *)NULL; } static void diff --git a/src/task.h b/src/task.h index 8fb9d41b..a6e09ad7 100644 --- a/src/task.h +++ b/src/task.h @@ -36,16 +36,17 @@ task_system_config_init(task_system_config *task_cfg, typedef struct task_system task_system; -typedef void (*task_fn)(void *arg); +typedef struct task task; -typedef struct task { +typedef void (*task_fn)(task *arg); + +struct task { struct task *next; struct task *prev; task_fn func; - void *arg; task_system *ts; timestamp enqueue_time; -} task; +}; /* * Run-time task-specific execution metrics structure. @@ -111,35 +112,35 @@ struct task_system { }; /* - * Create a task system and register the calling thread. + * Initialize a task system and register the calling thread. */ platform_status -task_system_create(platform_heap_id hid, - task_system **system, - const task_system_config *cfg); +task_system_init(task_system *system, + platform_heap_id hid, + const task_system_config *cfg); /* - * Deregister the calling thread (if it is registered) and destroy the - * task system. It is recommended to not destroy the task system + * Deregister the calling thread (if it is registered) and deinitialize the + * task system. It is recommended to not deinitialize the task system * until all registered threads have deregistered. * - * task_system_destroy() waits for currently executing background + * task_system_deinit() waits for currently executing background * tasks and cleanly shuts down all background threads, but it * abandons tasks that are still waiting to execute. To ensure that * no enqueued tasks are abandoned by a shutdown, * * 1. 
Ensure that your application will not enqueue any more tasks. * 2. Call task_perform_until_quiescent(). - * 3. Then call task_system_destroy(). + * 3. Then call task_system_deinit(). */ void -task_system_destroy(platform_heap_id hid, task_system **ts); +task_system_deinit(task_system *ts); platform_status task_enqueue(task_system *ts, task_type type, + task *arg, task_fn func, - void *arg, bool32 at_head); /* diff --git a/src/trunk.c b/src/trunk.c index 02a1e5a9..e19f2483 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -72,6 +72,8 @@ typedef VECTOR(trunk_branch_info) trunk_branch_info_vector; typedef struct bundle_compaction { struct bundle_compaction *next; + task tsk; + trunk_pivot_state *pivot_state; uint64 num_bundles; trunk_pivot_stats input_stats; bundle_compaction_state state; @@ -87,6 +89,7 @@ typedef struct trunk_context trunk_context; struct trunk_pivot_state { struct trunk_pivot_state *next; + task tsk; uint64 refcount; bool32 maplet_compaction_initiated; bool32 abandoned; @@ -2630,6 +2633,7 @@ bundle_compaction_create(trunk_context *context, "%s():%d: platform_malloc() failed", __func__, __LINE__); return NULL; } + result->pivot_state = state; result->state = BUNDLE_COMPACTION_NOT_STARTED; result->input_stats = trunk_pivot_received_bundles_stats(pvt); @@ -3151,12 +3155,12 @@ static platform_status enqueue_maplet_compaction(trunk_pivot_state *args); static void -maplet_compaction_task(void *arg) +maplet_compaction_task(task *arg) { - platform_status rc = STATUS_OK; - trunk_pivot_state *state = (trunk_pivot_state *)arg; - trunk_context *context = state->context; - routing_filter new_maplet = state->maplet; + platform_status rc = STATUS_OK; + trunk_pivot_state *state = container_of(arg, trunk_pivot_state, tsk); + trunk_context *context = state->context; + routing_filter new_maplet = state->maplet; maplet_compaction_apply_args apply_args; threadid tid; @@ -3338,8 +3342,11 @@ enqueue_maplet_compaction(trunk_pivot_state *args) return STATUS_OK; } 
trunk_pivot_state_incref(args); - platform_status rc = task_enqueue( - args->context->ts, TASK_TYPE_NORMAL, maplet_compaction_task, args, FALSE); + platform_status rc = task_enqueue(args->context->ts, + TASK_TYPE_NORMAL, + &args->tsk, + maplet_compaction_task, + TRUE); if (!SUCCESS(rc)) { platform_error_log("enqueue_maplet_compaction: task_enqueue failed: %d\n", rc.r); @@ -3372,10 +3379,11 @@ compute_tuple_bound(trunk_context *context, static void -bundle_compaction_task(void *arg) +bundle_compaction_task(task *arg) { platform_status rc; - trunk_pivot_state *state = (trunk_pivot_state *)arg; + bundle_compaction *bc = container_of(arg, bundle_compaction, tsk); + trunk_pivot_state *state = bc->pivot_state; trunk_context *context = state->context; threadid tid = platform_get_tid(); @@ -3397,16 +3405,10 @@ bundle_compaction_task(void *arg) // Find a bundle compaction that needs doing for this pivot trunk_pivot_state_lock_compactions(state); - bundle_compaction *bc = state->bundle_compactions; - while (bc != NULL - && !__sync_bool_compare_and_swap(&bc->state, - BUNDLE_COMPACTION_NOT_STARTED, - BUNDLE_COMPACTION_IN_PROGRESS)) - { - bc = bc->next; - } + platform_assert(__sync_bool_compare_and_swap(&bc->state, + BUNDLE_COMPACTION_NOT_STARTED, + BUNDLE_COMPACTION_IN_PROGRESS)); trunk_pivot_state_unlock_compactions(state); - platform_assert(bc != NULL); platform_assert(0 < vector_length(&bc->input_branches)); trunk_branch_merger merger; @@ -3571,8 +3573,8 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) trunk_pivot_state_incref(state); rc = task_enqueue(context->ts, TASK_TYPE_NORMAL, + &bc->tsk, bundle_compaction_task, - state, FALSE); if (!SUCCESS(rc)) { trunk_pivot_state_decref(state); diff --git a/tests/functional/btree_test.c b/tests/functional/btree_test.c index 1f1b33f2..134d5eb5 100644 --- a/tests/functional/btree_test.c +++ b/tests/functional/btree_test.c @@ -1522,7 +1522,7 @@ btree_test(int argc, char *argv[]) bool32 run_perf_test; 
platform_status rc; uint64 seed; - task_system *ts = NULL; + task_system ts; test_message_generator gen; platform_register_thread(); @@ -1607,7 +1607,7 @@ btree_test(int argc, char *argv[]) goto cleanup; } - rc = test_init_task_system(hid, &ts, &system_cfg.task_cfg); + rc = test_init_task_system(&ts, hid, &system_cfg.task_cfg); if (!SUCCESS(rc)) { platform_error_log("Failed to init splinter state: %s\n", platform_status_to_string(rc)); @@ -1637,7 +1637,7 @@ btree_test(int argc, char *argv[]) if (run_perf_test) { uint64 total_inserts = 64 * max_tuples_per_memtable; - rc = test_btree_perf(ccp, &test_cfg, total_inserts, 10, 128, ts, hid); + rc = test_btree_perf(ccp, &test_cfg, total_inserts, 10, 128, &ts, hid); platform_assert_status_ok(rc); rc = test_btree_merge_perf(ccp, &test_cfg, hid, 8, 8); @@ -1667,7 +1667,7 @@ btree_test(int argc, char *argv[]) clockcache_deinit(cc); platform_free(hid, cc); rc_allocator_deinit(&al); - test_deinit_task_system(hid, &ts); + test_deinit_task_system(&ts); rc = STATUS_OK; destroy_iohandle: io_handle_destroy(io); diff --git a/tests/functional/cache_test.c b/tests/functional/cache_test.c index 5a2bd126..46890eb7 100644 --- a/tests/functional/cache_test.c +++ b/tests/functional/cache_test.c @@ -905,7 +905,7 @@ cache_test(int argc, char *argv[]) int config_argc = argc - 1; char **config_argv = argv + 1; platform_status rc; - task_system *ts = NULL; + task_system ts; bool32 benchmark = FALSE, async = FALSE; uint64 seed; test_message_generator gen; @@ -976,7 +976,7 @@ cache_test(int argc, char *argv[]) goto cleanup; } - rc = test_init_task_system(hid, &ts, &system_cfg.task_cfg); + rc = test_init_task_system(&ts, hid, &system_cfg.task_cfg); if (!SUCCESS(rc)) { platform_error_log("Failed to init splinter state: %s\n", platform_status_to_string(rc)); @@ -1009,7 +1009,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 1, // num readers 0, // num writers 10); // per-thread working set @@ 
-1018,7 +1018,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 8, // num reader 0, // num writers 10); // per-thread working set @@ -1027,7 +1027,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 8, // num reader 2, // num writers 10); // per-thread working set @@ -1036,7 +1036,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 1, // num readers 0, // num writers 80); // per-thread working set @@ -1045,7 +1045,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 8, // num readers 0, // num writers 80); // per-thread working set @@ -1053,7 +1053,7 @@ cache_test(int argc, char *argv[]) rc = test_cache_async(ccp, &system_cfg.cache_cfg, hid, - ts, + &ts, 8, // num readers 0, // num writers 96); // per-thread working set @@ -1066,7 +1066,7 @@ cache_test(int argc, char *argv[]) clockcache_deinit(cc); platform_free(hid, cc); rc_allocator_deinit(&al); - test_deinit_task_system(hid, &ts); + test_deinit_task_system(&ts); rc = STATUS_OK; destroy_iohandle: io_handle_destroy(io); diff --git a/tests/functional/filter_test.c b/tests/functional/filter_test.c index d84238ca..d8954462 100644 --- a/tests/functional/filter_test.c +++ b/tests/functional/filter_test.c @@ -339,8 +339,8 @@ filter_test(int argc, char *argv[]) goto cleanup; } - task_system *ts = NULL; - rc = task_system_create(hid, &ts, &system_cfg.task_cfg); + task_system ts; + rc = task_system_init(&ts, hid, &system_cfg.task_cfg); platform_assert_status_ok(rc); rc = rc_allocator_init( @@ -399,7 +399,7 @@ filter_test(int argc, char *argv[]) clockcache_deinit(cc); platform_free(hid, cc); rc_allocator_deinit(&al); - task_system_destroy(hid, &ts); + task_system_deinit(&ts); io_handle_destroy(io); r = 0; cleanup: diff --git a/tests/functional/io_apis_test.c b/tests/functional/io_apis_test.c index 
c0066964..c4bf6986 100644 --- a/tests/functional/io_apis_test.c +++ b/tests/functional/io_apis_test.c @@ -267,8 +267,8 @@ splinter_io_apis_test(int argc, char *argv[]) task_system_config_init(&task_cfg, TRUE /* use stats */, num_bg_threads); platform_assert(SUCCESS(rc)); - task_system *tasks = NULL; - rc = task_system_create(hid, &tasks, &task_cfg); + task_system tasks; + rc = task_system_init(&tasks, hid, &task_cfg); platform_assert(SUCCESS(rc)); threadid main_thread_idx = platform_get_tid(); @@ -297,7 +297,7 @@ splinter_io_apis_test(int argc, char *argv[]) io_test_fn_args io_test_fn_arg = {.hid = hid, .io_cfgp = &io_cfg, .io_hdlp = io_hdl, - .tasks = tasks, + .tasks = &tasks, .start_addr = start_addr, .end_addr = end_addr, .stamp_char = 'A', @@ -392,7 +392,7 @@ splinter_io_apis_test(int argc, char *argv[]) io_free: // Only the parent process should dismantle stuff if (pid != 0) { - task_system_destroy(hid, &tasks); + task_system_deinit(&tasks); io_handle_destroy(io_hdl); } diff --git a/tests/functional/log_test.c b/tests/functional/log_test.c index d2a8e864..67999c07 100644 --- a/tests/functional/log_test.c +++ b/tests/functional/log_test.c @@ -231,7 +231,7 @@ log_test(int argc, char *argv[]) bool32 run_crash_test; int rc; uint64 seed; - task_system *ts = NULL; + task_system ts; test_message_generator gen; platform_register_thread(); @@ -292,7 +292,7 @@ log_test(int argc, char *argv[]) goto cleanup; } - status = test_init_task_system(hid, &ts, &system_cfg.task_cfg); + status = test_init_task_system(&ts, hid, &system_cfg.task_cfg); if (!SUCCESS(status)) { platform_error_log("Failed to init splinter state: %s\n", platform_status_to_string(status)); @@ -319,7 +319,7 @@ log_test(int argc, char *argv[]) platform_assert(log != NULL); if (run_perf_test) { ret = test_log_perf( - (cache *)cc, &system_cfg.log_cfg, log, 200000000, &gen, 16, ts, hid); + (cache *)cc, &system_cfg.log_cfg, log, 200000000, &gen, 16, &ts, hid); rc = -1; platform_assert_status_ok(ret); } else if 
(run_crash_test) { @@ -329,7 +329,7 @@ log_test(int argc, char *argv[]) (allocator *)&al, &system_cfg.log_cfg, log, - ts, + &ts, hid, &gen, 500000, @@ -342,7 +342,7 @@ log_test(int argc, char *argv[]) (allocator *)&al, &system_cfg.log_cfg, log, - ts, + &ts, hid, &gen, 500000, @@ -354,7 +354,7 @@ log_test(int argc, char *argv[]) platform_free(hid, log); platform_free(hid, cc); rc_allocator_deinit(&al); - test_deinit_task_system(hid, &ts); + test_deinit_task_system(&ts); destroy_iohandle: io_handle_destroy(io); cleanup: diff --git a/tests/functional/splinter_test.c b/tests/functional/splinter_test.c index c929bd7f..64078c7f 100644 --- a/tests/functional/splinter_test.c +++ b/tests/functional/splinter_test.c @@ -2446,8 +2446,8 @@ splinter_test(int argc, char *argv[]) uint8 num_tables = 1; bool32 cache_per_table = FALSE; uint64 insert_rate = 0; // no rate throttling by default. - task_system *ts = NULL; - bool use_shmem = FALSE; + task_system ts; + bool use_shmem = FALSE; uint8 lookup_positive_pct = 0; test_message_generator gen; test_exec_config test_exec_cfg; @@ -2704,7 +2704,7 @@ splinter_test(int argc, char *argv[]) goto cfg_free; } - rc = test_init_task_system(hid, &ts, &system_cfg[0].task_cfg); + rc = test_init_task_system(&ts, hid, &system_cfg[0].task_cfg); if (!SUCCESS(rc)) { platform_error_log("Failed to init splinter state: %s\n", platform_status_to_string(rc)); @@ -2747,7 +2747,7 @@ splinter_test(int argc, char *argv[]) num_lookup_threads, num_range_lookup_threads, max_async_inflight, - ts, + &ts, hid, num_tables, num_caches, @@ -2762,7 +2762,7 @@ splinter_test(int argc, char *argv[]) num_insert_threads, num_lookup_threads, max_async_inflight, - ts, + &ts, hid, num_tables, num_caches, @@ -2778,7 +2778,7 @@ splinter_test(int argc, char *argv[]) num_lookup_threads, num_range_lookup_threads, max_async_inflight, - ts, + &ts, hid, num_tables, num_caches, @@ -2794,7 +2794,7 @@ splinter_test(int argc, char *argv[]) num_lookup_threads, num_range_lookup_threads, 
max_async_inflight, - ts, + &ts, hid, num_tables, num_caches, @@ -2818,7 +2818,7 @@ splinter_test(int argc, char *argv[]) 7, max_async_inflight, lookup_positive_pct, - ts, + &ts, hid, num_tables, num_caches); @@ -2833,7 +2833,7 @@ splinter_test(int argc, char *argv[]) num_lookup_threads, num_range_lookup_threads, max_async_inflight, - ts, + &ts, hid, num_tables, num_caches, @@ -2852,7 +2852,7 @@ splinter_test(int argc, char *argv[]) seed, test_ops, correctness_check_frequency, - ts, + &ts, hid, num_tables, num_caches, @@ -2879,7 +2879,7 @@ splinter_test(int argc, char *argv[]) platform_free(hid, cc); allocator_assert_noleaks(alp); rc_allocator_deinit(&al); - test_deinit_task_system(hid, &ts); + test_deinit_task_system(&ts); handle_destroy: io_handle_destroy(io); cfg_free: diff --git a/tests/functional/test.h b/tests/functional/test.h index a8363fae..596dfe38 100644 --- a/tests/functional/test.h +++ b/tests/functional/test.h @@ -54,18 +54,18 @@ splinter_io_apis_test(int argc, char *argv[]); * main function. This initializes SplinterDB's task sub-system. 
*/ static inline platform_status -test_init_task_system(platform_heap_id hid, - task_system **system, +test_init_task_system(task_system *system, + platform_heap_id hid, const task_system_config *cfg) { // splinter initialization - return task_system_create(hid, system, cfg); + return task_system_init(system, hid, cfg); } static inline void -test_deinit_task_system(platform_heap_id hid, task_system **ts) +test_deinit_task_system(task_system *ts) { - task_system_destroy(hid, ts); + task_system_deinit(ts); } static inline key diff --git a/tests/functional/ycsb_test.c b/tests/functional/ycsb_test.c index 98864b26..dce0db51 100644 --- a/tests/functional/ycsb_test.c +++ b/tests/functional/ycsb_test.c @@ -1145,7 +1145,7 @@ ycsb_test(int argc, char *argv[]) platform_status rc; uint64 seed; task_system_config task_cfg; - task_system *ts = NULL; + task_system ts; uint64 nphases; bool32 use_existing = 0; @@ -1263,7 +1263,7 @@ ycsb_test(int argc, char *argv[]) goto cleanup; } - rc = test_init_task_system(hid, &ts, &task_cfg); + rc = test_init_task_system(&ts, hid, &task_cfg); if (!SUCCESS(rc)) { platform_error_log("Failed to init splinter state: %s\n", platform_status_to_string(rc)); @@ -1288,7 +1288,7 @@ ycsb_test(int argc, char *argv[]) spl = core_mount(&system_cfg->splinter_cfg, (allocator *)&al, (cache *)cc, - ts, + &ts, test_generate_allocator_root_id(), hid); platform_assert(spl); @@ -1306,19 +1306,19 @@ ycsb_test(int argc, char *argv[]) spl = core_create(&system_cfg->splinter_cfg, (allocator *)&al, (cache *)cc, - ts, + &ts, test_generate_allocator_root_id(), hid); platform_assert(spl); } - run_all_ycsb_phases(spl, phases, nphases, ts, hid); + run_all_ycsb_phases(spl, phases, nphases, &ts, hid); core_unmount(&spl); clockcache_deinit(cc); platform_free(hid, cc); rc_allocator_unmount(&al); - test_deinit_task_system(hid, &ts); + test_deinit_task_system(&ts); rc = STATUS_OK; // struct rusage usage; diff --git a/tests/unit/btree_stress_test.c b/tests/unit/btree_stress_test.c 
index 48588f12..4b1c1e8f 100644 --- a/tests/unit/btree_stress_test.c +++ b/tests/unit/btree_stress_test.c @@ -112,7 +112,7 @@ CTEST_DATA(btree_stress) // Stuff needed to setup and exercise multiple threads. io_handle *io; - task_system *ts; + task_system ts; rc_allocator al; clockcache cc; }; @@ -153,10 +153,9 @@ CTEST_SETUP(btree_stress) ASSERT_TRUE(FALSE, "Failed to init heap\n"); } // Setup execution of concurrent threads - data->ts = NULL; data->io = io_handle_create(&data->io_cfg, data->hid); if (data->io == NULL - || !SUCCESS(task_system_create(data->hid, &data->ts, &data->task_cfg)) + || !SUCCESS(task_system_init(&data->ts, data->hid, &data->task_cfg)) || !SUCCESS(rc_allocator_init(&data->al, &data->allocator_cfg, data->io, @@ -181,7 +180,7 @@ CTEST_TEARDOWN(btree_stress) { clockcache_deinit(&data->cc); rc_allocator_deinit(&data->al); - task_system_destroy(data->hid, &data->ts); + task_system_deinit(&data->ts); io_handle_destroy(data->io); platform_heap_destroy(&data->hid); platform_deregister_thread(); diff --git a/tests/unit/splinter_test.c b/tests/unit/splinter_test.c index d9b8b4ff..5ade6ae8 100644 --- a/tests/unit/splinter_test.c +++ b/tests/unit/splinter_test.c @@ -91,7 +91,7 @@ CTEST_DATA(splinter) system_config *system_cfg; io_handle *io; clockcache *clock_cache; - task_system *tasks; + task_system tasks; test_message_generator gen; // Test execution related configuration @@ -162,8 +162,7 @@ CTEST_SETUP(splinter) data->io = io_handle_create(&data->system_cfg->io_cfg, data->hid); ASSERT_TRUE((data->io != NULL), "Failed to create IO handle\n"); - data->tasks = NULL; - rc = test_init_task_system(data->hid, &data->tasks, &data->system_cfg->task_cfg); + rc = test_init_task_system(&data->tasks, data->hid, &data->system_cfg->task_cfg); ASSERT_TRUE(SUCCESS(rc), "Failed to init splinter state: %s\n", platform_status_to_string(rc)); @@ -201,7 +200,7 @@ CTEST_TEARDOWN(splinter) allocator_assert_noleaks(alp); rc_allocator_deinit(&data->al); - 
test_deinit_task_system(data->hid, &data->tasks); + test_deinit_task_system(&data->tasks); io_handle_destroy(data->io); @@ -228,7 +227,7 @@ CTEST2(splinter, test_inserts) core_handle *spl = core_create(&data->system_cfg->splinter_cfg, alp, (cache *)data->clock_cache, - data->tasks, + &data->tasks, test_generate_allocator_root_id(), data->hid); ASSERT_TRUE(spl != NULL); @@ -399,7 +398,7 @@ CTEST2(splinter, test_lookups) core_handle *spl = core_create(&data->system_cfg->splinter_cfg, alp, (cache *)data->clock_cache, - data->tasks, + &data->tasks, test_generate_allocator_root_id(), data->hid); ASSERT_TRUE(spl != NULL); @@ -621,7 +620,7 @@ CTEST2(splinter, test_splinter_print_diags) core_handle *spl = core_create(&data->system_cfg->splinter_cfg, alp, (cache *)data->clock_cache, - data->tasks, + &data->tasks, test_generate_allocator_root_id(), data->hid); ASSERT_TRUE(spl != NULL); diff --git a/tests/unit/task_system_test.c b/tests/unit/task_system_test.c index 44109250..aa514c85 100644 --- a/tests/unit/task_system_test.c +++ b/tests/unit/task_system_test.c @@ -83,8 +83,8 @@ CTEST_DATA(task_system) task_system_config task_cfg; // Following get setup pointing to allocated memory - io_handle *ioh; // Only prerequisite needed to setup task system - task_system *tasks; + io_handle *ioh; // Only prerequisite needed to setup task system + task_system tasks; }; /* @@ -133,7 +133,7 @@ CTEST_SETUP(task_system) // Teardown function for suite, called after every test in suite CTEST_TEARDOWN(task_system) { - task_system_destroy(data->hid, &data->tasks); + task_system_deinit(&data->tasks); io_handle_destroy(data->ioh); platform_heap_destroy(&data->hid); platform_deregister_thread(); @@ -181,7 +181,7 @@ CTEST2(task_system, test_one_thread_using_lower_apis) ASSERT_EQUAL(main_thread_idx, 0, "main_thread_idx=%lu", main_thread_idx); // Setup thread-specific struct, needed for validation in thread's worker fn - thread_cfg.tasks = data->tasks; + thread_cfg.tasks = &data->tasks; // Main 
thread is at index 0 thread_cfg.exp_thread_idx = 1; @@ -235,7 +235,7 @@ CTEST2(task_system, test_one_thread_using_extern_apis) ASSERT_EQUAL(main_thread_idx, 0, "main_thread_idx=%lu", main_thread_idx); // Setup thread-specific struct, needed for validation in thread's worker fn - thread_cfg.tasks = data->tasks; + thread_cfg.tasks = &data->tasks; // Main thread is at index 0 thread_cfg.exp_thread_idx = 1; @@ -303,7 +303,7 @@ CTEST2(task_system, test_max_threads_using_lower_apis) thread_cfgp = &thread_cfg[tctr]; // These are independent of the new thread's creation. - thread_cfgp->tasks = data->tasks; + thread_cfgp->tasks = &data->tasks; thread_cfgp->exp_thread_idx = tctr; thread_cfgp->done = &done; @@ -330,7 +330,7 @@ CTEST2(task_system, test_max_threads_using_lower_apis) CTEST2(task_system, test_task_system_creation_with_bg_threads) { // Destroy the task system setup by the harness, by default, w/o bg threads. - task_system_destroy(data->hid, &data->tasks); + task_system_deinit(&data->tasks); platform_status rc = create_task_system_with_bg_threads(data, 2, 4); ASSERT_TRUE(SUCCESS(rc)); } @@ -348,7 +348,7 @@ CTEST2(task_system, test_use_all_but_one_threads_for_bg_threads) set_log_streams_for_tests(MSG_LEVEL_ERRORS); // Destroy the task system setup by the harness, by default, w/o bg threads. - task_system_destroy(data->hid, &data->tasks); + task_system_deinit(&data->tasks); // Consume all-but-one available threads with background threads. 
rc = create_task_system_with_bg_threads(data, 1, (MAX_THREADS - 3)); @@ -359,7 +359,7 @@ CTEST2(task_system, test_use_all_but_one_threads_for_bg_threads) thread_config_lockstep thread_cfg[2]; ZERO_ARRAY(thread_cfg); - thread_cfg[0].tasks = data->tasks; + thread_cfg[0].tasks = &data->tasks; thread_cfg[0].exp_thread_idx = platform_num_threads(); thread_cfg[0].exp_max_tid = MAX_THREADS; thread_cfg[0].line = __LINE__; @@ -378,7 +378,7 @@ CTEST2(task_system, test_use_all_but_one_threads_for_bg_threads) while (!thread_cfg[0].waitfor_stop_signal) { platform_sleep_ns(USEC_TO_NSEC(100000)); // 100 msec. } - thread_cfg[1].tasks = data->tasks; + thread_cfg[1].tasks = &data->tasks; thread_cfg[1].exp_thread_idx = platform_num_threads(); // We've used up all threads. This thread creation should fail. @@ -421,7 +421,7 @@ create_task_system_without_bg_threads(void *datap) TRUE, // use stats num_bg_threads); ASSERT_TRUE(SUCCESS(rc)); - rc = task_system_create(data->hid, &data->tasks, &data->task_cfg); + rc = task_system_init(&data->tasks, data->hid, &data->task_cfg); return rc; } @@ -448,7 +448,7 @@ create_task_system_with_bg_threads(void *datap, num_bg_threads); ASSERT_TRUE(SUCCESS(rc)); - rc = task_system_create(data->hid, &data->tasks, &data->task_cfg); + rc = task_system_init(&data->tasks, data->hid, &data->task_cfg); if (!SUCCESS(rc)) { return rc; } From ed98e23c044a3d967a44cfcc44e93aadc1573ab1 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Tue, 13 Jan 2026 23:58:21 -0800 Subject: [PATCH 2/2] fix up a bunch of small bugs/races in pivot_state management --- src/task.c | 6 ++-- src/task.h | 1 - src/trunk.c | 89 +++++++++++++++++++++++++++++------------------------ 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/src/task.c b/src/task.c index 1e4c4609..965552f7 100644 --- a/src/task.c +++ b/src/task.c @@ -80,7 +80,8 @@ task_group_run_task(task_group *group, task *assigned_task) } } - task_fn func = assigned_task->func; + task_fn func = assigned_task->func; + 
assigned_task->func = (task_fn)(void *)0xdeadbeef; func(assigned_task); if (group->use_stats) { @@ -214,6 +215,7 @@ task_enqueue(task_system *ts, task_fn func, bool32 at_head) { + memset(new_task, 0, sizeof(*new_task)); new_task->func = func; new_task->ts = ts; @@ -228,12 +230,10 @@ task_enqueue(task_system *ts, if (tq->tail) { if (at_head) { - tq->head->prev = new_task; new_task->next = tq->head; tq->head = new_task; } else { tq->tail->next = new_task; - new_task->prev = tq->tail; tq->tail = new_task; } } else { diff --git a/src/task.h b/src/task.h index a6e09ad7..239bf2bd 100644 --- a/src/task.h +++ b/src/task.h @@ -42,7 +42,6 @@ typedef void (*task_fn)(task *arg); struct task { struct task *next; - struct task *prev; task_fn func; task_system *ts; timestamp enqueue_time; diff --git a/src/trunk.c b/src/trunk.c index e19f2483..e3dc1819 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -65,7 +65,8 @@ typedef enum bundle_compaction_state { BUNDLE_COMPACTION_IN_PROGRESS = 1, BUNDLE_COMPACTION_MIN_ENDED = 2, BUNDLE_COMPACTION_FAILED = 2, - BUNDLE_COMPACTION_SUCCEEDED = 3 + BUNDLE_COMPACTION_ABORTED = 3, + BUNDLE_COMPACTION_SUCCEEDED = 4 } bundle_compaction_state; typedef VECTOR(trunk_branch_info) trunk_branch_info_vector; @@ -2595,6 +2596,7 @@ bundle_compaction_destroy(bundle_compaction *compaction, trunk_context *context) // bundle_compaction_print_table_header(Platform_default_log_handle, 4); // bundle_compaction_print_table_entry( // compaction, Platform_default_log_handle, 4); + platform_assert(compaction->state >= BUNDLE_COMPACTION_MIN_ENDED); for (uint64 i = 0; i < vector_length(&compaction->input_branches); i++) { trunk_branch_info bi = vector_get(&compaction->input_branches, i); @@ -2663,6 +2665,7 @@ bundle_compaction_create(trunk_context *context, __func__, __LINE__, platform_status_to_string(rc)); + result->state = BUNDLE_COMPACTION_FAILED; bundle_compaction_destroy(result, context); return NULL; } @@ -2707,7 +2710,7 @@ 
trunk_pivot_state_map_aquire_lock(pivot_state_map_lock *lock, *lock = trunk_pivot_state_map_hash(context->cfg->data_cfg, pivot_key, height); uint64 wait = 1; - while (__sync_val_compare_and_swap(&map->locks[*lock], 0, 1) != 0) { + while (__sync_lock_test_and_set(&map->locks[*lock], 1)) { platform_sleep_ns(wait); wait = MIN(2 * wait, 2048); } @@ -3268,21 +3271,13 @@ maplet_compaction_task(task *arg) if (!apply_args.found_match) { if (!state->abandoned) { - platform_error_log("Failed to find matching pivot for non-abandoned " - "compaction state\n"); trunk_pivot_state_print( state, Platform_error_log_handle, context->cfg->data_cfg, 4); + platform_assert(state->abandoned, + "Failed to find matching pivot for non-abandoned " + "compaction state\n"); } - pivot_state_map_lock lock; - trunk_pivot_state_map_aquire_lock(&lock, - context, - &context->pivot_states, - key_buffer_key(&state->key), - state->height); - trunk_pivot_state_map_remove( - &context->pivot_states, &lock, apply_args.state); - trunk_pivot_state_map_release_lock(&lock, &context->pivot_states); trunk_modification_end(context); if (context->stats) { @@ -3300,24 +3295,28 @@ maplet_compaction_task(task *arg) state->maplet = new_maplet; } state->num_branches += vector_length(&apply_args.branches); - trunk_pivot_state_lock_compactions(state); - while (state->bundle_compactions != last) { - bundle_compaction *next = state->bundle_compactions->next; - state->total_bundles -= state->bundle_compactions->num_bundles; - bundle_compaction_destroy(state->bundle_compactions, context); - state->bundle_compactions = next; - } - platform_assert(state->bundle_compactions == last); - state->bundle_compactions = last->next; - state->total_bundles -= last->num_bundles; - bundle_compaction_destroy(last, context); - __sync_lock_release(&state->maplet_compaction_initiated); - if (state->bundle_compactions - && state->bundle_compactions->state == BUNDLE_COMPACTION_SUCCEEDED) + trunk_pivot_state_lock_compactions(state); { - 
enqueue_maplet_compaction(state); + while (state->bundle_compactions != last) { + bundle_compaction *next = state->bundle_compactions->next; + state->total_bundles -= state->bundle_compactions->num_bundles; + bundle_compaction_destroy(state->bundle_compactions, context); + state->bundle_compactions = next; + } + platform_assert(state->bundle_compactions == last); + state->bundle_compactions = last->next; + state->total_bundles -= last->num_bundles; + bundle_compaction_destroy(last, context); + + __sync_lock_release(&state->maplet_compaction_initiated); + + if (state->bundle_compactions + && state->bundle_compactions->state == BUNDLE_COMPACTION_SUCCEEDED) + { + enqueue_maplet_compaction(state); + } } trunk_pivot_state_unlock_compactions(state); @@ -3387,28 +3386,31 @@ bundle_compaction_task(task *arg) trunk_context *context = state->context; threadid tid = platform_get_tid(); + platform_assert(__sync_bool_compare_and_swap(&bc->state, + BUNDLE_COMPACTION_NOT_STARTED, + BUNDLE_COMPACTION_IN_PROGRESS)); + if (context->stats) { context->stats[tid].compactions[state->height]++; } if (state->abandoned) { - trunk_pivot_state_map_release_entry( - context, &context->pivot_states, state); - if (context->stats) { context->stats[tid].compactions_aborted[state->height]++; } + + platform_assert(__sync_bool_compare_and_swap( + &bc->state, BUNDLE_COMPACTION_IN_PROGRESS, BUNDLE_COMPACTION_ABORTED)); + + + trunk_pivot_state_map_release_entry( + context, &context->pivot_states, state); + return; } uint64 compaction_start = platform_get_timestamp(); - // Find a bundle compaction that needs doing for this pivot - trunk_pivot_state_lock_compactions(state); - platform_assert(__sync_bool_compare_and_swap(&bc->state, - BUNDLE_COMPACTION_NOT_STARTED, - BUNDLE_COMPACTION_IN_PROGRESS)); - trunk_pivot_state_unlock_compactions(state); platform_assert(0 < vector_length(&bc->input_branches)); trunk_branch_merger merger; @@ -3517,12 +3519,16 @@ bundle_compaction_task(task *arg) 
btree_pack_req_deinit(&pack_req, context->hid); trunk_branch_merger_deinit(&merger); + trunk_pivot_state_lock_compactions(state); if (SUCCESS(rc)) { - bc->state = BUNDLE_COMPACTION_SUCCEEDED; + platform_assert( + __sync_bool_compare_and_swap(&bc->state, + BUNDLE_COMPACTION_IN_PROGRESS, + BUNDLE_COMPACTION_SUCCEEDED)); } else { - bc->state = BUNDLE_COMPACTION_FAILED; + platform_assert(__sync_bool_compare_and_swap( + &bc->state, BUNDLE_COMPACTION_IN_PROGRESS, BUNDLE_COMPACTION_FAILED)); } - trunk_pivot_state_lock_compactions(state); if (bc->state == BUNDLE_COMPACTION_SUCCEEDED && state->bundle_compactions == bc) { @@ -3568,9 +3574,10 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) goto next; } + trunk_pivot_state_incref(state); + trunk_pivot_state_append_compaction(state, bc); - trunk_pivot_state_incref(state); rc = task_enqueue(context->ts, TASK_TYPE_NORMAL, &bc->tsk,