refactor(lsm): slightly clean up disk write code

lsm
Jef Roosens 2023-11-08 10:40:12 +01:00
parent 9c249d40c7
commit 226873219b
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
1 changed files with 25 additions and 35 deletions

View File

@ -22,7 +22,18 @@ static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) {
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) { static lsm_error lsm_seek(FILE *f, uint64_t pos) {
if (fseek(f, pos, SEEK_SET) != 0) {
return lsm_error_failed_io;
}
return lsm_error_ok;
}
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry,
uint64_t pos) {
LSM_RES(lsm_seek(db_file, pos));
// First we write how many attributes follow // First we write how many attributes follow
LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count)); LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count));
*size = sizeof(uint64_t); *size = sizeof(uint64_t);
@ -42,7 +53,8 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) {
} }
lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
uint64_t offset, uint64_t len) { uint64_t offset, uint64_t len, uint64_t pos) {
LSM_RES(lsm_seek(idx_file, pos));
LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key))); LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key)));
LSM_RES(lsm_entry_write_str(idx_file, entry->key)); LSM_RES(lsm_entry_write_str(idx_file, entry->key));
LSM_RES(lsm_entry_write_uint64_t(idx_file, offset)); LSM_RES(lsm_entry_write_uint64_t(idx_file, offset));
@ -56,16 +68,9 @@ lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) { lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
pthread_mutex_lock(&store->db_lock); pthread_mutex_lock(&store->db_lock);
// Append entry to end of database file
if (fseek(store->db_file, store->db_file_size, SEEK_SET) != 0) {
pthread_mutex_unlock(&store->db_lock);
return lsm_error_failed_io;
}
uint64_t entry_size; uint64_t entry_size;
lsm_error res = lsm_error res = lsm_entry_write_db(
lsm_entry_write_db(&entry_size, store->db_file, handle->wrapper->entry); &entry_size, store->db_file, handle->wrapper->entry, store->db_file_size);
fflush(store->db_file); fflush(store->db_file);
if (res != lsm_error_ok) { if (res != lsm_error_ok) {
@ -82,40 +87,25 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
// Append entry to index file // Append entry to index file
pthread_mutex_lock(&store->idx_lock); pthread_mutex_lock(&store->idx_lock);
if (fseek(store->idx_file, store->idx_file_size, SEEK_SET) != 0) { res =
printf("failed seek, %lu\n", store->idx_file_size); lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry,
pthread_mutex_unlock(&store->idx_lock); entry_index, entry_size, store->idx_file_size);
return lsm_error_failed_io;
}
res = lsm_entry_write_idx(&entry_size, store->idx_file,
handle->wrapper->entry, entry_index, entry_size);
if (res == lsm_error_ok) { if (res == lsm_error_ok) {
// Update the counter at the beginning of the file // Update the counter at the beginning of the file
rewind(store->idx_file);
uint64_t new_block_count = store->idx_file_block_count + 1; uint64_t new_block_count = store->idx_file_block_count + 1;
if (fseek(store->idx_file, 0, SEEK_SET) != 0) { res = lsm_entry_write_uint64_t(store->idx_file, new_block_count);
pthread_mutex_unlock(&store->idx_lock);
return lsm_error_failed_io;
}
size_t r = fwrite(&new_block_count, sizeof(uint64_t), 1, store->idx_file);
if (r != lsm_error_ok) {
pthread_mutex_unlock(&store->idx_lock);
return res;
}
if (res == lsm_error_ok) {
store->idx_file_size += entry_size; store->idx_file_size += entry_size;
store->idx_file_block_count = new_block_count; store->idx_file_block_count = new_block_count;
} }
}
fflush(store->idx_file); fflush(store->idx_file);
pthread_mutex_unlock(&store->idx_lock); pthread_mutex_unlock(&store->idx_lock);
return res; return res;