Job queue implementation #7
|
@ -0,0 +1,5 @@
|
||||||
|
root = true
|
||||||
|
|
||||||
|
[*.{c,h}]
|
||||||
|
indent_style = space
|
||||||
|
indent_size = 2
|
13
Makefile
13
Makefile
|
@ -11,7 +11,8 @@ INC_DIRS ?= include
|
||||||
LIB := $(BUILD_DIR)/$(LIB_FILENAME)
|
LIB := $(BUILD_DIR)/$(LIB_FILENAME)
|
||||||
|
|
||||||
SRCS != find '$(SRC_DIR)' -iname '*.c'
|
SRCS != find '$(SRC_DIR)' -iname '*.c'
|
||||||
SRCS_H != find $(INC_DIRS) '$(SRC_DIR)' -iname '*.h'
|
SRCS_H != find $(INC_DIRS) -iname '*.h'
|
||||||
|
SRCS_H_INTERNAL != find $(SRC_DIR) -iname '*.h'
|
||||||
SRCS_TEST != find '$(TEST_DIR)' -iname '*.c'
|
SRCS_TEST != find '$(TEST_DIR)' -iname '*.c'
|
||||||
|
|
||||||
OBJS := $(SRCS:%=$(BUILD_DIR)/%.o)
|
OBJS := $(SRCS:%=$(BUILD_DIR)/%.o)
|
||||||
|
@ -86,16 +87,22 @@ $(BUILD_DIR)/$(TEST_DIR)/%.c.o: $(TEST_DIR)/%.c
|
||||||
# =====MAINTENANCE=====
|
# =====MAINTENANCE=====
|
||||||
.PHONY: lint
|
.PHONY: lint
|
||||||
lint:
|
lint:
|
||||||
clang-format -n --Werror $(SRCS) $(SRCS_H)
|
clang-format -n --Werror $(SRCS) $(SRCS_H) $(SRCS_H_INTERNAL)
|
||||||
|
|
||||||
.PHONY: fmt
|
.PHONY: fmt
|
||||||
fmt:
|
fmt:
|
||||||
clang-format -i $(SRCS) $(SRCS_H)
|
clang-format -i $(SRCS) $(SRCS_H) $(SRCS_H_INTERNAL)
|
||||||
|
|
||||||
.PHONY: clean
|
.PHONY: clean
|
||||||
clean:
|
clean:
|
||||||
rm -rf $(BUILD_DIR)
|
rm -rf $(BUILD_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
.PHONY: bear
|
||||||
|
bear: clean
|
||||||
|
bear -- make
|
||||||
|
bear --append -- make build-test
|
||||||
|
|
||||||
|
|
||||||
# Make make aware of the .d files
|
# Make make aware of the .d files
|
||||||
-include $(DEPS)
|
-include $(DEPS)
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
#ifndef VIETER_CAT_HEAP
|
||||||
|
#define VIETER_CAT_HEAP
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
typedef struct vieter_cat_heap vieter_cat_heap;
|
||||||
|
|
||||||
|
typedef enum vieter_cat_heap_error {
|
||||||
|
vieter_cat_heap_ok = 0,
|
||||||
|
vieter_cat_heap_arch_empty = 1,
|
||||||
|
vieter_cat_heap_arch_not_found = 2
|
||||||
|
} vieter_cat_heap_error;
|
||||||
|
|
||||||
|
vieter_cat_heap *vieter_cat_heap_init();
|
||||||
|
|
||||||
|
void vieter_cat_heap_free(vieter_cat_heap *cheap);
|
||||||
|
|
||||||
|
uint64_t vieter_cat_heap_size(vieter_cat_heap *cheap);
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_insert(vieter_cat_heap *cheap,
|
||||||
|
char *category, uint64_t key,
|
||||||
|
void *data);
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_pop(void **out, vieter_cat_heap *cheap,
|
||||||
|
char *category);
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_peek(void **out, vieter_cat_heap *cheap,
|
||||||
|
char *category);
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,160 @@
|
||||||
|
#ifndef VIETER_JOB_QUEUE
|
||||||
|
#define VIETER_JOB_QUEUE
|
||||||
|
|
||||||
|
#include "vieter_cron.h"
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The order of these do not imply that they happen in this order. New states
|
||||||
|
* will just get added as consecutive numbers. Their values should be
|
||||||
|
* monotonically increasing values, as these will be used to index arrays, among
|
||||||
|
* other things.
|
||||||
|
*/
|
||||||
|
typedef enum vieter_job_state {
|
||||||
|
vieter_job_state_queued = 0,
|
||||||
|
vieter_job_state_ready = 1,
|
||||||
|
vieter_job_state_build_finished = 2,
|
||||||
|
vieter_job_state_failed = 3
|
||||||
|
} vieter_job_state;
|
||||||
|
|
||||||
|
// This macro should be kept in sync with the above enum
|
||||||
|
#define VIETER_JOB_STATES 4
|
||||||
|
// Neither of these states are allowed to be categorized
|
||||||
|
#define VIETER_JOB_INITIAL_STATE vieter_job_state_queued
|
||||||
|
#define VIETER_JOB_FAILURE_STATE vieter_job_state_failed
|
||||||
|
// Bitmap describing what states should be divided into multiple architectures
|
||||||
|
#define VIETER_JOB_STATES_ARCH 0b0010
|
||||||
|
#define VIETER_JOB_STATE_IS_ARCH(i) (VIETER_JOB_STATES_ARCH & (1 << i))
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Struct storing a report for why a certain job failed to be processed in the
|
||||||
|
* given state.
|
||||||
|
*/
|
||||||
|
typedef struct vieter_job_failure_report {
|
||||||
|
vieter_job_state failed_state;
|
||||||
|
char *msg;
|
||||||
|
} vieter_job_failure_report;
|
||||||
|
|
||||||
|
vieter_job_failure_report *vieter_job_failure_report_init();
|
||||||
|
|
||||||
|
void vieter_job_failure_report_free(vieter_job_failure_report *report);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Represents a job currently being processed in the system. A job migrates
|
||||||
|
* between different states before finally being removed from the queue.
|
||||||
|
*/
|
||||||
|
typedef struct vieter_job {
|
||||||
|
uint64_t id;
|
||||||
|
uint64_t next_scheduled_time;
|
||||||
|
vieter_cron_expression *schedule;
|
||||||
|
void *build_config;
|
||||||
|
char *arch;
|
||||||
|
vieter_job_failure_report *failure_report;
|
||||||
|
uint64_t state_transition_times[VIETER_JOB_STATES];
|
||||||
|
vieter_job_state current_state;
|
||||||
|
bool single;
|
||||||
|
bool dispatched;
|
||||||
|
} vieter_job;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocate a new vieter_job object.
|
||||||
|
*/
|
||||||
|
vieter_job *vieter_job_init();
|
||||||
|
|
||||||
|
void vieter_job_free(vieter_job *job);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Represents the actual queue managing the list of jobs.
|
||||||
|
*/
|
||||||
|
typedef struct vieter_job_queue vieter_job_queue;
|
||||||
|
|
||||||
|
typedef enum vieter_job_queue_error {
|
||||||
|
vieter_job_queue_ok = 0,
|
||||||
|
vieter_job_queue_not_present = 1,
|
||||||
|
vieter_job_queue_already_present = 2,
|
||||||
|
vieter_job_queue_state_empty = 3,
|
||||||
|
vieter_job_queue_not_dispatched = 4,
|
||||||
|
vieter_job_queue_state_is_arch = 5,
|
||||||
|
vieter_job_queue_state_is_not_arch = 6,
|
||||||
|
} vieter_job_queue_error;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocate and initialize a new job queue.
|
||||||
|
*/
|
||||||
|
vieter_job_queue *vieter_job_queue_init();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Free a job queue.
|
||||||
|
*/
|
||||||
|
void vieter_job_queue_free(vieter_job_queue *queue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Insert the given job into the system.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
|
||||||
|
vieter_job *job);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Pop a job from the given non-categorized state's queue. The job will then be
|
||||||
|
* marked as dispatched.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
|
||||||
|
vieter_job_queue *queue,
|
||||||
|
vieter_job_state state);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Pop a job from the given categorized state's queue. The job will then be
|
||||||
|
* marked as dispatched.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out,
|
||||||
|
vieter_job_queue *queue,
|
||||||
|
vieter_job_state state,
|
||||||
|
char *arch);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Transition the job with the given id to the new state. This sets the
|
||||||
|
* job's dispatch flag to false, and adds it to the new state's queue.
|
||||||
|
*
|
||||||
|
* NOTE: this can only be done with dispatched jobs.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
|
||||||
|
uint64_t id,
|
||||||
|
vieter_job_state new_state);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Remove the given job from the job queue, returning its pointer to the caller.
|
||||||
|
*
|
||||||
|
* NOTE: this can only be done with dispatched jobs.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error
|
||||||
|
vieter_job_queue_remove(vieter_job **out, vieter_job_queue *queue, uint64_t id);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Transition a job into the failure state, and attach a failure report with the
|
||||||
|
* provided message. The message is copied, so the caller is responsible for
|
||||||
|
* freeing the provided string.
|
||||||
|
*
|
||||||
|
* NOTE: this can only be done with dispatched jobs.
|
||||||
|
*/
|
||||||
|
vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
|
||||||
|
uint64_t id, char *report_message);
|
||||||
|
/*
|
||||||
|
* Acquire a read lock on the job queue. Return value is the result of
|
||||||
|
* pthread_rwlock_rdlock.
|
||||||
|
*/
|
||||||
|
int vieter_job_queue_rlock(vieter_job_queue *job_queue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Acquire a write lock on the job queue. Return value is the result of
|
||||||
|
* pthread_rwlock_wrlock.
|
||||||
|
*/
|
||||||
|
int vieter_job_queue_wlock(vieter_job_queue *job_queue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Unlock the lock after having acquired it. Return value is the result of
|
||||||
|
* pthread_rwlock_unlock.
|
||||||
|
*/
|
||||||
|
int vieter_job_queue_unlock(vieter_job_queue *job_queue);
|
||||||
|
|
||||||
|
#endif
|
|
@ -67,4 +67,22 @@ void vieter_tree_iterator_free(vieter_tree_iterator **ptp);
|
||||||
vieter_tree_error vieter_tree_iterator_next(void **out,
|
vieter_tree_error vieter_tree_iterator_next(void **out,
|
||||||
vieter_tree_iterator *iter);
|
vieter_tree_iterator *iter);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Acquire a read lock on the tree. Return value is the result of
|
||||||
|
* pthread_rwlock_rdlock.
|
||||||
|
*/
|
||||||
|
int vieter_tree_rlock(vieter_tree *tree);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Acquire a write lock on the tree. Return value is the result of
|
||||||
|
* pthread_rwlock_wrlock.
|
||||||
|
*/
|
||||||
|
int vieter_tree_wlock(vieter_tree *tree);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Unlock the lock after having acquired it. Return value is the result of
|
||||||
|
* pthread_rwlock_unlock.
|
||||||
|
*/
|
||||||
|
int vieter_tree_unlock(vieter_tree *tree);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
A category heap consists of a collection of heaps, with each heap representing
|
||||||
|
a certain category. In practice, this category would be an architecture (e.g.
|
||||||
|
`x86_64`). This is used by the job queue to have a queue for each architecture
|
||||||
|
for a given state.
|
|
@ -0,0 +1,104 @@
|
||||||
|
#include "vieter_cat_heap_internal.h"
|
||||||
|
|
||||||
|
vieter_cat_heap *vieter_cat_heap_init() {
|
||||||
|
return calloc(1, sizeof(vieter_cat_heap));
|
||||||
|
}
|
||||||
|
|
||||||
|
void vieter_cat_heap_free(vieter_cat_heap *cheap) {
|
||||||
|
if (cheap->cat_count > 0) {
|
||||||
|
for (uint64_t i = 0; i < cheap->cat_count; i++) {
|
||||||
|
free(cheap->categories[i]);
|
||||||
|
vieter_heap_free(cheap->heaps[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(cheap->categories);
|
||||||
|
free(cheap->heaps);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(cheap);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t vieter_cat_heap_size(vieter_cat_heap *cheap) {
|
||||||
|
uint64_t total = 0;
|
||||||
|
|
||||||
|
for (uint64_t i = 0; i < cheap->cat_count; i++) {
|
||||||
|
total += vieter_heap_size(cheap->heaps[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_insert(vieter_cat_heap *cheap,
|
||||||
|
char *category, uint64_t key,
|
||||||
|
void *data) {
|
||||||
|
uint64_t i = 0;
|
||||||
|
|
||||||
|
// For now, we do a linear search through all categories. This is more than
|
||||||
|
// fast enough for most usecases.
|
||||||
|
while (i < cheap->cat_count && strcmp(category, cheap->categories[i]) != 0) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i == cheap->cat_count) {
|
||||||
|
if (cheap->cat_count == 0) {
|
||||||
|
cheap->categories = malloc(sizeof(char *));
|
||||||
|
cheap->heaps = malloc(sizeof(vieter_heap *));
|
||||||
|
} else {
|
||||||
|
cheap->categories =
|
||||||
|
realloc(cheap->categories, sizeof(char *) * (cheap->cat_count + 1));
|
||||||
|
cheap->heaps =
|
||||||
|
realloc(cheap->heaps, sizeof(vieter_heap *) * (cheap->cat_count + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
cheap->cat_count++;
|
||||||
|
|
||||||
|
cheap->categories[i] = strdup(category);
|
||||||
|
cheap->heaps[i] = vieter_heap_init();
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_heap_insert(cheap->heaps[i], key, data);
|
||||||
|
|
||||||
|
return vieter_cat_heap_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_pop(void **out, vieter_cat_heap *cheap,
|
||||||
|
char *category) {
|
||||||
|
uint64_t i = 0;
|
||||||
|
|
||||||
|
while (i < cheap->cat_count && strcmp(category, cheap->categories[i]) != 0) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i == cheap->cat_count) {
|
||||||
|
return vieter_cat_heap_arch_not_found;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_heap_error res = vieter_heap_pop(out, cheap->heaps[i]);
|
||||||
|
|
||||||
|
if (res != vieter_heap_ok) {
|
||||||
|
return vieter_cat_heap_arch_empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
return vieter_cat_heap_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_cat_heap_error vieter_cat_heap_peek(void **out, vieter_cat_heap *cheap,
|
||||||
|
char *category) {
|
||||||
|
uint64_t i = 0;
|
||||||
|
|
||||||
|
while (i < cheap->cat_count && strcmp(category, cheap->categories[i]) != 0) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i == cheap->cat_count) {
|
||||||
|
return vieter_cat_heap_arch_not_found;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_heap_error res = vieter_heap_peek(out, cheap->heaps[i]);
|
||||||
|
|
||||||
|
if (res != vieter_heap_ok) {
|
||||||
|
return vieter_cat_heap_arch_empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
return vieter_cat_heap_ok;
|
||||||
|
}
|
|
@ -0,0 +1,10 @@
|
||||||
|
#include "vieter_cat_heap.h"
|
||||||
|
#include "vieter_heap.h"
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
struct vieter_cat_heap {
|
||||||
|
uint64_t cat_count;
|
||||||
|
char **categories;
|
||||||
|
vieter_heap **heaps;
|
||||||
|
};
|
|
@ -0,0 +1,27 @@
|
||||||
|
The goal of this job queue design is to process jobs in order, with each job
|
||||||
|
moving through a pipeline of tasks that need to be completed.
|
||||||
|
|
||||||
|
At any given time, a job is in one of a few given states, e.g. "queued". These
|
||||||
|
states are explained below. Along with this, each job also has a "dispatched"
|
||||||
|
flag. If this flag is set to true, it means this job is currently being
|
||||||
|
processed. "Being processed" could mean anything; it depends entirely on the
|
||||||
|
state a job's in. While a job is dispatched, it is no longer present in the
|
||||||
|
priority queue of its respective state.
|
||||||
|
|
||||||
|
## Job
|
||||||
|
|
||||||
|
A job describes a scheduled build as it moves through the pipeline of states.
|
||||||
|
The job queue datastructure keeps track of all jobs in a central red-black
|
||||||
|
binary tree. For each state, a priority queue tracks in what order jobs should
|
||||||
|
be processed.
|
||||||
|
|
||||||
|
## States
|
||||||
|
|
||||||
|
* `queued`: a job that's in the job queue but does not yet need to be executed
|
||||||
|
(as defined by its timestamp)
|
||||||
|
* `ready`: a job that's scheduled for building, with all preprocessing tasks
|
||||||
|
fulfilled.
|
||||||
|
* `build_finished`: a job whose build has finished, and is waiting for any
|
||||||
|
post-build tasks.
|
||||||
|
* `failed`: a job whose processing failed at some point. Jobs in this state
|
||||||
|
include a failure report that describes in what state they failed, and why.
|
|
@ -0,0 +1,24 @@
|
||||||
|
#include "vieter_job_queue_internal.h"
|
||||||
|
|
||||||
|
vieter_job *vieter_job_init() { return calloc(1, sizeof(vieter_job)); }
|
||||||
|
|
||||||
|
void vieter_job_free(vieter_job *job) {
|
||||||
|
if (job->schedule != NULL) {
|
||||||
|
vieter_cron_expr_free(job->schedule);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (job->failure_report != NULL) {
|
||||||
|
vieter_job_failure_report_free(job->failure_report);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_failure_report *vieter_job_failure_report_init() {
|
||||||
|
return calloc(1, sizeof(vieter_job_failure_report));
|
||||||
|
}
|
||||||
|
|
||||||
|
void vieter_job_failure_report_free(vieter_job_failure_report *report) {
|
||||||
|
free(report->msg);
|
||||||
|
free(report);
|
||||||
|
}
|
|
@ -0,0 +1,183 @@
|
||||||
|
#include "vieter_job_queue_internal.h"
|
||||||
|
|
||||||
|
vieter_job_queue *vieter_job_queue_init() {
|
||||||
|
vieter_job_queue *queue = malloc(sizeof(vieter_job_queue));
|
||||||
|
|
||||||
|
queue->tree = vieter_tree_init();
|
||||||
|
|
||||||
|
for (int i = 0; i < VIETER_JOB_STATES; i++) {
|
||||||
|
if (VIETER_JOB_STATE_IS_ARCH(i)) {
|
||||||
|
queue->heaps[i].cat_heap = vieter_cat_heap_init();
|
||||||
|
} else {
|
||||||
|
queue->heaps[i].heap = vieter_heap_init();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vieter_job_queue_free(vieter_job_queue *queue) {
|
||||||
|
vieter_tree_free(queue->tree);
|
||||||
|
|
||||||
|
for (int i = 0; i < VIETER_JOB_STATES; i++) {
|
||||||
|
if (VIETER_JOB_STATE_IS_ARCH(i)) {
|
||||||
|
vieter_cat_heap_free(queue->heaps[i].cat_heap);
|
||||||
|
} else {
|
||||||
|
vieter_heap_free(queue->heaps[i].heap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
free(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
|
||||||
|
vieter_job *job) {
|
||||||
|
vieter_tree_error tree_res = vieter_tree_insert(queue->tree, job->id, job);
|
||||||
|
|
||||||
|
if (tree_res != vieter_tree_ok) {
|
||||||
|
return vieter_job_queue_already_present;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We assume that the initial state is not a category heap
|
||||||
|
vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE].heap,
|
||||||
|
job->next_scheduled_time, job);
|
||||||
|
|
||||||
|
job->current_state = VIETER_JOB_INITIAL_STATE;
|
||||||
|
job->dispatched = false;
|
||||||
|
job->state_transition_times[VIETER_JOB_INITIAL_STATE] = time(NULL);
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
|
||||||
|
vieter_job_queue *queue,
|
||||||
|
vieter_job_state state) {
|
||||||
|
if (VIETER_JOB_STATE_IS_ARCH(state)) {
|
||||||
|
return vieter_job_queue_state_is_arch;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_heap_error res =
|
||||||
|
vieter_heap_pop((void **)out, queue->heaps[state].heap);
|
||||||
|
|
||||||
|
if (res != vieter_heap_ok) {
|
||||||
|
return vieter_job_queue_state_empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*out)->dispatched = true;
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out,
|
||||||
|
vieter_job_queue *queue,
|
||||||
|
vieter_job_state state,
|
||||||
|
char *arch) {
|
||||||
|
if (!VIETER_JOB_STATE_IS_ARCH(state)) {
|
||||||
|
return vieter_job_queue_state_is_not_arch;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_cat_heap_error res =
|
||||||
|
vieter_cat_heap_pop((void **)out, queue->heaps[state].cat_heap, arch);
|
||||||
|
|
||||||
|
if (res != vieter_cat_heap_ok) {
|
||||||
|
return vieter_job_queue_state_empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*out)->dispatched = true;
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
|
||||||
|
uint64_t id,
|
||||||
|
vieter_job_state new_state) {
|
||||||
|
vieter_job *job;
|
||||||
|
|
||||||
|
vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);
|
||||||
|
|
||||||
|
if (res != vieter_tree_ok) {
|
||||||
|
return vieter_job_queue_not_present;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!job->dispatched) {
|
||||||
|
return vieter_job_queue_not_dispatched;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (VIETER_JOB_STATE_IS_ARCH(new_state)) {
|
||||||
|
vieter_cat_heap_insert(queue->heaps[new_state].cat_heap, job->arch,
|
||||||
|
job->next_scheduled_time, job);
|
||||||
|
} else {
|
||||||
|
vieter_heap_insert(queue->heaps[new_state].heap, job->next_scheduled_time,
|
||||||
|
job);
|
||||||
|
}
|
||||||
|
|
||||||
|
job->current_state = new_state;
|
||||||
|
job->dispatched = false;
|
||||||
|
job->state_transition_times[new_state] = time(NULL);
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_remove(vieter_job **out,
|
||||||
|
vieter_job_queue *queue,
|
||||||
|
uint64_t id) {
|
||||||
|
vieter_tree_error res = vieter_tree_search((void **)out, queue->tree, id);
|
||||||
|
|
||||||
|
if (res != vieter_tree_ok) {
|
||||||
|
return vieter_job_queue_not_present;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job *job = *out;
|
||||||
|
|
||||||
|
if (!job->dispatched) {
|
||||||
|
return vieter_job_queue_not_dispatched;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This can't fail if the search succeeded
|
||||||
|
vieter_tree_remove((void **)out, queue->tree, job->id);
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
|
||||||
|
uint64_t id,
|
||||||
|
char *report_message) {
|
||||||
|
vieter_job *job;
|
||||||
|
|
||||||
|
vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);
|
||||||
|
|
||||||
|
if (res != vieter_tree_ok) {
|
||||||
|
return vieter_job_queue_not_present;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!job->dispatched) {
|
||||||
|
return vieter_job_queue_not_dispatched;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We assume the failure state is not categorized
|
||||||
|
vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE].heap,
|
||||||
|
job->next_scheduled_time, job);
|
||||||
|
|
||||||
|
job->dispatched = false;
|
||||||
|
job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL);
|
||||||
|
|
||||||
|
job->failure_report = vieter_job_failure_report_init();
|
||||||
|
job->failure_report->failed_state = job->current_state;
|
||||||
|
job->failure_report->msg = strdup(report_message);
|
||||||
|
|
||||||
|
job->current_state = VIETER_JOB_FAILURE_STATE;
|
||||||
|
|
||||||
|
return vieter_job_queue_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vieter_job_queue_rlock(vieter_job_queue *job_queue) {
|
||||||
|
return pthread_rwlock_rdlock(&job_queue->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int vieter_job_queue_wlock(vieter_job_queue *job_queue) {
|
||||||
|
return pthread_rwlock_wrlock(&job_queue->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int vieter_job_queue_unlock(vieter_job_queue *job_queue) {
|
||||||
|
return pthread_rwlock_unlock(&job_queue->lock);
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
#ifndef VIETER_JOB_QUEUE_INTERNAL
|
||||||
|
#define VIETER_JOB_QUEUE_INTERNAL
|
||||||
|
|
||||||
|
#include "vieter_cat_heap.h"
|
||||||
|
#include "vieter_heap.h"
|
||||||
|
#include "vieter_job_queue.h"
|
||||||
|
#include "vieter_tree.h"
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
struct vieter_job_queue {
|
||||||
|
vieter_tree *tree;
|
||||||
|
union {
|
||||||
|
vieter_heap *heap;
|
||||||
|
vieter_cat_heap *cat_heap;
|
||||||
|
} heaps[VIETER_JOB_STATES];
|
||||||
|
pthread_rwlock_t lock;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
|
@ -114,3 +114,15 @@ bool vieter_tree_validate(vieter_tree *tree) {
|
||||||
return vieter_tree_node_get(tree->root, vieter_tree_node_black) &&
|
return vieter_tree_node_get(tree->root, vieter_tree_node_black) &&
|
||||||
vieter_tree_node_validate(tree->root, 0, expected_black_nodes);
|
vieter_tree_node_validate(tree->root, 0, expected_black_nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int vieter_tree_rlock(vieter_tree *tree) {
|
||||||
|
return pthread_rwlock_rdlock(&tree->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int vieter_tree_wlock(vieter_tree *tree) {
|
||||||
|
return pthread_rwlock_wrlock(&tree->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int vieter_tree_unlock(vieter_tree *tree) {
|
||||||
|
return pthread_rwlock_unlock(&tree->lock);
|
||||||
|
}
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
#include "vieter_tree.h"
|
#include "vieter_tree.h"
|
||||||
#include "vieter_tree_node.h"
|
#include "vieter_tree_node.h"
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
|
|
||||||
struct vieter_tree {
|
struct vieter_tree {
|
||||||
uint64_t size;
|
uint64_t size;
|
||||||
vieter_tree_node *root;
|
vieter_tree_node *root;
|
||||||
|
pthread_rwlock_t lock;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
#include "acutest.h"
|
||||||
|
#include "vieter_cat_heap_internal.h"
|
||||||
|
|
||||||
|
#define TEST_SIZE(cheap, size) \
|
||||||
|
TEST_CHECK(vieter_cat_heap_size(cheap) == size); \
|
||||||
|
TEST_MSG("Size: %zu, expected: %lu", vieter_cat_heap_size(cheap), (uint64_t)size)
|
||||||
|
|
||||||
|
void test_init() {
|
||||||
|
vieter_cat_heap *cheap = vieter_cat_heap_init();
|
||||||
|
TEST_CHECK(cheap != NULL);
|
||||||
|
TEST_SIZE(cheap, 0);
|
||||||
|
vieter_cat_heap_free(cheap);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_insert() {
|
||||||
|
vieter_cat_heap *cheap = vieter_cat_heap_init();
|
||||||
|
TEST_SIZE(cheap, 0);
|
||||||
|
|
||||||
|
void *data;
|
||||||
|
|
||||||
|
for (uint64_t i = 50; i > 0; i--) {
|
||||||
|
vieter_cat_heap_insert(cheap, "cat1", i, (void *)i);
|
||||||
|
TEST_SIZE(cheap, (uint64_t)51 - i);
|
||||||
|
|
||||||
|
data = 0;
|
||||||
|
|
||||||
|
TEST_CHECK(vieter_cat_heap_peek(&data, cheap, "cat1") == vieter_cat_heap_ok);
|
||||||
|
TEST_CHECK_(data == (void *)i, "%lX == %lX", (uint64_t)data, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint64_t i = 50; i > 0; i--) {
|
||||||
|
vieter_cat_heap_insert(cheap, "cat2", i, (void *)i);
|
||||||
|
TEST_SIZE(cheap, (uint64_t)101 - i);
|
||||||
|
|
||||||
|
data = 0;
|
||||||
|
|
||||||
|
TEST_CHECK(vieter_cat_heap_peek(&data, cheap, "cat2") == vieter_cat_heap_ok);
|
||||||
|
TEST_CHECK_(data == (void *)i, "%lX == %lX", (uint64_t)data, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
vieter_cat_heap_free(cheap);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST_LIST = {
|
||||||
|
{"cat heap init", test_init},
|
||||||
|
{"cat heap insert", test_insert},
|
||||||
|
/* {"heap insert random", test_insert_random}, */
|
||||||
|
/* {"heap pop", test_pop}, */
|
||||||
|
/* {"heap pop random", test_pop_random}, */
|
||||||
|
{NULL, NULL}
|
||||||
|
};
|
|
@ -0,0 +1,23 @@
|
||||||
|
#include "acutest.h"
|
||||||
|
#include "vieter_job_queue_internal.h"
|
||||||
|
|
||||||
|
void test_init() {
|
||||||
|
vieter_job_queue *queue = vieter_job_queue_init();
|
||||||
|
TEST_CHECK(queue != NULL);
|
||||||
|
vieter_job_queue_free(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_job_path() {
|
||||||
|
vieter_job_queue *queue = vieter_job_queue_init();
|
||||||
|
TEST_CHECK(queue != NULL);
|
||||||
|
|
||||||
|
vieter_job *job = vieter_job_init();
|
||||||
|
job->next_scheduled_time = 5;
|
||||||
|
|
||||||
|
vieter_job_queue_free(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_LIST = {
|
||||||
|
{"job queue init", test_init},
|
||||||
|
{NULL, NULL}
|
||||||
|
};
|
Loading…
Reference in New Issue