diff --git a/lsm/include/lsm.h b/lsm/include/lsm.h index 078a57f..0a1639f 100644 --- a/lsm/include/lsm.h +++ b/lsm/include/lsm.h @@ -19,8 +19,6 @@ } \ } -#define LSM_MAX(x, y) ((x) > (y) ? (x) : (y)) - typedef enum lsm_error { lsm_error_ok = 0, lsm_error_failed_alloc = 1, diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index f85192c..6018a5e 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -9,22 +9,96 @@ #define LSM_STORE_DATA_LEVELS 3 -/** - * Read-only handle to an entry in the store - */ -typedef struct lsm_read_handle lsm_read_handle; - -/** - * Writeable handle to an entry in the store - */ -typedef struct lsm_write_handle lsm_write_handle; - /** * A handle referencing an entry inside a store. Read/write operations from/to * the entry go through this handle. */ typedef struct lsm_entry_handle lsm_entry_handle; +/** + * Checks whether the entry has an attribute with the specified type. + * + * @param entry entry to check + * @param type type of attribute to check for + */ +bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type); + +/** + * Retrieve the contents of an attribute from an entry, if present + * + * @param out where to store pointer to attribute data + * @param entry entry to search for + * @param type type of attribute to return + */ +lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, + uint8_t type); + +/** + * Convenience wrapper around `lsm_entry_attr_get` that can be used if we know + * beforehand the attribute value is a 64-bit number. + * + * @param out where to store attribute data + * @param entry entry to search for + * @param type type of attribute to return + */ +lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle, + uint8_t type); + +/** + * Convenience wrapper around `lsm_entry_attr_get` that can be used if we know + * beforehand the attribute value is an 8-bit number. + * + * @param out where to store attribute data + * @param entry entry to search for + * @param type type of attribute to return + */ +lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle, + uint8_t type); + +/** + * Add a new attribute to the entry. This overwrites an existing version of this + * attribute. + * + * @param entry entry to modify + * @param type type of attribute to add + * @param data data of attribute; ownership of pointer is taken over + */ +lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type, + lsm_str *data); + +/** + * Convenience wrapper around `lsm_entry_attr_insert` that can be used if the + * data to be stored is a 64-bit number. + * + * @param entry entry to modify + * @param type type of attribute to add + * @param data data of attribute + */ +lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type, + uint64_t data); + +/** + * Convenience wrapper around `lsm_entry_attr_insert` that can be used if the + * data to be stored is an 8-bit number. + * + * @param entry entry to modify + * @param type type of attribute to add + * @param data data of attribute + */ +lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type, + uint8_t data); + +/** + * Remove an atribute from the given entry, if present. + * + * @param out pointer to store removed data pointer in. If NULL, data pointer + * will be leaked. + * @param entry entry to remove attribute from + * @param type type of attribute to remove + */ +lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, + uint8_t type); + /** * A store consisting of LSM entries. * @@ -71,7 +145,7 @@ void lsm_store_free(lsm_store *store); * @param store store to retrieve entry from * @param key key to search */ -lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store, +lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, const lsm_str *key); /** @@ -83,7 +157,7 @@ lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store, * @param store store to retrieve entry from * @param key key to search */ -lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store, +lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, const lsm_str *key); /** @@ -153,8 +227,8 @@ lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data, * @param handle entry handle to read from * @param len how many bytes to read at most */ -lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_read_handle *handle, - uint64_t len); +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, + lsm_entry_handle *handle, uint64_t len); /** * Return the length of the entry's data. diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index 2b140bb..1b84b60 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -48,11 +48,6 @@ lsm_error lsm_entry_init(lsm_entry **ptr); */ void lsm_entry_free(lsm_entry *entry); -/** - * Allocates a new entry that's identical to the provided one. - */ -lsm_error lsm_entry_clone(lsm_entry **out, const lsm_entry *entry); - /** * Deallocate an existing lsm_entry object. * @@ -77,7 +72,6 @@ typedef enum lsm_entry_handle_state : uint8_t { struct lsm_entry_handle { lsm_entry_wrapper *wrapper; lsm_store *store; - // Either read or append, depending on how it was opened FILE *f; // Current position in the file pointer @@ -139,22 +133,20 @@ lsm_error lsm_entry_disk_update(lsm_entry_handle *handle); /** * Return the length of the path to this entry's data file */ -uint64_t lsm_entry_data_path_len(const lsm_store *store, - const lsm_entry *entry); +uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle); /** * Fill in the entry's data file path in the provided buffer. Use * `lsm_entry_data_path_len` to allocate an appropriately-sized buffer */ -void lsm_entry_data_path(char *path, const lsm_store *store, - const lsm_entry *entry); +void lsm_entry_data_path(char *buf, const lsm_entry_handle *handle); /** * Open the entry's data file for reading * * @param handle handle to the entry */ -lsm_error lsm_entry_data_open_read(lsm_read_handle *handle); +lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle); /** * Open the entry's data file for writing. The file and all subdirectories in @@ -164,134 +156,12 @@ lsm_error lsm_entry_data_open_read(lsm_read_handle *handle); */ lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle); -lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store, - const lsm_entry *entry, const char *mode); -lsm_error lsm_entry_data_mkdirs(const lsm_store *store, const lsm_entry *entry); - /** * Remove the entry's data file if present and close its handle. Any uncommitted * changes will be reverted. * * @param handle handle to the entry */ -lsm_error lsm_entry_data_remove(const lsm_store *store, const lsm_entry *entry); - -/** - * Checks whether the entry has an attribute with the specified type. - * - * @param entry entry to check - * @param type type of attribute to check for - */ -bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type); - -/** - * Retrieve the contents of an attribute from an entry, if present - * - * @param out where to store pointer to attribute data - * @param entry entry to search for - * @param type type of attribute to return - */ -lsm_error lsm_entry_attr_get(lsm_str **out, const lsm_entry *entry, - uint8_t type); - -/** - * Convenience wrapper around `lsm_entry_attr_get` that can be used if we know - * beforehand the attribute value is a 64-bit number. - * - * @param out where to store attribute data - * @param entry entry to search for - * @param type type of attribute to return - */ -lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry, - uint8_t type); - -/** - * Convenience wrapper around `lsm_entry_attr_get` that can be used if we know - * beforehand the attribute value is an 8-bit number. - * - * @param out where to store attribute data - * @param entry entry to search for - * @param type type of attribute to return - */ -lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry, - uint8_t type); - -/** - * Add a new attribute to the entry. This overwrites an existing version of this - * attribute. - * - * @param entry entry to modify - * @param type type of attribute to add - * @param data data of attribute; ownership of pointer is taken over - */ -lsm_error lsm_entry_attr_insert(lsm_entry *entry, uint8_t type, lsm_str *data); - -/** - * Convenience wrapper around `lsm_entry_attr_insert` that can be used if the - * data to be stored is a 64-bit number. - * - * @param entry entry to modify - * @param type type of attribute to add - * @param data data of attribute - */ -lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry *entry, uint8_t type, - uint64_t data); - -/** - * Convenience wrapper around `lsm_entry_attr_insert` that can be used if the - * data to be stored is an 8-bit number. - * - * @param entry entry to modify - * @param type type of attribute to add - * @param data data of attribute - */ -lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry *entry, uint8_t type, - uint8_t data); - -/** - * Remove an atribute from the given entry, if present. - * - * @param out pointer to store removed data pointer in. If NULL, data pointer - * will be leaked. - * @param entry entry to remove attribute from - * @param type type of attribute to remove - */ -lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, uint8_t type); - -/**************************************** - *************** Handles *************** - ***************************************/ - -struct lsm_read_handle { - lsm_entry_wrapper *wrapper; - lsm_store *store; - - struct { - FILE *f; - uint64_t pos; - } data; -}; - -struct lsm_write_handle { - lsm_entry_wrapper *wrapper; - lsm_store *store; - - lsm_entry *dirty; - - struct { - FILE *f; - uint64_t pos; - } data; -}; - -/** - * Allocate a new `lsm_read_handle` object - */ -lsm_error lsm_read_handle_init(lsm_read_handle **out); - -/** - * Allocate a new `lsm_write_handle` object - */ -lsm_error lsm_write_handle_init(lsm_write_handle **out); +lsm_error lsm_entry_data_remove(lsm_entry_handle *handle); #endif diff --git a/lsm/src/store/lsm_handle_read.c b/lsm/src/store/lsm_handle_read.c deleted file mode 100644 index d750196..0000000 --- a/lsm/src/store/lsm_handle_read.c +++ /dev/null @@ -1,75 +0,0 @@ -#include - -#include "lsm/store_internal.h" - -lsm_error lsm_read_handle_init(lsm_read_handle **out) { - lsm_read_handle *handle = calloc(1, sizeof(lsm_read_handle)); - - if (handle == NULL) { - return lsm_error_failed_alloc; - } - - *out = handle; - - return lsm_error_ok; -} - -bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type) { - return (handle->wrapper->entry->attrs.bitmap[type / 64] & - (((uint64_t)1) << (type % 64))) != 0; -} - -lsm_error lsm_read_attr_get(lsm_str **out, lsm_read_handle *handle, - uint8_t type) { - return lsm_entry_attr_get(out, handle->wrapper->entry, type); -} - -lsm_error lsm_read_attr_get_uint64_t(uint64_t *out, lsm_read_handle *handle, - uint8_t type) { - return lsm_entry_attr_get_uint64_t(out, handle->wrapper->entry, type); -} - -lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, lsm_read_handle *handle, - uint8_t type) { - return lsm_entry_attr_get_uint8_t(out, handle->wrapper->entry, type); -} - -lsm_error lsm_read_data_read(uint64_t *out, char *buf, lsm_read_handle *handle, - uint64_t len) { - const lsm_entry *entry = handle->wrapper->entry; - - if (entry->data_len == 0) { - *out = 0; - - return lsm_error_ok; - } - - // Entries don't open their file unless needed - if (handle->data.f == NULL) { - LSM_RES(lsm_entry_data_open(&handle->data.f, handle->store, - handle->wrapper->entry, "rb")); - } - - uint64_t read; - - read = fread(buf, sizeof(char), len, handle->data.f); - - if ((read == 0) && (ferror(handle->data.f) != 0)) { - return lsm_error_failed_io; - } - - handle->data.pos += read; - *out = read; - - return lsm_error_ok; -} - -void lsm_read_close(lsm_read_handle *handle) { - if (handle->data.f != NULL) { - fclose(handle->data.f); - handle->data.f = NULL; - } - - pthread_rwlock_unlock(&handle->wrapper->lock); - free(handle); -} diff --git a/lsm/src/store/lsm_handle_write.c b/lsm/src/store/lsm_handle_write.c deleted file mode 100644 index 33965fc..0000000 --- a/lsm/src/store/lsm_handle_write.c +++ /dev/null @@ -1,114 +0,0 @@ -#include - -#include "lsm/store_internal.h" - -lsm_error lsm_write_handle_init(lsm_write_handle **out) { - lsm_write_handle *handle = calloc(1, sizeof(lsm_write_handle)); - - if (handle == NULL) { - return lsm_error_failed_alloc; - } - - *out = handle; - - return lsm_error_ok; -} - -bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type) { - return (handle->wrapper->entry->attrs.bitmap[type / 64] & - (((uint64_t)1) << (type % 64))) != 0; -} - -lsm_error lsm_write_attr_get(const lsm_str **out, - const lsm_write_handle *handle, uint8_t type) { - lsm_entry *entry = - handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; - return lsm_entry_attr_get(out, entry, type); -} - -lsm_error lsm_write_attr_get_uint64_t(uint64_t *out, - const lsm_write_handle *handle, - uint8_t type) { - lsm_entry *entry = - handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; - return lsm_entry_attr_get_uint64_t(out, entry, type); -} - -lsm_error lsm_write_attr_get_uint8_t(uint8_t *out, - const lsm_write_handle *handle, - uint8_t type) { - lsm_entry *entry = - handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; - return lsm_entry_attr_get_uint8_t(out, entry, type); -} - -lsm_error lsm_write_attr_remove(lsm_str **out, lsm_write_handle *handle, - uint8_t type) { - if (handle->dirty == NULL) { - LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); - } - - return lsm_entry_attr_remove(out, handle->dirty, type); -} - -lsm_error lsm_write_attr_insert(lsm_write_handle *handle, uint8_t type, - lsm_str *data) { - if (handle->dirty == NULL) { - LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); - } - - return lsm_entry_attr_insert(handle->dirty, type, data); -} - -lsm_error lsm_write_attr_insert_uint64_t(lsm_write_handle *handle, uint8_t type, - uint64_t data) { - if (handle->dirty == NULL) { - LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); - } - - return lsm_entry_attr_insert_uint64_t(handle->dirty, type, data); -} - -lsm_error lsm_write_attr_insert_uint8_t(lsm_write_handle *handle, uint8_t type, - uint8_t data) { - if (handle->dirty == NULL) { - LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); - } - - return lsm_entry_attr_insert_uint8_t(handle->dirty, type, data); -} - -lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data) { - if (lsm_str_len(data) == 0) { - return lsm_error_ok; - } - - if (handle->dirty == NULL) { - LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); - } - - lsm_entry *entry = handle->dirty; - - uint64_t new_len = entry->data_len + lsm_str_len(data); - const char *data_s = lsm_str_ptr(data); - - // Entries don't open their file unless needed - if (handle->data.f == NULL) { - // An entry with no existing data will not have a data file yet, so we set - // create to true then - LSM_RES(lsm_entry_data_mkdirs(handle->store, entry)); - LSM_RES(lsm_entry_data_open(&handle->data.f, handle->store, entry, "ab")); - } - - size_t written = 0; - - // TODO what happens when I/O fails? - while (written < data->len) { - written += fwrite(&data_s[written], sizeof(char), data->len - written, - handle->data.f); - } - - entry->data_len = new_len; - - return lsm_error_ok; -} diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index d82a669..7b3ed36 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -28,7 +28,7 @@ uint64_t lsm_store_size(const lsm_store *store) { return lsm_trie_size(store->trie); } -lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store, +lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, const lsm_str *key) { lsm_entry_wrapper *wrapper; @@ -47,8 +47,8 @@ lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store, return lsm_error_not_found; } - lsm_read_handle *handle; - LSM_RES2(lsm_read_handle_init(&handle), + lsm_entry_handle *handle; + LSM_RES2(lsm_entry_handle_init(&handle), pthread_rwlock_unlock(&wrapper->lock)); handle->wrapper = wrapper; @@ -58,7 +58,7 @@ lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store, return lsm_error_ok; } -lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store, +lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, const lsm_str *key) { lsm_entry_wrapper *wrapper; LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); @@ -77,8 +77,8 @@ lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store, return lsm_error_not_found; } - lsm_write_handle *handle; - LSM_RES2(lsm_write_handle_init(&handle), + lsm_entry_handle *handle; + LSM_RES2(lsm_entry_handle_init(&handle), pthread_rwlock_unlock(&wrapper->lock)); handle->wrapper = wrapper; @@ -138,3 +138,63 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, void lsm_entry_remove(lsm_entry_handle *handle) { handle->states |= lsm_entry_handle_state_removed; } + +lsm_error lsm_entry_data_append(lsm_entry_handle *handle, const lsm_str *data) { + if (lsm_str_len(data) == 0) { + return lsm_error_ok; + } + + lsm_entry *entry = handle->wrapper->entry; + + uint64_t new_len = entry->data_len + lsm_str_len(data); + const char *data_s = lsm_str_ptr(data); + + // Entries don't open their file unless needed + if (handle->f == NULL) { + // An entry with no existing data will not have a data file yet, so we set + // create to true then + LSM_RES(lsm_entry_data_open_write(handle)); + } + + size_t written = 0; + + // TODO what happens when I/O fails? + while (written < data->len) { + written += + fwrite(&data_s[written], sizeof(char), data->len - written, handle->f); + } + + entry->data_len = new_len; + handle->states |= lsm_entry_handle_state_updated; + + return lsm_error_ok; +} + +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, + lsm_entry_handle *handle, uint64_t len) { + const lsm_entry *entry = handle->wrapper->entry; + + if (entry->data_len == 0) { + *out = 0; + + return lsm_error_ok; + } + + // Entries don't open their file unless needed + if (handle->f == NULL) { + LSM_RES(lsm_entry_data_open_read(handle)); + } + + uint64_t read; + + read = fread(buf, sizeof(char), len, handle->f); + + if ((read == 0) && (ferror(handle->f) != 0)) { + return lsm_error_failed_io; + } + + handle->pos += read; + *out = read; + + return lsm_error_ok; +} diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index b49e9bc..721b4f3 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -163,14 +163,14 @@ lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos, LSM_RES(lsm_fseek(store->db.f, pos)); lsm_str *key; - LSM_RES(lsm_entry_read_str(&key, NULL, store->db.f)); + LSM_RES(lsm_entry_read_str(&key, &store->db.size, store->db.f)); lsm_entry_handle *handle; LSM_RES(lsm_store_insert(&handle, store, key)); - LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db.f, - sizeof(uint64_t), 1)); - LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db.f)); + LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, &store->db.size, + store->db.f, sizeof(uint64_t), 1)); + LSM_RES(lsm_entry_read_attrs(&store->db.size, handle, store->db.f)); handle->wrapper->entry->idx_file_offset = idx_file_offset; @@ -189,8 +189,6 @@ lsm_error lsm_store_load_db(lsm_store *store) { LSM_RES(lsm_fread(&store->idx.block_count, &store->idx.size, store->idx.f, sizeof(uint64_t), 1)); - uint64_t db_file_size = 0; - for (uint64_t i = 0; i < store->idx.block_count; i++) { uint64_t idx_file_offset = store->idx.size; @@ -203,14 +201,7 @@ lsm_error lsm_store_load_db(lsm_store *store) { } LSM_RES(lsm_store_insert_from_db(store, db_dim[0], idx_file_offset)); - - // The non-zeroed entry with the largest index determines the actual size of - // the file. This way, any zeroed entries at the end of the file can be - // overwritten. - db_file_size = LSM_MAX(db_file_size, db_dim[0] + db_dim[1]); } - store->db.size = db_file_size; - return lsm_error_ok; } diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index ac30271..2d73beb 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -30,35 +30,6 @@ void lsm_entry_free(lsm_entry *entry) { free(entry); } -lsm_error lsm_entry_clone(lsm_entry **out, const lsm_entry *entry) { - lsm_entry *new; - LSM_RES(lsm_entry_init(&new)); - - lsm_str_init_copy_n(&new->key, lsm_str_ptr(entry->key), - lsm_str_len(entry->key)); - - for (int i = 0; i < 4; i++) { - new->attrs.bitmap[i] = entry->attrs.bitmap[i]; - } - - new->attrs.count = entry->attrs.count; - new->attrs.items = malloc(sizeof(lsm_attr *) * entry->attrs.count); - - for (size_t i = 0; i < entry->attrs.count; i++) { - new->attrs.items[i].type = entry->attrs.items[i].type; - lsm_str_init_copy_n(&new->attrs.items[i].str, - lsm_str_ptr(entry->attrs.items[i].str), - lsm_str_len(entry->attrs.items[i].str)); - } - - new->data_len = entry->data_len; - new->idx_file_offset = entry->idx_file_offset; - - *out = new; - - return lsm_error_ok; -} - lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) { lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper)); @@ -137,16 +108,18 @@ void lsm_entry_close(lsm_entry_handle *handle) { free(handle); } -bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type) { - return (entry->attrs.bitmap[type / 64] & (((uint64_t)1) << (type % 64))) != 0; +bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type) { + return (handle->wrapper->entry->attrs.bitmap[type / 64] & + (((uint64_t)1) << (type % 64))) != 0; } -lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry, +lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, uint8_t type) { - if (!lsm_entry_attr_present(entry, type)) { + if (!lsm_entry_attr_present(handle, type)) { return lsm_error_not_found; } + lsm_entry *entry = handle->wrapper->entry; uint64_t i = 0; while (entry->attrs.items[i].type != type) { @@ -158,11 +131,11 @@ lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry, +lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle, uint8_t type) { lsm_str *s; - LSM_RES(lsm_entry_attr_get(&s, entry, type)); + LSM_RES(lsm_entry_attr_get(&s, handle, type)); uint64_t num = 0; @@ -175,22 +148,25 @@ lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry, return lsm_error_ok; } -lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry, +lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle, uint8_t type) { lsm_str *s; - LSM_RES(lsm_entry_attr_get(&s, entry, type)); + LSM_RES(lsm_entry_attr_get(&s, handle, type)); *out = lsm_str_char(s, 0); return lsm_error_ok; } -lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, uint8_t type) { - if (!lsm_entry_attr_present(entry, type)) { +lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, + uint8_t type) { + if (!lsm_entry_attr_present(handle, type)) { return lsm_error_not_found; } + lsm_entry *entry = handle->wrapper->entry; + if (entry->attrs.count == 1) { *out = entry->attrs.items[0].str; @@ -228,16 +204,21 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, uint8_t type) { entry->attrs.count--; entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64)); + handle->states |= lsm_entry_handle_state_updated; + return lsm_error_ok; } -lsm_error lsm_entry_attr_insert(lsm_entry *entry, uint8_t type, lsm_str *data) { +lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type, + lsm_str *data) { // Remove a previous version of the attribute lsm_str *out; - if (lsm_entry_attr_remove(&out, entry, type) == lsm_error_ok) { + if (lsm_entry_attr_remove(&out, handle, type) == lsm_error_ok) { lsm_str_free(out); } + lsm_entry *entry = handle->wrapper->entry; + lsm_attr *new_attrs = realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr)); @@ -252,35 +233,36 @@ lsm_error lsm_entry_attr_insert(lsm_entry *entry, uint8_t type, lsm_str *data) { entry->attrs.count++; entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64); + handle->states |= lsm_entry_handle_state_updated; + return lsm_error_ok; } -lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry *entry, uint8_t type, +lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type, uint64_t data) { lsm_str *s; LSM_RES( lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char))); - return lsm_entry_attr_insert(entry, type, s); + return lsm_entry_attr_insert(handle, type, s); } -lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry *entry, uint8_t type, +lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type, uint8_t data) { lsm_str *s; LSM_RES( lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint8_t) / sizeof(char))); - return lsm_entry_attr_insert(entry, type, s); + return lsm_entry_attr_insert(handle, type, s); } uint64_t lsm_entry_data_len(lsm_entry_handle *handle) { return handle->wrapper->entry->data_len; } -uint64_t lsm_entry_data_path_len(const lsm_store *store, - const lsm_entry *entry) { - const lsm_str *data_path = store->data_path; - const lsm_str *key = entry->key; +uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle) { + const lsm_str *data_path = handle->store->data_path; + const lsm_str *key = handle->wrapper->entry->key; uint8_t levels = key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS; @@ -289,10 +271,9 @@ uint64_t lsm_entry_data_path_len(const lsm_store *store, strlen(LSM_DATA_FILE_SUFFIX); } -void lsm_entry_data_path(char *path, const lsm_store *store, - const lsm_entry *entry) { - const lsm_str *data_path = store->data_path; - const lsm_str *key = entry->key; +void lsm_entry_data_path(char *path, const lsm_entry_handle *handle) { + const lsm_str *data_path = handle->store->data_path; + const lsm_str *key = handle->wrapper->entry->key; uint8_t levels = key->len > LSM_STORE_DATA_LEVELS ? LSM_STORE_DATA_LEVELS : key->len; @@ -317,13 +298,12 @@ void lsm_entry_data_path(char *path, const lsm_store *store, strcpy(&path[index], LSM_DATA_FILE_SUFFIX); } -lsm_error lsm_entry_data_mkdirs(const lsm_store *store, - const lsm_entry *entry) { - char path[lsm_entry_data_path_len(store, entry) + 1]; - lsm_entry_data_path(path, store, entry); +lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle) { + char path[lsm_entry_data_path_len(handle) + 1]; + lsm_entry_data_path(path, handle); - const lsm_str *data_path = store->data_path; - const lsm_str *key = entry->key; + const lsm_str *data_path = handle->store->data_path; + const lsm_str *key = handle->wrapper->entry->key; uint8_t levels = key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS; @@ -340,32 +320,43 @@ lsm_error lsm_entry_data_mkdirs(const lsm_store *store, path[data_path->len + 2 * (i + 1)] = '/'; } - return lsm_error_ok; -} - -lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store, - const lsm_entry *entry, const char *mode) { - char path[lsm_entry_data_path_len(store, entry) + 1]; - lsm_entry_data_path(path, store, entry); - - FILE *f = fopen(path, mode); + FILE *f = fopen(path, "ab"); if (f == NULL) { return lsm_error_failed_io; } - if (out != NULL) { - *out = f; - } + handle->f = f; return lsm_error_ok; } -lsm_error lsm_entry_data_remove(const lsm_store *store, - const lsm_entry *entry) { +lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle) { + char path[lsm_entry_data_path_len(handle) + 1]; + lsm_entry_data_path(path, handle); + + FILE *f = fopen(path, "rb"); + + if (f == NULL) { + return lsm_error_failed_io; + } + + handle->f = f; + + return lsm_error_ok; +} + +lsm_error lsm_entry_data_remove(lsm_entry_handle *handle) { + const lsm_entry *entry = handle->wrapper->entry; + if (entry->data_len > 0) { - char data_path[lsm_entry_data_path_len(store, entry) + 1]; - lsm_entry_data_path(data_path, store, entry); + if (handle->f != NULL) { + fclose(handle->f); + handle->f = NULL; + } + + char data_path[lsm_entry_data_path_len(handle) + 1]; + lsm_entry_data_path(data_path, handle); if (remove(data_path) != 0) { return lsm_error_failed_io;