feat(lsm): store pointer to store in entry handle
parent
d4b21fb84d
commit
eb0ce16f78
|
@ -188,8 +188,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
||||||
* @param entry entry to append data to
|
* @param entry entry to append data to
|
||||||
* @param data data to append
|
* @param data data to append
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
|
lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data);
|
||||||
lsm_str *data);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same as `lsm_entry_data_append`, except that it takes a direct char array.
|
* Same as `lsm_entry_data_append`, except that it takes a direct char array.
|
||||||
|
@ -199,8 +198,8 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
|
||||||
* @param data data to append
|
* @param data data to append
|
||||||
* @param len length of data array
|
* @param len length of data array
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle,
|
lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data,
|
||||||
char *data, uint64_t len);
|
uint64_t len);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read a number of bytes from the entry's data field. The position from which
|
* Read a number of bytes from the entry's data field. The position from which
|
||||||
|
@ -211,7 +210,7 @@ lsm_error lsm_entry_data_append_raw(lsm_store *store, lsm_entry_handle *handle,
|
||||||
* @param handle entry handle to read from
|
* @param handle entry handle to read from
|
||||||
* @param len how many bytes to read at most
|
* @param len how many bytes to read at most
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
|
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
|
||||||
lsm_entry_handle *handle, uint64_t len);
|
lsm_entry_handle *handle, uint64_t len);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -220,7 +219,7 @@ lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
|
||||||
* @param store store to persist entry in
|
* @param store store to persist entry in
|
||||||
* @param handle handle to entry to persist
|
* @param handle handle to entry to persist
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle);
|
lsm_error lsm_entry_sync(lsm_entry_handle *handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the length of the entry's data.
|
* Return the length of the entry's data.
|
||||||
|
|
|
@ -58,6 +58,7 @@ void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper);
|
||||||
|
|
||||||
struct lsm_entry_handle {
|
struct lsm_entry_handle {
|
||||||
lsm_entry_wrapper *wrapper;
|
lsm_entry_wrapper *wrapper;
|
||||||
|
lsm_store *store;
|
||||||
FILE *f;
|
FILE *f;
|
||||||
uint64_t pos;
|
uint64_t pos;
|
||||||
};
|
};
|
||||||
|
|
|
@ -62,24 +62,8 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* // Open a new file descriptor if needed */
|
|
||||||
/* if (entry->data_len > 0) { */
|
|
||||||
/* char path[store->data_path->len + entry->key->len + 2]; */
|
|
||||||
/* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */
|
|
||||||
/* lsm_str_ptr(entry->key)); */
|
|
||||||
|
|
||||||
/* FILE *f = fopen(path, "rb"); */
|
|
||||||
|
|
||||||
/* if (f == NULL) { */
|
|
||||||
/* free(handle); */
|
|
||||||
|
|
||||||
/* return lsm_error_failed_io; */
|
|
||||||
/* } */
|
|
||||||
|
|
||||||
/* handle->f = f; */
|
|
||||||
/* } */
|
|
||||||
|
|
||||||
handle->wrapper = wrapper;
|
handle->wrapper = wrapper;
|
||||||
|
handle->store = store;
|
||||||
*out = handle;
|
*out = handle;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
|
@ -116,24 +100,8 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* // Open a new file descriptor if needed */
|
|
||||||
/* if (entry->data_len > 0) { */
|
|
||||||
/* char path[store->data_path->len + entry->key->len + 2]; */
|
|
||||||
/* sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), */
|
|
||||||
/* lsm_str_ptr(entry->key)); */
|
|
||||||
|
|
||||||
/* FILE *f = fopen(path, "ab"); */
|
|
||||||
|
|
||||||
/* if (f == NULL) { */
|
|
||||||
/* free(handle); */
|
|
||||||
|
|
||||||
/* return lsm_error_failed_io; */
|
|
||||||
/* } */
|
|
||||||
|
|
||||||
/* handle->f = f; */
|
|
||||||
/* } */
|
|
||||||
|
|
||||||
handle->wrapper = wrapper;
|
handle->wrapper = wrapper;
|
||||||
|
handle->store = store;
|
||||||
*out = handle;
|
*out = handle;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
|
@ -180,14 +148,14 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
||||||
|
|
||||||
// No need to set the handle's file, as the entry doesn't have any data yet
|
// No need to set the handle's file, as the entry doesn't have any data yet
|
||||||
handle->wrapper = wrapper;
|
handle->wrapper = wrapper;
|
||||||
|
handle->store = store;
|
||||||
|
|
||||||
*out = handle;
|
*out = handle;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
|
lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data) {
|
||||||
lsm_str *data) {
|
|
||||||
if (lsm_str_len(data) == 0) {
|
if (lsm_str_len(data) == 0) {
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
@ -199,8 +167,8 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
|
||||||
|
|
||||||
// Entries don't open their file unless needed
|
// Entries don't open their file unless needed
|
||||||
if (handle->f == NULL) {
|
if (handle->f == NULL) {
|
||||||
char path[store->data_path->len + entry->key->len + 2];
|
char path[handle->store->data_path->len + entry->key->len + 2];
|
||||||
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
|
sprintf(path, "%s/%s", lsm_str_ptr(handle->store->data_path),
|
||||||
lsm_str_ptr(entry->key));
|
lsm_str_ptr(entry->key));
|
||||||
|
|
||||||
FILE *f = fopen(path, "ab");
|
FILE *f = fopen(path, "ab");
|
||||||
|
@ -225,7 +193,7 @@ lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
|
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
|
||||||
lsm_entry_handle *handle, uint64_t len) {
|
lsm_entry_handle *handle, uint64_t len) {
|
||||||
lsm_entry *entry = handle->wrapper->entry;
|
lsm_entry *entry = handle->wrapper->entry;
|
||||||
|
|
||||||
|
@ -237,8 +205,8 @@ lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_store *store,
|
||||||
|
|
||||||
// Entries don't open their file unless needed
|
// Entries don't open their file unless needed
|
||||||
if (handle->f == NULL) {
|
if (handle->f == NULL) {
|
||||||
char path[store->data_path->len + entry->key->len + 2];
|
char path[handle->store->data_path->len + entry->key->len + 2];
|
||||||
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
|
sprintf(path, "%s/%s", lsm_str_ptr(handle->store->data_path),
|
||||||
lsm_str_ptr(entry->key));
|
lsm_str_ptr(entry->key));
|
||||||
|
|
||||||
FILE *f = fopen(path, "rb");
|
FILE *f = fopen(path, "rb");
|
||||||
|
|
|
@ -132,7 +132,7 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
|
||||||
for (uint64_t i = 0; i < attr_count; i++) {
|
for (uint64_t i = 0; i < attr_count; i++) {
|
||||||
LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1));
|
LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1));
|
||||||
LSM_RES(lsm_entry_read_str(&val, sum, db_file));
|
LSM_RES(lsm_entry_read_str(&val, sum, db_file));
|
||||||
lsm_entry_attr_insert(handle, attr_type, val);
|
LSM_RES(lsm_entry_attr_insert(handle, attr_type, val));
|
||||||
}
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
|
|
|
@ -75,7 +75,9 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_entry_sync(lsm_store *store, lsm_entry_handle *handle) {
|
lsm_error lsm_entry_sync(lsm_entry_handle *handle) {
|
||||||
|
lsm_store *store = handle->store;
|
||||||
|
|
||||||
pthread_mutex_lock(&store->db_lock);
|
pthread_mutex_lock(&store->db_lock);
|
||||||
|
|
||||||
uint64_t db_entry_index = store->db_file_size;
|
uint64_t db_entry_index = store->db_file_size;
|
||||||
|
|
|
@ -93,8 +93,6 @@ bool lander_get_entry_lsm(event_loop_conn *conn) {
|
||||||
bool lander_stream_body_to_client(event_loop_conn *conn) {
|
bool lander_stream_body_to_client(event_loop_conn *conn) {
|
||||||
http_loop_ctx *ctx = conn->ctx;
|
http_loop_ctx *ctx = conn->ctx;
|
||||||
lander_ctx *c_ctx = ctx->c;
|
lander_ctx *c_ctx = ctx->c;
|
||||||
http_loop_gctx *gctx = ctx->g;
|
|
||||||
lander_gctx *c_gctx = gctx->c;
|
|
||||||
|
|
||||||
if ((c_ctx->entry == NULL) ||
|
if ((c_ctx->entry == NULL) ||
|
||||||
(ctx->res.body.expected_len == ctx->res.body.len)) {
|
(ctx->res.body.expected_len == ctx->res.body.len)) {
|
||||||
|
@ -105,8 +103,8 @@ bool lander_stream_body_to_client(event_loop_conn *conn) {
|
||||||
ctx->res.body.expected_len - ctx->res.body.len);
|
ctx->res.body.expected_len - ctx->res.body.len);
|
||||||
|
|
||||||
uint64_t read = 0;
|
uint64_t read = 0;
|
||||||
lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size],
|
lsm_entry_data_read(&read, (char *)&conn->wbuf[conn->wbuf_size], c_ctx->entry,
|
||||||
c_gctx->store, c_ctx->entry, to_write);
|
to_write);
|
||||||
|
|
||||||
ctx->res.body.len += read;
|
ctx->res.body.len += read;
|
||||||
conn->wbuf_size += read;
|
conn->wbuf_size += read;
|
||||||
|
|
|
@ -151,11 +151,9 @@ bool lander_post_redirect_body_to_attr(event_loop_conn *conn) {
|
||||||
|
|
||||||
bool lander_entry_sync(event_loop_conn *conn) {
|
bool lander_entry_sync(event_loop_conn *conn) {
|
||||||
http_loop_ctx *ctx = conn->ctx;
|
http_loop_ctx *ctx = conn->ctx;
|
||||||
http_loop_gctx *gctx = ctx->g;
|
|
||||||
lander_gctx *c_gctx = gctx->c;
|
|
||||||
lander_ctx *c_ctx = ctx->c;
|
lander_ctx *c_ctx = ctx->c;
|
||||||
|
|
||||||
if (lsm_entry_sync(c_gctx->store, c_ctx->entry) != lsm_error_ok) {
|
if (lsm_entry_sync(c_ctx->entry) != lsm_error_ok) {
|
||||||
ctx->res.status = http_internal_server_error;
|
ctx->res.status = http_internal_server_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,8 +178,6 @@ bool lander_post_paste_lsm(event_loop_conn *conn) {
|
||||||
bool lander_stream_body_to_entry(event_loop_conn *conn) {
|
bool lander_stream_body_to_entry(event_loop_conn *conn) {
|
||||||
http_loop_ctx *ctx = conn->ctx;
|
http_loop_ctx *ctx = conn->ctx;
|
||||||
lander_ctx *c_ctx = ctx->c;
|
lander_ctx *c_ctx = ctx->c;
|
||||||
http_loop_gctx *gctx = ctx->g;
|
|
||||||
lander_gctx *c_gctx = gctx->c;
|
|
||||||
|
|
||||||
uint64_t to_append =
|
uint64_t to_append =
|
||||||
MIN(conn->rbuf_size - conn->rbuf_read,
|
MIN(conn->rbuf_size - conn->rbuf_read,
|
||||||
|
@ -189,7 +185,7 @@ bool lander_stream_body_to_entry(event_loop_conn *conn) {
|
||||||
|
|
||||||
lsm_str *data;
|
lsm_str *data;
|
||||||
lsm_str_init_copy_n(&data, (char *)&conn->rbuf[conn->rbuf_read], to_append);
|
lsm_str_init_copy_n(&data, (char *)&conn->rbuf[conn->rbuf_read], to_append);
|
||||||
lsm_entry_data_append(c_gctx->store, c_ctx->entry, data);
|
lsm_entry_data_append(c_ctx->entry, data);
|
||||||
|
|
||||||
conn->rbuf_read += to_append;
|
conn->rbuf_read += to_append;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue