lander/lsm/src/store/lsm_store_entry.c

252 lines
6.1 KiB
C

#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "lsm.h"
#include "lsm/store_internal.h"
#include "lsm/str.h"
/**
 * Allocate a new, zero-initialized entry.
 *
 * @param ptr receives the freshly allocated entry on success; it is not
 * written to when the allocation fails
 * @return lsm_error_ok on success, lsm_error_failed_alloc if calloc fails
 */
lsm_error lsm_entry_init(lsm_entry **ptr) {
  lsm_entry *fresh = calloc(1, sizeof(*fresh));

  if (fresh != NULL) {
    *ptr = fresh;
    return lsm_error_ok;
  }

  return lsm_error_failed_alloc;
}
/**
 * Release an entry together with its attribute array.
 *
 * Only the attribute array and the entry struct itself are freed here; the
 * strings referenced by the attributes and any other members are not touched
 * by this function.
 *
 * @param entry entry to free; passing NULL is a no-op, mirroring free()
 */
void lsm_entry_free(lsm_entry *entry) {
  if (entry == NULL) {
    return;
  }

  // free(NULL) is a no-op, so the array can be released unconditionally. This
  // also covers an allocated-but-empty array.
  free(entry->attrs.items);
  free(entry);
}
/**
 * Allocate a zero-initialized entry wrapper and initialize its rwlock.
 *
 * @param ptr receives the new wrapper on success; it is not written to on
 * failure
 * @return lsm_error_ok on success; lsm_error_failed_alloc if either the
 * allocation or the lock initialization fails
 */
lsm_error lsm_entry_wrapper_init(lsm_entry_wrapper **ptr) {
  lsm_entry_wrapper *wrap = calloc(1, sizeof(*wrap));

  if (wrap == NULL) {
    return lsm_error_failed_alloc;
  }

  // pthread_rwlock_init can fail when the system lacks the resources for
  // another lock; never hand out a wrapper whose lock is unusable.
  if (pthread_rwlock_init(&wrap->lock, NULL) != 0) {
    free(wrap);

    return lsm_error_failed_alloc;
  }

  *ptr = wrap;

  return lsm_error_ok;
}
/**
 * Destroy a wrapper's rwlock and release the wrapper itself.
 *
 * The entry the wrapper points to is not freed here. The caller must make
 * sure no thread still holds the wrapper's lock, as destroying a held rwlock
 * is undefined behavior.
 *
 * @param wrapper wrapper to free; passing NULL is a no-op
 */
void lsm_entry_wrapper_free(lsm_entry_wrapper *wrapper) {
  if (wrapper == NULL) {
    return;
  }

  // Pairs with the pthread_rwlock_init performed when the wrapper was created
  pthread_rwlock_destroy(&wrapper->lock);
  free(wrapper);
}
/**
 * Allocate a new, zero-initialized entry handle.
 *
 * @param out receives the new handle on success; it is not written to when
 * the allocation fails
 * @return lsm_error_ok on success, lsm_error_failed_alloc if calloc fails
 */
lsm_error lsm_entry_handle_init(lsm_entry_handle **out) {
  lsm_entry_handle *fresh = calloc(1, sizeof(*fresh));

  if (fresh != NULL) {
    *out = fresh;
    return lsm_error_ok;
  }

  return lsm_error_failed_alloc;
}
/**
 * Close a handle: close its open file (if any), apply the pending on-disk
 * change recorded in the handle's state flags, release the wrapper's lock and
 * free the handle.
 *
 * - new and not removed: the entry is inserted on disk
 * - removed and not new: the entry is removed from disk, freed, and the
 *   wrapper's entry pointer is cleared
 * - anything else (including "updated"): nothing is written
 *
 * The handle must not be used after this call.
 *
 * NOTE(review): presumably the wrapper's lock was acquired when the handle
 * was opened; that is not visible from here.
 *
 * @param handle handle to close
 */
void lsm_entry_close(lsm_entry_handle *handle) {
  if (handle->f != NULL) {
    fclose(handle->f);
  }

  const bool is_new = (handle->states & lsm_entry_handle_state_new) != 0;
  const bool is_removed =
      (handle->states & lsm_entry_handle_state_removed) != 0;

  // TODO handle errors here
  if (is_new && !is_removed) {
    lsm_entry_disk_insert(handle);
  } else if (is_removed && !is_new) {
    lsm_entry_disk_remove(handle);

    lsm_entry_free(handle->wrapper->entry);
    handle->wrapper->entry = NULL;
  }
  // Entries that were merely updated are currently not rewritten on disk.

  pthread_rwlock_unlock(&handle->wrapper->lock);
  free(handle);
}
/**
 * Check whether the handle's entry has an attribute of the given type, by
 * testing the matching bit in the entry's attribute bitmap.
 *
 * @param handle handle whose entry is inspected
 * @param type attribute type to look for
 * @return true if the bit for `type` is set, false otherwise
 */
bool lsm_entry_attr_present(lsm_entry_handle *handle, uint8_t type) {
  // Each bitmap word tracks 64 attribute types
  const uint64_t word = handle->wrapper->entry->attrs.bitmap[type / 64];
  const uint64_t mask = ((uint64_t)1) << (type % 64);

  return (word & mask) != 0;
}
/**
 * Look up the value of an attribute of the handle's entry.
 *
 * The returned string is the one stored in the entry, not a copy.
 *
 * @param out receives the attribute's string on success
 * @param handle handle whose entry is searched
 * @param type attribute type to retrieve
 * @return lsm_error_ok on success; lsm_error_not_found if the entry has no
 * attribute of this type
 */
lsm_error lsm_entry_attr_get(lsm_str **out, lsm_entry_handle *handle,
                             uint8_t type) {
  if (!lsm_entry_attr_present(handle, type)) {
    return lsm_error_not_found;
  }

  lsm_entry *entry = handle->wrapper->entry;

  // The scan is bounded by the attribute count so that a bitmap that is out
  // of sync with the array cannot cause a read past its end.
  for (uint64_t i = 0; i < entry->attrs.count; i++) {
    if (entry->attrs.items[i].type == type) {
      *out = entry->attrs.items[i].str;

      return lsm_error_ok;
    }
  }

  return lsm_error_not_found;
}
/**
 * Read an attribute and reassemble its first sizeof(uint64_t) characters into
 * a uint64_t, byte by byte in the host's native byte order.
 *
 * NOTE(review): the string's length is not checked here; this assumes the
 * attribute holds at least sizeof(uint64_t) characters — confirm what
 * lsm_str_char does for out-of-range indices.
 *
 * @param out receives the decoded number on success
 * @param handle handle whose entry is read
 * @param type attribute type to retrieve
 * @return lsm_error_ok on success, or the error reported by
 * lsm_entry_attr_get
 */
lsm_error lsm_entry_attr_get_uint64_t(uint64_t *out, lsm_entry_handle *handle,
                                      uint8_t type) {
  lsm_str *raw;

  LSM_RES(lsm_entry_attr_get(&raw, handle, type));

  uint64_t value = 0;
  char *dst = (char *)&value;

  for (uint64_t idx = 0; idx < sizeof(value); idx++) {
    dst[idx] = lsm_str_char(raw, idx);
  }

  *out = value;

  return lsm_error_ok;
}
/**
 * Read an attribute and return its first character as a uint8_t.
 *
 * @param out receives the value on success
 * @param handle handle whose entry is read
 * @param type attribute type to retrieve
 * @return lsm_error_ok on success, or the error reported by
 * lsm_entry_attr_get
 */
lsm_error lsm_entry_attr_get_uint8_t(uint8_t *out, lsm_entry_handle *handle,
                                     uint8_t type) {
  lsm_str *raw;

  LSM_RES(lsm_entry_attr_get(&raw, handle, type));

  const uint8_t first = lsm_str_char(raw, 0);
  *out = first;

  return lsm_error_ok;
}
/**
 * Remove an attribute from the handle's entry.
 *
 * The attribute's string is not freed; when `out` is non-NULL it receives the
 * string so the caller can take it over. On success the handle is marked as
 * updated, regardless of how many attributes remain.
 *
 * @param out optional; receives the removed attribute's string. May be NULL.
 * @param handle handle whose entry is modified
 * @param type attribute type to remove
 * @return lsm_error_ok on success; lsm_error_not_found if the entry has no
 * attribute of this type
 */
lsm_error lsm_entry_attr_remove(lsm_str **out, lsm_entry_handle *handle,
                                uint8_t type) {
  if (!lsm_entry_attr_present(handle, type)) {
    return lsm_error_not_found;
  }

  lsm_entry *entry = handle->wrapper->entry;

  // Bounded scan: never walk past the array, even if the bitmap and the
  // array have gotten out of sync.
  uint64_t i = 0;

  while (i < entry->attrs.count && entry->attrs.items[i].type != type) {
    i++;
  }

  if (i == entry->attrs.count) {
    return lsm_error_not_found;
  }

  if (out != NULL) {
    *out = entry->attrs.items[i].str;
  }

  const uint64_t remaining = entry->attrs.count - 1;

  if (remaining == 0) {
    free(entry->attrs.items);
    entry->attrs.items = NULL;
  } else {
    // Close the gap in place; the ranges overlap, hence memmove
    memmove(&entry->attrs.items[i], &entry->attrs.items[i + 1],
            (remaining - i) * sizeof(lsm_attr));

    // Shrinking is best-effort: if realloc fails, the original (larger)
    // allocation is still valid, so the removal itself cannot fail.
    lsm_attr *shrunk = realloc(entry->attrs.items, remaining * sizeof(lsm_attr));

    if (shrunk != NULL) {
      entry->attrs.items = shrunk;
    }
  }

  entry->attrs.count = remaining;
  entry->attrs.bitmap[type / 64] &= ~(((uint64_t)1) << (type % 64));

  handle->states |= lsm_entry_handle_state_updated;

  return lsm_error_ok;
}
/**
 * Append a new attribute to the handle's entry.
 *
 * The string pointer is stored as-is (no copy is made). On success the
 * attribute's bit is set in the bitmap and the handle is marked as updated.
 * On failure the entry is left unchanged.
 *
 * @param handle handle whose entry is modified
 * @param type attribute type to add
 * @param data string to store as the attribute's value
 * @return lsm_error_ok on success; lsm_error_already_present if the entry
 * already has an attribute of this type; lsm_error_failed_alloc if the
 * attribute array could not be grown
 */
lsm_error lsm_entry_attr_insert(lsm_entry_handle *handle, uint8_t type,
                                lsm_str *data) {
  if (lsm_entry_attr_present(handle, type)) {
    return lsm_error_already_present;
  }

  lsm_entry *entry = handle->wrapper->entry;
  const uint64_t slot = entry->attrs.count;

  lsm_attr *grown = realloc(entry->attrs.items, (slot + 1) * sizeof(lsm_attr));

  if (grown == NULL) {
    return lsm_error_failed_alloc;
  }

  // The new attribute goes in the last slot of the grown array
  lsm_attr *added = &grown[slot];
  added->type = type;
  added->str = data;

  entry->attrs.items = grown;
  entry->attrs.count += 1;
  entry->attrs.bitmap[type / 64] |= ((uint64_t)1) << (type % 64);

  handle->states |= lsm_entry_handle_state_updated;

  return lsm_error_ok;
}
/**
 * Store a uint64_t as an attribute by copying its sizeof(uint64_t) raw bytes
 * (host byte order) into a new string and inserting that string.
 *
 * NOTE(review): if lsm_entry_attr_insert fails, the temporary string is not
 * released here — looks like a leak; confirm and free it with the string
 * API's destructor.
 *
 * @param handle handle whose entry is modified
 * @param type attribute type to add
 * @param data number to store
 * @return the error from creating the string, otherwise the result of
 * lsm_entry_attr_insert
 */
lsm_error lsm_entry_attr_insert_uint64_t(lsm_entry_handle *handle, uint8_t type,
                                         uint64_t data) {
  lsm_str *encoded;

  LSM_RES(lsm_str_init_copy_n(&encoded, (char *)&data, sizeof(data)));

  return lsm_entry_attr_insert(handle, type, encoded);
}
/**
 * Store a uint8_t as an attribute by copying its single byte into a new
 * string and inserting that string.
 *
 * NOTE(review): if lsm_entry_attr_insert fails, the temporary string is not
 * released here — looks like a leak; confirm and free it with the string
 * API's destructor.
 *
 * @param handle handle whose entry is modified
 * @param type attribute type to add
 * @param data value to store
 * @return the error from creating the string, otherwise the result of
 * lsm_entry_attr_insert
 */
lsm_error lsm_entry_attr_insert_uint8_t(lsm_entry_handle *handle, uint8_t type,
                                        uint8_t data) {
  lsm_str *encoded;

  LSM_RES(lsm_str_init_copy_n(&encoded, (char *)&data, sizeof(data)));

  return lsm_entry_attr_insert(handle, type, encoded);
}
/**
 * Return the data length recorded in the handle's entry.
 *
 * @param handle handle whose entry is read
 * @return the entry's data_len field
 */
uint64_t lsm_entry_data_len(lsm_entry_handle *handle) {
  lsm_entry *entry = handle->wrapper->entry;

  return entry->data_len;
}
/**
 * Compute the length of the path to the entry's data file, which has the form
 * [data path]/[entry key][data file suffix].
 *
 * The result counts the separating '/' but not a terminating NUL byte.
 *
 * @param handle handle whose store and entry are used
 * @return number of characters in the data file path
 */
uint64_t lsm_entry_data_path_len(lsm_entry_handle *handle) {
  const uint64_t dir_len = lsm_str_len(handle->store->data_path);
  const uint64_t key_len = lsm_str_len(handle->wrapper->entry->key);
  const uint64_t suffix_len = strlen(LSM_DATA_FILE_SUFFIX);

  // The extra 1 accounts for the '/' between the data path and the key
  return dir_len + key_len + suffix_len + 1;
}
/**
 * Write the NUL-terminated path of the entry's data file into `buf`, in the
 * form [data path]/[entry key][data file suffix].
 *
 * No bounds checking is performed: `buf` must have room for
 * lsm_entry_data_path_len(handle) characters plus the terminating NUL byte.
 *
 * @param buf destination buffer
 * @param handle handle whose store and entry are used
 */
void lsm_entry_data_path(char *buf, lsm_entry_handle *handle) {
  lsm_str *dir = handle->store->data_path;
  lsm_str *key = handle->wrapper->entry->key;

  char *cursor = buf;

  memcpy(cursor, lsm_str_ptr(dir), lsm_str_len(dir));
  cursor += lsm_str_len(dir);

  *cursor = '/';
  cursor++;

  memcpy(cursor, lsm_str_ptr(key), lsm_str_len(key));
  cursor += lsm_str_len(key);

  // strcpy also writes the terminating NUL byte
  strcpy(cursor, LSM_DATA_FILE_SUFFIX);
}