From 0e4e18da6cce3479c0992a1a28f7d0077196df40 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 28 Oct 2023 15:48:28 +0200 Subject: [PATCH] feat(lsm): data streaming, random other stuff, locks --- lsm/include/lsm.h | 14 +++-- lsm/include/lsm/store.h | 38 ++++++++++--- lsm/src/_include/lsm/store_internal.h | 12 +++- lsm/src/store/lsm_store.c | 82 ++++++++++++++++++++++++--- lsm/src/store/lsm_store_entry.c | 15 +++++ 5 files changed, 137 insertions(+), 24 deletions(-) diff --git a/lsm/include/lsm.h b/lsm/include/lsm.h index f5ca373..eaecb7e 100644 --- a/lsm/include/lsm.h +++ b/lsm/include/lsm.h @@ -3,11 +3,11 @@ #include -#define LSM_RES(x) \ - { \ - lsm_error res = x; \ - if (res != lsm_error_ok) \ - return res; \ +#define LSM_RES(x) \ + { \ + lsm_error res = x; \ + if (res != lsm_error_ok) \ + return res; \ } typedef enum lsm_error { @@ -15,7 +15,9 @@ typedef enum lsm_error { lsm_error_failed_alloc = 1, lsm_error_not_found = 2, lsm_error_already_present = 3, - lsm_error_null_value = 4 + lsm_error_null_value = 4, + lsm_error_failed_io = 5, + lsm_error_lock_busy = 6, } lsm_error; /*typedef struct lsm_string { */ diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index 7edcc03..188fd18 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -104,7 +104,7 @@ lsm_error lsm_store_init(lsm_store **ptr); * @param db_path path to the database file * @param data_path path to the data directory */ -lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path); +lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); /** * Dealocate an existing lsm_store object. @@ -114,16 +114,37 @@ lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path); void lsm_store_free(lsm_store *store); /** - * Search for an entry in the store. + * Retrieve an entry from the store, preparing & locking it for the purpose of + * reading. * - * @param out pointer to store entry pointer in - * @param store store to search in - * @param key key to look with + * @param out pointer to store entry pointer + * @param store store to retrieve entry from + * @param key key to search */ -lsm_error lsm_store_search(lsm_entry **out, lsm_store *store, lsm_str *key); +lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key); /** - * Allocate a new entry in the store with the specified key. + * Retrieve an entry from the store for the purposes of writing. This + * write-locks the entry. + * + * @param out pointer to store entry pointer + * @param store store to retrieve entry from + * @param key key to search + */ +lsm_error lsm_store_get_write(lsm_entry **out, lsm_store *store, lsm_str *key); + +/** + * Unlock a locked entry. + * + * @param store store to unlock entry in + * @param entry entry to unlock + */ +lsm_error lsm_store_unlock(lsm_store *store, lsm_entry *entry); + +/** + * Allocate a new entry in the store with the specified key. The entry returned + * will be write-locked, and should be unlocked after streaming the necessary + * data. * * @param out pointer to store new entry pointer in * @param store store to modify @@ -141,6 +162,7 @@ lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key); * @param entry entry to append data to * @param data data to append */ -lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *data); +lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, + lsm_str *data); #endif diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index e77e879..d2c8d1f 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -1,6 +1,7 @@ #ifndef LSM_STORE_INTERNAL #define LSM_STORE_INTERNAL +#include #include #include "lsm/store.h" @@ -29,10 +30,17 @@ struct lsm_entry { } data; }; +typedef struct lsm_entry_wrapper { + pthread_rwlock_t lock; + lsm_entry *entry; +} lsm_entry_wrapper; + +lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr); + struct lsm_store { lsm_trie *trie; - char *data_path; - char *db_path; + lsm_str *data_path; + lsm_str *db_path; }; #endif diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 2dc7405..1fff684 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -1,10 +1,11 @@ +#include #include #include #include "lsm.h" #include "lsm/store.h" -#include "lsm/trie.h" #include "lsm/store_internal.h" +#include "lsm/trie.h" lsm_error lsm_store_init(lsm_store **ptr) { lsm_store *store = calloc(1, sizeof(lsm_store)); @@ -26,7 +27,8 @@ lsm_error lsm_store_init(lsm_store **ptr) { return lsm_error_ok; } -lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path) { +lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, + lsm_str *data_path) { lsm_store *store; LSM_RES(lsm_store_init(&store)); @@ -40,24 +42,67 @@ lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path) { return lsm_error_ok; } -lsm_error lsm_store_search(lsm_entry **out, lsm_store *store, lsm_str *key) { - return lsm_trie_search((void **)out, store->trie, key); +lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) { + lsm_entry_wrapper *wrapper; + + LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); + + // We don't want to block the thread + if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) { + return lsm_error_lock_busy; + } + + lsm_entry *entry = wrapper->entry; + + // While the trie's data field will never be NULL, the actual entry pointer + // might be + if (entry == NULL) { + pthread_rwlock_unlock(&wrapper->lock); + + return lsm_error_not_found; + } + + // Open a new file descriptor if needed + if (entry->data.on_disk && (entry->data.value.file == NULL)) { + char path[store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + lsm_str_ptr(entry->key)); + + FILE *f = fopen(path, "rb"); + + if (f == NULL) { + return lsm_error_failed_io; + } + + entry->data.value.file = f; + } + + return lsm_error_ok; } lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key) { - lsm_entry *entry; + lsm_entry_wrapper *wrapper; + LSM_RES(lsm_entry_wrapper_init(&wrapper)); + lsm_entry *entry; LSM_RES(lsm_entry_init(&entry)); - LSM_RES(lsm_trie_insert(store->trie, key, entry)); entry->key = key; + wrapper->entry = entry; + pthread_rwlock_wrlock(&wrapper->lock); + + // TODO mem leak if already present + LSM_RES(lsm_trie_insert(store->trie, key, wrapper)); + *out = entry; return lsm_error_ok; } -lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *data) { +lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, + lsm_str *data) { uint64_t new_len = entry->data.len + lsm_str_len(data); + const char *data_s = lsm_str_ptr(data); // Data is in memory and still fits -> keep it in memory if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) { @@ -67,7 +112,7 @@ lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *dat return lsm_error_failed_alloc; } - memcpy(&buf[entry->data.len], lsm_str_ptr(data), lsm_str_len(data)); + memcpy(&buf[entry->data.len], data_s, lsm_str_len(data)); entry->data.value.ptr = buf; entry->data.len = new_len; } @@ -75,7 +120,28 @@ lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *dat else { // Data is not yet on disk, so we create the file if (!entry->data.on_disk) { + char path[store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + lsm_str_ptr(entry->key)); + FILE *f = fopen(path, "w"); + + if (f == NULL) { + return lsm_error_failed_io; + } + + entry->data.value.file = f; + entry->data.on_disk = true; + + // TODO free old buff, write original data to file + } + + size_t written = 0; + + // TODO what happens when I/O fails? + while (written < data->len) { + written += fwrite(&data_s[written], sizeof(char), data->len - written, + entry->data.value.file); } } diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index a64614e..4a83be2 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -1,4 +1,5 @@ #include +#include #include #include @@ -17,6 +18,20 @@ lsm_error lsm_entry_init(lsm_entry **ptr) { return lsm_error_ok; } +lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) { + lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper)); + + if (wrap == NULL) { + return lsm_error_failed_alloc; + } + + pthread_rwlock_init(&wrap->lock, NULL); + + *ptr = wrap; + + return lsm_error_ok; +} + bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type) { return (entry->attrs.bitmap & type) != 0; }