feat(lsm): separate handle design into read and write; atomic attribute
ci/woodpecker/push/build Pipeline was successful Details

updates
feature/50-one-time-keys
Jef Roosens 2024-08-29 12:40:51 +02:00 committed by Chewing_Bever
parent 7c938d592e
commit 4b469623dd
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
15 changed files with 624 additions and 453 deletions

View File

@ -12,9 +12,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Ability to generate keys that allow one-time unauthenticated uploads (a.k.a. * Ability to generate keys that allow one-time unauthenticated uploads (a.k.a.
generating upload links) generating upload links)
## Changed
* LSM
* Switched to a split read/write handle design
* Changes now need to be explicitely committed before being written to
persistent storage
* Changes to entry attributes are now atomically committed
## Fixed ## Fixed
* Failed uploads now no longer leave behind a partial entry file * Failed uploads now no longer leave behind a partial entry file
* Size of db file is now correctly calculated when the store contains deleted
entries
## [0.2.1](https://git.rustybever.be/Chewing_Bever/lander/src/tag/0.2.1) ## [0.2.1](https://git.rustybever.be/Chewing_Bever/lander/src/tag/0.2.1)

View File

@ -13,7 +13,11 @@ typedef struct lander_gctx {
} lander_gctx; } lander_gctx;
typedef struct lander_ctx { typedef struct lander_ctx {
lsm_entry_handle *entry; union {
lsm_write_handle *write;
lsm_read_handle *read;
} entry;
bool write;
} lander_ctx; } lander_ctx;
typedef enum lander_attr_type : uint8_t { typedef enum lander_attr_type : uint8_t {

View File

@ -10,94 +10,14 @@
#define LSM_STORE_DATA_LEVELS 3 #define LSM_STORE_DATA_LEVELS 3
/** /**
* A handle referencing an entry inside a store. Read/write operations from/to * Read-only handle to an entry in the store
* the entry go through this handle.
*/ */
typedef struct lsm_entry_handle lsm_entry_handle; typedef struct lsm_read_handle lsm_read_handle;
/** /**
* Checks whether the entry has an attribute with the specified type. * Writeable handle to an entry in the store
*
* @param entry entry to check
* @param type type of attribute to check for
*/ */
bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type); typedef struct lsm_write_handle lsm_write_handle;
/**
* Retrieve the contents of an attribute from an entry, if present
*
* @param out where to store pointer to attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
uint8_t type);
/**
* Convenience wrapper around `lsm_entry_attr_get` that can be used if we know
* beforehand the attribute value is a 64-bit number.
*
* @param out where to store attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle,
uint8_t type);
/**
* Convenience wrapper around `lsm_entry_attr_get` that can be used if we know
* beforehand the attribute value is an 8-bit number.
*
* @param out where to store attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle,
uint8_t type);
/**
* Add a new attribute to the entry. This overwrites an existing version of this
* attribute.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute; ownership of pointer is taken over
*/
lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
lsm_str *data);
/**
* Convenience wrapper around `lsm_entry_attr_insert` that can be used if the
* data to be stored is a 64-bit number.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute
*/
lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type,
uint64_t data);
/**
* Convenience wrapper around `lsm_entry_attr_insert` that can be used if the
* data to be stored is an 8-bit number.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute
*/
lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type,
uint8_t data);
/**
* Remove an atribute from the given entry, if present.
*
* @param out pointer to store removed data pointer in. If NULL, data pointer
* will be leaked.
* @param entry entry to remove attribute from
* @param type type of attribute to remove
*/
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
uint8_t type);
/** /**
* A store consisting of LSM entries. * A store consisting of LSM entries.
@ -145,7 +65,7 @@ void lsm_store_free(lsm_store *store);
* @param store store to retrieve entry from * @param store store to retrieve entry from
* @param key key to search * @param key key to search
*/ */
lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store,
const lsm_str *key); const lsm_str *key);
/** /**
@ -157,26 +77,9 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
* @param store store to retrieve entry from * @param store store to retrieve entry from
* @param key key to search * @param key key to search
*/ */
lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store,
const lsm_str *key); const lsm_str *key);
/**
* Commit any changes to the persistent storage. Any changes, insertions or
* deletions that occured without a commit are reverted when the handle is
* closed.
*
* @param handle handle to the entry
*/
lsm_error lsm_entry_commit(lsm_entry_handle *handle);
/**
* Close an open entry handle.
*
* @param store store the handle's entry is stored in
* @param handle handle to close
*/
void lsm_entry_close(lsm_entry_handle *handle);
/** /**
* Insert a new entry into the store, returning a write handle to the newly * Insert a new entry into the store, returning a write handle to the newly
* created entry. * created entry.
@ -185,57 +88,43 @@ void lsm_entry_close(lsm_entry_handle *handle);
* @param store store to modify * @param store store to modify
* @param key key to add; ownership of key pointer is taken over * @param key key to add; ownership of key pointer is taken over
*/ */
lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_new(lsm_write_handle **out, lsm_store *store,
lsm_str *key); lsm_str *key);
/** bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type);
* Mark the entry as removed. lsm_error lsm_read_attr_get(const lsm_str **out, const lsm_read_handle *handle,
* uint8_t type);
* @param handle handle to entry to remove lsm_error lsm_read_attr_get_uint64_t(uint64_t *out,
*/ const lsm_read_handle *handle,
void lsm_entry_remove(lsm_entry_handle *handle); uint8_t type);
lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, const lsm_read_handle *handle,
uint8_t type);
uint64_t lsm_read_data_len(const lsm_read_handle *handle);
lsm_error lsm_read_data_read(uint64_t *out, char *buf, lsm_read_handle *handle,
uint64_t len);
void lsm_read_close(lsm_read_handle *handle);
/** bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type);
* Append new data to the given entry, which is expected to be in the store. lsm_error lsm_write_attr_get(const lsm_str **out,
* const lsm_write_handle *handle, uint8_t type);
* This function will append either to disk or to memory, depending on the lsm_error lsm_write_attr_get_uint64_t(uint64_t *out,
* length of the entry's data. const lsm_write_handle *handle,
* uint8_t type);
* @param store store the entry is stored in lsm_error lsm_write_attr_get_uint8_t(uint8_t *out,
* @param entry entry to append data to const lsm_write_handle *handle,
* @param data data to append uint8_t type);
*/ lsm_error lsm_write_attr_remove(lsm_str **out, lsm_write_handle *handle,
lsm_error lsm_entry_data_append(lsm_entry_handle *handle, const lsm_str *data); uint8_t type);
lsm_error lsm_write_attr_insert(lsm_write_handle *handle, uint8_t type,
/** lsm_str *data);
* Same as `lsm_entry_data_append`, except that it takes a direct char array. lsm_error lsm_write_attr_insert_uint64_t(lsm_write_handle *handle, uint8_t type,
* uint64_t data);
* @param store store the entry is stored in lsm_error lsm_write_attr_insert_uint8_t(lsm_write_handle *handle, uint8_t type,
* @param entry entry to append data to uint8_t data);
* @param data data to append uint64_t lsm_write_data_len(const lsm_write_handle *handle);
* @param len length of data array lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data);
*/ void lsm_write_remove(lsm_write_handle *handle);
lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data, void lsm_write_close(lsm_write_handle *handle);
uint64_t len); lsm_error lsm_write_commit(lsm_write_handle *handle);
/**
* Read a number of bytes from the entry's data field. The position from which
* data is read is dependent on previous read calls.
*
* @param out where to write how many bytes were read
* @param buf buffer to store read data in
* @param handle entry handle to read from
* @param len how many bytes to read at most
*/
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
lsm_entry_handle *handle, uint64_t len);
/**
* Return the length of the entry's data.
*
* @param handle entry handle to return length for
* @return length of the data
*/
uint64_t lsm_entry_data_len(lsm_entry_handle *handle);
#endif #endif

View File

@ -48,6 +48,11 @@ lsm_error lsm_entry_init(lsm_entry **ptr);
*/ */
void lsm_entry_free(lsm_entry *entry); void lsm_entry_free(lsm_entry *entry);
/**
* Allocates a new entry that's identical to the provided one.
*/
lsm_error lsm_entry_clone(lsm_entry **out, const lsm_entry *entry);
/** /**
* Deallocate an existing lsm_entry object. * Deallocate an existing lsm_entry object.
* *
@ -69,19 +74,6 @@ typedef enum lsm_entry_handle_state : uint8_t {
lsm_entry_handle_state_removed = 1 << 2, lsm_entry_handle_state_removed = 1 << 2,
} lsm_entry_handle_state; } lsm_entry_handle_state;
struct lsm_entry_handle {
lsm_entry_wrapper *wrapper;
lsm_store *store;
// Either read or append, depending on how it was opened
FILE *f;
// Current position in the file pointer
uint64_t pos;
// Required to determine in what way the database files need to be synced
uint64_t states;
};
lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
struct lsm_store { struct lsm_store {
lsm_trie *trie; lsm_trie *trie;
lsm_str *data_path; lsm_str *data_path;
@ -114,47 +106,45 @@ lsm_error lsm_store_load_db(lsm_store *store);
* *
* @param handle handle to added entry * @param handle handle to added entry
*/ */
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle); lsm_error lsm_entry_disk_insert(lsm_store *store, lsm_entry *entry);
/** /**
* Remove an entry from the database. * Remove an entry from the database.
* *
* @param handle handle to the removed entry * @param handle handle to the removed entry
*/ */
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle); lsm_error lsm_entry_disk_remove(lsm_store *store, lsm_entry *entry);
/** /**
* Update an existing entry already in the store. * Update an existing entry already in the store.
* *
* @param handle to updated entry * @param handle to updated entry
*/ */
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle); lsm_error lsm_entry_disk_update(lsm_store *store, lsm_entry *entry);
/** /**
* Return the length of the path to this entry's data file * Return the length of the path to this entry's data file
*/ */
uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle); uint64_t lsm_entry_data_path_len(const lsm_store *store,
const lsm_entry *entry);
/** /**
* Fill in the entry's data file path in the provided buffer. Use * Fill in the entry's data file path in the provided buffer. Use
* `lsm_entry_data_path_len` to allocate an appropriately-sized buffer * `lsm_entry_data_path_len` to allocate an appropriately-sized buffer
*/ */
void lsm_entry_data_path(char *buf, const lsm_entry_handle *handle); void lsm_entry_data_path(char *path, const lsm_store *store,
const lsm_entry *entry);
/** /**
* Open the entry's data file for reading * Open the entry's data file for reading
* *
* @param handle handle to the entry * @param handle handle to the entry
*/ */
lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle); lsm_error lsm_entry_data_open_read(lsm_read_handle *handle);
/** lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store,
* Open the entry's data file for writing. The file and all subdirectories in const lsm_entry *entry, const char *mode);
* the data dir are created as needed. lsm_error lsm_entry_data_mkdirs(const lsm_store *store, const lsm_entry *entry);
*
* @param handle handle to the entry
*/
lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle);
/** /**
* Remove the entry's data file if present and close its handle. Any uncommitted * Remove the entry's data file if present and close its handle. Any uncommitted
@ -162,6 +152,130 @@ lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle);
* *
* @param handle handle to the entry * @param handle handle to the entry
*/ */
lsm_error lsm_entry_data_remove(lsm_entry_handle *handle); lsm_error lsm_entry_data_remove(const lsm_store *store, const lsm_entry *entry);
/**
* Checks whether the entry has an attribute with the specified type.
*
* @param entry entry to check
* @param type type of attribute to check for
*/
bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type);
/**
* Retrieve the contents of an attribute from an entry, if present
*
* @param out where to store pointer to attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry,
uint8_t type);
/**
* Convenience wrapper around `lsm_entry_attr_get` that can be used if we know
* beforehand the attribute value is a 64-bit number.
*
* @param out where to store attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry,
uint8_t type);
/**
* Convenience wrapper around `lsm_entry_attr_get` that can be used if we know
* beforehand the attribute value is an 8-bit number.
*
* @param out where to store attribute data
* @param entry entry to search for
* @param type type of attribute to return
*/
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry,
uint8_t type);
/**
* Add a new attribute to the entry. This overwrites an existing version of this
* attribute.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute; ownership of pointer is taken over
*/
lsm_error lsm_entry_attr_insert(lsm_entry *entry, uint8_t type, lsm_str *data);
/**
* Convenience wrapper around `lsm_entry_attr_insert` that can be used if the
* data to be stored is a 64-bit number.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute
*/
lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry *entry, uint8_t type,
uint64_t data);
/**
* Convenience wrapper around `lsm_entry_attr_insert` that can be used if the
* data to be stored is an 8-bit number.
*
* @param entry entry to modify
* @param type type of attribute to add
* @param data data of attribute
*/
lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry *entry, uint8_t type,
uint8_t data);
/**
* Remove an atribute from the given entry, if present.
*
* @param out pointer to store removed data pointer in. If NULL, data pointer
* will be leaked.
* @param entry entry to remove attribute from
* @param type type of attribute to remove
*/
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, uint8_t type);
/****************************************
*************** Handles ***************
***************************************/
struct lsm_read_handle {
lsm_entry_wrapper *wrapper;
lsm_store *store;
struct {
FILE *f;
uint64_t pos;
} data;
};
struct lsm_write_handle {
lsm_entry_wrapper *wrapper;
lsm_store *store;
lsm_entry *dirty;
bool removed;
struct {
FILE *f;
uint64_t pos;
} data;
};
/**
* Allocate a new `lsm_read_handle` object
*/
lsm_error lsm_read_handle_init(lsm_read_handle **out);
/**
* Allocate a new `lsm_write_handle` object
*/
lsm_error lsm_write_handle_init(lsm_write_handle **out);
/**
* Commit changes solely to the memory part of the store.
*/
void lsm_write_commit_mem(lsm_write_handle *handle);
#endif #endif

View File

@ -0,0 +1,79 @@
#include <stdlib.h>
#include "lsm/store_internal.h"
lsm_error lsm_read_handle_init(lsm_read_handle **out) {
lsm_read_handle *handle = calloc(1, sizeof(lsm_read_handle));
if (handle == NULL) {
return lsm_error_failed_alloc;
}
*out = handle;
return lsm_error_ok;
}
bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type) {
return lsm_entry_attr_present(handle->wrapper->entry, type);
}
lsm_error lsm_read_attr_get(const lsm_str **out, const lsm_read_handle *handle,
uint8_t type) {
return lsm_entry_attr_get(out, handle->wrapper->entry, type);
}
lsm_error lsm_read_attr_get_uint64_t(uint64_t *out,
const lsm_read_handle *handle,
uint8_t type) {
return lsm_entry_attr_get_uint64_t(out, handle->wrapper->entry, type);
}
lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, const lsm_read_handle *handle,
uint8_t type) {
return lsm_entry_attr_get_uint8_t(out, handle->wrapper->entry, type);
}
uint64_t lsm_read_data_len(const lsm_read_handle *handle) {
return handle->wrapper->entry->data_len;
}
lsm_error lsm_read_data_read(uint64_t *out, char *buf, lsm_read_handle *handle,
uint64_t len) {
const lsm_entry *entry = handle->wrapper->entry;
if (entry->data_len == 0) {
*out = 0;
return lsm_error_ok;
}
// Entries don't open their file unless needed
if (handle->data.f == NULL) {
LSM_RES(lsm_entry_data_open(&handle->data.f, handle->store,
handle->wrapper->entry, "rb"));
}
uint64_t read;
read = fread(buf, sizeof(char), len, handle->data.f);
if ((read == 0) && (ferror(handle->data.f) != 0)) {
return lsm_error_failed_io;
}
handle->data.pos += read;
*out = read;
return lsm_error_ok;
}
void lsm_read_close(lsm_read_handle *handle) {
if (handle->data.f != NULL) {
fclose(handle->data.f);
handle->data.f = NULL;
}
pthread_rwlock_unlock(&handle->wrapper->lock);
free(handle);
}

View File

@ -0,0 +1,183 @@
#include <stdlib.h>
#include "lsm/store_internal.h"
lsm_error lsm_write_handle_init(lsm_write_handle **out) {
lsm_write_handle *handle = calloc(1, sizeof(lsm_write_handle));
if (handle == NULL) {
return lsm_error_failed_alloc;
}
*out = handle;
return lsm_error_ok;
}
bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type) {
return lsm_entry_attr_present(handle->wrapper->entry, type);
}
lsm_error lsm_write_attr_get(const lsm_str **out,
const lsm_write_handle *handle, uint8_t type) {
lsm_entry *entry =
handle->dirty == NULL ? handle->wrapper->entry : handle->dirty;
return lsm_entry_attr_get(out, entry, type);
}
lsm_error lsm_write_attr_get_uint64_t(uint64_t *out,
const lsm_write_handle *handle,
uint8_t type) {
lsm_entry *entry =
handle->dirty == NULL ? handle->wrapper->entry : handle->dirty;
return lsm_entry_attr_get_uint64_t(out, entry, type);
}
lsm_error lsm_write_attr_get_uint8_t(uint8_t *out,
const lsm_write_handle *handle,
uint8_t type) {
lsm_entry *entry =
handle->dirty == NULL ? handle->wrapper->entry : handle->dirty;
return lsm_entry_attr_get_uint8_t(out, entry, type);
}
lsm_error lsm_write_attr_remove(lsm_str **out, lsm_write_handle *handle,
uint8_t type) {
if (handle->dirty == NULL) {
LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry));
}
return lsm_entry_attr_remove(out, handle->dirty, type);
}
lsm_error lsm_write_attr_insert(lsm_write_handle *handle, uint8_t type,
lsm_str *data) {
if (handle->dirty == NULL) {
LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry));
}
return lsm_entry_attr_insert(handle->dirty, type, data);
}
lsm_error lsm_write_attr_insert_uint64_t(lsm_write_handle *handle, uint8_t type,
uint64_t data) {
if (handle->dirty == NULL) {
LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry));
}
return lsm_entry_attr_insert_uint64_t(handle->dirty, type, data);
}
lsm_error lsm_write_attr_insert_uint8_t(lsm_write_handle *handle, uint8_t type,
uint8_t data) {
if (handle->dirty == NULL) {
LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry));
}
return lsm_entry_attr_insert_uint8_t(handle->dirty, type, data);
}
uint64_t lsm_write_data_len(const lsm_write_handle *handle) {
lsm_entry *entry =
handle->dirty == NULL ? handle->wrapper->entry : handle->dirty;
return entry->data_len;
}
lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data) {
if (lsm_str_len(data) == 0) {
return lsm_error_ok;
}
if (handle->dirty == NULL) {
LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry));
}
lsm_entry *entry = handle->dirty;
uint64_t new_len = entry->data_len + lsm_str_len(data);
const char *data_s = lsm_str_ptr(data);
// Entries don't open their file unless needed
if (handle->data.f == NULL) {
// An entry with no existing data will not have a data file yet, so we set
// create to true then
LSM_RES(lsm_entry_data_mkdirs(handle->store, entry));
LSM_RES(lsm_entry_data_open(&handle->data.f, handle->store, entry, "ab"));
}
size_t written = 0;
// TODO what happens when I/O fails?
while (written < data->len) {
written += fwrite(&data_s[written], sizeof(char), data->len - written,
handle->data.f);
}
entry->data_len = new_len;
return lsm_error_ok;
}
void lsm_write_remove(lsm_write_handle *handle) { handle->removed = true; }
void lsm_write_close(lsm_write_handle *handle) {
if (handle->data.f != NULL) {
fclose(handle->data.f);
handle->data.f = NULL;
}
if (handle->dirty != NULL) {
// Entry was never committed to store, so any created data file should be
// removed
if (handle->wrapper->entry == NULL) {
lsm_entry_data_remove(handle->store, handle->dirty);
}
lsm_entry_free(handle->dirty);
}
pthread_rwlock_unlock(&handle->wrapper->lock);
free(handle);
}
lsm_error lsm_write_commit(lsm_write_handle *handle) {
if (handle->removed && (handle->wrapper->entry != NULL)) {
LSM_RES(lsm_entry_disk_remove(handle->store, handle->wrapper->entry));
lsm_entry_free(handle->wrapper->entry);
handle->wrapper->entry = NULL;
handle->removed = false;
return lsm_error_ok;
}
if (handle->dirty == NULL) {
return lsm_error_ok;
}
if (handle->wrapper->entry == NULL) {
LSM_RES(lsm_entry_disk_insert(handle->store, handle->dirty));
} else {
LSM_RES(lsm_entry_disk_update(handle->store, handle->dirty));
lsm_entry_free(handle->wrapper->entry);
}
handle->wrapper->entry = handle->dirty;
handle->dirty = NULL;
return lsm_error_ok;
}
void lsm_write_commit_mem(lsm_write_handle *handle) {
if (handle->dirty == NULL) {
return;
}
if (handle->wrapper->entry != NULL) {
lsm_entry_free(handle->wrapper->entry);
}
handle->wrapper->entry = handle->dirty;
handle->dirty = NULL;
}

View File

@ -28,7 +28,7 @@ uint64_t lsm_store_size(const lsm_store *store) {
return lsm_trie_size(store->trie); return lsm_trie_size(store->trie);
} }
lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store,
const lsm_str *key) { const lsm_str *key) {
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
@ -47,8 +47,8 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry_handle *handle; lsm_read_handle *handle;
LSM_RES2(lsm_entry_handle_init(&handle), LSM_RES2(lsm_read_handle_init(&handle),
pthread_rwlock_unlock(&wrapper->lock)); pthread_rwlock_unlock(&wrapper->lock));
handle->wrapper = wrapper; handle->wrapper = wrapper;
@ -58,7 +58,7 @@ lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store,
const lsm_str *key) { const lsm_str *key) {
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key));
@ -77,8 +77,8 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry_handle *handle; lsm_write_handle *handle;
LSM_RES2(lsm_entry_handle_init(&handle), LSM_RES2(lsm_write_handle_init(&handle),
pthread_rwlock_unlock(&wrapper->lock)); pthread_rwlock_unlock(&wrapper->lock));
handle->wrapper = wrapper; handle->wrapper = wrapper;
@ -88,8 +88,8 @@ lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, lsm_error lsm_store_open_new(lsm_write_handle **out, lsm_store *store,
lsm_str *key) { lsm_str *key) {
// TODO what happens when two inserts to the same key happen at the same time? // TODO what happens when two inserts to the same key happen at the same time?
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
@ -117,84 +117,16 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
LSM_RES2(lsm_entry_init(&entry), pthread_rwlock_unlock(&wrapper->lock)); LSM_RES2(lsm_entry_init(&entry), pthread_rwlock_unlock(&wrapper->lock));
entry->key = key; entry->key = key;
wrapper->entry = entry;
lsm_entry_handle *handle; lsm_write_handle *handle;
LSM_RES2(lsm_entry_handle_init(&handle), LSM_RES2(lsm_write_handle_init(&handle),
pthread_rwlock_unlock(&wrapper->lock)); pthread_rwlock_unlock(&wrapper->lock));
// No need to set the handle's file, as the entry doesn't have any data yet
handle->wrapper = wrapper; handle->wrapper = wrapper;
handle->store = store; handle->store = store;
handle->dirty = entry;
// Newly inserted entries are always dirty
handle->states |= lsm_entry_handle_state_new;
*out = handle; *out = handle;
return lsm_error_ok; return lsm_error_ok;
} }
void lsm_entry_remove(lsm_entry_handle *handle) {
handle->states |= lsm_entry_handle_state_removed;
}
lsm_error lsm_entry_data_append(lsm_entry_handle *handle, const lsm_str *data) {
if (lsm_str_len(data) == 0) {
return lsm_error_ok;
}
lsm_entry *entry = handle->wrapper->entry;
uint64_t new_len = entry->data_len + lsm_str_len(data);
const char *data_s = lsm_str_ptr(data);
// Entries don't open their file unless needed
if (handle->f == NULL) {
// An entry with no existing data will not have a data file yet, so we set
// create to true then
LSM_RES(lsm_entry_data_open_write(handle));
}
size_t written = 0;
// TODO what happens when I/O fails?
while (written < data->len) {
written +=
fwrite(&data_s[written], sizeof(char), data->len - written, handle->f);
}
entry->data_len = new_len;
handle->states |= lsm_entry_handle_state_updated;
return lsm_error_ok;
}
lsm_error lsm_entry_data_read(uint64_t *out, char *buf,
lsm_entry_handle *handle, uint64_t len) {
const lsm_entry *entry = handle->wrapper->entry;
if (entry->data_len == 0) {
*out = 0;
return lsm_error_ok;
}
// Entries don't open their file unless needed
if (handle->f == NULL) {
LSM_RES(lsm_entry_data_open_read(handle));
}
uint64_t read;
read = fread(buf, sizeof(char), len, handle->f);
if ((read == 0) && (ferror(handle->f) != 0)) {
return lsm_error_failed_io;
}
handle->pos += read;
*out = read;
return lsm_error_ok;
}

View File

@ -130,7 +130,7 @@ static lsm_error lsm_entry_read_str(lsm_str **out, uint64_t *sum, FILE *f) {
return lsm_str_init(out, buf); return lsm_str_init(out, buf);
} }
static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle, static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry *entry,
FILE *db_file) { FILE *db_file) {
uint8_t attr_count; uint8_t attr_count;
LSM_RES(lsm_fread(&attr_count, sum, db_file, sizeof(uint8_t), 1)); LSM_RES(lsm_fread(&attr_count, sum, db_file, sizeof(uint8_t), 1));
@ -142,11 +142,12 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
for (uint64_t i = 0; i < attr_count; i++) { for (uint64_t i = 0; i < attr_count; i++) {
LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1)); LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1));
LSM_RES(lsm_entry_read_str(&val, sum, db_file)); LSM_RES(lsm_entry_read_str(&val, sum, db_file));
LSM_RES(lsm_entry_attr_insert(handle, attr_type, val)); LSM_RES(lsm_entry_attr_insert(entry, attr_type, val));
} }
return lsm_error_ok; return lsm_error_ok;
} }
static lsm_error lsm_fseek(FILE *f, uint64_t pos) { static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
if (fseek(f, pos, SEEK_SET) != 0) { if (fseek(f, pos, SEEK_SET) != 0) {
return lsm_error_failed_io; return lsm_error_failed_io;
@ -165,17 +166,17 @@ lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos,
lsm_str *key; lsm_str *key;
LSM_RES(lsm_entry_read_str(&key, NULL, store->db.f)); LSM_RES(lsm_entry_read_str(&key, NULL, store->db.f));
lsm_entry_handle *handle; lsm_write_handle *handle;
LSM_RES(lsm_store_insert(&handle, store, key)); LSM_RES(lsm_store_open_new(&handle, store, key));
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db.f, LSM_RES(lsm_fread(&handle->dirty->data_len, NULL, store->db.f,
sizeof(uint64_t), 1)); sizeof(uint64_t), 1));
LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db.f)); LSM_RES(lsm_entry_read_attrs(NULL, handle->dirty, store->db.f));
handle->wrapper->entry->idx_file_offset = idx_file_offset; handle->dirty->idx_file_offset = idx_file_offset;
handle->states = 0; lsm_write_commit_mem(handle);
lsm_entry_close(handle); lsm_write_close(handle);
return lsm_error_ok; return lsm_error_ok;
} }

View File

@ -74,16 +74,14 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { lsm_error lsm_entry_disk_insert(lsm_store *store, lsm_entry *entry) {
lsm_store *store = handle->store;
pthread_mutex_lock(&store->db.lock); pthread_mutex_lock(&store->db.lock);
uint64_t db_entry_index = store->db.size; uint64_t db_entry_index = store->db.size;
uint64_t db_entry_size; uint64_t db_entry_size;
lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f, lsm_error res =
handle->wrapper->entry, store->db.size); lsm_write_db_entry(&db_entry_size, store->db.f, entry, store->db.size);
fflush(store->db.f); fflush(store->db.f);
pthread_mutex_unlock(&store->db.lock); pthread_mutex_unlock(&store->db.lock);
@ -117,7 +115,7 @@ lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
store->idx.block_count = new_block_count; store->idx.block_count = new_block_count;
store->db.size += db_entry_size; store->db.size += db_entry_size;
handle->wrapper->entry->idx_file_offset = idx_entry_index; entry->idx_file_offset = idx_entry_index;
} }
} }
@ -154,24 +152,21 @@ static lsm_error lsm_idx_zero_block(lsm_store *store, uint64_t pos) {
// Marking an entry as removed in the idx file is simply setting the length of // Marking an entry as removed in the idx file is simply setting the length of
// its entry to zero // its entry to zero
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { lsm_error lsm_entry_disk_remove(lsm_store *store, lsm_entry *entry) {
const lsm_entry *entry = handle->wrapper->entry; LSM_RES(lsm_idx_zero_block(store, entry->idx_file_offset + sizeof(uint64_t)));
LSM_RES(lsm_entry_data_remove(store, entry));
LSM_RES(lsm_idx_zero_block(handle->store,
entry->idx_file_offset + sizeof(uint64_t)));
LSM_RES(lsm_entry_data_remove(handle));
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle) { lsm_error lsm_entry_disk_update(lsm_store *store, lsm_entry *entry) {
// An update is implemented by reinserting the entry at the end of the db file // An update is implemented by reinserting the entry at the end of the db file
uint64_t old_idx_index = handle->wrapper->entry->idx_file_offset; uint64_t old_idx_index = entry->idx_file_offset;
// TODO is there any way we can make this atomic? If the zero write to the // TODO is there any way we can make this atomic? If the zero write to the
// index file fails, there are two entries in the db file for the same key. // index file fails, there are two entries in the db file for the same key.
LSM_RES(lsm_entry_disk_insert(handle)); LSM_RES(lsm_entry_disk_insert(store, entry));
LSM_RES(lsm_idx_zero_block(handle->store, old_idx_index + sizeof(uint64_t))); LSM_RES(lsm_idx_zero_block(store, old_idx_index + sizeof(uint64_t)));
return lsm_error_ok; return lsm_error_ok;
} }

View File

@ -24,12 +24,45 @@ lsm_error lsm_entry_init(lsm_entry **ptr) {
void lsm_entry_free(lsm_entry *entry) { void lsm_entry_free(lsm_entry *entry) {
if (entry->attrs.count > 0) { if (entry->attrs.count > 0) {
for (size_t i = 0; i < entry->attrs.count; i++) {
lsm_str_free(entry->attrs.items[i].str);
}
free(entry->attrs.items); free(entry->attrs.items);
} }
free(entry); free(entry);
} }
lsm_error lsm_entry_clone(lsm_entry **out, const lsm_entry *entry) {
lsm_entry *new;
LSM_RES(lsm_entry_init(&new));
lsm_str_init_copy_n(&new->key, lsm_str_ptr(entry->key),
lsm_str_len(entry->key));
for (int i = 0; i < 4; i++) {
new->attrs.bitmap[i] = entry->attrs.bitmap[i];
}
new->attrs.count = entry->attrs.count;
new->attrs.items = malloc(sizeof(lsm_attr) * entry->attrs.count);
for (size_t i = 0; i < entry->attrs.count; i++) {
new->attrs.items[i].type = entry->attrs.items[i].type;
lsm_str_init_copy_n(&new->attrs.items[i].str,
lsm_str_ptr(entry->attrs.items[i].str),
lsm_str_len(entry->attrs.items[i].str));
}
new->data_len = entry->data_len;
new->idx_file_offset = entry->idx_file_offset;
*out = new;
return lsm_error_ok;
}
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) { lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper)); lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper));
@ -46,80 +79,16 @@ lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); } void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); }
lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type) {
lsm_entry_handle *handle = calloc(1, sizeof(lsm_entry_handle)); return (entry->attrs.bitmap[type / 64] & (((uint64_t)1) << (type % 64))) != 0;
if (handle == NULL) {
return lsm_error_failed_alloc;
}
*out = handle;
return lsm_error_ok;
} }
lsm_error lsm_entry_commit(lsm_entry_handle *handle) { lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry,
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
uint8_t state_removed = handle->states & lsm_entry_handle_state_removed;
uint8_t state_updated = handle->states & lsm_entry_handle_state_updated;
// Clean new entry
if (state_new && !state_removed) {
LSM_RES(lsm_entry_disk_insert(handle));
}
// Previously stored entry that needs to be removed; should be removed from db
// file as well
else if (state_removed && !state_new) {
LSM_RES(lsm_entry_disk_remove(handle));
lsm_entry_free(handle->wrapper->entry);
handle->wrapper->entry = NULL;
} else if (state_updated && !(state_new || state_removed)) {
LSM_RES(lsm_entry_disk_update(handle));
}
// Reset states after committing current changes
handle->states = 0;
return lsm_error_ok;
}
void lsm_entry_close(lsm_entry_handle *handle) {
if (handle->f != NULL) {
fclose(handle->f);
handle->f = NULL;
}
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
/* bool state_updated = handle->states & lsm_entry_handle_state_updated; */
// New entries create a wrapper in the trie that should be removed if not
// committed
if (state_new) {
lsm_entry_data_remove(handle);
lsm_entry_free(handle->wrapper->entry);
handle->wrapper->entry = NULL;
}
// TODO rollback uncomitted updates
pthread_rwlock_unlock(&handle->wrapper->lock);
free(handle);
}
bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type) {
return (handle->wrapper->entry->attrs.bitmap[type / 64] &
(((uint64_t)1) << (type % 64))) != 0;
}
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
uint8_t type) { uint8_t type) {
if (!lsm_entry_attr_present(handle, type)) { if (!lsm_entry_attr_present(entry, type)) {
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry *entry = handle->wrapper->entry;
uint64_t i = 0; uint64_t i = 0;
while (entry->attrs.items[i].type != type) { while (entry->attrs.items[i].type != type) {
@ -131,11 +100,11 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle, lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry,
uint8_t type) { uint8_t type) {
lsm_str *s; const lsm_str *s;
LSM_RES(lsm_entry_attr_get(&s, handle, type)); LSM_RES(lsm_entry_attr_get(&s, entry, type));
uint64_t num = 0; uint64_t num = 0;
@ -148,25 +117,22 @@ lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle, lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry,
uint8_t type) { uint8_t type) {
lsm_str *s; const lsm_str *s;
LSM_RES(lsm_entry_attr_get(&s, handle, type)); LSM_RES(lsm_entry_attr_get(&s, entry, type));
*out = lsm_str_char(s, 0); *out = lsm_str_char(s, 0);
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle, lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, uint8_t type) {
uint8_t type) { if (!lsm_entry_attr_present(entry, type)) {
if (!lsm_entry_attr_present(handle, type)) {
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry *entry = handle->wrapper->entry;
if (entry->attrs.count == 1) { if (entry->attrs.count == 1) {
*out = entry->attrs.items[0].str; *out = entry->attrs.items[0].str;
@ -204,21 +170,16 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
entry->attrs.count--; entry->attrs.count--;
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64)); entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
handle->states |= lsm_entry_handle_state_updated;
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type, lsm_error lsm_entry_attr_insert(lsm_entry *entry, uint8_t type, lsm_str *data) {
lsm_str *data) {
// Remove a previous version of the attribute // Remove a previous version of the attribute
lsm_str *out; lsm_str *out;
if (lsm_entry_attr_remove(&out, handle, type) == lsm_error_ok) { if (lsm_entry_attr_remove(&out, entry, type) == lsm_error_ok) {
lsm_str_free(out); lsm_str_free(out);
} }
lsm_entry *entry = handle->wrapper->entry;
lsm_attr *new_attrs = lsm_attr *new_attrs =
realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr)); realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr));
@ -233,36 +194,31 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
entry->attrs.count++; entry->attrs.count++;
entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64); entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);
handle->states |= lsm_entry_handle_state_updated;
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type, lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry *entry, uint8_t type,
uint64_t data) { uint64_t data) {
lsm_str *s; lsm_str *s;
LSM_RES( LSM_RES(
lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char))); lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char)));
return lsm_entry_attr_insert(handle, type, s); return lsm_entry_attr_insert(entry, type, s);
} }
lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type, lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry *entry, uint8_t type,
uint8_t data) { uint8_t data) {
lsm_str *s; lsm_str *s;
LSM_RES( LSM_RES(
lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint8_t) / sizeof(char))); lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint8_t) / sizeof(char)));
return lsm_entry_attr_insert(handle, type, s); return lsm_entry_attr_insert(entry, type, s);
} }
uint64_t lsm_entry_data_len(lsm_entry_handle *handle) { uint64_t lsm_entry_data_path_len(const lsm_store *store,
return handle->wrapper->entry->data_len; const lsm_entry *entry) {
} const lsm_str *data_path = store->data_path;
const lsm_str *key = entry->key;
uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle) {
const lsm_str *data_path = handle->store->data_path;
const lsm_str *key = handle->wrapper->entry->key;
uint8_t levels = uint8_t levels =
key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS; key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS;
@ -271,9 +227,10 @@ uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle) {
strlen(LSM_DATA_FILE_SUFFIX); strlen(LSM_DATA_FILE_SUFFIX);
} }
void lsm_entry_data_path(char *path, const lsm_entry_handle *handle) { void lsm_entry_data_path(char *path, const lsm_store *store,
const lsm_str *data_path = handle->store->data_path; const lsm_entry *entry) {
const lsm_str *key = handle->wrapper->entry->key; const lsm_str *data_path = store->data_path;
const lsm_str *key = entry->key;
uint8_t levels = uint8_t levels =
key->len > LSM_STORE_DATA_LEVELS ? LSM_STORE_DATA_LEVELS : key->len; key->len > LSM_STORE_DATA_LEVELS ? LSM_STORE_DATA_LEVELS : key->len;
@ -298,12 +255,13 @@ void lsm_entry_data_path(char *path, const lsm_entry_handle *handle) {
strcpy(&path[index], LSM_DATA_FILE_SUFFIX); strcpy(&path[index], LSM_DATA_FILE_SUFFIX);
} }
lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle) { lsm_error lsm_entry_data_mkdirs(const lsm_store *store,
char path[lsm_entry_data_path_len(handle) + 1]; const lsm_entry *entry) {
lsm_entry_data_path(path, handle); char path[lsm_entry_data_path_len(store, entry) + 1];
lsm_entry_data_path(path, store, entry);
const lsm_str *data_path = handle->store->data_path; const lsm_str *data_path = store->data_path;
const lsm_str *key = handle->wrapper->entry->key; const lsm_str *key = entry->key;
uint8_t levels = uint8_t levels =
key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS; key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS;
@ -320,43 +278,32 @@ lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle) {
path[data_path->len + 2 * (i + 1)] = '/'; path[data_path->len + 2 * (i + 1)] = '/';
} }
FILE *f = fopen(path, "ab"); return lsm_error_ok;
}
lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store,
const lsm_entry *entry, const char *mode) {
char path[lsm_entry_data_path_len(store, entry) + 1];
lsm_entry_data_path(path, store, entry);
FILE *f = fopen(path, mode);
if (f == NULL) { if (f == NULL) {
return lsm_error_failed_io; return lsm_error_failed_io;
} }
handle->f = f; if (out != NULL) {
*out = f;
return lsm_error_ok;
}
lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle) {
char path[lsm_entry_data_path_len(handle) + 1];
lsm_entry_data_path(path, handle);
FILE *f = fopen(path, "rb");
if (f == NULL) {
return lsm_error_failed_io;
} }
handle->f = f;
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_data_remove(lsm_entry_handle *handle) { lsm_error lsm_entry_data_remove(const lsm_store *store,
const lsm_entry *entry = handle->wrapper->entry; const lsm_entry *entry) {
if (entry->data_len > 0) { if (entry->data_len > 0) {
if (handle->f != NULL) { char data_path[lsm_entry_data_path_len(store, entry) + 1];
fclose(handle->f); lsm_entry_data_path(data_path, store, entry);
handle->f = NULL;
}
char data_path[lsm_entry_data_path_len(handle) + 1];
lsm_entry_data_path(data_path, handle);
if (remove(data_path) != 0) { if (remove(data_path) != 0) {
return lsm_error_failed_io; return lsm_error_failed_io;

View File

@ -24,10 +24,15 @@ lnm_err lander_ctx_init(void **c_ctx, void *gctx) {
} }
void lander_ctx_reset(lander_ctx *ctx) { void lander_ctx_reset(lander_ctx *ctx) {
if (ctx->entry != NULL) { if (ctx->entry.read != NULL) {
lsm_entry_close(ctx->entry); if (ctx->write) {
lsm_write_close(ctx->entry.write);
} else {
lsm_read_close(ctx->entry.read);
}
ctx->entry = NULL; ctx->entry.read = NULL;
ctx->write = false;
} }
} }
@ -45,16 +50,23 @@ void lander_header_to_attr(lnm_http_loop_ctx *ctx, const char *header_name,
lsm_str *value; lsm_str *value;
lsm_str_init_copy_n(&value, (char *)header_value, header_value_len); lsm_str_init_copy_n(&value, (char *)header_value, header_value_len);
lsm_entry_attr_insert(c_ctx->entry, attr_type, value); lsm_write_attr_insert(c_ctx->entry.write, attr_type, value);
} }
} }
void lander_attr_to_header(lnm_http_loop_ctx *ctx, lander_attr_type attr_type, void lander_attr_to_header(lnm_http_loop_ctx *ctx, lander_attr_type attr_type,
lnm_http_header header_type) { lnm_http_header header_type) {
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
lsm_str *value; const lsm_str *value;
lsm_error res;
if (lsm_entry_attr_get(&value, c_ctx->entry, attr_type) == lsm_error_ok) { if (c_ctx->write) {
res = lsm_write_attr_get(&value, c_ctx->entry.write, attr_type);
} else {
res = lsm_read_attr_get(&value, c_ctx->entry.read, attr_type);
}
if (res == lsm_error_ok) {
lnm_http_res_add_header_len(&ctx->res, header_type, lnm_http_res_add_header_len(&ctx->res, header_type,
(char *)lsm_str_ptr(value), lsm_str_len(value), (char *)lsm_str_ptr(value), lsm_str_len(value),
false); false);

View File

@ -15,9 +15,9 @@ lnm_http_step_err lander_remove_entry(lnm_http_conn *conn) {
lsm_str *key; lsm_str *key;
lsm_str_init_copy_n(&key, (char *)key_s, key_len); lsm_str_init_copy_n(&key, (char *)key_s, key_len);
switch (lsm_store_open_write(&c_ctx->entry, c_gctx->store, key)) { switch (lsm_store_open_write(&c_ctx->entry.write, c_gctx->store, key)) {
case lsm_error_ok: case lsm_error_ok:
lsm_entry_remove(c_ctx->entry); lsm_write_remove(c_ctx->entry.write);
break; break;
case lsm_error_not_found: case lsm_error_not_found:
ctx->res.status = lnm_http_status_not_found; ctx->res.status = lnm_http_status_not_found;

View File

@ -36,17 +36,17 @@ lnm_http_step_err lander_get_redirect(lnm_http_conn *conn) {
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
// For redirects, the URL is stored as an in-memory attribute // For redirects, the URL is stored as an in-memory attribute
lsm_str *url_attr_val; const lsm_str *url_attr_val;
// This shouldn't be able to happen // This shouldn't be able to happen
if (lsm_entry_attr_get(&url_attr_val, c_ctx->entry, lander_attr_type_url) != if (lsm_read_attr_get(&url_attr_val, c_ctx->entry.read,
lsm_error_ok) { lander_attr_type_url) != lsm_error_ok) {
lnm_lerror("lander", "%s", lnm_lerror("lander", "%s",
"Entry of type redirect detected without URL attribute"); "Entry of type redirect detected without URL attribute");
ctx->res.status = lnm_http_status_internal_server_error; ctx->res.status = lnm_http_status_internal_server_error;
lsm_entry_close(c_ctx->entry); lsm_read_close(c_ctx->entry.read);
c_ctx->entry = NULL; c_ctx->entry.read = NULL;
return lnm_http_step_err_res; return lnm_http_step_err_res;
} }
@ -68,7 +68,7 @@ lnm_err lander_entry_data_streamer(uint64_t *written, char *buf,
lnm_http_loop_ctx *ctx = conn->ctx; lnm_http_loop_ctx *ctx = conn->ctx;
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
lsm_entry_data_read(written, buf, c_ctx->entry, len); lsm_read_data_read(written, buf, c_ctx->entry.read, len);
return lnm_err_ok; return lnm_err_ok;
} }
@ -78,7 +78,7 @@ lnm_http_step_err lander_get_paste(lnm_http_conn *conn) {
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
lnm_http_res_body_set_fn(&ctx->res, lander_entry_data_streamer, lnm_http_res_body_set_fn(&ctx->res, lander_entry_data_streamer,
lsm_entry_data_len(c_ctx->entry)); lsm_read_data_len(c_ctx->entry.read));
lnm_http_res_add_header(&ctx->res, lnm_http_header_content_type, "text/plain", lnm_http_res_add_header(&ctx->res, lnm_http_header_content_type, "text/plain",
false); false);
@ -90,7 +90,7 @@ lnm_http_step_err lander_get_file(lnm_http_conn *conn) {
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
lnm_http_res_body_set_fn(&ctx->res, lander_entry_data_streamer, lnm_http_res_body_set_fn(&ctx->res, lander_entry_data_streamer,
lsm_entry_data_len(c_ctx->entry)); lsm_read_data_len(c_ctx->entry.read));
lander_attr_to_header(ctx, lander_attr_type_content_type, lander_attr_to_header(ctx, lander_attr_type_content_type,
lnm_http_header_content_type); lnm_http_header_content_type);
@ -108,7 +108,8 @@ lnm_http_step_err lander_get_entry(lnm_http_conn *conn) {
lsm_str *key; lsm_str *key;
lsm_str_init_copy_n(&key, (char *)key_s, key_len); lsm_str_init_copy_n(&key, (char *)key_s, key_len);
lsm_error lsm_res = lsm_store_open_read(&c_ctx->entry, c_gctx->store, key); lsm_error lsm_res =
lsm_store_open_read(&c_ctx->entry.read, c_gctx->store, key);
lsm_str_free(key); lsm_str_free(key);
switch (lsm_res) { switch (lsm_res) {
@ -123,8 +124,8 @@ lnm_http_step_err lander_get_entry(lnm_http_conn *conn) {
} }
lander_entry_type t; lander_entry_type t;
lsm_entry_attr_get_uint8_t((uint8_t *)&t, c_ctx->entry, lsm_read_attr_get_uint8_t((uint8_t *)&t, c_ctx->entry.read,
lander_attr_type_entry_type); lander_attr_type_entry_type);
lnm_http_step_err res; lnm_http_step_err res;

View File

@ -29,7 +29,7 @@ bool lander_insert_entry(lnm_http_loop_ctx *ctx, bool secure) {
// With placeholders, the entry will already be open so an entry should no // With placeholders, the entry will already be open so an entry should no
// longer be created // longer be created
if (c_ctx->entry != NULL) { if (c_ctx->entry.write != NULL) {
return true; return true;
} }
@ -50,11 +50,12 @@ bool lander_insert_entry(lnm_http_loop_ctx *ctx, bool secure) {
} }
// TODO free key on error // TODO free key on error
switch (lsm_store_insert(&c_ctx->entry, c_gctx->store, key)) { switch (lsm_store_open_new(&c_ctx->entry.write, c_gctx->store, key)) {
case lsm_error_already_present: case lsm_error_already_present:
ctx->res.status = lnm_http_status_conflict; ctx->res.status = lnm_http_status_conflict;
return false; return false;
case lsm_error_ok: case lsm_error_ok:
c_ctx->write = true;
break; break;
default: default:
ctx->res.status = lnm_http_status_internal_server_error; ctx->res.status = lnm_http_status_internal_server_error;
@ -82,7 +83,7 @@ static lnm_http_step_err __lander_post_redirect(lnm_http_conn *conn,
return lnm_http_step_err_res; return lnm_http_step_err_res;
} }
lsm_entry_attr_insert_uint8_t(c_ctx->entry, lander_attr_type_entry_type, lsm_write_attr_insert_uint8_t(c_ctx->entry.write, lander_attr_type_entry_type,
lander_entry_type_redirect); lander_entry_type_redirect);
return lnm_http_step_err_done; return lnm_http_step_err_done;
@ -102,7 +103,7 @@ lnm_http_step_err lander_post_redirect_body_to_attr(lnm_http_conn *conn) {
lsm_str *attr_value; lsm_str *attr_value;
lsm_str_init_copy_n(&attr_value, ctx->req.body.buf, ctx->req.body.len); lsm_str_init_copy_n(&attr_value, ctx->req.body.buf, ctx->req.body.len);
lsm_entry_attr_insert(c_ctx->entry, lander_attr_type_url, attr_value); lsm_write_attr_insert(c_ctx->entry.write, lander_attr_type_url, attr_value);
return lnm_http_step_err_done; return lnm_http_step_err_done;
} }
@ -115,7 +116,7 @@ static lnm_http_step_err __lander_post_paste(lnm_http_conn *conn, bool secure) {
return lnm_http_step_err_res; return lnm_http_step_err_res;
} }
lsm_entry_attr_insert_uint8_t(c_ctx->entry, lander_attr_type_entry_type, lsm_write_attr_insert_uint8_t(c_ctx->entry.write, lander_attr_type_entry_type,
lander_entry_type_paste); lander_entry_type_paste);
lander_header_to_attr(ctx, "X-Lander-Filename", lander_attr_type_file_name); lander_header_to_attr(ctx, "X-Lander-Filename", lander_attr_type_file_name);
@ -138,7 +139,7 @@ static lnm_http_step_err __lander_post_file(lnm_http_conn *conn, bool secure) {
return lnm_http_step_err_res; return lnm_http_step_err_res;
} }
lsm_entry_attr_insert_uint8_t(c_ctx->entry, lander_attr_type_entry_type, lsm_write_attr_insert_uint8_t(c_ctx->entry.write, lander_attr_type_entry_type,
lander_entry_type_file); lander_entry_type_file);
lander_header_to_attr(ctx, "X-Lander-Content-Type", lander_header_to_attr(ctx, "X-Lander-Content-Type",
lander_attr_type_content_type); lander_attr_type_content_type);
@ -163,7 +164,7 @@ lnm_http_step_err __lander_post_placeholder(lnm_http_conn *conn, bool secure) {
return lnm_http_step_err_res; return lnm_http_step_err_res;
} }
lsm_entry_attr_insert_uint8_t(c_ctx->entry, lander_attr_type_entry_type, lsm_write_attr_insert_uint8_t(c_ctx->entry.write, lander_attr_type_entry_type,
lander_entry_type_placeholder); lander_entry_type_placeholder);
return lnm_http_step_err_done; return lnm_http_step_err_done;

View File

@ -11,19 +11,19 @@ lnm_http_step_err lander_stream_body_to_entry(lnm_http_conn *conn) {
lnm_http_loop_ctx *ctx = conn->ctx; lnm_http_loop_ctx *ctx = conn->ctx;
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
uint64_t to_append = uint64_t to_append = LNM_MIN(conn->r.size - conn->r.read,
LNM_MIN(conn->r.size - conn->r.read, ctx->req.body.expected_len -
ctx->req.body.expected_len - lsm_entry_data_len(c_ctx->entry)); lsm_write_data_len(c_ctx->entry.write));
lsm_str *data; lsm_str *data;
lsm_str_init_copy_n(&data, (char *)&conn->r.buf[conn->r.read], to_append); lsm_str_init_copy_n(&data, (char *)&conn->r.buf[conn->r.read], to_append);
lsm_entry_data_append(c_ctx->entry, data); lsm_write_data_append(c_ctx->entry.write, data);
conn->r.read += to_append; conn->r.read += to_append;
lsm_str_free(data); lsm_str_free(data);
return lsm_entry_data_len(c_ctx->entry) == ctx->req.body.expected_len return lsm_write_data_len(c_ctx->entry.write) == ctx->req.body.expected_len
? lnm_http_step_err_done ? lnm_http_step_err_done
: lnm_http_step_err_io_needed; : lnm_http_step_err_io_needed;
} }
@ -32,7 +32,7 @@ lnm_http_step_err lander_commit_entry(lnm_http_conn *conn) {
lnm_http_loop_ctx *ctx = conn->ctx; lnm_http_loop_ctx *ctx = conn->ctx;
lander_ctx *c_ctx = ctx->c; lander_ctx *c_ctx = ctx->c;
lsm_entry_commit(c_ctx->entry); lsm_write_commit(c_ctx->entry.write);
return lnm_http_step_err_done; return lnm_http_step_err_done;
} }
@ -54,17 +54,20 @@ lnm_http_step_err lander_auth_or_placeholder(lnm_http_conn *conn) {
lsm_str *key; lsm_str *key;
lsm_str_init_copy_n(&key, key_s, key_len); lsm_str_init_copy_n(&key, key_s, key_len);
lsm_error res = lsm_store_open_write(&c_ctx->entry, c_gctx->store, key); lsm_error res = lsm_store_open_write(&c_ctx->entry.write, c_gctx->store, key);
lsm_str_free(key); lsm_str_free(key);
switch (res) { switch (res) {
case lsm_error_ok: { case lsm_error_ok: {
c_ctx->write = true;
lander_entry_type t; lander_entry_type t;
// If the entry is a placeholder, the request is always authenticated // If the entry is a placeholder, the request is always authenticated
if ((lsm_entry_attr_get_uint8_t( if ((lsm_write_attr_get_uint8_t(&t, c_ctx->entry.write,
&t, c_ctx->entry, lander_attr_type_entry_type) == lsm_error_ok) && lander_attr_type_entry_type) ==
lsm_error_ok) &&
(t == lander_entry_type_placeholder)) { (t == lander_entry_type_placeholder)) {
return lnm_http_step_err_done; return lnm_http_step_err_done;
} else { } else {