refactor(lsm): clean up disk write code
parent
4fb127d9db
commit
715e1f9a58
|
@ -24,7 +24,7 @@ typedef enum lander_attr_type : uint8_t {
|
||||||
lander_attr_type_url = 2,
|
lander_attr_type_url = 2,
|
||||||
} lander_attr_type;
|
} lander_attr_type;
|
||||||
|
|
||||||
typedef enum lander_entry_type {
|
typedef enum lander_entry_type : uint8_t {
|
||||||
lander_entry_type_redirect = 0,
|
lander_entry_type_redirect = 0,
|
||||||
lander_entry_type_paste = 1,
|
lander_entry_type_paste = 1,
|
||||||
} lander_entry_type;
|
} lander_entry_type;
|
||||||
|
|
|
@ -1,31 +1,39 @@
|
||||||
#include "lsm/store_internal.h"
|
#include "lsm/store_internal.h"
|
||||||
|
|
||||||
static lsm_error lsm_entry_write_single(FILE *f, uint64_t size, void *val) {
|
static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size,
|
||||||
size_t res = fwrite(val, size, 1, f);
|
uint64_t count, void *val) {
|
||||||
|
size_t res = fwrite(val, size, count, f);
|
||||||
|
|
||||||
if (res == 0) {
|
if (res < count) {
|
||||||
return lsm_error_failed_io;
|
return lsm_error_failed_io;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sum != NULL) {
|
||||||
|
*sum += size * count;
|
||||||
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) {
|
static lsm_error lsm_write_str(uint64_t *sum, FILE *f, lsm_str *s) {
|
||||||
return lsm_entry_write_single(f, sizeof(uint64_t), &num);
|
uint64_t len = lsm_str_len(s);
|
||||||
}
|
|
||||||
|
LSM_RES(lsm_fwrite(sum, f, sizeof(uint64_t), 1, &len));
|
||||||
|
|
||||||
static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) {
|
|
||||||
uint64_t to_write = lsm_str_len(s);
|
|
||||||
uint64_t written = 0;
|
uint64_t written = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
written += fwrite(lsm_str_ptr(s), sizeof(char), to_write - written, f);
|
written += fwrite(lsm_str_ptr(s), sizeof(char), len - written, f);
|
||||||
} while (written < to_write);
|
} while (written < len);
|
||||||
|
|
||||||
|
if (sum != NULL) {
|
||||||
|
*sum += len * sizeof(char);
|
||||||
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
static lsm_error lsm_seek(FILE *f, uint64_t pos) {
|
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
|
||||||
if (fseek(f, pos, SEEK_SET) != 0) {
|
if (fseek(f, pos, SEEK_SET) != 0) {
|
||||||
return lsm_error_failed_io;
|
return lsm_error_failed_io;
|
||||||
}
|
}
|
||||||
|
@ -35,24 +43,17 @@ static lsm_error lsm_seek(FILE *f, uint64_t pos) {
|
||||||
|
|
||||||
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry,
|
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry,
|
||||||
uint64_t pos) {
|
uint64_t pos) {
|
||||||
LSM_RES(lsm_seek(db_file, pos));
|
*size = 0;
|
||||||
|
|
||||||
LSM_RES(lsm_entry_write_uint64_t(db_file, entry->data_len));
|
LSM_RES(lsm_fseek(db_file, pos));
|
||||||
|
|
||||||
LSM_RES(
|
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len));
|
||||||
lsm_entry_write_single(db_file, sizeof(uint8_t), &entry->attrs.count));
|
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count));
|
||||||
*size = sizeof(uint64_t) + sizeof(uint8_t);
|
|
||||||
|
|
||||||
for (uint8_t i = 0; i < entry->attrs.count; i++) {
|
for (uint8_t i = 0; i < entry->attrs.count; i++) {
|
||||||
// Write attribute type, length & value
|
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1,
|
||||||
LSM_RES(lsm_entry_write_single(db_file, sizeof(uint8_t),
|
|
||||||
&entry->attrs.items[i].type));
|
&entry->attrs.items[i].type));
|
||||||
LSM_RES(lsm_entry_write_uint64_t(db_file,
|
LSM_RES(lsm_write_str(size, db_file, entry->attrs.items[i].str));
|
||||||
lsm_str_len(entry->attrs.items[i].str)));
|
|
||||||
LSM_RES(lsm_entry_write_str(db_file, entry->attrs.items[i].str));
|
|
||||||
|
|
||||||
*size += sizeof(uint8_t) + sizeof(uint64_t) +
|
|
||||||
lsm_str_len(entry->attrs.items[i].str) * sizeof(char);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
|
@ -60,13 +61,13 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry,
|
||||||
|
|
||||||
lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
||||||
uint64_t offset, uint64_t len, uint64_t pos) {
|
uint64_t offset, uint64_t len, uint64_t pos) {
|
||||||
LSM_RES(lsm_seek(idx_file, pos));
|
*size = 0;
|
||||||
LSM_RES(lsm_entry_write_uint64_t(idx_file, lsm_str_len(entry->key)));
|
|
||||||
LSM_RES(lsm_entry_write_str(idx_file, entry->key));
|
|
||||||
LSM_RES(lsm_entry_write_uint64_t(idx_file, offset));
|
|
||||||
LSM_RES(lsm_entry_write_uint64_t(idx_file, len));
|
|
||||||
|
|
||||||
*size = 3 * sizeof(uint64_t) + lsm_str_len(entry->key) * sizeof(char);
|
LSM_RES(lsm_fseek(idx_file, pos));
|
||||||
|
|
||||||
|
LSM_RES(lsm_write_str(size, idx_file, entry->key));
|
||||||
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset));
|
||||||
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len));
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
@ -74,9 +75,10 @@ lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
||||||
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
|
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
|
||||||
pthread_mutex_lock(&store->db_lock);
|
pthread_mutex_lock(&store->db_lock);
|
||||||
|
|
||||||
uint64_t entry_size;
|
uint64_t db_entry_size;
|
||||||
lsm_error res = lsm_entry_write_db(
|
lsm_error res =
|
||||||
&entry_size, store->db_file, handle->wrapper->entry, store->db_file_size);
|
lsm_entry_write_db(&db_entry_size, store->db_file, handle->wrapper->entry,
|
||||||
|
store->db_file_size);
|
||||||
fflush(store->db_file);
|
fflush(store->db_file);
|
||||||
|
|
||||||
if (res != lsm_error_ok) {
|
if (res != lsm_error_ok) {
|
||||||
|
@ -85,17 +87,17 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t entry_index = store->db_file_size;
|
uint64_t db_entry_index = store->db_file_size;
|
||||||
store->db_file_size += entry_size;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&store->db_lock);
|
pthread_mutex_unlock(&store->db_lock);
|
||||||
|
|
||||||
// Append entry to index file
|
// Append entry to index file
|
||||||
pthread_mutex_lock(&store->idx_lock);
|
pthread_mutex_lock(&store->idx_lock);
|
||||||
|
|
||||||
res =
|
uint64_t idx_entry_size;
|
||||||
lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry,
|
res = lsm_entry_write_idx(&idx_entry_size, store->idx_file,
|
||||||
entry_index, entry_size, store->idx_file_size);
|
handle->wrapper->entry, db_entry_index,
|
||||||
|
db_entry_size, store->idx_file_size);
|
||||||
|
|
||||||
if (res == lsm_error_ok) {
|
if (res == lsm_error_ok) {
|
||||||
// Update the counter at the beginning of the file
|
// Update the counter at the beginning of the file
|
||||||
|
@ -103,14 +105,16 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
|
||||||
|
|
||||||
uint64_t new_block_count = store->idx_file_block_count + 1;
|
uint64_t new_block_count = store->idx_file_block_count + 1;
|
||||||
|
|
||||||
res = lsm_entry_write_uint64_t(store->idx_file, new_block_count);
|
res = lsm_fwrite(NULL, store->idx_file, sizeof(uint64_t), 1,
|
||||||
|
&new_block_count);
|
||||||
|
|
||||||
if (res == lsm_error_ok) {
|
if (res == lsm_error_ok) {
|
||||||
// Only if we successfully updated the on-disk counter do we make the code
|
// Only if we successfully updated the on-disk counter do we make the code
|
||||||
// aware that the file's size has increased. This way, if a write to the
|
// aware that the files' sizes have increased. This way, if a write to the
|
||||||
// counter fails, the code will simply reuse the already written content.
|
// counter fails, the code will simply reuse the already written content.
|
||||||
store->idx_file_size += entry_size;
|
store->idx_file_size += idx_entry_size;
|
||||||
store->idx_file_block_count = new_block_count;
|
store->idx_file_block_count = new_block_count;
|
||||||
|
store->db_file_size += db_entry_size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue