feat(lsm): introduce entry handles for concurrent access

lsm
Jef Roosens 2023-10-29 12:19:59 +01:00
parent 0e4e18da6c
commit f44c512099
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 195 additions and 68 deletions

View File

@ -21,27 +21,10 @@ typedef enum lsm_attr_type : uint64_t {
} lsm_attr_type; } lsm_attr_type;
/** /**
* An entry inside an LSM store. * A handle referencing an entry inside a store. Read/write operations from/to
* * the entry go through this handle.
* Each entry consists of the key it's stored behind, zero or more attributes
* (metadata) and a data field. The data field can be stored on disk or
* in-memory, depending on the size.
*/ */
typedef struct lsm_entry lsm_entry; typedef struct lsm_entry_handle lsm_entry_handle;
/**
* Allocate and initialize a new lsm_entry object.
*
* @param ptr where to store newly allocated pointer
*/
lsm_error lsm_entry_init(lsm_entry **ptr);
/**
* Deallocate an existing lsm_entry object.
*
* @param entry object to deallocate
*/
void lsm_entry_free(lsm_entry *entry);
/** /**
* Checks whether the entry has an attribute with the specified type. * Checks whether the entry has an attribute with the specified type.
@ -49,7 +32,7 @@ void lsm_entry_free(lsm_entry *entry);
* @param entry entry to check * @param entry entry to check
* @param type type of attribute to check for * @param type type of attribute to check for
*/ */
bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type); bool lsm_entry_attr_present(lsm_entry_handle *handle, lsm_attr_type type);
/** /**
* Retrieve the contents of an attribute from an entry, if present * Retrieve the contents of an attribute from an entry, if present
@ -58,7 +41,7 @@ bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type);
* @param entry entry to search for * @param entry entry to search for
* @param type type of attribute to return * @param type type of attribute to return
*/ */
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
lsm_attr_type type); lsm_attr_type type);
/** /**
@ -68,7 +51,7 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry,
* @param type type of attribute to add * @param type type of attribute to add
* @param data data of attribute; ownership of pointer is taken over * @param data data of attribute; ownership of pointer is taken over
*/ */
lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type, lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type,
lsm_str *data); lsm_str *data);
/** /**
@ -79,7 +62,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type,
* @param entry entry to remove attribute from * @param entry entry to remove attribute from
* @param type type of attribute to remove * @param type type of attribute to remove
*/ */
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
lsm_attr_type type); lsm_attr_type type);
/** /**
@ -104,7 +87,7 @@ lsm_error lsm_store_init(lsm_store **ptr);
* @param db_path path to the database file * @param db_path path to the database file
* @param data_path path to the data directory * @param data_path path to the data directory
*/ */
lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path); lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path);
/** /**
* Deallocate an existing lsm_store object. * Deallocate an existing lsm_store object.
@ -114,43 +97,45 @@ lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path);
void lsm_store_free(lsm_store *store); void lsm_store_free(lsm_store *store);
/** /**
* Retrieve an entry from the store, preparing & locking it for the purpose of * Open a read handle to the given entry. This handle must be properly closed
* reading. * using `lsm_entry_close`.
* *
* @param out pointer to store entry pointer * @param out pointer to store handle pointer
* @param store store to retrieve entry from * @param store store to retrieve entry from
* @param key key to search * @param key key to search
*/ */
lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key); lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
lsm_str *key);
/** /**
* Retrieve an entry from the store for the purposes of writing. This * Open a write handle to the given entry. This handle must be properly closed
* write-locks the entry. * using `lsm_entry_close`.
* *
* @param out pointer to store entry pointer * @param out pointer to store handle pointer
* @param store store to retrieve entry from * @param store store to retrieve entry from
* @param key key to search * @param key key to search
*/ */
lsm_error lsm_store_get_write(lsm_entry **out, lsm_store *store, lsm_str *key); lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
lsm_str *key);
/** /**
* Unlock a locked entry. * Close an open entry handle.
* *
* @param store store to unlock entry in * @param store store the handle's entry is stored in
* @param entry entry to unlock * @param handle handle to close
*/ */
lsm_error lsm_store_unlock(lsm_store *store, lsm_entry *entry); void lsm_entry_close(lsm_entry_handle *handle);
/** /**
* Allocate a new entry in the store with the specified key. The entry returned * Insert a new entry into the store, returning a write handle to the newly
* will be write-locked, and should be unlocked after streaming the necessary * created entry.
* data.
* *
* @param out pointer to store new entry pointer in * @param out pointer to store new entry pointer in
* @param store store to modify * @param store store to modify
* @param key key to add; ownership of key pointer is taken over * @param key key to add; ownership of key pointer is taken over
*/ */
lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key); lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
lsm_str *key);
/** /**
* Append new data to the given entry, which is expected to be in the store. * Append new data to the given entry, which is expected to be in the store.
@ -162,7 +147,7 @@ lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key);
* @param entry entry to append data to * @param entry entry to append data to
* @param data data to append * @param data data to append
*/ */
lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
lsm_str *data); lsm_str *data);
#endif #endif

View File

@ -13,7 +13,14 @@ typedef struct lsm_attr {
lsm_str *str; lsm_str *str;
} lsm_attr; } lsm_attr;
struct lsm_entry { /**
* An entry inside an LSM store.
*
* Each entry consists of the key it's stored behind, zero or more attributes
* (metadata) and a data field. The data field can be stored on disk or
* in-memory, depending on the size.
*/
typedef struct lsm_entry {
lsm_str *key; lsm_str *key;
struct { struct {
uint64_t count; uint64_t count;
@ -28,7 +35,21 @@ struct lsm_entry {
} value; } value;
bool on_disk; bool on_disk;
} data; } data;
}; } lsm_entry;
/**
* Allocate and initialize a new lsm_entry object.
*
* @param ptr where to store newly allocated pointer
*/
lsm_error lsm_entry_init(lsm_entry **ptr);
/**
* Deallocate an existing lsm_entry object.
*
* @param entry object to deallocate
*/
void lsm_entry_free(lsm_entry *entry);
typedef struct lsm_entry_wrapper { typedef struct lsm_entry_wrapper {
pthread_rwlock_t lock; pthread_rwlock_t lock;
@ -36,6 +57,14 @@ typedef struct lsm_entry_wrapper {
} lsm_entry_wrapper; } lsm_entry_wrapper;
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr); lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr);
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper);
struct lsm_entry_handle {
lsm_entry_wrapper *wrapper;
FILE *f;
};
lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
struct lsm_store { struct lsm_store {
lsm_trie *trie; lsm_trie *trie;

View File

@ -27,7 +27,7 @@ lsm_error lsm_store_init(lsm_store **ptr) {
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path,
lsm_str *data_path) { lsm_str *data_path) {
lsm_store *store; lsm_store *store;
LSM_RES(lsm_store_init(&store)); LSM_RES(lsm_store_init(&store));
@ -42,12 +42,13 @@ lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) { lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store,
lsm_str *key) {
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key));
// We don't want to block the thread // Try to get a read lock on the entry's lock
if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) { if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) {
return lsm_error_lock_busy; return lsm_error_lock_busy;
} }
@ -62,8 +63,17 @@ lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) {
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry_handle *handle;
lsm_error res = lsm_entry_handle_init(&handle);
if (res != lsm_error_ok) {
pthread_rwlock_unlock(&wrapper->lock);
return res;
}
// Open a new file descriptor if needed // Open a new file descriptor if needed
if (entry->data.on_disk && (entry->data.value.file == NULL)) { if (entry->data.on_disk) {
char path[store->data_path->len + entry->key->len + 2]; char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key)); lsm_str_ptr(entry->key));
@ -71,36 +81,111 @@ lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) {
FILE *f = fopen(path, "rb"); FILE *f = fopen(path, "rb");
if (f == NULL) { if (f == NULL) {
free(handle);
return lsm_error_failed_io; return lsm_error_failed_io;
} }
entry->data.value.file = f; handle->f = f;
} }
handle->wrapper = wrapper;
*out = handle;
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key) { lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store,
lsm_str *key) {
lsm_entry_wrapper *wrapper;
LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key));
// Try to get a write lock on the entry's lock
// TODO make this timeout to not block
if (pthread_rwlock_wrlock(&wrapper->lock) != 0) {
return lsm_error_lock_busy;
}
lsm_entry *entry = wrapper->entry;
// While the trie's data field will never be NULL, the actual entry pointer
// might be
if (entry == NULL) {
pthread_rwlock_unlock(&wrapper->lock);
return lsm_error_not_found;
}
lsm_entry_handle *handle;
lsm_error res = lsm_entry_handle_init(&handle);
if (res != lsm_error_ok) {
pthread_rwlock_unlock(&wrapper->lock);
return res;
}
// Open a new file descriptor if needed
if (entry->data.on_disk) {
char path[store->data_path->len + entry->key->len + 2];
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key));
FILE *f = fopen(path, "wb");
if (f == NULL) {
free(handle);
return lsm_error_failed_io;
}
handle->f = f;
}
handle->wrapper = wrapper;
*out = handle;
return lsm_error_ok;
}
lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
lsm_str *key) {
// TODO what happens when two inserts to the same key happen at the same time?
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
LSM_RES(lsm_entry_wrapper_init(&wrapper)); LSM_RES(lsm_entry_wrapper_init(&wrapper));
pthread_rwlock_wrlock(&wrapper->lock);
lsm_error res = lsm_trie_insert(store->trie, key, wrapper);
// Check if entry isn't already present in advance
if (res != lsm_error_ok) {
lsm_entry_wrapper_free(wrapper);
return res;
}
lsm_entry *entry; lsm_entry *entry;
LSM_RES(lsm_entry_init(&entry)); LSM_RES(lsm_entry_init(&entry));
entry->key = key; entry->key = key;
wrapper->entry = entry; wrapper->entry = entry;
pthread_rwlock_wrlock(&wrapper->lock);
// TODO mem leak if already present lsm_entry_handle *handle;
LSM_RES(lsm_trie_insert(store->trie, key, wrapper)); LSM_RES(lsm_entry_handle_init(&handle));
*out = entry; // No need to set the handle's file, as the entry doesn't have any data yet
handle->wrapper = wrapper;
*out = handle;
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry, lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle,
lsm_str *data) { lsm_str *data) {
lsm_entry *entry = handle->wrapper->entry;
uint64_t new_len = entry->data.len + lsm_str_len(data); uint64_t new_len = entry->data.len + lsm_str_len(data);
const char *data_s = lsm_str_ptr(data); const char *data_s = lsm_str_ptr(data);
@ -124,13 +209,13 @@ lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry,
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
lsm_str_ptr(entry->key)); lsm_str_ptr(entry->key));
FILE *f = fopen(path, "w"); FILE *f = fopen(path, "ab");
if (f == NULL) { if (f == NULL) {
return lsm_error_failed_io; return lsm_error_failed_io;
} }
entry->data.value.file = f; handle->f = f;
entry->data.on_disk = true; entry->data.on_disk = true;
// TODO free old buff, write original data to file // TODO free old buff, write original data to file
@ -141,7 +226,7 @@ lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry,
// TODO what happens when I/O fails? // TODO what happens when I/O fails?
while (written < data->len) { while (written < data->len) {
written += fwrite(&data_s[written], sizeof(char), data->len - written, written += fwrite(&data_s[written], sizeof(char), data->len - written,
entry->data.value.file); handle->f);
} }
} }

View File

@ -32,16 +32,40 @@ lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
return lsm_error_ok; return lsm_error_ok;
} }
bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type) { void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); }
return (entry->attrs.bitmap & type) != 0;
lsm_error lsm_entry_handle_init(lsm_entry_handle **out) {
lsm_entry_handle *handle = calloc(1, sizeof(lsm_entry_handle));
if (handle == NULL) {
return lsm_error_failed_alloc;
}
*out = handle;
return lsm_error_ok;
} }
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry, void lsm_entry_close(lsm_entry_handle *handle) {
if (handle->f != NULL) {
fclose(handle->f);
}
pthread_rwlock_unlock(&handle->wrapper->lock);
free(handle);
}
bool lsm_entry_attr_present(lsm_entry_handle *handle, lsm_attr_type type) {
return (handle->wrapper->entry->attrs.bitmap & type) != 0;
}
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
lsm_attr_type type) { lsm_attr_type type) {
if (!lsm_entry_attr_present(entry, type)) { if (!lsm_entry_attr_present(handle, type)) {
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry *entry = handle->wrapper->entry;
uint64_t i = 0; uint64_t i = 0;
while (entry->attrs.items[i].type != type) { while (entry->attrs.items[i].type != type) {
@ -53,12 +77,14 @@ lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry *entry,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry, lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
lsm_attr_type type) { lsm_attr_type type) {
if (!lsm_entry_attr_present(entry, type)) { if (!lsm_entry_attr_present(handle, type)) {
return lsm_error_not_found; return lsm_error_not_found;
} }
lsm_entry *entry = handle->wrapper->entry;
if (entry->attrs.count == 1) { if (entry->attrs.count == 1) {
*out = entry->attrs.items[0].str; *out = entry->attrs.items[0].str;
@ -99,12 +125,14 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry *entry,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_entry_attr_insert(lsm_entry *entry, lsm_attr_type type, lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, lsm_attr_type type,
lsm_str *data) { lsm_str *data) {
if (lsm_entry_attr_present(entry, type)) { if (lsm_entry_attr_present(handle, type)) {
return lsm_error_already_present; return lsm_error_already_present;
} }
lsm_entry *entry = handle->wrapper->entry;
lsm_attr *new_attrs = lsm_attr *new_attrs =
realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr)); realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr));