feat(server): properly reschedule jobs after polling

Jef Roosens 2022-12-12 20:59:43 +01:00 committed by Chewing_Bever
parent c57de4d8ee
commit 0a5c4295e0
Signed by untrusted user: Jef Roosens
GPG Key ID: B75D4F293C7052DB
2 changed files with 45 additions and 11 deletions

View File

@ -8,12 +8,16 @@ import util
struct BuildJob { struct BuildJob {
pub: pub:
// Earliest point this // Next timestamp from which point this job is allowed to be executed
timestamp time.Time timestamp time.Time
// Required for calculating next timestamp after having pop'ed a job
ce CronExpression
// Actual build config sent to the agent
config BuildConfig config BuildConfig
} }
// Overloaded operator for comparing ScheduledBuild objects // Allows BuildJob structs to be sorted according to their timestamp in
// MinHeaps
fn (r1 BuildJob) < (r2 BuildJob) bool { fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp return r1.timestamp < r2.timestamp
} }
@ -39,7 +43,9 @@ pub fn new_job_queue(default_schedule CronExpression, default_base_image string)
} }
} }
// insert a new job into the queue for a given target on an architecture. // insert a new target's job into the queue for the given architecture. This
// job will then be endlessly rescheduled after being pop'ed, unless removed
// explicitely.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
lock q.mutex { lock q.mutex {
if arch !in q.queues { if arch !in q.queues {
@ -58,6 +64,7 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
job := BuildJob{ job := BuildJob{
timestamp: timestamp timestamp: timestamp
ce: ce
config: BuildConfig{ config: BuildConfig{
target_id: target.id target_id: target.id
kind: target.kind kind: target.kind
@ -69,10 +76,25 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
} }
} }
dump(job)
q.queues[arch].insert(job) q.queues[arch].insert(job)
} }
} }
// reschedule the given job by calculating the next timestamp and re-adding it
// to its respective queue. This function is called by the pop functions
// *after* having pop'ed the job.
fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
new_timestamp := job.ce.next_from_now()!
new_job := BuildJob{
...job
timestamp: new_timestamp
}
q.queues[arch].insert(new_job)
}
// peek shows the first job for the given architecture that's ready to be // peek shows the first job for the given architecture that's ready to be
// executed, if present. // executed, if present.
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob {
@ -99,10 +121,17 @@ pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
return none return none
} }
job := q.queues[arch].peek() or { return none } mut job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() { if job.timestamp < time.now() {
return q.queues[arch].pop() job = q.queues[arch].pop()?
// TODO how do we handle this properly? Is it even possible for a
// cron expression to not return a next time if it's already been
// used before?
q.reschedule(job, arch) or {}
return job
} }
} }
@ -119,10 +148,15 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
mut out := []BuildJob{} mut out := []BuildJob{}
for out.len < n { for out.len < n {
job := q.queues[arch].peek() or { break } mut job := q.queues[arch].peek() or { break }
if job.timestamp < time.now() { if job.timestamp < time.now() {
out << q.queues[arch].pop() or { break } job = q.queues[arch].pop() or { break }
// TODO idem
q.reschedule(job, arch) or {}
out << job
} else { } else {
break break
} }

View File

@ -6,8 +6,8 @@ import web.response { new_data_response, new_response }
// import util // import util
// import models { BuildLog, BuildLogFilter } // import models { BuildLog, BuildLogFilter }
['/api/v1/builds/poll'; auth; get] ['/api/v1/jobs/poll'; auth; get]
fn (mut app App) v1_poll_build_queue() web.Result { fn (mut app App) v1_poll_job_queue() web.Result {
arch := app.query['arch'] or { arch := app.query['arch'] or {
return app.json(.bad_request, new_response('Missing arch query arg.')) return app.json(.bad_request, new_response('Missing arch query arg.'))
} }
@ -17,7 +17,7 @@ fn (mut app App) v1_poll_build_queue() web.Result {
} }
max := max_str.int() max := max_str.int()
mut out := app.job_queue.pop_n(arch, max) mut out := app.job_queue.pop_n(arch, max).map(it.config)
return app.json(.ok, new_data_response(out)) return app.json(.ok, new_data_response(out))
} }