From 1f4887118f7eec4dcfbf9f0275d4244a2a143b3f Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 1 Apr 2023 17:04:40 +0200 Subject: [PATCH] feat(job-queue): add cat-heap support --- include/vieter_cat_heap.h | 2 +- include/vieter_job_queue.h | 22 ++++++++-- src/cat-heap/vieter_cat_heap.c | 5 +-- src/job-queue/vieter_job_queue.c | 51 ++++++++++++++++++++--- src/job-queue/vieter_job_queue_internal.h | 6 ++- 5 files changed, 71 insertions(+), 15 deletions(-) diff --git a/include/vieter_cat_heap.h b/include/vieter_cat_heap.h index d6b2a83..f25df00 100644 --- a/include/vieter_cat_heap.h +++ b/include/vieter_cat_heap.h @@ -13,7 +13,7 @@ typedef enum vieter_cat_heap_error { vieter_cat_heap *vieter_cat_heap_init(); -void vieter_cat_heap_free(vieter_cat_heap **ptp); +void vieter_cat_heap_free(vieter_cat_heap *cheap); uint64_t vieter_cat_heap_size(vieter_cat_heap *cheap); diff --git a/include/vieter_job_queue.h b/include/vieter_job_queue.h index 92ef35f..9fb2c96 100644 --- a/include/vieter_job_queue.h +++ b/include/vieter_job_queue.h @@ -20,8 +20,12 @@ typedef enum vieter_job_state { // This macro should be kept in sync with the above enum #define VIETER_JOB_STATES 4 +// Neither of these states are allowed to be categorized #define VIETER_JOB_INITIAL_STATE vieter_job_state_queued #define VIETER_JOB_FAILURE_STATE vieter_job_state_failed +// Bitmap describing what states should be divided into multiple architectures +#define VIETER_JOB_STATES_ARCH 0b0010 +#define VIETER_JOB_STATE_IS_ARCH(i) (VIETER_JOB_STATES_ARCH & (1 << i)) /* * Struct storing a report for why a certain job failed to be processed in the @@ -45,6 +49,7 @@ typedef struct vieter_job { uint64_t next_scheduled_time; vieter_cron_expression *schedule; void *build_config; + char *arch; vieter_job_failure_report *failure_report; uint64_t state_transition_times[VIETER_JOB_STATES]; vieter_job_state current_state; @@ -69,7 +74,9 @@ typedef enum vieter_job_queue_error { vieter_job_queue_not_present = 1, vieter_job_queue_already_present = 2, vieter_job_queue_state_empty = 3, - vieter_job_queue_not_dispatched = 4 + vieter_job_queue_not_dispatched = 4, + vieter_job_queue_state_is_arch = 5, + vieter_job_queue_state_is_not_arch = 6, } vieter_job_queue_error; /* @@ -89,13 +96,22 @@ vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue, vieter_job *job); /* - * Pop a job from the given state's queue. The job will then be marked as - * dispatched. + * Pop a job from the given non-categorized state's queue. The job will then be + * marked as dispatched. */ vieter_job_queue_error vieter_job_queue_pop(vieter_job **out, vieter_job_queue *queue, vieter_job_state state); +/* + * Pop a job from the given categorized state's queue. The job will then be + * marked as dispatched. + */ +vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out, + vieter_job_queue *queue, + vieter_job_state state, + char *arch); + /* * Transition the job with the given id to the new state. This sets the * job's dispatch flag to false, and adds it to the new state's queue. diff --git a/src/cat-heap/vieter_cat_heap.c b/src/cat-heap/vieter_cat_heap.c index b514b5b..d0a00a0 100644 --- a/src/cat-heap/vieter_cat_heap.c +++ b/src/cat-heap/vieter_cat_heap.c @@ -4,9 +4,7 @@ vieter_cat_heap *vieter_cat_heap_init() { return calloc(1, sizeof(vieter_cat_heap)); } -void vieter_cat_heap_free(vieter_cat_heap **ptp) { - vieter_cat_heap *cheap = *ptp; - +void vieter_cat_heap_free(vieter_cat_heap *cheap) { if (cheap->cat_count > 0) { for (uint64_t i = 0; i < cheap->cat_count; i++) { free(cheap->categories[i]); @@ -18,7 +16,6 @@ void vieter_cat_heap_free(vieter_cat_heap **ptp) { } free(cheap); - *ptp = NULL; } uint64_t vieter_cat_heap_size(vieter_cat_heap *cheap) { diff --git a/src/job-queue/vieter_job_queue.c b/src/job-queue/vieter_job_queue.c index a5fb9e3..0fd3c09 100644 --- a/src/job-queue/vieter_job_queue.c +++ b/src/job-queue/vieter_job_queue.c @@ -6,7 +6,11 @@ vieter_job_queue *vieter_job_queue_init() { queue->tree = vieter_tree_init(); for (int i = 0; i < VIETER_JOB_STATES; i++) { - queue->heaps[i] = vieter_heap_init(); + if (VIETER_JOB_STATE_IS_ARCH(i)) { + queue->heaps[i].cat_heap = vieter_cat_heap_init(); + } else { + queue->heaps[i].heap = vieter_heap_init(); + } } return queue; @@ -18,7 +22,11 @@ void vieter_job_queue_free(vieter_job_queue **ptp) { vieter_tree_free(queue->tree); for (int i = 0; i < VIETER_JOB_STATES; i++) { - vieter_heap_free(queue->heaps[i]); + if (VIETER_JOB_STATE_IS_ARCH(i)) { + vieter_cat_heap_free(queue->heaps[i].cat_heap); + } else { + vieter_heap_free(queue->heaps[i].heap); + } } free(queue); @@ -51,7 +59,8 @@ vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue, return vieter_job_queue_already_present; } - vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE], job->id, job); + // We assume that the initial state is not a category heap + vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE].heap, job->id, job); job->current_state = VIETER_JOB_INITIAL_STATE; job->dispatched = false; @@ -63,8 +72,12 @@ vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue, vieter_job_queue_error vieter_job_queue_pop(vieter_job **out, vieter_job_queue *queue, vieter_job_state state) { + if (VIETER_JOB_STATE_IS_ARCH(state)) { + return vieter_job_queue_state_is_arch; + } - vieter_heap_error res = vieter_heap_pop((void **)out, queue->heaps[state]); + vieter_heap_error res = + vieter_heap_pop((void **)out, queue->heaps[state].heap); if (res != vieter_heap_ok) { return vieter_job_queue_state_empty; @@ -75,6 +88,26 @@ vieter_job_queue_error vieter_job_queue_pop(vieter_job **out, return vieter_job_queue_ok; } +vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out, + vieter_job_queue *queue, + vieter_job_state state, + char *arch) { + if (!VIETER_JOB_STATE_IS_ARCH(state)) { + return vieter_job_queue_state_is_not_arch; + } + + vieter_cat_heap_error res = + vieter_cat_heap_pop((void **)out, queue->heaps[state].cat_heap, arch); + + if (res != vieter_cat_heap_ok) { + return vieter_job_queue_state_empty; + } + + (*out)->dispatched = true; + + return vieter_job_queue_ok; +} + vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue, uint64_t id, vieter_job_state new_state) { @@ -90,7 +123,12 @@ vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue, return vieter_job_queue_not_dispatched; } - vieter_heap_insert(queue->heaps[new_state], job->id, job); + if (VIETER_JOB_STATE_IS_ARCH(new_state)) { + vieter_cat_heap_insert(queue->heaps[new_state].cat_heap, job->arch, job->id, + job); + } else { + vieter_heap_insert(queue->heaps[new_state].heap, job->id, job); + } job->current_state = new_state; job->dispatched = false; @@ -139,7 +177,8 @@ vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue, return vieter_job_queue_not_dispatched; } - vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE], job->id, job); + // We assume the failure state is not categorized + vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE].heap, job->id, job); job->dispatched = false; job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL); diff --git a/src/job-queue/vieter_job_queue_internal.h b/src/job-queue/vieter_job_queue_internal.h index 4641ba6..99ecab8 100644 --- a/src/job-queue/vieter_job_queue_internal.h +++ b/src/job-queue/vieter_job_queue_internal.h @@ -1,6 +1,7 @@ #ifndef VIETER_JOB_QUEUE_INTERNAL #define VIETER_JOB_QUEUE_INTERNAL +#include "vieter_cat_heap.h" #include "vieter_heap.h" #include "vieter_job_queue.h" #include "vieter_tree.h" @@ -8,7 +9,10 @@ struct vieter_job_queue { vieter_tree *tree; - vieter_heap *heaps[VIETER_JOB_STATES]; + union { + vieter_heap *heap; + vieter_cat_heap *cat_heap; + } heaps[VIETER_JOB_STATES]; pthread_rwlock_t lock; };