#include #include "lsm/store_internal.h" lsm_error lsm_write_handle_init(lsm_write_handle **out) { lsm_write_handle *handle = calloc(1, sizeof(lsm_write_handle)); if (handle == NULL) { return lsm_error_failed_alloc; } *out = handle; return lsm_error_ok; } bool lsm_write_attr_present(const lsm_write_handle *handle, uint8_t type) { return lsm_entry_attr_present(handle->wrapper->entry, type); } lsm_error lsm_write_attr_get(const lsm_str **out, const lsm_write_handle *handle, uint8_t type) { lsm_entry *entry = handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; return lsm_entry_attr_get(out, entry, type); } lsm_error lsm_write_attr_get_uint64_t(uint64_t *out, const lsm_write_handle *handle, uint8_t type) { lsm_entry *entry = handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; return lsm_entry_attr_get_uint64_t(out, entry, type); } lsm_error lsm_write_attr_get_uint8_t(uint8_t *out, const lsm_write_handle *handle, uint8_t type) { lsm_entry *entry = handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; return lsm_entry_attr_get_uint8_t(out, entry, type); } lsm_error lsm_write_attr_remove(lsm_str **out, lsm_write_handle *handle, uint8_t type) { if (handle->dirty == NULL) { LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); } return lsm_entry_attr_remove(out, handle->dirty, type); } lsm_error lsm_write_attr_insert(lsm_write_handle *handle, uint8_t type, lsm_str *data) { if (handle->dirty == NULL) { LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); } return lsm_entry_attr_insert(handle->dirty, type, data); } lsm_error lsm_write_attr_insert_uint64_t(lsm_write_handle *handle, uint8_t type, uint64_t data) { if (handle->dirty == NULL) { LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); } return lsm_entry_attr_insert_uint64_t(handle->dirty, type, data); } lsm_error lsm_write_attr_insert_uint8_t(lsm_write_handle *handle, uint8_t type, uint8_t data) { if (handle->dirty == NULL) { LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); } return lsm_entry_attr_insert_uint8_t(handle->dirty, type, data); } uint64_t lsm_write_data_len(const lsm_write_handle *handle) { lsm_entry *entry = handle->dirty == NULL ? handle->wrapper->entry : handle->dirty; return entry->data_len; } lsm_error lsm_write_data_append(lsm_write_handle *handle, const lsm_str *data) { if (lsm_str_len(data) == 0) { return lsm_error_ok; } if (handle->dirty == NULL) { LSM_RES(lsm_entry_clone(&handle->dirty, handle->wrapper->entry)); } lsm_entry *entry = handle->dirty; uint64_t new_len = entry->data_len + lsm_str_len(data); const char *data_s = lsm_str_ptr(data); // Entries don't open their file unless needed if (handle->data.f == NULL) { // An entry with no existing data will not have a data file yet, so we set // create to true then LSM_RES(lsm_entry_data_mkdirs(handle->store, entry)); LSM_RES(lsm_entry_data_open(&handle->data.f, handle->store, entry, "ab")); } size_t written = 0; // TODO what happens when I/O fails? while (written < data->len) { written += fwrite(&data_s[written], sizeof(char), data->len - written, handle->data.f); } entry->data_len = new_len; return lsm_error_ok; } void lsm_write_remove(lsm_write_handle *handle) { handle->removed = true; } void lsm_write_close(lsm_write_handle *handle) { if (handle->data.f != NULL) { fclose(handle->data.f); handle->data.f = NULL; } if (handle->dirty != NULL) { // Entry was never committed to store, so any created data file should be // removed if (handle->wrapper->entry == NULL) { lsm_entry_data_remove(handle->store, handle->dirty); } lsm_entry_free(handle->dirty); } pthread_rwlock_unlock(&handle->wrapper->lock); free(handle); } lsm_error lsm_write_commit(lsm_write_handle *handle) { if (handle->removed && (handle->wrapper->entry != NULL)) { LSM_RES(lsm_entry_disk_remove(handle->store, handle->wrapper->entry)); lsm_entry_free(handle->wrapper->entry); handle->wrapper->entry = NULL; handle->removed = false; return lsm_error_ok; } if (handle->dirty == NULL) { return lsm_error_ok; } if (handle->wrapper->entry == NULL) { LSM_RES(lsm_entry_disk_insert(handle->store, handle->dirty)); } else { LSM_RES(lsm_entry_disk_update(handle->store, handle->dirty)); lsm_entry_free(handle->wrapper->entry); } handle->wrapper->entry = handle->dirty; handle->dirty = NULL; return lsm_error_ok; } void lsm_write_commit_mem(lsm_write_handle *handle) { if (handle->dirty == NULL) { return; } if (handle->wrapper->entry != NULL) { lsm_entry_free(handle->wrapper->entry); } handle->wrapper->entry = handle->dirty; handle->dirty = NULL; }