lander/lsm/src/store/lsm_store_disk_write.c

116 lines
3.4 KiB
C

#include "lsm/store_internal.h"

#include <limits.h>
/*
 * Write one uint64_t to f exactly as it is laid out in memory (host byte
 * order).
 *
 * Returns lsm_error_failed_io if fwrite reports that the item was not
 * written, lsm_error_ok otherwise.
 */
static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) {
  // A single 8-byte item either goes through in one fwrite call or not at
  // all, so a zero item count is the only failure to look for.
  return fwrite(&num, sizeof(uint64_t), 1, f) == 0 ? lsm_error_failed_io
                                                    : lsm_error_ok;
}
/*
 * Write the raw bytes of s to f (no length prefix, no terminator).
 *
 * fwrite may transfer fewer bytes than requested, so the loop resumes from
 * the first byte not yet written instead of restarting at the beginning of
 * the string. A call that makes no progress means the stream hit an error.
 * In that case lsm_error_failed_io is returned rather than retrying forever.
 */
static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) {
  const char *buf = lsm_str_ptr(s);
  uint64_t to_write = lsm_str_len(s);
  uint64_t written = 0;

  while (written < to_write) {
    size_t res = fwrite(buf + written, sizeof(char), to_write - written, f);
    if (res == 0) {
      return lsm_error_failed_io;
    }
    written += res;
  }
  return lsm_error_ok;
}
/*
 * Move f's file position to the absolute byte offset pos.
 *
 * fseek takes a signed long offset, which is only 32 bits wide on some
 * platforms (e.g. LLP64 Windows or 32-bit targets). An offset that does not
 * fit is rejected with lsm_error_failed_io. Otherwise it would be silently
 * converted to a wrong (possibly negative) position.
 */
static lsm_error lsm_seek(FILE *f, uint64_t pos) {
  if (pos > (uint64_t)LONG_MAX) {
    return lsm_error_failed_io;
  }
  if (fseek(f, (long)pos, SEEK_SET) != 0) {
    return lsm_error_failed_io;
  }
  return lsm_error_ok;
}
/*
 * Serialize entry's attributes into db_file, starting at byte offset pos.
 *
 * Layout written (integers go through lsm_entry_write_uint64_t):
 *   attribute count
 *   then, per attribute: type, value length, value bytes
 *
 * *size accumulates the number of bytes written. It is updated as each part
 * completes, so when an error is returned it only covers the parts written
 * before the failure. Returns the first seek/write error, or lsm_error_ok.
 */
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry,
                             uint64_t pos) {
  LSM_RES(lsm_seek(db_file, pos));

  const uint64_t attr_count = entry->attrs.count;

  // Header: how many attributes follow.
  LSM_RES(lsm_entry_write_uint64_t(db_file, attr_count));
  *size = sizeof(uint64_t);

  for (uint64_t idx = 0; idx < attr_count; ++idx) {
    lsm_str *value = entry->attrs.items[idx].str;
    const uint64_t value_len = lsm_str_len(value);

    LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.items[idx].type));
    LSM_RES(lsm_entry_write_uint64_t(db_file, value_len));
    LSM_RES(lsm_entry_write_str(db_file, value));

    // Two uint64_t fields (type + length) followed by the value bytes.
    *size += 2 * sizeof(uint64_t) + value_len * sizeof(char);
  }
  return lsm_error_ok;
}
/*
 * Write one index record for entry into idx_file at byte offset pos.
 *
 * Record layout (integers go through lsm_entry_write_uint64_t):
 *   key length, key bytes, offset, len
 * lsm_entry_sync passes the entry's byte offset and size in the database
 * file as offset/len.
 *
 * *size receives the record's length in bytes, but only once every write
 * has succeeded. On error it is left untouched and the first error is
 * returned.
 */
lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry,
                              uint64_t offset, uint64_t len, uint64_t pos) {
  LSM_RES(lsm_seek(idx_file, pos));

  const uint64_t key_len = lsm_str_len(entry->key);
  LSM_RES(lsm_entry_write_uint64_t(idx_file, key_len));
  LSM_RES(lsm_entry_write_str(idx_file, entry->key));
  LSM_RES(lsm_entry_write_uint64_t(idx_file, offset));
  LSM_RES(lsm_entry_write_uint64_t(idx_file, len));

  // Key bytes plus the three uint64_t fields (key length, offset, len).
  *size = key_len * sizeof(char) + 3 * sizeof(uint64_t);
  return lsm_error_ok;
}
/*
 * Persist the entry behind handle, in three steps:
 *   1. append its attributes at the end of the database file (under db_lock);
 *   2. append an index record pointing at that data to the index file;
 *   3. bump the block counter stored at the start of the index file
 *      (steps 2 and 3 run under idx_lock).
 *
 * The in-memory bookkeeping (db_file_size, idx_file_size,
 * idx_file_block_count) is only advanced once the matching writes AND the
 * fflush of the stream have succeeded. After a failure the sizes still point
 * at the old end of file, so the next sync overwrites whatever partial data
 * was left behind.
 *
 * Returns lsm_error_ok, or the first I/O error encountered.
 */
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
  pthread_mutex_lock(&store->db_lock);

  uint64_t db_entry_size = 0;
  lsm_error res =
      lsm_entry_write_db(&db_entry_size, store->db_file,
                         handle->wrapper->entry, store->db_file_size);
  // Flush unconditionally. A flush failure only becomes the result if the
  // write itself succeeded, so the original write error is not masked.
  if (fflush(store->db_file) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }
  if (res != lsm_error_ok) {
    pthread_mutex_unlock(&store->db_lock);
    return res;
  }

  const uint64_t db_entry_offset = store->db_file_size;
  store->db_file_size += db_entry_size;
  pthread_mutex_unlock(&store->db_lock);

  // Append entry to index file
  pthread_mutex_lock(&store->idx_lock);

  uint64_t idx_entry_size = 0;
  const uint64_t new_block_count = store->idx_file_block_count + 1;

  res = lsm_entry_write_idx(&idx_entry_size, store->idx_file,
                            handle->wrapper->entry, db_entry_offset,
                            db_entry_size, store->idx_file_size);
  if (res == lsm_error_ok) {
    // Seek back to the counter at the beginning of the file. lsm_seek is used
    // instead of rewind(): rewind() returns nothing and clears the stream's
    // error indicator, so a failure to flush the record just written would be
    // silently lost.
    res = lsm_seek(store->idx_file, 0);
  }
  if (res == lsm_error_ok) {
    res = lsm_entry_write_uint64_t(store->idx_file, new_block_count);
  }
  if (fflush(store->idx_file) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }
  if (res == lsm_error_ok) {
    // Only once the counter has been written and flushed does the code become
    // aware that the file has grown. If any step failed, the next sync
    // reuses (overwrites) the already written content at the old offset.
    store->idx_file_size += idx_entry_size;
    store->idx_file_block_count = new_block_count;
  }

  pthread_mutex_unlock(&store->idx_lock);
  return res;
}