Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,9 @@ core_memtable_flush_internal(core_handle *spl, uint64 generation)
}

static void
core_memtable_flush_internal_virtual(void *arg)
core_memtable_flush_internal_virtual(task *arg)
{
core_memtable_args *mt_args = arg;
core_memtable_args *mt_args = container_of(arg, core_memtable_args, tsk);
core_memtable_flush_internal(mt_args->spl, mt_args->generation);
}

Expand All @@ -653,8 +653,8 @@ core_memtable_flush(core_handle *spl, uint64 generation)
cmt->mt_args.generation = generation;
task_enqueue(spl->ts,
TASK_TYPE_MEMTABLE,
&cmt->mt_args.tsk,
core_memtable_flush_internal_virtual,
&cmt->mt_args,
FALSE);
}

Expand Down
1 change: 1 addition & 0 deletions src/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct core_handle core_handle;
typedef struct core_memtable_args {
core_handle *spl;
uint64 generation;
task tsk;
} core_memtable_args;

typedef struct core_compacted_memtable {
Expand Down
16 changes: 8 additions & 8 deletions src/splinterdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ splinterdb_get_version()
}

typedef struct splinterdb {
task_system *task_sys;
task_system task_sys;
io_config io_cfg;
io_handle *io_handle;
allocator_config allocator_cfg;
Expand Down Expand Up @@ -311,7 +311,7 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN
goto io_handle_create_failed;
}

status = task_system_create(kvs->heap_id, &kvs->task_sys, &kvs->task_cfg);
status = task_system_init(&kvs->task_sys, kvs->heap_id, &kvs->task_cfg);
if (!SUCCESS(status)) {
platform_error_log(
"Failed to initialize SplinterDB task system state: %s\n",
Expand Down Expand Up @@ -357,14 +357,14 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN
kvs->spl = core_mount(&kvs->trunk_cfg,
(allocator *)&kvs->allocator_handle,
(cache *)&kvs->cache_handle,
kvs->task_sys,
&kvs->task_sys,
kvs->trunk_id,
kvs->heap_id);
} else {
kvs->spl = core_create(&kvs->trunk_cfg,
(allocator *)&kvs->allocator_handle,
(cache *)&kvs->cache_handle,
kvs->task_sys,
&kvs->task_sys,
kvs->trunk_id,
kvs->heap_id);
}
Expand All @@ -385,7 +385,7 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN
deinit_allocator:
rc_allocator_unmount(&kvs->allocator_handle);
deinit_system:
task_system_destroy(kvs->heap_id, &kvs->task_sys);
task_system_deinit(&kvs->task_sys);
deinit_iohandle:
io_handle_destroy(kvs->io_handle);
io_handle_create_failed:
Expand Down Expand Up @@ -458,7 +458,7 @@ splinterdb_close(splinterdb **kvs_in) // IN
core_unmount(&kvs->spl);
clockcache_deinit(&kvs->cache_handle);
rc_allocator_unmount(&kvs->allocator_handle);
task_system_destroy(kvs->heap_id, &kvs->task_sys);
task_system_deinit(&kvs->task_sys);
io_handle_destroy(kvs->io_handle);

// Free resources carefully to avoid ASAN-test failures
Expand Down Expand Up @@ -755,7 +755,7 @@ splinterdb_stats_reset(splinterdb *kvs)
static void
splinterdb_close_print_stats(splinterdb *kvs)
{
task_print_stats(kvs->task_sys);
task_print_stats(&kvs->task_sys);
splinterdb_stats_print_insertion(kvs);
}

Expand All @@ -780,7 +780,7 @@ splinterdb_get_heap_id(const splinterdb *kvs)
const task_system *
splinterdb_get_task_system_handle(const splinterdb *kvs)
{
return kvs->task_sys;
return &kvs->task_sys;
}

const io_handle *
Expand Down
47 changes: 14 additions & 33 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ task_group_run_task(task_group *group, task *assigned_task)
}
}

assigned_task->func(assigned_task->arg);
task_fn func = assigned_task->func;
assigned_task->func = (task_fn)(void *)0xdeadbeef;
func(assigned_task);

if (group->use_stats) {
current = platform_timestamp_elapsed(current);
if (current > group->stats[tid].max_runtime_ns) {
group->stats[tid].max_runtime_ns = current;
group->stats[tid].max_runtime_func = assigned_task->func;
group->stats[tid].max_runtime_func = func;
}
}

Expand Down Expand Up @@ -119,7 +121,6 @@ task_worker_thread(void *arg)
const threadid tid = platform_get_tid();
group->stats[tid].total_bg_task_executions++;
task_group_run_task(group, task_to_run);
platform_free(group->ts->heap_id, task_to_run);
rc = task_group_lock(group);
platform_assert(SUCCESS(rc));
__sync_fetch_and_sub(&group->current_executing_tasks, 1);
Expand Down Expand Up @@ -210,16 +211,12 @@ task_group_init(task_group *group,
platform_status
task_enqueue(task_system *ts,
task_type type,
task *new_task,
task_fn func,
void *arg,
bool32 at_head)
{
task *new_task = TYPED_ZALLOC(ts->heap_id, new_task);
if (new_task == NULL) {
return STATUS_NO_MEMORY;
}
memset(new_task, 0, sizeof(*new_task));
new_task->func = func;
new_task->arg = arg;
new_task->ts = ts;

task_group *group = &ts->group[type];
Expand All @@ -228,18 +225,15 @@ task_enqueue(task_system *ts,

rc = task_group_lock(group);
if (!SUCCESS(rc)) {
platform_free(ts->heap_id, new_task);
return rc;
}

if (tq->tail) {
if (at_head) {
tq->head->prev = new_task;
new_task->next = tq->head;
tq->head = new_task;
} else {
tq->tail->next = new_task;
new_task->prev = tq->tail;
tq->tail = new_task;
}
} else {
Expand Down Expand Up @@ -309,7 +303,6 @@ task_group_perform_one(task_group *group, uint64 queue_scale_percent)
group->stats[tid].total_fg_task_executions++;
task_group_run_task(group, assigned_task);
__sync_fetch_and_sub(&group->current_executing_tasks, 1);
platform_free(group->ts->heap_id, assigned_task);
} else {
rc = STATUS_TIMEDOUT;
}
Expand Down Expand Up @@ -446,20 +439,16 @@ task_system_config_init(task_system_config *task_cfg,
* -----------------------------------------------------------------------------
*/
platform_status
task_system_create(platform_heap_id hid,
task_system **system,
const task_system_config *cfg)
task_system_init(task_system *ts,
platform_heap_id hid,
const task_system_config *cfg)
{
platform_status rc = task_config_valid(cfg->num_background_threads);
if (!SUCCESS(rc)) {
return rc;
}

task_system *ts = TYPED_ZALLOC(hid, ts);
if (ts == NULL) {
*system = NULL;
return STATUS_NO_MEMORY;
}
ZERO_CONTENTS(ts);
ts->cfg = cfg;
ts->heap_id = hid;

Expand All @@ -469,8 +458,7 @@ task_system_create(platform_heap_id hid,
cfg->use_stats,
cfg->num_background_threads[type]);
if (!SUCCESS(rc)) {
task_system_destroy(hid, &ts);
*system = NULL;
task_system_deinit(ts);
return rc;
}
uint64 nbg_threads = cfg->num_background_threads[type];
Expand All @@ -482,29 +470,22 @@ task_system_create(platform_heap_id hid,
task_type_name[type]);
}
}
debug_assert((*system == NULL),
"Task system handle, %p, is expected to be NULL.\n",
*system);
*system = ts;
return STATUS_OK;
}

/*
* -----------------------------------------------------------------------------
* task_system_destroy() : Task system de-initializer.
* task_system_deinit() : Task system de-initializer.
*
* Tear down task system structures, free allocated memory.
* Tear down task system structures.
* -----------------------------------------------------------------------------
*/
void
task_system_destroy(platform_heap_id hid, task_system **ts_in)
task_system_deinit(task_system *ts)
{
task_system *ts = *ts_in;
for (task_type type = TASK_TYPE_FIRST; type != NUM_TASK_TYPES; type++) {
task_group_deinit(&ts->group[type]);
}
platform_free(hid, ts);
*ts_in = (task_system *)NULL;
}

static void
Expand Down
30 changes: 15 additions & 15 deletions src/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ task_system_config_init(task_system_config *task_cfg,

typedef struct task_system task_system;

typedef void (*task_fn)(void *arg);
typedef struct task task;

typedef struct task {
typedef void (*task_fn)(task *arg);

struct task {
struct task *next;
struct task *prev;
task_fn func;
void *arg;
task_system *ts;
timestamp enqueue_time;
} task;
};

/*
* Run-time task-specific execution metrics structure.
Expand Down Expand Up @@ -111,35 +111,35 @@ struct task_system {
};

/*
* Create a task system and register the calling thread.
* Initialize a task system and register the calling thread.
*/
platform_status
task_system_create(platform_heap_id hid,
task_system **system,
const task_system_config *cfg);
task_system_init(task_system *system,
platform_heap_id hid,
const task_system_config *cfg);

/*
* Deregister the calling thread (if it is registered) and destroy the
* task system. It is recommended to not destroy the task system
* Deregister the calling thread (if it is registered) and deinitialize the
* task system. It is recommended to not deinitialize the task system
* until all registered threads have deregistered.
*
* task_system_destroy() waits for currently executing background
* task_system_deinit() waits for currently executing background
* tasks and cleanly shuts down all background threads, but it
* abandons tasks that are still waiting to execute. To ensure that
* no enqueued tasks are abandoned by a shutdown,
*
* 1. Ensure that your application will not enqueue any more tasks.
* 2. Call task_perform_until_quiescent().
* 3. Then call task_system_destroy().
* 3. Then call task_system_deinit().
*/
void
task_system_destroy(platform_heap_id hid, task_system **ts);
task_system_deinit(task_system *ts);

platform_status
task_enqueue(task_system *ts,
task_type type,
task *arg,
task_fn func,
void *arg,
bool32 at_head);

/*
Expand Down
Loading