diff --git a/lsm/example/test.c b/lsm/example/test.c index 2a7e3d3..1445720 100644 --- a/lsm/example/test.c +++ b/lsm/example/test.c @@ -6,12 +6,11 @@ #include "lsm/str.h" int main() { - lsm_str *db_path, *data_dir; - lsm_str_init_copy(&db_path, "data/data.db"); + lsm_str *data_dir; lsm_str_init_copy(&data_dir, "data"); lsm_store *store; - lsm_store_load(&store, db_path, data_dir); + lsm_store_load(&store, data_dir); lsm_str *key; lsm_str_init_copy(&key, "key"); @@ -26,6 +25,10 @@ int main() { lsm_entry_data_append(store, handle, data); } + if (lsm_entry_sync(store, handle) != lsm_error_ok) { + printf("godver"); + return 1; + } lsm_entry_close(handle); assert(lsm_store_open_read(&handle, store, key) == lsm_error_ok); diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index 1557dd1..7518059 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -104,10 +104,9 @@ lsm_error lsm_store_init(lsm_store **ptr); * Open the given database file and load it into a new store object. * * @param ptr pointer to store newly allocated store - * @param db_path path to the database file * @param data_path path to the data directory */ -lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); +lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path); /** * Dealocate an existing lsm_store object. @@ -193,6 +192,14 @@ lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_entry_handle *handle, uint64_t len); +/** + * Persist the entry's data to disk. + * + * @param store store to persist entry in + * @param handle handle to entry to persist + */ +lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle); + /** * Return the length of the entry's data. * diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index c8bad4c..d45fc36 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -8,6 +8,9 @@ #include "lsm/str_internal.h" #include "lsm/trie.h" +#define LSM_DB_FILE_NAME "lsm.db" +#define LSM_IDX_FILE_NAME "lsm.idx" + typedef struct lsm_attr { lsm_attr_type type; lsm_str *str; @@ -70,7 +73,21 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out); struct lsm_store { lsm_trie *trie; lsm_str *data_path; - lsm_str *db_path; + FILE *db_file; + uint64_t db_file_size; + pthread_mutex_t db_lock; + FILE *idx_file; + uint64_t idx_file_size; + pthread_mutex_t idx_lock; }; +/** + * Read in the database and construct the in-memory trie index. This function + * assumes the provided store is a newly initialized empty store with the + * database files opened. + * + * @param store store to read + */ +lsm_error lsm_store_load_db(lsm_store *store); + #endif diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index d056621..e2d62cc 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -22,20 +22,54 @@ lsm_error lsm_store_init(lsm_store **ptr) { return res; } + pthread_mutex_init(&store->db_lock, NULL); + pthread_mutex_init(&store->idx_lock, NULL); + *ptr = store; return lsm_error_ok; } -lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, - lsm_str *data_path) { +lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) { lsm_store *store; LSM_RES(lsm_store_init(&store)); - // TODO implement all of reading the db file + // Try to open an existing db file or create a new one otherwise + // This shit is why I need to improve the str library + char db_file_path[lsm_str_len(data_path) + strlen(LSM_DB_FILE_NAME) + 2]; + memcpy(db_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char)); + sprintf(&db_file_path[lsm_str_len(data_path)], "/%s", LSM_DB_FILE_NAME); + + FILE *db_file = fopen(db_file_path, "r+b"); + + if (db_file == NULL) { + db_file = fopen(db_file_path, "wb"); + + if (db_file == NULL) { + return lsm_error_failed_io; + } + } + + // Same for idx file + char idx_file_path[lsm_str_len(data_path) + strlen(LSM_IDX_FILE_NAME) + 2]; + memcpy(idx_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char)); + sprintf(&idx_file_path[lsm_str_len(data_path)], "/%s", LSM_IDX_FILE_NAME); + + FILE *idx_file = fopen(idx_file_path, "r+b"); + + if (idx_file == NULL) { + idx_file = fopen(idx_file_path, "wb"); + + if (idx_file == NULL) { + return lsm_error_failed_io; + } + } + + LSM_RES(lsm_store_load_db(store)); - store->db_path = db_path; store->data_path = data_path; + store->db_file = db_file; + store->idx_file = idx_file; *ptr = store; diff --git a/lsm/src/store/lsm_store_sync.c b/lsm/src/store/lsm_store_sync.c new file mode 100644 index 0000000..49f6d61 --- /dev/null +++ b/lsm/src/store/lsm_store_sync.c @@ -0,0 +1,116 @@ +#include "lsm/store_internal.h" +#include + +static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) { + size_t res = fwrite(&num, sizeof(uint64_t), 1, f); + + // Such a small write should succeed in one go + if (res == 0) { + return lsm_error_failed_io; + } + + return lsm_error_ok; +} + +static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) { + uint64_t to_write = lsm_str_len(s); + uint64_t written = 0; + + do { + written += fwrite(lsm_str_ptr(s), sizeof(char), to_write - written, f); + } while (written < to_write); + + return lsm_error_ok; +} + +lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { + // First we write how many attributes follow + LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count)); + *size = sizeof(uint64_t); + + for (uint64_t i = 0; i < entry->attrs.count; i++) { + // Write attribute type, length & value + LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.items[i].type)); + LSM_RES(lsm_entry_write_uint64_t(db_file, lsm_str_len(entry->attrs.items[i].str))); + LSM_RES(lsm_entry_write_str(db_file, entry->attrs.items[i].str)); + + *size += 2 * sizeof(uint64_t) + lsm_str_len(entry->attrs.items[i].str) * sizeof(char); + } + + printf("db size: %lu\n", *size); + + return lsm_error_ok; +} + +lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, uint64_t offset, uint64_t len) { + LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); + LSM_RES(lsm_entry_write_str(idx_file, entry->key)); + LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); + LSM_RES(lsm_entry_write_uint64_t(idx_file, len)); + + *size = 3 * sizeof(uint64_t) + lsm_str_len(entry->key) * sizeof(char); + + return lsm_error_ok; +} + +lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { + pthread_mutex_lock(&store->db_lock); + + // Append entry to end of database file + if (fseek(store->db_file, SEEK_SET, store->db_file_size) != 0) { + pthread_mutex_unlock(&store->db_lock); + + return lsm_error_failed_io; + } + + uint64_t entry_size; + lsm_error res = lsm_entry_write_db(&entry_size, store->db_file, handle->wrapper->entry); + fflush(store->db_file); + + // TODO fsync db file? + + if (res != lsm_error_ok) { + pthread_mutex_unlock(&store->db_lock); + + return res; + } + + uint64_t entry_index = store->db_file_size; + store->db_file_size += entry_size; + + pthread_mutex_unlock(&store->db_lock); + + // Append entry to index file + pthread_mutex_lock(&store->idx_lock); + + if (fseek(store->idx_file, SEEK_SET, store->idx_file_size) != 0) { + pthread_mutex_unlock(&store->idx_lock); + + return lsm_error_failed_io; + } + + res = lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, entry_index, entry_size); + fflush(store->idx_file); + + if (res == lsm_error_ok) { + store->idx_file_size += entry_size; + } + + pthread_mutex_unlock(&store->idx_lock); + + return res; +} + +lsm_error lsm_store_load_db(lsm_store *store) { + uint64_t key_len; + size_t res; + lsm_str *key; + + while (feof(store->idx_file) > 0) { + res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file); + + if (res == 0) { + return lsm_error_failed_io; + } + } +} diff --git a/src/main.c b/src/main.c index 6d69baf..f552081 100644 --- a/src/main.c +++ b/src/main.c @@ -31,28 +31,27 @@ int main() { critical(1, "Invalid TCP port %s", port_str); } - char file_path[strlen(data_dir) + 12 + 1]; - sprintf(file_path, "%s/lander.data", data_dir); + /* char file_path[strlen(data_dir) + 12 + 1]; */ + /* sprintf(file_path, "%s/lander.data", data_dir); */ - info("Initializing trie from file '%s'", file_path); + /* info("Initializing trie from file '%s'", file_path); */ - Trie *trie; - TrieExitCode res = trie_init(&trie, file_path); + /* Trie *trie; */ + /* TrieExitCode res = trie_init(&trie, file_path); */ - if (res != Ok) { - critical(1, "An error occured while populating the trie."); - } + /* if (res != Ok) { */ + /* critical(1, "An error occured while populating the trie."); */ + /* } */ - info("Trie initialized and populated with %i entries", trie_size(trie)); + /* info("Trie initialized and populated with %i entries", trie_size(trie)); */ lander_gctx *c_gctx = lander_gctx_init(); c_gctx->data_dir = data_dir; - c_gctx->trie = trie; + /* c_gctx->trie = trie; */ - lsm_str *db_path, *data_dir2; - lsm_str_init_copy(&db_path, "data/store.db"); + lsm_str *data_dir2; lsm_str_init_copy(&data_dir2, "data"); - lsm_store_load(&c_gctx->store, db_path, data_dir2); + lsm_store_load(&c_gctx->store, data_dir2); http_loop *hl = http_loop_init( lander_routes, sizeof(lander_routes) / sizeof(lander_routes[0]), c_gctx,