diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 51e9be8..543480c 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -1,3 +1,5 @@ +#include + #include "lsm/store_internal.h" static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size, @@ -125,16 +127,10 @@ lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { return res; } -// Marking an entry as removed in the idx file is simply setting the length of -// its entry to zero -lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { - lsm_store *store = handle->store; - const lsm_entry *entry = handle->wrapper->entry; - +static lsm_error lsm_idx_zero_block(lsm_store *store, uint64_t pos) { pthread_mutex_lock(&store->idx.lock); - lsm_error res = - lsm_fseek(store->idx.f, entry->idx_file_offset + sizeof(uint64_t)); + lsm_error res = lsm_fseek(store->idx.f, pos); if (res != lsm_error_ok) { pthread_mutex_unlock(&store->idx.lock); @@ -153,7 +149,29 @@ lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { fflush(store->idx.f); + return lsm_error_ok; +} + +// Marking an entry as removed in the idx file is simply setting the length of +// its entry to zero +lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { + const lsm_entry *entry = handle->wrapper->entry; + + LSM_RES(lsm_idx_zero_block(handle->store, + entry->idx_file_offset * sizeof(uint64_t))); LSM_RES(lsm_entry_data_remove(handle)); return lsm_error_ok; } + +lsm_error lsm_entry_disk_update(lsm_entry_handle *handle) { + // An update is implemented by reinserting the entry at the end of the db file + uint64_t old_idx_index = handle->wrapper->entry->idx_file_offset; + + // TODO is there any way we can make this atomic? If the zero write to the + // index file fails, there are two entries in the db file for the same key. + LSM_RES(lsm_entry_disk_insert(handle)); + LSM_RES(lsm_idx_zero_block(handle->store, old_idx_index * sizeof(uint64_t))); + + return lsm_error_ok; +} diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 7d33d30..93a570b 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -61,6 +61,7 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { lsm_error lsm_entry_commit(lsm_entry_handle *handle) { uint8_t state_new = handle->states & lsm_entry_handle_state_new; uint8_t state_removed = handle->states & lsm_entry_handle_state_removed; + uint8_t state_updated = handle->states & lsm_entry_handle_state_updated; // Clean new entry if (state_new && !state_removed) { @@ -73,6 +74,8 @@ lsm_error lsm_entry_commit(lsm_entry_handle *handle) { lsm_entry_free(handle->wrapper->entry); handle->wrapper->entry = NULL; + } else if (state_updated && !(state_new || state_removed)) { + LSM_RES(lsm_entry_disk_update(handle)); } // Reset states after committing current changes @@ -99,6 +102,8 @@ void lsm_entry_close(lsm_entry_handle *handle) { handle->wrapper->entry = NULL; } + // TODO rollback uncomitted updates + pthread_rwlock_unlock(&handle->wrapper->lock); free(handle); }