feat(job-queue): initial implementation of functionality

pull/7/head
Jef Roosens 2023-03-07 11:55:12 +01:00
parent 02bd2c24b7
commit 83b3198a49
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 194 additions and 6 deletions

View File

@ -20,6 +20,8 @@ typedef enum vieter_job_state {
// This macro should be kept in sync with the above enum // This macro should be kept in sync with the above enum
#define VIETER_JOB_STATES 4 #define VIETER_JOB_STATES 4
#define VIETER_JOB_INITIAL_STATE vieter_job_state_queued
#define VIETER_JOB_FAILURE_STATE vieter_job_state_failed
/* /*
* Struct storing a report for why a certain job failed to be processed in the * Struct storing a report for why a certain job failed to be processed in the
@ -64,7 +66,10 @@ typedef struct vieter_job_queue vieter_job_queue;
typedef enum vieter_job_queue_error { typedef enum vieter_job_queue_error {
vieter_job_queue_ok = 0, vieter_job_queue_ok = 0,
vieter_job_queue_not_found = 1 vieter_job_queue_not_present = 1,
vieter_job_queue_already_present = 2,
vieter_job_queue_state_empty = 3,
vieter_job_queue_not_dispatched = 4
} vieter_job_queue_error; } vieter_job_queue_error;
/* /*
@ -72,24 +77,51 @@ typedef enum vieter_job_queue_error {
*/ */
vieter_job_queue *vieter_job_queue_init(); vieter_job_queue *vieter_job_queue_init();
/*
* Free a job queue.
*/
void vieter_job_queue_free(vieter_job_queue **ptp); void vieter_job_queue_free(vieter_job_queue **ptp);
/* /*
* Insert the given job into the system. * Insert the given job into the system.
*/ */
vieter_job_queue_error vieter_job_queue_insert(vieter_job *job); vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
vieter_job *job);
/* /*
* Dispatch the job with the given id, returning the pointer to the job. * Pop a job from the given state's queue. The job will then be marked as
* Dispatching a job removes it from its respective state's queue. * dispatched.
*/ */
vieter_job_queue_error vieter_job_queue_dispatch(vieter_job **out, uint64_t id); vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
vieter_job_queue *queue,
vieter_job_state state);
/* /*
* Transition the job with the given id to the new state. This sets the * Transition the job with the given id to the new state. This sets the
* job's dispatch flag to false, and adds it to the new state's queue. * job's dispatch flag to false, and adds it to the new state's queue.
*
* NOTE: this can only be done with dispatched jobs.
*/ */
vieter_job_queue_error vieter_job_queue_transition(uint64_t id, vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
uint64_t id,
vieter_job_state new_state); vieter_job_state new_state);
/*
* Remove the given job from the job queue, returning its pointer to the caller.
*
* NOTE: this can only be done with dispatched jobs.
*/
vieter_job_queue_error
vieter_job_queue_remove(vieter_job **out, vieter_job_queue *queue, uint64_t id);
/*
* Transition a job into the failure state, and attach a failure report with the
* provided message. The message is copied, so the caller is responsible for
* freeing the provided string.
*
* NOTE: this can only be done with dispatched jobs.
*/
vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
uint64_t id, char *report_message);
#endif #endif

View File

@ -0,0 +1,153 @@
#include "vieter_job_queue_internal.h"
vieter_job_queue *vieter_job_queue_init() {
vieter_job_queue *queue = malloc(sizeof(vieter_job_queue));
queue->tree = vieter_tree_init();
for (int i = 0; i < VIETER_JOB_STATES; i++) {
queue->heaps[i] = vieter_heap_init();
}
return queue;
}
void vieter_job_queue_free(vieter_job_queue **ptp) {
vieter_job_queue *queue = *ptp;
vieter_tree_free(queue->tree);
for (int i = 0; i < VIETER_JOB_STATES; i++) {
vieter_heap_free(queue->heaps[i]);
}
free(queue);
*ptp = NULL;
}
vieter_job *vieter_job_init() { return calloc(1, sizeof(vieter_job)); }
void vieter_job_free(vieter_job **ptp) {
vieter_job *job = *ptp;
if (job->schedule != NULL) {
vieter_cron_expr_free(job->schedule);
}
if (job->failure_report != NULL) {
vieter_job_failure_report_free(&job->failure_report);
}
free(job);
*ptp = NULL;
}
vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
vieter_job *job) {
vieter_tree_error tree_res = vieter_tree_insert(queue->tree, job->id, job);
if (tree_res != vieter_tree_ok) {
return vieter_job_queue_already_present;
}
vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE], job->id, job);
job->current_state = VIETER_JOB_INITIAL_STATE;
job->dispatched = false;
job->state_transition_times[VIETER_JOB_INITIAL_STATE] = time(NULL);
return vieter_job_queue_ok;
}
vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
vieter_job_queue *queue,
vieter_job_state state) {
vieter_heap_error res = vieter_heap_pop((void **)out, queue->heaps[state]);
if (res != vieter_heap_ok) {
return vieter_job_queue_state_empty;
}
(*out)->dispatched = true;
return vieter_job_queue_ok;
}
vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
uint64_t id,
vieter_job_state new_state) {
vieter_job *job;
vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);
if (res != vieter_tree_ok) {
return vieter_job_queue_not_present;
}
if (!job->dispatched) {
return vieter_job_queue_not_dispatched;
}
vieter_heap_insert(queue->heaps[new_state], job->id, job);
job->current_state = new_state;
job->dispatched = false;
job->state_transition_times[new_state] = time(NULL);
return vieter_job_queue_ok;
}
vieter_job_queue_error vieter_job_queue_remove(vieter_job **out,
vieter_job_queue *queue,
uint64_t id) {
vieter_tree_error res = vieter_tree_search((void **)out, queue->tree, id);
if (res != vieter_tree_ok) {
return vieter_job_queue_not_present;
}
vieter_job *job = *out;
if (!job->dispatched) {
return vieter_job_queue_not_dispatched;
}
// This can't fail if the search succeeded
vieter_tree_remove((void **)out, queue->tree, job->id);
return vieter_job_queue_ok;
}
vieter_job_failure_report *vieter_job_failure_report_init() {
return calloc(1, sizeof(vieter_job_failure_report));
}
vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
uint64_t id,
char *report_message) {
vieter_job *job;
vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);
if (res != vieter_tree_ok) {
return vieter_job_queue_not_present;
}
if (!job->dispatched) {
return vieter_job_queue_not_dispatched;
}
vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE], job->id, job);
job->dispatched = false;
job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL);
job->failure_report = vieter_job_failure_report_init();
job->failure_report->failed_state = job->current_state;
job->failure_report->msg = strdup(report_message);
job->current_state = VIETER_JOB_FAILURE_STATE;
return vieter_job_queue_ok;
}

View File

@ -1,10 +1,13 @@
#ifndef VIETER_JOB_QUEUE_INTERNAL #ifndef VIETER_JOB_QUEUE_INTERNAL
#define VIETER_JOB_QUEUE_INTERNAL #define VIETER_JOB_QUEUE_INTERNAL
#include "vieter_heap.h"
#include "vieter_job_queue.h"
#include "vieter_tree.h" #include "vieter_tree.h"
struct vieter_job_queue { struct vieter_job_queue {
vieter_tree *tree; vieter_tree *tree;
vieter_heap *heaps[VIETER_JOB_STATES];
}; };
#endif #endif