feat(job-queue): add cat-heap support

This commit is contained in:
Jef Roosens 2023-04-01 17:04:40 +02:00
parent 8e076f8543
commit 1f4887118f
5 changed files with 71 additions and 15 deletions

View file

@ -4,9 +4,7 @@ vieter_cat_heap *vieter_cat_heap_init() {
return calloc(1, sizeof(vieter_cat_heap));
}
void vieter_cat_heap_free(vieter_cat_heap **ptp) {
vieter_cat_heap *cheap = *ptp;
void vieter_cat_heap_free(vieter_cat_heap *cheap) {
if (cheap->cat_count > 0) {
for (uint64_t i = 0; i < cheap->cat_count; i++) {
free(cheap->categories[i]);
@ -18,7 +16,6 @@ void vieter_cat_heap_free(vieter_cat_heap **ptp) {
}
free(cheap);
*ptp = NULL;
}
uint64_t vieter_cat_heap_size(vieter_cat_heap *cheap) {

View file

@ -6,7 +6,11 @@ vieter_job_queue *vieter_job_queue_init() {
queue->tree = vieter_tree_init();
for (int i = 0; i < VIETER_JOB_STATES; i++) {
queue->heaps[i] = vieter_heap_init();
if (VIETER_JOB_STATE_IS_ARCH(i)) {
queue->heaps[i].cat_heap = vieter_cat_heap_init();
} else {
queue->heaps[i].heap = vieter_heap_init();
}
}
return queue;
@ -18,7 +22,11 @@ void vieter_job_queue_free(vieter_job_queue **ptp) {
vieter_tree_free(queue->tree);
for (int i = 0; i < VIETER_JOB_STATES; i++) {
vieter_heap_free(queue->heaps[i]);
if (VIETER_JOB_STATE_IS_ARCH(i)) {
vieter_cat_heap_free(queue->heaps[i].cat_heap);
} else {
vieter_heap_free(queue->heaps[i].heap);
}
}
free(queue);
@ -51,7 +59,8 @@ vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
return vieter_job_queue_already_present;
}
vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE], job->id, job);
// We assume that the initial state is not a category heap
vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE].heap, job->id, job);
job->current_state = VIETER_JOB_INITIAL_STATE;
job->dispatched = false;
@ -63,8 +72,12 @@ vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
vieter_job_queue *queue,
vieter_job_state state) {
if (VIETER_JOB_STATE_IS_ARCH(state)) {
return vieter_job_queue_state_is_arch;
}
vieter_heap_error res = vieter_heap_pop((void **)out, queue->heaps[state]);
vieter_heap_error res =
vieter_heap_pop((void **)out, queue->heaps[state].heap);
if (res != vieter_heap_ok) {
return vieter_job_queue_state_empty;
@ -75,6 +88,26 @@ vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
return vieter_job_queue_ok;
}
vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out,
vieter_job_queue *queue,
vieter_job_state state,
char *arch) {
if (!VIETER_JOB_STATE_IS_ARCH(state)) {
return vieter_job_queue_state_is_not_arch;
}
vieter_cat_heap_error res =
vieter_cat_heap_pop((void **)out, queue->heaps[state].cat_heap, arch);
if (res != vieter_cat_heap_ok) {
return vieter_job_queue_state_empty;
}
(*out)->dispatched = true;
return vieter_job_queue_ok;
}
vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
uint64_t id,
vieter_job_state new_state) {
@ -90,7 +123,12 @@ vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
return vieter_job_queue_not_dispatched;
}
vieter_heap_insert(queue->heaps[new_state], job->id, job);
if (VIETER_JOB_STATE_IS_ARCH(new_state)) {
vieter_cat_heap_insert(queue->heaps[new_state].cat_heap, job->arch, job->id,
job);
} else {
vieter_heap_insert(queue->heaps[new_state].heap, job->id, job);
}
job->current_state = new_state;
job->dispatched = false;
@ -139,7 +177,8 @@ vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
return vieter_job_queue_not_dispatched;
}
vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE], job->id, job);
// We assume the failure state is not categorized
vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE].heap, job->id, job);
job->dispatched = false;
job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL);

View file

@ -1,6 +1,7 @@
#ifndef VIETER_JOB_QUEUE_INTERNAL
#define VIETER_JOB_QUEUE_INTERNAL
#include "vieter_cat_heap.h"
#include "vieter_heap.h"
#include "vieter_job_queue.h"
#include "vieter_tree.h"
@ -8,7 +9,10 @@
struct vieter_job_queue {
vieter_tree *tree;
vieter_heap *heaps[VIETER_JOB_STATES];
union {
vieter_heap *heap;
vieter_cat_heap *cat_heap;
} heaps[VIETER_JOB_STATES];
pthread_rwlock_t lock;
};