feat(lsm): data streaming, random other stuff, locks
parent
aab93d9741
commit
0e4e18da6c
|
@ -15,7 +15,9 @@ typedef enum lsm_error {
|
||||||
lsm_error_failed_alloc = 1,
|
lsm_error_failed_alloc = 1,
|
||||||
lsm_error_not_found = 2,
|
lsm_error_not_found = 2,
|
||||||
lsm_error_already_present = 3,
|
lsm_error_already_present = 3,
|
||||||
lsm_error_null_value = 4
|
lsm_error_null_value = 4,
|
||||||
|
lsm_error_failed_io = 5,
|
||||||
|
lsm_error_lock_busy = 6,
|
||||||
} lsm_error;
|
} lsm_error;
|
||||||
|
|
||||||
/*typedef struct lsm_string { */
|
/*typedef struct lsm_string { */
|
||||||
|
|
|
@ -104,7 +104,7 @@ lsm_error lsm_store_init(lsm_store **ptr);
|
||||||
* @param db_path path to the database file
|
* @param db_path path to the database file
|
||||||
* @param data_path path to the data directory
|
* @param data_path path to the data directory
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path);
|
lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dealocate an existing lsm_store object.
|
* Dealocate an existing lsm_store object.
|
||||||
|
@ -114,16 +114,37 @@ lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path);
|
||||||
void lsm_store_free(lsm_store *store);
|
void lsm_store_free(lsm_store *store);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Search for an entry in the store.
|
* Retrieve an entry from the store, preparing & locking it for the purpose of
|
||||||
|
* reading.
|
||||||
*
|
*
|
||||||
* @param out pointer to store entry pointer in
|
* @param out pointer to store entry pointer
|
||||||
* @param store store to search in
|
* @param store store to retrieve entry from
|
||||||
* @param key key to look with
|
* @param key key to search
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_store_search(lsm_entry **out, lsm_store *store, lsm_str *key);
|
lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a new entry in the store with the specified key.
|
* Retrieve an entry from the store for the purposes of writing. This
|
||||||
|
* write-locks the entry.
|
||||||
|
*
|
||||||
|
* @param out pointer to store entry pointer
|
||||||
|
* @param store store to retrieve entry from
|
||||||
|
* @param key key to search
|
||||||
|
*/
|
||||||
|
lsm_error lsm_store_get_write(lsm_entry **out, lsm_store *store, lsm_str *key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unlock a locked entry.
|
||||||
|
*
|
||||||
|
* @param store store to unlock entry in
|
||||||
|
* @param entry entry to unlock
|
||||||
|
*/
|
||||||
|
lsm_error lsm_store_unlock(lsm_store *store, lsm_entry *entry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a new entry in the store with the specified key. The entry returned
|
||||||
|
* will be write-locked, and should be unlocked after streaming the necessary
|
||||||
|
* data.
|
||||||
*
|
*
|
||||||
* @param out pointer to store new entry pointer in
|
* @param out pointer to store new entry pointer in
|
||||||
* @param store store to modify
|
* @param store store to modify
|
||||||
|
@ -141,6 +162,7 @@ lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key);
|
||||||
* @param entry entry to append data to
|
* @param entry entry to append data to
|
||||||
* @param data data to append
|
* @param data data to append
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *data);
|
lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry,
|
||||||
|
lsm_str *data);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#ifndef LSM_STORE_INTERNAL
|
#ifndef LSM_STORE_INTERNAL
|
||||||
#define LSM_STORE_INTERNAL
|
#define LSM_STORE_INTERNAL
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
|
||||||
#include "lsm/store.h"
|
#include "lsm/store.h"
|
||||||
|
@ -29,10 +30,17 @@ struct lsm_entry {
|
||||||
} data;
|
} data;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct lsm_entry_wrapper {
|
||||||
|
pthread_rwlock_t lock;
|
||||||
|
lsm_entry *entry;
|
||||||
|
} lsm_entry_wrapper;
|
||||||
|
|
||||||
|
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr);
|
||||||
|
|
||||||
struct lsm_store {
|
struct lsm_store {
|
||||||
lsm_trie *trie;
|
lsm_trie *trie;
|
||||||
char *data_path;
|
lsm_str *data_path;
|
||||||
char *db_path;
|
lsm_str *db_path;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
|
#include <pthread.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
#include "lsm.h"
|
#include "lsm.h"
|
||||||
#include "lsm/store.h"
|
#include "lsm/store.h"
|
||||||
#include "lsm/trie.h"
|
|
||||||
#include "lsm/store_internal.h"
|
#include "lsm/store_internal.h"
|
||||||
|
#include "lsm/trie.h"
|
||||||
|
|
||||||
lsm_error lsm_store_init(lsm_store **ptr) {
|
lsm_error lsm_store_init(lsm_store **ptr) {
|
||||||
lsm_store *store = calloc(1, sizeof(lsm_store));
|
lsm_store *store = calloc(1, sizeof(lsm_store));
|
||||||
|
@ -26,7 +27,8 @@ lsm_error lsm_store_init(lsm_store **ptr) {
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path) {
|
lsm_error lsm_store_open(lsm_store **ptr, lsm_str *db_path,
|
||||||
|
lsm_str *data_path) {
|
||||||
lsm_store *store;
|
lsm_store *store;
|
||||||
LSM_RES(lsm_store_init(&store));
|
LSM_RES(lsm_store_init(&store));
|
||||||
|
|
||||||
|
@ -40,24 +42,67 @@ lsm_error lsm_store_open(lsm_store **ptr, char *db_path, char *data_path) {
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_store_search(lsm_entry **out, lsm_store *store, lsm_str *key) {
|
lsm_error lsm_store_get_read(lsm_entry **out, lsm_store *store, lsm_str *key) {
|
||||||
return lsm_trie_search((void **)out, store->trie, key);
|
lsm_entry_wrapper *wrapper;
|
||||||
|
|
||||||
|
LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key));
|
||||||
|
|
||||||
|
// We don't want to block the thread
|
||||||
|
if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) {
|
||||||
|
return lsm_error_lock_busy;
|
||||||
|
}
|
||||||
|
|
||||||
|
lsm_entry *entry = wrapper->entry;
|
||||||
|
|
||||||
|
// While the trie's data field will never be NULL, the actual entry pointer
|
||||||
|
// might be
|
||||||
|
if (entry == NULL) {
|
||||||
|
pthread_rwlock_unlock(&wrapper->lock);
|
||||||
|
|
||||||
|
return lsm_error_not_found;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open a new file descriptor if needed
|
||||||
|
if (entry->data.on_disk && (entry->data.value.file == NULL)) {
|
||||||
|
char path[store->data_path->len + entry->key->len + 2];
|
||||||
|
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
|
||||||
|
lsm_str_ptr(entry->key));
|
||||||
|
|
||||||
|
FILE *f = fopen(path, "rb");
|
||||||
|
|
||||||
|
if (f == NULL) {
|
||||||
|
return lsm_error_failed_io;
|
||||||
|
}
|
||||||
|
|
||||||
|
entry->data.value.file = f;
|
||||||
|
}
|
||||||
|
|
||||||
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key) {
|
lsm_error lsm_store_insert(lsm_entry **out, lsm_store *store, lsm_str *key) {
|
||||||
lsm_entry *entry;
|
lsm_entry_wrapper *wrapper;
|
||||||
|
LSM_RES(lsm_entry_wrapper_init(&wrapper));
|
||||||
|
|
||||||
|
lsm_entry *entry;
|
||||||
LSM_RES(lsm_entry_init(&entry));
|
LSM_RES(lsm_entry_init(&entry));
|
||||||
LSM_RES(lsm_trie_insert(store->trie, key, entry));
|
|
||||||
|
|
||||||
entry->key = key;
|
entry->key = key;
|
||||||
|
wrapper->entry = entry;
|
||||||
|
pthread_rwlock_wrlock(&wrapper->lock);
|
||||||
|
|
||||||
|
// TODO mem leak if already present
|
||||||
|
LSM_RES(lsm_trie_insert(store->trie, key, wrapper));
|
||||||
|
|
||||||
*out = entry;
|
*out = entry;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *data) {
|
lsm_error lsm_store_data_write(lsm_store *store, lsm_entry *entry,
|
||||||
|
lsm_str *data) {
|
||||||
uint64_t new_len = entry->data.len + lsm_str_len(data);
|
uint64_t new_len = entry->data.len + lsm_str_len(data);
|
||||||
|
const char *data_s = lsm_str_ptr(data);
|
||||||
|
|
||||||
// Data is in memory and still fits -> keep it in memory
|
// Data is in memory and still fits -> keep it in memory
|
||||||
if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) {
|
if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) {
|
||||||
|
@ -67,7 +112,7 @@ lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *dat
|
||||||
return lsm_error_failed_alloc;
|
return lsm_error_failed_alloc;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(&buf[entry->data.len], lsm_str_ptr(data), lsm_str_len(data));
|
memcpy(&buf[entry->data.len], data_s, lsm_str_len(data));
|
||||||
entry->data.value.ptr = buf;
|
entry->data.value.ptr = buf;
|
||||||
entry->data.len = new_len;
|
entry->data.len = new_len;
|
||||||
}
|
}
|
||||||
|
@ -75,7 +120,28 @@ lsm_error lsm_store_data_append(lsm_store *store, lsm_entry *entry, lsm_str *dat
|
||||||
else {
|
else {
|
||||||
// Data is not yet on disk, so we create the file
|
// Data is not yet on disk, so we create the file
|
||||||
if (!entry->data.on_disk) {
|
if (!entry->data.on_disk) {
|
||||||
|
char path[store->data_path->len + entry->key->len + 2];
|
||||||
|
sprintf(path, "%s/%s", lsm_str_ptr(store->data_path),
|
||||||
|
lsm_str_ptr(entry->key));
|
||||||
|
|
||||||
|
FILE *f = fopen(path, "w");
|
||||||
|
|
||||||
|
if (f == NULL) {
|
||||||
|
return lsm_error_failed_io;
|
||||||
|
}
|
||||||
|
|
||||||
|
entry->data.value.file = f;
|
||||||
|
entry->data.on_disk = true;
|
||||||
|
|
||||||
|
// TODO free old buff, write original data to file
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t written = 0;
|
||||||
|
|
||||||
|
// TODO what happens when I/O fails?
|
||||||
|
while (written < data->len) {
|
||||||
|
written += fwrite(&data_s[written], sizeof(char), data->len - written,
|
||||||
|
entry->data.value.file);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
#include <pthread.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
@ -17,6 +18,20 @@ lsm_error lsm_entry_init(lsm_entry **ptr) {
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
|
||||||
|
lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper));
|
||||||
|
|
||||||
|
if (wrap == NULL) {
|
||||||
|
return lsm_error_failed_alloc;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_rwlock_init(&wrap->lock, NULL);
|
||||||
|
|
||||||
|
*ptr = wrap;
|
||||||
|
|
||||||
|
return lsm_error_ok;
|
||||||
|
}
|
||||||
|
|
||||||
bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type) {
|
bool lsm_entry_attr_present(lsm_entry *entry, lsm_attr_type type) {
|
||||||
return (entry->attrs.bitmap & type) != 0;
|
return (entry->attrs.bitmap & type) != 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue