diff --git a/src/build/queue.v b/src/build/queue.v index 29036e4b..b5595524 100644 --- a/src/build/queue.v +++ b/src/build/queue.v @@ -8,6 +8,8 @@ import util struct BuildJob { pub: + // Time at which this build job was created/queued + created time.Time // Next timestamp from which point this job is allowed to be executed timestamp time.Time // Required for calculating next timestamp after having pop'ed a job @@ -22,6 +24,8 @@ fn (r1 BuildJob) < (r2 BuildJob) bool { return r1.timestamp < r2.timestamp } +// The build job queue is responsible for managing the list of scheduled builds +// for each architecture. Agents receive jobs from this queue. pub struct BuildJobQueue { // Schedule to use for targets without explicitely defined cron expression default_schedule CronExpression @@ -31,15 +35,17 @@ mut: mutex shared util.Dummy // For each architecture, a priority queue is tracked queues map[string]MinHeap - // Each queued build job is also stored in a map, with the keys being the - // target IDs. This is used when removing or editing targets. - // jobs map[int]BuildJob + // When a target is removed from the server or edited, its previous build + // configs will be invalid. This map allows for those to be simply skipped + // by ignoring any build configs created before this timestamp. + invalidated map[int]time.Time } pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { return BuildJobQueue{ default_schedule: default_schedule default_base_image: default_base_image + invalidated: map[int]time.Time{} } } @@ -63,6 +69,7 @@ pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { timestamp := ce.next_from_now()! job := BuildJob{ + created: time.now() timestamp: timestamp ce: ce config: BuildConfig{ @@ -88,6 +95,7 @@ fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! { new_job := BuildJob{ ...job + created: time.now() timestamp: new_timestamp } @@ -96,16 +104,26 @@ fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! { // peek shows the first job for the given architecture that's ready to be // executed, if present. -pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { +pub fn (mut q BuildJobQueue) peek(arch string) ?BuildJob { rlock q.mutex { if arch !in q.queues { return none } - job := q.queues[arch].peek() or { return none } + for { + job := q.queues[arch].peek() or { return none } - if job.timestamp < time.now() { - return job + // Skip any invalidated jobs + if job.config.target_id in q.invalidated + && job.created < q.invalidated[job.config.target_id] { + // This pop *should* never fail according to the source code + q.queues[arch].pop() or { return none } + continue + } + + if job.timestamp < time.now() { + return job + } } } @@ -120,17 +138,27 @@ pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob { return none } - mut job := q.queues[arch].peek() or { return none } + for { + mut job := q.queues[arch].peek() or { return none } - if job.timestamp < time.now() { - job = q.queues[arch].pop()? + // Skip any invalidated jobs + if job.config.target_id in q.invalidated + && job.created < q.invalidated[job.config.target_id] { + // This pop *should* never fail according to the source code + q.queues[arch].pop() or { return none } + continue + } - // TODO how do we handle this properly? Is it even possible for a - // cron expression to not return a next time if it's already been - // used before? - q.reschedule(job, arch) or {} + if job.timestamp < time.now() { + job = q.queues[arch].pop()? - return job + // TODO how do we handle this properly? Is it even possible for a + // cron expression to not return a next time if it's already been + // used before? + q.reschedule(job, arch) or {} + + return job + } } } @@ -146,18 +174,28 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { mut out := []BuildJob{} - for out.len < n { - mut job := q.queues[arch].peek() or { break } + outer: for out.len < n { + for { + mut job := q.queues[arch].peek() or { break outer } - if job.timestamp < time.now() { - job = q.queues[arch].pop() or { break } + // Skip any invalidated jobs + if job.config.target_id in q.invalidated + && job.created < q.invalidated[job.config.target_id] { + // This pop *should* never fail according to the source code + q.queues[arch].pop() or { break outer } + continue + } - // TODO idem - q.reschedule(job, arch) or {} + if job.timestamp < time.now() { + job = q.queues[arch].pop() or { break outer } - out << job - } else { - break + // TODO idem + q.reschedule(job, arch) or {} + + out << job + } else { + break outer + } } } @@ -166,3 +204,8 @@ pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { return [] } + +// invalidate a target's old build jobs. +pub fn (mut q BuildJobQueue) invalidate(target_id int) { + q.invalidated[target_id] = time.now() +}