From 3dce25239bb107ae37a8e35c410178ef684dac0a Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 27 Aug 2024 17:00:12 +0200 Subject: [PATCH 1/2] feat(lsm): require changes to be committed before writing to persistent storage --- CHANGELOG.md | 4 +++ include/lander.h | 2 ++ lsm/include/lsm/store.h | 9 +++++ lsm/src/_include/lsm/store_internal.h | 3 +- lsm/src/store/lsm_store_entry.c | 48 ++++++++++++++++----------- src/lander/lander_steps.c | 10 ++++++ src/main.c | 11 ++++++ 7 files changed, 67 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aca5284..a31cd42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://git.rustybever.be/Chewing_Bever/lander/src/branch/dev) +### Fixed + +* Failed uploads now no longer leave behind a partial entry file + ## [0.2.1](https://git.rustybever.be/Chewing_Bever/lander/src/tag/0.2.1) ### Added diff --git a/include/lander.h b/include/lander.h index 7b418bc..9ebbdf2 100644 --- a/include/lander.h +++ b/include/lander.h @@ -51,6 +51,8 @@ lnm_http_step_err lander_post_paste_secure(lnm_http_conn *conn); lnm_http_step_err lander_stream_body_to_entry(lnm_http_conn *conn); +lnm_http_step_err lander_commit_entry(lnm_http_conn *conn); + lnm_http_step_err lander_post_redirect_body_to_attr(lnm_http_conn *conn); lnm_http_step_err lander_remove_entry(lnm_http_conn *conn); diff --git a/lsm/include/lsm/store.h b/lsm/include/lsm/store.h index c680c43..e2d7d86 100644 --- a/lsm/include/lsm/store.h +++ b/lsm/include/lsm/store.h @@ -159,6 +159,15 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, const lsm_str *key); +/** + * Commit any changes to the persistent storage. Any changes, insertions or + * deletions that occurred without a commit are reverted when the handle is + * closed. 
+ * + * @param handle handle to the entry + */ +lsm_error lsm_entry_commit(lsm_entry_handle *handle); + /** * Close an open entry handle. * diff --git a/lsm/src/_include/lsm/store_internal.h b/lsm/src/_include/lsm/store_internal.h index 7bfbe41..1b84b60 100644 --- a/lsm/src/_include/lsm/store_internal.h +++ b/lsm/src/_include/lsm/store_internal.h @@ -157,7 +157,8 @@ lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle); lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle); /** - * Remove the entry's data file if present and close its handle. + * Remove the entry's data file if present and close its handle. Any uncommitted + * changes will be reverted. * * @param handle handle to the entry */ diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 2a32343..7d33d30 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -58,31 +58,42 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { return lsm_error_ok; } -void lsm_entry_close(lsm_entry_handle *handle) { - if (handle->f != NULL) { - fclose(handle->f); - } - - bool state_new = handle->states & lsm_entry_handle_state_new; - bool state_removed = handle->states & lsm_entry_handle_state_removed; - /* bool state_updated = handle->states & lsm_entry_handle_state_updated; */ +lsm_error lsm_entry_commit(lsm_entry_handle *handle) { + uint8_t state_new = handle->states & lsm_entry_handle_state_new; + uint8_t state_removed = handle->states & lsm_entry_handle_state_removed; // Clean new entry if (state_new && !state_removed) { - lsm_entry_disk_insert(handle); - } - // New entry that was removed before being written to disk; only its data file - // needs to be removed if present - else if (state_new && state_removed) { - lsm_entry_data_remove(handle); - - lsm_entry_free(handle->wrapper->entry); - handle->wrapper->entry = NULL; + LSM_RES(lsm_entry_disk_insert(handle)); } // Previously stored entry that needs to be removed; should be removed from 
db // file as well else if (state_removed && !state_new) { - lsm_entry_disk_remove(handle); + LSM_RES(lsm_entry_disk_remove(handle)); + + lsm_entry_free(handle->wrapper->entry); + handle->wrapper->entry = NULL; + } + + // Reset states after committing current changes + handle->states = 0; + + return lsm_error_ok; +} + +void lsm_entry_close(lsm_entry_handle *handle) { + if (handle->f != NULL) { + fclose(handle->f); + handle->f = NULL; + } + + uint8_t state_new = handle->states & lsm_entry_handle_state_new; + /* bool state_updated = handle->states & lsm_entry_handle_state_updated; */ + + // New entries create a wrapper in the trie that should be removed if not + // committed + if (state_new) { + lsm_entry_data_remove(handle); lsm_entry_free(handle->wrapper->entry); handle->wrapper->entry = NULL; @@ -328,7 +339,6 @@ lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle) { return lsm_error_ok; } - lsm_error lsm_entry_data_remove(lsm_entry_handle *handle) { const lsm_entry *entry = handle->wrapper->entry; diff --git a/src/lander/lander_steps.c b/src/lander/lander_steps.c index 00c86ba..8e2d702 100644 --- a/src/lander/lander_steps.c +++ b/src/lander/lander_steps.c @@ -1,6 +1,7 @@ #include #include "lnm/http/loop.h" +#include "lnm/log.h" #include "lnm/loop.h" #include "lander.h" @@ -25,3 +26,12 @@ lnm_http_step_err lander_stream_body_to_entry(lnm_http_conn *conn) { ? 
lnm_http_step_err_done : lnm_http_step_err_io_needed; } + +lnm_http_step_err lander_commit_entry(lnm_http_conn *conn) { + lnm_http_loop_ctx *ctx = conn->ctx; + lander_ctx *c_ctx = ctx->c; + + lsm_entry_commit(c_ctx->entry); + + return lnm_http_step_err_done; +} diff --git a/src/main.c b/src/main.c index f0f8e65..12601b3 100644 --- a/src/main.c +++ b/src/main.c @@ -3,6 +3,7 @@ #include #include "lnm/http/loop.h" +#include "lnm/http/route.h" #include "lnm/log.h" #include "lander.h" @@ -30,54 +31,64 @@ lnm_http_loop *loop_init(lander_gctx *gctx, const char *api_key) { lnm_http_router_add(&route, router, lnm_http_method_delete, "/:key"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_remove_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/s/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_redirect, false); lnm_http_route_step_append(route, lnm_http_loop_step_body_to_buf, false); lnm_http_route_step_append(route, lander_post_redirect_body_to_attr, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/sl/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_redirect_secure, false); lnm_http_route_step_append(route, lnm_http_loop_step_body_to_buf, false); lnm_http_route_step_append(route, lander_post_redirect_body_to_attr, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/s/:key"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_redirect, false); lnm_http_route_step_append(route, lnm_http_loop_step_body_to_buf, false); lnm_http_route_step_append(route, 
lander_post_redirect_body_to_attr, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/p/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_paste, false); lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/pl/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_paste_secure, false); lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/p/:key"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_paste, false); lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/f/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_file, false); lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/fl/"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_file_secure, false); lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_router_add(&route, router, lnm_http_method_post, "/f/:key"); lnm_http_route_step_append(route, lnm_http_loop_step_auth, false); lnm_http_route_step_append(route, lander_post_file, false); 
lnm_http_route_step_append(route, lander_stream_body_to_entry, false); + lnm_http_route_step_append(route, lander_commit_entry, true); lnm_http_loop_router_set(hl, router); From 79c31589748f88dc50d01be89c3da905518910c5 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 28 Aug 2024 20:29:47 +0200 Subject: [PATCH 2/2] feat(lsm): implement updating entries --- lsm/src/store/lsm_store_disk_write.c | 34 +++++++++++++++++++++------- lsm/src/store/lsm_store_entry.c | 5 ++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/lsm/src/store/lsm_store_disk_write.c b/lsm/src/store/lsm_store_disk_write.c index 51e9be8..543480c 100644 --- a/lsm/src/store/lsm_store_disk_write.c +++ b/lsm/src/store/lsm_store_disk_write.c @@ -1,3 +1,5 @@ +#include + #include "lsm/store_internal.h" static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size, @@ -125,16 +127,10 @@ lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { return res; } -// Marking an entry as removed in the idx file is simply setting the length of -// its entry to zero -lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { - lsm_store *store = handle->store; - const lsm_entry *entry = handle->wrapper->entry; - +static lsm_error lsm_idx_zero_block(lsm_store *store, uint64_t pos) { pthread_mutex_lock(&store->idx.lock); - lsm_error res = - lsm_fseek(store->idx.f, entry->idx_file_offset + sizeof(uint64_t)); + lsm_error res = lsm_fseek(store->idx.f, pos); if (res != lsm_error_ok) { pthread_mutex_unlock(&store->idx.lock); @@ -153,7 +149,29 @@ lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { fflush(store->idx.f); + return lsm_error_ok; +} + +// Marking an entry as removed in the idx file is simply setting the length of +// its entry to zero +lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { + const lsm_entry *entry = handle->wrapper->entry; + + LSM_RES(lsm_idx_zero_block(handle->store, + entry->idx_file_offset * sizeof(uint64_t))); LSM_RES(lsm_entry_data_remove(handle)); 
return lsm_error_ok; } + +lsm_error lsm_entry_disk_update(lsm_entry_handle *handle) { + // An update is implemented by reinserting the entry at the end of the db file + uint64_t old_idx_index = handle->wrapper->entry->idx_file_offset; + + // TODO is there any way we can make this atomic? If the zero write to the + // index file fails, there are two entries in the db file for the same key. + LSM_RES(lsm_entry_disk_insert(handle)); + LSM_RES(lsm_idx_zero_block(handle->store, old_idx_index * sizeof(uint64_t))); + + return lsm_error_ok; +} diff --git a/lsm/src/store/lsm_store_entry.c b/lsm/src/store/lsm_store_entry.c index 7d33d30..93a570b 100644 --- a/lsm/src/store/lsm_store_entry.c +++ b/lsm/src/store/lsm_store_entry.c @@ -61,6 +61,7 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { lsm_error lsm_entry_commit(lsm_entry_handle *handle) { uint8_t state_new = handle->states & lsm_entry_handle_state_new; uint8_t state_removed = handle->states & lsm_entry_handle_state_removed; + uint8_t state_updated = handle->states & lsm_entry_handle_state_updated; // Clean new entry if (state_new && !state_removed) { @@ -73,6 +74,8 @@ lsm_error lsm_entry_commit(lsm_entry_handle *handle) { lsm_entry_free(handle->wrapper->entry); handle->wrapper->entry = NULL; + } else if (state_updated && !(state_new || state_removed)) { + LSM_RES(lsm_entry_disk_update(handle)); } // Reset states after committing current changes @@ -99,6 +102,8 @@ void lsm_entry_close(lsm_entry_handle *handle) { handle->wrapper->entry = NULL; } + // TODO rollback uncommitted updates + pthread_rwlock_unlock(&handle->wrapper->lock); free(handle); }