Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/splinterdb/splinterdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ splinterdb_update(const splinterdb *kvsb, slice key, slice delta);
// Lookups

// Size of opaque data required to hold a lookup result
#define SPLINTERDB_LOOKUP_BUFSIZE (6 * sizeof(void *))
#define SPLINTERDB_LOOKUP_BUFSIZE (10 * sizeof(void *))

// A lookup result is stored and parsed from here
//
Expand Down
94 changes: 72 additions & 22 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

#include "core.h"
#include "data_internal.h"
#include "platform_sleep.h"
#include "platform_time.h"
#include "poison.h"
Expand Down Expand Up @@ -803,6 +804,8 @@ core_range_iterator_init(core_handle *spl,
comparison start_type,
uint64 num_tuples)
{
platform_status rc;

debug_assert(!key_is_null(min_key));
debug_assert(!key_is_null(max_key));
debug_assert(!key_is_null(start_key));
Expand All @@ -815,6 +818,11 @@ core_range_iterator_init(core_handle *spl,
range_itor->can_prev = TRUE;
range_itor->can_next = TRUE;

key_buffer_init(&range_itor->min_key, spl->heap_id);
key_buffer_init(&range_itor->max_key, spl->heap_id);
key_buffer_init(&range_itor->local_min_key, spl->heap_id);
key_buffer_init(&range_itor->local_max_key, spl->heap_id);

if (core_key_compare(spl, min_key, start_key) > 0) {
// in bounds, start at min
start_key = min_key;
Expand All @@ -825,8 +833,17 @@ core_range_iterator_init(core_handle *spl,
}

// copy over global min and max
key_buffer_init_from_key(&range_itor->min_key, spl->heap_id, min_key);
key_buffer_init_from_key(&range_itor->max_key, spl->heap_id, max_key);
rc = key_buffer_copy_key(&range_itor->min_key, min_key);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}
rc = key_buffer_copy_key(&range_itor->max_key, max_key);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}


ZERO_ARRAY(range_itor->compacted);

Expand Down Expand Up @@ -868,16 +885,15 @@ core_range_iterator_init(core_handle *spl,
}

trunk_ondisk_node_handle root_handle;
trunk_init_root_handle(&spl->trunk_context, &root_handle);

rc = trunk_init_root_handle(&spl->trunk_context, &root_handle);
memtable_end_lookup(spl->mt_ctxt);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}

key_buffer_init(&range_itor->local_min_key, spl->heap_id);
key_buffer_init(&range_itor->local_max_key, spl->heap_id);

platform_status rc;
uint64 old_num_branches = range_itor->num_branches;
rc = trunk_collect_branches(&spl->trunk_context,
uint64 old_num_branches = range_itor->num_branches;
rc = trunk_collect_branches(&spl->trunk_context,
&root_handle,
start_key,
start_type,
Expand All @@ -887,7 +903,10 @@ core_range_iterator_init(core_handle *spl,
&range_itor->local_min_key,
&range_itor->local_max_key);
trunk_ondisk_node_handle_deinit(&root_handle);
platform_assert_status_ok(rc);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}

for (uint64 i = old_num_branches; i < range_itor->num_branches; i++) {
range_itor->compacted[i] = TRUE;
Expand All @@ -899,14 +918,20 @@ core_range_iterator_init(core_handle *spl,
<= 0)
{
rc = key_buffer_copy_key(&range_itor->local_min_key, min_key);
platform_assert_status_ok(rc);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}
}
if (core_key_compare(
spl, key_buffer_key(&range_itor->local_max_key), max_key)
>= 0)
{
rc = key_buffer_copy_key(&range_itor->local_max_key, max_key);
platform_assert_status_ok(rc);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}
}

for (uint64 i = 0; i < range_itor->num_branches; i++) {
Expand Down Expand Up @@ -949,7 +974,10 @@ core_range_iterator_init(core_handle *spl,
MERGE_FULL,
greater_than <= start_type,
&range_itor->merge_itor);
platform_assert_status_ok(rc);
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}

bool32 in_range = iterator_can_curr(&range_itor->merge_itor->super);

Expand All @@ -960,15 +988,26 @@ core_range_iterator_init(core_handle *spl,
if (!in_range && start_type >= greater_than) {
key local_max = key_buffer_key(&range_itor->local_max_key);
if (core_key_compare(spl, local_max, max_key) < 0) {
key_buffer local_max_buffer;
rc = key_buffer_init_from_key(
&local_max_buffer, spl->heap_id, local_max);
core_range_iterator_deinit(range_itor);
rc = core_range_iterator_init(spl,
if (!SUCCESS(rc)) {
return rc;
}
local_max = key_buffer_key(&local_max_buffer);
rc = core_range_iterator_init(spl,
range_itor,
min_key,
max_key,
local_max,
start_type,
range_itor->num_tuples);
platform_assert_status_ok(rc);
key_buffer_deinit(&local_max_buffer);
if (!SUCCESS(rc)) {
return rc;
}

} else {
range_itor->can_next = FALSE;
range_itor->can_prev =
Expand All @@ -978,15 +1017,26 @@ core_range_iterator_init(core_handle *spl,
if (!in_range && start_type <= less_than_or_equal) {
key local_min = key_buffer_key(&range_itor->local_min_key);
if (core_key_compare(spl, local_min, min_key) > 0) {
key_buffer local_min_buffer;
rc = key_buffer_init_from_key(
&local_min_buffer, spl->heap_id, local_min);
core_range_iterator_deinit(range_itor);
rc = core_range_iterator_init(spl,
if (!SUCCESS(rc)) {
return rc;
}
local_min = key_buffer_key(&local_min_buffer);
rc = core_range_iterator_init(spl,
range_itor,
min_key,
max_key,
local_min,
start_type,
range_itor->num_tuples);
platform_assert_status_ok(rc);
key_buffer_deinit(&local_min_buffer);
if (!SUCCESS(rc)) {
return rc;
}

} else {
range_itor->can_prev = FALSE;
range_itor->can_next =
Expand Down Expand Up @@ -1158,11 +1208,11 @@ core_range_iterator_deinit(core_range_iterator *range_itor)
core_memtable_dec_ref(spl, mt_gen);
}
}
key_buffer_deinit(&range_itor->min_key);
key_buffer_deinit(&range_itor->max_key);
key_buffer_deinit(&range_itor->local_min_key);
key_buffer_deinit(&range_itor->local_max_key);
}
key_buffer_deinit(&range_itor->min_key);
key_buffer_deinit(&range_itor->max_key);
key_buffer_deinit(&range_itor->local_min_key);
key_buffer_deinit(&range_itor->local_max_key);
}

/*
Expand Down
5 changes: 2 additions & 3 deletions src/data_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ key_copy_contents(void *dst, key k)
typedef struct {
key_type kind;
writable_buffer wb;
char default_buffer[DEFAULT_KEY_BUFFER_SIZE];
} key_buffer;

/*
Expand All @@ -151,8 +150,8 @@ static inline void
key_buffer_init(key_buffer *kb, platform_heap_id hid)
{
kb->kind = USER_KEY;
writable_buffer_init_with_buffer(
&kb->wb, hid, sizeof(kb->default_buffer), kb->default_buffer, 0);
writable_buffer_init(&kb->wb, hid);
writable_buffer_resize(&kb->wb, 0);
}

static inline platform_status
Expand Down
1 change: 1 addition & 0 deletions src/platform_linux/platform_heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "platform_status.h"
#include "platform_util.h"
#include "platform_machine.h"
#include "platform_log.h"
#include "shmem.h"
#include <stddef.h>
#include <stdlib.h>
Expand Down
12 changes: 12 additions & 0 deletions src/platform_linux/platform_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "platform_log.h"
#include "platform_assert.h"
#include "platform_heap.h"

// By default, platform_default_log() messages are sent to /dev/null
// and platform_error_log() messages go to stderr (see below).
Expand Down Expand Up @@ -39,3 +40,14 @@ platform_get_stdout_stream(void)
{
return Platform_default_log_handle;
}

void
platform_close_log_stream(platform_stream_handle *stream,
platform_log_handle *log_handle)
{
fclose(stream->stream);
fputs(stream->str, log_handle);
fflush(log_handle);
platform_free_from_heap(
NULL, stream->str, "stream", __func__, __FILE__, __LINE__);
}
12 changes: 2 additions & 10 deletions src/platform_linux/platform_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "splinterdb/platform_linux/public_platform.h"
#include "platform_status.h"
#include "platform_util.h"
#include "platform_heap.h"

/* Default output file handles for different logging interfaces */
#define PLATFORM_CR "\r"
Expand Down Expand Up @@ -47,16 +46,9 @@ platform_flush_log_stream(platform_stream_handle *stream)
fflush(stream->stream);
}

static inline void
void
platform_close_log_stream(platform_stream_handle *stream,
platform_log_handle *log_handle)
{
fclose(stream->stream);
fputs(stream->str, log_handle);
fflush(log_handle);
platform_free_from_heap(
NULL, stream->str, "stream", __func__, __FILE__, __LINE__);
}
platform_log_handle *log_handle);

static inline platform_log_handle *
platform_log_stream_to_log_handle(platform_stream_handle *stream)
Expand Down
23 changes: 13 additions & 10 deletions src/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,35 @@
platform_status
writable_buffer_ensure_space(writable_buffer *wb, uint64 minspace)
{
if (minspace <= wb->buffer_capacity) {
uint64 old_capacity = writable_buffer_capacity(wb);
if (minspace <= old_capacity) {
return STATUS_OK;
}

if (minspace < 2 * wb->buffer_capacity) {
minspace = 2 * wb->buffer_capacity;
if (minspace < 2 * old_capacity) {
minspace = 2 * old_capacity;
}

void *newdata = NULL;
if (wb->can_free) {
newdata = platform_realloc(
wb->heap_id, wb->buffer_capacity, wb->buffer, minspace);
if (wb->mode == WRITABLE_BUFFER_ALLOCED) {
newdata = platform_realloc(wb->heap_id,
wb->u.external.buffer_capacity,
wb->u.external.buffer,
minspace);
} else {
char *newbuf = TYPED_MANUAL_MALLOC(wb->heap_id, newbuf, minspace);
if (newbuf && writable_buffer_data(wb)) {
memcpy(newbuf, wb->buffer, wb->length);
memcpy(newbuf, writable_buffer_data(wb), wb->length);
}
newdata = (void *)newbuf;
}
if (newdata == NULL) {
return STATUS_NO_MEMORY;
}

wb->buffer_capacity = minspace;
wb->buffer = newdata;
wb->can_free = TRUE;
wb->mode = WRITABLE_BUFFER_ALLOCED;
wb->u.external.buffer_capacity = minspace;
wb->u.external.buffer = newdata;
return STATUS_OK;
}

Expand Down
Loading