From 0a5c4295e008b3687d160957328a934c85489f9b Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Mon, 12 Dec 2022 20:59:43 +0100 Subject: [PATCH] feat(server): properly reschedule jobs after polling --- src/build/queue.v | 50 ++++++++++++++++++++++++++++++++++------- src/server/api_builds.v | 6 ++--- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/build/queue.v b/src/build/queue.v index 65b279e2..b704926f 100644 --- a/src/build/queue.v +++ b/src/build/queue.v @@ -8,12 +8,16 @@ import util struct BuildJob { pub: - // Earliest point this + // Next timestamp from which point this job is allowed to be executed timestamp time.Time - config BuildConfig + // Required for calculating next timestamp after having pop'ed a job + ce CronExpression + // Actual build config sent to the agent + config BuildConfig } -// Overloaded operator for comparing ScheduledBuild objects +// Allows BuildJob structs to be sorted according to their timestamp in +// MinHeaps fn (r1 BuildJob) < (r2 BuildJob) bool { return r1.timestamp < r2.timestamp } @@ -39,7 +43,9 @@ pub fn new_job_queue(default_schedule CronExpression, default_base_image string) } } -// insert a new job into the queue for a given target on an architecture. +// insert a new target's job into the queue for the given architecture. This +// job will then be endlessly rescheduled after being pop'ed, unless removed +// explicitely. pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { lock q.mutex { if arch !in q.queues { @@ -58,6 +64,7 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { job := BuildJob{ timestamp: timestamp + ce: ce config: BuildConfig{ target_id: target.id kind: target.kind @@ -69,10 +76,25 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { } } + dump(job) q.queues[arch].insert(job) } } +// reschedule the given job by calculating the next timestamp and re-adding it +// to its respective queue. This function is called by the pop functions +// *after* having pop'ed the job. +fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! { + new_timestamp := job.ce.next_from_now()! + + new_job := BuildJob{ + ...job + timestamp: new_timestamp + } + + q.queues[arch].insert(new_job) +} + // peek shows the first job for the given architecture that's ready to be // executed, if present. pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { @@ -99,10 +121,17 @@ pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob { return none } - job := q.queues[arch].peek() or { return none } + mut job := q.queues[arch].peek() or { return none } if job.timestamp < time.now() { - return q.queues[arch].pop() + job = q.queues[arch].pop()? + + // TODO how do we handle this properly? Is it even possible for a + // cron expression to not return a next time if it's already been + // used before? + q.reschedule(job, arch) or {} + + return job } } @@ -119,10 +148,15 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { mut out := []BuildJob{} for out.len < n { - job := q.queues[arch].peek() or { break } + mut job := q.queues[arch].peek() or { break } if job.timestamp < time.now() { - out << q.queues[arch].pop() or { break } + job = q.queues[arch].pop() or { break } + + // TODO idem + q.reschedule(job, arch) or {} + + out << job } else { break } diff --git a/src/server/api_builds.v b/src/server/api_builds.v index 62948cd3..ec3c8ec2 100644 --- a/src/server/api_builds.v +++ b/src/server/api_builds.v @@ -6,8 +6,8 @@ import web.response { new_data_response, new_response } // import util // import models { BuildLog, BuildLogFilter } -['/api/v1/builds/poll'; auth; get] -fn (mut app App) v1_poll_build_queue() web.Result { +['/api/v1/jobs/poll'; auth; get] +fn (mut app App) v1_poll_job_queue() web.Result { arch := app.query['arch'] or { return app.json(.bad_request, new_response('Missing arch query arg.')) } @@ -17,7 +17,7 @@ fn (mut app App) v1_poll_build_queue() web.Result { } max := max_str.int() - mut out := app.job_queue.pop_n(arch, max) + mut out := app.job_queue.pop_n(arch, max).map(it.config) return app.json(.ok, new_data_response(out)) }