wpi
ci/woodpecker/push/build Pipeline failed
Details
ci/woodpecker/push/build Pipeline failed
Details
parent
80281c702b
commit
68f217d8e9
|
@ -19,12 +19,6 @@ typedef struct lsm_read_handle lsm_read_handle;
|
||||||
*/
|
*/
|
||||||
typedef struct lsm_write_handle lsm_write_handle;
|
typedef struct lsm_write_handle lsm_write_handle;
|
||||||
|
|
||||||
/**
|
|
||||||
* A handle referencing an entry inside a store. Read/write operations from/to
|
|
||||||
* the entry go through this handle.
|
|
||||||
*/
|
|
||||||
typedef struct lsm_entry_handle lsm_entry_handle;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A store consisting of LSM entries.
|
* A store consisting of LSM entries.
|
||||||
*
|
*
|
||||||
|
@ -86,23 +80,6 @@ lsm_error lsm_store_open_read(lsm_read_handle **out, lsm_store *store,
|
||||||
lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store,
|
lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store,
|
||||||
const lsm_str *key);
|
const lsm_str *key);
|
||||||
|
|
||||||
/**
|
|
||||||
* Commit any changes to the persistent storage. Any changes, insertions or
|
|
||||||
* deletions that occured without a commit are reverted when the handle is
|
|
||||||
* closed.
|
|
||||||
*
|
|
||||||
* @param handle handle to the entry
|
|
||||||
*/
|
|
||||||
lsm_error lsm_entry_commit(lsm_entry_handle *handle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close an open entry handle.
|
|
||||||
*
|
|
||||||
* @param store store the handle's entry is stored in
|
|
||||||
* @param handle handle to close
|
|
||||||
*/
|
|
||||||
void lsm_entry_close(lsm_entry_handle *handle);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Insert a new entry into the store, returning a write handle to the newly
|
* Insert a new entry into the store, returning a write handle to the newly
|
||||||
* created entry.
|
* created entry.
|
||||||
|
@ -111,57 +88,41 @@ void lsm_entry_close(lsm_entry_handle *handle);
|
||||||
* @param store store to modify
|
* @param store store to modify
|
||||||
* @param key key to add; ownership of key pointer is taken over
|
* @param key key to add; ownership of key pointer is taken over
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
lsm_error lsm_store_open_new(lsm_write_handle **out, lsm_store *store,
|
||||||
lsm_str *key);
|
lsm_str *key);
|
||||||
|
|
||||||
/**
|
bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type);
|
||||||
* Mark the entry as removed.
|
lsm_error lsm_read_attr_get(const lsm_str **out, const lsm_read_handle *handle,
|
||||||
*
|
uint8_t type);
|
||||||
* @param handle handle to entry to remove
|
lsm_error lsm_read_attr_get_uint64_t(uint64_t *out,
|
||||||
*/
|
const lsm_read_handle *handle,
|
||||||
void lsm_entry_remove(lsm_entry_handle *handle);
|
uint8_t type);
|
||||||
|
lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, const lsm_read_handle *handle,
|
||||||
|
uint8_t type);
|
||||||
|
lsm_error lsm_read_data_read(uint64_t *out, char *buf, lsm_read_handle *handle,
|
||||||
|
uint64_t len);
|
||||||
|
void lsm_read_close(lsm_read_handle *handle);
|
||||||
|
|
||||||
/**
|
bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type);
|
||||||
* Append new data to the given entry, which is expected to be in the store.
|
lsm_error lsm_write_attr_get(const lsm_str **out,
|
||||||
*
|
const lsm_write_handle *handle, uint8_t type);
|
||||||
* This function will append either to disk or to memory, depending on the
|
lsm_error lsm_write_attr_get_uint64_t(uint64_t *out,
|
||||||
* length of the entry's data.
|
const lsm_write_handle *handle,
|
||||||
*
|
uint8_t type);
|
||||||
* @param store store the entry is stored in
|
lsm_error lsm_write_attr_get_uint8_t(uint8_t *out,
|
||||||
* @param entry entry to append data to
|
const lsm_write_handle *handle,
|
||||||
* @param data data to append
|
uint8_t type);
|
||||||
*/
|
lsm_error lsm_write_attr_remove(lsm_str **out, lsm_write_handle *handle,
|
||||||
lsm_error lsm_entry_data_append(lsm_entry_handle *handle, const lsm_str *data);
|
uint8_t type);
|
||||||
|
lsm_error lsm_write_attr_insert(lsm_write_handle *handle, uint8_t type,
|
||||||
/**
|
lsm_str *data);
|
||||||
* Same as `lsm_entry_data_append`, except that it takes a direct char array.
|
lsm_error lsm_write_attr_insert_uint64_t(lsm_write_handle *handle, uint8_t type,
|
||||||
*
|
uint64_t data);
|
||||||
* @param store store the entry is stored in
|
lsm_error lsm_write_attr_insert_uint8_t(lsm_write_handle *handle, uint8_t type,
|
||||||
* @param entry entry to append data to
|
uint8_t data);
|
||||||
* @param data data to append
|
lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data);
|
||||||
* @param len length of data array
|
void lsm_write_remove(lsm_write_handle *handle);
|
||||||
*/
|
void lsm_write_close(lsm_write_handle *handle);
|
||||||
lsm_error lsm_entry_data_append_raw(lsm_entry_handle *handle, char *data,
|
lsm_error lsm_write_commit(lsm_write_handle *handle);
|
||||||
uint64_t len);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read a number of bytes from the entry's data field. The position from which
|
|
||||||
* data is read is dependent on previous read calls.
|
|
||||||
*
|
|
||||||
* @param out where to write how many bytes were read
|
|
||||||
* @param buf buffer to store read data in
|
|
||||||
* @param handle entry handle to read from
|
|
||||||
* @param len how many bytes to read at most
|
|
||||||
*/
|
|
||||||
lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_read_handle *handle,
|
|
||||||
uint64_t len);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the length of the entry's data.
|
|
||||||
*
|
|
||||||
* @param handle entry handle to return length for
|
|
||||||
* @return length of the data
|
|
||||||
*/
|
|
||||||
uint64_t lsm_entry_data_len(lsm_entry_handle *handle);
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -74,20 +74,6 @@ typedef enum lsm_entry_handle_state : uint8_t {
|
||||||
lsm_entry_handle_state_removed = 1 << 2,
|
lsm_entry_handle_state_removed = 1 << 2,
|
||||||
} lsm_entry_handle_state;
|
} lsm_entry_handle_state;
|
||||||
|
|
||||||
struct lsm_entry_handle {
|
|
||||||
lsm_entry_wrapper *wrapper;
|
|
||||||
lsm_store *store;
|
|
||||||
|
|
||||||
// Either read or append, depending on how it was opened
|
|
||||||
FILE *f;
|
|
||||||
// Current position in the file pointer
|
|
||||||
uint64_t pos;
|
|
||||||
// Required to determine in what way the database files need to be synced
|
|
||||||
uint64_t states;
|
|
||||||
};
|
|
||||||
|
|
||||||
lsm_error lsm_entry_handle_init(lsm_entry_handle **out);
|
|
||||||
|
|
||||||
struct lsm_store {
|
struct lsm_store {
|
||||||
lsm_trie *trie;
|
lsm_trie *trie;
|
||||||
lsm_str *data_path;
|
lsm_str *data_path;
|
||||||
|
@ -120,21 +106,21 @@ lsm_error lsm_store_load_db(lsm_store *store);
|
||||||
*
|
*
|
||||||
* @param handle handle to added entry
|
* @param handle handle to added entry
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle);
|
lsm_error lsm_entry_disk_insert(lsm_store *store, lsm_entry *entry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove an entry from the database.
|
* Remove an entry from the database.
|
||||||
*
|
*
|
||||||
* @param handle handle to the removed entry
|
* @param handle handle to the removed entry
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle);
|
lsm_error lsm_entry_disk_remove(lsm_store *store, lsm_entry *entry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update an existing entry already in the store.
|
* Update an existing entry already in the store.
|
||||||
*
|
*
|
||||||
* @param handle to updated entry
|
* @param handle to updated entry
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle);
|
lsm_error lsm_entry_disk_update(lsm_store *store, lsm_entry *entry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the length of the path to this entry's data file
|
* Return the length of the path to this entry's data file
|
||||||
|
@ -156,14 +142,6 @@ void lsm_entry_data_path(char *path, const lsm_store *store,
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_data_open_read(lsm_read_handle *handle);
|
lsm_error lsm_entry_data_open_read(lsm_read_handle *handle);
|
||||||
|
|
||||||
/**
|
|
||||||
* Open the entry's data file for writing. The file and all subdirectories in
|
|
||||||
* the data dir are created as needed.
|
|
||||||
*
|
|
||||||
* @param handle handle to the entry
|
|
||||||
*/
|
|
||||||
lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle);
|
|
||||||
|
|
||||||
lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store,
|
lsm_error lsm_entry_data_open(FILE **out, const lsm_store *store,
|
||||||
const lsm_entry *entry, const char *mode);
|
const lsm_entry *entry, const char *mode);
|
||||||
lsm_error lsm_entry_data_mkdirs(const lsm_store *store, const lsm_entry *entry);
|
lsm_error lsm_entry_data_mkdirs(const lsm_store *store, const lsm_entry *entry);
|
||||||
|
@ -191,7 +169,7 @@ bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type);
|
||||||
* @param entry entry to search for
|
* @param entry entry to search for
|
||||||
* @param type type of attribute to return
|
* @param type type of attribute to return
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_entry_attr_get(lsm_str **out, const lsm_entry *entry,
|
lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry,
|
||||||
uint8_t type);
|
uint8_t type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -277,6 +255,7 @@ struct lsm_write_handle {
|
||||||
lsm_store *store;
|
lsm_store *store;
|
||||||
|
|
||||||
lsm_entry *dirty;
|
lsm_entry *dirty;
|
||||||
|
bool removed;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
FILE *f;
|
FILE *f;
|
||||||
|
@ -294,4 +273,9 @@ lsm_error lsm_read_handle_init(lsm_read_handle **out);
|
||||||
*/
|
*/
|
||||||
lsm_error lsm_write_handle_init(lsm_write_handle **out);
|
lsm_error lsm_write_handle_init(lsm_write_handle **out);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commit changes solely to the memory part of the store.
|
||||||
|
*/
|
||||||
|
void lsm_write_commit_mem(lsm_write_handle *handle);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,21 +15,21 @@ lsm_error lsm_read_handle_init(lsm_read_handle **out) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type) {
|
bool lsm_read_attr_present(lsm_read_handle *handle, uint8_t type) {
|
||||||
return (handle->wrapper->entry->attrs.bitmap[type / 64] &
|
return lsm_entry_attr_present(handle->wrapper->entry, type);
|
||||||
(((uint64_t)1) << (type % 64))) != 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_read_attr_get(lsm_str **out, lsm_read_handle *handle,
|
lsm_error lsm_read_attr_get(const lsm_str **out, const lsm_read_handle *handle,
|
||||||
uint8_t type) {
|
uint8_t type) {
|
||||||
return lsm_entry_attr_get(out, handle->wrapper->entry, type);
|
return lsm_entry_attr_get(out, handle->wrapper->entry, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_read_attr_get_uint64_t(uint64_t *out, lsm_read_handle *handle,
|
lsm_error lsm_read_attr_get_uint64_t(uint64_t *out,
|
||||||
|
const lsm_read_handle *handle,
|
||||||
uint8_t type) {
|
uint8_t type) {
|
||||||
return lsm_entry_attr_get_uint64_t(out, handle->wrapper->entry, type);
|
return lsm_entry_attr_get_uint64_t(out, handle->wrapper->entry, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, lsm_read_handle *handle,
|
lsm_error lsm_read_attr_get_uint8_t(uint8_t *out, const lsm_read_handle *handle,
|
||||||
uint8_t type) {
|
uint8_t type) {
|
||||||
return lsm_entry_attr_get_uint8_t(out, handle->wrapper->entry, type);
|
return lsm_entry_attr_get_uint8_t(out, handle->wrapper->entry, type);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,7 @@ lsm_error lsm_write_handle_init(lsm_write_handle **out) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type) {
|
bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type) {
|
||||||
return (handle->wrapper->entry->attrs.bitmap[type / 64] &
|
return lsm_entry_attr_present(handle->wrapper->entry, type);
|
||||||
(((uint64_t)1) << (type % 64))) != 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_write_attr_get(const lsm_str **out,
|
lsm_error lsm_write_attr_get(const lsm_str **out,
|
||||||
|
@ -112,3 +111,66 @@ lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data) {
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void lsm_write_remove(lsm_write_handle *handle) { handle->removed = true; }
|
||||||
|
|
||||||
|
void lsm_write_close(lsm_write_handle *handle) {
|
||||||
|
if (handle->data.f != NULL) {
|
||||||
|
fclose(handle->data.f);
|
||||||
|
handle->data.f = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle->dirty != NULL) {
|
||||||
|
// Entry was never committed to store, so any created data file should be
|
||||||
|
// removed
|
||||||
|
if (handle->wrapper->entry == NULL) {
|
||||||
|
lsm_entry_data_remove(handle->store, handle->dirty);
|
||||||
|
}
|
||||||
|
|
||||||
|
lsm_entry_free(handle->dirty);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_rwlock_unlock(&handle->wrapper->lock);
|
||||||
|
free(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
lsm_error lsm_write_commit(lsm_write_handle *handle) {
|
||||||
|
if (handle->removed && (handle->wrapper->entry != NULL)) {
|
||||||
|
LSM_RES(lsm_entry_disk_remove(handle->store, handle->wrapper->entry));
|
||||||
|
|
||||||
|
lsm_entry_free(handle->wrapper->entry);
|
||||||
|
handle->wrapper->entry = NULL;
|
||||||
|
handle->removed = false;
|
||||||
|
|
||||||
|
return lsm_error_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle->dirty == NULL) {
|
||||||
|
return lsm_error_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle->wrapper->entry == NULL) {
|
||||||
|
LSM_RES(lsm_entry_disk_insert(handle->store, handle->dirty));
|
||||||
|
} else {
|
||||||
|
LSM_RES(lsm_entry_disk_update(handle->store, handle->dirty));
|
||||||
|
lsm_entry_free(handle->wrapper->entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
handle->wrapper->entry = handle->dirty;
|
||||||
|
handle->dirty = NULL;
|
||||||
|
|
||||||
|
return lsm_error_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void lsm_write_commit_mem(lsm_write_handle *handle) {
|
||||||
|
if (handle->dirty == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle->wrapper->entry != NULL) {
|
||||||
|
lsm_entry_free(handle->wrapper->entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
handle->wrapper->entry = handle->dirty;
|
||||||
|
handle->dirty = NULL;
|
||||||
|
}
|
||||||
|
|
|
@ -88,8 +88,8 @@ lsm_error lsm_store_open_write(lsm_write_handle **out, lsm_store *store,
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
lsm_error lsm_store_open_new(lsm_write_handle **out, lsm_store *store,
|
||||||
lsm_str *key) {
|
lsm_str *key) {
|
||||||
// TODO what happens when two inserts to the same key happen at the same time?
|
// TODO what happens when two inserts to the same key happen at the same time?
|
||||||
lsm_entry_wrapper *wrapper;
|
lsm_entry_wrapper *wrapper;
|
||||||
|
|
||||||
|
@ -117,24 +117,16 @@ lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store,
|
||||||
LSM_RES2(lsm_entry_init(&entry), pthread_rwlock_unlock(&wrapper->lock));
|
LSM_RES2(lsm_entry_init(&entry), pthread_rwlock_unlock(&wrapper->lock));
|
||||||
|
|
||||||
entry->key = key;
|
entry->key = key;
|
||||||
wrapper->entry = entry;
|
|
||||||
|
|
||||||
lsm_entry_handle *handle;
|
lsm_write_handle *handle;
|
||||||
LSM_RES2(lsm_entry_handle_init(&handle),
|
LSM_RES2(lsm_write_handle_init(&handle),
|
||||||
pthread_rwlock_unlock(&wrapper->lock));
|
pthread_rwlock_unlock(&wrapper->lock));
|
||||||
|
|
||||||
// No need to set the handle's file, as the entry doesn't have any data yet
|
|
||||||
handle->wrapper = wrapper;
|
handle->wrapper = wrapper;
|
||||||
handle->store = store;
|
handle->store = store;
|
||||||
|
handle->dirty = entry;
|
||||||
// Newly inserted entries are always dirty
|
|
||||||
handle->states |= lsm_entry_handle_state_new;
|
|
||||||
|
|
||||||
*out = handle;
|
*out = handle;
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
void lsm_entry_remove(lsm_entry_handle *handle) {
|
|
||||||
handle->states |= lsm_entry_handle_state_removed;
|
|
||||||
}
|
|
||||||
|
|
|
@ -130,7 +130,7 @@ static lsm_error lsm_entry_read_str(lsm_str **out, uint64_t *sum, FILE *f) {
|
||||||
return lsm_str_init(out, buf);
|
return lsm_str_init(out, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
|
static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry *entry,
|
||||||
FILE *db_file) {
|
FILE *db_file) {
|
||||||
uint8_t attr_count;
|
uint8_t attr_count;
|
||||||
LSM_RES(lsm_fread(&attr_count, sum, db_file, sizeof(uint8_t), 1));
|
LSM_RES(lsm_fread(&attr_count, sum, db_file, sizeof(uint8_t), 1));
|
||||||
|
@ -142,11 +142,12 @@ static lsm_error lsm_entry_read_attrs(uint64_t *sum, lsm_entry_handle *handle,
|
||||||
for (uint64_t i = 0; i < attr_count; i++) {
|
for (uint64_t i = 0; i < attr_count; i++) {
|
||||||
LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1));
|
LSM_RES(lsm_fread(&attr_type, sum, db_file, sizeof(uint8_t), 1));
|
||||||
LSM_RES(lsm_entry_read_str(&val, sum, db_file));
|
LSM_RES(lsm_entry_read_str(&val, sum, db_file));
|
||||||
LSM_RES(lsm_entry_attr_insert(handle, attr_type, val));
|
LSM_RES(lsm_entry_attr_insert(entry, attr_type, val));
|
||||||
}
|
}
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
|
static lsm_error lsm_fseek(FILE *f, uint64_t pos) {
|
||||||
if (fseek(f, pos, SEEK_SET) != 0) {
|
if (fseek(f, pos, SEEK_SET) != 0) {
|
||||||
return lsm_error_failed_io;
|
return lsm_error_failed_io;
|
||||||
|
@ -165,17 +166,17 @@ lsm_error lsm_store_insert_from_db(lsm_store *store, uint64_t pos,
|
||||||
lsm_str *key;
|
lsm_str *key;
|
||||||
LSM_RES(lsm_entry_read_str(&key, NULL, store->db.f));
|
LSM_RES(lsm_entry_read_str(&key, NULL, store->db.f));
|
||||||
|
|
||||||
lsm_entry_handle *handle;
|
lsm_write_handle *handle;
|
||||||
LSM_RES(lsm_store_insert(&handle, store, key));
|
LSM_RES(lsm_store_open_new(&handle, store, key));
|
||||||
|
|
||||||
LSM_RES(lsm_fread(&handle->wrapper->entry->data_len, NULL, store->db.f,
|
LSM_RES(lsm_fread(&handle->dirty->data_len, NULL, store->db.f,
|
||||||
sizeof(uint64_t), 1));
|
sizeof(uint64_t), 1));
|
||||||
LSM_RES(lsm_entry_read_attrs(NULL, handle, store->db.f));
|
LSM_RES(lsm_entry_read_attrs(NULL, handle->dirty, store->db.f));
|
||||||
|
|
||||||
handle->wrapper->entry->idx_file_offset = idx_file_offset;
|
handle->dirty->idx_file_offset = idx_file_offset;
|
||||||
|
|
||||||
handle->states = 0;
|
lsm_write_commit_mem(handle);
|
||||||
lsm_entry_close(handle);
|
lsm_write_close(handle);
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,16 +74,14 @@ lsm_error lsm_write_idx_entry(uint64_t *size, FILE *idx_file, uint64_t offset,
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
|
lsm_error lsm_entry_disk_insert(lsm_store *store, lsm_entry *entry) {
|
||||||
lsm_store *store = handle->store;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&store->db.lock);
|
pthread_mutex_lock(&store->db.lock);
|
||||||
|
|
||||||
uint64_t db_entry_index = store->db.size;
|
uint64_t db_entry_index = store->db.size;
|
||||||
|
|
||||||
uint64_t db_entry_size;
|
uint64_t db_entry_size;
|
||||||
lsm_error res = lsm_write_db_entry(&db_entry_size, store->db.f,
|
lsm_error res =
|
||||||
handle->wrapper->entry, store->db.size);
|
lsm_write_db_entry(&db_entry_size, store->db.f, entry, store->db.size);
|
||||||
fflush(store->db.f);
|
fflush(store->db.f);
|
||||||
|
|
||||||
pthread_mutex_unlock(&store->db.lock);
|
pthread_mutex_unlock(&store->db.lock);
|
||||||
|
@ -117,7 +115,7 @@ lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) {
|
||||||
store->idx.block_count = new_block_count;
|
store->idx.block_count = new_block_count;
|
||||||
store->db.size += db_entry_size;
|
store->db.size += db_entry_size;
|
||||||
|
|
||||||
handle->wrapper->entry->idx_file_offset = idx_entry_index;
|
entry->idx_file_offset = idx_entry_index;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,24 +152,21 @@ static lsm_error lsm_idx_zero_block(lsm_store *store, uint64_t pos) {
|
||||||
|
|
||||||
// Marking an entry as removed in the idx file is simply setting the length of
|
// Marking an entry as removed in the idx file is simply setting the length of
|
||||||
// its entry to zero
|
// its entry to zero
|
||||||
lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) {
|
lsm_error lsm_entry_disk_remove(lsm_store *store, lsm_entry *entry) {
|
||||||
const lsm_entry *entry = handle->wrapper->entry;
|
LSM_RES(lsm_idx_zero_block(store, entry->idx_file_offset + sizeof(uint64_t)));
|
||||||
|
LSM_RES(lsm_entry_data_remove(store, entry));
|
||||||
LSM_RES(lsm_idx_zero_block(handle->store,
|
|
||||||
entry->idx_file_offset + sizeof(uint64_t)));
|
|
||||||
LSM_RES(lsm_entry_data_remove(handle));
|
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lsm_error lsm_entry_disk_update(lsm_entry_handle *handle) {
|
lsm_error lsm_entry_disk_update(lsm_store *store, lsm_entry *entry) {
|
||||||
// An update is implemented by reinserting the entry at the end of the db file
|
// An update is implemented by reinserting the entry at the end of the db file
|
||||||
uint64_t old_idx_index = handle->wrapper->entry->idx_file_offset;
|
uint64_t old_idx_index = entry->idx_file_offset;
|
||||||
|
|
||||||
// TODO is there any way we can make this atomic? If the zero write to the
|
// TODO is there any way we can make this atomic? If the zero write to the
|
||||||
// index file fails, there are two entries in the db file for the same key.
|
// index file fails, there are two entries in the db file for the same key.
|
||||||
LSM_RES(lsm_entry_disk_insert(handle));
|
LSM_RES(lsm_entry_disk_insert(store, entry));
|
||||||
LSM_RES(lsm_idx_zero_block(handle->store, old_idx_index + sizeof(uint64_t)));
|
LSM_RES(lsm_idx_zero_block(store, old_idx_index + sizeof(uint64_t)));
|
||||||
|
|
||||||
return lsm_error_ok;
|
return lsm_error_ok;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,10 @@ lsm_error lsm_entry_init(lsm_entry **ptr) {
|
||||||
|
|
||||||
void lsm_entry_free(lsm_entry *entry) {
|
void lsm_entry_free(lsm_entry *entry) {
|
||||||
if (entry->attrs.count > 0) {
|
if (entry->attrs.count > 0) {
|
||||||
|
for (size_t i = 0; i < entry->attrs.count; i++) {
|
||||||
|
lsm_str_free(entry->attrs.items[i].str);
|
||||||
|
}
|
||||||
|
|
||||||
free(entry->attrs.items);
|
free(entry->attrs.items);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,68 +79,6 @@ lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
|
||||||
|
|
||||||
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); }
|
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); }
|
||||||
|
|
||||||
lsm_error lsm_entry_handle_init(lsm_entry_handle **out) {
|
|
||||||
lsm_entry_handle *handle = calloc(1, sizeof(lsm_entry_handle));
|
|
||||||
|
|
||||||
if (handle == NULL) {
|
|
||||||
return lsm_error_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
*out = handle;
|
|
||||||
|
|
||||||
return lsm_error_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
lsm_error lsm_entry_commit(lsm_entry_handle *handle) {
|
|
||||||
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
|
|
||||||
uint8_t state_removed = handle->states & lsm_entry_handle_state_removed;
|
|
||||||
uint8_t state_updated = handle->states & lsm_entry_handle_state_updated;
|
|
||||||
|
|
||||||
// Clean new entry
|
|
||||||
if (state_new && !state_removed) {
|
|
||||||
LSM_RES(lsm_entry_disk_insert(handle));
|
|
||||||
}
|
|
||||||
// Previously stored entry that needs to be removed; should be removed from db
|
|
||||||
// file as well
|
|
||||||
else if (state_removed && !state_new) {
|
|
||||||
LSM_RES(lsm_entry_disk_remove(handle));
|
|
||||||
|
|
||||||
lsm_entry_free(handle->wrapper->entry);
|
|
||||||
handle->wrapper->entry = NULL;
|
|
||||||
} else if (state_updated && !(state_new || state_removed)) {
|
|
||||||
LSM_RES(lsm_entry_disk_update(handle));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset states after committing current changes
|
|
||||||
handle->states = 0;
|
|
||||||
|
|
||||||
return lsm_error_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void lsm_entry_close(lsm_entry_handle *handle) {
|
|
||||||
if (handle->f != NULL) {
|
|
||||||
fclose(handle->f);
|
|
||||||
handle->f = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
|
|
||||||
/* bool state_updated = handle->states & lsm_entry_handle_state_updated; */
|
|
||||||
|
|
||||||
// New entries create a wrapper in the trie that should be removed if not
|
|
||||||
// committed
|
|
||||||
if (state_new) {
|
|
||||||
lsm_entry_data_remove(handle);
|
|
||||||
|
|
||||||
lsm_entry_free(handle->wrapper->entry);
|
|
||||||
handle->wrapper->entry = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO rollback uncomitted updates
|
|
||||||
|
|
||||||
pthread_rwlock_unlock(&handle->wrapper->lock);
|
|
||||||
free(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type) {
|
bool lsm_entry_attr_present(const lsm_entry *entry, uint8_t type) {
|
||||||
return (entry->attrs.bitmap[type / 64] & (((uint64_t)1) << (type % 64))) != 0;
|
return (entry->attrs.bitmap[type / 64] & (((uint64_t)1) << (type % 64))) != 0;
|
||||||
}
|
}
|
||||||
|
@ -160,7 +102,7 @@ lsm_error lsm_entry_attr_get(const lsm_str **out, const lsm_entry *entry,
|
||||||
|
|
||||||
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry,
|
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry,
|
||||||
uint8_t type) {
|
uint8_t type) {
|
||||||
lsm_str *s;
|
const lsm_str *s;
|
||||||
|
|
||||||
LSM_RES(lsm_entry_attr_get(&s, entry, type));
|
LSM_RES(lsm_entry_attr_get(&s, entry, type));
|
||||||
|
|
||||||
|
@ -177,7 +119,7 @@ lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, const lsm_entry *entry,
|
||||||
|
|
||||||
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry,
|
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, const lsm_entry *entry,
|
||||||
uint8_t type) {
|
uint8_t type) {
|
||||||
lsm_str *s;
|
const lsm_str *s;
|
||||||
|
|
||||||
LSM_RES(lsm_entry_attr_get(&s, entry, type));
|
LSM_RES(lsm_entry_attr_get(&s, entry, type));
|
||||||
|
|
||||||
|
@ -273,10 +215,6 @@ lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry *entry, uint8_t type,
|
||||||
return lsm_entry_attr_insert(entry, type, s);
|
return lsm_entry_attr_insert(entry, type, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t lsm_entry_data_len(lsm_entry_handle *handle) {
|
|
||||||
return handle->wrapper->entry->data_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t lsm_entry_data_path_len(const lsm_store *store,
|
uint64_t lsm_entry_data_path_len(const lsm_store *store,
|
||||||
const lsm_entry *entry) {
|
const lsm_entry *entry) {
|
||||||
const lsm_str *data_path = store->data_path;
|
const lsm_str *data_path = store->data_path;
|
||||||
|
|
|
@ -50,7 +50,7 @@ bool lander_insert_entry(lnm_http_loop_ctx *ctx, bool secure) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO free key on error
|
// TODO free key on error
|
||||||
switch (lsm_store_insert(&c_ctx->entry, c_gctx->store, key)) {
|
switch (lsm_store_open_new(&c_ctx->entry, c_gctx->store, key)) {
|
||||||
case lsm_error_already_present:
|
case lsm_error_already_present:
|
||||||
ctx->res.status = lnm_http_status_conflict;
|
ctx->res.status = lnm_http_status_conflict;
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue