feat(lsm): always store entry data on disk

lsm
Jef Roosens 2023-11-08 14:11:46 +01:00
parent 35c301955f
commit ef33825b7b
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
6 changed files with 77 additions and 110 deletions

View File

@ -181,7 +181,7 @@ lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle,
* @param handle entry handle to read from
* @param len how many bytes to read at most
*/
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
lsm_entry_handle *handle, uint64_t len);
/**

View File

@ -30,14 +30,7 @@ typedef struct lsm_entry {
uint8_t count;
lsm_attr *items;
} attrs;
struct {
uint64_t len;
union {
FILE *file;
char *ptr;
} value;
bool on_disk;
} data;
uint64_t data_len;
} lsm_entry;
/**

View File

@ -60,22 +60,22 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
return res;
}
// Open a new file descriptor if needed
if (entry->data.on_disk) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
/* // Open a new file descriptor if needed */
/* if (entry->data_len > 0) { */
/* char path[store->data_path->len + entry->key->len + 2]; */
/* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */
/* lsm_str_ptr(entry->key)); */
FILE *f = fopen(path, "rb");
/* FILE *f = fopen(path, "rb"); */
if (f == NULL) {
free(handle);
/* if (f == NULL) { */
/* free(handle); */
return lsm_error_failed_io;
}
/* return lsm_error_failed_io; */
/* } */
handle->f = f;
}
/* handle->f = f; */
/* } */
handle->wrapper = wrapper;
*out = handle;
@ -114,22 +114,22 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
return res;
}
// Open a new file descriptor if needed
if (entry->data.on_disk) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
/* // Open a new file descriptor if needed */
/* if (entry->data_len > 0) { */
/* char path[store->data_path->len + entry->key->len + 2]; */
/* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */
/* lsm_str_ptr(entry->key)); */
FILE *f = fopen(path, "ab");
/* FILE *f = fopen(path, "ab"); */
if (f == NULL) {
free(handle);
/* if (f == NULL) { */
/* free(handle); */
return lsm_error_failed_io;
}
/* return lsm_error_failed_io; */
/* } */
handle->f = f;
}
/* handle->f = f; */
/* } */
handle->wrapper = wrapper;
*out = handle;
@ -192,99 +192,68 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
lsm_entry *entry = handle->wrapper->entry;
uint64_t new_len = entry->data.len + lsm_str_len(data);
uint64_t new_len = entry->data_len + lsm_str_len(data);
const char *data_s = lsm_str_ptr(data);
// Data is in memory and still fits -> keep it in memory
if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) {
char *buf;
// Entries don't open their file unless needed
if (handle->f == NULL) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
// Entries with no data do not have an allocated buffer yet
if (entry->data.len == 0) {
buf = malloc(new_len * sizeof(char));
} else {
buf = realloc(entry->data.value.ptr, new_len * sizeof(char));
FILE *f = fopen(path, "ab");
if (f == NULL) {
return lsm_error_failed_io;
}
if (buf == NULL) {
return lsm_error_failed_alloc;
}
memcpy(&buf[entry->data.len], data_s, lsm_str_len(data));
entry->data.value.ptr = buf;
}
// Data will end up on disk
else {
// Data is not yet on disk, so we create the file
if (!entry->data.on_disk) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
FILE *f = fopen(path, "ab");
if (f == NULL) {
return lsm_error_failed_io;
}
// If there was data present in memory already, we sync this to disk.
// This check is required because it's possible that more than the
// treshold is written to an empty entry immediately, meaning there's no
// allocated memory buffer present.
if (entry->data.len > 0) {
size_t written = 0;
// Write original in-memory data to file
while (written < entry->data.len) {
written += fwrite(&entry->data.value.ptr[written], sizeof(char),
entry->data.len - written, f);
}
free(entry->data.value.ptr);
entry->data.value.ptr = NULL;
}
handle->f = f;
entry->data.on_disk = true;
}
size_t written = 0;
// TODO what happens when I/O fails?
while (written < data->len) {
written += fwrite(&data_s[written], sizeof(char), data->len - written,
handle->f);
}
handle->f = f;
}
entry->data.len = new_len;
size_t written = 0;
// TODO what happens when I/O fails?
while (written < data->len) {
written +=
fwrite(&data_s[written], sizeof(char), data->len - written, handle->f);
}
entry->data_len = new_len;
return lsm_error_ok;
}
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
lsm_entry_handle *handle, uint64_t len) {
lsm_entry *entry = handle->wrapper->entry;
if (entry->data.len == 0) {
if (entry->data_len == 0) {
*out = 0;
return lsm_error_ok;
}
uint64_t read;
// Entries don't open their file unless needed
if (handle->f == NULL) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
if (entry->data.on_disk) {
read = fread(buf, sizeof(char), len, handle->f);
FILE *f = fopen(path, "rb");
if ((read == 0) && (ferror(handle->f) != 0)) {
if (f == NULL) {
return lsm_error_failed_io;
}
} else {
read = (entry->data.len - handle->pos) < len
? (entry->data.len - handle->pos)
: len;
memcpy(buf, &entry->data.value.ptr[handle->pos], read * sizeof(char));
handle->f = f;
}
uint64_t read;
read = fread(buf, sizeof(char), len, handle->f);
if ((read == 0) && (ferror(handle->f) != 0)) {
return lsm_error_failed_io;
}
handle->pos += read;

View File

@ -88,7 +88,9 @@ static lsm_error lsm_fread(void *out, uint64_t *sum, FILE *f, uint64_t size,
return lsm_error_failed_io;
}
*sum += size * count;
if (sum != NULL) {
*sum += size * count;
}
return lsm_error_ok;
}
@ -109,7 +111,9 @@ static lsm_error lsm_entry_read_str(lsm_str **out, uint64_t *sum, FILE *f) {
read += fread(&buf[read], 1, len - read, f);
}
*sum += len;
if (sum != NULL) {
*sum += len;
}
return lsm_str_init(out, buf);
}
@ -154,8 +158,7 @@ lsm_error lsm_store_load_db(lsm_store *store) {
return lsm_error_failed_io;
}
LSM_RES(
lsm_entry_read_attrs(&store->idx_file_size, handle, store->db_file));
LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file));
lsm_entry_close(handle);
store->db_file_size += db_dim[1];

View File

@ -179,5 +179,5 @@ lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, uint8_t type,
}
uint64_t lsm_entry_data_len(lsm_entry_handle *handle) {
return handle->wrapper->entry->data.len;
return handle->wrapper->entry->data_len;
}

View File

@ -121,6 +121,8 @@ bool lander_get_entry_lsm(event_loop_conn *conn) {
bool lander_stream_body_to_client(event_loop_conn *conn) {
http_loop_ctx *ctx = conn->ctx;
lander_ctx *c_ctx = ctx->c;
http_loop_gctx *gctx = ctx->g;
lander_gctx *c_gctx = gctx->c;
if ((c_ctx->entry == NULL) ||
(ctx->res.body.expected_len == ctx->res.body.len)) {
@ -131,8 +133,8 @@ bool lander_stream_body_to_client(event_loop_conn *conn) {
ctx->res.body.expected_len - ctx->res.body.len);
uint64_t read = 0;
lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], c_ctx->entry,
to_write);
lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size],
c_gctx->store, c_ctx->entry, to_write);
ctx->res.body.len += read;
conn->wbuf_size += read;