173 lines
4.4 KiB
C
173 lines
4.4 KiB
C
#include "lsm/store_internal.h"

#include <limits.h>
|
|
|
|
static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size,
|
|
uint64_t count, const void *val) {
|
|
size_t res = fwrite(val, size, count, f);
|
|
|
|
if (res < count) {
|
|
return lsm_error_failed_io;
|
|
}
|
|
|
|
if (sum != NULL) {
|
|
*sum += size * count;
|
|
}
|
|
|
|
return lsm_error_ok;
|
|
}
|
|
|
|
/**
 * Write a length-prefixed string to `f`: first the length as a raw uint64_t
 * (as stored in memory), followed by the string's bytes.
 *
 * @param sum optional running byte counter; when non-NULL it is incremented by
 *            the number of bytes written (prefix + payload)
 * @param f   stream to write to
 * @param s   string to serialize
 * @return lsm_error_failed_io if either write comes up short, lsm_error_ok
 *         otherwise
 */
static lsm_error lsm_write_str(uint64_t *sum, FILE *f, const lsm_str *s) {
  uint64_t len = lsm_str_len(s);

  LSM_RES(lsm_fwrite(sum, f, sizeof(uint64_t), 1, &len));

  // A short count from fwrite means a write error occurred, so retrying in a
  // loop is both unnecessary and dangerous: it can spin forever on a
  // persistent error and would re-send the start of the string. Issue a single
  // checked write instead. A zero-length string writes nothing and succeeds.
  return lsm_fwrite(sum, f, sizeof(char), len, lsm_str_ptr(s));
}
|
|
|
|
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
|
|
if (fseek(f, pos, SEEK_SET) != 0) {
|
|
return lsm_error_failed_io;
|
|
}
|
|
|
|
return lsm_error_ok;
|
|
}
|
|
|
|
/**
 * Serialize `entry` into `db_file`, starting at absolute offset `pos`.
 *
 * Written in order: the key (via lsm_write_str), `entry->data_len`
 * (sizeof(uint64_t) bytes), `entry->attrs.count` (sizeof(uint8_t) bytes), and
 * then, for each attribute, its `type` (sizeof(uint8_t) bytes) followed by its
 * string (via lsm_write_str).
 *
 * @param size    out: reset to 0 on entry, then accumulates the number of
 *                bytes written
 * @param db_file stream to write to
 * @param entry   entry to serialize
 * @param pos     offset in `db_file` at which the entry starts
 * @return the first error reported by a seek/write helper, or lsm_error_ok
 */
lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry,
                             uint64_t pos) {
  *size = 0;

  LSM_RES(lsm_fseek(db_file, pos));

  // Fixed part of the record: key, data length and attribute count
  LSM_RES(lsm_write_str(size, db_file, entry->key));
  LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len));
  LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count));

  // Variable part: one (type, string) pair per attribute
  uint8_t attr_idx = 0;

  while (attr_idx < entry->attrs.count) {
    LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1,
                       &entry->attrs.items[attr_idx].type));
    LSM_RES(lsm_write_str(size, db_file, entry->attrs.items[attr_idx].str));

    attr_idx++;
  }

  return lsm_error_ok;
}
|
|
|
|
lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset,
|
|
uint64_t len, uint64_t pos) {
|
|
*size = 0;
|
|
|
|
LSM_RES(lsm_fseek(idx_file, pos));
|
|
|
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset));
|
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len));
|
|
|
|
return lsm_error_ok;
|
|
}
|
|
|
|
/**
 * Persist the entry behind `handle`: append its serialized form to the
 * store's db file, append a matching record to the idx file, and bump the
 * counter stored at the start of the idx file.
 *
 * The in-memory sizes and block count are only advanced once every write and
 * flush has succeeded; on failure they are left untouched, so the next insert
 * writes over the same file region.
 *
 * @param handle handle whose store and wrapped entry are used
 * @return lsm_error_ok on success, otherwise the error of the first failing
 *         step (lsm_error_failed_io for a failed flush)
 */
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
  lsm_store *store = handle->store;

  // Append entry to db file
  pthread_mutex_lock(&store->db.lock);

  uint64_t db_entry_index = store->db.size;

  uint64_t db_entry_size;
  lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f,
                                     handle->wrapper->entry, db_entry_index);

  // The stream is buffered, so a successful fwrite does not mean the data
  // left the process; a failed flush must be reported as an I/O failure.
  if (fflush(store->db.f) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }

  pthread_mutex_unlock(&store->db.lock);

  if (res != lsm_error_ok) {
    return res;
  }

  // Append entry to index file
  pthread_mutex_lock(&store->idx.lock);

  uint64_t idx_entry_index = store->idx.size;

  uint64_t idx_entry_size;
  res = lsm_write_idx_entry(&idx_entry_size, store->idx.f, db_entry_index,
                            db_entry_size, idx_entry_index);

  uint64_t new_block_count = store->idx.block_count + 1;

  if (res == lsm_error_ok) {
    // Update the counter at the beginning of the file
    rewind(store->idx.f);

    res = lsm_fwrite(NULL, store->idx.f, sizeof(uint64_t), 1, &new_block_count);
  }

  // Flush before committing any in-memory state so that a failed flush is
  // treated exactly like a failed write.
  if (fflush(store->idx.f) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }

  if (res == lsm_error_ok) {
    // Only if we successfully updated the on-disk counter do we make the code
    // aware that the files' sizes have increased. This way, if a write to the
    // counter fails, the code will simply reuse the already written content.
    store->idx.size += idx_entry_size;
    store->idx.block_count = new_block_count;

    // NOTE(review): db.size is read under db.lock above but updated here while
    // only idx.lock is held; two concurrent inserts could observe the same
    // db.size. Confirm whether callers serialize inserts externally.
    store->db.size += db_entry_size;

    handle->wrapper->entry->idx_file_offset = idx_entry_index;
  }

  pthread_mutex_unlock(&store->idx.lock);

  return res;
}
|
|
|
|
// Marking an entry as removed in the idx file is simply setting the length of
|
|
// its entry to zero
|
|
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) {
|
|
lsm_store *store = handle->store;
|
|
const lsm_entry *entry = handle->wrapper->entry;
|
|
|
|
pthread_mutex_lock(&store->idx.lock);
|
|
|
|
lsm_error res =
|
|
lsm_fseek(store->idx.f, entry->idx_file_offset + sizeof(uint64_t));
|
|
|
|
if (res != lsm_error_ok) {
|
|
pthread_mutex_unlock(&store->idx.lock);
|
|
|
|
return res;
|
|
}
|
|
|
|
uint64_t val = 0;
|
|
res = lsm_fwrite(NULL, store->idx.f, sizeof(uint64_t), 1, &val);
|
|
|
|
pthread_mutex_unlock(&store->idx.lock);
|
|
|
|
if (res != lsm_error_ok) {
|
|
return res;
|
|
}
|
|
|
|
fflush(store->idx.f);
|
|
|
|
// Remove data file if present
|
|
if (entry->data_len > 0) {
|
|
if (handle->f != NULL) {
|
|
fclose(handle->f);
|
|
handle->f = NULL;
|
|
}
|
|
|
|
char data_path[lsm_entry_data_path_len(handle) + 1];
|
|
lsm_entry_data_path(data_path, handle);
|
|
|
|
if (remove(data_path) != 0) {
|
|
return lsm_error_failed_io;
|
|
}
|
|
}
|
|
|
|
return lsm_error_ok;
|
|
}
|