From 325dcc27de1a69fb8fd82d0104f432899e7461b1 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 1 May 2022 09:14:33 +0200 Subject: [PATCH] fix(cron): made Daemon.run infallible --- src/cron/cron.v | 2 +- src/cron/daemon/build.v | 31 ++++++++++++--- src/cron/daemon/daemon.v | 82 ++++++++++++++++++++++++++++++---------- 3 files changed, 87 insertions(+), 28 deletions(-) diff --git a/src/cron/cron.v b/src/cron/cron.v index 49a379e9..e10e4dda 100644 --- a/src/cron/cron.v +++ b/src/cron/cron.v @@ -25,5 +25,5 @@ pub fn cron(conf Config) ? { mut d := daemon.init_daemon(logger, conf.address, conf.api_key, conf.base_image, ce, conf.max_concurrent_builds, conf.api_update_frequency, conf.image_rebuild_frequency) ? - d.run() ? + d.run() } diff --git a/src/cron/daemon/build.v b/src/cron/daemon/build.v index ec8be4d5..5b2e9ccb 100644 --- a/src/cron/daemon/build.v +++ b/src/cron/daemon/build.v @@ -12,7 +12,7 @@ const build_done = 2 // clean_finished_builds removes finished builds from the build slots & returns // them. -fn (mut d Daemon) clean_finished_builds() ?[]ScheduledBuild { +fn (mut d Daemon) clean_finished_builds() []ScheduledBuild { mut out := []ScheduledBuild{} for i in 0 .. d.atomics.len { @@ -26,12 +26,22 @@ fn (mut d Daemon) clean_finished_builds() ?[]ScheduledBuild { } // update_builds starts as many builds as possible. -fn (mut d Daemon) start_new_builds() ? { +fn (mut d Daemon) start_new_builds() { now := time.now() for d.queue.len() > 0 { - if d.queue.peek() ?.timestamp < now { - sb := d.queue.pop() ? + elem := d.queue.peek() or { + d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") + + break + } + + if elem.timestamp < now { + sb := d.queue.pop() or { + d.lerror("queue.pop() unexpectedly returned an error. This shouldn't happen.") + + break + } // If this build couldn't be scheduled, no more will be possible. if !d.start_build(sb) { @@ -61,10 +71,19 @@ fn (mut d Daemon) start_build(sb ScheduledBuild) bool { } // run_build actually starts the build process for a given repo. -fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { +fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) { d.linfo('started build: $sb.repo.url $sb.repo.branch') - build.build_repo(d.address, d.api_key, d.builder_images.last(), &sb.repo) ? + // 0 means success, 1 means failure + mut status := 0 + + build.build_repo(d.address, d.api_key, d.builder_images.last(), &sb.repo) or { status = 1 } + + if status == 0 { + d.linfo('finished build: $sb.repo.url $sb.repo.branch') + } else { + d.linfo('failed build: $sb.repo.url $sb.repo.branch') + } stdatomic.store_u64(&d.atomics[build_index], daemon.build_done) } diff --git a/src/cron/daemon/daemon.v b/src/cron/daemon/daemon.v index 729e94b0..088a24f6 100644 --- a/src/cron/daemon/daemon.v +++ b/src/cron/daemon/daemon.v @@ -9,6 +9,12 @@ import math import build import docker +// How many seconds to wait before retrying to update API if failed +const api_update_retry_timeout = 5 + +// How many seconds to wait before retrying to rebuild image if failed +const rebuild_base_image_retry_timout = 30 + struct ScheduledBuild { pub: repo_id string @@ -60,29 +66,31 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st } // Initialize the repos & queue - d.renew_repos() ? - d.renew_queue() ? - d.rebuild_base_image() ? + d.renew_repos() + d.renew_queue() + if !d.rebuild_base_image() { + return error('The base image failed to build. The Vieter cron daemon cannot run without an initial builder image.') + } return d } // run starts the actual daemon process. It runs builds when possible & // periodically refreshes the list of repositories to ensure we stay in sync. -pub fn (mut d Daemon) run() ? { +pub fn (mut d Daemon) run() { for { - finished_builds := d.clean_finished_builds() ? + finished_builds := d.clean_finished_builds() // Update the API's contents if needed & renew the queue if time.now() >= d.api_update_timestamp { - d.renew_repos() ? - d.renew_queue() ? + d.renew_repos() + d.renew_queue() } // The finished builds should only be rescheduled if the API contents // haven't been renewed. else { for sb in finished_builds { - d.schedule_build(sb.repo_id, sb.repo) ? + d.schedule_build(sb.repo_id, sb.repo) } } @@ -93,14 +101,14 @@ pub fn (mut d Daemon) run() ? { // keep track of a list or something & remove an image once we have // made sure it isn't being used anymore. if time.now() >= d.image_build_timestamp { - d.rebuild_base_image() ? + d.rebuild_base_image() // In theory, executing this function here allows an old builder // image to exist for at most image_rebuild_frequency minutes. d.clean_old_base_images() } // Schedules new builds when possible - d.start_new_builds() ? + d.start_new_builds() // If there are builds currently running, the daemon should refresh // every second to clean up any finished builds & start new ones. @@ -113,7 +121,17 @@ pub fn (mut d Daemon) run() ? { delay = d.api_update_timestamp - now if d.queue.len() > 0 { - time_until_next_job := d.queue.peek() ?.timestamp - now + elem := d.queue.peek() or { + d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") + + // This is just a fallback option. In theory, queue.peek() + // should *never* return an error or none, because we check + // its len beforehand. + time.sleep(1) + continue + } + + time_until_next_job := elem.timestamp - now delay = math.min(delay, time_until_next_job) } @@ -131,7 +149,7 @@ pub fn (mut d Daemon) run() ? { } // schedule_build adds the next occurence of the given repo build to the queue. -fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { +fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) { ce := if repo.schedule != '' { parse_expression(repo.schedule) or { // TODO This shouldn't return an error if the expression is empty. @@ -144,7 +162,10 @@ fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { } // A repo that can't be scheduled will just be skipped for now - timestamp := ce.next_from_now() ? + timestamp := ce.next_from_now() or { + d.lerror("Couldn't calculate next timestamp from '$repo.schedule'; skipping") + return + } d.queue.insert(ScheduledBuild{ repo_id: repo_id @@ -155,9 +176,15 @@ fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { // renew_repos requests the newest list of Git repos from the server & replaces // the old one. -fn (mut d Daemon) renew_repos() ? { +fn (mut d Daemon) renew_repos() { d.linfo('Renewing repos...') - mut new_repos := git.get_repos(d.address, d.api_key) ? + + mut new_repos := git.get_repos(d.address, d.api_key) or { + d.lerror('Failed to renew repos. Retrying in ${daemon.api_update_retry_timeout}s...') + d.api_update_timestamp = time.now().add_seconds(daemon.api_update_retry_timeout) + + return + } d.repos_map = new_repos.move() @@ -166,7 +193,7 @@ fn (mut d Daemon) renew_repos() ? { // renew_queue replaces the old queue with a new one that reflects the newest // values in repos_map. -fn (mut d Daemon) renew_queue() ? { +fn (mut d Daemon) renew_queue() { d.linfo('Renewing queue...') mut new_queue := MinHeap{} @@ -181,8 +208,13 @@ fn (mut d Daemon) renew_queue() ? { // here causes the function to prematurely just exit, without any errors or anything, very weird // https://github.com/vlang/v/issues/14042 for d.queue.len() > 0 { - if d.queue.peek() ?.timestamp < now { - new_queue.insert(d.queue.pop() ?) + elem := d.queue.pop() or { + d.lerror("queue.pop() returned an error. This shouldn't happen.") + continue + } + + if elem.timestamp < now { + new_queue.insert(elem) } else { break } @@ -193,16 +225,24 @@ fn (mut d Daemon) renew_queue() ? { // For each repository in repos_map, parse their cron expression (or use // the default one if not present) & add them to the queue for id, repo in d.repos_map { - d.schedule_build(id, repo) ? + d.schedule_build(id, repo) } } // rebuild_base_image recreates the builder image. -fn (mut d Daemon) rebuild_base_image() ? { +fn (mut d Daemon) rebuild_base_image() bool { d.linfo('Rebuilding builder image....') - d.builder_images << build.create_build_image(d.base_image) ? + d.builder_images << build.create_build_image(d.base_image) or { + d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') + d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) + + return false + } + d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) + + return true } // clean_old_base_images tries to remove any old but still present builder