feat(lsm): start of on-disk database

This commit is contained in:
Jef Roosens 2023-11-07 17:43:15 +01:00
parent eb9157281b
commit 46f89059e4
Signed by: Jef Roosens
GPG key ID: 02D4C0997E74717B
6 changed files with 199 additions and 23 deletions

View file

@ -8,6 +8,9 @@
#include "lsm/str_internal.h"
#include "lsm/trie.h"
#define LSM_DB_FILE_NAME "lsm.db"
#define LSM_IDX_FILE_NAME "lsm.idx"
typedef struct lsm_attr {
lsm_attr_type type;
lsm_str *str;
@ -70,7 +73,21 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
/**
 * State of a store: its trie plus the open handles and bookkeeping for the two
 * on-disk files (LSM_DB_FILE_NAME and LSM_IDX_FILE_NAME).
 */
struct lsm_store {
// Trie belonging to this store. NOTE(review): presumably the in-memory key
// index the on-disk files are loaded into -- confirm against lsm/trie.h
lsm_trie *trie;
// Directory the store was loaded from; the db and idx file paths are built
// by appending the file names to this path
lsm_str *data_path;
// NOTE(review): this commit changes lsm_store_load to no longer take a
// db_path argument; confirm whether this field is still meant to exist
lsm_str *db_path;
// Stream for the database file that entries are appended to
FILE *db_file;
// Offset at which the next entry is appended to db_file; advanced by the
// size of each entry written
uint64_t db_file_size;
// Held while seeking in, writing to and updating the size of db_file
pthread_mutex_t db_lock;
// Stream for the index file that index records are appended to
FILE *idx_file;
// Offset at which the next index record is appended to idx_file
uint64_t idx_file_size;
// Held while seeking in, writing to and updating the size of idx_file
pthread_mutex_t idx_lock;
};
/**
* Read in the database and construct the in-memory trie index. This function
* assumes the provided store is a newly initialized empty store with the
* database files opened.
*
* @param store store to read
* @return an lsm_error status code; lsm_error_failed_io is reported when a
* read from the index file fails
*/
lsm_error lsm_store_load_db(lsm_store *store);
#endif

View file

@ -22,20 +22,54 @@ lsm_error lsm_store_init(lsm_store **ptr) {
return res;
}
pthread_mutex_init(&store->db_lock, NULL);
pthread_mutex_init(&store->idx_lock, NULL);
*ptr = store;
return lsm_error_ok;
}
lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path,
lsm_str *data_path) {
lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) {
lsm_store *store;
LSM_RES(lsm_store_init(&store));
// TODO implement all of reading the db file
// Try to open an existing db file or create a new one otherwise
// This shit is why I need to improve the str library
char db_file_path[lsm_str_len(data_path) + strlen(LSM_DB_FILE_NAME) + 2];
memcpy(db_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char));
sprintf(&db_file_path[lsm_str_len(data_path)], "/%s", LSM_DB_FILE_NAME);
FILE *db_file = fopen(db_file_path, "r+b");
if (db_file == NULL) {
db_file = fopen(db_file_path, "wb");
if (db_file == NULL) {
return lsm_error_failed_io;
}
}
// Same for idx file
char idx_file_path[lsm_str_len(data_path) + strlen(LSM_IDX_FILE_NAME) + 2];
memcpy(idx_file_path, lsm_str_ptr(data_path), lsm_str_len(data_path) * sizeof(char));
sprintf(&idx_file_path[lsm_str_len(data_path)], "/%s", LSM_IDX_FILE_NAME);
FILE *idx_file = fopen(idx_file_path, "r+b");
if (idx_file == NULL) {
idx_file = fopen(idx_file_path, "wb");
if (idx_file == NULL) {
return lsm_error_failed_io;
}
}
LSM_RES(lsm_store_load_db(store));
store->db_path = db_path;
store->data_path = data_path;
store->db_file = db_file;
store->idx_file = idx_file;
*ptr = store;

View file

@ -0,0 +1,116 @@
#include "lsm/store_internal.h"
#include <stdio.h>
/**
 * Write one 64-bit unsigned integer to a file as raw bytes, exactly as it is
 * laid out in memory.
 *
 * @param f file to write to
 * @param num value to write
 * @return lsm_error_ok if the value was written, lsm_error_failed_io otherwise
 */
static lsm_error lsm_entry_write_uint64_t(FILE *f, uint64_t num) {
  // Only a single item is requested, so fwrite reports either 0 or 1 items;
  // a zero count is the only failure signal.
  const size_t items_written = fwrite(&num, sizeof(uint64_t), 1, f);

  return items_written == 0 ? lsm_error_failed_io : lsm_error_ok;
}
/**
 * Write the raw bytes of a string to a file, without a length prefix or
 * terminator.
 *
 * Short writes are retried from the first byte that has not been written yet.
 * A write that makes no progress at all is reported as an error instead of
 * being retried forever.
 *
 * @param f file to write to
 * @param s string whose contents should be written
 * @return lsm_error_ok if every byte was written, lsm_error_failed_io if the
 * stream stopped accepting data
 */
static lsm_error lsm_entry_write_str(FILE *f, lsm_str *s) {
  const char *data = lsm_str_ptr(s);
  uint64_t to_write = lsm_str_len(s);
  uint64_t written = 0;

  while (written < to_write) {
    // Continue after the bytes that already went out; restarting at the
    // beginning of the string would duplicate them in the file.
    size_t res = fwrite(data + written, sizeof(char), to_write - written, f);

    // fwrite only returns 0 for a non-empty request when the stream is in an
    // error state, so looping again could never make progress.
    if (res == 0) {
      return lsm_error_failed_io;
    }

    written += res;
  }

  return lsm_error_ok;
}
/**
 * Serialize an entry's attributes to the database file at the stream's current
 * position.
 *
 * Layout: the attribute count, followed for each attribute by its type, the
 * length of its value and the value's bytes. Count, type and length are each
 * written as a uint64_t.
 *
 * @param size set to the number of bytes accounted for so far; only describes
 * the complete record if lsm_error_ok is returned
 * @param db_file file to write to
 * @param entry entry whose attributes should be written
 * @return lsm_error_ok on success, or the error of the first write that failed
 */
lsm_error lsm_entry_write_db(uint64_t *size, FILE *db_file, lsm_entry *entry) {
  // First we write how many attributes follow
  LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.count));

  *size = sizeof(uint64_t);

  for (uint64_t i = 0; i < entry->attrs.count; i++) {
    lsm_str *value = entry->attrs.items[i].str;
    uint64_t value_len = lsm_str_len(value);

    // Write attribute type, length & value
    LSM_RES(lsm_entry_write_uint64_t(db_file, entry->attrs.items[i].type));
    LSM_RES(lsm_entry_write_uint64_t(db_file, value_len));
    LSM_RES(lsm_entry_write_str(db_file, value));

    *size += 2 * sizeof(uint64_t) + value_len * sizeof(char);
  }

  // uint64_t is not unsigned long on every platform, so convert explicitly to
  // a type that has a fixed printf conversion.
  printf("db size: %llu\n", (unsigned long long)*size);

  return lsm_error_ok;
}
/**
 * Append one index record for an entry to the index file at the stream's
 * current position.
 *
 * Layout: key length, key bytes, then the offset and length that were passed
 * in. The three numbers are each written as a uint64_t.
 *
 * @param size set to the size in bytes of the record, once every field has
 * been written
 * @param idx_file file to write to
 * @param entry entry whose key identifies the record
 * @param offset offset value to store in the record
 * @param len length value to store in the record
 * @return lsm_error_ok on success, or the error of the first write that failed
 */
lsm_error lsm_entry_write_idx(uint64_t *size, FILE *idx_file, lsm_entry *entry, uint64_t offset, uint64_t len) {
  lsm_str *key = entry->key;
  const uint64_t key_len = lsm_str_len(key);

  // Key first: its length, then its bytes
  LSM_RES(lsm_entry_write_uint64_t(idx_file, key_len));
  LSM_RES(lsm_entry_write_str(idx_file, key));

  // Followed by the two numbers supplied by the caller
  LSM_RES(lsm_entry_write_uint64_t(idx_file, offset));
  LSM_RES(lsm_entry_write_uint64_t(idx_file, len));

  // Three fixed-width numbers plus the key itself
  *size = 3 * sizeof(uint64_t) + key_len * sizeof(char);

  return lsm_error_ok;
}
/**
 * Persist the entry behind a handle: append its attributes to the database
 * file, then append an index record holding the entry's offset and size in the
 * database file to the index file.
 *
 * The database file and the index file are each guarded by their own lock; the
 * two locks are never held at the same time.
 *
 * @param store store whose files should be written to
 * @param handle handle of the entry to persist
 * @return lsm_error_ok on success, lsm_error_failed_io if seeking or flushing
 * fails, or the error reported by the failing write
 */
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
  pthread_mutex_lock(&store->db_lock);

  // Append entry to end of database file. fseek takes the offset before the
  // whence argument.
  // NOTE: fseek offsets are long, so this is limited to files that fit in one.
  if (fseek(store->db_file, (long)store->db_file_size, SEEK_SET) != 0) {
    pthread_mutex_unlock(&store->db_lock);

    return lsm_error_failed_io;
  }

  uint64_t db_entry_size;
  lsm_error res = lsm_entry_write_db(&db_entry_size, store->db_file, handle->wrapper->entry);

  // Data still sitting in the stdio buffer has not reached the file, so a
  // failed flush means the entry was not stored.
  if (fflush(store->db_file) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }

  // TODO fsync db file?
  if (res != lsm_error_ok) {
    pthread_mutex_unlock(&store->db_lock);

    return res;
  }

  uint64_t entry_index = store->db_file_size;
  store->db_file_size += db_entry_size;

  pthread_mutex_unlock(&store->db_lock);

  // Append entry to index file
  pthread_mutex_lock(&store->idx_lock);

  if (fseek(store->idx_file, (long)store->idx_file_size, SEEK_SET) != 0) {
    pthread_mutex_unlock(&store->idx_lock);

    return lsm_error_failed_io;
  }

  // Kept separate from db_entry_size so the value stored in the record and the
  // size of the record itself cannot be confused.
  uint64_t idx_entry_size;
  res = lsm_entry_write_idx(&idx_entry_size, store->idx_file, handle->wrapper->entry, entry_index, db_entry_size);

  if (fflush(store->idx_file) != 0 && res == lsm_error_ok) {
    res = lsm_error_failed_io;
  }

  if (res == lsm_error_ok) {
    store->idx_file_size += idx_entry_size;
  }

  pthread_mutex_unlock(&store->idx_lock);

  return res;
}
lsm_error lsm_store_load_db(lsm_store *store) {
uint64_t key_len;
size_t res;
lsm_str *key;
while (feof(store->idx_file) > 0) {
res = fread(&key_len, sizeof(uint64_t), 1, store->idx_file);
if (res == 0) {
return lsm_error_failed_io;
}
}
}