diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index cf0c9f7..7de2946 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -181,7 +181,7 @@ lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle, * @param handle entry handle to read from * @param len how many bytes to read at most */ -lsm_error lsm_entry_data_read(uint64_t *out, char *buf, +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, lsm_entry_handle *handle, uint64_t len); /** diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index b8c24dc..e4bbdba 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -30,14 +30,7 @@ typedef struct lsm_entry { uint8_t count; lsm_attr *items; } attrs; - struct { - uint64_t len; - union { - FILE *file; - char *ptr; - } value; - bool on_disk; - } data; + uint64_t data_len; } lsm_entry; /** diff --git a/lsm/src/store/lsm_store.c b/lsm/src/store/lsm_store.c index 2345cb8..c6df34e 100644 --- a/lsm/src/store/lsm_store.c +++ b/lsm/src/store/lsm_store.c @@ -60,22 +60,22 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, return res; } - // Open a new file descriptor if needed - if (entry->data.on_disk) { - char path[store->data_path->len + entry->key->len + 2]; - sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), - lsm_str_ptr(entry->key)); + /* // Open a new file descriptor if needed */ + /* if (entry->data_len > 0) { */ + /* char path[store->data_path->len + entry->key->len + 2]; */ + /* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */ + /* lsm_str_ptr(entry->key)); */ - FILE *f = fopen(path, "rb"); + /* FILE *f = fopen(path, "rb"); */ - if (f == NULL) { - free(handle); + /* if (f == NULL) { */ + /* free(handle); */ - return lsm_error_failed_io; - } + /* return lsm_error_failed_io; */ + /* } */ - handle->f = f; - } + /* handle->f = f; */ + /* } */ handle->wrapper = wrapper; *out = handle; @@ -114,22 +114,22 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, return res; } - // Open a new file descriptor if needed - if (entry->data.on_disk) { - char path[store->data_path->len + entry->key->len + 2]; - sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), - lsm_str_ptr(entry->key)); + /* // Open a new file descriptor if needed */ + /* if (entry->data_len > 0) { */ + /* char path[store->data_path->len + entry->key->len + 2]; */ + /* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */ + /* lsm_str_ptr(entry->key)); */ - FILE *f = fopen(path, "ab"); + /* FILE *f = fopen(path, "ab"); */ - if (f == NULL) { - free(handle); + /* if (f == NULL) { */ + /* free(handle); */ - return lsm_error_failed_io; - } + /* return lsm_error_failed_io; */ + /* } */ - handle->f = f; - } + /* handle->f = f; */ + /* } */ handle->wrapper = wrapper; *out = handle; @@ -192,99 +192,68 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, lsm_entry *entry = handle->wrapper->entry; - uint64_t new_len = entry->data.len + lsm_str_len(data); + uint64_t new_len = entry->data_len + lsm_str_len(data); const char *data_s = lsm_str_ptr(data); - // Data is in memory and still fits -> keep it in memory - if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) { - char *buf; + // Entries don't open their file unless needed + if (handle->f == NULL) { + char path[store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + lsm_str_ptr(entry->key)); - // Entries with no data do not have an allocated buffer yet - if (entry->data.len == 0) { - buf = malloc(new_len * sizeof(char)); - } else { - buf = realloc(entry->data.value.ptr, new_len * sizeof(char)); + FILE *f = fopen(path, "ab"); + + if (f == NULL) { + return lsm_error_failed_io; } - if (buf == NULL) { - return lsm_error_failed_alloc; - } - - memcpy(&buf[entry->data.len], data_s, lsm_str_len(data)); - entry->data.value.ptr = buf; - } - // Data will end up on disk - else { - // Data is not yet on disk, so we create the file - if (!entry->data.on_disk) { - char path[store->data_path->len + entry->key->len + 2]; - sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), - lsm_str_ptr(entry->key)); - - FILE *f = fopen(path, "ab"); - - if (f == NULL) { - return lsm_error_failed_io; - } - - // If there was data present in memory already, we sync this to disk. - // This check is required because it's possible that more than the - // treshold is written to an empty entry immediately, meaning there's no - // allocated memory buffer present. - if (entry->data.len > 0) { - size_t written = 0; - - // Write original in-memory data to file - while (written < entry->data.len) { - written += fwrite(&entry->data.value.ptr[written], sizeof(char), - entry->data.len - written, f); - } - - free(entry->data.value.ptr); - entry->data.value.ptr = NULL; - } - - handle->f = f; - entry->data.on_disk = true; - } - - size_t written = 0; - - // TODO what happens when I/O fails? - while (written < data->len) { - written += fwrite(&data_s[written], sizeof(char), data->len - written, - handle->f); - } + handle->f = f; } - entry->data.len = new_len; + size_t written = 0; + + // TODO what happens when I/O fails? + while (written < data->len) { + written += + fwrite(&data_s[written], sizeof(char), data->len - written, handle->f); + } + + entry->data_len = new_len; return lsm_error_ok; } -lsm_error lsm_entry_data_read(uint64_t *out, char *buf, +lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store, lsm_entry_handle *handle, uint64_t len) { lsm_entry *entry = handle->wrapper->entry; - if (entry->data.len == 0) { + if (entry->data_len == 0) { *out = 0; return lsm_error_ok; } - uint64_t read; + // Entries don't open their file unless needed + if (handle->f == NULL) { + char path[store->data_path->len + entry->key->len + 2]; + sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), + lsm_str_ptr(entry->key)); - if (entry->data.on_disk) { - read = fread(buf, sizeof(char), len, handle->f); + FILE *f = fopen(path, "rb"); - if ((read == 0) && (ferror(handle->f) != 0)) { + if (f == NULL) { return lsm_error_failed_io; } - } else { - read = (entry->data.len - handle->pos) < len - ? (entry->data.len - handle->pos) - : len; - memcpy(buf, &entry->data.value.ptr[handle->pos], read * sizeof(char)); + + handle->f = f; + } + + uint64_t read; + + read = fread(buf, sizeof(char), len, handle->f); + + if ((read == 0) && (ferror(handle->f) != 0)) { + return lsm_error_failed_io; } handle->pos += read; diff --git a/lsm/src/store/lsm_store_disk_read.c b/lsm/src/store/lsm_store_disk_read.c index e644846..2b11a30 100644 --- a/lsm/src/store/lsm_store_disk_read.c +++ b/lsm/src/store/lsm_store_disk_read.c @@ -88,7 +88,9 @@ static lsm_error lsm_fread(void *out, uint64_t *sum, FILE *f, uint64_t size, return lsm_error_failed_io; } - *sum += size * count; + if (sum != NULL) { + *sum += size * count; + } return lsm_error_ok; } @@ -109,7 +111,9 @@ static lsm_error lsm_entry_read_str(lsm_str **out, uint64_t *sum, FILE *f) { read += fread(&buf[read], 1, len - read, f); } - *sum += len; + if (sum != NULL) { + *sum += len; + } return lsm_str_init(out, buf); } @@ -154,8 +158,7 @@ lsm_error lsm_store_load_db(lsm_store *store) { return lsm_error_failed_io; } - LSM_RES( - lsm_entry_read_attrs(&store->idx_file_size, handle, store->db_file)); + LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file)); lsm_entry_close(handle); store->db_file_size += db_dim[1]; diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 51dd998..f34d633 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -179,5 +179,5 @@ lsm_error lsm_entry_attr_insert_num(lsm_entry_handle *handle, uint8_t type, } uint64_t lsm_entry_data_len(lsm_entry_handle *handle) { - return handle->wrapper->entry->data.len; + return handle->wrapper->entry->data_len; } diff --git a/src/lander/lander_get.c b/src/lander/lander_get.c index b063e29..cfb466e 100644 --- a/src/lander/lander_get.c +++ b/src/lander/lander_get.c @@ -121,6 +121,8 @@ bool lander_get_entry_lsm(event_loop_conn *conn) { bool lander_stream_body_to_client(event_loop_conn *conn) { http_loop_ctx *ctx = conn->ctx; lander_ctx *c_ctx = ctx->c; + http_loop_gctx *gctx = ctx->g; + lander_gctx *c_gctx = gctx->c; if ((c_ctx->entry == NULL) || (ctx->res.body.expected_len == ctx->res.body.len)) { @@ -131,8 +133,8 @@ bool lander_stream_body_to_client(event_loop_conn *conn) { ctx->res.body.expected_len - ctx->res.body.len); uint64_t read = 0; - lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], c_ctx->entry, - to_write); + lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], + c_gctx->store, c_ctx->entry, to_write); ctx->res.body.len += read; conn->wbuf_size += read;