From 715e1f9a5880cd84d371437b212ff6f0bdaa2f8a Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 9 Nov 2023 21:07:51 +0100 Subject: [PATCH 1/5] refactor(lsm): clean up disk write code --- include/lander.h | 2 +- lsm/src/store/lsm_store_disk_write.c | 88 +++++++++++++++------------- 2 files changed, 47 insertions(+), 43 deletions(-) diff --git a/include/lander.h b/include/lander.h index 5abea75..3c812c1 100644 --- a/include/lander.h +++ b/include/lander.h @@ -24,7 +24,7 @@ typedef enum lander_attr_type : uint8_t { lander_attr_type_url = 2, } lander_attr_type; -typedef enum lander_entry_type { +typedef enum lander_entry_type : uint8_t { lander_entry_type_redirect = 0, lander_entry_type_paste = 1, } lander_entry_type; diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index eb60c22..b79a78b 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -1,31 +1,39 @@ #include "lsm/store_internal.h" -static lsm_error lsm_entry_write_single(FILE *f, uint64_t size, void *val) { - size_t res = fwrite(val, size, 1, f); +static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size, + uint64_t count, void *val) { + size_t res = fwrite(val, size, count, f); - if (res == 0) { + if (res < count) { return lsm_error_failed_io; } + if (sum != NULL) { + *sum += size * count; + } + return lsm_error_ok; } -static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) { - return lsm_entry_write_single(f, sizeof(uint64_t), &num); -} +static lsm_error lsm_write_str(uint64_t *sum, FILE *f, lsm_str *s) { + uint64_t len = lsm_str_len(s); + + LSM_RES(lsm_fwrite(sum, f, sizeof(uint64_t), 1, &len)); -static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) { - uint64_t to_write = lsm_str_len(s); uint64_t written = 0; do { - written += fwrite(lsm_str_ptr(s), sizeof(char), to_write - written, f); - } while (written < to_write); + written += fwrite(lsm_str_ptr(s), sizeof(char), len - written, f); + } while (written < len); + + if (sum != NULL) { + *sum += len * sizeof(char); + } return lsm_error_ok; } -static lsm_error lsm_seek(FILE *f, uint64_t pos) { +static lsm_error lsm_fseek(FILE *f, uint64_t pos) { if (fseek(f, pos, SEEK_SET) != 0) { return lsm_error_failed_io; } @@ -35,24 +43,17 @@ static lsm_error lsm_seek(FILE *f, uint64_t pos) { lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, uint64_t pos) { - LSM_RES(lsm_seek(db_file, pos)); + *size = 0; - LSM_RES(lsm_entry_write_uint64_t(db_file, entry->data_len)); + LSM_RES(lsm_fseek(db_file, pos)); - LSM_RES( - lsm_entry_write_single(db_file, sizeof(uint8_t), &entry->attrs.count)); - *size = sizeof(uint64_t) + sizeof(uint8_t); + LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len)); + LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count)); for (uint8_t i = 0; i < entry->attrs.count; i++) { - // Write attribute type, length & value - LSM_RES(lsm_entry_write_single(db_file, sizeof(uint8_t), - &entry->attrs.items[i].type)); - LSM_RES(lsm_entry_write_uint64_t(db_file, - lsm_str_len(entry->attrs.items[i].str))); - LSM_RES(lsm_entry_write_str(db_file, entry->attrs.items[i].str)); - - *size += sizeof(uint8_t) + sizeof(uint64_t) + - lsm_str_len(entry->attrs.items[i].str) * sizeof(char); + LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, + &entry->attrs.items[i].type)); + LSM_RES(lsm_write_str(size, db_file, entry->attrs.items[i].str)); } return lsm_error_ok; @@ -60,13 +61,13 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, uint64_t offset, uint64_t len, uint64_t pos) { - LSM_RES(lsm_seek(idx_file, pos)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); - LSM_RES(lsm_entry_write_str(idx_file, entry->key)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, len)); + *size = 0; - *size = 3 * sizeof(uint64_t) + lsm_str_len(entry->key) * sizeof(char); + LSM_RES(lsm_fseek(idx_file, pos)); + + LSM_RES(lsm_write_str(size, idx_file, entry->key)); + LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset)); + LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len)); return lsm_error_ok; } @@ -74,9 +75,10 @@ lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { pthread_mutex_lock(&store->db_lock); - uint64_t entry_size; - lsm_error res = lsm_entry_write_db( - &entry_size, store->db_file, handle->wrapper->entry, store->db_file_size); + uint64_t db_entry_size; + lsm_error res = + lsm_entry_write_db(&db_entry_size, store->db_file, handle->wrapper->entry, + store->db_file_size); fflush(store->db_file); if (res != lsm_error_ok) { @@ -85,17 +87,17 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { return res; } - uint64_t entry_index = store->db_file_size; - store->db_file_size += entry_size; + uint64_t db_entry_index = store->db_file_size; pthread_mutex_unlock(&store->db_lock); // Append entry to index file pthread_mutex_lock(&store->idx_lock); - res = - lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, - entry_index, entry_size, store->idx_file_size); + uint64_t idx_entry_size; + res = lsm_entry_write_idx(&idx_entry_size, store->idx_file, + handle->wrapper->entry, db_entry_index, + db_entry_size, store->idx_file_size); if (res == lsm_error_ok) { // Update the counter at the beginning of the file @@ -103,14 +105,16 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { uint64_t new_block_count = store->idx_file_block_count + 1; - res = lsm_entry_write_uint64_t(store->idx_file, new_block_count); + res = lsm_fwrite(NULL, store->idx_file, sizeof(uint64_t), 1, + &new_block_count); if (res == lsm_error_ok) { // Only if we successfully updated the on-disk counter do we make the code - // aware that the file's size has increased. This way, if a write to the + // aware that the files' sizes have increased. This way, if a write to the // counter fails, the code will simply reuse the already written content. - store->idx_file_size += entry_size; + store->idx_file_size += idx_entry_size; store->idx_file_block_count = new_block_count; + store->db_file_size += db_entry_size; } } From 2f58d1ee489d25730272b940c680f5a63c9343bd Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 9 Nov 2023 21:32:39 +0100 Subject: [PATCH 2/5] feat(lsm): track entry idx file offset --- lsm/src/_include/lsm/store_internal.h | 1 + lsm/src/store/lsm_store_disk_read.c | 4 ++++ lsm/src/store/lsm_store_disk_write.c | 22 ++++++++++++---------- lsm/src/trie/lsm_trie.c | 7 ------- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index e4bbdba..6bd7b00 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -31,6 +31,7 @@ typedef struct lsm_entry { lsm_attr *items; } attrs; uint64_t data_len; + uint64_t idx_file_offset; } lsm_entry; /** diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index fc4d748..17b91d7 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -150,6 +150,8 @@ lsm_error lsm_store_load_db(lsm_store *store) { store->idx_file, sizeof(uint64_t), 1)); for (uint64_t i = 0; i < store->idx_file_block_count; i++) { + uint64_t idx_file_offset = store->idx_file_size; + LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file)); LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file, sizeof(uint64_t), 2)); @@ -163,6 +165,8 @@ lsm_error lsm_store_load_db(lsm_store *store) { LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file, sizeof(uint64_t), 1)); LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); + + handle->wrapper->entry->idx_file_offset = idx_file_offset; lsm_entry_close(handle); store->db_file_size += db_dim[1]; diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index b79a78b..3482f53 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -41,7 +41,7 @@ static lsm_error lsm_fseek(FILE *f, uint64_t pos) { return lsm_error_ok; } -lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, +lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry, uint64_t pos) { *size = 0; @@ -59,7 +59,7 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, +lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, uint64_t offset, uint64_t len, uint64_t pos) { *size = 0; @@ -75,27 +75,27 @@ lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { pthread_mutex_lock(&store->db_lock); + uint64_t db_entry_index = store->db_file_size; + uint64_t db_entry_size; lsm_error res = - lsm_entry_write_db(&db_entry_size, store->db_file, handle->wrapper->entry, + lsm_write_db_entry(&db_entry_size, store->db_file, handle->wrapper->entry, store->db_file_size); fflush(store->db_file); - if (res != lsm_error_ok) { - pthread_mutex_unlock(&store->db_lock); + pthread_mutex_unlock(&store->db_lock); + if (res != lsm_error_ok) { return res; } - uint64_t db_entry_index = store->db_file_size; - - pthread_mutex_unlock(&store->db_lock); - // Append entry to index file pthread_mutex_lock(&store->idx_lock); + uint64_t idx_entry_index = store->idx_file_size; + uint64_t idx_entry_size; - res = lsm_entry_write_idx(&idx_entry_size, store->idx_file, + res = lsm_write_idx_entry(&idx_entry_size, store->idx_file, handle->wrapper->entry, db_entry_index, db_entry_size, store->idx_file_size); @@ -115,6 +115,8 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { store->idx_file_size += idx_entry_size; store->idx_file_block_count = new_block_count; store->db_file_size += db_entry_size; + + handle->wrapper->entry->idx_file_offset = idx_entry_index; } } diff --git a/lsm/src/trie/lsm_trie.c b/lsm/src/trie/lsm_trie.c index 8744b4e..0e5b548 100644 --- a/lsm/src/trie/lsm_trie.c +++ b/lsm/src/trie/lsm_trie.c @@ -252,19 +252,12 @@ lsm_error lsm_trie_remove(void **data, lsm_trie *trie, lsm_str *key) { return lsm_error_not_found; } - // Child is the node we wish to delete if (data != NULL) { *data = child->data; } child->data = NULL; - // We only remove child if it has no children of its own - if (lsm_bt_size(&child->bt) == 0) { - lsm_bt_remove(NULL, &parent->bt, c); - lsm_trie_node_free(child); - } - trie->size--; return lsm_error_ok; From d4b21fb84d611b3b73fef5b92d3b6adbebed8767 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 9 Nov 2023 21:48:15 +0100 Subject: [PATCH 3/5] feat(lsm): add valid entry marker to idx entries --- lsm/src/store/lsm_store_disk_read.c | 46 ++++++++++++++++++++-------- lsm/src/store/lsm_store_disk_write.c | 3 ++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index 17b91d7..8ce53c1 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -142,6 +142,7 @@ lsm_error lsm_store_load_db(lsm_store *store) { uint64_t db_dim[2]; lsm_str *key; lsm_entry_handle *handle; + bool valid_entry; rewind(store->idx_file); @@ -152,24 +153,43 @@ lsm_error lsm_store_load_db(lsm_store *store) { for (uint64_t i = 0; i < store->idx_file_block_count; i++) { uint64_t idx_file_offset = store->idx_file_size; - LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file)); - LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file, - sizeof(uint64_t), 2)); - LSM_RES(lsm_store_insert(&handle, store, key)); + LSM_RES(lsm_fread(&valid_entry, &store->idx_file_size, store->idx_file, + sizeof(bool), 1)); - // Read attributes from database file - if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { - return lsm_error_failed_io; + if (valid_entry) { + LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file)); + LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file, + sizeof(uint64_t), 2)); + LSM_RES(lsm_store_insert(&handle, store, key)); + + // Read attributes from database file + if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { + return lsm_error_failed_io; + } + + LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file, + sizeof(uint64_t), 1)); + LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); + + handle->wrapper->entry->idx_file_offset = idx_file_offset; + lsm_entry_close(handle); + + store->db_file_size += db_dim[1]; } + // Simply skip the invalid entry + else { + uint64_t key_len; + LSM_RES(lsm_fread(&key_len, &store->idx_file_size, store->idx_file, + sizeof(uint64_t), 1)); - LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file, - sizeof(uint64_t), 1)); - LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); + uint64_t remaining = key_len + 2 * sizeof(uint64_t); - handle->wrapper->entry->idx_file_offset = idx_file_offset; - lsm_entry_close(handle); + if (fseek(store->idx_file, remaining, SEEK_CUR) != 0) { + return lsm_error_failed_io; + } - store->db_file_size += db_dim[1]; + store->idx_file_size += remaining; + } } return lsm_error_ok; diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 3482f53..3c9293d 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -65,6 +65,9 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, LSM_RES(lsm_fseek(idx_file, pos)); + bool valid_entry_marker = true; + LSM_RES(lsm_fwrite(size, idx_file, sizeof(bool), 1, &valid_entry_marker)); + LSM_RES(lsm_write_str(size, idx_file, entry->key)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len)); From eb0ce16f78b1a554ef8cdb521c86fece4ad8a052 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 9 Nov 2023 22:05:20 +0100 Subject: [PATCH 4/5] feat(lsm): store pointer to store in entry handle --- lsm/include/lsm/store.h | 11 +++--- lsm/src/_include/lsm/store_internal.h | 1 + lsm/src/store/lsm_store.c | 50 +++++---------------------- lsm/src/store/lsm_store_disk_read.c | 2 +- lsm/src/store/lsm_store_disk_write.c | 4 ++- src/lander/lander_get.c | 6 ++-- src/lander/lander_post.c | 8 ++--- 7 files changed, 23 insertions(+), 59 deletions(-) diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index d49bbdf..c7d46d7 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -188,8 +188,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, * @param entry entry to append data to * @param data data to append */ -lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, - lsm_str *data); +lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data); /** * Same as `lsm_entry_data_append`, except that it takes a direct char array. @@ -199,8 +198,8 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, * @param data data to append * @param len length of data array */ -lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, - char *data, uint64_t len); +lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data, + uint64_t len); /** * Read a number of bytes from the entry's data field. The position from which @@ -211,7 +210,7 @@ lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, * @param handle entry handle to read from * @param len how many bytes to read at most */ -lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_entry_handle *handle, uint64_t len); /** @@ -220,7 +219,7 @@ lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, * @param store store to persist entry in * @param handle handle to entry to persist */ -lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle); +lsm_error lsm_entry_sync(lsm_entry_handle *handle); /** * Return the length of the entry's data. diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index 6bd7b00..2a5856c 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -58,6 +58,7 @@ void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper); struct lsm_entry_handle { lsm_entry_wrapper *wrapper; + lsm_store *store; FILE *f; uint64_t pos; }; diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 3ac2232..2da7c51 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -62,24 +62,8 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, return res; } - /* // Open a new file descriptor if needed */ - /* if (entry->data_len > 0) { */ - /* char path[store->data_path->len + entry->key->len + 2]; */ - /* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */ - /* lsm_str_ptr(entry->key)); */ - - /* FILE *f = fopen(path, "rb"); */ - - /* if (f == NULL) { */ - /* free(handle); */ - - /* return lsm_error_failed_io; */ - /* } */ - - /* handle->f = f; */ - /* } */ - handle->wrapper = wrapper; + handle->store = store; *out = handle; return lsm_error_ok; @@ -116,24 +100,8 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, return res; } - /* // Open a new file descriptor if needed */ - /* if (entry->data_len > 0) { */ - /* char path[store->data_path->len + entry->key->len + 2]; */ - /* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */ - /* lsm_str_ptr(entry->key)); */ - - /* FILE *f = fopen(path, "ab"); */ - - /* if (f == NULL) { */ - /* free(handle); */ - - /* return lsm_error_failed_io; */ - /* } */ - - /* handle->f = f; */ - /* } */ - handle->wrapper = wrapper; + handle->store = store; *out = handle; return lsm_error_ok; @@ -180,14 +148,14 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, // No need to set the handle's file, as the entry doesn't have any data yet handle->wrapper = wrapper; + handle->store = store; *out = handle; return lsm_error_ok; } -lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, - lsm_str *data) { +lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data) { if (lsm_str_len(data) == 0) { return lsm_error_ok; } @@ -199,8 +167,8 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, // Entries don't open their file unless needed if (handle->f == NULL) { - char path[store->data_path->len + entry->key->len + 2]; - sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + char path[handle->store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(handle->store->data_path), lsm_str_ptr(entry->key)); FILE *f = fopen(path, "ab"); @@ -225,7 +193,7 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, return lsm_error_ok; } -lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_entry_handle *handle, uint64_t len) { lsm_entry *entry = handle->wrapper->entry; @@ -237,8 +205,8 @@ lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, // Entries don't open their file unless needed if (handle->f == NULL) { - char path[store->data_path->len + entry->key->len + 2]; - sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + char path[handle->store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(handle->store->data_path), lsm_str_ptr(entry->key)); FILE *f = fopen(path, "rb"); diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index 8ce53c1..cba7a7b 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -132,7 +132,7 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle, for (uint64_t i = 0; i < attr_count; i++) { LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1)); LSM_RES(lsm_entry_read_str(&val, sum, db_file)); - lsm_entry_attr_insert(handle, attr_type, val); + LSM_RES(lsm_entry_attr_insert(handle, attr_type, val)); } return lsm_error_ok; diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 3c9293d..4a3de1b 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -75,7 +75,9 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { +lsm_error lsm_entry_sync(lsm_entry_handle *handle) { + lsm_store *store = handle->store; + pthread_mutex_lock(&store->db_lock); uint64_t db_entry_index = store->db_file_size; diff --git a/src/lander/lander_get.c b/src/lander/lander_get.c index 5d1be5f..7c467b5 100644 --- a/src/lander/lander_get.c +++ b/src/lander/lander_get.c @@ -93,8 +93,6 @@ bool lander_get_entry_lsm(event_loop_conn *conn) { bool lander_stream_body_to_client(event_loop_conn *conn) { http_loop_ctx *ctx = conn->ctx; lander_ctx *c_ctx = ctx->c; - http_loop_gctx *gctx = ctx->g; - lander_gctx *c_gctx = gctx->c; if ((c_ctx->entry == NULL) || (ctx->res.body.expected_len == ctx->res.body.len)) { @@ -105,8 +103,8 @@ bool lander_stream_body_to_client(event_loop_conn *conn) { ctx->res.body.expected_len - ctx->res.body.len); uint64_t read = 0; - lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], - c_gctx->store, c_ctx->entry, to_write); + lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], c_ctx->entry, + to_write); ctx->res.body.len += read; conn->wbuf_size += read; diff --git a/src/lander/lander_post.c b/src/lander/lander_post.c index ee1a9cb..3f8f758 100644 --- a/src/lander/lander_post.c +++ b/src/lander/lander_post.c @@ -151,11 +151,9 @@ bool lander_post_redirect_body_to_attr(event_loop_conn *conn) { bool lander_entry_sync(event_loop_conn *conn) { http_loop_ctx *ctx = conn->ctx; - http_loop_gctx *gctx = ctx->g; - lander_gctx *c_gctx = gctx->c; lander_ctx *c_ctx = ctx->c; - if (lsm_entry_sync(c_gctx->store, c_ctx->entry) != lsm_error_ok) { + if (lsm_entry_sync(c_ctx->entry) != lsm_error_ok) { ctx->res.status = http_internal_server_error; } @@ -180,8 +178,6 @@ bool lander_post_paste_lsm(event_loop_conn *conn) { bool lander_stream_body_to_entry(event_loop_conn *conn) { http_loop_ctx *ctx = conn->ctx; lander_ctx *c_ctx = ctx->c; - http_loop_gctx *gctx = ctx->g; - lander_gctx *c_gctx = gctx->c; uint64_t to_append = MIN(conn->rbuf_size - conn->rbuf_read, @@ -189,7 +185,7 @@ bool lander_stream_body_to_entry(event_loop_conn *conn) { lsm_str *data; lsm_str_init_copy_n(&data, (char *)&conn->rbuf[conn->rbuf_read], to_append); - lsm_entry_data_append(c_gctx->store, c_ctx->entry, data); + lsm_entry_data_append(c_ctx->entry, data); conn->rbuf_read += to_append; From 9b223d04a0226ae4234d39d0778ccee4d2dcf688 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 9 Nov 2023 22:40:06 +0100 Subject: [PATCH 5/5] feat(lsm): sync database when closing handle --- include/lander.h | 2 -- lsm/include/lsm/store.h | 8 -------- lsm/src/_include/lsm/store_internal.h | 21 +++++++++++++++++++++ lsm/src/store/lsm_store_disk_read.c | 2 +- lsm/src/store/lsm_store_disk_write.c | 2 +- lsm/src/store/lsm_store_entry.c | 10 ++++++++-- src/lander/lander.c | 5 ++--- src/lander/lander_post.c | 11 ----------- 8 files changed, 33 insertions(+), 28 deletions(-) diff --git a/include/lander.h b/include/lander.h index 3c812c1..88bfab9 100644 --- a/include/lander.h +++ b/include/lander.h @@ -57,6 +57,4 @@ bool lander_get_entry_lsm(event_loop_conn *conn); bool lander_post_redirect_body_to_attr(event_loop_conn *conn); -bool lander_entry_sync(event_loop_conn *conn); - #endif diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index c7d46d7..9410746 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -213,14 +213,6 @@ lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data, lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_entry_handle *handle, uint64_t len); -/** - * Persist the entry's data to disk. - * - * @param store store to persist entry in - * @param handle handle to entry to persist - */ -lsm_error lsm_entry_sync(lsm_entry_handle *handle); - /** * Return the length of the entry's data. * diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index 2a5856c..cebb41b 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -88,4 +88,25 @@ struct lsm_store { */ lsm_error lsm_store_load_db(lsm_store *store); +/** + * Close & free the handle without updating the database + * + * @param handle handle to close + */ +void lsm_entry_close_no_disk(lsm_entry_handle *handle); + +/** + * Write a new insert to the database. + * + * @param handle handle to added entry + */ +lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle); + +/** + * Remove an entry from the database + * + * @param handle handle to the removed entry + */ +lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle); + #endif diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index cba7a7b..040708a 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -172,7 +172,7 @@ lsm_error lsm_store_load_db(lsm_store *store) { LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); handle->wrapper->entry->idx_file_offset = idx_file_offset; - lsm_entry_close(handle); + lsm_entry_close_no_disk(handle); store->db_file_size += db_dim[1]; } diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 4a3de1b..ffe182f 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -75,7 +75,7 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_sync(lsm_entry_handle *handle) { +lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { lsm_store *store = handle->store; pthread_mutex_lock(&store->db_lock); diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 58eba29..45ead55 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -47,13 +47,19 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { return lsm_error_ok; } +void lsm_entry_close_no_disk(lsm_entry_handle *handle) { + pthread_rwlock_unlock(&handle->wrapper->lock); + free(handle); +} + void lsm_entry_close(lsm_entry_handle *handle) { if (handle->f != NULL) { fclose(handle->f); } - pthread_rwlock_unlock(&handle->wrapper->lock); - free(handle); + // TODO handle errors here + lsm_entry_disk_insert(handle); + lsm_entry_close_no_disk(handle); } bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type) { diff --git a/src/lander/lander.c b/src/lander/lander.c index 8045f28..57f5c5e 100644 --- a/src/lander/lander.c +++ b/src/lander/lander.c @@ -25,7 +25,7 @@ http_route lander_routes[] = { .path = "^/s(l?)/([^/]*)$", .steps = {http_loop_step_auth, lander_post_redirect_lsm, http_loop_step_body_to_buf, lander_post_redirect_body_to_attr, - lander_entry_sync, NULL}, + NULL}, .steps_res = {http_loop_step_write_header, http_loop_step_write_body, NULL}, }, @@ -33,8 +33,7 @@ http_route lander_routes[] = { .method = http_post, .path = "^/p(l?)/([^/]*)$", .steps = {http_loop_step_auth, http_loop_step_parse_content_length, - lander_post_paste_lsm, lander_stream_body_to_entry, - lander_entry_sync, NULL}, + lander_post_paste_lsm, lander_stream_body_to_entry, NULL}, .steps_res = {http_loop_step_write_header, http_loop_step_write_body, NULL}}, }; diff --git a/src/lander/lander_post.c b/src/lander/lander_post.c index 3f8f758..429ea39 100644 --- a/src/lander/lander_post.c +++ b/src/lander/lander_post.c @@ -149,17 +149,6 @@ bool lander_post_redirect_body_to_attr(event_loop_conn *conn) { return true; } -bool lander_entry_sync(event_loop_conn *conn) { - http_loop_ctx *ctx = conn->ctx; - lander_ctx *c_ctx = ctx->c; - - if (lsm_entry_sync(c_ctx->entry) != lsm_error_ok) { - ctx->res.status = http_internal_server_error; - } - - return true; -} - bool lander_post_paste_lsm(event_loop_conn *conn) { http_loop_ctx *ctx = conn->ctx; lander_ctx *c_ctx = ctx->c;