diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index 188fd18..7e7e59a 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -21,27 +21,10 @@ typedef enum lsm_attr_type : uint64_t { } lsm_attr_type; /** - * An entry inside an LSM store. - * - * Each entry consists of the key it's stored behind, zero or more attributes - * (metadata) and a data field. The data field can be stored on disk or - * in-memory, depending on the size. + * A handle referencing an entry inside a store. Read/write operations from/to + * the entry go through this handle. */ -typedef struct lsm_entry lsm_entry; - -/** - * Allocate and initialize a new lsm_entry object. - * - * @param ptr where to store newly allocated pointer - */ -lsm_error lsm_entry_init(lsm_entry **ptr); - -/** - * Deallocate an existing lsm_entry object. - * - * @param entry object to deallocate - */ -void lsm_entry_free(lsm_entry *entry); +typedef struct lsm_entry_handle lsm_entry_handle; /** * Checks whether the entry has an attribute with the specified type. @@ -49,7 +32,7 @@ void lsm_entry_free(lsm_entry *entry); * @param entry entry to check * @param type type of attribute to check for */ -bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type); +bool lsm_entry_attr_present(lsm_entry_handle *handle, lsm_attr_type type); /** * Retrieve the contents of an attribute from an entry, if present @@ -58,7 +41,7 @@ bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type); * @param entry entry to search for * @param type type of attribute to return */ -lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, +lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, lsm_attr_type type); /** @@ -68,7 +51,7 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, * @param type type of attribute to add * @param data data of attribute; ownership of pointer is taken over */ -lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type, +lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type, lsm_str *data); /** @@ -79,7 +62,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type, * @param entry entry to remove attribute from * @param type type of attribute to remove */ -lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, +lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, lsm_attr_type type); /** @@ -104,7 +87,7 @@ lsm_error lsm_store_init(lsm_store **ptr); * @param db_path path to the database file * @param data_path path to the data directory */ -lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); +lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); /** * Dealocate an existing lsm_store object. @@ -114,43 +97,45 @@ lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); void lsm_store_free(lsm_store *store); /** - * Retrieve an entry from the store, preparing & locking it for the purpose of - * reading. + * Open a read handle to the given entry. This entry must be properly closed + * using `lsm_store_handle_close`. * - * @param out pointer to store entry pointer + * @param out pointer to store handle pointer * @param store store to retrieve entry from * @param key key to search */ -lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key); +lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, + lsm_str *key); /** - * Retrieve an entry from the store for the purposes of writing. This - * write-locks the entry. + * Open a write handle to the given entry. This entry must be properly closed + * using `lsm_store_handle_close`. * - * @param out pointer to store entry pointer + * @param out pointer to store handle pointer * @param store store to retrieve entry from * @param key key to search */ -lsm_error lsm_store_get_write(lsm_entry **out, lsm_store *store, lsm_str *key); +lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, + lsm_str *key); /** - * Unlock a locked entry. + * Close an open entry handle. * - * @param store store to unlock entry in - * @param entry entry to unlock + * @param store store the handle's entry is stored in + * @param handle handle to close */ -lsm_error lsm_store_unlock(lsm_store *store, lsm_entry *entry); +void lsm_entry_close(lsm_entry_handle *handle); /** - * Allocate a new entry in the store with the specified key. The entry returned - * will be write-locked, and should be unlocked after streaming the necessary - * data. + * Insert a new entry into the store, returning a write handle to the newly + * created entry. * * @param out pointer to store new entry pointer in * @param store store to modify * @param key key to add; ownership of key pointer is taken over */ -lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key); +lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, + lsm_str *key); /** * Append new data to the given entry, which is expected to be in the store. @@ -162,7 +147,7 @@ lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key); * @param entry entry to append data to * @param data data to append */ -lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, - lsm_str *data); +lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, + lsm_str *data); #endif diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index d2c8d1f..27b6e5b 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -13,7 +13,14 @@ typedef struct lsm_attr { lsm_str *str; } lsm_attr; -struct lsm_entry { +/** + * An entry inside an LSM store. + * + * Each entry consists of the key it's stored behind, zero or more attributes + * (metadata) and a data field. The data field can be stored on disk or + * in-memory, depending on the size. + */ +typedef struct lsm_entry { lsm_str *key; struct { uint64_t count; @@ -28,7 +35,21 @@ struct lsm_entry { } value; bool on_disk; } data; -}; +} lsm_entry; + +/** + * Allocate and initialize a new lsm_entry object. + * + * @param ptr where to store newly allocated pointer + */ +lsm_error lsm_entry_init(lsm_entry **ptr); + +/** + * Deallocate an existing lsm_entry object. + * + * @param entry object to deallocate + */ +void lsm_entry_free(lsm_entry *entry); typedef struct lsm_entry_wrapper { pthread_rwlock_t lock; @@ -36,6 +57,14 @@ typedef struct lsm_entry_wrapper { } lsm_entry_wrapper; lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr); +void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper); + +struct lsm_entry_handle { + lsm_entry_wrapper *wrapper; + FILE *f; +}; + +lsm_error lsm_entry_handle_init(lsm_entry_handle **out); struct lsm_store { lsm_trie *trie; diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 1fff684..27ebf68 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -27,7 +27,7 @@ lsm_error lsm_store_init(lsm_store **ptr) { return lsm_error_ok; } -lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, +lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path) { lsm_store *store; LSM_RES(lsm_store_init(&store)); @@ -42,12 +42,13 @@ lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, return lsm_error_ok; } -lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) { +lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, + lsm_str *key) { lsm_entry_wrapper *wrapper; LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); - // We don't want to block the thread + // Try to get a read lock on the entry's lock if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) { return lsm_error_lock_busy; } @@ -62,8 +63,17 @@ lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) { return lsm_error_not_found; } + lsm_entry_handle *handle; + lsm_error res = lsm_entry_handle_init(&handle); + + if (res != lsm_error_ok) { + pthread_rwlock_unlock(&wrapper->lock); + + return res; + } + // Open a new file descriptor if needed - if (entry->data.on_disk && (entry->data.value.file == NULL)) { + if (entry->data.on_disk) { char path[store->data_path->len + entry->key->len + 2]; sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), lsm_str_ptr(entry->key)); @@ -71,36 +81,111 @@ lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) { FILE *f = fopen(path, "rb"); if (f == NULL) { + free(handle); + return lsm_error_failed_io; } - entry->data.value.file = f; + handle->f = f; } + handle->wrapper = wrapper; + *out = handle; + return lsm_error_ok; } -lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key) { +lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, + lsm_str *key) { + lsm_entry_wrapper *wrapper; + + LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); + + // Try to get a write lock on the entry's lock + // TODO make this timeout to not block + if (pthread_rwlock_wrlock(&wrapper->lock) != 0) { + return lsm_error_lock_busy; + } + + lsm_entry *entry = wrapper->entry; + + // While the trie's data field will never be NULL, the actual entry pointer + // might be + if (entry == NULL) { + pthread_rwlock_unlock(&wrapper->lock); + + return lsm_error_not_found; + } + + lsm_entry_handle *handle; + lsm_error res = lsm_entry_handle_init(&handle); + + if (res != lsm_error_ok) { + pthread_rwlock_unlock(&wrapper->lock); + + return res; + } + + // Open a new file descriptor if needed + if (entry->data.on_disk) { + char path[store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + lsm_str_ptr(entry->key)); + + FILE *f = fopen(path, "wb"); + + if (f == NULL) { + free(handle); + + return lsm_error_failed_io; + } + + handle->f = f; + } + + handle->wrapper = wrapper; + *out = handle; + + return lsm_error_ok; +} + +lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, + lsm_str *key) { + // TODO what happens when two inserts to the same key happen at the same time? lsm_entry_wrapper *wrapper; LSM_RES(lsm_entry_wrapper_init(&wrapper)); + pthread_rwlock_wrlock(&wrapper->lock); + + lsm_error res = lsm_trie_insert(store->trie, key, wrapper); + + // Check if entry isn't already present in advance + if (res != lsm_error_ok) { + lsm_entry_wrapper_free(wrapper); + + return res; + } lsm_entry *entry; LSM_RES(lsm_entry_init(&entry)); entry->key = key; wrapper->entry = entry; - pthread_rwlock_wrlock(&wrapper->lock); - // TODO mem leak if already present - LSM_RES(lsm_trie_insert(store->trie, key, wrapper)); + lsm_entry_handle *handle; + LSM_RES(lsm_entry_handle_init(&handle)); - *out = entry; + // No need to set the handle's file, as the entry doesn't have any data yet + handle->wrapper = wrapper; + + *out = handle; return lsm_error_ok; } -lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, - lsm_str *data) { +lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, + lsm_str *data) { + lsm_entry *entry = handle->wrapper->entry; + uint64_t new_len = entry->data.len + lsm_str_len(data); const char *data_s = lsm_str_ptr(data); @@ -124,13 +209,13 @@ lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), lsm_str_ptr(entry->key)); - FILE *f = fopen(path, "w"); + FILE *f = fopen(path, "ab"); if (f == NULL) { return lsm_error_failed_io; } - entry->data.value.file = f; + handle->f = f; entry->data.on_disk = true; // TODO free old buff, write original data to file @@ -141,7 +226,7 @@ lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, // TODO what happens when I/O fails? while (written < data->len) { written += fwrite(&data_s[written], sizeof(char), data->len - written, - entry->data.value.file); + handle->f); } } diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 4a83be2..ffbc138 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -32,16 +32,40 @@ lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) { return lsm_error_ok; } -bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type) { - return (entry->attrs.bitmap & type) != 0; +void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); } + +lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { + lsm_entry_handle *handle = calloc(1, sizeof(lsm_entry_handle)); + + if (handle == NULL) { + return lsm_error_failed_alloc; + } + + *out = handle; + + return lsm_error_ok; } -lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, +void lsm_entry_close(lsm_entry_handle *handle) { + if (handle->f != NULL) { + fclose(handle->f); + } + + pthread_rwlock_unlock(&handle->wrapper->lock); + free(handle); +} + +bool lsm_entry_attr_present(lsm_entry_handle *handle, lsm_attr_type type) { + return (handle->wrapper->entry->attrs.bitmap & type) != 0; +} + +lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, lsm_attr_type type) { - if (!lsm_entry_attr_present(entry, type)) { + if (!lsm_entry_attr_present(handle, type)) { return lsm_error_not_found; } + lsm_entry *entry = handle->wrapper->entry; uint64_t i = 0; while (entry->attrs.items[i].type != type) { @@ -53,12 +77,14 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, +lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, lsm_attr_type type) { - if (!lsm_entry_attr_present(entry, type)) { + if (!lsm_entry_attr_present(handle, type)) { return lsm_error_not_found; } + lsm_entry *entry = handle->wrapper->entry; + if (entry->attrs.count == 1) { *out = entry->attrs.items[0].str; @@ -99,12 +125,14 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type, +lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type, lsm_str *data) { - if (lsm_entry_attr_present(entry, type)) { + if (lsm_entry_attr_present(handle, type)) { return lsm_error_already_present; } + lsm_entry *entry = handle->wrapper->entry; + lsm_attr *new_attrs = realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr));