#include #include #include #include "lsm.h" #include "lsm/store.h" #include "lsm/store_internal.h" #include "lsm/trie.h" lsm_error lsm_store_init(lsm_store **ptr) { lsm_store *store = calloc(1, sizeof(lsm_store)); if (store == NULL) { return lsm_error_failed_alloc; } lsm_error res = lsm_trie_init(&store->trie); if (res != lsm_error_ok) { free(store); return res; } *ptr = store; return lsm_error_ok; } lsm_error lsm_store_load(lsm_store **ptr, lsm_str *db_path, lsm_str *data_path) { lsm_store *store; LSM_RES(lsm_store_init(&store)); // TODO implement all of reading the db file store->db_path = db_path; store->data_path = data_path; *ptr = store; return lsm_error_ok; } lsm_error lsm_store_open_read(lsm_entry_handle **out, lsm_store *store, lsm_str *key) { lsm_entry_wrapper *wrapper; LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); // Try to get a read lock on the entry's lock if (pthread_rwlock_tryrdlock(&wrapper->lock) != 0) { return lsm_error_lock_busy; } lsm_entry *entry = wrapper->entry; // While the trie's data field will never be NULL, the actual entry pointer // might be if (entry == NULL) { pthread_rwlock_unlock(&wrapper->lock); return lsm_error_not_found; } lsm_entry_handle *handle; lsm_error res = lsm_entry_handle_init(&handle); if (res != lsm_error_ok) { pthread_rwlock_unlock(&wrapper->lock); return res; } // Open a new file descriptor if needed if (entry->data.on_disk) { char path[store->data_path->len + entry->key->len + 2]; sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), lsm_str_ptr(entry->key)); FILE *f = fopen(path, "rb"); if (f == NULL) { free(handle); return lsm_error_failed_io; } handle->f = f; } handle->wrapper = wrapper; *out = handle; return lsm_error_ok; } lsm_error lsm_store_open_write(lsm_entry_handle **out, lsm_store *store, lsm_str *key) { lsm_entry_wrapper *wrapper; LSM_RES(lsm_trie_search((void **)&wrapper, store->trie, key)); // Try to get a write lock on the entry's lock // TODO make this timeout to not block if (pthread_rwlock_wrlock(&wrapper->lock) != 0) { return lsm_error_lock_busy; } lsm_entry *entry = wrapper->entry; // While the trie's data field will never be NULL, the actual entry pointer // might be if (entry == NULL) { pthread_rwlock_unlock(&wrapper->lock); return lsm_error_not_found; } lsm_entry_handle *handle; lsm_error res = lsm_entry_handle_init(&handle); if (res != lsm_error_ok) { pthread_rwlock_unlock(&wrapper->lock); return res; } // Open a new file descriptor if needed if (entry->data.on_disk) { char path[store->data_path->len + entry->key->len + 2]; sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), lsm_str_ptr(entry->key)); FILE *f = fopen(path, "ab"); if (f == NULL) { free(handle); return lsm_error_failed_io; } handle->f = f; } handle->wrapper = wrapper; *out = handle; return lsm_error_ok; } lsm_error lsm_store_insert(lsm_entry_handle **out, lsm_store *store, lsm_str *key) { // TODO what happens when two inserts to the same key happen at the same time? lsm_entry_wrapper *wrapper; // If a key was previously removed from the trie, the wrapper will already be // present in the trie if (lsm_trie_search((void **)&wrapper, store->trie, key) == lsm_error_not_found) { LSM_RES(lsm_entry_wrapper_init(&wrapper)); pthread_rwlock_wrlock(&wrapper->lock); lsm_error res = lsm_trie_insert(store->trie, key, wrapper); // Check if entry isn't already present in advance if (res != lsm_error_ok) { lsm_entry_wrapper_free(wrapper); return res; } } else { pthread_rwlock_wrlock(&wrapper->lock); if (wrapper->entry != NULL) { pthread_rwlock_unlock(&wrapper->lock); return lsm_error_already_present; } } lsm_entry *entry; LSM_RES(lsm_entry_init(&entry)); entry->key = key; wrapper->entry = entry; lsm_entry_handle *handle; LSM_RES(lsm_entry_handle_init(&handle)); // No need to set the handle's file, as the entry doesn't have any data yet handle->wrapper = wrapper; *out = handle; return lsm_error_ok; } lsm_error lsm_entry_data_append(lsm_store *store, lsm_entry_handle *handle, lsm_str *data) { if (lsm_str_len(data) == 0) { return lsm_error_ok; } lsm_entry *entry = handle->wrapper->entry; uint64_t new_len = entry->data.len + lsm_str_len(data); const char *data_s = lsm_str_ptr(data); // Data is in memory and still fits -> keep it in memory if ((new_len <= LSM_STORE_DISK_THRESHOLD) && (!entry->data.on_disk)) { char *buf; // Entries with no data do not have an allocated buffer yet if (entry->data.len == 0) { buf = malloc(new_len * sizeof(char)); } else { buf = realloc(entry->data.value.ptr, new_len * sizeof(char)); } if (buf == NULL) { return lsm_error_failed_alloc; } memcpy(&buf[entry->data.len], data_s, lsm_str_len(data)); entry->data.value.ptr = buf; } // Data will end up on disk else { // Data is not yet on disk, so we create the file if (!entry->data.on_disk) { char path[store->data_path->len + entry->key->len + 2]; sprintf(path, "%s/%s", lsm_str_ptr(store->data_path), lsm_str_ptr(entry->key)); FILE *f = fopen(path, "ab"); if (f == NULL) { return lsm_error_failed_io; } // If there was data present in memory already, we sync this to disk. // This check is required because it's possible that more than the // treshold is written to an empty entry immediately, meaning there's no // allocated memory buffer present. if (entry->data.len > 0) { size_t written = 0; // Write original in-memory data to file while (written < entry->data.len) { written += fwrite(&entry->data.value.ptr[written], sizeof(char), entry->data.len - written, f); } free(entry->data.value.ptr); entry->data.value.ptr = NULL; } handle->f = f; entry->data.on_disk = true; } size_t written = 0; // TODO what happens when I/O fails? while (written < data->len) { written += fwrite(&data_s[written], sizeof(char), data->len - written, handle->f); } } entry->data.len = new_len; return lsm_error_ok; } lsm_error lsm_entry_data_read(uint64_t *out, char *buf, lsm_entry_handle *handle, uint64_t len) { lsm_entry *entry = handle->wrapper->entry; if (entry->data.len == 0) { *out = 0; return lsm_error_ok; } uint64_t read; if (entry->data.on_disk) { read = fread(buf, sizeof(char), len, handle->f); if ((read == 0) && (ferror(handle->f) != 0)) { return lsm_error_failed_io; } } else { read = (entry->data.len - handle->pos) < len ? (entry->data.len - handle->pos) : len; memcpy(buf, &entry->data.value.ptr[handle->pos], read * sizeof(char)); } handle->pos += read; *out = read; return lsm_error_ok; }