diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index 9410746..31eb19b 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -7,8 +7,6 @@ #include "lsm.h" #include "lsm/str.h" -#define LSM_STORE_DISK_THRESHOLD 1024 - /** * A handle referencing an entry inside a store. Read/write operations from/to * the entry go through this handle. diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index ccafe99..fb50838 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -19,9 +19,8 @@ typedef struct lsm_attr { /** * An entry inside an LSM store. * - * Each entry consists of the key it's stored behind, zero or more attributes - * (metadata) and a data field. The data field can be stored on disk or - * in-memory, depending on the size. + * Each entry consists of the key it's stored behind, zero or more attributes + * (metadata) and a data file. */ typedef struct lsm_entry { lsm_str *key; @@ -56,6 +55,12 @@ typedef struct lsm_entry_wrapper { lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr); void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper); +typedef enum lsm_entry_handle_state : uint8_t { + lsm_entry_handle_state_new = 1 << 0, + lsm_entry_handle_state_updated = 1 << 1, + lsm_entry_handle_state_removed = 1 << 2, +} lsm_entry_handle_state; + struct lsm_entry_handle { lsm_entry_wrapper *wrapper; lsm_store *store; @@ -63,8 +68,8 @@ struct lsm_entry_handle { FILE *f; // Current position in the file pointer uint64_t pos; - // Whether the entry's metadata has changed - bool dirty; + // Required to determine in what way the database files need to be synced + uint64_t states; }; lsm_error lsm_entry_handle_init(lsm_entry_handle **out); @@ -73,14 +78,18 @@ struct lsm_store { lsm_trie *trie; lsm_str *data_path; - FILE *db_file; - uint64_t db_file_size; - pthread_mutex_t db_lock; + struct { + FILE *f; + uint64_t size; + pthread_mutex_t lock; + } db; - FILE 
*idx_file; - uint64_t idx_file_block_count; - uint64_t idx_file_size; - pthread_mutex_t idx_lock; + struct { + FILE *f; + uint64_t size; + uint64_t block_count; + pthread_mutex_t lock; + } idx; }; /** @@ -100,10 +109,17 @@ lsm_error lsm_store_load_db(lsm_store *store); lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle); /** - * Remove an entry from the database + * Remove an entry from the database. * * @param handle handle to the removed entry */ lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle); +/** + * Update an existing entry already in the store. + * + * @param handle handle to the updated entry + */ +lsm_error lsm_entry_disk_update(lsm_entry_handle *handle); + #endif diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 022e68b..2185418 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -22,8 +22,8 @@ lsm_error lsm_store_init(lsm_store **ptr) { return res; } - pthread_mutex_init(&store->db_lock, NULL); - pthread_mutex_init(&store->idx_lock, NULL); + pthread_mutex_init(&store->db.lock, NULL); + pthread_mutex_init(&store->idx.lock, NULL); *ptr = store; @@ -151,7 +151,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, handle->store = store; // Newly inserted entries are always dirty - handle->dirty = true; + handle->states |= lsm_entry_handle_state_new; *out = handle; @@ -192,7 +192,7 @@ lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data) { } entry->data_len = new_len; - handle->dirty = true; + handle->states |= lsm_entry_handle_state_updated; return lsm_error_ok; } diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index 5c71dee..72e34bd 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -70,8 +70,8 @@ lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) { } store->data_path = data_path; - store->db_file = db_file; - store->idx_file = idx_file; + store->db.f = db_file; + store->idx.f = 
idx_file; LSM_RES(lsm_store_load_db(store)); @@ -137,63 +137,60 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle, return lsm_error_ok; } - -lsm_error lsm_store_load_db(lsm_store *store) { - uint64_t db_dim[2]; - lsm_str *key; - lsm_entry_handle *handle; - bool valid_entry; - - rewind(store->idx_file); - - // idx file starts with block count - LSM_RES(lsm_fread(&store->idx_file_block_count, &store->idx_file_size, - store->idx_file, sizeof(uint64_t), 1)); - - for (uint64_t i = 0; i < store->idx_file_block_count; i++) { - uint64_t idx_file_offset = store->idx_file_size; - - LSM_RES(lsm_fread(&valid_entry, &store->idx_file_size, store->idx_file, - sizeof(bool), 1)); - - if (valid_entry) { - LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file)); - LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file, - sizeof(uint64_t), 2)); - LSM_RES(lsm_store_insert(&handle, store, key)); - - // Read attributes from database file - if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) { - return lsm_error_failed_io; - } - - LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file, - sizeof(uint64_t), 1)); - LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); - - handle->wrapper->entry->idx_file_offset = idx_file_offset; - - // We explicitely set the dirty flag here to prevent writing to the datase - // when reading it in - handle->dirty = false; - lsm_entry_close(handle); - - store->db_file_size += db_dim[1]; - } - // Simply skip the invalid entry - else { - uint64_t key_len; - LSM_RES(lsm_fread(&key_len, &store->idx_file_size, store->idx_file, - sizeof(uint64_t), 1)); - - uint64_t remaining = key_len + 2 * sizeof(uint64_t); - - if (fseek(store->idx_file, remaining, SEEK_CUR) != 0) { - return lsm_error_failed_io; - } - - store->idx_file_size += remaining; - } +static lsm_error lsm_fseek(FILE *f, uint64_t pos) { + if (fseek(f, pos, SEEK_SET) != 0) { + return lsm_error_failed_io; + } + + return 
lsm_error_ok; +} + +/** + * Insert a new entry by reading it from the db file + */ +lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos, + uint64_t idx_file_offset) { + LSM_RES(lsm_fseek(store->db.f, pos)); + + lsm_str *key; + LSM_RES(lsm_entry_read_str(&key, &store->db.size, store->db.f)); + + lsm_entry_handle *handle; + LSM_RES(lsm_store_insert(&handle, store, key)); + + LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, &store->db.size, + store->db.f, sizeof(uint64_t), 1)); + LSM_RES(lsm_entry_read_attrs(&store->db.size, handle, store->db.f)); + + handle->wrapper->entry->idx_file_offset = idx_file_offset; + + handle->states = 0; + lsm_entry_close(handle); + + return lsm_error_ok; +} + +lsm_error lsm_store_load_db(lsm_store *store) { + uint64_t db_dim[2]; + + rewind(store->idx.f); + + // idx file starts with block count + LSM_RES(lsm_fread(&store->idx.block_count, &store->idx.size, store->idx.f, + sizeof(uint64_t), 1)); + + for (uint64_t i = 0; i < store->idx.block_count; i++) { + uint64_t idx_file_offset = store->idx.size; + + LSM_RES(lsm_fread(&db_dim, &store->idx.size, store->idx.f, sizeof(uint64_t), + 2)); + + // We zero out the length of entries if they're no longer valid + if (db_dim[1] == 0) { + continue; + } + + LSM_RES(lsm_store_insert_from_db(store, db_dim[0], idx_file_offset)); } return lsm_error_ok; diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index ffe182f..acb0015 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -47,6 +47,7 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry, LSM_RES(lsm_fseek(db_file, pos)); + LSM_RES(lsm_write_str(size, db_file, entry->key)); LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len)); LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count)); @@ -59,16 +60,12 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry, return 
lsm_error_ok; } -lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, - uint64_t offset, uint64_t len, uint64_t pos) { +lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset, + uint64_t len, uint64_t pos) { *size = 0; LSM_RES(lsm_fseek(idx_file, pos)); - bool valid_entry_marker = true; - LSM_RES(lsm_fwrite(size, idx_file, sizeof(bool), 1, &valid_entry_marker)); - - LSM_RES(lsm_write_str(size, idx_file, entry->key)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len)); @@ -78,55 +75,52 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry, lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { lsm_store *store = handle->store; - pthread_mutex_lock(&store->db_lock); + pthread_mutex_lock(&store->db.lock); - uint64_t db_entry_index = store->db_file_size; + uint64_t db_entry_index = store->db.size; uint64_t db_entry_size; - lsm_error res = - lsm_write_db_entry(&db_entry_size, store->db_file, handle->wrapper->entry, - store->db_file_size); - fflush(store->db_file); + lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f, + handle->wrapper->entry, store->db.size); + fflush(store->db.f); - pthread_mutex_unlock(&store->db_lock); + pthread_mutex_unlock(&store->db.lock); if (res != lsm_error_ok) { return res; } // Append entry to index file - pthread_mutex_lock(&store->idx_lock); + pthread_mutex_lock(&store->idx.lock); - uint64_t idx_entry_index = store->idx_file_size; + uint64_t idx_entry_index = store->idx.size; uint64_t idx_entry_size; - res = lsm_write_idx_entry(&idx_entry_size, store->idx_file, - handle->wrapper->entry, db_entry_index, - db_entry_size, store->idx_file_size); + res = lsm_write_idx_entry(&idx_entry_size, store->idx.f, db_entry_index, + db_entry_size, store->idx.size); if (res == lsm_error_ok) { // Update the counter at the beginning of the file - rewind(store->idx_file); + 
rewind(store->idx.f); - uint64_t new_block_count = store->idx_file_block_count + 1; + uint64_t new_block_count = store->idx.block_count + 1; - res = lsm_fwrite(NULL, store->idx_file, sizeof(uint64_t), 1, - &new_block_count); + res = lsm_fwrite(NULL, store->idx.f, sizeof(uint64_t), 1, &new_block_count); if (res == lsm_error_ok) { // Only if we successfully updated the on-disk counter do we make the code // aware that the files' sizes have increased. This way, if a write to the // counter fails, the code will simply reuse the already written content. - store->idx_file_size += idx_entry_size; - store->idx_file_block_count = new_block_count; - store->db_file_size += db_entry_size; + store->idx.size += idx_entry_size; + store->idx.block_count = new_block_count; + store->db.size += db_entry_size; handle->wrapper->entry->idx_file_offset = idx_entry_index; } } - fflush(store->idx_file); - pthread_mutex_unlock(&store->idx_lock); + fflush(store->idx.f); + pthread_mutex_unlock(&store->idx.lock); return res; } diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 4623f36..8212ba6 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -53,8 +53,14 @@ void lsm_entry_close(lsm_entry_handle *handle) { } // TODO handle errors here - if (handle->dirty) { + if ((handle->states & lsm_entry_handle_state_new) && + !(handle->states & lsm_entry_handle_state_removed)) { lsm_entry_disk_insert(handle); + } else if ((handle->states & lsm_entry_handle_state_removed) && + !(handle->states & lsm_entry_handle_state_new)) { + /* lsm_entry_disk_remove(handle); */ + } else if (handle->states & lsm_entry_handle_state_updated) { + /* lsm_entry_disk_update(handle); */ } pthread_rwlock_unlock(&handle->wrapper->lock); @@ -157,7 +163,7 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, entry->attrs.count--; entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64)); - handle->dirty = true; + handle->states |= 
lsm_entry_handle_state_updated; return lsm_error_ok; } @@ -184,7 +190,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type, entry->attrs.count++; entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64); - handle->dirty = true; + handle->states |= lsm_entry_handle_state_updated; return lsm_error_ok; }