From 38e9496717957319f9f15a198396ffdcd1f8958a Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Tue, 7 Nov 2023 23:00:22 +0100 Subject: [PATCH] feat(lsm): possibly added reading db file on load --- lsm/example/test.c | 6 +- lsm/src/_include/lsm/store_internal.h | 3 + lsm/src/store/lsm_store.c | 14 ++- lsm/src/store/lsm_store_sync.c | 126 ++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 11 deletions(-) diff --git a/lsm/example/test.c b/lsm/example/test.c index 1445720..e1e3b69 100644 --- a/lsm/example/test.c +++ b/lsm/example/test.c @@ -10,13 +10,17 @@ int main() { lsm_str_init_copy(&data_dir, "data"); lsm_store *store; - lsm_store_load(&store, data_dir); + assert(lsm_store_load(&store, data_dir) == lsm_error_ok); lsm_str *key; lsm_str_init_copy(&key, "key"); lsm_entry_handle *handle; assert(lsm_store_insert(&handle, store, key) == lsm_error_ok); + + lsm_str *attr; + lsm_str_init_copy(&attr, "some attribute value"); + lsm_entry_attr_insert(handle, lsm_attr_type_content_type, attr); lsm_str *data; lsm_str_init_copy(&data, "hello"); diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index d45fc36..b9546e9 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -73,10 +73,13 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out); struct lsm_store { lsm_trie *trie; lsm_str *data_path; + FILE *db_file; uint64_t db_file_size; pthread_mutex_t db_lock; + FILE *idx_file; + uint64_t idx_file_block_count; uint64_t idx_file_size; pthread_mutex_t idx_lock; }; diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index e2d62cc..172c6d2 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -63,14 +63,24 @@ lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) { if (idx_file == NULL) { return lsm_error_failed_io; } - } - LSM_RES(lsm_store_load_db(store)); + // The database code expects the idx file to start with how many blocks it + // contains, so we write that here + uint64_t num = 0; + + if (fwrite(&num, sizeof(uint64_t), 1, idx_file) == 0) { + return lsm_error_failed_io; + } + + fflush(idx_file); + } store->data_path = data_path; store->db_file = db_file; store->idx_file = idx_file; + LSM_RES(lsm_store_load_db(store)); + *ptr = store; return lsm_error_ok; diff --git a/lsm/src/store/lsm_store_sync.c b/lsm/src/store/lsm_store_sync.c index 49f6d61..39e7658 100644 --- a/lsm/src/store/lsm_store_sync.c +++ b/lsm/src/store/lsm_store_sync.c @@ -1,5 +1,9 @@ -#include "lsm/store_internal.h" +#include #include +#include + +#include "lsm/store.h" +#include "lsm/store_internal.h" static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) { size_t res = fwrite(&num, sizeof(uint64_t), 1, f); @@ -37,8 +41,6 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { *size += 2 * sizeof(uint64_t) + lsm_str_len(entry->attrs.items[i].str) * sizeof(char); } - printf("db size: %lu\n", *size); - return lsm_error_ok; } @@ -57,7 +59,7 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { pthread_mutex_lock(&store->db_lock); // Append entry to end of database file - if (fseek(store->db_file, SEEK_SET, store->db_file_size) != 0) { + if (fseek(store->db_file, store->db_file_size, SEEK_SET) != 0) { pthread_mutex_unlock(&store->db_lock); return lsm_error_failed_io; @@ -83,34 +85,142 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { // Append entry to index file pthread_mutex_lock(&store->idx_lock); - if (fseek(store->idx_file, SEEK_SET, store->idx_file_size) != 0) { + if (fseek(store->idx_file, store->idx_file_size, SEEK_SET) != 0) { + printf("failed seek, %lu\n", store->idx_file_size); pthread_mutex_unlock(&store->idx_lock); return lsm_error_failed_io; } res = lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, entry_index, entry_size); - fflush(store->idx_file); if (res == lsm_error_ok) { + // Update the counter at the beginning of the file + uint64_t new_block_count = store->idx_file_block_count + 1; + + if (fseek(store->idx_file, 0, SEEK_SET) != 0) { + pthread_mutex_unlock(&store->idx_lock); + + return lsm_error_failed_io; + } + + size_t r = fwrite(&new_block_count, sizeof(uint64_t), 1, store->idx_file); + + if (r != lsm_error_ok) { + printf("wuck\n"); + pthread_mutex_unlock(&store->idx_lock); + + return res; + } + store->idx_file_size += entry_size; + store->idx_file_block_count = new_block_count; + } else { + printf("failed write\n"); } + fflush(store->idx_file); + pthread_mutex_unlock(&store->idx_lock); return res; } +static lsm_error lsm_entry_read_attrs(lsm_entry_handle *handle, FILE *db_file) { + uint64_t attr_count; + size_t res = fread(&attr_count, sizeof(uint64_t), 1, db_file); + + if (res == 0) { + return lsm_error_failed_io; + } + + // attr_type, val_len + uint64_t nums[2]; + lsm_str *val; + + for (uint64_t i = 0; i < attr_count; i++) { + res = fread(nums, sizeof(uint64_t), 2, db_file); + + if (res < 2) { + return lsm_error_failed_io; + } + + char *val_s = malloc(nums[1] + 1); + val_s[nums[1]] = '\0'; + + if (val_s == NULL) { + return lsm_error_failed_alloc; + } + + uint64_t read = 0; + + while (read < nums[1]) { + read += fread(&val_s[read], 1, nums[1] - read, db_file); + } + + LSM_RES(lsm_str_init(&val, val_s));; + lsm_entry_attr_insert(handle, nums[0], val); + } + + return lsm_error_ok; +} + lsm_error lsm_store_load_db(lsm_store *store) { uint64_t key_len; - size_t res; + uint64_t db_dim[2]; lsm_str *key; + lsm_entry_handle *handle; - while (feof(store->idx_file) > 0) { + // idx file starts with block count + size_t res = fread(&store->idx_file_block_count, sizeof(uint64_t), 1, store->idx_file); + + if (res == 0) { + return lsm_error_failed_io; + } + + store->idx_file_size += sizeof(uint64_t); + + for (uint64_t i = 0; i < store->idx_file_block_count; i++) { + // Read in idx metadata res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file); if (res == 0) { return lsm_error_failed_io; } + + char *key_s = malloc(key_len + 1); + key_s[key_len] = '\0'; + + if (key_s == NULL) { + return lsm_error_failed_alloc; + } + + res = fread(key_s, 1, key_len, store->idx_file); + + if (res < key_len) { + return lsm_error_failed_io; + } + + res = fread(db_dim, sizeof(uint64_t), 2, store->idx_file); + + if (res < 2) { + return lsm_error_failed_io; + } + + LSM_RES(lsm_str_init(&key, key_s)); + LSM_RES(lsm_store_insert(&handle, store, key)); + + // Read attributes from database file + if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { + return lsm_error_failed_io; + } + + LSM_RES(lsm_entry_read_attrs(handle, store->db_file)); + lsm_entry_close(handle); + + store->idx_file_size += 3 * sizeof(uint64_t) + key_len; + store->db_file_size += db_dim[1]; } + + return lsm_error_ok; }