feat(lsm): possibly added reading db file on load

lsm
Jef Roosens 2023-11-07 23:00:22 +01:00
parent 46f89059e4
commit 38e9496717
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
4 changed files with 138 additions and 11 deletions

View File

@ -10,7 +10,7 @@ int main() {
lsm_str_init_copy(&data_dir, "data");
lsm_store *store;
lsm_store_load(&store, data_dir);
assert(lsm_store_load(&store, data_dir) == lsm_error_ok);
lsm_str *key;
lsm_str_init_copy(&key, "key");
@ -18,6 +18,10 @@ int main() {
lsm_entry_handle *handle;
assert(lsm_store_insert(&handle, store, key) == lsm_error_ok);
lsm_str *attr;
lsm_str_init_copy(&attr, "some attribute value");
lsm_entry_attr_insert(handle, lsm_attr_type_content_type, attr);
lsm_str *data;
lsm_str_init_copy(&data, "hello");

View File

@ -73,10 +73,13 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
struct lsm_store {
lsm_trie *trie;
lsm_str *data_path;
FILE *db_file;
uint64_t db_file_size;
pthread_mutex_t db_lock;
FILE *idx_file;
uint64_t idx_file_block_count;
uint64_t idx_file_size;
pthread_mutex_t idx_lock;
};

View File

@ -63,14 +63,24 @@ lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) {
if (idx_file == NULL) {
return lsm_error_failed_io;
}
// The database code expects the idx file to start with how many blocks it
// contains, so we write that here
uint64_t num = 0;
if (fwrite(&num, sizeof(uint64_t), 1, idx_file) == 0) {
return lsm_error_failed_io;
}
LSM_RES(lsm_store_load_db(store));
fflush(idx_file);
}
store->data_path = data_path;
store->db_file = db_file;
store->idx_file = idx_file;
LSM_RES(lsm_store_load_db(store));
*ptr = store;
return lsm_error_ok;

View File

@ -1,5 +1,9 @@
#include "lsm/store_internal.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "lsm/store.h"
#include "lsm/store_internal.h"
static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) {
size_t res = fwrite(&num, sizeof(uint64_t), 1, f);
@ -37,8 +41,6 @@ lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) {
*size += 2 * sizeof(uint64_t) + lsm_str_len(entry->attrs.items[i].str) * sizeof(char);
}
printf("db size: %lu\n", *size);
return lsm_error_ok;
}
@ -57,7 +59,7 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
pthread_mutex_lock(&store->db_lock);
// Append entry to end of database file
if (fseek(store->db_file, SEEK_SET, store->db_file_size) != 0) {
if (fseek(store->db_file, store->db_file_size, SEEK_SET) != 0) {
pthread_mutex_unlock(&store->db_lock);
return lsm_error_failed_io;
@ -83,34 +85,142 @@ lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
// Append entry to index file
pthread_mutex_lock(&store->idx_lock);
if (fseek(store->idx_file, SEEK_SET, store->idx_file_size) != 0) {
if (fseek(store->idx_file, store->idx_file_size, SEEK_SET) != 0) {
printf("failed seek, %lu\n", store->idx_file_size);
pthread_mutex_unlock(&store->idx_lock);
return lsm_error_failed_io;
}
res = lsm_entry_write_idx(&entry_size, store->idx_file, handle->wrapper->entry, entry_index, entry_size);
fflush(store->idx_file);
if (res == lsm_error_ok) {
store->idx_file_size += entry_size;
// Update the counter at the beginning of the file
uint64_t new_block_count = store->idx_file_block_count + 1;
if (fseek(store->idx_file, 0, SEEK_SET) != 0) {
pthread_mutex_unlock(&store->idx_lock);
return lsm_error_failed_io;
}
size_t r = fwrite(&new_block_count, sizeof(uint64_t), 1, store->idx_file);
if (r != lsm_error_ok) {
printf("wuck\n");
pthread_mutex_unlock(&store->idx_lock);
return res;
}
store->idx_file_size += entry_size;
store->idx_file_block_count = new_block_count;
} else {
printf("failed write\n");
}
fflush(store->idx_file);
pthread_mutex_unlock(&store->idx_lock);
return res;
}
static lsm_error lsm_entry_read_attrs(lsm_entry_handle *handle, FILE *db_file) {
uint64_t attr_count;
size_t res = fread(&attr_count, sizeof(uint64_t), 1, db_file);
if (res == 0) {
return lsm_error_failed_io;
}
// attr_type, val_len
uint64_t nums[2];
lsm_str *val;
for (uint64_t i = 0; i < attr_count; i++) {
res = fread(nums, sizeof(uint64_t), 2, db_file);
if (res < 2) {
return lsm_error_failed_io;
}
char *val_s = malloc(nums[1] + 1);
val_s[nums[1]] = '\0';
if (val_s == NULL) {
return lsm_error_failed_alloc;
}
uint64_t read = 0;
while (read < nums[1]) {
read += fread(&val_s[read], 1, nums[1] - read, db_file);
}
LSM_RES(lsm_str_init(&val, val_s));;
lsm_entry_attr_insert(handle, nums[0], val);
}
return lsm_error_ok;
}
lsm_error lsm_store_load_db(lsm_store *store) {
uint64_t key_len;
size_t res;
uint64_t db_dim[2];
lsm_str *key;
lsm_entry_handle *handle;
while (feof(store->idx_file) > 0) {
// idx file starts with block count
size_t res = fread(&store->idx_file_block_count, sizeof(uint64_t), 1, store->idx_file);
if (res == 0) {
return lsm_error_failed_io;
}
store->idx_file_size += sizeof(uint64_t);
for (uint64_t i = 0; i < store->idx_file_block_count; i++) {
// Read in idx metadata
res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file);
if (res == 0) {
return lsm_error_failed_io;
}
char *key_s = malloc(key_len + 1);
key_s[key_len] = '\0';
if (key_s == NULL) {
return lsm_error_failed_alloc;
}
res = fread(key_s, 1, key_len, store->idx_file);
if (res < key_len) {
return lsm_error_failed_io;
}
res = fread(db_dim, sizeof(uint64_t), 2, store->idx_file);
if (res < 2) {
return lsm_error_failed_io;
}
LSM_RES(lsm_str_init(&key, key_s));
LSM_RES(lsm_store_insert(&handle, store, key));
// Read attributes from database file
if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) {
return lsm_error_failed_io;
}
LSM_RES(lsm_entry_read_attrs(handle, store->db_file));
lsm_entry_close(handle);
store->idx_file_size += 3 * sizeof(uint64_t) + key_len;
store->db_file_size += db_dim[1];
}
return lsm_error_ok;
}