Compare commits

..

No commits in common. "51e4a203e988b5ca03690fc8a046b2346e8d5d5b" and "ddc38452bea958b4ceeb00ff51550973fab35d8e" have entirely different histories.

6 changed files with 97 additions and 108 deletions

View File

@ -7,6 +7,8 @@
#include "lsm.h" #include "lsm.h"
#include "lsm/str.h" #include "lsm/str.h"
#define LSM_STORE_DISK_THRESHOLD 1024
/** /**
* A handle referencing an entry inside a store. Read/write operations from/to * A handle referencing an entry inside a store. Read/write operations from/to
* the entry go through this handle. * the entry go through this handle.

View File

@ -19,8 +19,9 @@ typedef struct lsm_attr {
/** /**
* An entry inside an LSM store. * An entry inside an LSM store.
* *
* Each entry consists of the key it's stored behind, zero or more attributes * Each entry consists of the key it's stored behind, zero or more attributes
* (metadata) and a data file. * (metadata) and a data field. The data field can be stored on disk or
* in-memory, depending on the size.
*/ */
typedef struct lsm_entry { typedef struct lsm_entry {
lsm_str *key; lsm_str *key;
@ -55,12 +56,6 @@ typedef struct lsm_entry_wrapper {
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr); lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr);
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper); void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper);
typedef enum lsm_entry_handle_state : uint8_t {
lsm_entry_handle_state_new = 1 << 0,
lsm_entry_handle_state_updated = 1 << 1,
lsm_entry_handle_state_removed = 1 << 2,
} lsm_entry_handle_state;
struct lsm_entry_handle { struct lsm_entry_handle {
lsm_entry_wrapper *wrapper; lsm_entry_wrapper *wrapper;
lsm_store *store; lsm_store *store;
@ -68,8 +63,8 @@ struct lsm_entry_handle {
FILE *f; FILE *f;
// Current position in the file pointer // Current position in the file pointer
uint64_t pos; uint64_t pos;
// Required to determine in what way the database files need to be synced // Whether the entry's metadata has changed
uint64_t states; bool dirty;
}; };
lsm_error lsm_entry_handle_init(lsm_entry_handle **out); lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
@ -78,18 +73,14 @@ struct lsm_store {
lsm_trie *trie; lsm_trie *trie;
lsm_str *data_path; lsm_str *data_path;
struct { FILE *db_file;
FILE *f; uint64_t db_file_size;
uint64_t size; pthread_mutex_t db_lock;
pthread_mutex_t lock;
} db;
struct { FILE *idx_file;
FILE *f; uint64_t idx_file_block_count;
uint64_t size; uint64_t idx_file_size;
uint64_t block_count; pthread_mutex_t idx_lock;
pthread_mutex_t lock;
} idx;
}; };
/** /**
@ -109,17 +100,10 @@ lsm_error lsm_store_load_db(lsm_store *store);
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle); lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle);
/** /**
* Remove an entry from the database. * Remove an entry from the database
* *
* @param handle handle to the removed entry * @param handle handle to the removed entry
*/ */
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle); lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle);
/**
* Update an existing entry already in the store.
*
* @param handle to updated entry
*/
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle);
#endif #endif

View File

@ -22,8 +22,8 @@ lsm_error lsm_store_init(lsm_store **ptr) {
return res; return res;
} }
pthread_mutex_init(&store->db.lock, NULL); pthread_mutex_init(&store->db_lock, NULL);
pthread_mutex_init(&store->idx.lock, NULL); pthread_mutex_init(&store->idx_lock, NULL);
*ptr = store; *ptr = store;
@ -151,7 +151,7 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
handle->store = store; handle->store = store;
// Newly inserted entries are always dirty // Newly inserted entries are always dirty
handle->states |= lsm_entry_handle_state_new; handle->dirty = true;
*out = handle; *out = handle;
@ -192,7 +192,7 @@ lsm_error lsm_entry_data_append(lsm_entry_handle *handle, lsm_str *data) {
} }
entry->data_len = new_len; entry->data_len = new_len;
handle->states |= lsm_entry_handle_state_updated; handle->dirty = true;
return lsm_error_ok; return lsm_error_ok;
} }

View File

@ -70,8 +70,8 @@ lsm_error lsm_store_load(lsm_store **ptr, lsm_str *data_path) {
} }
store->data_path = data_path; store->data_path = data_path;
store->db.f = db_file; store->db_file = db_file;
store->idx.f = idx_file; store->idx_file = idx_file;
LSM_RES(lsm_store_load_db(store)); LSM_RES(lsm_store_load_db(store));
@ -137,60 +137,63 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
return lsm_error_ok; return lsm_error_ok;
} }
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
if (fseek(f, pos, SEEK_SET) != 0) {
return lsm_error_failed_io;
}
return lsm_error_ok;
}
/**
* Insert a new entry by reading it from the db file
*/
lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos,
uint64_t idx_file_offset) {
LSM_RES(lsm_fseek(store->db.f, pos));
lsm_str *key;
LSM_RES(lsm_entry_read_str(&key, &store->db.size, store->db.f));
lsm_entry_handle *handle;
LSM_RES(lsm_store_insert(&handle, store, key));
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, &store->db.size,
store->db.f, sizeof(uint64_t), 1));
LSM_RES(lsm_entry_read_attrs(&store->db.size, handle, store->db.f));
handle->wrapper->entry->idx_file_offset = idx_file_offset;
handle->states = 0;
lsm_entry_close(handle);
return lsm_error_ok;
}
lsm_error lsm_store_load_db(lsm_store *store) { lsm_error lsm_store_load_db(lsm_store *store) {
uint64_t db_dim[2]; uint64_t db_dim[2];
lsm_str *key;
lsm_entry_handle *handle;
bool valid_entry;
rewind(store->idx.f); rewind(store->idx_file);
// idx file starts with block count // idx file starts with block count
LSM_RES(lsm_fread(&store->idx.block_count, &store->idx.size, store->idx.f, LSM_RES(lsm_fread(&store->idx_file_block_count, &store->idx_file_size,
sizeof(uint64_t), 1)); store->idx_file, sizeof(uint64_t), 1));
for (uint64_t i = 0; i < store->idx.block_count; i++) { for (uint64_t i = 0; i < store->idx_file_block_count; i++) {
uint64_t idx_file_offset = store->idx.size; uint64_t idx_file_offset = store->idx_file_size;
LSM_RES(lsm_fread(&db_dim, &store->idx.size, store->idx.f, sizeof(uint64_t), LSM_RES(lsm_fread(&valid_entry, &store->idx_file_size, store->idx_file,
2)); sizeof(bool), 1));
// We zero out the length of entries if they're no longer valid if (valid_entry) {
if (db_dim[1] == 0) { LSM_RES(lsm_entry_read_str(&key, &store->idx_file_size, store->idx_file));
continue; LSM_RES(lsm_fread(&db_dim, &store->idx_file_size, store->idx_file,
sizeof(uint64_t), 2));
LSM_RES(lsm_store_insert(&handle, store, key));
// Read attributes from database file
if (fseek(store->db_file, db_dim[0], SEEK_SET) != 0) {
return lsm_error_failed_io;
}
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db_file,
sizeof(uint64_t), 1));
LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db_file));
handle->wrapper->entry->idx_file_offset = idx_file_offset;
// We explicitely set the dirty flag here to prevent writing to the datase
// when reading it in
handle->dirty = false;
lsm_entry_close(handle);
store->db_file_size += db_dim[1];
} }
// Simply skip the invalid entry
else {
uint64_t key_len;
LSM_RES(lsm_fread(&key_len, &store->idx_file_size, store->idx_file,
sizeof(uint64_t), 1));
LSM_RES(lsm_store_insert_from_db(store, db_dim[0], idx_file_offset)); uint64_t remaining = key_len + 2 * sizeof(uint64_t);
if (fseek(store->idx_file, remaining, SEEK_CUR) != 0) {
return lsm_error_failed_io;
}
store->idx_file_size += remaining;
}
} }
return lsm_error_ok; return lsm_error_ok;

View File

@ -47,7 +47,6 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry,
LSM_RES(lsm_fseek(db_file, pos)); LSM_RES(lsm_fseek(db_file, pos));
LSM_RES(lsm_write_str(size, db_file, entry->key));
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len)); LSM_RES(lsm_fwrite(size, db_file, sizeof(uint64_t), 1, &entry->data_len));
LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count)); LSM_RES(lsm_fwrite(size, db_file, sizeof(uint8_t), 1, &entry->attrs.count));
@ -60,12 +59,16 @@ lsm_error lsm_write_db_entry(uint64_t *size, FILE *db_file, lsm_entry *entry,
return lsm_error_ok; return lsm_error_ok;
} }
lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset, lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, lsm_entry *entry,
uint64_t len, uint64_t pos) { uint64_t offset, uint64_t len, uint64_t pos) {
*size = 0; *size = 0;
LSM_RES(lsm_fseek(idx_file, pos)); LSM_RES(lsm_fseek(idx_file, pos));
bool valid_entry_marker = true;
LSM_RES(lsm_fwrite(size, idx_file, sizeof(bool), 1, &valid_entry_marker));
LSM_RES(lsm_write_str(size, idx_file, entry->key));
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &offset));
LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len)); LSM_RES(lsm_fwrite(size, idx_file, sizeof(uint64_t), 1, &len));
@ -75,52 +78,55 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset,
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
lsm_store *store = handle->store; lsm_store *store = handle->store;
pthread_mutex_lock(&store->db.lock); pthread_mutex_lock(&store->db_lock);
uint64_t db_entry_index = store->db.size; uint64_t db_entry_index = store->db_file_size;
uint64_t db_entry_size; uint64_t db_entry_size;
lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f, lsm_error res =
handle->wrapper->entry, store->db.size); lsm_write_db_entry(&db_entry_size, store->db_file, handle->wrapper->entry,
fflush(store->db.f); store->db_file_size);
fflush(store->db_file);
pthread_mutex_unlock(&store->db.lock); pthread_mutex_unlock(&store->db_lock);
if (res != lsm_error_ok) { if (res != lsm_error_ok) {
return res; return res;
} }
// Append entry to index file // Append entry to index file
pthread_mutex_lock(&store->idx.lock); pthread_mutex_lock(&store->idx_lock);
uint64_t idx_entry_index = store->idx.size; uint64_t idx_entry_index = store->idx_file_size;
uint64_t idx_entry_size; uint64_t idx_entry_size;
res = lsm_write_idx_entry(&idx_entry_size, store->idx.f, db_entry_index, res = lsm_write_idx_entry(&idx_entry_size, store->idx_file,
db_entry_size, store->idx.size); handle->wrapper->entry, db_entry_index,
db_entry_size, store->idx_file_size);
if (res == lsm_error_ok) { if (res == lsm_error_ok) {
// Update the counter at the beginning of the file // Update the counter at the beginning of the file
rewind(store->idx.f); rewind(store->idx_file);
uint64_t new_block_count = store->idx.block_count + 1; uint64_t new_block_count = store->idx_file_block_count + 1;
res = lsm_fwrite(NULL, store->idx.f, sizeof(uint64_t), 1, &new_block_count); res = lsm_fwrite(NULL, store->idx_file, sizeof(uint64_t), 1,
&new_block_count);
if (res == lsm_error_ok) { if (res == lsm_error_ok) {
// Only if we successfully updated the on-disk counter do we make the code // Only if we successfully updated the on-disk counter do we make the code
// aware that the files' sizes have increased. This way, if a write to the // aware that the files' sizes have increased. This way, if a write to the
// counter fails, the code will simply reuse the already written content. // counter fails, the code will simply reuse the already written content.
store->idx.size += idx_entry_size; store->idx_file_size += idx_entry_size;
store->idx.block_count = new_block_count; store->idx_file_block_count = new_block_count;
store->db.size += db_entry_size; store->db_file_size += db_entry_size;
handle->wrapper->entry->idx_file_offset = idx_entry_index; handle->wrapper->entry->idx_file_offset = idx_entry_index;
} }
} }
fflush(store->idx.f); fflush(store->idx_file);
pthread_mutex_unlock(&store->idx.lock); pthread_mutex_unlock(&store->idx_lock);
return res; return res;
} }

View File

@ -53,14 +53,8 @@ void lsm_entry_close(lsm_entry_handle *handle) {
} }
// TODO handle errors here // TODO handle errors here
if ((handle->states & lsm_entry_handle_state_new) && if (handle->dirty) {
!(handle->states & lsm_entry_handle_state_removed)) {
lsm_entry_disk_insert(handle); lsm_entry_disk_insert(handle);
} else if ((handle->states & lsm_entry_handle_state_removed) &&
!(handle->states & lsm_entry_handle_state_new)) {
/* lsm_entry_disk_remove(handle); */
} else if (handle->states & lsm_entry_handle_state_updated) {
/* lsm_entry_disk_update(handle); */
} }
pthread_rwlock_unlock(&handle->wrapper->lock); pthread_rwlock_unlock(&handle->wrapper->lock);
@ -163,7 +157,7 @@ lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
entry->attrs.count--; entry->attrs.count--;
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64)); entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
handle->states |= lsm_entry_handle_state_updated; handle->dirty = true;
return lsm_error_ok; return lsm_error_ok;
} }
@ -190,7 +184,7 @@ lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
entry->attrs.count++; entry->attrs.count++;
entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64); entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);
handle->states |= lsm_entry_handle_state_updated; handle->dirty = true;
return lsm_error_ok; return lsm_error_ok;
} }