Compare commits
3 Commits
ddc38452be
...
51e4a203e9
| Author | SHA1 | Date |
|---|---|---|
|
|
51e4a203e9 | |
|
|
a6887d4094 | |
|
|
418de748f0 |
|
|
@ -7,8 +7,6 @@
|
||||||
#include "lsm.h"
|
#include "lsm.h"
|
||||||
#include "lsm/str.h"
|
#include "lsm/str.h"
|
||||||
|
|
||||||
#define LSM_STORE_DISK_THRESHOLD 1024
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A handle referencing an entry inside a store. Read/write operations from/to
|
* A handle referencing an entry inside a store. Read/write operations from/to
|
||||||
* the entry go through this handle.
|
* the entry go through this handle.
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,8 @@ typedef struct lsm_attr {
|
||||||
/**
|
/**
|
||||||
* An entry inside an LSM store.
|
* An entry inside an LSM store.
|
||||||
*
|
*
|
||||||
* Each entry consists of the key it's stored behind, zero or more attributes
|
* Each entry consists of the key it's stored behind, zero or more attributes
|
||||||
* (metadata) and a data field. The data field can be stored on disk or
|
* (metadata) and a data file.
|
||||||
* in-memory, depending on the size.
|
|
||||||
*/
|
*/
|
||||||
typedef struct lsm_entry {
|
typedef struct lsm_entry {
|
||||||
lsm_str *key;
|
lsm_str *key;
|
||||||
|
|
@ -56,6 +55,12 @@ typedef struct lsm_entry_wrapper {
|
||||||
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr);
|
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr);
|
||||||
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper);
|
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper);
|
||||||
|
|
||||||
|
typedef enum lsm_entry_handle_state : uint8_t {
|
||||||
|
lsm_entry_handle_state_new = 1 << 0,
|
||||||
|
lsm_entry_handle_state_updated = 1 << 1,
|
||||||
|
lsm_entry_handle_state_removed = 1 << 2,
|
||||||
|
} lsm_entry_handle_state;
|
||||||
|
|
||||||
struct lsm_entry_handle {
|
struct lsm_entry_handle {
|
||||||
lsm_entry_wrapper *wrapper;
|
lsm_entry_wrapper *wrapper;
|
||||||
lsm_store *store;
|
lsm_store *store;
|
||||||
|
|
@ -63,8 +68,8 @@ struct lsm_entry_handle {
|
||||||
FILE *f;
|
FILE *f;
|
||||||
// Current position in the file pointer
|
// Current position in the file pointer
|
||||||
uint64_t pos;
|
uint64_t pos;
|
||||||
// Whether the entry's metadata has changed
|
// Required to determine in what way the database files need to be synced
|
||||||
bool dirty;
|
uint64_t states;
|
||||||
};
|
};
|
||||||
|
|
||||||
lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
|
lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
|
||||||
|
|
@ -73,14 +78,18 @@ struct lsm_store {
|
||||||
lsm_trie *trie;
|
lsm_trie *trie;
|
||||||
lsm_str *data_path;
|
lsm_str *data_path;
|
||||||
|
|
||||||
FILE *db_file;
|
struct {
|
||||||
uint64_t db_file_size;
|
FILE *f;
|
||||||
pthread_mutex_t db_lock;
|
uint64_t size;
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
} db;
|
||||||
|
|
||||||
FILE *idx_file;
|
struct {
|
||||||
uint64_t idx_file_block_count;
|
FILE *f;
|
||||||
uint64_t idx_file_size;
|
uint64_t size;
|
||||||
pthread_mutex_t idx_lock;
|
uint64_t block_count;
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
} idx;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -100,10 +109,17 @@ lsm_error lsm_store_load_db(lsm_store *store);
|
||||||
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle);
|
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove an entry from the database
|
* Remove an entry from the database.
|
||||||
*
|
*
|
||||||
* @param handle handle to the removed entry
|
* @param handle handle to the removed entry
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle);
|
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update an existing entry already in the store.
|
||||||
|
*
|
||||||
|
* @param handle to updated entry
|
||||||
|
*/
|
||||||
|
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,8 @@ lsm_error lsm_store_init(lsm_store **ptr) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_init(&store->db_lock, NULL);
|
pthread_mutex_init(&store->db.lock, NULL);
|
||||||
pthread_mutex_init(&store->idx_lock, NULL);
|
pthread_mutex_init(&store->idx.lock, NULL);
|
||||||
|
|
||||||
*ptr = store;
|
*ptr = store;
|
||||||
|
|
||||||
|
|
@ -151,7 +151,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
||||||
handle->store = store;
|
handle->store = store;
|
||||||
|
|
||||||
// Newly inserted entries are always dirty
|
// Newly inserted entries are always dirty
|
||||||
handle->dirty = true;
|
handle->states |= lsm_entry_handle_state_new;
|
||||||
|
|
||||||
*out = handle;
|
*out = handle;
|
||||||
|
|
||||||
|
|
@ -192,7 +192,7 @@ lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
entry->data_len = new_len;
|
entry->data_len = new_len;
|
||||||
handle->dirty = true;
|
handle->states |= lsm_entry_handle_state_updated;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,8 +70,8 @@ lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
store->data_path = data_path;
|
store->data_path = data_path;
|
||||||
store->db_file = db_file;
|
store->db.f = db_file;
|
||||||
store->idx_file = idx_file;
|
store->idx.f = idx_file;
|
||||||
|
|
||||||
LSM_RES(lsm_store_load_db(store));
|
LSM_RES(lsm_store_load_db(store));
|
||||||
|
|
||||||
|
|
@ -137,63 +137,60 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
|
||||||
lsm_error lsm_store_load_db(lsm_store *store) {
|
if (fseek(f, pos, SEEK_SET) != 0) {
|
||||||
uint64_t db_dim[2];
|
return lsm_error_failed_io;
|
||||||
lsm_str *key;
|
}
|
||||||
lsm_entry_handle *handle;
|
|
||||||
bool valid_entry;
|
return lsm_error_ok;
|
||||||
|
}
|
||||||
rewind(store->idx_file);
|
|
||||||
|
/**
|
||||||
// idx file starts with block count
|
* Insert a new entry by reading it from the db file
|
||||||
LSM_RES(lsm_fread(&store->idx_file_block_count, &store->idx_file_size,
|
*/
|
||||||
store->idx_file, sizeof(uint64_t), 1));
|
lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos,
|
||||||
|
uint64_t idx_file_offset) {
|
||||||
for (uint64_t i = 0; i < store->idx_file_block_count; i++) {
|
LSM_RES(lsm_fseek(store->db.f, pos));
|
||||||
uint64_t idx_file_offset = store->idx_file_size;
|
|
||||||
|
lsm_str *key;
|
||||||
LSM_RES(lsm_fread(&valid_entry, &store->idx_file_size, store->idx_file,
|
LSM_RES(lsm_entry_read_str(&key, &store->db.size, store->db.f));
|
||||||
sizeof(bool), 1));
|
|
||||||
|
lsm_entry_handle *handle;
|
||||||
if (valid_entry) {
|
LSM_RES(lsm_store_insert(&handle, store, key));
|
||||||
LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file));
|
|
||||||
LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file,
|
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, &store->db.size,
|
||||||
sizeof(uint64_t), 2));
|
store->db.f, sizeof(uint64_t), 1));
|
||||||
LSM_RES(lsm_store_insert(&handle, store, key));
|
LSM_RES(lsm_entry_read_attrs(&store->db.size, handle, store->db.f));
|
||||||
|
|
||||||
// Read attributes from database file
|
handle->wrapper->entry->idx_file_offset = idx_file_offset;
|
||||||
if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) {
|
|
||||||
return lsm_error_failed_io;
|
handle->states = 0;
|
||||||
}
|
lsm_entry_close(handle);
|
||||||
|
|
||||||
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file,
|
return lsm_error_ok;
|
||||||
sizeof(uint64_t), 1));
|
}
|
||||||
LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file));
|
|
||||||
|
lsm_error lsm_store_load_db(lsm_store *store) {
|
||||||
handle->wrapper->entry->idx_file_offset = idx_file_offset;
|
uint64_t db_dim[2];
|
||||||
|
|
||||||
// We explicitely set the dirty flag here to prevent writing to the datase
|
rewind(store->idx.f);
|
||||||
// when reading it in
|
|
||||||
handle->dirty = false;
|
// idx file starts with block count
|
||||||
lsm_entry_close(handle);
|
LSM_RES(lsm_fread(&store->idx.block_count, &store->idx.size, store->idx.f,
|
||||||
|
sizeof(uint64_t), 1));
|
||||||
store->db_file_size += db_dim[1];
|
|
||||||
}
|
for (uint64_t i = 0; i < store->idx.block_count; i++) {
|
||||||
// Simply skip the invalid entry
|
uint64_t idx_file_offset = store->idx.size;
|
||||||
else {
|
|
||||||
uint64_t key_len;
|
LSM_RES(lsm_fread(&db_dim, &store->idx.size, store->idx.f, sizeof(uint64_t),
|
||||||
LSM_RES(lsm_fread(&key_len, &store->idx_file_size, store->idx_file,
|
2));
|
||||||
sizeof(uint64_t), 1));
|
|
||||||
|
// We zero out the length of entries if they're no longer valid
|
||||||
uint64_t remaining = key_len + 2 * sizeof(uint64_t);
|
if (db_dim[1] == 0) {
|
||||||
|
continue;
|
||||||
if (fseek(store->idx_file, remaining, SEEK_CUR) != 0) {
|
}
|
||||||
return lsm_error_failed_io;
|
|
||||||
}
|
LSM_RES(lsm_store_insert_from_db(store, db_dim[0], idx_file_offset));
|
||||||
|
|
||||||
store->idx_file_size += remaining;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,7 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry,
|
||||||
|
|
||||||
LSM_RES(lsm_fseek(db_file, pos));
|
LSM_RES(lsm_fseek(db_file, pos));
|
||||||
|
|
||||||
|
LSM_RES(lsm_write_str(size, db_file, entry->key));
|
||||||
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len));
|
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len));
|
||||||
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count));
|
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count));
|
||||||
|
|
||||||
|
|
@ -59,16 +60,12 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry,
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset,
|
||||||
uint64_t offset, uint64_t len, uint64_t pos) {
|
uint64_t len, uint64_t pos) {
|
||||||
*size = 0;
|
*size = 0;
|
||||||
|
|
||||||
LSM_RES(lsm_fseek(idx_file, pos));
|
LSM_RES(lsm_fseek(idx_file, pos));
|
||||||
|
|
||||||
bool valid_entry_marker = true;
|
|
||||||
LSM_RES(lsm_fwrite(size, idx_file, sizeof(bool), 1, &valid_entry_marker));
|
|
||||||
|
|
||||||
LSM_RES(lsm_write_str(size, idx_file, entry->key));
|
|
||||||
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset));
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset));
|
||||||
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len));
|
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len));
|
||||||
|
|
||||||
|
|
@ -78,55 +75,52 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry,
|
||||||
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
|
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
|
||||||
lsm_store *store = handle->store;
|
lsm_store *store = handle->store;
|
||||||
|
|
||||||
pthread_mutex_lock(&store->db_lock);
|
pthread_mutex_lock(&store->db.lock);
|
||||||
|
|
||||||
uint64_t db_entry_index = store->db_file_size;
|
uint64_t db_entry_index = store->db.size;
|
||||||
|
|
||||||
uint64_t db_entry_size;
|
uint64_t db_entry_size;
|
||||||
lsm_error res =
|
lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f,
|
||||||
lsm_write_db_entry(&db_entry_size, store->db_file, handle->wrapper->entry,
|
handle->wrapper->entry, store->db.size);
|
||||||
store->db_file_size);
|
fflush(store->db.f);
|
||||||
fflush(store->db_file);
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&store->db_lock);
|
pthread_mutex_unlock(&store->db.lock);
|
||||||
|
|
||||||
if (res != lsm_error_ok) {
|
if (res != lsm_error_ok) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append entry to index file
|
// Append entry to index file
|
||||||
pthread_mutex_lock(&store->idx_lock);
|
pthread_mutex_lock(&store->idx.lock);
|
||||||
|
|
||||||
uint64_t idx_entry_index = store->idx_file_size;
|
uint64_t idx_entry_index = store->idx.size;
|
||||||
|
|
||||||
uint64_t idx_entry_size;
|
uint64_t idx_entry_size;
|
||||||
res = lsm_write_idx_entry(&idx_entry_size, store->idx_file,
|
res = lsm_write_idx_entry(&idx_entry_size, store->idx.f, db_entry_index,
|
||||||
handle->wrapper->entry, db_entry_index,
|
db_entry_size, store->idx.size);
|
||||||
db_entry_size, store->idx_file_size);
|
|
||||||
|
|
||||||
if (res == lsm_error_ok) {
|
if (res == lsm_error_ok) {
|
||||||
// Update the counter at the beginning of the file
|
// Update the counter at the beginning of the file
|
||||||
rewind(store->idx_file);
|
rewind(store->idx.f);
|
||||||
|
|
||||||
uint64_t new_block_count = store->idx_file_block_count + 1;
|
uint64_t new_block_count = store->idx.block_count + 1;
|
||||||
|
|
||||||
res = lsm_fwrite(NULL, store->idx_file, sizeof(uint64_t), 1,
|
res = lsm_fwrite(NULL, store->idx.f, sizeof(uint64_t), 1, &new_block_count);
|
||||||
&new_block_count);
|
|
||||||
|
|
||||||
if (res == lsm_error_ok) {
|
if (res == lsm_error_ok) {
|
||||||
// Only if we successfully updated the on-disk counter do we make the code
|
// Only if we successfully updated the on-disk counter do we make the code
|
||||||
// aware that the files' sizes have increased. This way, if a write to the
|
// aware that the files' sizes have increased. This way, if a write to the
|
||||||
// counter fails, the code will simply reuse the already written content.
|
// counter fails, the code will simply reuse the already written content.
|
||||||
store->idx_file_size += idx_entry_size;
|
store->idx.size += idx_entry_size;
|
||||||
store->idx_file_block_count = new_block_count;
|
store->idx.block_count = new_block_count;
|
||||||
store->db_file_size += db_entry_size;
|
store->db.size += db_entry_size;
|
||||||
|
|
||||||
handle->wrapper->entry->idx_file_offset = idx_entry_index;
|
handle->wrapper->entry->idx_file_offset = idx_entry_index;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fflush(store->idx_file);
|
fflush(store->idx.f);
|
||||||
pthread_mutex_unlock(&store->idx_lock);
|
pthread_mutex_unlock(&store->idx.lock);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,8 +53,14 @@ void lsm_entry_close(lsm_entry_handle *handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO handle errors here
|
// TODO handle errors here
|
||||||
if (handle->dirty) {
|
if ((handle->states & lsm_entry_handle_state_new) &&
|
||||||
|
!(handle->states & lsm_entry_handle_state_removed)) {
|
||||||
lsm_entry_disk_insert(handle);
|
lsm_entry_disk_insert(handle);
|
||||||
|
} else if ((handle->states & lsm_entry_handle_state_removed) &&
|
||||||
|
!(handle->states & lsm_entry_handle_state_new)) {
|
||||||
|
/* lsm_entry_disk_remove(handle); */
|
||||||
|
} else if (handle->states & lsm_entry_handle_state_updated) {
|
||||||
|
/* lsm_entry_disk_update(handle); */
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_rwlock_unlock(&handle->wrapper->lock);
|
pthread_rwlock_unlock(&handle->wrapper->lock);
|
||||||
|
|
@ -157,7 +163,7 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
|
||||||
entry->attrs.count--;
|
entry->attrs.count--;
|
||||||
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
|
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
|
||||||
|
|
||||||
handle->dirty = true;
|
handle->states |= lsm_entry_handle_state_updated;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
@ -184,7 +190,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
|
||||||
entry->attrs.count++;
|
entry->attrs.count++;
|
||||||
entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);
|
entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);
|
||||||
|
|
||||||
handle->dirty = true;
|
handle->states |= lsm_entry_handle_state_updated;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue