#include "vieter_job_queue_internal.h"

/*
 * Allocate and initialize an empty job queue.
 *
 * The queue consists of a tree indexing the jobs by id, one heap per job
 * state (a category heap for states where VIETER_JOB_STATE_IS_ARCH holds, a
 * plain heap otherwise) and a read-write lock.
 *
 * Returns the new queue, or NULL if the allocation or the initialization of
 * the lock failed.
 */
vieter_job_queue *vieter_job_queue_init(void) {
  vieter_job_queue *queue = malloc(sizeof *queue);

  if (queue == NULL) {
    return NULL;
  }

  // The lock lives in freshly malloc'ed (indeterminate) memory, so it has to
  // be initialized explicitly before any of the *lock functions may be used.
  if (pthread_rwlock_init(&queue->lock, NULL) != 0) {
    free(queue);

    return NULL;
  }

  queue->tree = vieter_tree_init();

  for (int i = 0; i < VIETER_JOB_STATES; i++) {
    if (VIETER_JOB_STATE_IS_ARCH(i)) {
      queue->heaps[i].cat_heap = vieter_cat_heap_init();
    } else {
      queue->heaps[i].heap = vieter_heap_init();
    }
  }

  return queue;
}

/*
 * Release the tree, every per-state heap, the lock and the queue itself.
 *
 * Passing NULL is a no-op. No thread may hold or be waiting on the queue's
 * lock when this is called, as the lock is destroyed here.
 *
 * NOTE(review): whether the jobs stored in the queue are freed as well
 * depends on vieter_tree_free and the heap free functions, which are not
 * visible here.
 */
void vieter_job_queue_free(vieter_job_queue *queue) {
  if (queue == NULL) {
    return;
  }

  vieter_tree_free(queue->tree);

  for (int i = 0; i < VIETER_JOB_STATES; i++) {
    if (VIETER_JOB_STATE_IS_ARCH(i)) {
      vieter_cat_heap_free(queue->heaps[i].cat_heap);
    } else {
      vieter_heap_free(queue->heaps[i].heap);
    }
  }

  pthread_rwlock_destroy(&queue->lock);
  free(queue);
}

/*
 * Register a new job in the queue and place it in the heap of the initial
 * state.
 *
 * On success the job's current_state is set to VIETER_JOB_INITIAL_STATE, it
 * is marked as not dispatched and the transition time for the initial state
 * is set to the current time.
 *
 * Returns vieter_job_queue_already_present if inserting the job's id into the
 * tree fails, vieter_job_queue_ok otherwise.
 */
vieter_job_queue_error vieter_job_queue_insert(vieter_job_queue *queue,
                                               vieter_job *job) {
  vieter_tree_error tree_res = vieter_tree_insert(queue->tree, job->id, job);

  if (tree_res != vieter_tree_ok) {
    return vieter_job_queue_already_present;
  }

  // We assume that the initial state is not a category heap
  vieter_heap_insert(queue->heaps[VIETER_JOB_INITIAL_STATE].heap,
                     job->next_scheduled_time, job);

  job->current_state = VIETER_JOB_INITIAL_STATE;
  job->dispatched = false;
  job->state_transition_times[VIETER_JOB_INITIAL_STATE] = time(NULL);

  return vieter_job_queue_ok;
}

/*
 * Pop a job from the plain heap belonging to the given state and mark it as
 * dispatched.
 *
 * out receives the popped job on success.
 *
 * Returns vieter_job_queue_state_is_arch if the state uses a category heap
 * (use vieter_job_queue_pop_arch for those), vieter_job_queue_state_empty if
 * the heap pop fails, vieter_job_queue_ok otherwise.
 *
 * NOTE(review): state is used as an array index without a range check; the
 * caller is presumably expected to pass a value below VIETER_JOB_STATES.
 */
vieter_job_queue_error vieter_job_queue_pop(vieter_job **out,
                                            vieter_job_queue *queue,
                                            vieter_job_state state) {
  if (VIETER_JOB_STATE_IS_ARCH(state)) {
    return vieter_job_queue_state_is_arch;
  }

  vieter_heap_error res =
      vieter_heap_pop((void **)out, queue->heaps[state].heap);

  if (res != vieter_heap_ok) {
    return vieter_job_queue_state_empty;
  }

  (*out)->dispatched = true;

  return vieter_job_queue_ok;
}

/*
 * Pop a job for the given architecture from the category heap belonging to
 * the given state and mark it as dispatched.
 *
 * out receives the popped job on success.
 *
 * Returns vieter_job_queue_state_is_not_arch if the state uses a plain heap
 * (use vieter_job_queue_pop for those), vieter_job_queue_state_empty if the
 * category heap pop fails, vieter_job_queue_ok otherwise.
 *
 * NOTE(review): state is used as an array index without a range check; the
 * caller is presumably expected to pass a value below VIETER_JOB_STATES.
 */
vieter_job_queue_error vieter_job_queue_pop_arch(vieter_job **out,
                                                 vieter_job_queue *queue,
                                                 vieter_job_state state,
                                                 char *arch) {
  if (!VIETER_JOB_STATE_IS_ARCH(state)) {
    return vieter_job_queue_state_is_not_arch;
  }

  vieter_cat_heap_error res =
      vieter_cat_heap_pop((void **)out, queue->heaps[state].cat_heap, arch);

  if (res != vieter_cat_heap_ok) {
    return vieter_job_queue_state_empty;
  }

  (*out)->dispatched = true;

  return vieter_job_queue_ok;
}

/*
 * Move a dispatched job into the heap of a new state.
 *
 * The job is looked up by id, inserted into the (category) heap of new_state,
 * marked as not dispatched again, and the transition time for new_state is
 * set to the current time.
 *
 * Returns vieter_job_queue_not_present if no job with this id is found,
 * vieter_job_queue_not_dispatched if the job is not currently dispatched,
 * vieter_job_queue_ok otherwise.
 *
 * NOTE(review): new_state is used as an array index without a range check;
 * the caller is presumably expected to pass a value below VIETER_JOB_STATES.
 */
vieter_job_queue_error vieter_job_queue_transition(vieter_job_queue *queue,
                                                   uint64_t id,
                                                   vieter_job_state new_state) {
  vieter_job *job;
  vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);

  if (res != vieter_tree_ok) {
    return vieter_job_queue_not_present;
  }

  if (!job->dispatched) {
    return vieter_job_queue_not_dispatched;
  }

  if (VIETER_JOB_STATE_IS_ARCH(new_state)) {
    vieter_cat_heap_insert(queue->heaps[new_state].cat_heap, job->arch,
                           job->next_scheduled_time, job);
  } else {
    vieter_heap_insert(queue->heaps[new_state].heap, job->next_scheduled_time,
                       job);
  }

  job->current_state = new_state;
  job->dispatched = false;
  job->state_transition_times[new_state] = time(NULL);

  return vieter_job_queue_ok;
}

/*
 * Remove a dispatched job from the queue's tree.
 *
 * out receives the job found for the given id (it is written as soon as the
 * search succeeds, even if the job then turns out not to be dispatched).
 *
 * Returns vieter_job_queue_not_present if no job with this id is found,
 * vieter_job_queue_not_dispatched if the job is not currently dispatched,
 * vieter_job_queue_ok otherwise.
 */
vieter_job_queue_error vieter_job_queue_remove(vieter_job **out,
                                               vieter_job_queue *queue,
                                               uint64_t id) {
  vieter_tree_error res = vieter_tree_search((void **)out, queue->tree, id);

  if (res != vieter_tree_ok) {
    return vieter_job_queue_not_present;
  }

  vieter_job *job = *out;

  // Only dispatched jobs are removed; presumably because a job that is not
  // dispatched is still referenced by one of the heaps.
  if (!job->dispatched) {
    return vieter_job_queue_not_dispatched;
  }

  // This can't fail if the search succeeded
  vieter_tree_remove((void **)out, queue->tree, job->id);

  return vieter_job_queue_ok;
}

/*
 * Move a dispatched job into the failure state and attach a failure report.
 *
 * The report records the state the job was in when it failed and a copy of
 * report_message. If report_message is NULL, the report's msg is set to NULL
 * instead of copying.
 *
 * Returns vieter_job_queue_not_present if no job with this id is found,
 * vieter_job_queue_not_dispatched if the job is not currently dispatched,
 * vieter_job_queue_ok otherwise.
 */
vieter_job_queue_error vieter_job_queue_fail(vieter_job_queue *queue,
                                             uint64_t id,
                                             char *report_message) {
  vieter_job *job;
  vieter_tree_error res = vieter_tree_search((void **)&job, queue->tree, id);

  if (res != vieter_tree_ok) {
    return vieter_job_queue_not_present;
  }

  if (!job->dispatched) {
    return vieter_job_queue_not_dispatched;
  }

  // We assume the failure state is not categorized
  vieter_heap_insert(queue->heaps[VIETER_JOB_FAILURE_STATE].heap,
                     job->next_scheduled_time, job);

  job->dispatched = false;
  job->state_transition_times[VIETER_JOB_FAILURE_STATE] = time(NULL);

  // NOTE(review): a failure report already attached to the job is overwritten
  // here without being released; confirm a job can only fail once.
  job->failure_report = vieter_job_failure_report_init();
  // Record the state the job failed in before current_state is overwritten
  job->failure_report->failed_state = job->current_state;
  // strdup on a NULL pointer is undefined behavior, so guard against it
  job->failure_report->msg =
      report_message != NULL ? strdup(report_message) : NULL;

  job->current_state = VIETER_JOB_FAILURE_STATE;

  return vieter_job_queue_ok;
}

/*
 * Acquire the queue's lock for reading. Returns the result of
 * pthread_rwlock_rdlock.
 */
int vieter_job_queue_rlock(vieter_job_queue *job_queue) {
  return pthread_rwlock_rdlock(&job_queue->lock);
}

/*
 * Acquire the queue's lock for writing. Returns the result of
 * pthread_rwlock_wrlock.
 */
int vieter_job_queue_wlock(vieter_job_queue *job_queue) {
  return pthread_rwlock_wrlock(&job_queue->lock);
}

/*
 * Release the queue's lock. Returns the result of pthread_rwlock_unlock.
 */
int vieter_job_queue_unlock(vieter_job_queue *job_queue) {
  return pthread_rwlock_unlock(&job_queue->lock);
}