diff --git a/lsm/example/test.c b/lsm/example/test.c index bd78f2e..e1e3b69 100644 --- a/lsm/example/test.c +++ b/lsm/example/test.c @@ -17,7 +17,7 @@ int main() { lsm_entry_handle *handle; assert(lsm_store_insert(&handle, store, key) == lsm_error_ok); - + lsm_str *attr; lsm_str_init_copy(&attr, "some attribute value"); lsm_entry_attr_insert(handle, lsm_attr_type_content_type, attr); diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index 72334c6..7518059 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -52,8 +52,7 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, * @param entry entry to search for * @param type type of attribute to return */ -lsm_error lsm_entry_attr_get_num(uint64_t *out, lsm_entry_handle *handle, - lsm_attr_type type); +lsm_error lsm_entry_attr_get_num(uint64_t *out, lsm_entry_handle *handle, lsm_attr_type type); /** * Add a new attribute to the entry. @@ -73,8 +72,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type, * @param type type of attribute to add * @param data data of attribute */ -lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, - lsm_attr_type type, uint64_t data); +lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, lsm_attr_type type, uint64_t data); /** * Remove an atribute from the given entry, if present. @@ -180,8 +178,7 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, * @param data data to append * @param len length of data array */ -lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, - char *data, uint64_t len); +lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, char *data, uint64_t len); /** * Read a number of bytes from the entry's data field. The position from which diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 2345cb8..172c6d2 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -30,6 +30,62 @@ lsm_error lsm_store_init(lsm_store **ptr) { return lsm_error_ok; } +lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) { + lsm_store *store; + LSM_RES(lsm_store_init(&store)); + + // Try to open an existing db file or create a new one otherwise + // This shit is why I need to improve the str library + char db_file_path[lsm_str_len(data_path) + strlen(LSM_DB_FILE_NAME) + 2]; + memcpy(db_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char)); + sprintf(&db_file_path[lsm_str_len(data_path)], "/%s", LSM_DB_FILE_NAME); + + FILE *db_file = fopen(db_file_path, "r+b"); + + if (db_file == NULL) { + db_file = fopen(db_file_path, "wb"); + + if (db_file == NULL) { + return lsm_error_failed_io; + } + } + + // Same for idx file + char idx_file_path[lsm_str_len(data_path) + strlen(LSM_IDX_FILE_NAME) + 2]; + memcpy(idx_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char)); + sprintf(&idx_file_path[lsm_str_len(data_path)], "/%s", LSM_IDX_FILE_NAME); + + FILE *idx_file = fopen(idx_file_path, "r+b"); + + if (idx_file == NULL) { + idx_file = fopen(idx_file_path, "wb"); + + if (idx_file == NULL) { + return lsm_error_failed_io; + } + + // The database code expects the idx file to start with how many blocks it + // contains, so we write that here + uint64_t num = 0; + + if (fwrite(&num, sizeof(uint64_t), 1, idx_file) == 0) { + return lsm_error_failed_io; + } + + fflush(idx_file); + } + + store->data_path = data_path; + store->db_file = db_file; + store->idx_file = idx_file; + + LSM_RES(lsm_store_load_db(store)); + + *ptr = store; + + return lsm_error_ok; +} + lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, lsm_str *key) { lsm_entry_wrapper *wrapper; @@ -144,8 +200,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, // If a key was previously removed from the trie, the wrapper will already be // present in the trie - if (lsm_trie_search((void **)&wrapper, store->trie, key) == - lsm_error_not_found) { + if (lsm_trie_search((void **)&wrapper, store->trie, key) == lsm_error_not_found) { LSM_RES(lsm_entry_wrapper_init(&wrapper)); pthread_rwlock_wrlock(&wrapper->lock); @@ -167,6 +222,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, } } + lsm_entry *entry; LSM_RES(lsm_entry_init(&entry)); diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c deleted file mode 100644 index 4c76b76..0000000 --- a/lsm/src/store/lsm_store_disk_read.c +++ /dev/null @@ -1,183 +0,0 @@ -#include -#include - -#include "lsm/store_internal.h" - -lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) { - lsm_store *store; - LSM_RES(lsm_store_init(&store)); - - // Try to open an existing db file or create a new one otherwise - // This shit is why I need to improve the str library - char db_file_path[lsm_str_len(data_path) + strlen(LSM_DB_FILE_NAME) + 2]; - memcpy(db_file_path, lsm_str_ptr(data_path), - lsm_str_len(data_path) * sizeof(char)); - sprintf(&db_file_path[lsm_str_len(data_path)], "/%s", LSM_DB_FILE_NAME); - - FILE *db_file = fopen(db_file_path, "r+b"); - - if (db_file == NULL) { - // Create the file first, then reopen it in extended read - db_file = fopen(db_file_path, "wb"); - - if (db_file == NULL) { - return lsm_error_failed_io; - } - - fclose(db_file); - - FILE *db_file = fopen(db_file_path, "r+b"); - - if (db_file == NULL) { - return lsm_error_failed_io; - } - } - - // Same for idx file - char idx_file_path[lsm_str_len(data_path) + strlen(LSM_IDX_FILE_NAME) + 2]; - memcpy(idx_file_path, lsm_str_ptr(data_path), - lsm_str_len(data_path) * sizeof(char)); - sprintf(&idx_file_path[lsm_str_len(data_path)], "/%s", LSM_IDX_FILE_NAME); - - FILE *idx_file = fopen(idx_file_path, "r+b"); - - if (idx_file == NULL) { - // Create the file first - idx_file = fopen(idx_file_path, "wb"); - - if (idx_file == NULL) { - return lsm_error_failed_io; - } - - // The database code expects the idx file to start with how many blocks it - // contains, so we write that here - uint64_t num = 0; - - if (fwrite(&num, sizeof(uint64_t), 1, idx_file) == 0) { - return lsm_error_failed_io; - } - - fflush(idx_file); - fclose(idx_file); - - // If opening it in extended read mode still fails now, there's a problem - FILE *idx_file = fopen(idx_file_path, "r+b"); - - if (idx_file == NULL) { - return lsm_error_failed_io; - } - } - - store->data_path = data_path; - store->db_file = db_file; - store->idx_file = idx_file; - - LSM_RES(lsm_store_load_db(store)); - - *ptr = store; - - return lsm_error_ok; -} - -static lsm_error lsm_entry_read_attrs(lsm_entry_handle *handle, FILE *db_file) { - uint64_t attr_count; - size_t res = fread(&attr_count, sizeof(uint64_t), 1, db_file); - - if (res == 0) { - return lsm_error_failed_io; - } - - // attr_type, val_len - uint64_t nums[2]; - lsm_str *val; - - for (uint64_t i = 0; i < attr_count; i++) { - res = fread(nums, sizeof(uint64_t), 2, db_file); - - if (res < 2) { - return lsm_error_failed_io; - } - - char *val_s = malloc(nums[1] + 1); - val_s[nums[1]] = '\0'; - - if (val_s == NULL) { - return lsm_error_failed_alloc; - } - - uint64_t read = 0; - - while (read < nums[1]) { - read += fread(&val_s[read], 1, nums[1] - read, db_file); - } - - LSM_RES(lsm_str_init(&val, val_s)); - ; - lsm_entry_attr_insert(handle, nums[0], val); - } - - return lsm_error_ok; -} - -lsm_error lsm_store_load_db(lsm_store *store) { - uint64_t key_len; - uint64_t db_dim[2]; - lsm_str *key; - lsm_entry_handle *handle; - - rewind(store->idx_file); - - // idx file starts with block count - size_t res = - fread(&store->idx_file_block_count, sizeof(uint64_t), 1, store->idx_file); - - if (res == 0) { - return lsm_error_failed_io; - } - - store->idx_file_size += sizeof(uint64_t); - - for (uint64_t i = 0; i < store->idx_file_block_count; i++) { - // Read in idx metadata - res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file); - - if (res == 0) { - return lsm_error_failed_io; - } - - char *key_s = malloc(key_len + 1); - key_s[key_len] = '\0'; - - if (key_s == NULL) { - return lsm_error_failed_alloc; - } - - res = fread(key_s, 1, key_len, store->idx_file); - - if (res < key_len) { - return lsm_error_failed_io; - } - - res = fread(db_dim, sizeof(uint64_t), 2, store->idx_file); - - if (res < 2) { - return lsm_error_failed_io; - } - - LSM_RES(lsm_str_init(&key, key_s)); - LSM_RES(lsm_store_insert(&handle, store, key)); - - // Read attributes from database file - if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { - return lsm_error_failed_io; - } - - LSM_RES(lsm_entry_read_attrs(handle, store->db_file)); - lsm_entry_close(handle); - - store->idx_file_size += 3 * sizeof(uint64_t) + key_len; - store->db_file_size += db_dim[1]; - } - - return lsm_error_ok; -} diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c deleted file mode 100644 index 9813f40..0000000 --- a/lsm/src/store/lsm_store_disk_write.c +++ /dev/null @@ -1,112 +0,0 @@ -#include "lsm/store_internal.h" - -static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) { - size_t res = fwrite(&num, sizeof(uint64_t), 1, f); - - // Such a small write should succeed in one go - if (res == 0) { - return lsm_error_failed_io; - } - - return lsm_error_ok; -} - -static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) { - uint64_t to_write = lsm_str_len(s); - uint64_t written = 0; - - do { - written += fwrite(lsm_str_ptr(s), sizeof(char), to_write - written, f); - } while (written < to_write); - - return lsm_error_ok; -} - -static lsm_error lsm_seek(FILE *f, uint64_t pos) { - if (fseek(f, pos, SEEK_SET) != 0) { - return lsm_error_failed_io; - } - - return lsm_error_ok; -} - -lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, - uint64_t pos) { - LSM_RES(lsm_seek(db_file, pos)); - - // First we write how many attributes follow - LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count)); - *size = sizeof(uint64_t); - - for (uint64_t i = 0; i < entry->attrs.count; i++) { - // Write attribute type, length & value - LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.items[i].type)); - LSM_RES(lsm_entry_write_uint64_t(db_file, - lsm_str_len(entry->attrs.items[i].str))); - LSM_RES(lsm_entry_write_str(db_file, entry->attrs.items[i].str)); - - *size += 2 * sizeof(uint64_t) + - lsm_str_len(entry->attrs.items[i].str) * sizeof(char); - } - - return lsm_error_ok; -} - -lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, - uint64_t offset, uint64_t len, uint64_t pos) { - LSM_RES(lsm_seek(idx_file, pos)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); - LSM_RES(lsm_entry_write_str(idx_file, entry->key)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); - LSM_RES(lsm_entry_write_uint64_t(idx_file, len)); - - *size = 3 * sizeof(uint64_t) + lsm_str_len(entry->key) * sizeof(char); - - return lsm_error_ok; -} - -lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { - pthread_mutex_lock(&store->db_lock); - - uint64_t entry_size; - lsm_error res = lsm_entry_write_db( - &entry_size, store->db_file, handle->wrapper->entry, store->db_file_size); - fflush(store->db_file); - - if (res != lsm_error_ok) { - pthread_mutex_unlock(&store->db_lock); - - return res; - } - - uint64_t entry_index = store->db_file_size; - store->db_file_size += entry_size; - - pthread_mutex_unlock(&store->db_lock); - - // Append entry to index file - pthread_mutex_lock(&store->idx_lock); - - res = - lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, - entry_index, entry_size, store->idx_file_size); - - if (res == lsm_error_ok) { - // Update the counter at the beginning of the file - rewind(store->idx_file); - - uint64_t new_block_count = store->idx_file_block_count + 1; - - res = lsm_entry_write_uint64_t(store->idx_file, new_block_count); - - if (res == lsm_error_ok) { - store->idx_file_size += entry_size; - store->idx_file_block_count = new_block_count; - } - } - - fflush(store->idx_file); - pthread_mutex_unlock(&store->idx_lock); - - return res; -} diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 2878996..d7bbc40 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -78,8 +78,7 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle, return lsm_error_ok; } -lsm_error lsm_entry_attr_get_num(uint64_t *out, lsm_entry_handle *handle, - lsm_attr_type type) { +lsm_error lsm_entry_attr_get_num(uint64_t *out, lsm_entry_handle *handle, lsm_attr_type type) { lsm_str *s; LSM_RES(lsm_entry_attr_get(&s, handle, type)); @@ -168,11 +167,9 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type, return lsm_error_ok; } -lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, - lsm_attr_type type, uint64_t data) { +lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, lsm_attr_type type, uint64_t data) { lsm_str *s; - LSM_RES( - lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char))); + LSM_RES(lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char))); return lsm_entry_attr_insert(handle, type, s); } diff --git a/lsm/src/store/lsm_store_sync.c b/lsm/src/store/lsm_store_sync.c new file mode 100644 index 0000000..39e7658 --- /dev/null +++ b/lsm/src/store/lsm_store_sync.c @@ -0,0 +1,226 @@ +#include +#include +#include + +#include "lsm/store.h" +#include "lsm/store_internal.h" + +static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) { + size_t res = fwrite(&num, sizeof(uint64_t), 1, f); + + // Such a small write should succeed in one go + if (res == 0) { + return lsm_error_failed_io; + } + + return lsm_error_ok; +} + +static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) { + uint64_t to_write = lsm_str_len(s); + uint64_t written = 0; + + do { + written += fwrite(lsm_str_ptr(s), sizeof(char), to_write - written, f); + } while (written < to_write); + + return lsm_error_ok; +} + +lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { + // First we write how many attributes follow + LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count)); + *size = sizeof(uint64_t); + + for (uint64_t i = 0; i < entry->attrs.count; i++) { + // Write attribute type, length & value + LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.items[i].type)); + LSM_RES(lsm_entry_write_uint64_t(db_file, lsm_str_len(entry->attrs.items[i].str))); + LSM_RES(lsm_entry_write_str(db_file, entry->attrs.items[i].str)); + + *size += 2 * sizeof(uint64_t) + lsm_str_len(entry->attrs.items[i].str) * sizeof(char); + } + + return lsm_error_ok; +} + +lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, uint64_t offset, uint64_t len) { + LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); + LSM_RES(lsm_entry_write_str(idx_file, entry->key)); + LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); + LSM_RES(lsm_entry_write_uint64_t(idx_file, len)); + + *size = 3 * sizeof(uint64_t) + lsm_str_len(entry->key) * sizeof(char); + + return lsm_error_ok; +} + +lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { + pthread_mutex_lock(&store->db_lock); + + // Append entry to end of database file + if (fseek(store->db_file, store->db_file_size, SEEK_SET) != 0) { + pthread_mutex_unlock(&store->db_lock); + + return lsm_error_failed_io; + } + + uint64_t entry_size; + lsm_error res = lsm_entry_write_db(&entry_size, store->db_file, handle->wrapper->entry); + fflush(store->db_file); + + // TODO fsync db file? + + if (res != lsm_error_ok) { + pthread_mutex_unlock(&store->db_lock); + + return res; + } + + uint64_t entry_index = store->db_file_size; + store->db_file_size += entry_size; + + pthread_mutex_unlock(&store->db_lock); + + // Append entry to index file + pthread_mutex_lock(&store->idx_lock); + + if (fseek(store->idx_file, store->idx_file_size, SEEK_SET) != 0) { + printf("failed seek, %lu\n", store->idx_file_size); + pthread_mutex_unlock(&store->idx_lock); + + return lsm_error_failed_io; + } + + res = lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, entry_index, entry_size); + + if (res == lsm_error_ok) { + // Update the counter at the beginning of the file + uint64_t new_block_count = store->idx_file_block_count + 1; + + if (fseek(store->idx_file, 0, SEEK_SET) != 0) { + pthread_mutex_unlock(&store->idx_lock); + + return lsm_error_failed_io; + } + + size_t r = fwrite(&new_block_count, sizeof(uint64_t), 1, store->idx_file); + + if (r != lsm_error_ok) { + printf("wuck\n"); + pthread_mutex_unlock(&store->idx_lock); + + return res; + } + + store->idx_file_size += entry_size; + store->idx_file_block_count = new_block_count; + } else { + printf("failed write\n"); + } + + fflush(store->idx_file); + + pthread_mutex_unlock(&store->idx_lock); + + return res; +} + +static lsm_error lsm_entry_read_attrs(lsm_entry_handle *handle, FILE *db_file) { + uint64_t attr_count; + size_t res = fread(&attr_count, sizeof(uint64_t), 1, db_file); + + if (res == 0) { + return lsm_error_failed_io; + } + + // attr_type, val_len + uint64_t nums[2]; + lsm_str *val; + + for (uint64_t i = 0; i < attr_count; i++) { + res = fread(nums, sizeof(uint64_t), 2, db_file); + + if (res < 2) { + return lsm_error_failed_io; + } + + char *val_s = malloc(nums[1] + 1); + val_s[nums[1]] = '\0'; + + if (val_s == NULL) { + return lsm_error_failed_alloc; + } + + uint64_t read = 0; + + while (read < nums[1]) { + read += fread(&val_s[read], 1, nums[1] - read, db_file); + } + + LSM_RES(lsm_str_init(&val, val_s));; + lsm_entry_attr_insert(handle, nums[0], val); + } + + return lsm_error_ok; +} + +lsm_error lsm_store_load_db(lsm_store *store) { + uint64_t key_len; + uint64_t db_dim[2]; + lsm_str *key; + lsm_entry_handle *handle; + + // idx file starts with block count + size_t res = fread(&store->idx_file_block_count, sizeof(uint64_t), 1, store->idx_file); + + if (res == 0) { + return lsm_error_failed_io; + } + + store->idx_file_size += sizeof(uint64_t); + + for (uint64_t i = 0; i < store->idx_file_block_count; i++) { + // Read in idx metadata + res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file); + + if (res == 0) { + return lsm_error_failed_io; + } + + char *key_s = malloc(key_len + 1); + key_s[key_len] = '\0'; + + if (key_s == NULL) { + return lsm_error_failed_alloc; + } + + res = fread(key_s, 1, key_len, store->idx_file); + + if (res < key_len) { + return lsm_error_failed_io; + } + + res = fread(db_dim, sizeof(uint64_t), 2, store->idx_file); + + if (res < 2) { + return lsm_error_failed_io; + } + + LSM_RES(lsm_str_init(&key, key_s)); + LSM_RES(lsm_store_insert(&handle, store, key)); + + // Read attributes from database file + if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { + return lsm_error_failed_io; + } + + LSM_RES(lsm_entry_read_attrs(handle, store->db_file)); + lsm_entry_close(handle); + + store->idx_file_size += 3 * sizeof(uint64_t) + key_len; + store->db_file_size += db_dim[1]; + } + + return lsm_error_ok; +}