feat(lsm): implement updating entries
							parent
							
								
									3dce25239b
								
							
						
					
					
						commit
						4d9dfff27e
					
				|  | @ -1,3 +1,5 @@ | ||||||
|  | #include <stdint.h> | ||||||
|  | 
 | ||||||
| #include "lsm/store_internal.h" | #include "lsm/store_internal.h" | ||||||
| 
 | 
 | ||||||
| static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size, | static lsm_error lsm_fwrite(uint64_t *sum, FILE *f, uint64_t size, | ||||||
|  | @ -125,16 +127,10 @@ lsm_error lsm_entry_disk_insert(lsm_entry_handle *handle) { | ||||||
|   return res; |   return res; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Marking an entry as removed in the idx file is simply setting the length of
 | static lsm_error lsm_idx_zero_block(lsm_store *store, uint64_t pos) { | ||||||
| // its entry to zero
 |  | ||||||
| lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { |  | ||||||
|   lsm_store *store = handle->store; |  | ||||||
|   const lsm_entry *entry = handle->wrapper->entry; |  | ||||||
| 
 |  | ||||||
|   pthread_mutex_lock(&store->idx.lock); |   pthread_mutex_lock(&store->idx.lock); | ||||||
| 
 | 
 | ||||||
|   lsm_error res = |   lsm_error res = lsm_fseek(store->idx.f, pos); | ||||||
|       lsm_fseek(store->idx.f, entry->idx_file_offset + sizeof(uint64_t)); |  | ||||||
| 
 | 
 | ||||||
|   if (res != lsm_error_ok) { |   if (res != lsm_error_ok) { | ||||||
|     pthread_mutex_unlock(&store->idx.lock); |     pthread_mutex_unlock(&store->idx.lock); | ||||||
|  | @ -153,7 +149,29 @@ lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { | ||||||
| 
 | 
 | ||||||
|   fflush(store->idx.f); |   fflush(store->idx.f); | ||||||
| 
 | 
 | ||||||
|  |   return lsm_error_ok; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Marking an entry as removed in the idx file is simply setting the length of
 | ||||||
|  | // its entry to zero
 | ||||||
|  | lsm_error lsm_entry_disk_remove(lsm_entry_handle *handle) { | ||||||
|  |   const lsm_entry *entry = handle->wrapper->entry; | ||||||
|  | 
 | ||||||
|  |   LSM_RES(lsm_idx_zero_block(handle->store, | ||||||
|  |                              entry->idx_file_offset + sizeof(uint64_t))); | ||||||
|   LSM_RES(lsm_entry_data_remove(handle)); |   LSM_RES(lsm_entry_data_remove(handle)); | ||||||
| 
 | 
 | ||||||
|   return lsm_error_ok; |   return lsm_error_ok; | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | lsm_error lsm_entry_disk_update(lsm_entry_handle *handle) { | ||||||
|  |   // An update is implemented by reinserting the entry at the end of the db file
 | ||||||
|  |   uint64_t old_idx_index = handle->wrapper->entry->idx_file_offset; | ||||||
|  | 
 | ||||||
|  |   // TODO is there any way we can make this atomic? If the zero write to the
 | ||||||
|  |   // index file fails, there are two entries in the db file for the same key.
 | ||||||
|  |   LSM_RES(lsm_entry_disk_insert(handle)); | ||||||
|  |   LSM_RES(lsm_idx_zero_block(handle->store, old_idx_index + sizeof(uint64_t))); | ||||||
|  | 
 | ||||||
|  |   return lsm_error_ok; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -61,6 +61,7 @@ lsm_error lsm_entry_handle_init(lsm_entry_handle **out) { | ||||||
| lsm_error lsm_entry_commit(lsm_entry_handle *handle) { | lsm_error lsm_entry_commit(lsm_entry_handle *handle) { | ||||||
|   uint8_t state_new = handle->states & lsm_entry_handle_state_new; |   uint8_t state_new = handle->states & lsm_entry_handle_state_new; | ||||||
|   uint8_t state_removed = handle->states & lsm_entry_handle_state_removed; |   uint8_t state_removed = handle->states & lsm_entry_handle_state_removed; | ||||||
|  |   uint8_t state_updated = handle->states & lsm_entry_handle_state_updated; | ||||||
| 
 | 
 | ||||||
|   // Clean new entry
 |   // Clean new entry
 | ||||||
|   if (state_new && !state_removed) { |   if (state_new && !state_removed) { | ||||||
|  | @ -73,6 +74,8 @@ lsm_error lsm_entry_commit(lsm_entry_handle *handle) { | ||||||
| 
 | 
 | ||||||
|     lsm_entry_free(handle->wrapper->entry); |     lsm_entry_free(handle->wrapper->entry); | ||||||
|     handle->wrapper->entry = NULL; |     handle->wrapper->entry = NULL; | ||||||
|  |   } else if (state_updated && !(state_new || state_removed)) { | ||||||
|  |     LSM_RES(lsm_entry_disk_update(handle)); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   // Reset states after committing current changes
 |   // Reset states after committing current changes
 | ||||||
|  | @ -99,6 +102,8 @@ void lsm_entry_close(lsm_entry_handle *handle) { | ||||||
|     handle->wrapper->entry = NULL; |     handle->wrapper->entry = NULL; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |   // TODO rollback uncomitted updates
 | ||||||
|  | 
 | ||||||
|   pthread_rwlock_unlock(&handle->wrapper->lock); |   pthread_rwlock_unlock(&handle->wrapper->lock); | ||||||
|   free(handle); |   free(handle); | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue