From 6f9e1b5f3cf02a5f60c419da6c71523a790d19bf Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 30 Apr 2022 11:31:14 +0200 Subject: [PATCH] feat(cron): start of working loop --- src/cron/daemon/build.v | 36 +++++++++++++++++++------- src/cron/daemon/daemon.v | 43 +++++++++++++++++++++----------- src/cron/expression/expression.v | 2 +- 3 files changed, 57 insertions(+), 24 deletions(-) diff --git a/src/cron/daemon/build.v b/src/cron/daemon/build.v index e7e5ac3..73ba183 100644 --- a/src/cron/daemon/build.v +++ b/src/cron/daemon/build.v @@ -1,9 +1,25 @@ module daemon -import git import time import sync.stdatomic +const build_empty = 0 +const build_running = 1 +const build_done = 2 + +// reschedule_builds looks for any builds with status code 2 & re-adds them to +// the queue. +fn (mut d Daemon) reschedule_builds() ? { + for i in 0..d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == build_done { + stdatomic.store_u64(&d.atomics[i], build_empty) + sb := d.builds[i] + + d.schedule_build(sb.repo_id, sb.repo) ? + } + } +} + // update_builds starts as many builds as possible. fn (mut d Daemon) update_builds() ? { now := time.now() @@ -13,7 +29,7 @@ fn (mut d Daemon) update_builds() ? { sb := d.queue.pop() ? // If this build couldn't be scheduled, no more will be possible. - if !d.start_build(sb.repo_id)? { + if !d.start_build(sb)? { break } } else { @@ -22,13 +38,14 @@ fn (mut d Daemon) update_builds() ? { } } -// start_build starts a build for the given repo_id. -fn (mut d Daemon) start_build(repo_id string) ?bool { +// start_build starts a build for the given ScheduledBuild object. +fn (mut d Daemon) start_build(sb ScheduledBuild) ?bool { for i in 0..d.atomics.len { - if stdatomic.load_u64(&d.atomics[i]) == 0 { - stdatomic.store_u64(&d.atomics[i], 1) + if stdatomic.load_u64(&d.atomics[i]) == build_empty { + stdatomic.store_u64(&d.atomics[i], build_running) + d.builds[i] = sb - go d.run_build(i, d.repos_map[repo_id]) + go d.run_build(i, sb) return true } @@ -37,9 +54,10 @@ fn (mut d Daemon) start_build(repo_id string) ?bool { return false } -fn (mut d Daemon) run_build(build_index int, repo git.GitRepo) ? { +// run_build actually starts the build process for a given repo. +fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { time.sleep(10 * time.second) - stdatomic.store_u64(&d.atomics[build_index], 2) + stdatomic.store_u64(&d.atomics[build_index], build_done) } diff --git a/src/cron/daemon/daemon.v b/src/cron/daemon/daemon.v index fc917e4..816bc15 100644 --- a/src/cron/daemon/daemon.v +++ b/src/cron/daemon/daemon.v @@ -30,7 +30,7 @@ mut: api_update_timestamp time.Time queue MinHeap // Which builds are currently running - builds []git.GitRepo + builds []ScheduledBuild // Atomic variables used to detect when a build has finished; length is the // same as builds atomics []u64 @@ -47,7 +47,7 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st global_schedule: global_schedule api_update_frequency: api_update_frequency atomics: []u64{len: max_concurrent_builds} - builds: []git.GitRepo{len: max_concurrent_builds} + builds: []ScheduledBuild{len: max_concurrent_builds} logger: logger } @@ -62,14 +62,37 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st // periodically refreshes the list of repositories to ensure we stay in sync. pub fn (mut d Daemon) run() ? { for { + println('1') + // Cleans up finished builds, opening up spots for new builds + d.reschedule_builds() ? + println('2') + // Schedules new builds when possible d.update_builds() ? + println(d.queue) println(d.atomics) - time.sleep(60 * time.second) + time.sleep(10 * time.second) } } +// schedule_build adds the next occurence of the given repo build to the queue. +fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { + ce := parse_expression(repo.schedule) or { + d.lerror("Error while parsing cron expression '$repo.schedule' ($repo_id): $err.msg()") + + d.global_schedule + } + // A repo that can't be scheduled will just be skipped for now + timestamp := ce.next_from_now() ? + + d.queue.insert(ScheduledBuild{ + repo_id: repo_id + repo: repo + timestamp: timestamp + }) +} + fn (mut d Daemon) renew_repos() ? { mut new_repos := git.get_repos(d.address, d.api_key) ? @@ -101,19 +124,11 @@ fn (mut d Daemon) renew_queue() ? { } } + d.queue = new_queue + // For each repository in repos_map, parse their cron expression (or use // the default one if not present) & add them to the queue for id, repo in d.repos_map { - ce := parse_expression(repo.schedule) or { d.global_schedule } - // A repo that can't be scheduled will just be skipped for now - timestamp := ce.next(now) or { continue } - - new_queue.insert(ScheduledBuild{ - repo_id: id - repo: repo - timestamp: timestamp - }) + d.schedule_build(id, repo) ? } - - d.queue = new_queue } diff --git a/src/cron/expression/expression.v b/src/cron/expression/expression.v index c122585..6e11da2 100644 --- a/src/cron/expression/expression.v +++ b/src/cron/expression/expression.v @@ -114,7 +114,7 @@ pub fn (ce &CronExpression) next(ref time.Time) ?time.Time { }) } -fn (ce &CronExpression) next_from_now() ?time.Time { +pub fn (ce &CronExpression) next_from_now() ?time.Time { return ce.next(time.now()) }