From 83b3198a49750de041e401db230ec14cdf90b3fc Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 7 Mar 2023 11:55:12 +0100 Subject: [PATCH] feat(job-queue): initial implementation of functionality --- include/vieter_job_queue.h | 44 ++++++- src/job-queue/vieter_job_queue.c | 153 ++++++++++++++++++++++ src/job-queue/vieter_job_queue_internal.h | 3 + 3 files changed, 194 insertions(+), 6 deletions(-) create mode 100644 src/job-queue/vieter_job_queue.c diff --git a/include/vieter_job_queue.h b/include/vieter_job_queue.h index aeb7e1e..6cd748f 100644 --- a/include/vieter_job_queue.h +++ b/include/vieter_job_queue.h @@ -20,6 +20,8 @@ typedef enum vieter_job_state { // This macro should be kept in sync with the above enum #define VIETER_JOB_STATES 4 +#define VIETER_JOB_INITIAL_STATE vieter_job_state_queued +#define VIETER_JOB_FAILURE_STATE vieter_job_state_failed /* * Struct storing a report for why a certain job failed to be processed in the @@ -64,7 +66,10 @@ typedef struct vieter_job_queue vieter_job_queue; typedef enum vieter_job_queue_error { vieter_job_queue_ok = 0, - vieter_job_queue_not_found = 1 + vieter_job_queue_not_present = 1, + vieter_job_queue_already_present = 2, + vieter_job_queue_state_empty = 3, + vieter_job_queue_not_dispatched = 4 } vieter_job_queue_error; /* @@ -72,24 +77,51 @@ typedef enum vieter_job_queue_error { */ vieter_job_queue *vieter_job_queue_init(); +/* + * Free a job queue. + */ void vieter_job_queue_free(vieter_job_queue **ptp); /* * Insert the given job into the system. */ -vieter_job_queue_error vieter_job_queue_insert(vieter_job *job); +vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue, + vieter_job *job); /* - * Dispatch the job with the given id, returning the pointer to the job. - * Dispatching a job removes it from its respective state's queue. + * Pop a job from the given state's queue. The job will then be marked as + * dispatched. */ -vieter_job_queue_error vieter_job_queue_dispatch(vieter_job **out, uint64_t id); +vieter_job_queue_error vieter_job_queue_pop(vieter_job **out, + vieter_job_queue *queue, + vieter_job_state state); /* * Transition the job with the given id to the new state. This sets the * job's dispatch flag to false, and adds it to the new state's queue. + * + * NOTE: this can only be done with dispatched jobs. */ -vieter_job_queue_error vieter_job_queue_transition(uint64_t id, +vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue, + uint64_t id, vieter_job_state new_state); +/* + * Remove the given job from the job queue, returning its pointer to the caller. + * + * NOTE: this can only be done with dispatched jobs. + */ +vieter_job_queue_error +vieter_job_queue_remove(vieter_job **out, vieter_job_queue *queue, uint64_t id); + +/* + * Transition a job into the failure state, and attach a failure report with the + * provided message. The message is copied, so the caller is responsible for + * freeing the provided string. + * + * NOTE: this can only be done with dispatched jobs. + */ +vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue, + uint64_t id, char *report_message); + #endif diff --git a/src/job-queue/vieter_job_queue.c b/src/job-queue/vieter_job_queue.c new file mode 100644 index 0000000..14ae1dc --- /dev/null +++ b/src/job-queue/vieter_job_queue.c @@ -0,0 +1,153 @@ +#include "vieter_job_queue_internal.h" + +vieter_job_queue *vieter_job_queue_init() { + vieter_job_queue *queue = malloc(sizeof(vieter_job_queue)); + + queue->tree = vieter_tree_init(); + + for (int i = 0; i < VIETER_JOB_STATES; i++) { + queue->heaps[i] = vieter_heap_init(); + } + + return queue; +} + +void vieter_job_queue_free(vieter_job_queue **ptp) { + vieter_job_queue *queue = *ptp; + + vieter_tree_free(queue->tree); + + for (int i = 0; i < VIETER_JOB_STATES; i++) { + vieter_heap_free(queue->heaps[i]); + } + + free(queue); + *ptp = NULL; +} + +vieter_job *vieter_job_init() { return calloc(1, sizeof(vieter_job)); } + +void vieter_job_free(vieter_job **ptp) { + vieter_job *job = *ptp; + + if (job->schedule != NULL) { + vieter_cron_expr_free(job->schedule); + } + + if (job->failure_report != NULL) { + vieter_job_failure_report_free(&job->failure_report); + } + + free(job); + + *ptp = NULL; +} + +vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue, + vieter_job *job) { + vieter_tree_error tree_res = vieter_tree_insert(queue->tree, job->id, job); + + if (tree_res != vieter_tree_ok) { + return vieter_job_queue_already_present; + } + + vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE], job->id, job); + + job->current_state = VIETER_JOB_INITIAL_STATE; + job->dispatched = false; + job->state_transition_times[VIETER_JOB_INITIAL_STATE] = time(NULL); + + return vieter_job_queue_ok; +} + +vieter_job_queue_error vieter_job_queue_pop(vieter_job **out, + vieter_job_queue *queue, + vieter_job_state state) { + vieter_heap_error res = vieter_heap_pop((void **)out, queue->heaps[state]); + + if (res != vieter_heap_ok) { + return vieter_job_queue_state_empty; + } + + (*out)->dispatched = true; + + return vieter_job_queue_ok; +} + +vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue, + uint64_t id, + vieter_job_state new_state) { + vieter_job *job; + + vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id); + + if (res != vieter_tree_ok) { + return vieter_job_queue_not_present; + } + + if (!job->dispatched) { + return vieter_job_queue_not_dispatched; + } + + vieter_heap_insert(queue->heaps[new_state], job->id, job); + + job->current_state = new_state; + job->dispatched = false; + job->state_transition_times[new_state] = time(NULL); + + return vieter_job_queue_ok; +} + +vieter_job_queue_error vieter_job_queue_remove(vieter_job **out, + vieter_job_queue *queue, + uint64_t id) { + vieter_tree_error res = vieter_tree_search((void **)out, queue->tree, id); + + if (res != vieter_tree_ok) { + return vieter_job_queue_not_present; + } + + vieter_job *job = *out; + + if (!job->dispatched) { + return vieter_job_queue_not_dispatched; + } + + // This can't fail if the search succeeded + vieter_tree_remove((void **)out, queue->tree, job->id); + + return vieter_job_queue_ok; +} + +vieter_job_failure_report *vieter_job_failure_report_init() { + return calloc(1, sizeof(vieter_job_failure_report)); +} + +vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue, + uint64_t id, + char *report_message) { + vieter_job *job; + + vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id); + + if (res != vieter_tree_ok) { + return vieter_job_queue_not_present; + } + + if (!job->dispatched) { + return vieter_job_queue_not_dispatched; + } + + vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE], job->id, job); + + job->dispatched = false; + job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL); + + job->failure_report = vieter_job_failure_report_init(); + job->failure_report->failed_state = job->current_state; + job->failure_report->msg = strdup(report_message); + + job->current_state = VIETER_JOB_FAILURE_STATE; + + return vieter_job_queue_ok; +} diff --git a/src/job-queue/vieter_job_queue_internal.h b/src/job-queue/vieter_job_queue_internal.h index 8d8ffc4..dac8b97 100644 --- a/src/job-queue/vieter_job_queue_internal.h +++ b/src/job-queue/vieter_job_queue_internal.h @@ -1,10 +1,13 @@ #ifndef VIETER_JOB_QUEUE_INTERNAL #define VIETER_JOB_QUEUE_INTERNAL +#include "vieter_heap.h" +#include "vieter_job_queue.h" #include "vieter_tree.h" struct vieter_job_queue { vieter_tree *tree; + vieter_heap *heaps[VIETER_JOB_STATES]; }; #endif