diff --git a/src/build/queue.v b/src/build/queue.v index e87024b..e74529c 100644 --- a/src/build/queue.v +++ b/src/build/queue.v @@ -13,7 +13,7 @@ pub mut: // Next timestamp from which point this job is allowed to be executed timestamp time.Time // Required for calculating next timestamp after having pop'ed a job - ce &CronExpression = unsafe { nil } + ce CronExpression // Actual build config sent to the agent config BuildConfig // Whether this is a one-time job @@ -30,7 +30,7 @@ fn (r1 BuildJob) < (r2 BuildJob) bool { // for each architecture. Agents receive jobs from this queue. pub struct BuildJobQueue { // Schedule to use for targets without explicitely defined cron expression - default_schedule &CronExpression + default_schedule CronExpression // Base image to use for targets without defined base image default_base_image string mut: @@ -44,9 +44,9 @@ mut: } // new_job_queue initializes a new job queue -pub fn new_job_queue(default_schedule &CronExpression, default_base_image string) BuildJobQueue { +pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { return BuildJobQueue{ - default_schedule: unsafe { default_schedule } + default_schedule: default_schedule default_base_image: default_base_image invalidated: map[int]time.Time{} } diff --git a/src/cron/cli.v b/src/cron/cli.v new file mode 100644 index 0000000..16a3537 --- /dev/null +++ b/src/cron/cli.v @@ -0,0 +1,32 @@ +module cron + +import cli +import conf as vconf + +struct Config { +pub: + log_level string = 'WARN' + api_key string + address string + data_dir string + base_image string = 'archlinux:base-devel' + max_concurrent_builds int = 1 + api_update_frequency int = 15 + image_rebuild_frequency int = 1440 + // Replicates the behavior of the original cron system + global_schedule string = '0 3' +} + +// cmd returns the cli module that handles the cron daemon. 
+pub fn cmd() cli.Command { + return cli.Command{ + name: 'cron' + description: 'Start the cron service that periodically runs builds.' + execute: fn (cmd cli.Command) ! { + config_file := cmd.flags.get_string('config-file')! + conf := vconf.load(prefix: 'VIETER_', default_path: config_file)! + + cron(conf)! + } + } +} diff --git a/src/cron/cron.v b/src/cron/cron.v new file mode 100644 index 0000000..f1d6b7b --- /dev/null +++ b/src/cron/cron.v @@ -0,0 +1,33 @@ +module cron + +import log +import cron.daemon +import cron.expression +import os + +const log_file_name = 'vieter.cron.log' + +// cron starts a cron daemon & starts periodically scheduling builds. +pub fn cron(conf Config) ! { + // Configure logger + log_level := log.level_from_tag(conf.log_level) or { + return error('Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') + } + + mut logger := log.Log{ + level: log_level + } + + log_file := os.join_path_single(conf.data_dir, cron.log_file_name) + logger.set_full_logpath(log_file) + logger.log_to_console_too() + + ce := expression.parse_expression(conf.global_schedule) or { + return error('Error while parsing global cron expression: $err.msg()') + } + + mut d := daemon.init_daemon(logger, conf.address, conf.api_key, conf.base_image, ce, + conf.max_concurrent_builds, conf.api_update_frequency, conf.image_rebuild_frequency)! + + d.run() +} diff --git a/src/cron/daemon/build.v b/src/cron/daemon/build.v new file mode 100644 index 0000000..42edc92 --- /dev/null +++ b/src/cron/daemon/build.v @@ -0,0 +1,115 @@ +module daemon + +import time +import sync.stdatomic +import build +import os + +const ( + build_empty = 0 + build_running = 1 + build_done = 2 +) + +// clean_finished_builds removes finished builds from the build slots & returns +// them. +fn (mut d Daemon) clean_finished_builds() []ScheduledBuild { + mut out := []ScheduledBuild{} + + for i in 0 .. 
d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == daemon.build_done { + stdatomic.store_u64(&d.atomics[i], daemon.build_empty) + out << d.builds[i] + } + } + + return out +} + +// start_new_builds starts as many builds as possible. +fn (mut d Daemon) start_new_builds() { + now := time.now() + + for d.queue.len() > 0 { + elem := d.queue.peek() or { + d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") + + break + } + + if elem.timestamp < now { + sb := d.queue.pop() or { + d.lerror("queue.pop() unexpectedly returned an error. This shouldn't happen.") + + break + } + + // If this build couldn't be scheduled, no more will be possible. + if !d.start_build(sb) { + d.queue.insert(sb) + break + } + } else { + break + } + } +} + +// start_build starts a build for the given ScheduledBuild object. +fn (mut d Daemon) start_build(sb ScheduledBuild) bool { + for i in 0 .. d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == daemon.build_empty { + stdatomic.store_u64(&d.atomics[i], daemon.build_running) + d.builds[i] = sb + + go d.run_build(i, sb) + + return true + } + } + + return false +} + +// run_build actually starts the build process for a given target. 
+fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) { + d.linfo('started build: $sb.target.url -> $sb.target.repo') + + // 0 means success, 1 means failure + mut status := 0 + + res := build.build_target(d.client.address, d.client.api_key, d.builder_images.last(), + &sb.target, false) or { + d.ldebug('build_target error: $err.msg()') + status = 1 + + build.BuildResult{} + } + + if status == 0 { + d.linfo('finished build: $sb.target.url -> $sb.target.repo; uploading logs...') + + build_arch := os.uname().machine + d.client.add_build_log(sb.target.id, res.start_time, res.end_time, build_arch, + res.exit_code, res.logs) or { + d.lerror('Failed to upload logs for build: $sb.target.url -> $sb.target.repo') + } + } else { + d.linfo('an error occured during build: $sb.target.url -> $sb.target.repo') + } + + stdatomic.store_u64(&d.atomics[build_index], daemon.build_done) +} + +// current_build_count returns how many builds are currently running. +fn (mut d Daemon) current_build_count() int { + mut res := 0 + + for i in 0 .. 
d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == daemon.build_running { + res += 1 + } + } + + return res +} diff --git a/src/cron/daemon/daemon.v b/src/cron/daemon/daemon.v new file mode 100644 index 0000000..b94dab8 --- /dev/null +++ b/src/cron/daemon/daemon.v @@ -0,0 +1,274 @@ +module daemon + +import time +import log +import datatypes { MinHeap } +import cron.expression { CronExpression, parse_expression } +import math +import build +import docker +import os +import client +import models { Target } + +const ( + // How many seconds to wait before retrying to update API if failed + api_update_retry_timeout = 5 + // How many seconds to wait before retrying to rebuild image if failed + rebuild_base_image_retry_timout = 30 +) + +struct ScheduledBuild { +pub: + target Target + timestamp time.Time +} + +// Overloaded operator for comparing ScheduledBuild objects +fn (r1 ScheduledBuild) < (r2 ScheduledBuild) bool { + return r1.timestamp < r2.timestamp +} + +pub struct Daemon { +mut: + client client.Client + base_image string + builder_images []string + global_schedule CronExpression + api_update_frequency int + image_rebuild_frequency int + // Targets currently loaded from API. + targets []Target + // At what point to update the list of targets. + api_update_timestamp time.Time + image_build_timestamp time.Time + queue MinHeap + // Which builds are currently running + builds []ScheduledBuild + // Atomic variables used to detect when a build has finished; length is the + // same as builds + atomics []u64 + logger shared log.Log +} + +// init_daemon initializes a new Daemon object. It renews the targets & +// populates the build queue for the first time. 
+pub fn init_daemon(logger log.Log, address string, api_key string, base_image string, global_schedule CronExpression, max_concurrent_builds int, api_update_frequency int, image_rebuild_frequency int) !Daemon { + mut d := Daemon{ + client: client.new(address, api_key) + base_image: base_image + global_schedule: global_schedule + api_update_frequency: api_update_frequency + image_rebuild_frequency: image_rebuild_frequency + atomics: []u64{len: max_concurrent_builds} + builds: []ScheduledBuild{len: max_concurrent_builds} + logger: logger + } + + // Initialize the targets & queue + d.renew_targets() + d.renew_queue() + if !d.rebuild_base_image() { + return error('The base image failed to build. The Vieter cron daemon cannot run without an initial builder image.') + } + + return d +} + +// run starts the actual daemon process. It runs builds when possible & +// periodically refreshes the list of targets to ensure we stay in sync. +pub fn (mut d Daemon) run() { + for { + finished_builds := d.clean_finished_builds() + + // Update the API's contents if needed & renew the queue + if time.now() >= d.api_update_timestamp { + d.renew_targets() + d.renew_queue() + } + // The finished builds should only be rescheduled if the API contents + // haven't been renewed. + else { + for sb in finished_builds { + d.schedule_build(sb.target) + } + } + + // TODO remove old builder images. + // This issue is less trivial than it sounds, because a build could + // still be running when the image has to be rebuilt. That would + // prevent the image from being removed. Therefore, we will need to + // keep track of a list or something & remove an image once we have + // made sure it isn't being used anymore. + if time.now() >= d.image_build_timestamp { + d.rebuild_base_image() + // In theory, executing this function here allows an old builder + // image to exist for at most image_rebuild_frequency minutes. 
+ d.clean_old_base_images() + } + + // Schedules new builds when possible + d.start_new_builds() + + // If there are builds currently running, the daemon should refresh + // every second to clean up any finished builds & start new ones. + mut delay := time.Duration(1 * time.second) + + // Sleep either until we have to refresh the targets or when the next + // build has to start, with a minimum of 1 second. + if d.current_build_count() == 0 { + now := time.now() + delay = d.api_update_timestamp - now + + if d.queue.len() > 0 { + elem := d.queue.peek() or { + d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") + + // This is just a fallback option. In theory, queue.peek() + // should *never* return an error or none, because we check + // its len beforehand. + time.sleep(1) + continue + } + + time_until_next_job := elem.timestamp - now + + delay = math.min(delay, time_until_next_job) + } + } + + // We sleep for at least one second. This is to prevent the program + // from looping aggressively when a cronjob can be scheduled, but + // there's no spots free for it to be started. + delay = math.max(delay, 1 * time.second) + + d.ldebug('Sleeping for ${delay}...') + + time.sleep(delay) + } +} + +// schedule_build adds the next occurrence of the given target's build to the +// queue. +fn (mut d Daemon) schedule_build(target Target) { + ce := if target.schedule != '' { + parse_expression(target.schedule) or { + // TODO This shouldn't return an error if the expression is empty. 
+ d.lerror("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") + + d.global_schedule + } + } else { + d.global_schedule + } + + // A target that can't be scheduled will just be skipped for now + timestamp := ce.next_from_now() or { + d.lerror("Couldn't calculate next timestamp from '$target.schedule'; skipping") + return + } + + d.queue.insert(ScheduledBuild{ + target: target + timestamp: timestamp + }) +} + +// renew_targets requests the newest list of targets from the server & replaces +// the old one. +fn (mut d Daemon) renew_targets() { + d.linfo('Renewing targets...') + + mut new_targets := d.client.get_all_targets() or { + d.lerror('Failed to renew targets. Retrying in ${daemon.api_update_retry_timeout}s...') + d.api_update_timestamp = time.now().add_seconds(daemon.api_update_retry_timeout) + + return + } + + // Filter out any targets that shouldn't run on this architecture + cur_arch := os.uname().machine + new_targets = new_targets.filter(it.arch.any(it.value == cur_arch)) + + d.targets = new_targets + + d.api_update_timestamp = time.now().add_seconds(60 * d.api_update_frequency) +} + +// renew_queue replaces the old queue with a new one that reflects the newest +// values in targets. +fn (mut d Daemon) renew_queue() { + d.linfo('Renewing queue...') + mut new_queue := MinHeap{} + + // Move any jobs that should have already started from the old queue onto + // the new one + now := time.now() + + // For some reason, using + // ```v + // for d.queue.len() > 0 && d.queue.peek() !.timestamp < now { + //``` + // here causes the function to prematurely just exit, without any errors or anything, very weird + // https://github.com/vlang/v/issues/14042 + for d.queue.len() > 0 { + elem := d.queue.pop() or { + d.lerror("queue.pop() returned an error. 
This shouldn't happen.") + continue + } + + if elem.timestamp < now { + new_queue.insert(elem) + } else { + break + } + } + + d.queue = new_queue + + // For each target in targets, parse their cron expression (or use the + // default one if not present) & add them to the queue + for target in d.targets { + d.schedule_build(target) + } +} + +// rebuild_base_image recreates the builder image. +fn (mut d Daemon) rebuild_base_image() bool { + d.linfo('Rebuilding builder image....') + + d.builder_images << build.create_build_image(d.base_image) or { + d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') + d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) + + return false + } + + d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) + + return true +} + +// clean_old_base_images tries to remove any old but still present builder +// images. +fn (mut d Daemon) clean_old_base_images() { + mut i := 0 + + mut dd := docker.new_conn() or { + d.lerror('Failed to connect to Docker socket.') + return + } + + defer { + dd.close() or {} + } + + for i < d.builder_images.len - 1 { + // For each builder image, we try to remove it by calling the Docker + // API. If the function returns an error or false, that means the image + // wasn't deleted. Therefore, we move the index over. If the function + // returns true, the array's length has decreased by one so we don't + // move the index. 
+ dd.image_remove(d.builder_images[i]) or { i += 1 } + } +} diff --git a/src/cron/daemon/log.v b/src/cron/daemon/log.v new file mode 100644 index 0000000..95a50e7 --- /dev/null +++ b/src/cron/daemon/log.v @@ -0,0 +1,35 @@ +module daemon + +import log + +// log creates a log message with the given level +pub fn (mut d Daemon) log(msg string, level log.Level) { + lock d.logger { + d.logger.send_output(msg, level) + } +} + +// lfatal creates a log message with the fatal level +pub fn (mut d Daemon) lfatal(msg string) { + d.log(msg, log.Level.fatal) +} + +// lerror creates a log message with the error level +pub fn (mut d Daemon) lerror(msg string) { + d.log(msg, log.Level.error) +} + +// lwarn creates a log message with the warn level +pub fn (mut d Daemon) lwarn(msg string) { + d.log(msg, log.Level.warn) +} + +// linfo creates a log message with the info level +pub fn (mut d Daemon) linfo(msg string) { + d.log(msg, log.Level.info) +} + +// ldebug creates a log message with the debug level +pub fn (mut d Daemon) ldebug(msg string) { + d.log(msg, log.Level.debug) +} diff --git a/src/cron/expression/c/expression.c b/src/cron/expression/c/expression.c index 3b03f7a..3d12604 100644 --- a/src/cron/expression/c/expression.c +++ b/src/cron/expression/c/expression.c @@ -7,14 +7,6 @@ CronExpression *ce_init() { return malloc(sizeof(CronExpression)); } -void ce_free(CronExpression *ce) { - free(ce->months); - free(ce->days); - free(ce->hours); - free(ce->minutes); - free(ce); -} - int ce_next(SimpleTime *out, CronExpression *ce, SimpleTime *ref) { // For all of these values, the rule is the following: if their value is // the length of their respective array in the CronExpression object, that diff --git a/src/cron/expression/c/expression.h b/src/cron/expression/c/expression.h index b849b9a..12dbdec 100644 --- a/src/cron/expression/c/expression.h +++ b/src/cron/expression/c/expression.h @@ -31,8 +31,6 @@ typedef struct simple_time { CronExpression *ce_init(); -void 
ce_free(CronExpression *ce); - /** * Given a */ diff --git a/src/cron/expression/expression.c.v b/src/cron/expression/expression.c.v index 452e512..27e6193 100644 --- a/src/cron/expression/expression.c.v +++ b/src/cron/expression/expression.c.v @@ -2,34 +2,30 @@ module expression #flag -I @VMODROOT/c #flag @VMODROOT/c/parse.o -#flag @VMODROOT/c/expression.o #include "expression.h" -[heap] pub struct C.CronExpression { - minutes &u8 - hours &u8 - days &u8 - months &u8 + minutes &u8 + hours &u8 + days &u8 + months &u8 minute_count u8 - hour_count u8 - day_count u8 - month_count u8 + hour_count u8 + day_count u8 + month_count u8 +} + +struct C.SimpleTime { + year int + month int + day int + hour int + minute int } pub type CronExpression = C.CronExpression -struct C.SimpleTime { - year int - month int - day int - hour int - minute int -} - -fn C.ce_init() &C.CronExpression - -fn C.ce_free(ce &C.CronExpression) +fn C.ce_init() &CronExpression fn C.ce_next(out &C.SimpleTime, ce &C.CronExpression, ref &C.SimpleTime) int diff --git a/src/cron/expression/expression.v b/src/cron/expression/expression.v index 208b9cb..095acf0 100644 --- a/src/cron/expression/expression.v +++ b/src/cron/expression/expression.v @@ -4,19 +4,16 @@ import time pub fn parse_expression(exp string) !&CronExpression { out := C.ce_init() + res := C.ce_parse_expression(out, exp.str) - if res != 0 { - return error('yuhh') - } - + if res != 0 { + return error('yuhh') + } + return out } -pub fn (ce &CronExpression) free() { - C.ce_free(ce) -} - pub fn (ce &CronExpression) next(ref time.Time) !time.Time { st := C.SimpleTime{ year: ref.year @@ -29,10 +26,10 @@ pub fn (ce &CronExpression) next(ref time.Time) !time.Time { out := C.SimpleTime{} res := C.ce_next(&out, ce, &st) - if res != 0 { - return error('yuhh') - } - + if res != 0 { + return error('yuhh') + } + return time.new_time(time.Time{ year: out.year month: out.month @@ -46,10 +43,10 @@ pub fn (ce &CronExpression) next_from_now() !time.Time { out := 
C.SimpleTime{} res := C.ce_next_from_now(&out, ce) - if res != 0 { - return error('yuhh') - } - + if res != 0 { + return error('yuhh') + } + return time.new_time(time.Time{ year: out.year month: out.month @@ -58,17 +55,3 @@ pub fn (ce &CronExpression) next_from_now() !time.Time { minute: out.minute }) } - -// next_n returns the n next occurences of the expression, given a starting -// time. -pub fn (ce &CronExpression) next_n(ref time.Time, n int) ![]time.Time { - mut times := []time.Time{cap: n} - - times << ce.next(ref)! - - for i in 1 .. n { - times << ce.next(times[i - 1])! - } - - return times -} diff --git a/src/main.v b/src/main.v index ce9ec81..1c8b816 100644 --- a/src/main.v +++ b/src/main.v @@ -9,6 +9,7 @@ import console.schedule import console.man import console.aur import console.repos +import cron import agent fn main() { @@ -42,6 +43,7 @@ fn main() { commands: [ server.cmd(), targets.cmd(), + cron.cmd(), logs.cmd(), schedule.cmd(), man.cmd(),