diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 8b02319..9813f40 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -22,7 +22,18 @@ static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) { return lsm_error_ok; } -lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { +static lsm_error lsm_seek(FILE *f, uint64_t pos) { + if (fseek(f, pos, SEEK_SET) != 0) { + return lsm_error_failed_io; + } + + return lsm_error_ok; +} + +lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry, + uint64_t pos) { + LSM_RES(lsm_seek(db_file, pos)); + // First we write how many attributes follow LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count)); *size = sizeof(uint64_t); @@ -42,7 +53,8 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { } lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, - uint64_t offset, uint64_t len) { + uint64_t offset, uint64_t len, uint64_t pos) { + LSM_RES(lsm_seek(idx_file, pos)); LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); LSM_RES(lsm_entry_write_str(idx_file, entry->key)); LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); @@ -56,16 +68,9 @@ lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { pthread_mutex_lock(&store->db_lock); - // Append entry to end of database file - if (fseek(store->db_file, store->db_file_size, SEEK_SET) != 0) { - pthread_mutex_unlock(&store->db_lock); - - return lsm_error_failed_io; - } - uint64_t entry_size; - lsm_error res = - lsm_entry_write_db(&entry_size, store->db_file, handle->wrapper->entry); + lsm_error res = lsm_entry_write_db( + &entry_size, store->db_file, handle->wrapper->entry, store->db_file_size); fflush(store->db_file); if (res != lsm_error_ok) { @@ -82,40 +87,25 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { // Append entry to index file pthread_mutex_lock(&store->idx_lock); - if (fseek(store->idx_file, store->idx_file_size, SEEK_SET) != 0) { - printf("failed seek, %lu\n", store->idx_file_size); - pthread_mutex_unlock(&store->idx_lock); - - return lsm_error_failed_io; - } - - res = lsm_entry_write_idx(&entry_size, store->idx_file, - handle->wrapper->entry, entry_index, entry_size); + res = + lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, + entry_index, entry_size, store->idx_file_size); if (res == lsm_error_ok) { // Update the counter at the beginning of the file + rewind(store->idx_file); + uint64_t new_block_count = store->idx_file_block_count + 1; - if (fseek(store->idx_file, 0, SEEK_SET) != 0) { - pthread_mutex_unlock(&store->idx_lock); + res = lsm_entry_write_uint64_t(store->idx_file, new_block_count); - return lsm_error_failed_io; + if (res == lsm_error_ok) { + store->idx_file_size += entry_size; + store->idx_file_block_count = new_block_count; } - - size_t r = fwrite(&new_block_count, sizeof(uint64_t), 1, store->idx_file); - - if (r != lsm_error_ok) { - pthread_mutex_unlock(&store->idx_lock); - - return res; - } - - store->idx_file_size += entry_size; - store->idx_file_block_count = new_block_count; } fflush(store->idx_file); - pthread_mutex_unlock(&store->idx_lock); return res;