diff --git a/Makefile b/Makefile index 3442847b5..e002a40d0 100644 --- a/Makefile +++ b/Makefile @@ -437,6 +437,11 @@ $(BINDIR)/$(UNITDIR)/splinterdb_quick_test: $(COMMON_TESTOBJ) $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ $(LIBDIR)/libsplinterdb.so +$(BINDIR)/$(UNITDIR)/column_family_test: $(COMMON_TESTOBJ) \ + $(COMMON_UNIT_TESTOBJ) \ + $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ + $(LIBDIR)/libsplinterdb.so + $(BINDIR)/$(UNITDIR)/splinterdb_stress_test: $(COMMON_TESTOBJ) \ $(COMMON_UNIT_TESTOBJ) \ $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ diff --git a/include/splinterdb/column_family.h b/include/splinterdb/column_family.h new file mode 100644 index 000000000..0895f7592 --- /dev/null +++ b/include/splinterdb/column_family.h @@ -0,0 +1,135 @@ +/* + * column_family.h -- + * + * The Column Family public API for SplinterDB. + * + * + */ + + +#ifndef _SPLINTERDB_COLUMN_FAMILY_H_ +#define _SPLINTERDB_COLUMN_FAMILY_H_ + +#include "splinterdb/splinterdb.h" +#include "splinterdb/public_platform.h" + +// Size of stack buffer we allocate for column family keys. +// This can be fairly large because these buffers are short +// lived. If keys are relatively small then the conversion +// can be done upon the stack. We use writable buffers to +// support larger keys. +#define CF_KEY_DEFAULT_SIZE 512 + +typedef uint32 column_family_id; + +typedef struct splinterdb_column_family { + column_family_id id; + splinterdb *kvs; +} splinterdb_column_family; + +typedef struct splinterdb_cf_iterator splinterdb_cf_iterator; + +// Initialize the cf_data_config and give a pointer to it to the user. +// This pointer is then passed to SplinterDB to add support for +// column families. The memory for the column_family_config is managed +// by SplinterDB. Not the user. +void +column_family_config_init(const uint64 max_key_size, // IN + data_config **cf_cfg // OUT +); + +// Delete the cf_data_config, freeing all associated memory. 
+// +// This should only be called after closing the SplinterDB instance +void +column_family_config_deinit(data_config *cf_cfg); + +// Create a new column family +// Returns a new column family struct +splinterdb_column_family +column_family_create(splinterdb *kvs, + const uint64 max_key_size, + data_config *data_cfg); + +// Delete the column family cf +// IMPORTANT: The user may NOT delete their data_config even after +// calling this function. All data_configs must persist +// for the lifetime of the SplinterDB instance +void +column_family_delete(splinterdb_column_family cf); + +// ====== SPLINTERDB Functions ====== +// We wrap these for column family support +// Column families and standard splinterdb should not be mixed +int +splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value); + +int +splinterdb_cf_delete(const splinterdb_column_family cf, slice key); + +int +splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta); + +// column family lookups + +void +splinterdb_cf_lookup_result_init(const splinterdb_column_family cf, // IN + splinterdb_lookup_result *result, // IN/OUT + uint64 buffer_len, // IN + char *buffer // IN +); + +void +splinterdb_cf_lookup_result_deinit(splinterdb_lookup_result *result); // IN + +_Bool +splinterdb_cf_lookup_found(const splinterdb_lookup_result *); // IN + +int +splinterdb_cf_lookup_result_value(const splinterdb_lookup_result *result, // IN + slice *value // OUT +); + +int +splinterdb_cf_lookup(const splinterdb_column_family cf, // IN + slice key, // IN + splinterdb_lookup_result *result // IN/OUT +); + + +// Range iterators for column families + +int +splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN + splinterdb_cf_iterator **cf_iter, // OUT + slice start_key // IN +); + +void +splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter); + +void +splinterdb_cf_iterator_prev(splinterdb_cf_iterator *cf_iter); + +void 
+splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter); + +_Bool +splinterdb_cf_iterator_valid(splinterdb_cf_iterator *cf_iter); + +_Bool +splinterdb_cf_iterator_can_prev(splinterdb_cf_iterator *cf_iter); + +_Bool +splinterdb_cf_iterator_can_next(splinterdb_cf_iterator *cf_iter); + +void +splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN + slice *key, // OUT + slice *value // OUT +); + +int +splinterdb_cf_iterator_status(const splinterdb_cf_iterator *cf_iter); + +#endif // _SPLINTERDB_COLUMN_FAMILY_H_ diff --git a/src/column_family.c b/src/column_family.c new file mode 100644 index 000000000..5aa0c8a10 --- /dev/null +++ b/src/column_family.c @@ -0,0 +1,530 @@ + +#include "platform.h" +#include "poison.h" + +#include "splinterdb/column_family.h" +#include "splinterdb/splinterdb.h" +#include "splinterdb_internal.h" +#include "util.h" + +#include + +// This is just a normal splinterdb_iterator +// but to have a clean interface, we pretend it's not +struct splinterdb_cf_iterator { + splinterdb_iterator iter; +}; + +typedef struct cf_data_config { + data_config general_config; + platform_semaphore table_sema; + column_family_id num_families; + column_family_id capacity; + data_config **config_table; +} cf_data_config; + +/* + * Extract errno.h -style status int from a platform_status + * + * Note this currently relies on the implementation of the splinterdb + * platform_linux. But at least it doesn't leak the dependency to callers. 
+ */ +static inline int +platform_status_to_int(const platform_status status) // IN +{ + return status.r; +} + +// Some helper functions we'll use for managing the column family identifiers +// and the data config table +static inline column_family_id +get_cf_id(slice cf_key) +{ + // the cf id is a prefix of the key + const void *data = slice_data(cf_key); + column_family_id id; + memcpy(&id, data, sizeof(id)); + return id; +} + +static inline slice +userkey_to_cf_key(slice userkey, + column_family_id cf_id, + writable_buffer *cf_key_wb) +{ + uint64 key_len = slice_length(userkey); + const void *data = slice_data(userkey); + + // write the cf id and the user's key data to the writable_buffer + writable_buffer_append(cf_key_wb, sizeof(cf_id), &cf_id); + writable_buffer_append(cf_key_wb, key_len, data); + return writable_buffer_to_slice(cf_key_wb); +} + +static inline slice +cf_key_to_userkey(slice cf_key) +{ + uint64 key_len = slice_length(cf_key); + const char *data = slice_data(cf_key); // char *: no void * arithmetic + + return slice_create(key_len - sizeof(column_family_id), + data + sizeof(column_family_id)); +} + +static inline column_family_id +cfg_table_insert(splinterdb *kvs, cf_data_config *cf_cfg, data_config *data_cfg) +{ + platform_semaphore_wait(&cf_cfg->table_sema); + column_family_id new_id = cf_cfg->num_families; + cf_cfg->num_families += 1; + + // reallocate table memory if necessary + if (cf_cfg->capacity <= new_id) { + column_family_id new_capacity = cf_cfg->capacity * 2; + data_config **new_table = TYPED_ARRAY_MALLOC( + platform_get_heap_id(), new_table, new_capacity); + if (new_table == NULL) { + platform_error_log("TYPED_ARRAY_MALLOC error\n"); + cf_cfg->num_families = new_id; // undo the reservation made above + platform_semaphore_post(&cf_cfg->table_sema); // never leave locked + return -1; + } + cf_cfg->capacity = new_capacity; // commit only once allocation succeeded + // copy only the new_id populated slots; the old table holds no more + memcpy(new_table, cf_cfg->config_table, new_id * sizeof(*new_table)); + data_config **old_table = cf_cfg->config_table; + cf_cfg->config_table = new_table; + + // wait until all threads have the chance to observe this table change + // 1.
make copy of counters + uint64 local_counter_copy[MAX_THREADS]; + for (int i = 0; i < MAX_THREADS; i++) + local_counter_copy[i] = kvs->spl->cfg_crit_count[i].counter; + + // 2. for each thread wait until copy is even or that current is > copy + for (int i = 0; i < MAX_THREADS; i++) { + if (local_counter_copy[i] % 2 == 1) + while (local_counter_copy[i] >= kvs->spl->cfg_crit_count[i].counter) + ; + } + // free the old table + platform_free(platform_get_heap_id(), old_table); + } + + // place new data_config in table + cf_cfg->config_table[new_id] = data_cfg; + platform_semaphore_post(&cf_cfg->table_sema); + + return new_id; +} + +// lookup a data_config in the table. +// Returns NULL if the column family ID does not exist +// otherwise, returns the associated data_config for the cf +static inline data_config * +cfg_table_lookup(cf_data_config *cf_cfg, column_family_id cf_id) +{ + if (cf_cfg->num_families <= cf_id) + return NULL; + + return cf_cfg->config_table[cf_id]; +} + +// Beginning of column family interface + +// Create a new column family +// Returns a new column family struct +splinterdb_column_family +column_family_create(splinterdb *kvs, + const uint64 max_key_size, + data_config *new_data_cfg) +{ + platform_assert(kvs->data_cfg->max_key_size + >= max_key_size + sizeof(column_family_id)); + + // convert from data_config to cf_data_config + cf_data_config *cf_cfg = (cf_data_config *)kvs->data_cfg; + + column_family_id new_id = cfg_table_insert(kvs, cf_cfg, new_data_cfg); + + // return new column family + splinterdb_column_family cf; + cf.id = new_id; + cf.kvs = kvs; + + return cf; +} + +// Delete the column family cf +void +column_family_delete(splinterdb_column_family cf) +{ + // TODO: Issue deletion messages to all keys within the column family! 
+} + +// SplinterDB Functions +// We wrap these for column family support +// Column families and standard splinterdb should not be mixed +int +splinterdb_cf_insert(const splinterdb_column_family cf, slice key, slice value) +{ + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's insert function and return + int rc = splinterdb_insert(cf.kvs, cf_key, value); + writable_buffer_deinit(&cf_key_wb); + return rc; +} + +int +splinterdb_cf_delete(const splinterdb_column_family cf, slice key) +{ + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's delete function and return + int rc = splinterdb_delete(cf.kvs, cf_key); + writable_buffer_deinit(&cf_key_wb); + return rc; +} + +int +splinterdb_cf_update(const splinterdb_column_family cf, slice key, slice delta) +{ + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config 
*)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's update function and return + int rc = splinterdb_update(cf.kvs, cf_key, delta); + writable_buffer_deinit(&cf_key_wb); + return rc; +} + +// column family lookups + +void +splinterdb_cf_lookup_result_init(const splinterdb_column_family cf, // IN + splinterdb_lookup_result *result, // IN/OUT + uint64 buffer_len, // IN + char *buffer // IN +) +{ + splinterdb_lookup_result_init(cf.kvs, result, buffer_len, buffer); +} + +void +splinterdb_cf_lookup_result_deinit(splinterdb_lookup_result *result) // IN +{ + splinterdb_lookup_result_deinit(result); +} + +_Bool +splinterdb_cf_lookup_found(const splinterdb_lookup_result *result) // IN +{ + return splinterdb_lookup_found(result); +} + +int +splinterdb_cf_lookup_result_value(const splinterdb_lookup_result *result, // IN + slice *value // OUT +) +{ + return splinterdb_lookup_result_value(result, value); +} + +int +splinterdb_cf_lookup(const splinterdb_column_family cf, // IN + slice key, // IN + splinterdb_lookup_result *result // IN/OUT +) +{ + // zero len key reserved, negative infinity + platform_assert(slice_length(key) > 0); + + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + + // convert to column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(key, cf.id, &cf_key_wb); + + // call splinter's lookup function and return + 
int rc = splinterdb_lookup(cf.kvs, cf_key, result); + // release the key buffer on every path (it may be heap-backed for large + // keys), then report the lookup status, whether success or failure + writable_buffer_deinit(&cf_key_wb); + return rc; +} + +// Range iterators for column families + +int +splinterdb_cf_iterator_init(const splinterdb_column_family cf, // IN + splinterdb_cf_iterator **cf_iter, // OUT + slice user_start_key // IN +) +{ + // assert that this is a valid column family + cf_data_config *cf_cfg = (cf_data_config *)cf.kvs->data_cfg; + platform_assert(cfg_table_lookup(cf_cfg, cf.id) != NULL); + + // The minimum key of a column family carries no key data: it is just + // the column family id, which is what a NULL start key becomes once + // it is converted to a column family key by prefixing the cf id + char key_buf[CF_KEY_DEFAULT_SIZE]; + writable_buffer cf_key_wb; + writable_buffer_init_with_buffer( + &cf_key_wb, platform_get_heap_id(), CF_KEY_DEFAULT_SIZE, key_buf, 0); + slice cf_key = userkey_to_cf_key(user_start_key, cf.id, &cf_key_wb); + + column_family_id next_cf = cf.id + 1; + slice curr_cf_key = slice_create(sizeof(column_family_id), (void *)&cf.id); + slice next_cf_key = slice_create(sizeof(column_family_id), (void *)&next_cf); + + splinterdb_iterator *it = TYPED_MALLOC(cf.kvs->spl->heap_id, it); + if (it == NULL) { + platform_error_log("TYPED_MALLOC error\n"); + writable_buffer_deinit(&cf_key_wb); + return platform_status_to_int(STATUS_NO_MEMORY); + } + it->last_rc = STATUS_OK; + trunk_range_iterator *range_itor = &(it->sri); + + key min_key = key_create_from_slice(curr_cf_key); + key start_key = key_create_from_slice(cf_key); + key end_key = key_create_from_slice(next_cf_key); + platform_status rc = trunk_range_iterator_init(cf.kvs->spl, + range_itor, + min_key, + end_key, + start_key, + greater_than_or_equal, + UINT64_MAX); + if (!SUCCESS(rc)) { + platform_free(cf.kvs->spl->heap_id, it); + writable_buffer_deinit(&cf_key_wb); + return platform_status_to_int(rc); + } + it->parent = cf.kvs; + + *cf_iter = (splinterdb_cf_iterator *)it; + writable_buffer_deinit(&cf_key_wb); + return EXIT_SUCCESS; +} + +void
+splinterdb_cf_iterator_deinit(splinterdb_cf_iterator *cf_iter) +{ + splinterdb_iterator_deinit(&cf_iter->iter); +} + +_Bool +splinterdb_cf_iterator_valid(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_valid(&cf_iter->iter); +} + +_Bool +splinterdb_cf_iterator_can_prev(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_can_prev(&cf_iter->iter); +} + +_Bool +splinterdb_cf_iterator_can_next(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_can_next(&cf_iter->iter); +} + +void +splinterdb_cf_iterator_get_current(splinterdb_cf_iterator *cf_iter, // IN + slice *key, // OUT + slice *value // OUT +) +{ + splinterdb_iterator_get_current(&cf_iter->iter, key, value); + *key = cf_key_to_userkey(*key); +} + +void +splinterdb_cf_iterator_next(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_next(&cf_iter->iter); +} + +void +splinterdb_cf_iterator_prev(splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_prev(&cf_iter->iter); +} + +int +splinterdb_cf_iterator_status(const splinterdb_cf_iterator *cf_iter) +{ + return splinterdb_iterator_status(&cf_iter->iter); +} + + +static int +cf_key_compare(const data_config *cfg, slice key1, slice key2) +{ + // sort first by the column family ids + column_family_id cf_id1 = get_cf_id(key1); + column_family_id cf_id2 = get_cf_id(key2); + + if (cf_id1 < cf_id2) + return -1; + if (cf_id1 > cf_id2) + return 1; + + // Now we are comparing two keys from the same column family + // so we will use the user defined func for this cf + slice userkey1 = cf_key_to_userkey(key1); + slice userkey2 = cf_key_to_userkey(key2); + + // empty keys are defined to be the minimum among the column family + if (slice_length(userkey1) == 0 && slice_length(userkey2) == 0) + return 0; + if (slice_length(userkey1) == 0) + return -1; + if (slice_length(userkey2) == 0) + return 1; + + // get the data_config for this column family + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, 
cf_id1); + platform_assert(cf_cfg != NULL); + + // call column family's function + return cf_cfg->key_compare(cf_cfg, userkey1, userkey2); +} + +static int +cf_merge_tuples(const data_config *cfg, + slice key, + message old_raw_message, + merge_accumulator *new_message) +{ + column_family_id cf_id = get_cf_id(key); + + // get the data_config for this column family and call its function + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); + platform_assert(cf_cfg != NULL); + + // call column family's function + return cf_cfg->merge_tuples( + cf_cfg, cf_key_to_userkey(key), old_raw_message, new_message); +} + +static int +cf_merge_tuples_final(const data_config *cfg, + slice key, + merge_accumulator *oldest_data // IN/OUT +) +{ + column_family_id cf_id = get_cf_id(key); + + // get the data_config for this column family + data_config *cf_cfg = cfg_table_lookup((cf_data_config *)cfg, cf_id); + platform_assert(cf_cfg != NULL); + + // call column family's function + return cf_cfg->merge_tuples_final( + cf_cfg, cf_key_to_userkey(key), oldest_data); +} + +// These functions we don't allow the column families to overwrite + +static void +cf_key_to_string(const data_config *cfg, slice key, char *str, size_t max_len) +{ + debug_hex_encode(str, max_len, slice_data(key), slice_length(key)); +} + +static void +cf_message_to_string(const data_config *cfg, + message msg, + char *str, + size_t max_len) +{ + debug_hex_encode(str, max_len, message_data(msg), message_length(msg)); +} + +// Initialize the data_config stored in the cf_data_config. +// Its data_config is then passed to SplinterDB to add support for +// column families. The memory for the column_family_config is managed +// by splinter. Not the user. 
+// On allocation failure *data_cfg is set to NULL and nothing is leaked. +void +column_family_config_init(const uint64 max_key_size, // IN + data_config **data_cfg // OUT +) +{ + data_config cfg = { + .max_key_size = max_key_size + sizeof(column_family_id), + .key_compare = cf_key_compare, + .key_hash = platform_hash32, + .merge_tuples = cf_merge_tuples, + .merge_tuples_final = cf_merge_tuples_final, + .key_to_string = cf_key_to_string, + .message_to_string = cf_message_to_string, + }; + + platform_heap_id hid = platform_get_heap_id(); + cf_data_config *cf_cfg = TYPED_MALLOC(hid, cf_cfg); + if (cf_cfg == NULL) { + platform_error_log("TYPED_MALLOC error\n"); + *data_cfg = NULL; + return; + } + + cf_cfg->general_config = cfg; + cf_cfg->num_families = 0; + cf_cfg->capacity = 1; + cf_cfg->config_table = TYPED_MALLOC(hid, cf_cfg->config_table); + if (cf_cfg->config_table == NULL) { + platform_error_log("TYPED_MALLOC error\n"); + platform_free(hid, cf_cfg); + *data_cfg = NULL; + return; + } + + platform_semaphore_init(&cf_cfg->table_sema, 1, hid); + *data_cfg = (data_config *)cf_cfg; +} + +void +column_family_config_deinit(data_config *data_cfg) +{ + // tolerate the NULL produced by a failed column_family_config_init + if (data_cfg == NULL) { + return; + } + cf_data_config *cf_cfg = (cf_data_config *)data_cfg; + + // user deallocates their cf's data configs + // we need to deallocate everything else + platform_semaphore_destroy(&cf_cfg->table_sema); + platform_free(platform_get_heap_id(), cf_cfg->config_table); + platform_free(platform_get_heap_id(), cf_cfg); +} diff --git a/src/splinterdb.c b/src/splinterdb.c index 4c2656c2c..41490e7ec 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -14,12 +14,10 @@ */ #include "splinterdb/splinterdb.h" +#include "splinterdb_internal.h" + #include "platform.h" -#include "clockcache.h" -#include "rc_allocator.h" -#include "trunk.h" #include "btree_private.h" -#include "shard_log.h" #include "poison.h" const char *BUILD_VERSION = "splinterdb_build_version " GIT_VERSION; @@ -29,25 +27,6 @@ splinterdb_get_version() return BUILD_VERSION; } -typedef struct splinterdb { - task_system *task_sys; - io_config io_cfg; - platform_io_handle io_handle; - allocator_config allocator_cfg; - rc_allocator allocator_handle; - clockcache_config cache_cfg; - clockcache cache_handle; - shard_log_config log_cfg; - task_system_config task_cfg; - allocator_root_id trunk_id; -
trunk_config trunk_cfg; - trunk_handle *spl; - platform_heap_handle heap_handle; // for platform_buffer_create - platform_heap_id heap_id; - data_config *data_cfg; -} splinterdb; - - /* * Extract errno.h -style status int from a platform_status * @@ -585,13 +564,6 @@ splinterdb_lookup(const splinterdb *kvs, // IN return platform_status_to_int(status); } - -struct splinterdb_iterator { - trunk_range_iterator sri; - platform_status last_rc; - const splinterdb *parent; -}; - int splinterdb_iterator_init(const splinterdb *kvs, // IN splinterdb_iterator **iter, // OUT diff --git a/src/splinterdb_internal.h b/src/splinterdb_internal.h new file mode 100644 index 000000000..304701490 --- /dev/null +++ b/src/splinterdb_internal.h @@ -0,0 +1,41 @@ + +/* + * splinterdb_internal.h -- + * + * Private struct declarations for splinterdb that we'd like + * to share among multiple files, but not share with users. + */ + +#ifndef SPLINTERDB_SPLINTERDB_INTERNAL_H_ +#define SPLINTERDB_SPLINTERDB_INTERNAL_H_ + +#include "trunk.h" +#include "clockcache.h" +#include "rc_allocator.h" +#include "shard_log.h" + +typedef struct splinterdb { + task_system *task_sys; + io_config io_cfg; + platform_io_handle io_handle; + allocator_config allocator_cfg; + rc_allocator allocator_handle; + clockcache_config cache_cfg; + clockcache cache_handle; + shard_log_config log_cfg; + task_system_config task_cfg; + allocator_root_id trunk_id; + trunk_config trunk_cfg; + trunk_handle *spl; + platform_heap_handle heap_handle; // for platform_buffer_create + platform_heap_id heap_id; + data_config *data_cfg; +} splinterdb; + +struct splinterdb_iterator { + trunk_range_iterator sri; + platform_status last_rc; + const splinterdb *parent; +}; + +#endif // SPLINTERDB_SPLINTERDB_INTERNAL_H_ diff --git a/src/trunk.c b/src/trunk.c index 2bc1447eb..c03bebb87 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -674,6 +674,33 @@ trunk_node_is_index(trunk_node *node) return !trunk_node_is_leaf(node); } +// call this function 
when entering data_config critical functions. +static inline void +trunk_cfg_counter_enter(trunk_handle *spl) +{ + threadid tid = platform_get_tid(); + debug_assert(tid < MAX_THREADS); + debug_assert(spl->cfg_crit_count[tid].depth != 0 + || spl->cfg_crit_count[tid].counter % 2 == 0); + // if depth is 0, increment the counter + spl->cfg_crit_count[tid].counter += spl->cfg_crit_count[tid].depth == 0; + ++spl->cfg_crit_count[tid].depth; +} + +// call this function when exiting data_config critical functions. +static inline void +trunk_cfg_counter_exit(trunk_handle *spl) +{ + threadid tid = platform_get_tid(); + debug_assert(tid < MAX_THREADS); + debug_assert(spl->cfg_crit_count[tid].depth > 0); + // depth > 0 means we are inside a region, so the counter must be odd + debug_assert(spl->cfg_crit_count[tid].counter % 2 == 1); + --spl->cfg_crit_count[tid].depth; + // if depth is 0, increment the counter + spl->cfg_crit_count[tid].counter += spl->cfg_crit_count[tid].depth == 0; +} + /* *----------------------------------------------------------------------------- * Compaction Requests @@ -3789,7 +3816,9 @@ static void trunk_memtable_flush_internal_virtual(void *arg, void *scratch) { trunk_memtable_args *mt_args = arg; + trunk_cfg_counter_enter(mt_args->spl); trunk_memtable_flush_internal(mt_args->spl, mt_args->generation); + trunk_cfg_counter_exit(mt_args->spl); } /* @@ -4226,6 +4255,7 @@ trunk_bundle_build_filters(void *arg, void *scratch) trunk_compact_bundle_req *compact_req = (trunk_compact_bundle_req *)arg; trunk_handle *spl = compact_req->spl; + trunk_cfg_counter_enter(spl); bool32 should_continue_build_filters = TRUE; while (should_continue_build_filters) { trunk_node node; @@ -4267,6 +4297,7 @@ trunk_bundle_build_filters(void *arg, void *scratch) spl, &stream, "out of order, reequeuing\n"); trunk_close_log_stream_if_enabled(spl, &stream); trunk_node_unget(spl->cc, &node); + trunk_cfg_counter_exit(spl); return; } @@ -4396,6 +4427,7 @@ trunk_bundle_build_filters(void *arg, void *scratch)
key_buffer_deinit(&compact_req->end_key); platform_free(spl->heap_id, compact_req); trunk_maybe_reclaim_space(spl); + trunk_cfg_counter_exit(spl); return; } @@ -5089,6 +5121,8 @@ trunk_compact_bundle(void *arg, void *scratch_buf) trunk_handle *spl = req->spl; threadid tid; + trunk_cfg_counter_enter(spl); // entering data_config critical region + /* * 1. Acquire node read lock */ @@ -5138,6 +5172,7 @@ trunk_compact_bundle(void *arg, void *scratch_buf) spl->stats[tid].compaction_time_wasted_ns[height] += platform_timestamp_elapsed(compaction_start); } + trunk_cfg_counter_exit(spl); return; } @@ -5414,6 +5449,7 @@ trunk_compact_bundle(void *arg, void *scratch_buf) out: trunk_log_stream_if_enabled(spl, &stream, "\n"); trunk_close_log_stream_if_enabled(spl, &stream); + trunk_cfg_counter_exit(spl); // exiting data_config critical region } /* @@ -6073,6 +6109,8 @@ trunk_range_iterator_init(trunk_handle *spl, range_itor->can_prev = TRUE; range_itor->can_next = TRUE; + trunk_cfg_counter_enter(spl); + if (trunk_key_compare(spl, min_key, start_key) > 0) { // in bounds, start at min start_key = min_key; @@ -6244,6 +6282,7 @@ trunk_range_iterator_init(trunk_handle *spl, MERGE_FULL, &range_itor->merge_itor); if (!SUCCESS(rc)) { + trunk_cfg_counter_exit(spl); return rc; } @@ -6264,6 +6303,7 @@ trunk_range_iterator_init(trunk_handle *spl, start_type, range_itor->num_tuples); if (!SUCCESS(rc)) { + trunk_cfg_counter_exit(spl); return rc; } } else { @@ -6283,6 +6323,7 @@ trunk_range_iterator_init(trunk_handle *spl, start_type, range_itor->num_tuples); if (!SUCCESS(rc)) { + trunk_cfg_counter_exit(spl); return rc; } } else { @@ -6291,6 +6332,7 @@ trunk_range_iterator_init(trunk_handle *spl, iterator_can_next(&range_itor->merge_itor->super); } } + trunk_cfg_counter_exit(spl); return rc; } @@ -6308,10 +6350,11 @@ trunk_range_iterator_next(iterator *itor) trunk_range_iterator *range_itor = (trunk_range_iterator *)itor; debug_assert(range_itor != NULL); 
platform_assert(range_itor->can_next); + trunk_cfg_counter_enter(range_itor->spl); platform_status rc = iterator_next(&range_itor->merge_itor->super); if (!SUCCESS(rc)) { - return rc; + goto out; } range_itor->num_tuples++; range_itor->can_prev = TRUE; @@ -6322,21 +6365,21 @@ trunk_range_iterator_next(iterator *itor) range_itor->spl->heap_id, key_buffer_key(&range_itor->min_key)); if (!SUCCESS(rc)) { - return rc; + goto out; } KEY_CREATE_LOCAL_COPY(rc, max_key, range_itor->spl->heap_id, key_buffer_key(&range_itor->max_key)); if (!SUCCESS(rc)) { - return rc; + goto out; } KEY_CREATE_LOCAL_COPY(rc, local_max_key, range_itor->spl->heap_id, key_buffer_key(&range_itor->local_max_key)); if (!SUCCESS(rc)) { - return rc; + goto out; } // if there is more data to get, rebuild the iterator for next leaf @@ -6351,13 +6394,15 @@ trunk_range_iterator_next(iterator *itor) greater_than_or_equal, temp_tuples); if (!SUCCESS(rc)) { - return rc; + goto out; } debug_assert(range_itor->can_next == iterator_can_next(&range_itor->merge_itor->super)); } } - return STATUS_OK; +out: + trunk_cfg_counter_exit(range_itor->spl); + return rc; // propagate failures that reached here via goto out } @@ -6367,10 +6412,11 @@ trunk_range_iterator_prev(iterator *itor) trunk_range_iterator *range_itor = (trunk_range_iterator *)itor; debug_assert(itor != NULL); platform_assert(range_itor->can_prev); + trunk_cfg_counter_enter(range_itor->spl); platform_status rc = iterator_prev(&range_itor->merge_itor->super); if (!SUCCESS(rc)) { - return rc; + goto out; } range_itor->num_tuples++; range_itor->can_next = TRUE; @@ -6381,21 +6427,21 @@ trunk_range_iterator_prev(iterator *itor) range_itor->spl->heap_id, key_buffer_key(&range_itor->min_key)); if (!SUCCESS(rc)) { - return rc; + goto out; } KEY_CREATE_LOCAL_COPY(rc, max_key, range_itor->spl->heap_id, key_buffer_key(&range_itor->max_key)); if (!SUCCESS(rc)) { - return rc; + goto out; } KEY_CREATE_LOCAL_COPY(rc, local_min_key, range_itor->spl->heap_id, key_buffer_key(&range_itor->local_min_key)); if (!SUCCESS(rc)) { - return rc;
+ goto out; } // if there is more data to get, rebuild the iterator for prev leaf @@ -6409,14 +6455,16 @@ trunk_range_iterator_prev(iterator *itor) less_than, range_itor->num_tuples); if (!SUCCESS(rc)) { - return rc; + goto out; } debug_assert(range_itor->can_prev == iterator_can_prev(&range_itor->merge_itor->super)); } } - return STATUS_OK; +out: + trunk_cfg_counter_exit(range_itor->spl); + return rc; } bool32 @@ -6647,6 +6695,7 @@ trunk_insert(trunk_handle *spl, key tuple_key, message data) if (trunk_max_key_size(spl) < key_length(tuple_key)) { return STATUS_BAD_PARAM; } + trunk_cfg_counter_enter(spl); if (message_class(data) == MESSAGE_TYPE_DELETE) { data = DELETE_MESSAGE; @@ -6682,6 +6731,7 @@ trunk_insert(trunk_handle *spl, key tuple_key, message data) } out: + trunk_cfg_counter_exit(spl); return rc; } @@ -6855,6 +6905,7 @@ trunk_lookup(trunk_handle *spl, key target, merge_accumulator *result) // also handles switch to READY ^^^^^ merge_accumulator_set_to_null(result); + trunk_cfg_counter_enter(spl); memtable_begin_lookup(spl->mt_ctxt); bool32 found_in_memtable = FALSE; @@ -6934,6 +6985,7 @@ trunk_lookup(trunk_handle *spl, key target, merge_accumulator *result) merge_accumulator_set_to_null(result); } + trunk_cfg_counter_exit(spl); return STATUS_OK; } @@ -7059,6 +7111,8 @@ trunk_lookup_async(trunk_handle *spl, // IN cache_async_result res = 0; threadid tid; + trunk_cfg_counter_enter(spl); + #if TRUNK_DEBUG cache_enable_sync_get(spl->cc, FALSE); #endif @@ -7500,6 +7554,7 @@ trunk_lookup_async(trunk_handle *spl, // IN cache_enable_sync_get(spl->cc, TRUE); #endif + trunk_cfg_counter_exit(spl); return res; } @@ -7511,6 +7566,7 @@ trunk_range(trunk_handle *spl, tuple_function func, void *arg) { + trunk_cfg_counter_enter(spl); trunk_range_iterator *range_itor = TYPED_MALLOC(spl->heap_id, range_itor); platform_status rc = trunk_range_iterator_init(spl, range_itor, @@ -7538,6 +7594,7 @@ trunk_range(trunk_handle *spl, destroy_range_itor: 
trunk_range_iterator_deinit(range_itor); platform_free(spl->heap_id, range_itor); + trunk_cfg_counter_exit(spl); return rc; } diff --git a/src/trunk.h b/src/trunk.h index 15b6ad3a2..d1c58eb9d 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -215,6 +215,20 @@ struct trunk_handle { uint64 counter; } PLATFORM_CACHELINE_ALIGNED task_countup[MAX_THREADS]; + /* + * Per thread and per splinter table, count entering and exiting regions + * critical to the data_config. + * + * Specifically the race condition is if a trunk task or function is in the + * middle of performing a key_compare or merge function call in the + * data_config and then the memory for the data_config is free'd. This could + * happen without this change when deleting a column family. + */ + struct { + volatile uint64 counter; + uint64 depth; // only incr counter if we're the highest scope to do so + } PLATFORM_CACHELINE_ALIGNED cfg_crit_count[MAX_THREADS]; + // space rec queue srq srq; diff --git a/test.sh b/test.sh index dd0db1bef..c867c1ccf 100755 --- a/test.sh +++ b/test.sh @@ -534,6 +534,7 @@ function test_make_run_tests() { function run_fast_unit_tests() { "$BINDIR"/unit/splinterdb_quick_test + "$BINDIR"/unit/column_family_test "$BINDIR"/unit/btree_test "$BINDIR"/unit/util_test "$BINDIR"/unit/misc_test diff --git a/tests/unit/column_family_test.c b/tests/unit/column_family_test.c new file mode 100644 index 000000000..01c8cf6cd --- /dev/null +++ b/tests/unit/column_family_test.c @@ -0,0 +1,658 @@ +/* + * ----------------------------------------------------------------------------- + * splinter_cf_test.c -- + * + * Quick test of the Column Family public API for SplinterDB + * + * ----------------------------------------------------------------------------- + */ + +#include "splinterdb/column_family.h" +#include "splinterdb/data.h" +#include "splinterdb/public_platform.h" +#include "splinterdb/default_data_config.h" +#include "unit_tests.h" +#include "util.h" +#include "test_data.h" +#include "ctest.h" // 
This is required for all test-case files.
+#include "btree.h" // for MAX_INLINE_MESSAGE_SIZE
+
+#define TEST_MAX_KEY_SIZE   16
+#define TEST_MAX_VALUE_SIZE 32
+
+
+/*
+ * Per-test fixture shared by every test in the column_family suite.
+ */
+CTEST_DATA(column_family)
+{
+   splinterdb       *kvsb;
+   splinterdb_config cfg;
+
+   // the global_data_cfg is used to route to the right data_config
+   // for each column family
+   data_config *global_data_cfg;
+
+   // default data config for when we don't want to be special
+   data_config default_data_cfg;
+};
+
+/*
+ * Build the default and column-family data configs, then create a fresh
+ * SplinterDB instance that uses the column-family config.
+ */
+CTEST_SETUP(column_family)
+{
+   default_data_config_init(TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+   column_family_config_init(TEST_MAX_KEY_SIZE, &data->global_data_cfg);
+   data->cfg = (splinterdb_config){.filename   = TEST_DB_NAME,
+                                   .cache_size = 1024 * Mega,
+                                   .disk_size  = 2048 * Mega,
+                                   .data_cfg   = data->global_data_cfg};
+
+   int rc = splinterdb_create(&data->cfg, &data->kvsb);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_TRUE(TEST_MAX_VALUE_SIZE
+               < MAX_INLINE_MESSAGE_SIZE(LAIO_DEFAULT_PAGE_SIZE));
+}
+
+/*
+ * Close the SplinterDB instance (if a test has not already done so) and
+ * only then release the column-family config.
+ */
+CTEST_TEARDOWN(column_family)
+{
+   if (data->kvsb) {
+      splinterdb_close(&data->kvsb);
+   }
+   column_family_config_deinit(data->global_data_cfg);
+}
+
+/*
+ * Basic test case that ensures we can create and use a single column family
+ * correctly. Tests:
+ * - column_family_create()
+ * - column_family_delete()
+ * - splinterdb_cf_insert()
+ * - splinterdb_cf_delete()
+ * - splinterdb_cf_lookup()
+ *
+ * We evaluate that these functions perform as expected and provide the correct
+ * outputs
+ */
+CTEST2(column_family, test_single_column)
+{
+   // create a column family
+   splinterdb_column_family cf = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+
+   // create some basic data to insert and lookup
+   char  *key_data = "some-key";
+   size_t key_len  = strlen(key_data);
+   slice  user_key = slice_create(key_len, key_data);
+
+   splinterdb_lookup_result result;
+   splinterdb_cf_lookup_result_init(cf, &result, 0, NULL);
+
+   int rc = splinterdb_cf_lookup(cf, user_key, &result);
+   ASSERT_EQUAL(0, rc);
+
+   // Lookup of a non-existent key should return not-found.
+   ASSERT_FALSE(splinterdb_cf_lookup_found(&result));
+
+   static char *to_insert_data = "some-value";
+   size_t       to_insert_len  = strlen(to_insert_data);
+   slice        to_insert      = slice_create(to_insert_len, to_insert_data);
+
+   // Basic insert of new key should succeed.
+   rc = splinterdb_cf_insert(cf, user_key, to_insert);
+   ASSERT_EQUAL(0, rc);
+
+   // Lookup of inserted key should succeed.
+   rc = splinterdb_cf_lookup(cf, user_key, &result);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_TRUE(splinterdb_cf_lookup_found(&result));
+
+   // Lookup should return inserted value
+   slice value;
+   rc = splinterdb_cf_lookup_result_value(&result, &value);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_EQUAL(to_insert_len, slice_length(value));
+   ASSERT_STREQN(to_insert_data, slice_data(value), slice_length(value));
+
+   // Delete key
+   rc = splinterdb_cf_delete(cf, user_key);
+   ASSERT_EQUAL(0, rc);
+
+   // Deleted key should not be found
+   rc = splinterdb_cf_lookup(cf, user_key, &result);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_FALSE(splinterdb_cf_lookup_found(&result));
+
+   splinterdb_cf_lookup_result_deinit(&result);
+   column_family_delete(cf);
+}
+
+/*
+ * Ensure keys and values of maximum length work
+ * with column families
+ */
+CTEST2(column_family, test_max_length)
+{
+   // create a column family
+   splinterdb_column_family cf = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+
+   char   large_key_data[TEST_MAX_KEY_SIZE];
+   size_t large_key_len = TEST_MAX_KEY_SIZE;
+   memset(large_key_data, 'k', TEST_MAX_KEY_SIZE);
+   slice large_key = slice_create(large_key_len, large_key_data);
+
+   char   large_val_data[TEST_MAX_VALUE_SIZE];
+   size_t large_val_len = TEST_MAX_VALUE_SIZE;
+   memset(large_val_data, 'v', TEST_MAX_VALUE_SIZE);
+   slice large_val = slice_create(large_val_len, large_val_data);
+
+   // Insert of large key and value should succeed
+   int rc = splinterdb_cf_insert(cf, large_key, large_val);
+   ASSERT_EQUAL(0, rc);
+
+   splinterdb_lookup_result result;
+   splinterdb_cf_lookup_result_init(cf, &result, 0, NULL);
+
+   // Lookup of inserted key should succeed.
+   rc = splinterdb_cf_lookup(cf, large_key, &result);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_TRUE(splinterdb_cf_lookup_found(&result));
+
+   // Lookup should return inserted value
+   slice value;
+   rc = splinterdb_cf_lookup_result_value(&result, &value);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_EQUAL(large_val_len, slice_length(value));
+   ASSERT_STREQN(large_val_data, slice_data(value), slice_length(value));
+
+   // Delete key
+   rc = splinterdb_cf_delete(cf, large_key);
+   ASSERT_EQUAL(0, rc);
+
+   // Deleted key should not be found
+   rc = splinterdb_cf_lookup(cf, large_key, &result);
+   ASSERT_EQUAL(0, rc);
+   ASSERT_FALSE(splinterdb_cf_lookup_found(&result));
+
+   splinterdb_cf_lookup_result_deinit(&result);
+   column_family_delete(cf);
+}
+
+/*
+ * Test key/value operations upon multiple column families.
+ * Ensure that the keys can be operated upon independently.
+ */
+CTEST2(column_family, test_multiple_cf_same_key)
+{
+   // create a few column families
+   splinterdb_column_family cf0 = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+   splinterdb_column_family cf1 = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+   splinterdb_column_family cf2 = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+   splinterdb_column_family cf3 = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+
+   // Insert a single key to each column family
+   char  key_data[]  = "key";
+   char  val0_data[] = "val0";
+   char  val1_data[] = "val1";
+   char  val2_data[] = "val2";
+   char  val3_data[] = "val3";
+   slice key         = slice_create(sizeof(key_data) - 1, key_data);
+   slice val0        = slice_create(sizeof(val0_data) - 1, val0_data);
+   slice val1        = slice_create(sizeof(val1_data) - 1, val1_data);
+   slice val2        = slice_create(sizeof(val2_data) - 1, val2_data);
+   slice val3        = slice_create(sizeof(val3_data) - 1, val3_data);
+
+   slice                    values[] = {val0, val1, val2, val3};
+   splinterdb_column_family cfs[]    = {cf0, cf1, cf2, cf3};
+
+   // Perform insertions; a failed insert must fail the test here rather
+   // than surface later as a confusing lookup mismatch.
+   for (int idx = 0; idx < 4; idx++) {
+      ASSERT_EQUAL(0, splinterdb_cf_insert(cfs[idx], key, values[idx]));
+   }
+
+   // lookup the key from each column family
+   // and ensure the right value is returned
+   for (int idx = 0; idx < 4; idx++) {
+      splinterdb_lookup_result result;
+      splinterdb_cf_lookup_result_init(cfs[idx], &result, 0, NULL);
+
+      int rc = splinterdb_cf_lookup(cfs[idx], key, &result);
+      ASSERT_EQUAL(0, rc);
+      ASSERT_TRUE(splinterdb_cf_lookup_found(&result));
+
+      // Lookup should return correct values
+      slice value;
+      rc = splinterdb_cf_lookup_result_value(&result, &value);
+      ASSERT_EQUAL(0, rc);
+      ASSERT_EQUAL(slice_length(values[idx]), slice_length(value));
+      ASSERT_STREQN(
+         slice_data(values[idx]), slice_data(value), slice_length(value));
+
+      splinterdb_cf_lookup_result_deinit(&result);
+   }
+   column_family_delete(cf0);
+   column_family_delete(cf1);
+   column_family_delete(cf2);
+   column_family_delete(cf3);
+}
+
+/*
+ * Custom key compare function that reverses the keys
+ */
+static int
+rev_key_compare(const data_config *cfg, slice key1, slice key2)
+{
+   platform_assert(slice_data(key1) != NULL);
+   platform_assert(slice_data(key2) != NULL);
+
+   // swapping the operands inverts the lexicographic order
+   return slice_lex_cmp(key2, key1);
+}
+
+/*
+ * Test multiple column families with range iterators
+ * ensure that keys are found in the order defined by their
+ * custom key comparison functions
+ */
+CTEST2(column_family, test_multiple_cf_range)
+{
+   // create the default column family
+   splinterdb_column_family cf_default = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+
+   // create a config with a reversed key compare function
+   // and create a column family that will reverse the keys.
+   // static: column_family.h requires every data_config to persist for the
+   // lifetime of the SplinterDB instance, which is only closed in teardown.
+   static data_config rev_data_config;
+   default_data_config_init(TEST_MAX_KEY_SIZE, &rev_data_config);
+   rev_data_config.key_compare = rev_key_compare;
+
+   splinterdb_column_family cf_reverse =
+      column_family_create(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config);
+
+   // Insert a few key/value pairs to each cf
+   char key1_data[] = "aaaa";
+   char key2_data[] = "bbbb";
+   char key3_data[] = "cccc";
+   char key4_data[] = "dddd";
+   char cf1_value[] = "val-in-cf1";
+   char cf2_value[] = "val-in-cf2";
+
+   slice key1 = slice_create(sizeof(key1_data) - 1, key1_data);
+   slice key2 = slice_create(sizeof(key2_data) - 1, key2_data);
+   slice key3 = slice_create(sizeof(key3_data) - 1, key3_data);
+   slice key4 = slice_create(sizeof(key4_data) - 1, key4_data);
+   slice val1 = slice_create(sizeof(cf1_value) - 1, cf1_value);
+   slice val2 = slice_create(sizeof(cf2_value) - 1, cf2_value);
+
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key1, val1));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key2, val1));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key3, val1));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, key4, val1));
+
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key1, val2));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key2, val2));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key3, val2));
+   ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, key4, val2));
+
+   // Perform a range query over all cf1 keys
+   splinterdb_cf_iterator *it;
+   ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_default, &it, NULL_SLICE));
+
+   slice keys[] = {key1, key2, key3, key4};
+   slice key;
+   slice val;
+   int   idx = 0;
+   for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) {
+      // an unexpected extra entry must fail the test, not index past keys[]
+      ASSERT_TRUE(idx < 4);
+      splinterdb_cf_iterator_get_current(it, &key, &val);
+      ASSERT_EQUAL(slice_length(keys[idx]), slice_length(key));
+      ASSERT_STREQN(slice_data(keys[idx]), slice_data(key), slice_length(key));
+
+      ASSERT_EQUAL(slice_length(val1), slice_length(val));
+      ASSERT_STREQN(slice_data(val1), slice_data(val), slice_length(val));
+      ++idx;
+   }
+   ASSERT_EQUAL(4, idx);
+
+   splinterdb_cf_iterator_deinit(it);
+
+   // Perform a range query over all cf2 keys
+   ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf_reverse, &it, NULL_SLICE));
+
+   idx = 0;
+   for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) {
+      ASSERT_TRUE(idx < 4);
+      splinterdb_cf_iterator_get_current(it, &key, &val);
+      ASSERT_EQUAL(slice_length(keys[3 - idx]), slice_length(key));
+      ASSERT_STREQN(
+         slice_data(keys[3 - idx]), slice_data(key), slice_length(key));
+
+      ASSERT_EQUAL(slice_length(val2), slice_length(val));
+      ASSERT_STREQN(slice_data(val2), slice_data(val), slice_length(val));
+      ++idx;
+   }
+   ASSERT_EQUAL(4, idx);
+   ASSERT_FALSE(splinterdb_cf_iterator_can_next(it));
+   ASSERT_TRUE(splinterdb_cf_iterator_can_prev(it));
+
+   // step back onto the last entry and walk the same range backwards
+   splinterdb_cf_iterator_prev(it);
+   --idx;
+
+   for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_prev(it)) {
+      ASSERT_TRUE(idx >= 0);
+      splinterdb_cf_iterator_get_current(it, &key, &val);
+      ASSERT_EQUAL(slice_length(keys[3 - idx]), slice_length(key));
+      ASSERT_STREQN(
+         slice_data(keys[3 - idx]), slice_data(key), slice_length(key));
+
+      ASSERT_EQUAL(slice_length(val2), slice_length(val));
+      ASSERT_STREQN(slice_data(val2), slice_data(val), slice_length(val));
+      --idx;
+   }
+   ASSERT_EQUAL(-1, idx);
+
+   splinterdb_cf_iterator_deinit(it);
+   column_family_delete(cf_default);
+   column_family_delete(cf_reverse);
+}
+
+
+/*
+ * These functions implement merge functionality so that we can test the update
+ * function within column families
+ */
+
+// merge two messages, with result in new_message
+//
+// The merged value is whichever of the old and new values is maximal under
+// cfg->key_compare. Returns 0 on success, or the platform_status code of a
+// failed buffer copy.
+static int
+merge_tuples(const data_config *cfg,
+             slice              key,
+             message            old_message,
+             merge_accumulator *new_message)
+{
+   platform_assert(slice_data(key) != NULL);
+   message_type     type = old_message.type;
+   writable_buffer *wb   = &new_message->data;
+
+   // extract value slices
+   slice old_value = old_message.data;
+   slice new_value = writable_buffer_to_slice(wb);
+
+   // use the cfg's key compare function to find maximal value
+   // and retain that value. The accumulator already holds new_value, so a
+   // copy is only needed when old_value is the one to keep.
+   if (cfg->key_compare(cfg, old_value, new_value) >= 0) {
+      platform_status rc = writable_buffer_copy_slice(wb, old_value);
+      if (!SUCCESS(rc)) {
+         return rc.r;
+      }
+   }
+
+   if (type == MESSAGE_TYPE_INSERT) {
+      new_message->type = MESSAGE_TYPE_INSERT;
+   } else {
+      new_message->type = MESSAGE_TYPE_UPDATE;
+   }
+
+   return 0;
+}
+
+// finalize the oldest message for a key by turning it into an insert
+static int
+merge_tuple_final(const data_config *cfg,
+                  slice              key,
+                  merge_accumulator *oldest_message)
+{
+   platform_assert(slice_data(key) != NULL);
+
+   // simply set type to INSERT
+   oldest_message->type = MESSAGE_TYPE_INSERT;
+   return 0;
+}
+
+
+/*
+ * Test updates upon multiple column families. Each column family merges
+ * with its own key comparison function, so the same sequence of updates
+ * must resolve to a different value in each column family.
+ */
+CTEST2(column_family, multiple_cf_with_updates)
+{
+   // create the default column family
+   data->default_data_cfg.merge_tuples       = merge_tuples;
+   data->default_data_cfg.merge_tuples_final = merge_tuple_final;
+   splinterdb_column_family cf_default       = column_family_create(
+      data->kvsb, TEST_MAX_KEY_SIZE, &data->default_data_cfg);
+
+   // create a config with a reversed key compare function
+   // and create a column family that will reverse the keys.
+   // static: column_family.h requires every data_config to persist for the
+   // lifetime of the SplinterDB instance, which is only closed in teardown.
+   static data_config rev_data_config;
+   default_data_config_init(TEST_MAX_KEY_SIZE, &rev_data_config);
+   rev_data_config.key_compare        = rev_key_compare;
+   rev_data_config.merge_tuples       = merge_tuples;
+   rev_data_config.merge_tuples_final = merge_tuple_final;
+
+   splinterdb_column_family cf_reverse =
+      column_family_create(data->kvsb, TEST_MAX_KEY_SIZE, &rev_data_config);
+
+   // Insert a few key/value pairs to each cf
+   char key1_data[] = "aaaa";
+   char key2_data[] = "bbbb";
+   char key3_data[] = "cccc";
+   char key4_data[] = "dddd";
+   char cf1_value[] = "val-in-cf1";
+   char cf2_value[] = "val-in-cf2";
+
+   slice key1 = slice_create(sizeof(key1_data) - 1, key1_data);
+   slice key2 = slice_create(sizeof(key2_data) - 1, key2_data);
+   slice key3 = slice_create(sizeof(key3_data) - 1, key3_data);
+   slice key4 = slice_create(sizeof(key4_data) - 1, key4_data);
+   slice val1 = slice_create(sizeof(cf1_value) - 1, cf1_value);
+   slice val2 = slice_create(sizeof(cf2_value) - 1, cf2_value);
+
+   slice keys[] = {key1, key2, key3, key4};
+
+   for (int i = 0; i < 4; i++) {
+      ASSERT_EQUAL(0, splinterdb_cf_insert(cf_default, keys[i], val1));
+   }
+
+   for (int i = 0; i < 4; i++) {
+      ASSERT_EQUAL(0, splinterdb_cf_insert(cf_reverse, keys[i], val2));
+   }
+
+   // Now update these key-value pairs
+   char small_val[] = "aaaaaa-cf2";
+   char big_val[]   = "zzzzzz-cf1";
+
+   slice new_val1 = slice_create(sizeof(big_val) - 1, big_val);
+   slice new_val2 = slice_create(sizeof(small_val) - 1, small_val);
+
+   // apply both updates to all keys in cf_default
+   for (int i = 0; i < 4; i++) {
+      ASSERT_EQUAL(0, splinterdb_cf_update(cf_default, keys[i], new_val1));
+      ASSERT_EQUAL(0, splinterdb_cf_update(cf_default, keys[i], new_val2));
+   }
+
+   // apply both updates to all keys in cf_reverse
+   for (int i = 0; i < 4; i++) {
+      ASSERT_EQUAL(0, splinterdb_cf_update(cf_reverse, keys[i], new_val1));
+      ASSERT_EQUAL(0, splinterdb_cf_update(cf_reverse, keys[i], new_val2));
+   }
+
+   splinterdb_column_family cfs[] = {cf_default, cf_reverse};
+
+   // now lookup each key in both cfs
+   for (int idx = 0; idx < 2; idx++) {
+      // cf_default keeps the lexicographically largest value, cf_reverse
+      // (reversed compare) keeps the smallest
+      slice expected = (idx == 0) ? new_val1 : new_val2;
+
+      for (int i = 0; i < 4; i++) {
+         splinterdb_lookup_result result;
+         splinterdb_cf_lookup_result_init(cfs[idx], &result, 0, NULL);
+
+         int rc = splinterdb_cf_lookup(cfs[idx], keys[i], &result);
+         ASSERT_EQUAL(0, rc);
+         ASSERT_TRUE(splinterdb_cf_lookup_found(&result));
+
+         // Lookup should return correct values
+         slice value;
+         rc = splinterdb_cf_lookup_result_value(&result, &value);
+         ASSERT_EQUAL(0, rc);
+         ASSERT_EQUAL(slice_length(expected), slice_length(value));
+         ASSERT_STREQN(
+            slice_data(expected), slice_data(value), slice_length(value));
+
+         splinterdb_cf_lookup_result_deinit(&result);
+      }
+   }
+   column_family_delete(cf_default);
+   column_family_delete(cf_reverse);
+}
+
+// Arguments handed to each do_cf() worker thread
+typedef struct do_cf_args {
+   splinterdb  *kvs;      // shared SplinterDB instance
+   data_config *data_cfg; // data_config for the worker's column family
+   int          inserts;  // number of keys to insert
+   int          deletes;  // number of the inserted keys to delete again
+} do_cf_args;
+
+// function to create, insert to, delete from, and destroy a cf
+// performs a variable amount of insertion/deletions
+//
+// Negative counts are treated as zero and deletes is capped at inserts.
+// Always returns NULL.
+void *
+do_cf(do_cf_args *args)
+{
+   splinterdb  *kvs      = args->kvs;
+   data_config *data_cfg = args->data_cfg;
+   int          inserts  = args->inserts;
+   int          deletes  = args->deletes;
+
+   // sanitize the requested counts so that 0 <= deletes <= inserts
+   if (inserts < 0) {
+      inserts = 0;
+   }
+   if (deletes < 0) {
+      deletes = 0;
+   }
+   if (deletes > inserts) {
+      deletes = inserts;
+   }
+
+   splinterdb_register_thread(kvs);
+   splinterdb_column_family cf =
+      column_family_create(kvs, TEST_MAX_KEY_SIZE, data_cfg);
+
+   // perform insertions
+   for (int i = 0; i < inserts; i++) {
+      char key_data[TEST_MAX_KEY_SIZE];
+      char val_data[TEST_MAX_VALUE_SIZE];
+
+      snprintf(key_data, sizeof(key_data), "key-%06x", i);
+      snprintf(val_data, sizeof(val_data), "val-%06x-%02x", i, cf.id);
+      slice key = slice_create(strlen(key_data), key_data);
+      slice val = slice_create(strlen(val_data), val_data);
+
+      ASSERT_EQUAL(0, splinterdb_cf_insert(cf, key, val));
+   }
+
+   // perform deletions
+   for (int i = 0; i < deletes; i++) {
+      char key_data[TEST_MAX_KEY_SIZE];
+
+      snprintf(key_data, sizeof(key_data), "key-%06x", i);
+      slice key = slice_create(strlen(key_data), key_data);
+
+      ASSERT_EQUAL(0, splinterdb_cf_delete(cf, key));
+   }
+
+   // perform a range query
+   int first_idx   = deletes; // first remaining insertion
+   int cur_idx     = first_idx;
+   int num_inserts = inserts - deletes;
+
+   splinterdb_cf_iterator *it;
+   slice                   key;
+   slice                   val;
+   ASSERT_EQUAL(0, splinterdb_cf_iterator_init(cf, &it, NULL_SLICE));
+   for (; splinterdb_cf_iterator_valid(it); splinterdb_cf_iterator_next(it)) {
+      char key_data[TEST_MAX_KEY_SIZE];
+      char val_data[TEST_MAX_VALUE_SIZE];
+
+      // rebuild the key/value this position is expected to hold
+      snprintf(key_data, sizeof(key_data), "key-%06x", cur_idx);
+      snprintf(val_data, sizeof(val_data), "val-%06x-%02x", cur_idx, cf.id);
+      slice exp_key = slice_create(strlen(key_data), key_data);
+      slice exp_val = slice_create(strlen(val_data), val_data);
+
+      splinterdb_cf_iterator_get_current(it, &key, &val);
+
+      ASSERT_EQUAL(slice_length(exp_key), slice_length(key));
+      ASSERT_STREQN(slice_data(exp_key), slice_data(key), slice_length(key));
+
+      ASSERT_EQUAL(slice_length(exp_val), slice_length(val));
+      ASSERT_STREQN(slice_data(exp_val), slice_data(val), slice_length(val));
+      cur_idx++;
+   }
+   ASSERT_EQUAL(num_inserts, cur_idx - first_idx);
+
+   splinterdb_cf_iterator_deinit(it);
+
+   column_family_delete(cf);
+
+   splinterdb_deregister_thread(kvs);
+
+   // the data_config is owned by the caller and is not released here
+   return NULL;
+}
+
+// Thread entry point of type void (*)(void *), the type the thread-create
+// call sites previously cast do_cf to. Forwarding through a correctly typed
+// function avoids calling do_cf through an incompatible function pointer.
+static void
+do_cf_thread(void *arg)
+{
+   (void)do_cf((do_cf_args *)arg);
+}
+
+
+/*
+ * Test creating cfs, performing updates upon them, and deleting cfs
+ * all in parallel and with background threads.
+ */
+CTEST2(column_family, test_multithread_cf)
+{
+   // close and reopen with background threads
+   splinterdb_close(&data->kvsb);
+   data->cfg.num_normal_bg_threads   = 4;
+   data->cfg.num_memtable_bg_threads = 2;
+
+   ASSERT_EQUAL(0, splinterdb_create(&data->cfg, &data->kvsb));
+
+   // one workload per worker thread
+   do_cf_args thread_args[] = {
+      {.kvs      = data->kvsb,
+       .data_cfg = &data->default_data_cfg,
+       .inserts  = 2000000,
+       .deletes  = 0},
+      {.kvs      = data->kvsb,
+       .data_cfg = &data->default_data_cfg,
+       .inserts  = 500000,
+       .deletes  = 10000},
+      {.kvs      = data->kvsb,
+       .data_cfg = &data->default_data_cfg,
+       .inserts  = 3000000,
+       .deletes  = 250000},
+      {.kvs      = data->kvsb,
+       .data_cfg = &data->default_data_cfg,
+       .inserts  = 2000000,
+       .deletes  = 2000000},
+   };
+   enum { NUM_CF_THREADS = sizeof(thread_args) / sizeof(thread_args[0]) };
+   platform_thread threads[NUM_CF_THREADS];
+
+   // Start the workers, stopping at the first failure so that only threads
+   // that really exist are joined below.
+   int num_started = 0;
+   while (num_started < NUM_CF_THREADS) {
+      platform_status rc = platform_thread_create(&threads[num_started],
+                                                  FALSE,
+                                                  do_cf_thread,
+                                                  &thread_args[num_started],
+                                                  platform_get_heap_id());
+      if (!SUCCESS(rc)) {
+         break;
+      }
+      num_started++;
+   }
+
+   // Join before asserting: the workers hold pointers into thread_args, so
+   // they must finish before this frame can be abandoned.
+   for (int i = 0; i < num_started; i++) {
+      platform_thread_join(threads[i]);
+   }
+   ASSERT_EQUAL(NUM_CF_THREADS, num_started);
+
+   splinterdb_close(&data->kvsb);
+}