lander/lsm/src/store/lsm_store_entry.c

361 lines
8.7 KiB
C

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include "lsm.h"
#include "lsm/store.h"
#include "lsm/store_internal.h"
#include "lsm/str.h"
lsm_error lsm_entry_init(lsm_entry **ptr) {
lsm_entry *entry = calloc(1, sizeof(lsm_entry));
if (entry == NULL) {
return lsm_error_failed_alloc;
}
*ptr = entry;
return lsm_error_ok;
}
void lsm_entry_free(lsm_entry *entry) {
if (entry->attrs.count > 0) {
free(entry->attrs.items);
}
free(entry);
}
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
lsm_entry_wrapper *wrap = calloc(1, sizeof(lsm_entry_wrapper));
if (wrap == NULL) {
return lsm_error_failed_alloc;
}
pthread_rwlock_init(&wrap->lock, NULL);
*ptr = wrap;
return lsm_error_ok;
}
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) { free(wrapper); }
lsm_error lsm_entry_handle_init(lsm_entry_handle **out) {
lsm_entry_handle *handle = calloc(1, sizeof(lsm_entry_handle));
if (handle == NULL) {
return lsm_error_failed_alloc;
}
*out = handle;
return lsm_error_ok;
}
lsm_error lsm_entry_commit(lsm_entry_handle *handle) {
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
uint8_t state_removed = handle->states & lsm_entry_handle_state_removed;
// Clean new entry
if (state_new && !state_removed) {
LSM_RES(lsm_entry_disk_insert(handle));
}
// Previously stored entry that needs to be removed; should be removed from db
// file as well
else if (state_removed && !state_new) {
LSM_RES(lsm_entry_disk_remove(handle));
lsm_entry_free(handle->wrapper->entry);
handle->wrapper->entry = NULL;
}
// Reset states after committing current changes
handle->states = 0;
return lsm_error_ok;
}
void lsm_entry_close(lsm_entry_handle *handle) {
if (handle->f != NULL) {
fclose(handle->f);
handle->f = NULL;
}
uint8_t state_new = handle->states & lsm_entry_handle_state_new;
/* bool state_updated = handle->states & lsm_entry_handle_state_updated; */
// New entries create a wrapper in the trie that should be removed if not
// committed
if (state_new) {
lsm_entry_data_remove(handle);
lsm_entry_free(handle->wrapper->entry);
handle->wrapper->entry = NULL;
}
pthread_rwlock_unlock(&handle->wrapper->lock);
free(handle);
}
bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type) {
return (handle->wrapper->entry->attrs.bitmap[type / 64] &
(((uint64_t)1) << (type % 64))) != 0;
}
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
uint8_t type) {
if (!lsm_entry_attr_present(handle, type)) {
return lsm_error_not_found;
}
lsm_entry *entry = handle->wrapper->entry;
uint64_t i = 0;
while (entry->attrs.items[i].type != type) {
i++;
}
*out = entry->attrs.items[i].str;
return lsm_error_ok;
}
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle,
uint8_t type) {
lsm_str *s;
LSM_RES(lsm_entry_attr_get(&s, handle, type));
uint64_t num = 0;
for (uint8_t i = 0; i < sizeof(uint64_t) / sizeof(char); i++) {
((char *)&num)[i] = lsm_str_char(s, i);
}
*out = num;
return lsm_error_ok;
}
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle,
uint8_t type) {
lsm_str *s;
LSM_RES(lsm_entry_attr_get(&s, handle, type));
*out = lsm_str_char(s, 0);
return lsm_error_ok;
}
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
uint8_t type) {
if (!lsm_entry_attr_present(handle, type)) {
return lsm_error_not_found;
}
lsm_entry *entry = handle->wrapper->entry;
if (entry->attrs.count == 1) {
*out = entry->attrs.items[0].str;
free(entry->attrs.items);
entry->attrs.items = NULL;
entry->attrs.count = 0;
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
return lsm_error_ok;
}
uint64_t i = 0;
while (entry->attrs.items[i].type != type) {
i++;
}
lsm_attr *new_attrs = malloc((entry->attrs.count - 1) * sizeof(lsm_attr));
if (new_attrs == NULL) {
return lsm_error_failed_alloc;
}
if (out != NULL) {
*out = entry->attrs.items[i].str;
}
memcpy(new_attrs, entry->attrs.items, i * sizeof(lsm_attr));
memcpy(&new_attrs[i], &entry->attrs.items[i + 1],
(entry->attrs.count - i - 1) * sizeof(lsm_attr));
free(entry->attrs.items);
entry->attrs.items = new_attrs;
entry->attrs.count--;
entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));
handle->states |= lsm_entry_handle_state_updated;
return lsm_error_ok;
}
lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
lsm_str *data) {
if (lsm_entry_attr_present(handle, type)) {
return lsm_error_already_present;
}
lsm_entry *entry = handle->wrapper->entry;
lsm_attr *new_attrs =
realloc(entry->attrs.items, (entry->attrs.count + 1) * sizeof(lsm_attr));
if (new_attrs == NULL) {
return lsm_error_failed_alloc;
}
new_attrs[entry->attrs.count].type = type;
new_attrs[entry->attrs.count].str = data;
entry->attrs.items = new_attrs;
entry->attrs.count++;
entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);
handle->states |= lsm_entry_handle_state_updated;
return lsm_error_ok;
}
lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type,
uint64_t data) {
lsm_str *s;
LSM_RES(
lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint64_t) / sizeof(char)));
return lsm_entry_attr_insert(handle, type, s);
}
lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type,
uint8_t data) {
lsm_str *s;
LSM_RES(
lsm_str_init_copy_n(&s, (char *)&data, sizeof(uint8_t) / sizeof(char)));
return lsm_entry_attr_insert(handle, type, s);
}
uint64_t lsm_entry_data_len(lsm_entry_handle *handle) {
return handle->wrapper->entry->data_len;
}
uint64_t lsm_entry_data_path_len(const lsm_entry_handle *handle) {
const lsm_str *data_path = handle->store->data_path;
const lsm_str *key = handle->wrapper->entry->key;
uint8_t levels =
key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS;
return data_path->len + 1 + 2 * levels + key->len +
strlen(LSM_DATA_FILE_SUFFIX);
}
void lsm_entry_data_path(char *path, const lsm_entry_handle *handle) {
const lsm_str *data_path = handle->store->data_path;
const lsm_str *key = handle->wrapper->entry->key;
uint8_t levels =
key->len > LSM_STORE_DATA_LEVELS ? LSM_STORE_DATA_LEVELS : key->len;
memcpy(path, lsm_str_ptr(data_path), data_path->len);
path[data_path->len] = '/';
uint64_t index = data_path->len + 1;
// Create each directory in the file hierarchy
// cppcheck-suppress knownConditionTrueFalse
for (uint8_t i = 0; i < levels; i++) {
path[index] = lsm_str_char(key, i);
path[index + 1] = '/';
index += 2;
}
memcpy(&path[index], lsm_str_ptr(key), lsm_str_len(key));
index += lsm_str_len(key);
strcpy(&path[index], LSM_DATA_FILE_SUFFIX);
}
lsm_error lsm_entry_data_open_write(lsm_entry_handle *handle) {
char path[lsm_entry_data_path_len(handle) + 1];
lsm_entry_data_path(path, handle);
const lsm_str *data_path = handle->store->data_path;
const lsm_str *key = handle->wrapper->entry->key;
uint8_t levels =
key->len <= LSM_STORE_DATA_LEVELS ? key->len : LSM_STORE_DATA_LEVELS;
// Create all required directories in the path
// cppcheck-suppress knownConditionTrueFalse
for (uint8_t i = 0; i < levels; i++) {
path[data_path->len + 2 * (i + 1)] = '\0';
if ((mkdir(path, 0755) != 0) && (errno != EEXIST)) {
return lsm_error_failed_io;
}
path[data_path->len + 2 * (i + 1)] = '/';
}
FILE *f = fopen(path, "ab");
if (f == NULL) {
return lsm_error_failed_io;
}
handle->f = f;
return lsm_error_ok;
}
lsm_error lsm_entry_data_open_read(lsm_entry_handle *handle) {
char path[lsm_entry_data_path_len(handle) + 1];
lsm_entry_data_path(path, handle);
FILE *f = fopen(path, "rb");
if (f == NULL) {
return lsm_error_failed_io;
}
handle->f = f;
return lsm_error_ok;
}
lsm_error lsm_entry_data_remove(lsm_entry_handle *handle) {
const lsm_entry *entry = handle->wrapper->entry;
if (entry->data_len > 0) {
if (handle->f != NULL) {
fclose(handle->f);
handle->f = NULL;
}
char data_path[lsm_entry_data_path_len(handle) + 1];
lsm_entry_data_path(data_path, handle);
if (remove(data_path) != 0) {
return lsm_error_failed_io;
}
}
return lsm_error_ok;
}