From a87f615ddb583575fcd502ebb57fa6d121502839 Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 14 Jun 2023 23:59:51 +0000 Subject: [PATCH 01/12] column family support: initial commit --- Makefile | 5 + include/splinterdb/column_family.h | 127 ++++++++++ src/column_family.c | 361 +++++++++++++++++++++++++++++ src/splinterdb.c | 25 +- src/splinterdb_internal.h | 27 +++ test.sh | 1 + tests/unit/column_family_test.c | 331 ++++++++++++++++++++++++++ 7 files changed, 854 insertions(+), 23 deletions(-) create mode 100644 include/splinterdb/column_family.h create mode 100644 src/column_family.c create mode 100755 src/splinterdb_internal.h create mode 100644 tests/unit/column_family_test.c diff --git a/Makefile b/Makefile index 3442847b5..e002a40d0 100644 --- a/Makefile +++ b/Makefile @@ -437,6 +437,11 @@ $(BINDIR)/$(UNITDIR)/splinterdb_quick_test: $(COMMON_TESTOBJ) $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ $(LIBDIR)/libsplinterdb.so +$(BINDIR)/$(UNITDIR)/column_family_test: $(COMMON_TESTOBJ) \ + $(COMMON_UNIT_TESTOBJ) \ + $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ + $(LIBDIR)/libsplinterdb.so + $(BINDIR)/$(UNITDIR)/splinterdb_stress_test: $(COMMON_TESTOBJ) \ $(COMMON_UNIT_TESTOBJ) \ $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h new file mode 100644 index 000000000..aee09f13b --- /dev/null +++ b/include/splinterdb/column_family.h @@ -0,0 +1,127 @@ +/* + * column_family.h -- + * + * The Column Family public API for SplinterDB. + * + * + */ + + +#ifndef _SPLINTERDB_COLUMN_FAMILY_H_ +#define _SPLINTERDB_COLUMN_FAMILY_H_ + +#include "splinterdb/splinterdb.h" + +// Maximum size of a key within a column family +// allows conversion from user key to cf key +// to be performed upon the stack. +#define COLUMN_FAMILY_KEY_BYTES 512 + +typedef uint32 column_family_id; + +typedef struct cf_data_config { + data_config general_config; + column_family_id num_families; + data_config **config_table; + column_family_id table_mem; +} cf_data_config; + +typedef struct splinterdb_column_family { + column_family_id id; + splinterdb *kvs; +} splinterdb_column_family; + +typedef struct splinterdb_cf_iterator { + column_family_id id; + splinterdb_iterator *iter; +} splinterdb_cf_iterator; + +#define CF_ITER_UNINIT ((splinterdb_cf_iterator){0, NULL}) + +// Initialize the data_config stored in the cf_data_config +// this data_config is then passed to SplinterDB to add support for +// column families +void +init_column_family_config(const uint64 max_key_size, // IN + cf_data_config *cf_cfg // OUT +); + +// Delete the cf_data_config, freeing the memory used by the +// config table +void +deinit_column_family_config(cf_data_config *cf_cfg); + +// Create a new column family +// Returns a new column family struct +splinterdb_column_family +create_column_family(splinterdb *kvs, + const uint64 max_key_size, + data_config *data_cfg); + +// Delete the column family cf +void +delete_column_family(splinterdb_column_family cf); + +// ====== SPLINTERDB Functions ====== +// We wrap these for column family support +// Column families and standard splinterdb should not be mixed +int +splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value); + +int +splinterdb_cf_delete(const splinterdb_column_family cf, slice key); + +int +splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta); + +// column family lookups + +void +splinterdb_cf_lookup_result_init(const splinterdb_column_family cf, // IN + splinterdb_lookup_result *result, // IN/OUT + uint64 buffer_len, // IN + char *buffer // IN +); + +void +splinterdb_cf_lookup_result_deinit(splinterdb_lookup_result *result); // IN + +_Bool +splinterdb_cf_lookup_found(const splinterdb_lookup_result *); // IN + +int +splinterdb_cf_lookup_result_value(const splinterdb_lookup_result *result, // IN + slice *value // OUT +); + +int +splinterdb_cf_lookup(const splinterdb_column_family cf, // IN + slice key, // IN + splinterdb_lookup_result *result // IN/OUT +); + + +// Range iterators for column families + +int +splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN + splinterdb_cf_iterator *cf_iter, // OUT + slice start_key // IN +); + +void +splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter); + +void +splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter); + +_Bool +splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN + slice *key, // OUT + slice *value // OUT +); + +int +splinterdb_cf_iterator_status(const splinterdb_cf_iterator *cf_iter); + +#endif // _SPLINTERDB_COLUMN_FAMILY_H_ diff --git a/src/column_family.c b/src/column_family.c new file mode 100644 index 000000000..416674146 --- /dev/null +++ b/src/column_family.c @@ -0,0 +1,361 @@ + +#include "platform.h" + +#include "splinterdb/column_family.h" +#include "splinterdb/splinterdb.h" +#include "splinterdb_internal.h" +#include "util.h" + +#include +#include + +// Some helper functions we'll use for managing the column family identifiers +// and the data config table +column_family_id +get_cf_id(slice cf_key) +{ + // the cf id is a prefix of the key + const void *data = slice_data(cf_key); + column_family_id id; + memcpy(&id, data, sizeof(id)); + return id; +} + +slice +userkey_to_cf_key(slice userkey, + column_family_id cf_id, + char *buf, + uint32 buf_size) +{ + uint64 key_len = slice_length(userkey); + const void *data = slice_data(userkey); + platform_assert(buf_size >= key_len + sizeof(column_family_id)); + + memcpy(buf, &cf_id, sizeof(cf_id)); + if (key_len > 0) + memcpy(buf + sizeof(cf_id), data, key_len); + return slice_create(key_len + sizeof(cf_id), buf); +} + +slice +cf_key_to_userkey(slice cf_key) +{ + uint64 key_len = slice_length(cf_key); + const void *data = slice_data(cf_key); + + return slice_create(key_len - sizeof(column_family_id), + data + sizeof(column_family_id)); +} + +column_family_id +cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) +{ + column_family_id new_id = cf_cfg->num_families; + cf_cfg->num_families += 1; + + // reallocate table memory if necessary + if (cf_cfg->table_mem <= new_id) { + cf_cfg->config_table = (data_config **)realloc( + cf_cfg->config_table, new_id * 2 * sizeof(data_config *)); + } + + // place new data_config in table + cf_cfg->config_table[new_id] = data_cfg; + + return new_id; +} + +void +cfg_table_delete(cf_data_config *cf_cfg, column_family_id cf_id) +{ + // memory is held by user so don't free it + // just mark the config_table entry as NULL + cf_cfg->config_table[cf_id] = NULL; + + // TODO: Reuse this slot somehow? +} + +// Beginning of column family interface + +// Create a new column family +// Returns a new column family struct +splinterdb_column_family +create_column_family(splinterdb *kvs, + const uint64 max_key_size, + data_config *new_data_cfg) +{ + platform_assert(kvs->data_cfg->max_key_size >= max_key_size); + + // convert from data_config to cf_data_config + cf_data_config *cf_cfg = (cf_data_config *)kvs->data_cfg; + + column_family_id new_id = cfg_table_insert(cf_cfg, new_data_cfg); + + // return new column family + splinterdb_column_family cf; + cf.id = new_id; + cf.kvs = kvs; + + return cf; +} + +// Delete the column family cf +void +delete_column_family(splinterdb_column_family cf) +{ + // convert from data_config to cf_data_config + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + + cfg_table_delete(cf_cfg, cf.id); +} + +// SplinterDB Functions +// We wrap these for column family support +// Column families and standard splinterdb should not be mixed +int +splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value) +{ + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // convert to column family key by prefixing the cf id + char key_buf[COLUMN_FAMILY_KEY_BYTES]; + slice cf_key = + userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + return splinterdb_insert(cf.kvs, cf_key, value); +} + +int +splinterdb_cf_delete(const splinterdb_column_family cf, slice key) +{ + char key_buf[COLUMN_FAMILY_KEY_BYTES]; + slice cf_key = + userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + return splinterdb_delete(cf.kvs, cf_key); +} + +int +splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta) +{ + char key_buf[COLUMN_FAMILY_KEY_BYTES]; + slice cf_key = + userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + return splinterdb_insert(cf.kvs, cf_key, delta); +} + +// column family lookups + +void +splinterdb_cf_lookup_result_init(const splinterdb_column_family cf, // IN + splinterdb_lookup_result *result, // IN/OUT + uint64 buffer_len, // IN + char *buffer // IN +) +{ + splinterdb_lookup_result_init(cf.kvs, result, buffer_len, buffer); +} + +void +splinterdb_cf_lookup_result_deinit(splinterdb_lookup_result *result) // IN +{ + splinterdb_lookup_result_deinit(result); +} + +_Bool +splinterdb_cf_lookup_found(const splinterdb_lookup_result *result) // IN +{ + return splinterdb_lookup_found(result); +} + +int +splinterdb_cf_lookup_result_value(const splinterdb_lookup_result *result, // IN + slice *value // OUT +) +{ + return splinterdb_lookup_result_value(result, value); +} + +int +splinterdb_cf_lookup(const splinterdb_column_family cf, // IN + slice key, // IN + splinterdb_lookup_result *result // IN/OUT +) +{ + char key_buf[COLUMN_FAMILY_KEY_BYTES]; + slice cf_key = + userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + return splinterdb_lookup(cf.kvs, cf_key, result); +} + + +// Range iterators for column families + +int +splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN + splinterdb_cf_iterator *cf_iter, // OUT + slice start_key // IN +) +{ + // The minimum key contains no key data only consists of + // the column id so this is what a NULL key will become + char key_buf[COLUMN_FAMILY_KEY_BYTES]; + slice cf_key = + userkey_to_cf_key(start_key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + cf_iter->id = cf.id; + return splinterdb_iterator_init(cf.kvs, &cf_iter->iter, cf_key); +} + +void +splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter) +{ + splinterdb_iterator_deinit(cf_iter->iter); +} + +_Bool +splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN + slice *key, // OUT + slice *value // OUT +) +{ + _Bool valid = splinterdb_iterator_valid(cf_iter->iter); + + if (!valid) + return false; + + // if valid, check the key to ensure it's within this column family + splinterdb_iterator_get_current(cf_iter->iter, key, value); + column_family_id key_cf = get_cf_id(*key); + + if (key_cf != cf_iter->id) + return false; + + *key = cf_key_to_userkey(*key); + return true; +} + +void +splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_next(cf_iter->iter); +} + +int +splinterdb_cf_iterator_status(const splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_status(cf_iter->iter); +} + + +static int +cf_key_compare(const data_config *cfg, slice key1, slice key2) +{ + // sort first by the column family ids + column_family_id cf_id1 = get_cf_id(key1); + column_family_id cf_id2 = get_cf_id(key2); + + if (cf_id1 < cf_id2) + return -1; + if (cf_id1 > cf_id2) + return 1; + + // Now we are comparing two keys from the same column family + // so we will use the user defined func for this cf + slice userkey1 = cf_key_to_userkey(key1); + slice userkey2 = cf_key_to_userkey(key2); + + // empty keys are defined to be the minimum among the column family + if (slice_length(userkey1) == 0) return -1; + if (slice_length(userkey2) == 0) return 1; + + // get the data_config for this column family and call its function + data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id1]; + return cf_cfg->key_compare(cf_cfg, userkey1, userkey2); +} + +static int +cf_merge_tuples(const data_config *cfg, + slice key, + message old_raw_message, + merge_accumulator *new_data) +{ + column_family_id cf_id = get_cf_id(key); + + // get the data_config for this column family and call its function + data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id]; + return cf_cfg->merge_tuples( + cf_cfg, cf_key_to_userkey(key), old_raw_message, new_data); +} + +static int +cf_merge_tuples_final(const data_config *cfg, + slice key, + merge_accumulator *oldest_data // IN/OUT +) +{ + column_family_id cf_id = get_cf_id(key); + + // get the data_config for this column family and call its function + data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id]; + return cf_cfg->merge_tuples_final( + cf_cfg, cf_key_to_userkey(key), oldest_data); +} + +// These functions we don't allow the column families to overwrite + +static void +cf_key_to_string(const data_config *cfg, slice key, char *str, size_t max_len) +{ + debug_hex_encode(str, max_len, slice_data(key), slice_length(key)); +} + +static void +cf_message_to_string(const data_config *cfg, + message msg, + char *str, + size_t max_len) +{ + debug_hex_encode(str, max_len, message_data(msg), message_length(msg)); +} + +// Initialize the data_config stored in the cf_data_config. +// Its data_config is then passed to SplinterDB to add support for +// column families +// +// TODO: The key_hash function cannot be overwritten by column families +// at this time. This is because we do not have access to the cfg +// in the key_hash. So we have no way of accessing a user defined +// key_hash for the column family. This should probably be fixed. +// Likely requires adding the cfg to the key_hash_fn type. +void +init_column_family_config(const uint64 max_key_size, // IN + cf_data_config *cf_cfg // OUT +) +{ + platform_assert(max_key_size + sizeof(column_family_id) + < COLUMN_FAMILY_KEY_BYTES); + data_config cfg = { + .max_key_size = max_key_size + sizeof(column_family_id), + .key_compare = cf_key_compare, + .key_hash = platform_hash32, + .merge_tuples = cf_merge_tuples, + .merge_tuples_final = cf_merge_tuples_final, + .key_to_string = cf_key_to_string, + .message_to_string = cf_message_to_string, + }; + + cf_cfg->general_config = cfg; + cf_cfg->num_families = 0; + cf_cfg->config_table = NULL; + cf_cfg->table_mem = 0; +} + +void +deinit_column_family_config(cf_data_config *cf_cfg) +{ + // we assume that the user will handle deallocating the table entries + // we just need to dealloc our array of pointers + if (cf_cfg->config_table != NULL) + free(cf_cfg->config_table); + cf_cfg->config_table = NULL; + cf_cfg->table_mem = 0; +} diff --git a/src/splinterdb.c b/src/splinterdb.c index a3f95c190..f28742ada 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -14,12 +14,10 @@ */ #include "splinterdb/splinterdb.h" +#include "splinterdb_internal.h" + #include "platform.h" -#include "clockcache.h" -#include "rc_allocator.h" -#include "trunk.h" #include "btree_private.h" -#include "shard_log.h" #include "poison.h" const char *BUILD_VERSION = "splinterdb_build_version " GIT_VERSION; @@ -29,25 +27,6 @@ splinterdb_get_version() return BUILD_VERSION; } -typedef struct splinterdb { - task_system *task_sys; - io_config io_cfg; - platform_io_handle io_handle; - allocator_config allocator_cfg; - rc_allocator allocator_handle; - clockcache_config cache_cfg; - clockcache cache_handle; - shard_log_config log_cfg; - task_system_config task_cfg; - allocator_root_id trunk_id; - trunk_config trunk_cfg; - trunk_handle *spl; - platform_heap_handle heap_handle; // for platform_buffer_create - platform_heap_id heap_id; - data_config *data_cfg; -} splinterdb; - - /* * Extract errno.h -style status int from a platform_status * diff --git a/src/splinterdb_internal.h b/src/splinterdb_internal.h new file mode 100755 index 000000000..0544d2a66 --- /dev/null +++ b/src/splinterdb_internal.h @@ -0,0 +1,27 @@ +#ifndef SPLINTERDB_SPLINTERDB_INTERNAL_H_ +#define SPLINTERDB_SPLINTERDB_INTERNAL_H_ + +#include "trunk.h" +#include "clockcache.h" +#include "rc_allocator.h" +#include "shard_log.h" + +typedef struct splinterdb { + task_system *task_sys; + io_config io_cfg; + platform_io_handle io_handle; + allocator_config allocator_cfg; + rc_allocator allocator_handle; + clockcache_config cache_cfg; + clockcache cache_handle; + shard_log_config log_cfg; + task_system_config task_cfg; + allocator_root_id trunk_id; + trunk_config trunk_cfg; + trunk_handle *spl; + platform_heap_handle heap_handle; // for platform_buffer_create + platform_heap_id heap_id; + data_config *data_cfg; +} splinterdb; + +#endif // SPLINTERDB_SPLINTERDB_INTERNAL_H_ diff --git a/test.sh b/test.sh index dd0db1bef..c867c1ccf 100755 --- a/test.sh +++ b/test.sh @@ -534,6 +534,7 @@ function test_make_run_tests() { function run_fast_unit_tests() { "$BINDIR"/unit/splinterdb_quick_test + "$BINDIR"/unit/column_family_test "$BINDIR"/unit/btree_test "$BINDIR"/unit/util_test "$BINDIR"/unit/misc_test diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c new file mode 100644 index 000000000..a057d4bb1 --- /dev/null +++ b/tests/unit/column_family_test.c @@ -0,0 +1,331 @@ +/* + * ----------------------------------------------------------------------------- + * splinter_cf_test.c -- + * + * Quick test of the Column Family public API for SplinterDB + * + * ----------------------------------------------------------------------------- + */ + +#include "splinterdb/column_family.h" +#include "splinterdb/data.h" +#include "splinterdb/public_platform.h" +#include "splinterdb/default_data_config.h" +#include "unit_tests.h" +#include "util.h" +#include "test_data.h" +#include "ctest.h" // This is required for all test-case files. +#include "btree.h" // for MAX_INLINE_MESSAGE_SIZE + +#define TEST_MAX_KEY_SIZE 16 +#define TEST_MAX_VALUE_SIZE 32 + +// Hard-coded format strings to generate key and values +// static const char key_fmt[] = "key-%04x"; +// static const char val_fmt[] = "val-%04x"; +// #define KEY_FMT_LENGTH (8) +// #define VAL_FMT_LENGTH (8) + + +CTEST_DATA(column_family) +{ + splinterdb *kvsb; + splinterdb_config cfg; + + // the global_data_cfg is used to route to the right data_config + // for each column family + cf_data_config global_data_cfg; + + // default data config for when we don't want to be special + data_config default_data_cfg; +}; + +CTEST_SETUP(column_family) +{ + default_data_config_init(TEST_MAX_KEY_SIZE, &data->default_data_cfg); + init_column_family_config(TEST_MAX_KEY_SIZE, &data->global_data_cfg); + data->cfg = + (splinterdb_config){.filename = TEST_DB_NAME, + .cache_size = 64 * Mega, + .disk_size = 128 * Mega, + .data_cfg = (data_config *)&data->global_data_cfg}; + + int rc = splinterdb_create(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(TEST_MAX_VALUE_SIZE + < MAX_INLINE_MESSAGE_SIZE(LAIO_DEFAULT_PAGE_SIZE)); +} + +CTEST_TEARDOWN(column_family) +{ + if (data->kvsb) { + splinterdb_close(&data->kvsb); + } + deinit_column_family_config(&data->global_data_cfg); +} + +/* + * + * Basic test case that ensures we can create and use a single column family + * correctly Tests: + * - create_column_family() + * - delete_column_family() + * - splinterdb_cf_insert() + * - splinterdb_cf_delete() + * - splinterdb_cf_lookup() + * + * We evaluate that these functions perform as expected and provide the correct + * outputs + */ +CTEST2(column_family, test_single_column) +{ + // create a column family + splinterdb_column_family cf = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + + // create some basic data to insert and lookup + char *key_data = "some-key"; + size_t key_len = sizeof("some-key"); + slice user_key = slice_create(key_len, key_data); + + splinterdb_lookup_result result; + splinterdb_cf_lookup_result_init(cf, &result, 0, NULL); + + int rc = splinterdb_cf_lookup(cf, user_key, &result); + ASSERT_EQUAL(0, rc); + + // Lookup of a non-existent key should return not-found. + ASSERT_FALSE(splinterdb_cf_lookup_found(&result)); + + static char *to_insert_data = "some-value"; + size_t to_insert_len = strlen(to_insert_data); + slice to_insert = slice_create(to_insert_len, to_insert_data); + + // Basic insert of new key should succeed. + rc = splinterdb_cf_insert(cf, user_key, to_insert); + ASSERT_EQUAL(0, rc); + + // Lookup of inserted key should succeed. + rc = splinterdb_cf_lookup(cf, user_key, &result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_cf_lookup_found(&result)); + + // Lookup should return inserted value + slice value; + rc = splinterdb_cf_lookup_result_value(&result, &value); + ASSERT_EQUAL(0, rc); + ASSERT_EQUAL(to_insert_len, slice_length(value)); + ASSERT_STREQN(to_insert_data, slice_data(value), slice_length(value)); + + // Delete key + rc = splinterdb_cf_delete(cf, user_key); + ASSERT_EQUAL(0, rc); + + // Deleted key should not be found + rc = splinterdb_cf_lookup(cf, user_key, &result); + ASSERT_EQUAL(0, rc); + ASSERT_FALSE(splinterdb_cf_lookup_found(&result)); + + splinterdb_cf_lookup_result_deinit(&result); +} + +/* + * Ensure keys and values of maximum length work + * with column families + */ +CTEST2(column_family, test_max_length) +{ + // create a column family + splinterdb_column_family cf = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + + char large_key_data[TEST_MAX_KEY_SIZE]; + size_t large_key_len = TEST_MAX_KEY_SIZE; + memset(large_key_data, 'k', TEST_MAX_KEY_SIZE); + slice large_key = slice_create(large_key_len, large_key_data); + + char large_val_data[TEST_MAX_VALUE_SIZE]; + size_t large_val_len = TEST_MAX_VALUE_SIZE; + memset(large_val_data, 'v', TEST_MAX_VALUE_SIZE); + slice large_val = slice_create(large_val_len, large_val_data); + + // Insert of large key and value should exceed + int rc = splinterdb_cf_insert(cf, large_key, large_val); + ASSERT_EQUAL(0, rc); + + splinterdb_lookup_result result; + splinterdb_cf_lookup_result_init(cf, &result, 0, NULL); + + // Lookup of inserted key should succeed. + rc = splinterdb_cf_lookup(cf, large_key, &result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_cf_lookup_found(&result)); + + // Lookup should return inserted value + slice value; + rc = splinterdb_cf_lookup_result_value(&result, &value); + ASSERT_EQUAL(0, rc); + ASSERT_EQUAL(large_val_len, slice_length(value)); + ASSERT_STREQN(large_val_data, slice_data(value), slice_length(value)); + + // Delete key + rc = splinterdb_cf_delete(cf, large_key); + ASSERT_EQUAL(0, rc); + + // Deleted key should not be found + rc = splinterdb_cf_lookup(cf, large_key, &result); + ASSERT_EQUAL(0, rc); + ASSERT_FALSE(splinterdb_cf_lookup_found(&result)); + + splinterdb_cf_lookup_result_deinit(&result); +} + +static int +rev_key_compare(const data_config *cfg, slice key1, slice key2) +{ + platform_assert(slice_data(key1) != NULL); + platform_assert(slice_data(key2) != NULL); + + return slice_lex_cmp(key2, key1); +} + +/* + * Test key/value operations upon multiple column families. + * Ensure that the keys can be operated upon independently. + */ +CTEST2(column_family, test_multiple_cf_same_key) +{ + // create a few column families + splinterdb_column_family cf0 = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + splinterdb_column_family cf1 = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + splinterdb_column_family cf2 = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + splinterdb_column_family cf3 = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + + // Insert a single key to each column family + char key_data[] = "key"; + char val0_data[] = "val0"; + char val1_data[] = "val1"; + char val2_data[] = "val2"; + char val3_data[] = "val3"; + slice key = slice_create(3, key_data); + slice val0 = slice_create(4, val0_data); + slice val1 = slice_create(4, val1_data); + slice val2 = slice_create(4, val2_data); + slice val3 = slice_create(4, val3_data); + + slice values[] = {val0, val1, val2, val3}; + splinterdb_column_family cfs[] = {cf0, cf1, cf2, cf3}; + + // Perform insertions + for (int idx = 0; idx < 4; idx++) { + splinterdb_cf_insert(cfs[idx], key, values[idx]); + } + + // lookup the key from each column family + // and ensure the right value is returned + for (int idx = 0; idx < 4; idx++) { + splinterdb_lookup_result result; + splinterdb_cf_lookup_result_init(cfs[idx], &result, 0, NULL); + + int rc = splinterdb_cf_lookup(cfs[idx], key, &result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_cf_lookup_found(&result)); + + // Lookup should return correct values + slice value; + rc = splinterdb_cf_lookup_result_value(&result, &value); + ASSERT_EQUAL(0, rc); + ASSERT_EQUAL(slice_length(values[idx]), slice_length(value)); + ASSERT_STREQN(slice_data(values[idx]), slice_data(value), slice_length(value)); + } +} + +/* Test multiple column families with range iterators + * ensure that keys are found in the order defined by their + * custom key comparison functions + */ +CTEST2(column_family, test_multiple_cf_range) +{ + // create the default column family + splinterdb_column_family cf_default = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + + // create a config with a reversed key compare function + // and create a column family that will reverse the keys + data_config rev_data_config; + default_data_config_init(TEST_MAX_KEY_SIZE, &rev_data_config); + rev_data_config.key_compare = rev_key_compare; + + splinterdb_column_family cf_reverse = + create_column_family(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); + + // Insert a few key/value pairs to each cf + char key1_data[] = "aaaa"; + char key2_data[] = "bbbb"; + char key3_data[] = "cccc"; + char key4_data[] = "dddd"; + char cf1_value[] = "val-in-cf1"; + char cf2_value[] = "val-in-cf2"; + + slice key1 = slice_create(4, key1_data); + slice key2 = slice_create(4, key2_data); + slice key3 = slice_create(4, key3_data); + slice key4 = slice_create(4, key4_data); + slice val1 = slice_create(10, cf1_value); + slice val2 = slice_create(10, cf2_value); + + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key1, val1)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key2, val1)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key3, val1)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key4, val1)); + + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key1, val2)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key2, val2)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key3, val2)); + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key4, val2)); + + // Perform a range query over all cf1 keys + splinterdb_cf_iterator *it = &CF_ITER_UNINIT; + ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_default, it, NULL_SLICE)); + + slice keys[] = {key1, key2, key3, key4}; + slice key; + slice val; + int idx = 0; + for (; splinterdb_cf_iterator_get_current(it, &key, &val); + splinterdb_cf_iterator_next(it)) + { + ASSERT_EQUAL(slice_length(keys[idx]), slice_length(key)); + ASSERT_STREQN(slice_data(keys[idx]), slice_data(key), slice_length(key)); + + ASSERT_EQUAL(slice_length(val1), slice_length(val)); + ASSERT_STREQN(slice_data(val1), slice_data(val), slice_length(val)); + ++idx; + } + ASSERT_EQUAL(4, idx); + + splinterdb_cf_iterator_deinit(it); + + // Perform a range query over all cf2 keys + ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, it, NULL_SLICE)); + + idx = 0; + for (; splinterdb_cf_iterator_get_current(it, &key, &val); + splinterdb_cf_iterator_next(it)) + { + ASSERT_EQUAL(slice_length(keys[3 - idx]), slice_length(key)); + ASSERT_STREQN( + slice_data(keys[3 - idx]), slice_data(key), slice_length(key)); + + ASSERT_EQUAL(slice_length(val2), slice_length(val)); + ASSERT_STREQN(slice_data(val2), slice_data(val), slice_length(val)); + ++idx; + } + ASSERT_EQUAL(4, idx); + + splinterdb_cf_iterator_deinit(it); +} From 63d8a724108651c05d73ef15caa9f7237f3f89a8 Mon Sep 17 00:00:00 2001 From: Evan West Date: Thu, 15 Jun 2023 18:24:34 +0000 Subject: [PATCH 02/12] column_family_support: small fixes and formatting --- src/column_family.c | 12 ++++++++---- tests/unit/column_family_test.c | 29 +++++++++++++++-------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/column_family.c b/src/column_family.c index 416674146..f9689b584 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -84,7 +84,8 @@ create_column_family(splinterdb *kvs, const uint64 max_key_size, data_config *new_data_cfg) { - platform_assert(kvs->data_cfg->max_key_size >= max_key_size); + platform_assert(kvs->data_cfg->max_key_size + >= max_key_size + sizeof(column_family_id)); // convert from data_config to cf_data_config cf_data_config *cf_cfg = (cf_data_config *)kvs->data_cfg; @@ -197,7 +198,8 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN ) { // The minimum key contains no key data only consists of - // the column id so this is what a NULL key will become + // the column id. + // This is what a NULL key will become char key_buf[COLUMN_FAMILY_KEY_BYTES]; slice cf_key = userkey_to_cf_key(start_key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); @@ -264,8 +266,10 @@ cf_key_compare(const data_config *cfg, slice key1, slice key2) slice userkey2 = cf_key_to_userkey(key2); // empty keys are defined to be the minimum among the column family - if (slice_length(userkey1) == 0) return -1; - if (slice_length(userkey2) == 0) return 1; + if (slice_length(userkey1) == 0) + return -1; + if (slice_length(userkey2) == 0) + return 1; // get the data_config for this column family and call its function data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id1]; diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index a057d4bb1..92aaef059 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -206,19 +206,19 @@ CTEST2(column_family, test_multiple_cf_same_key) data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // Insert a single key to each column family - char key_data[] = "key"; - char val0_data[] = "val0"; - char val1_data[] = "val1"; - char val2_data[] = "val2"; - char val3_data[] = "val3"; - slice key = slice_create(3, key_data); - slice val0 = slice_create(4, val0_data); - slice val1 = slice_create(4, val1_data); - slice val2 = slice_create(4, val2_data); - slice val3 = slice_create(4, val3_data); - - slice values[] = {val0, val1, val2, val3}; - splinterdb_column_family cfs[] = {cf0, cf1, cf2, cf3}; + char key_data[] = "key"; + char val0_data[] = "val0"; + char val1_data[] = "val1"; + char val2_data[] = "val2"; + char val3_data[] = "val3"; + slice key = slice_create(3, key_data); + slice val0 = slice_create(4, val0_data); + slice val1 = slice_create(4, val1_data); + slice val2 = slice_create(4, val2_data); + slice val3 = slice_create(4, val3_data); + + slice values[] = {val0, val1, val2, val3}; + splinterdb_column_family cfs[] = {cf0, cf1, cf2, cf3}; // Perform insertions for (int idx = 0; idx < 4; idx++) { @@ -240,7 +240,8 @@ CTEST2(column_family, test_multiple_cf_same_key) rc = splinterdb_cf_lookup_result_value(&result, &value); ASSERT_EQUAL(0, rc); ASSERT_EQUAL(slice_length(values[idx]), slice_length(value)); - ASSERT_STREQN(slice_data(values[idx]), slice_data(value), slice_length(value)); + ASSERT_STREQN( + slice_data(values[idx]), slice_data(value), slice_length(value)); } } From 6b7c09d80d644aa260d53000cf75e36e0d03df6f Mon Sep 17 00:00:00 2001 From: Evan West Date: Tue, 20 Jun 2023 18:43:07 +0000 Subject: [PATCH 03/12] column family support: writable buffers to support large keys --- include/splinterdb/column_family.h | 10 ++- src/column_family.c | 115 ++++++++++++++++++++--------- 2 files changed, 87 insertions(+), 38 deletions(-) diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h index aee09f13b..ef454a339 100644 --- a/include/splinterdb/column_family.h +++ b/include/splinterdb/column_family.h @@ -12,10 +12,12 @@ #include "splinterdb/splinterdb.h" -// Maximum size of a key within a column family -// allows conversion from user key to cf key -// to be performed upon the stack. -#define COLUMN_FAMILY_KEY_BYTES 512 +// Size of stack buffer we allocate for column family keys. +// This can be fairly large because these buffers are short +// lived. If keys are relatively small then the conversion +// can be done upon the stack. We use writable buffers to +// support larger keys. +#define CF_KEY_DEFAULT_SIZE 512 typedef uint32 column_family_id; diff --git a/src/column_family.c b/src/column_family.c index f9689b584..778689044 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -6,7 +6,6 @@ #include "splinterdb_internal.h" #include "util.h" -#include #include // Some helper functions we'll use for managing the column family identifiers @@ -24,17 +23,16 @@ get_cf_id(slice cf_key) slice userkey_to_cf_key(slice userkey, column_family_id cf_id, - char *buf, - uint32 buf_size) + writable_buffer *cf_key_wb) { + // extract from the user's key and resize the buffer uint64 key_len = slice_length(userkey); const void *data = slice_data(userkey); - platform_assert(buf_size >= key_len + sizeof(column_family_id)); - memcpy(buf, &cf_id, sizeof(cf_id)); - if (key_len > 0) - memcpy(buf + sizeof(cf_id), data, key_len); - return slice_create(key_len + sizeof(cf_id), buf); + // write the cf id and the user's key data to the writable_buffer + writable_buffer_append(cf_key_wb, sizeof(cf_id), &cf_id); + writable_buffer_append(cf_key_wb, key_len, data); + return writable_buffer_to_slice(cf_key_wb); } slice @@ -120,28 +118,60 @@ splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value) platform_assert(slice_length(key) > 0); // convert to column family key by prefixing the cf id - char key_buf[COLUMN_FAMILY_KEY_BYTES]; - slice cf_key = - userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); - return splinterdb_insert(cf.kvs, cf_key, value); + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's insert function and return + int rc = splinterdb_insert(cf.kvs, cf_key, value); + if (rc != 0) + return rc; + writable_buffer_deinit(&cf_key_wb); + return 0; } int splinterdb_cf_delete(const splinterdb_column_family cf, slice key) { - char key_buf[COLUMN_FAMILY_KEY_BYTES]; - slice cf_key = - userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); - return splinterdb_delete(cf.kvs, cf_key); + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's delete function and return + int rc = splinterdb_delete(cf.kvs, cf_key); + if (rc != 0) + return rc; + writable_buffer_deinit(&cf_key_wb); + return 0; } int splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta) { - char key_buf[COLUMN_FAMILY_KEY_BYTES]; - slice cf_key = - userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); - return splinterdb_insert(cf.kvs, cf_key, delta); + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's update function and return + int rc = splinterdb_update(cf.kvs, cf_key, delta); + if (rc != 0) + return rc; + writable_buffer_deinit(&cf_key_wb); + return 0; } // column family lookups @@ -182,12 +212,23 @@ splinterdb_cf_lookup(const splinterdb_column_family cf, // IN splinterdb_lookup_result *result // IN/OUT ) { - char key_buf[COLUMN_FAMILY_KEY_BYTES]; - slice cf_key = - userkey_to_cf_key(key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); - return splinterdb_lookup(cf.kvs, cf_key, result); -} + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's lookup function and return + int rc = splinterdb_lookup(cf.kvs, cf_key, result); + if (rc != 0) + return rc; + writable_buffer_deinit(&cf_key_wb); + return 0; +} // Range iterators for column families @@ -200,11 +241,19 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN // The minimum key contains no key data only consists of // the column id. // This is what a NULL key will become - char key_buf[COLUMN_FAMILY_KEY_BYTES]; - slice cf_key = - userkey_to_cf_key(start_key, cf.id, key_buf, COLUMN_FAMILY_KEY_BYTES); + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(start_key, cf.id, &cf_key_wb); + cf_iter->id = cf.id; - return splinterdb_iterator_init(cf.kvs, &cf_iter->iter, cf_key); + int rc = splinterdb_iterator_init(cf.kvs, &cf_iter->iter, cf_key); + if (rc != 0) + return rc; + writable_buffer_deinit(&cf_key_wb); + return 0; } void @@ -222,17 +271,17 @@ splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN _Bool valid = splinterdb_iterator_valid(cf_iter->iter); if (!valid) - return false; + return FALSE; // if valid, check the key to ensure it's within this column family splinterdb_iterator_get_current(cf_iter->iter, key, value); column_family_id key_cf = get_cf_id(*key); if (key_cf != cf_iter->id) - return false; + return FALSE; *key = cf_key_to_userkey(*key); - return true; + return TRUE; } void @@ -335,8 +384,6 @@ init_column_family_config(const uint64 max_key_size, // IN cf_data_config *cf_cfg // OUT ) { - platform_assert(max_key_size + sizeof(column_family_id) - < COLUMN_FAMILY_KEY_BYTES); data_config cfg = { .max_key_size = max_key_size + sizeof(column_family_id), .key_compare = cf_key_compare, From 30336e8d4f854cb1d2caecf920a4ab2d11b0a293 Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 21 Jun 2023 16:34:04 +0000 Subject: [PATCH 04/12] column family support: test updates with cf --- tests/unit/column_family_test.c | 166 ++++++++++++++++++++++++++++++-- 1 file changed, 156 insertions(+), 10 deletions(-) diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 92aaef059..21d686c27 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -180,15 +180,6 @@ CTEST2(column_family, test_max_length) splinterdb_cf_lookup_result_deinit(&result); } -static int -rev_key_compare(const data_config *cfg, slice key1, slice key2) -{ - platform_assert(slice_data(key1) != NULL); - platform_assert(slice_data(key2) != NULL); - - return slice_lex_cmp(key2, key1); -} - /* * Test key/value operations upon multiple column families. * Ensure that the keys can be operated upon independently. @@ -245,7 +236,20 @@ CTEST2(column_family, test_multiple_cf_same_key) } } -/* Test multiple column families with range iterators +/* + * Custom key compare function that reverses the keys + */ +static int +rev_key_compare(const data_config *cfg, slice key1, slice key2) +{ + platform_assert(slice_data(key1) != NULL); + platform_assert(slice_data(key2) != NULL); + + return slice_lex_cmp(key2, key1); +} + +/* + * Test multiple column families with range iterators * ensure that keys are found in the order defined by their * custom key comparison functions */ @@ -330,3 +334,145 @@ CTEST2(column_family, test_multiple_cf_range) splinterdb_cf_iterator_deinit(it); } + + +/* + * These functions implement merge functionality so that we can test the update + * function within column families + */ + +// merge two messages, with result in new_message +static int +merge_tuples(const data_config *cfg, slice key, message old_message, merge_accumulator *new_message) +{ + platform_assert(slice_data(key) != NULL); + message_type type = old_message.type; + writable_buffer *wb = &new_message->data; + + // extract value slices + slice old_value = old_message.data; + slice new_value = writable_buffer_to_slice(wb); + + // use the cfg's key compare function to find maximal value + // and retain that value + if (cfg->key_compare(cfg, old_value, new_value) < 0) { + platform_status rc = writable_buffer_copy_slice(wb, new_value); + if (!SUCCESS(rc)) + return rc.r; + } + else { + platform_status rc = writable_buffer_copy_slice(wb, old_value); + if (!SUCCESS(rc)) + return rc.r; + } + + if (type == MESSAGE_TYPE_INSERT) + new_message->type = MESSAGE_TYPE_INSERT; + else + new_message->type = MESSAGE_TYPE_UPDATE; + + return 0; +} + +static int +merge_tuple_final(const data_config *cfg, slice key, merge_accumulator *oldest_message) +{ + platform_assert(slice_data(key) != NULL); + + // simply set type to INSERT + oldest_message->type = MESSAGE_TYPE_INSERT; + return 0; +} + + + +CTEST2(column_family, multiple_cf_with_updates) +{ + // create the default column family + data->default_data_cfg.merge_tuples = merge_tuples; + data->default_data_cfg.merge_tuples_final = merge_tuple_final; + splinterdb_column_family cf_default = create_column_family( + data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); + + // create a config with a reversed key compare function + // and create a column family that will reverse the keys + data_config rev_data_config; + default_data_config_init(TEST_MAX_KEY_SIZE, &rev_data_config); + rev_data_config.key_compare = rev_key_compare; + rev_data_config.merge_tuples = merge_tuples; + rev_data_config.merge_tuples_final = merge_tuple_final; + + splinterdb_column_family cf_reverse = + create_column_family(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); + + // Insert a few key/value pairs to each cf + char key1_data[] = "aaaa"; + char key2_data[] = "bbbb"; + char key3_data[] = "cccc"; + char key4_data[] = "dddd"; + char cf1_value[] = "val-in-cf1"; + char cf2_value[] = "val-in-cf2"; + + slice key1 = slice_create(4, key1_data); + slice key2 = slice_create(4, key2_data); + slice key3 = slice_create(4, key3_data); + slice key4 = slice_create(4, key4_data); + slice val1 = slice_create(10, cf1_value); + slice val2 = slice_create(10, cf2_value); + + slice keys[] = {key1, key2, key3, key4}; + + for (int i = 0; i < 4; i++) + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, keys[i], val1)); + + for (int i = 0; i < 4; i++) + ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, keys[i], val2)); + + // Now update these key-value pairs + char small_val[] = "aaaaaa-cf2"; + char big_val[] = "zzzzzz-cf1"; + + slice new_val1 = slice_create(10, big_val); + slice new_val2 = slice_create(10, small_val); + + // apply both updates to all keys in cf_default + for (int i = 0; i < 4; i++) { + ASSERT_EQUAL(0, splinterdb_cf_update(cf_default, keys[i], new_val1)); + ASSERT_EQUAL(0, splinterdb_cf_update(cf_default, keys[i], new_val2)); + } + + // apply both updates to all keys in cf_reverse + for (int i = 0; i < 4; i++) { + ASSERT_EQUAL(0, splinterdb_cf_update(cf_reverse, keys[i], new_val1)); + ASSERT_EQUAL(0, splinterdb_cf_update(cf_reverse, keys[i], new_val2)); + } + + splinterdb_column_family cfs[] = {cf_default, cf_reverse}; + + // now lookup each key in both cfs + for (int idx = 0; idx < 2; idx++) { + for (int i = 0; i < 4; i++) { + splinterdb_lookup_result result; + splinterdb_cf_lookup_result_init(cfs[idx], &result, 0, NULL); + + int rc = splinterdb_cf_lookup(cfs[idx], keys[i], &result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_cf_lookup_found(&result)); + + // Lookup should return correct values + slice value; + rc = splinterdb_cf_lookup_result_value(&result, &value); + ASSERT_EQUAL(0, rc); + if (idx == 0) { + ASSERT_EQUAL(slice_length(new_val1), slice_length(value)); + ASSERT_STREQN( + slice_data(new_val1), slice_data(value), slice_length(value)); + } + else { + ASSERT_EQUAL(slice_length(new_val2), slice_length(value)); + ASSERT_STREQN( + slice_data(new_val2), slice_data(value), slice_length(value)); + } + } + } +} From 51ffb16a5407b7abd16e035ca59cd3493d84d436 Mon Sep 17 00:00:00 2001 From: Evan West Date: Wed, 21 Jun 2023 17:49:47 +0000 Subject: [PATCH 05/12] column family support: follow naming conventions --- include/splinterdb/column_family.h | 8 ++++---- src/column_family.c | 8 ++++---- tests/unit/column_family_test.c | 29 +++++++++++++++-------------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h index ef454a339..41fd25d8e 100644 --- a/include/splinterdb/column_family.h +++ b/include/splinterdb/column_family.h @@ -44,25 +44,25 @@ typedef struct splinterdb_cf_iterator { // this data_config is then passed to SplinterDB to add support for // column families void -init_column_family_config(const uint64 max_key_size, // IN +column_family_config_init(const uint64 max_key_size, // IN cf_data_config *cf_cfg // OUT ); // Delete the cf_data_config, freeing the memory used by the // config table void -deinit_column_family_config(cf_data_config *cf_cfg); +column_family_config_deinit(cf_data_config *cf_cfg); // Create a new column family // Returns a new column family struct splinterdb_column_family -create_column_family(splinterdb *kvs, +column_family_create(splinterdb *kvs, const uint64 max_key_size, data_config *data_cfg); // Delete the column family cf void -delete_column_family(splinterdb_column_family cf); +column_family_delete(splinterdb_column_family cf); // ====== SPLINTERDB Functions ====== // We wrap these for column family support diff --git a/src/column_family.c b/src/column_family.c index 778689044..751d21f25 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -78,7 +78,7 @@ cfg_table_delete(cf_data_config *cf_cfg, column_family_id cf_id) // Create a new column family // Returns a new column family struct splinterdb_column_family -create_column_family(splinterdb *kvs, +column_family_create(splinterdb *kvs, const uint64 max_key_size, data_config *new_data_cfg) { @@ -100,7 +100,7 @@ create_column_family(splinterdb *kvs, // Delete the column family cf void -delete_column_family(splinterdb_column_family cf) +column_family_delete(splinterdb_column_family cf) { // convert from data_config to cf_data_config cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; @@ -380,7 +380,7 @@ cf_message_to_string(const data_config *cfg, // key_hash for the column family. This should probably be fixed. // Likely requires adding the cfg to the key_hash_fn type. void -init_column_family_config(const uint64 max_key_size, // IN +column_family_config_init(const uint64 max_key_size, // IN cf_data_config *cf_cfg // OUT ) { @@ -401,7 +401,7 @@ init_column_family_config(const uint64 max_key_size, // IN } void -deinit_column_family_config(cf_data_config *cf_cfg) +column_family_config_deinit(cf_data_config *cf_cfg) { // we assume that the user will handle deallocating the table entries // we just need to dealloc our array of pointers diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 21d686c27..60bc5df64 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -43,7 +43,7 @@ CTEST_DATA(column_family) CTEST_SETUP(column_family) { default_data_config_init(TEST_MAX_KEY_SIZE, &data->default_data_cfg); - init_column_family_config(TEST_MAX_KEY_SIZE, &data->global_data_cfg); + column_family_config_init(TEST_MAX_KEY_SIZE, &data->global_data_cfg); data->cfg = (splinterdb_config){.filename = TEST_DB_NAME, .cache_size = 64 * Mega, @@ -61,15 +61,15 @@ CTEST_TEARDOWN(column_family) if (data->kvsb) { splinterdb_close(&data->kvsb); } - deinit_column_family_config(&data->global_data_cfg); + column_family_config_deinit(&data->global_data_cfg); } /* * * Basic test case that ensures we can create and use a single column family * correctly Tests: - * - create_column_family() - * - delete_column_family() + * - column_family_create() + * - column_family_delete() * - splinterdb_cf_insert() * - splinterdb_cf_delete() * - splinterdb_cf_lookup() @@ -80,7 +80,7 @@ CTEST_TEARDOWN(column_family) CTEST2(column_family, test_single_column) { // create a column family - splinterdb_column_family cf = create_column_family( + splinterdb_column_family cf = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // create some basic data to insert and lookup @@ -127,6 +127,7 @@ CTEST2(column_family, test_single_column) ASSERT_FALSE(splinterdb_cf_lookup_found(&result)); splinterdb_cf_lookup_result_deinit(&result); + column_family_delete(cf); } /* @@ -136,7 +137,7 @@ CTEST2(column_family, test_single_column) CTEST2(column_family, test_max_length) { // create a column family - splinterdb_column_family cf = create_column_family( + splinterdb_column_family cf = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); char large_key_data[TEST_MAX_KEY_SIZE]; @@ -187,13 +188,13 @@ CTEST2(column_family, test_max_length) CTEST2(column_family, test_multiple_cf_same_key) { // create a few column families - splinterdb_column_family cf0 = create_column_family( + splinterdb_column_family cf0 = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); - splinterdb_column_family cf1 = create_column_family( + splinterdb_column_family cf1 = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); - splinterdb_column_family cf2 = create_column_family( + splinterdb_column_family cf2 = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); - splinterdb_column_family cf3 = create_column_family( + splinterdb_column_family cf3 = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // Insert a single key to each column family @@ -256,7 +257,7 @@ rev_key_compare(const data_config *cfg, slice key1, slice key2) CTEST2(column_family, test_multiple_cf_range) { // create the default column family - splinterdb_column_family cf_default = create_column_family( + splinterdb_column_family cf_default = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // create a config with a reversed key compare function @@ -266,7 +267,7 @@ CTEST2(column_family, test_multiple_cf_range) rev_data_config.key_compare = rev_key_compare; splinterdb_column_family cf_reverse = - create_column_family(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); + column_family_create(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); // Insert a few key/value pairs to each cf char key1_data[] = "aaaa"; @@ -391,7 +392,7 @@ CTEST2(column_family, multiple_cf_with_updates) // create the default column family data->default_data_cfg.merge_tuples = merge_tuples; data->default_data_cfg.merge_tuples_final = merge_tuple_final; - splinterdb_column_family cf_default = create_column_family( + splinterdb_column_family cf_default = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // create a config with a reversed key compare function @@ -403,7 +404,7 @@ CTEST2(column_family, multiple_cf_with_updates) rev_data_config.merge_tuples_final = merge_tuple_final; splinterdb_column_family cf_reverse = - create_column_family(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); + column_family_create(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config); // Insert a few key/value pairs to each cf char key1_data[] = "aaaa"; From 57b9ffd154e28a453a6b3ce67edbc41144d27712 Mon Sep 17 00:00:00 2001 From: Evan West Date: Tue, 27 Jun 2023 00:15:10 +0000 Subject: [PATCH 06/12] column family support: better cf iterators --- include/splinterdb/column_family.h | 9 +--- src/column_family.c | 67 +++++++++++++++++++++++------- src/splinterdb.c | 7 ---- src/splinterdb_internal.h | 14 +++++++ tests/unit/column_family_test.c | 48 +++++++++++---------- 5 files changed, 95 insertions(+), 50 deletions(-) mode change 100755 => 100644 src/splinterdb_internal.h diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h index 41fd25d8e..12f9c95db 100644 --- a/include/splinterdb/column_family.h +++ b/include/splinterdb/column_family.h @@ -33,12 +33,7 @@ typedef struct splinterdb_column_family { splinterdb *kvs; } splinterdb_column_family; -typedef struct splinterdb_cf_iterator { - column_family_id id; - splinterdb_iterator *iter; -} splinterdb_cf_iterator; - -#define CF_ITER_UNINIT ((splinterdb_cf_iterator){0, NULL}) +typedef struct splinterdb_cf_iterator splinterdb_cf_iterator; // Initialize the data_config stored in the cf_data_config // this data_config is then passed to SplinterDB to add support for @@ -107,7 +102,7 @@ splinterdb_cf_lookup(const splinterdb_column_family cf, // IN int splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN - splinterdb_cf_iterator *cf_iter, // OUT + splinterdb_cf_iterator **cf_iter, // OUT slice start_key // IN ); diff --git a/src/column_family.c b/src/column_family.c index 751d21f25..8d0a3a046 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -1,5 +1,6 @@ #include "platform.h" +// #include "poison.h" #include "splinterdb/column_family.h" #include "splinterdb/splinterdb.h" @@ -8,6 +9,23 @@ #include +struct splinterdb_cf_iterator { + column_family_id id; + splinterdb_iterator iter; +}; + +/* + * Extract errno.h -style status int from a platform_status + * + * Note this currently relies on the implementation of the splinterdb + * platform_linux. But at least it doesn't leak the dependency to callers. + */ +static inline int +platform_status_to_int(const platform_status status) // IN +{ + return status.r; +} + // Some helper functions we'll use for managing the column family identifiers // and the data config table column_family_id @@ -54,7 +72,7 @@ cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) // reallocate table memory if necessary if (cf_cfg->table_mem <= new_id) { cf_cfg->config_table = (data_config **)realloc( - cf_cfg->config_table, new_id * 2 * sizeof(data_config *)); + cf_cfg->config_table, (new_id + 1) * 2 * sizeof(data_config *)); } // place new data_config in table @@ -233,9 +251,9 @@ splinterdb_cf_lookup(const splinterdb_column_family cf, // IN // Range iterators for column families int -splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN - splinterdb_cf_iterator *cf_iter, // OUT - slice start_key // IN +splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN + splinterdb_cf_iterator **cf_iter, // OUT + slice user_start_key // IN ) { // The minimum key contains no key data only consists of @@ -246,20 +264,39 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN writable_buffer cf_key_wb; writable_buffer_init_with_buffer( &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); - slice cf_key = userkey_to_cf_key(start_key, cf.id, &cf_key_wb); + slice cf_key = userkey_to_cf_key(user_start_key, cf.id, &cf_key_wb); - cf_iter->id = cf.id; - int rc = splinterdb_iterator_init(cf.kvs, &cf_iter->iter, cf_key); - if (rc != 0) - return rc; + splinterdb_cf_iterator *cf_it = TYPED_MALLOC(cf.kvs->spl->heap_id, cf_it); + if (cf_it == NULL) { + platform_error_log("TYPED_MALLOC error\n"); + return platform_status_to_int(STATUS_NO_MEMORY); + } + cf_it->iter.last_rc = STATUS_OK; + trunk_range_iterator *range_itor = &(cf_it->iter.sri); + + key start_key = key_create_from_slice(cf_key); + platform_status rc = trunk_range_iterator_init( + cf.kvs->spl, range_itor, start_key, POSITIVE_INFINITY_KEY, UINT64_MAX); + if (!SUCCESS(rc)) { + platform_free(cf.kvs->spl->heap_id, *cf_iter); + writable_buffer_deinit(&cf_key_wb); + return platform_status_to_int(rc); + } + cf_it->iter.parent = cf.kvs; + cf_it->id = cf.id; + + *cf_iter = cf_it; writable_buffer_deinit(&cf_key_wb); - return 0; + return EXIT_SUCCESS; } void splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter) { - splinterdb_iterator_deinit(cf_iter->iter); + trunk_range_iterator *range_itor = &(cf_iter->iter.sri); + trunk_range_iterator_deinit(range_itor); + + platform_free(range_itor->spl->heap_id, cf_iter); } _Bool @@ -268,13 +305,13 @@ splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN slice *value // OUT ) { - _Bool valid = splinterdb_iterator_valid(cf_iter->iter); + _Bool valid = splinterdb_iterator_valid(&cf_iter->iter); if (!valid) return FALSE; // if valid, check the key to ensure it's within this column family - splinterdb_iterator_get_current(cf_iter->iter, key, value); + splinterdb_iterator_get_current(&cf_iter->iter, key, value); column_family_id key_cf = get_cf_id(*key); if (key_cf != cf_iter->id) @@ -287,13 +324,13 @@ splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN void splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter) { - return splinterdb_iterator_next(cf_iter->iter); + return splinterdb_iterator_next(&cf_iter->iter); } int splinterdb_cf_iterator_status(const splinterdb_cf_iterator *cf_iter) { - return splinterdb_iterator_status(cf_iter->iter); + return splinterdb_iterator_status(&cf_iter->iter); } diff --git a/src/splinterdb.c b/src/splinterdb.c index f28742ada..ac0abba04 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -565,13 +565,6 @@ splinterdb_lookup(const splinterdb *kvs, // IN return platform_status_to_int(status); } - -struct splinterdb_iterator { - trunk_range_iterator sri; - platform_status last_rc; - const splinterdb *parent; -}; - int splinterdb_iterator_init(const splinterdb *kvs, // IN splinterdb_iterator **iter, // OUT diff --git a/src/splinterdb_internal.h b/src/splinterdb_internal.h old mode 100755 new mode 100644 index 0544d2a66..304701490 --- a/src/splinterdb_internal.h +++ b/src/splinterdb_internal.h @@ -1,3 +1,11 @@ + +/* + * splinterdb_internal.h -- + * + * Private struct declarations for splinterdb that we'd like + * to share among multiple files, but not share with users. + */ + #ifndef SPLINTERDB_SPLINTERDB_INTERNAL_H_ #define SPLINTERDB_SPLINTERDB_INTERNAL_H_ @@ -24,4 +32,10 @@ typedef struct splinterdb { data_config *data_cfg; } splinterdb; +struct splinterdb_iterator { + trunk_range_iterator sri; + platform_status last_rc; + const splinterdb *parent; +}; + #endif // SPLINTERDB_SPLINTERDB_INTERNAL_H_ diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 60bc5df64..02462a343 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -85,7 +85,7 @@ CTEST2(column_family, test_single_column) // create some basic data to insert and lookup char *key_data = "some-key"; - size_t key_len = sizeof("some-key"); + size_t key_len = strlen(key_data); slice user_key = slice_create(key_len, key_data); splinterdb_lookup_result result; @@ -234,6 +234,8 @@ CTEST2(column_family, test_multiple_cf_same_key) ASSERT_EQUAL(slice_length(values[idx]), slice_length(value)); ASSERT_STREQN( slice_data(values[idx]), slice_data(value), slice_length(value)); + + splinterdb_cf_lookup_result_deinit(&result); } } @@ -249,7 +251,7 @@ rev_key_compare(const data_config *cfg, slice key1, slice key2) return slice_lex_cmp(key2, key1); } -/* +/* * Test multiple column families with range iterators * ensure that keys are found in the order defined by their * custom key comparison functions @@ -295,8 +297,8 @@ CTEST2(column_family, test_multiple_cf_range) ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key4, val2)); // Perform a range query over all cf1 keys - splinterdb_cf_iterator *it = &CF_ITER_UNINIT; - ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_default, it, NULL_SLICE)); + splinterdb_cf_iterator *it; + ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_default, &it, NULL_SLICE)); slice keys[] = {key1, key2, key3, key4}; slice key; @@ -317,7 +319,7 @@ CTEST2(column_family, test_multiple_cf_range) splinterdb_cf_iterator_deinit(it); // Perform a range query over all cf2 keys - ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, it, NULL_SLICE)); + ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, &it, NULL_SLICE)); idx = 0; for (; splinterdb_cf_iterator_get_current(it, &key, &val); @@ -344,11 +346,14 @@ CTEST2(column_family, test_multiple_cf_range) // merge two messages, with result in new_message static int -merge_tuples(const data_config *cfg, slice key, message old_message, merge_accumulator *new_message) +merge_tuples(const data_config *cfg, + slice key, + message old_message, + merge_accumulator *new_message) { platform_assert(slice_data(key) != NULL); - message_type type = old_message.type; - writable_buffer *wb = &new_message->data; + message_type type = old_message.type; + writable_buffer *wb = &new_message->data; // extract value slices slice old_value = old_message.data; @@ -358,10 +363,9 @@ merge_tuples(const data_config *cfg, slice key, message old_message, merge_accum // and retain that value if (cfg->key_compare(cfg, old_value, new_value) < 0) { platform_status rc = writable_buffer_copy_slice(wb, new_value); - if (!SUCCESS(rc)) + if (!SUCCESS(rc)) return rc.r; - } - else { + } else { platform_status rc = writable_buffer_copy_slice(wb, old_value); if (!SUCCESS(rc)) return rc.r; @@ -376,7 +380,9 @@ merge_tuples(const data_config *cfg, slice key, message old_message, merge_accum } static int -merge_tuple_final(const data_config *cfg, slice key, merge_accumulator *oldest_message) +merge_tuple_final(const data_config *cfg, + slice key, + merge_accumulator *oldest_message) { platform_assert(slice_data(key) != NULL); @@ -386,21 +392,20 @@ merge_tuple_final(const data_config *cfg, slice key, merge_accumulator *oldest_m } - CTEST2(column_family, multiple_cf_with_updates) { // create the default column family - data->default_data_cfg.merge_tuples = merge_tuples; + data->default_data_cfg.merge_tuples = merge_tuples; data->default_data_cfg.merge_tuples_final = merge_tuple_final; - splinterdb_column_family cf_default = column_family_create( + splinterdb_column_family cf_default = column_family_create( data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg); // create a config with a reversed key compare function // and create a column family that will reverse the keys data_config rev_data_config; default_data_config_init(TEST_MAX_KEY_SIZE, &rev_data_config); - rev_data_config.key_compare = rev_key_compare; - rev_data_config.merge_tuples = merge_tuples; + rev_data_config.key_compare = rev_key_compare; + rev_data_config.merge_tuples = merge_tuples; rev_data_config.merge_tuples_final = merge_tuple_final; splinterdb_column_family cf_reverse = @@ -431,7 +436,7 @@ CTEST2(column_family, multiple_cf_with_updates) // Now update these key-value pairs char small_val[] = "aaaaaa-cf2"; - char big_val[] = "zzzzzz-cf1"; + char big_val[] = "zzzzzz-cf1"; slice new_val1 = slice_create(10, big_val); slice new_val2 = slice_create(10, small_val); @@ -468,12 +473,13 @@ CTEST2(column_family, multiple_cf_with_updates) ASSERT_EQUAL(slice_length(new_val1), slice_length(value)); ASSERT_STREQN( slice_data(new_val1), slice_data(value), slice_length(value)); - } - else { + } else { ASSERT_EQUAL(slice_length(new_val2), slice_length(value)); ASSERT_STREQN( slice_data(new_val2), slice_data(value), slice_length(value)); } + + splinterdb_cf_lookup_result_deinit(&result); } - } + } } From e7892d97edf2fcff6eaf0145a7d7963c89b871d0 Mon Sep 17 00:00:00 2001 From: Evan West Date: Fri, 30 Jun 2023 21:17:57 +0000 Subject: [PATCH 07/12] column family support: match splinter's iterator API --- include/splinterdb/column_family.h | 3 ++ src/column_family.c | 47 +++++++++++++----------------- tests/unit/column_family_test.c | 8 ++--- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h index 12f9c95db..703a4621b 100644 --- a/include/splinterdb/column_family.h +++ b/include/splinterdb/column_family.h @@ -113,6 +113,9 @@ void splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter); _Bool +splinterdb_cf_iterator_valid(splinterdb_cf_iterator *cf_iter); + +void splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN slice *key, // OUT slice *value // OUT diff --git a/src/column_family.c b/src/column_family.c index 8d0a3a046..56dc565f5 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -9,8 +9,9 @@ #include +// This is just a normal splinterdb_iterator +// but to have a clean interface, we pretend its not struct splinterdb_cf_iterator { - column_family_id id; splinterdb_iterator iter; }; @@ -266,26 +267,29 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); slice cf_key = userkey_to_cf_key(user_start_key, cf.id, &cf_key_wb); - splinterdb_cf_iterator *cf_it = TYPED_MALLOC(cf.kvs->spl->heap_id, cf_it); - if (cf_it == NULL) { + column_family_id next_cf = cf.id + 1; + slice next_cf_key = slice_create(sizeof(column_family_id), (void *)&next_cf); + + splinterdb_iterator *it = TYPED_MALLOC(cf.kvs->spl->heap_id, it); + if (it == NULL) { platform_error_log("TYPED_MALLOC error\n"); return platform_status_to_int(STATUS_NO_MEMORY); } - cf_it->iter.last_rc = STATUS_OK; - trunk_range_iterator *range_itor = &(cf_it->iter.sri); + it->last_rc = STATUS_OK; + trunk_range_iterator *range_itor = &(it->sri); key start_key = key_create_from_slice(cf_key); + key end_key = key_create_from_slice(next_cf_key); platform_status rc = trunk_range_iterator_init( - cf.kvs->spl, range_itor, start_key, POSITIVE_INFINITY_KEY, UINT64_MAX); + cf.kvs->spl, range_itor, start_key, end_key, UINT64_MAX); if (!SUCCESS(rc)) { - platform_free(cf.kvs->spl->heap_id, *cf_iter); + platform_free(cf.kvs->spl->heap_id, it); writable_buffer_deinit(&cf_key_wb); return platform_status_to_int(rc); } - cf_it->iter.parent = cf.kvs; - cf_it->id = cf.id; + it->parent = cf.kvs; - *cf_iter = cf_it; + *cf_iter = (splinterdb_cf_iterator *)it; writable_buffer_deinit(&cf_key_wb); return EXIT_SUCCESS; } @@ -293,32 +297,23 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN void splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter) { - trunk_range_iterator *range_itor = &(cf_iter->iter.sri); - trunk_range_iterator_deinit(range_itor); - - platform_free(range_itor->spl->heap_id, cf_iter); + splinterdb_iterator_deinit(&cf_iter->iter); } _Bool +splinterdb_cf_iterator_valid(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_valid(&cf_iter->iter); +} + +void splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN slice *key, // OUT slice *value // OUT ) { - _Bool valid = splinterdb_iterator_valid(&cf_iter->iter); - - if (!valid) - return FALSE; - - // if valid, check the key to ensure it's within this column family splinterdb_iterator_get_current(&cf_iter->iter, key, value); - column_family_id key_cf = get_cf_id(*key); - - if (key_cf != cf_iter->id) - return FALSE; - *key = cf_key_to_userkey(*key); - return TRUE; } void diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 02462a343..6bf0ceff0 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -304,9 +304,9 @@ CTEST2(column_family, test_multiple_cf_range) slice key; slice val; int idx = 0; - for (; splinterdb_cf_iterator_get_current(it, &key, &val); - splinterdb_cf_iterator_next(it)) + for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) { + splinterdb_cf_iterator_get_current(it, &key, &val); ASSERT_EQUAL(slice_length(keys[idx]), slice_length(key)); ASSERT_STREQN(slice_data(keys[idx]), slice_data(key), slice_length(key)); @@ -322,9 +322,9 @@ CTEST2(column_family, test_multiple_cf_range) ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, &it, NULL_SLICE)); idx = 0; - for (; splinterdb_cf_iterator_get_current(it, &key, &val); - splinterdb_cf_iterator_next(it)) + for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) { + splinterdb_cf_iterator_get_current(it, &key, &val); ASSERT_EQUAL(slice_length(keys[3 - idx]), slice_length(key)); ASSERT_STREQN( slice_data(keys[3 - idx]), slice_data(key), slice_length(key)); From b81f6bc71fa68ed8e631590f9bd52216da191374 Mon Sep 17 00:00:00 2001 From: Evan West Date: Fri, 30 Jun 2023 21:44:35 +0000 Subject: [PATCH 08/12] column family support: handle cfs that don't exist or are deleted --- src/column_family.c | 90 +++++++++++++++++++++++++-------- tests/unit/column_family_test.c | 6 +-- 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/src/column_family.c b/src/column_family.c index 56dc565f5..6129cf1c2 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -29,7 +29,7 @@ platform_status_to_int(const platform_status status) // IN // Some helper functions we'll use for managing the column family identifiers // and the data config table -column_family_id +static inline column_family_id get_cf_id(slice cf_key) { // the cf id is a prefix of the key @@ -39,7 +39,7 @@ get_cf_id(slice cf_key) return id; } -slice +static inline slice userkey_to_cf_key(slice userkey, column_family_id cf_id, writable_buffer *cf_key_wb) @@ -54,7 +54,7 @@ userkey_to_cf_key(slice userkey, return writable_buffer_to_slice(cf_key_wb); } -slice +static inline slice cf_key_to_userkey(slice cf_key) { uint64 key_len = slice_length(cf_key); @@ -64,7 +64,7 @@ cf_key_to_userkey(slice cf_key) data + sizeof(column_family_id)); } -column_family_id +static inline column_family_id cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) { column_family_id new_id = cf_cfg->num_families; @@ -82,7 +82,7 @@ cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) return new_id; } -void +static inline void cfg_table_delete(cf_data_config *cf_cfg, column_family_id cf_id) { // memory is held by user so don't free it @@ -92,6 +92,18 @@ cfg_table_delete(cf_data_config *cf_cfg, column_family_id cf_id) // TODO: Reuse this slot somehow? } +// lookup a data_config in the table. +// Returns NULL if the column family ID does not exist +// otherwise, returns the associated data_config for the cf +static inline data_config * +cfg_table_lookup(cf_data_config *cf_cfg, column_family_id cf_id) +{ + if (cf_cfg->num_families <= cf_id) + return NULL; + + return cf_cfg->config_table[cf_id]; +} + // Beginning of column family interface // Create a new column family @@ -136,6 +148,10 @@ splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value) // zero len key reserved, negative infinity platform_assert(slice_length(key) > 0); + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + // convert to column family key by prefixing the cf id char key_buf[CF_KEY_DEFAULT_SIZE]; writable_buffer cf_key_wb; @@ -145,10 +161,8 @@ splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value) // call splinter's insert function and return int rc = splinterdb_insert(cf.kvs, cf_key, value); - if (rc != 0) - return rc; writable_buffer_deinit(&cf_key_wb); - return 0; + return rc; } int @@ -157,6 +171,10 @@ splinterdb_cf_delete(const splinterdb_column_family cf, slice key) // zero len key reserved, negative infinity platform_assert(slice_length(key) > 0); + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + // convert to column family key by prefixing the cf id char key_buf[CF_KEY_DEFAULT_SIZE]; writable_buffer cf_key_wb; @@ -166,10 +184,8 @@ splinterdb_cf_delete(const splinterdb_column_family cf, slice key) // call splinter's delete function and return int rc = splinterdb_delete(cf.kvs, cf_key); - if (rc != 0) - return rc; writable_buffer_deinit(&cf_key_wb); - return 0; + return rc; } int @@ -178,6 +194,10 @@ splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta) // zero len key reserved, negative infinity platform_assert(slice_length(key) > 0); + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + // convert to column family key by prefixing the cf id char key_buf[CF_KEY_DEFAULT_SIZE]; writable_buffer cf_key_wb; @@ -187,10 +207,8 @@ splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta) // call splinter's update function and return int rc = splinterdb_update(cf.kvs, cf_key, delta); - if (rc != 0) - return rc; writable_buffer_deinit(&cf_key_wb); - return 0; + return rc; } // column family lookups @@ -234,6 +252,10 @@ splinterdb_cf_lookup(const splinterdb_column_family cf, // IN // zero len key reserved, negative infinity platform_assert(slice_length(key) > 0); + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + // convert to column family key by prefixing the cf id char key_buf[CF_KEY_DEFAULT_SIZE]; writable_buffer cf_key_wb; @@ -257,6 +279,10 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN slice user_start_key // IN ) { + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + // The minimum key contains no key data only consists of // the column id. // This is what a NULL key will become @@ -275,7 +301,7 @@ splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN platform_error_log("TYPED_MALLOC error\n"); return platform_status_to_int(STATUS_NO_MEMORY); } - it->last_rc = STATUS_OK; + it->last_rc = STATUS_OK; trunk_range_iterator *range_itor = &(it->sri); key start_key = key_create_from_slice(cf_key); @@ -352,8 +378,14 @@ cf_key_compare(const data_config *cfg, slice key1, slice key2) if (slice_length(userkey2) == 0) return 1; - // get the data_config for this column family and call its function - data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id1]; + // get the data_config for this column family + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id1); + + // keys are equal if the config has been deleted. Squish it all together. + if (cf_cfg == NULL) + return 0; + + // call column family's function return cf_cfg->key_compare(cf_cfg, userkey1, userkey2); } @@ -361,14 +393,21 @@ static int cf_merge_tuples(const data_config *cfg, slice key, message old_raw_message, - merge_accumulator *new_data) + merge_accumulator *new_message) { column_family_id cf_id = get_cf_id(key); // get the data_config for this column family and call its function - data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id]; + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); + if (cf_cfg == NULL) { + // new_message becomes a deletion as the column family has been deleted + new_message->type = MESSAGE_TYPE_DELETE; + return 0; + } + + // call column family's function return cf_cfg->merge_tuples( - cf_cfg, cf_key_to_userkey(key), old_raw_message, new_data); + cf_cfg, cf_key_to_userkey(key), old_raw_message, new_message); } static int @@ -379,8 +418,15 @@ cf_merge_tuples_final(const data_config *cfg, { column_family_id cf_id = get_cf_id(key); - // get the data_config for this column family and call its function - data_config *cf_cfg = ((cf_data_config *)cfg)->config_table[cf_id]; + // get the data_config for this column family + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); + if (cf_cfg == NULL) { + // oldest_data becomes a deletion as the column family has been deleted + oldest_data->type = MESSAGE_TYPE_DELETE; + return 0; + } + + // call column family's function return cf_cfg->merge_tuples_final( cf_cfg, cf_key_to_userkey(key), oldest_data); } diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 6bf0ceff0..4e1e4bba5 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -304,8 +304,7 @@ CTEST2(column_family, test_multiple_cf_range) slice key; slice val; int idx = 0; - for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) - { + for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) { splinterdb_cf_iterator_get_current(it, &key, &val); ASSERT_EQUAL(slice_length(keys[idx]), slice_length(key)); ASSERT_STREQN(slice_data(keys[idx]), slice_data(key), slice_length(key)); @@ -322,8 +321,7 @@ CTEST2(column_family, test_multiple_cf_range) ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, &it, NULL_SLICE)); idx = 0; - for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) - { + for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) { splinterdb_cf_iterator_get_current(it, &key, &val); ASSERT_EQUAL(slice_length(keys[3 - idx]), slice_length(key)); ASSERT_STREQN( From 7fb8eee80083b8c8e27a0d88428ab47217912daa Mon Sep 17 00:00:00 2001 From: Evan West Date: Fri, 30 Jun 2023 21:49:54 +0000 Subject: [PATCH 09/12] column family support: remove poisoned functions --- src/column_family.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/column_family.c b/src/column_family.c index 6129cf1c2..406121d58 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -1,6 +1,6 @@ #include "platform.h" -// #include "poison.h" +#include "poison.h" #include "splinterdb/column_family.h" #include "splinterdb/splinterdb.h" @@ -72,7 +72,7 @@ cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) // reallocate table memory if necessary if (cf_cfg->table_mem <= new_id) { - cf_cfg->config_table = (data_config **)realloc( + cf_cfg->config_table = (data_config **)platform_realloc(platform_get_heap_id(), cf_cfg->config_table, (new_id + 1) * 2 * sizeof(data_config *)); } @@ -484,7 +484,7 @@ column_family_config_deinit(cf_data_config *cf_cfg) // we assume that the user will handle deallocating the table entries // we just need to dealloc our array of pointers if (cf_cfg->config_table != NULL) - free(cf_cfg->config_table); + platform_free(platform_get_heap_id(), cf_cfg->config_table); cf_cfg->config_table = NULL; cf_cfg->table_mem = 0; } From 0097bd133b206f5846b4126468a3ae4013923eb5 Mon Sep 17 00:00:00 2001 From: Evan West Date: Thu, 6 Jul 2023 00:40:57 +0000 Subject: [PATCH 10/12] column family support: multi-threading --- include/splinterdb/column_family.h | 29 ++--- src/column_family.c | 121 ++++++++++-------- src/merge.c | 1 + src/trunk.c | 63 +++++++-- src/trunk.h | 14 ++ tests/unit/column_family_test.c | 199 +++++++++++++++++++++++++++-- 6 files changed, 333 insertions(+), 94 deletions(-) diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h index 703a4621b..d598107d0 100644 --- a/include/splinterdb/column_family.h +++ b/include/splinterdb/column_family.h @@ -11,6 +11,7 @@ #define _SPLINTERDB_COLUMN_FAMILY_H_ #include "splinterdb/splinterdb.h" +#include "splinterdb/public_platform.h" // Size of stack buffer we allocate for column family keys. // This can be fairly large because these buffers are short @@ -21,13 +22,6 @@ typedef uint32 column_family_id; -typedef struct cf_data_config { - data_config general_config; - column_family_id num_families; - data_config **config_table; - column_family_id table_mem; -} cf_data_config; - typedef struct splinterdb_column_family { column_family_id id; splinterdb *kvs; @@ -35,18 +29,20 @@ typedef struct splinterdb_column_family { typedef struct splinterdb_cf_iterator splinterdb_cf_iterator; -// Initialize the data_config stored in the cf_data_config -// this data_config is then passed to SplinterDB to add support for -// column families +// Initialize the cf_data_config and give a pointer to it to the user. +// This pointer is then passed to SplinterDB to add support for +// column families. The memory for the column_family_config is managed +// by SplinterDB. Not the user. void -column_family_config_init(const uint64 max_key_size, // IN - cf_data_config *cf_cfg // OUT +column_family_config_init(const uint64 max_key_size, // IN + data_config **cf_cfg // OUT ); -// Delete the cf_data_config, freeing the memory used by the -// config table +// Delete the cf_data_config, freeing all associated memory. +// +// This should only be called after closing the SplinterDB instance void -column_family_config_deinit(cf_data_config *cf_cfg); +column_family_config_deinit(data_config *cf_cfg); // Create a new column family // Returns a new column family struct @@ -56,6 +52,9 @@ column_family_create(splinterdb *kvs, data_config *data_cfg); // Delete the column family cf +// IMPORTANT: The user may NOT delete their data_config even after +// calling this function. All data_configs must persist +// for the lifetime of the SplinterDB instance void column_family_delete(splinterdb_column_family cf); diff --git a/src/column_family.c b/src/column_family.c index 406121d58..4bcf8b073 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -10,11 +10,19 @@ #include // This is just a normal splinterdb_iterator -// but to have a clean interface, we pretend its not +// but to have a clean interface, we pretend it's not struct splinterdb_cf_iterator { splinterdb_iterator iter; }; +typedef struct cf_data_config { + data_config general_config; + platform_semaphore table_sema; + column_family_id num_families; + column_family_id capacity; + data_config **config_table; +} cf_data_config; + /* * Extract errno.h -style status int from a platform_status * @@ -44,7 +52,6 @@ userkey_to_cf_key(slice userkey, column_family_id cf_id, writable_buffer *cf_key_wb) { - // extract from the user's key and resize the buffer uint64 key_len = slice_length(userkey); const void *data = slice_data(userkey); @@ -65,31 +72,51 @@ cf_key_to_userkey(slice cf_key) } static inline column_family_id -cfg_table_insert(cf_data_config *cf_cfg, data_config *data_cfg) +cfg_table_insert(splinterdb *kvs, cf_data_config *cf_cfg, data_config *data_cfg) { + + platform_semaphore_wait(&cf_cfg->table_sema); column_family_id new_id = cf_cfg->num_families; cf_cfg->num_families += 1; // reallocate table memory if necessary - if (cf_cfg->table_mem <= new_id) { - cf_cfg->config_table = (data_config **)platform_realloc(platform_get_heap_id(), - cf_cfg->config_table, (new_id + 1) * 2 * sizeof(data_config *)); + if (cf_cfg->capacity <= new_id) { + cf_cfg->capacity *= 2; + data_config **new_table = TYPED_ARRAY_MALLOC( + platform_get_heap_id(), new_table, cf_cfg->capacity); + if (new_table == NULL) { + platform_error_log("TYPED_ARRAY_MALLOC error\n"); + return -1; + } + + memcpy(new_table, + cf_cfg->config_table, + cf_cfg->num_families * sizeof(data_config *)); + data_config **old_table = cf_cfg->config_table; + cf_cfg->config_table = new_table; + + // wait until all threads have the change to observe this table change + // 1. make copy of counters + uint64 local_counter_copy[MAX_THREADS]; + for (int i = 0; i < MAX_THREADS; i++) + local_counter_copy[i] = kvs->spl->cfg_crit_count[i].counter; + + // 2. for each thread wait until copy is even or that current is > copy + for (int i = 0; i < MAX_THREADS; i++) { + if (local_counter_copy[i] % 2 == 1) + while (local_counter_copy[i] >= kvs->spl->cfg_crit_count[i].counter) + ; + } + // free the old table + platform_free(platform_get_heap_id(), old_table); } // place new data_config in table cf_cfg->config_table[new_id] = data_cfg; - return new_id; -} - -static inline void -cfg_table_delete(cf_data_config *cf_cfg, column_family_id cf_id) -{ - // memory is held by user so don't free it - // just mark the config_table entry as NULL - cf_cfg->config_table[cf_id] = NULL; + platform_semaphore_post(&cf_cfg->table_sema); - // TODO: Reuse this slot somehow? + return new_id; } // lookup a data_config in the table. @@ -119,7 +146,7 @@ column_family_create(splinterdb *kvs, // convert from data_config to cf_data_config cf_data_config *cf_cfg = (cf_data_config *)kvs->data_cfg; - column_family_id new_id = cfg_table_insert(cf_cfg, new_data_cfg); + column_family_id new_id = cfg_table_insert(kvs, cf_cfg, new_data_cfg); // return new column family splinterdb_column_family cf; @@ -133,10 +160,7 @@ column_family_create(splinterdb *kvs, void column_family_delete(splinterdb_column_family cf) { - // convert from data_config to cf_data_config - cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; - - cfg_table_delete(cf_cfg, cf.id); + // TODO: Issue deletion messages to all keys within the column family! } // SplinterDB Functions @@ -380,10 +404,7 @@ cf_key_compare(const data_config *cfg, slice key1, slice key2) // get the data_config for this column family data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id1); - - // keys are equal if the config has been deleted. Squish it all together. - if (cf_cfg == NULL) - return 0; + platform_assert(cf_cfg != NULL); // call column family's function return cf_cfg->key_compare(cf_cfg, userkey1, userkey2); @@ -399,11 +420,7 @@ cf_merge_tuples(const data_config *cfg, // get the data_config for this column family and call its function data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); - if (cf_cfg == NULL) { - // new_message becomes a deletion as the column family has been deleted - new_message->type = MESSAGE_TYPE_DELETE; - return 0; - } + platform_assert(cf_cfg != NULL); // call column family's function return cf_cfg->merge_tuples( @@ -420,11 +437,7 @@ cf_merge_tuples_final(const data_config *cfg, // get the data_config for this column family data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); - if (cf_cfg == NULL) { - // oldest_data becomes a deletion as the column family has been deleted - oldest_data->type = MESSAGE_TYPE_DELETE; - return 0; - } + platform_assert(cf_cfg != NULL); // call column family's function return cf_cfg->merge_tuples_final( @@ -450,16 +463,11 @@ cf_message_to_string(const data_config *cfg, // Initialize the data_config stored in the cf_data_config. // Its data_config is then passed to SplinterDB to add support for -// column families -// -// TODO: The key_hash function cannot be overwritten by column families -// at this time. This is because we do not have access to the cfg -// in the key_hash. So we have no way of accessing a user defined -// key_hash for the column family. This should probably be fixed. -// Likely requires adding the cfg to the key_hash_fn type. +// column families. The memory for the column_family_config is managed +// by splinter. Not the user. void -column_family_config_init(const uint64 max_key_size, // IN - cf_data_config *cf_cfg // OUT +column_family_config_init(const uint64 max_key_size, // IN + data_config **data_cfg // OUT ) { data_config cfg = { @@ -472,19 +480,26 @@ column_family_config_init(const uint64 max_key_size, // IN .message_to_string = cf_message_to_string, }; + platform_heap_id hid = platform_get_heap_id(); + cf_data_config *cf_cfg = TYPED_MALLOC(hid, cf_cfg); + cf_cfg->general_config = cfg; cf_cfg->num_families = 0; - cf_cfg->config_table = NULL; - cf_cfg->table_mem = 0; + cf_cfg->capacity = 1; + cf_cfg->config_table = TYPED_MALLOC(hid, cf_cfg->config_table); + + platform_semaphore_init(&cf_cfg->table_sema, 1, hid); + *data_cfg = (data_config *)cf_cfg; } void -column_family_config_deinit(cf_data_config *cf_cfg) +column_family_config_deinit(data_config *data_cfg) { - // we assume that the user will handle deallocating the table entries - // we just need to dealloc our array of pointers - if (cf_cfg->config_table != NULL) - platform_free(platform_get_heap_id(), cf_cfg->config_table); - cf_cfg->config_table = NULL; - cf_cfg->table_mem = 0; + cf_data_config *cf_cfg = (cf_data_config *)data_cfg; + + // user deallocates their cf's data configs + // we need to deallocate everything else + platform_semaphore_destroy(&cf_cfg->table_sema); + platform_free(platform_get_heap_id(), cf_cfg->config_table); + platform_free(platform_get_heap_id(), cf_cfg); } diff --git a/src/merge.c b/src/merge.c index 13444240b..6f2e02895 100644 --- a/src/merge.c +++ b/src/merge.c @@ -522,6 +522,7 @@ merge_iterator_create(platform_heap_id hid, debug_assert_message_type_valid(merge_itor); } } + platform_assert(SUCCESS(rc) == (*out_itor != NULL)); return rc; } diff --git a/src/trunk.c b/src/trunk.c index 78c2f697a..61425dc6c 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -674,6 +674,33 @@ trunk_node_is_index(trunk_node *node) return !trunk_node_is_leaf(node); } +// call this function when entering data_config critical functions. +static inline void +trunk_cfg_counter_enter(trunk_handle *spl) +{ + threadid tid = platform_get_tid(); + platform_assert(tid < MAX_THREADS); + if (spl->cfg_crit_count[tid].depth == 0) { + platform_assert(spl->cfg_crit_count[tid].counter % 2 == 0); + ++spl->cfg_crit_count[tid].counter; + } + ++spl->cfg_crit_count[tid].depth; +} + +// call this funciton when exiting data_config critical functions. +static inline void +trunk_cfg_counter_exit(trunk_handle *spl) +{ + threadid tid = platform_get_tid(); + platform_assert(tid < MAX_THREADS); + platform_assert(spl->cfg_crit_count[tid].depth > 0); + --spl->cfg_crit_count[tid].depth; + if (spl->cfg_crit_count[tid].depth == 0) { + platform_assert(spl->cfg_crit_count[tid].counter % 2 == 1); + ++spl->cfg_crit_count[tid].counter; + } +} + /* *----------------------------------------------------------------------------- * Compaction Requests @@ -3785,7 +3812,9 @@ static void trunk_memtable_flush_internal_virtual(void *arg, void *scratch) { trunk_memtable_args *mt_args = arg; + trunk_cfg_counter_enter(mt_args->spl); trunk_memtable_flush_internal(mt_args->spl, mt_args->generation); + trunk_cfg_counter_exit(mt_args->spl); } /* @@ -4221,6 +4250,8 @@ trunk_bundle_build_filters(void *arg, void *scratch) trunk_compact_bundle_req *compact_req = (trunk_compact_bundle_req *)arg; trunk_handle *spl = compact_req->spl; + trunk_cfg_counter_enter(spl); + bool should_continue_build_filters = TRUE; while (should_continue_build_filters) { trunk_node node; @@ -4262,6 +4293,7 @@ trunk_bundle_build_filters(void *arg, void *scratch) spl, &stream, "out of order, reequeuing\n"); trunk_close_log_stream_if_enabled(spl, &stream); trunk_node_unget(spl->cc, &node); + trunk_cfg_counter_exit(spl); return; } @@ -4391,6 +4423,7 @@ trunk_bundle_build_filters(void *arg, void *scratch) key_buffer_deinit(&compact_req->end_key); platform_free(spl->heap_id, compact_req); trunk_maybe_reclaim_space(spl); + trunk_cfg_counter_exit(spl); return; } @@ -5071,6 +5104,8 @@ trunk_compact_bundle(void *arg, void *scratch_buf) trunk_handle *spl = req->spl; threadid tid; + trunk_cfg_counter_enter(spl); // entering data_config critical region + /* * 1. Acquire node read lock */ @@ -5396,6 +5431,7 @@ trunk_compact_bundle(void *arg, void *scratch_buf) out: trunk_log_stream_if_enabled(spl, &stream, "\n"); trunk_close_log_stream_if_enabled(spl, &stream); + trunk_cfg_counter_exit(spl); // exiting data_config critical region } /* @@ -6212,13 +6248,6 @@ trunk_range_iterator_init(trunk_handle *spl, * db/range, move to next leaf */ if (at_end) { - KEY_CREATE_LOCAL_COPY(rc, - local_max_key, - spl->heap_id, - key_buffer_key(&range_itor->local_max_key)); - if (!SUCCESS(rc)) { - return rc; - } KEY_CREATE_LOCAL_COPY(rc, rebuild_key, spl->heap_id, @@ -6226,19 +6255,18 @@ trunk_range_iterator_init(trunk_handle *spl, if (!SUCCESS(rc)) { return rc; } + + uint64 temp_tuples = range_itor->num_tuples; trunk_range_iterator_deinit(range_itor); - if (1 && trunk_key_compare(spl, local_max_key, POSITIVE_INFINITY_KEY) != 0 - && trunk_key_compare(spl, local_max_key, max_key) < 0) - { + if (trunk_key_compare(spl, rebuild_key, max_key) < 0) { rc = trunk_range_iterator_init( - spl, range_itor, rebuild_key, max_key, range_itor->num_tuples); + spl, range_itor, rebuild_key, max_key, temp_tuples); if (!SUCCESS(rc)) { return rc; } - iterator_at_end(&range_itor->merge_itor->super, &at_end); + at_end = range_itor->at_end; } } - range_itor->at_end = at_end; return rc; @@ -6519,6 +6547,7 @@ trunk_insert(trunk_handle *spl, key tuple_key, message data) if (trunk_max_key_size(spl) < key_length(tuple_key)) { return STATUS_BAD_PARAM; } + trunk_cfg_counter_enter(spl); if (message_class(data) == MESSAGE_TYPE_DELETE) { data = DELETE_MESSAGE; @@ -6554,6 +6583,7 @@ trunk_insert(trunk_handle *spl, key tuple_key, message data) } out: + trunk_cfg_counter_exit(spl); return rc; } @@ -6727,6 +6757,7 @@ trunk_lookup(trunk_handle *spl, key target, merge_accumulator *result) // also handles switch to READY ^^^^^ merge_accumulator_set_to_null(result); + trunk_cfg_counter_enter(spl); memtable_begin_lookup(spl->mt_ctxt); bool found_in_memtable = FALSE; @@ -6805,6 +6836,7 @@ trunk_lookup(trunk_handle *spl, key target, merge_accumulator *result) merge_accumulator_set_to_null(result); } + trunk_cfg_counter_exit(spl); return STATUS_OK; } @@ -6930,6 +6962,8 @@ trunk_lookup_async(trunk_handle *spl, // IN cache_async_result res = 0; threadid tid; + trunk_cfg_counter_enter(spl); + #if TRUNK_DEBUG cache_enable_sync_get(spl->cc, FALSE); #endif @@ -7371,6 +7405,7 @@ trunk_lookup_async(trunk_handle *spl, // IN cache_enable_sync_get(spl->cc, TRUE); #endif + trunk_cfg_counter_exit(spl); return res; } @@ -7382,6 +7417,7 @@ trunk_range(trunk_handle *spl, tuple_function func, void *arg) { + trunk_cfg_counter_enter(spl); trunk_range_iterator *range_itor = TYPED_MALLOC(spl->heap_id, range_itor); platform_status rc = trunk_range_iterator_init( spl, range_itor, start_key, POSITIVE_INFINITY_KEY, num_tuples); @@ -7404,6 +7440,7 @@ trunk_range(trunk_handle *spl, destroy_range_itor: trunk_range_iterator_deinit(range_itor); platform_free(spl->heap_id, range_itor); + trunk_cfg_counter_exit(spl); return rc; } diff --git a/src/trunk.h b/src/trunk.h index ec120682d..217d87d9b 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -215,6 +215,20 @@ struct trunk_handle { uint64 counter; } PLATFORM_CACHELINE_ALIGNED task_countup[MAX_THREADS]; + /* + * Per thread and per splinter table, count entering and exiting regions + * critical to the data_config. + * + * Specifically the race condition is if a trunk task or function is in the + * middle of performing a key_compare or merge function call in the + * data_config and then the memory for the data_config is free'd. This could + * happen without this change when deleting a column family. + */ + struct { + uint64 counter; + uint64 depth; // only incr counter if we're the highest scope to do so + } PLATFORM_CACHELINE_ALIGNED cfg_crit_count[MAX_THREADS]; + // space rec queue srq srq; diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 4e1e4bba5..ea6346e37 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -20,12 +20,6 @@ #define TEST_MAX_KEY_SIZE 16 #define TEST_MAX_VALUE_SIZE 32 -// Hard-coded format strings to generate key and values -// static const char key_fmt[] = "key-%04x"; -// static const char val_fmt[] = "val-%04x"; -// #define KEY_FMT_LENGTH (8) -// #define VAL_FMT_LENGTH (8) - CTEST_DATA(column_family) { @@ -34,7 +28,7 @@ CTEST_DATA(column_family) // the global_data_cfg is used to route to the right data_config // for each column family - cf_data_config global_data_cfg; + data_config *global_data_cfg; // default data config for when we don't want to be special data_config default_data_cfg; @@ -44,11 +38,10 @@ CTEST_SETUP(column_family) { default_data_config_init(TEST_MAX_KEY_SIZE, &data->default_data_cfg); column_family_config_init(TEST_MAX_KEY_SIZE, &data->global_data_cfg); - data->cfg = - (splinterdb_config){.filename = TEST_DB_NAME, - .cache_size = 64 * Mega, - .disk_size = 128 * Mega, - .data_cfg = (data_config *)&data->global_data_cfg}; + data->cfg = (splinterdb_config){.filename = TEST_DB_NAME, + .cache_size = 1024 * Mega, + .disk_size = 2048 * Mega, + .data_cfg = data->global_data_cfg}; int rc = splinterdb_create(&data->cfg, &data->kvsb); ASSERT_EQUAL(0, rc); @@ -61,7 +54,7 @@ CTEST_TEARDOWN(column_family) if (data->kvsb) { splinterdb_close(&data->kvsb); } - column_family_config_deinit(&data->global_data_cfg); + column_family_config_deinit(data->global_data_cfg); } /* @@ -179,6 +172,7 @@ CTEST2(column_family, test_max_length) ASSERT_FALSE(splinterdb_cf_lookup_found(&result)); splinterdb_cf_lookup_result_deinit(&result); + column_family_delete(cf); } /* @@ -237,6 +231,10 @@ CTEST2(column_family, test_multiple_cf_same_key) splinterdb_cf_lookup_result_deinit(&result); } + column_family_delete(cf0); + column_family_delete(cf1); + column_family_delete(cf2); + column_family_delete(cf3); } /* @@ -334,6 +332,8 @@ CTEST2(column_family, test_multiple_cf_range) ASSERT_EQUAL(4, idx); splinterdb_cf_iterator_deinit(it); + column_family_delete(cf_default); + column_family_delete(cf_reverse); } @@ -480,4 +480,177 @@ CTEST2(column_family, multiple_cf_with_updates) splinterdb_cf_lookup_result_deinit(&result); } } + column_family_delete(cf_default); + column_family_delete(cf_reverse); +} + +typedef struct do_cf_args { + splinterdb *kvs; + data_config *data_cfg; + int inserts; + int deletes; +} do_cf_args; + +// function to create, insert to, delete from, and destroy a cf +// performs a variable amount of insertion/deletions +void * +do_cf(do_cf_args *args) +{ + splinterdb *kvs = args->kvs; + data_config *data_cfg = args->data_cfg; + int inserts = args->inserts; + int deletes = args->deletes; + + if (deletes > inserts) + deletes = inserts; + + if (inserts < 0) { + deletes = 0; + inserts = 0; + } + + splinterdb_register_thread(kvs); + splinterdb_column_family cf = + column_family_create(kvs, TEST_MAX_KEY_SIZE, data_cfg); + printf("column family ID = %i, inserts = %i, deletes = %i\n", + cf.id, + inserts, + deletes); + + // perform insertions + for (int i = 0; i < inserts; i++) { + char key_data[TEST_MAX_KEY_SIZE]; + char val_data[TEST_MAX_VALUE_SIZE]; + + snprintf(key_data, sizeof(key_data), "key-%06x", i); + snprintf(val_data, sizeof(val_data), "val-%06x-%02x", i, cf.id); + slice key = slice_create(strlen(key_data), key_data); + slice val = slice_create(strlen(val_data), val_data); + + ASSERT_EQUAL(0, splinterdb_cf_insert(cf, key, val)); + } + printf("column family ID = %i, inserts done\n", cf.id); + + // perform deletions + for (int i = 0; i < deletes; i++) { + char key_data[TEST_MAX_KEY_SIZE]; + + snprintf(key_data, sizeof(key_data), "key-%06x", i); + slice key = slice_create(strlen(key_data), key_data); + + ASSERT_EQUAL(0, splinterdb_cf_delete(cf, key)); + } + printf("column family ID = %i, deletes done\n", cf.id); + + // perform a range query + int first_idx = deletes; // first remaining insertion + int cur_idx = first_idx; + int num_inserts = inserts - deletes; + + splinterdb_cf_iterator *it; + slice key; + slice val; + ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf, &it, NULL_SLICE)); + for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) { + char key_data[TEST_MAX_KEY_SIZE]; + char val_data[TEST_MAX_VALUE_SIZE]; + + snprintf(key_data, sizeof(key_data), "key-%06x", cur_idx); + snprintf(val_data, sizeof(val_data), "val-%06x-%02x", cur_idx, cf.id); + slice exp_key = slice_create(strlen(key_data), key_data); + slice exp_val = slice_create(strlen(val_data), val_data); + + splinterdb_cf_iterator_get_current(it, &key, &val); + + ASSERT_EQUAL(slice_length(exp_key), slice_length(key)); + ASSERT_STREQN(slice_data(exp_key), slice_data(key), slice_length(key)); + + ASSERT_EQUAL(slice_length(exp_val), slice_length(val)); + ASSERT_STREQN(slice_data(exp_val), slice_data(val), slice_length(val)); + cur_idx++; + } + ASSERT_EQUAL(num_inserts, cur_idx - first_idx); + printf("column family ID = %i, range query done\n", cf.id); + + splinterdb_cf_iterator_deinit(it); + + printf("Deleting column family %i\n", cf.id); + column_family_delete(cf); + + splinterdb_deregister_thread(kvs); + + // data config goes out of scope here and is deallocated + return NULL; +} + + +/* + * Test creating cfs, performing updates upon them, and deleting cfs + * all in parallel and with background threads. + */ +CTEST2(column_family, test_multithread_cf) +{ + data_config t1_cfg, t2_cfg, t3_cfg, t4_cfg; + default_data_config_init(TEST_MAX_KEY_SIZE, &t1_cfg); + default_data_config_init(TEST_MAX_KEY_SIZE, &t2_cfg); + default_data_config_init(TEST_MAX_KEY_SIZE, &t3_cfg); + default_data_config_init(TEST_MAX_KEY_SIZE, &t4_cfg); + + // close and reopen with background threads + splinterdb_close(&data->kvsb); + data->cfg.num_normal_bg_threads = 4; + data->cfg.num_memtable_bg_threads = 2; + + splinterdb_create(&data->cfg, &data->kvsb); + + printf("\n"); + + platform_thread thread1; + do_cf_args t1_args = { + .kvs = data->kvsb, .data_cfg = &t1_cfg, .inserts = 2000000, .deletes = 0}; + platform_thread_create(&thread1, + FALSE, + (void (*)(void *))do_cf, + &t1_args, + platform_get_heap_id()); + + platform_thread thread2; + do_cf_args t2_args = {.kvs = data->kvsb, + .data_cfg = &t2_cfg, + .inserts = 500000, + .deletes = 10000}; + platform_thread_create(&thread2, + FALSE, + (void (*)(void *))do_cf, + &t2_args, + platform_get_heap_id()); + + platform_thread thread3; + do_cf_args t3_args = {.kvs = data->kvsb, + .data_cfg = &t3_cfg, + .inserts = 3000000, + .deletes = 250000}; + platform_thread_create(&thread3, + FALSE, + (void (*)(void *))do_cf, + &t3_args, + platform_get_heap_id()); + + platform_thread thread4; + do_cf_args t4_args = {.kvs = data->kvsb, + .data_cfg = &t4_cfg, + .inserts = 2000000, + .deletes = 200000}; + platform_thread_create(&thread4, + FALSE, + (void (*)(void *))do_cf, + &t4_args, + platform_get_heap_id()); + + platform_thread_join(thread1); + platform_thread_join(thread2); + platform_thread_join(thread3); + platform_thread_join(thread4); + + splinterdb_close(&data->kvsb); } From 48aad065e07e97b17b5b36491c8d63a6877f0496 Mon Sep 17 00:00:00 2001 From: Evan West Date: Thu, 6 Jul 2023 22:53:28 +0000 Subject: [PATCH 11/12] column family support: fix deadlock in cfg_table_insert and bug fix in trunk --- src/column_family.c | 2 -- src/trunk.c | 4 +++- src/trunk.h | 2 +- tests/unit/column_family_test.c | 10 +--------- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/column_family.c b/src/column_family.c index 4bcf8b073..9d934c178 100644 --- a/src/column_family.c +++ b/src/column_family.c @@ -74,7 +74,6 @@ cf_key_to_userkey(slice cf_key) static inline column_family_id cfg_table_insert(splinterdb *kvs, cf_data_config *cf_cfg, data_config *data_cfg) { - platform_semaphore_wait(&cf_cfg->table_sema); column_family_id new_id = cf_cfg->num_families; cf_cfg->num_families += 1; @@ -113,7 +112,6 @@ cfg_table_insert(splinterdb *kvs, cf_data_config *cf_cfg, data_config *data_cfg) // place new data_config in table cf_cfg->config_table[new_id] = data_cfg; - platform_semaphore_post(&cf_cfg->table_sema); return new_id; diff --git a/src/trunk.c b/src/trunk.c index 61425dc6c..96ff5a3a1 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -4191,7 +4191,8 @@ trunk_replace_routing_filter(trunk_handle *spl, // Move the tuples count from the bundle to whole branch uint64 bundle_num_tuples = compact_req->output_pivot_tuple_count[pos]; debug_assert(pdata->num_tuples_bundle >= bundle_num_tuples); - debug_assert((bundle_num_tuples == 0) == (pdata->filter.addr == 0)); + debug_assert((bundle_num_tuples + pdata->num_tuples_whole == 0) + == (pdata->filter.addr == 0)); pdata->num_tuples_bundle -= bundle_num_tuples; pdata->num_tuples_whole += bundle_num_tuples; @@ -5155,6 +5156,7 @@ trunk_compact_bundle(void *arg, void *scratch_buf) spl->stats[tid].compaction_time_wasted_ns[height] += platform_timestamp_elapsed(compaction_start); } + trunk_cfg_counter_exit(spl); return; } diff --git a/src/trunk.h b/src/trunk.h index 217d87d9b..9b681b858 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -225,7 +225,7 @@ struct trunk_handle { * happen without this change when deleting a column family. */ struct { - uint64 counter; + volatile uint64 counter; uint64 depth; // only incr counter if we're the highest scope to do so } PLATFORM_CACHELINE_ALIGNED cfg_crit_count[MAX_THREADS]; diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index ea6346e37..655fac81d 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -512,10 +512,6 @@ do_cf(do_cf_args *args) splinterdb_register_thread(kvs); splinterdb_column_family cf = column_family_create(kvs, TEST_MAX_KEY_SIZE, data_cfg); - printf("column family ID = %i, inserts = %i, deletes = %i\n", - cf.id, - inserts, - deletes); // perform insertions for (int i = 0; i < inserts; i++) { @@ -529,7 +525,6 @@ do_cf(do_cf_args *args) ASSERT_EQUAL(0, splinterdb_cf_insert(cf, key, val)); } - printf("column family ID = %i, inserts done\n", cf.id); // perform deletions for (int i = 0; i < deletes; i++) { @@ -540,7 +535,6 @@ do_cf(do_cf_args *args) ASSERT_EQUAL(0, splinterdb_cf_delete(cf, key)); } - printf("column family ID = %i, deletes done\n", cf.id); // perform a range query int first_idx = deletes; // first remaining insertion @@ -570,11 +564,9 @@ do_cf(do_cf_args *args) cur_idx++; } ASSERT_EQUAL(num_inserts, cur_idx - first_idx); - printf("column family ID = %i, range query done\n", cf.id); splinterdb_cf_iterator_deinit(it); - printf("Deleting column family %i\n", cf.id); column_family_delete(cf); splinterdb_deregister_thread(kvs); @@ -640,7 +632,7 @@ CTEST2(column_family, test_multithread_cf) do_cf_args t4_args = {.kvs = data->kvsb, .data_cfg = &t4_cfg, .inserts = 2000000, - .deletes = 200000}; + .deletes = 2000000}; platform_thread_create(&thread4, FALSE, (void (*)(void *))do_cf, From fdbb063b73230850623ed0571d423e6fffae8b33 Mon Sep 17 00:00:00 2001 From: Evan West Date: Fri, 7 Jul 2023 00:31:03 +0000 Subject: [PATCH 12/12] column family support: simplify trunk and adjust mt_cf test --- src/trunk.c | 12 +++++++----- src/trunk.h | 1 - tests/unit/column_family_test.c | 16 ++++------------ 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/trunk.c b/src/trunk.c index 96ff5a3a1..07b83200a 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -6191,12 +6191,13 @@ trunk_range_iterator_init(trunk_handle *spl, : max_key; key_buffer_init_from_key( &range_itor->rebuild_key, spl->heap_id, rebuild_key); + key_buffer local_max_key; if (trunk_key_compare(spl, max_key, rebuild_key) < 0) { key_buffer_init_from_key( - &range_itor->local_max_key, spl->heap_id, max_key); + &local_max_key, spl->heap_id, max_key); } else { key_buffer_init_from_key( - &range_itor->local_max_key, spl->heap_id, rebuild_key); + &local_max_key, spl->heap_id, rebuild_key); } trunk_node_unget(spl->cc, &node); @@ -6214,7 +6215,7 @@ trunk_range_iterator_init(trunk_handle *spl, btree_itor, branch, key_buffer_key(&range_itor->min_key), - key_buffer_key(&range_itor->local_max_key), + key_buffer_key(&local_max_key), do_prefetch, FALSE); } else { @@ -6225,13 +6226,15 @@ trunk_range_iterator_init(trunk_handle *spl, btree_itor, mt_root_addr, key_buffer_key(&range_itor->min_key), - key_buffer_key(&range_itor->local_max_key), + key_buffer_key(&local_max_key), is_live, FALSE); } range_itor->itor[i] = &btree_itor->super; } + key_buffer_deinit(&local_max_key); + platform_status rc = merge_iterator_create(spl->heap_id, spl->cfg.data_cfg, range_itor->num_branches, @@ -6360,7 +6363,6 @@ trunk_range_iterator_deinit(trunk_range_iterator *range_itor) key_buffer_deinit(&range_itor->min_key); key_buffer_deinit(&range_itor->max_key); - key_buffer_deinit(&range_itor->local_max_key); key_buffer_deinit(&range_itor->rebuild_key); } diff --git a/src/trunk.h b/src/trunk.h index 9b681b858..9f63f7d25 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -248,7 +248,6 @@ typedef struct trunk_range_iterator { bool at_end; key_buffer min_key; key_buffer max_key; - key_buffer local_max_key; key_buffer rebuild_key; btree_iterator btree_itor[TRUNK_RANGE_ITOR_MAX_BRANCHES]; trunk_branch branch[TRUNK_RANGE_ITOR_MAX_BRANCHES]; diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c index 655fac81d..1ca448a25 100644 --- a/tests/unit/column_family_test.c +++ b/tests/unit/column_family_test.c @@ -582,12 +582,6 @@ do_cf(do_cf_args *args) */ CTEST2(column_family, test_multithread_cf) { - data_config t1_cfg, t2_cfg, t3_cfg, t4_cfg; - default_data_config_init(TEST_MAX_KEY_SIZE, &t1_cfg); - default_data_config_init(TEST_MAX_KEY_SIZE, &t2_cfg); - default_data_config_init(TEST_MAX_KEY_SIZE, &t3_cfg); - default_data_config_init(TEST_MAX_KEY_SIZE, &t4_cfg); - // close and reopen with background threads splinterdb_close(&data->kvsb); data->cfg.num_normal_bg_threads = 4; @@ -595,11 +589,9 @@ CTEST2(column_family, test_multithread_cf) splinterdb_create(&data->cfg, &data->kvsb); - printf("\n"); - platform_thread thread1; do_cf_args t1_args = { - .kvs = data->kvsb, .data_cfg = &t1_cfg, .inserts = 2000000, .deletes = 0}; + .kvs = data->kvsb, .data_cfg = &data->default_data_cfg, .inserts = 2000000, .deletes = 0}; platform_thread_create(&thread1, FALSE, (void (*)(void *))do_cf, @@ -608,7 +600,7 @@ CTEST2(column_family, test_multithread_cf) platform_thread thread2; do_cf_args t2_args = {.kvs = data->kvsb, - .data_cfg = &t2_cfg, + .data_cfg = &data->default_data_cfg, .inserts = 500000, .deletes = 10000}; platform_thread_create(&thread2, @@ -619,7 +611,7 @@ CTEST2(column_family, test_multithread_cf) platform_thread thread3; do_cf_args t3_args = {.kvs = data->kvsb, - .data_cfg = &t3_cfg, + .data_cfg = &data->default_data_cfg, .inserts = 3000000, .deletes = 250000}; platform_thread_create(&thread3, @@ -630,7 +622,7 @@ CTEST2(column_family, test_multithread_cf) platform_thread thread4; do_cf_args t4_args = {.kvs = data->kvsb, - .data_cfg = &t4_cfg, + .data_cfg = &data->default_data_cfg, .inserts = 2000000, .deletes = 2000000}; platform_thread_create(&thread4,