feat(build): allowed invalidating entries in build queue

Jef Roosens 2022-12-13 08:58:27 +01:00
parent 3611123f45
commit 882a9a60a9
Signed by untrusted user: Jef Roosens
GPG Key ID: B75D4F293C7052DB
1 changed files with 67 additions and 24 deletions

View File

@ -8,6 +8,8 @@ import util
struct BuildJob { struct BuildJob {
pub: pub:
// Time at which this build job was created/queued
created time.Time
// Next timestamp from which point this job is allowed to be executed // Next timestamp from which point this job is allowed to be executed
timestamp time.Time timestamp time.Time
// Required for calculating next timestamp after having pop'ed a job // Required for calculating next timestamp after having pop'ed a job
@ -22,6 +24,8 @@ fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp return r1.timestamp < r2.timestamp
} }
// The build job queue is responsible for managing the list of scheduled builds
// for each architecture. Agents receive jobs from this queue.
pub struct BuildJobQueue { pub struct BuildJobQueue {
// Schedule to use for targets without explicitely defined cron expression // Schedule to use for targets without explicitely defined cron expression
default_schedule CronExpression default_schedule CronExpression
@ -31,15 +35,17 @@ mut:
mutex shared util.Dummy mutex shared util.Dummy
// For each architecture, a priority queue is tracked // For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob> queues map[string]MinHeap<BuildJob>
// Each queued build job is also stored in a map, with the keys being the // When a target is removed from the server or edited, its previous build
// target IDs. This is used when removing or editing targets. // configs will be invalid. This map allows for those to be simply skipped
// jobs map[int]BuildJob // by ignoring any build configs created before this timestamp.
invalidated map[int]time.Time
} }
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
return BuildJobQueue{ return BuildJobQueue{
default_schedule: default_schedule default_schedule: default_schedule
default_base_image: default_base_image default_base_image: default_base_image
invalidated: map[int]time.Time{}
} }
} }
@ -63,6 +69,7 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
timestamp := ce.next_from_now()! timestamp := ce.next_from_now()!
job := BuildJob{ job := BuildJob{
created: time.now()
timestamp: timestamp timestamp: timestamp
ce: ce ce: ce
config: BuildConfig{ config: BuildConfig{
@ -88,6 +95,7 @@ fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
new_job := BuildJob{ new_job := BuildJob{
...job ...job
created: time.now()
timestamp: new_timestamp timestamp: new_timestamp
} }
@ -96,16 +104,26 @@ fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
// peek shows the first job for the given architecture that's ready to be // peek shows the first job for the given architecture that's ready to be
// executed, if present. // executed, if present.
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { pub fn (mut q BuildJobQueue) peek(arch string) ?BuildJob {
rlock q.mutex { rlock q.mutex {
if arch !in q.queues { if arch !in q.queues {
return none return none
} }
job := q.queues[arch].peek() or { return none } for {
job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() { // Skip any invalidated jobs
return job if job.config.target_id in q.invalidated
&& job.created < q.invalidated[job.config.target_id] {
// This pop *should* never fail according to the source code
q.queues[arch].pop() or { return none }
continue
}
if job.timestamp < time.now() {
return job
}
} }
} }
@ -120,17 +138,27 @@ pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
return none return none
} }
mut job := q.queues[arch].peek() or { return none } for {
mut job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() { // Skip any invalidated jobs
job = q.queues[arch].pop()? if job.config.target_id in q.invalidated
&& job.created < q.invalidated[job.config.target_id] {
// This pop *should* never fail according to the source code
q.queues[arch].pop() or { return none }
continue
}
// TODO how do we handle this properly? Is it even possible for a if job.timestamp < time.now() {
// cron expression to not return a next time if it's already been job = q.queues[arch].pop()?
// used before?
q.reschedule(job, arch) or {}
return job // TODO how do we handle this properly? Is it even possible for a
// cron expression to not return a next time if it's already been
// used before?
q.reschedule(job, arch) or {}
return job
}
} }
} }
@ -146,18 +174,28 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
mut out := []BuildJob{} mut out := []BuildJob{}
for out.len < n { outer: for out.len < n {
mut job := q.queues[arch].peek() or { break } for {
mut job := q.queues[arch].peek() or { break outer }
if job.timestamp < time.now() { // Skip any invalidated jobs
job = q.queues[arch].pop() or { break } if job.config.target_id in q.invalidated
&& job.created < q.invalidated[job.config.target_id] {
// This pop *should* never fail according to the source code
q.queues[arch].pop() or { break outer }
continue
}
// TODO idem if job.timestamp < time.now() {
q.reschedule(job, arch) or {} job = q.queues[arch].pop() or { break outer }
out << job // TODO idem
} else { q.reschedule(job, arch) or {}
break
out << job
} else {
break outer
}
} }
} }
@ -166,3 +204,8 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
return [] return []
} }
// invalidate a target's old build jobs.
pub fn (mut q BuildJobQueue) invalidate(target_id int) {
q.invalidated[target_id] = time.now()
}