From 5cbfc0ebcb45e08b5d445d6e3997f7a92628a797 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 13 Dec 2022 17:42:49 +0100 Subject: [PATCH] feat(agent): clean up code a bit; add frequent polling when active --- src/agent/agent.v | 6 ++-- src/agent/cli.v | 16 +++++----- src/agent/daemon.v | 78 ++++++++++++++++++++++++++++------------------ src/agent/images.v | 31 +++++++++++++----- src/agent/log.v | 2 +- 5 files changed, 83 insertions(+), 50 deletions(-) diff --git a/src/agent/agent.v b/src/agent/agent.v index 1758c85..69b9947 100644 --- a/src/agent/agent.v +++ b/src/agent/agent.v @@ -2,12 +2,12 @@ module agent import log import os +import util const log_file_name = 'vieter.agent.log' -// agent start an agent service +// agent starts an agent service pub fn agent(conf Config) ! { - // Configure logger log_level := log.level_from_tag(conf.log_level) or { return error('Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') } @@ -16,6 +16,8 @@ pub fn agent(conf Config) ! { level: log_level } + os.mkdir_all(conf.data_dir) or { util.exit_with_message(1, 'Failed to create data directory.') } + log_file := os.join_path_single(conf.data_dir, agent.log_file_name) logger.set_full_logpath(log_file) logger.log_to_console_too() diff --git a/src/agent/cli.v b/src/agent/cli.v index a375f08..1535e17 100644 --- a/src/agent/cli.v +++ b/src/agent/cli.v @@ -7,14 +7,12 @@ struct Config { pub: log_level string = 'WARN' // Architecture that the agent represents - arch string - api_key string - address string - data_dir string - max_concurrent_builds int = 1 - polling_frequency int = 30 - // Architecture of agent - // arch string + arch string + api_key string + address string + data_dir string + max_concurrent_builds int = 1 + polling_frequency int = 30 image_rebuild_frequency int = 1440 } @@ -22,7 +20,7 @@ pub: pub fn cmd() cli.Command { return cli.Command{ name: 'agent' - description: 'Start an agent service & start polling for new builds.' + description: 'Start an agent daemon.' execute: fn (cmd cli.Command) ! { config_file := cmd.flags.get_string('config-file')! conf := vconf.load(prefix: 'VIETER_', default_path: config_file)! diff --git a/src/agent/daemon.v b/src/agent/daemon.v index f060863..f753e25 100644 --- a/src/agent/daemon.v +++ b/src/agent/daemon.v @@ -16,17 +16,17 @@ const ( struct AgentDaemon { logger shared log.Log conf Config + client client.Client mut: images ImageManager - // Which builds are currently running; length is same as - // conf.max_concurrent_builds + // Which builds are currently running; length is conf.max_concurrent_builds builds []BuildConfig - // Atomic variables used to detect when a build has finished; length is the - // same as conf.max_concurrent_builds - client client.Client + // Atomic variables used to detect when a build has finished; length is + // conf.max_concurrent_builds atomics []u64 } +// agent_init initializes a new agent fn agent_init(logger log.Log, conf Config) AgentDaemon { mut d := AgentDaemon{ logger: logger @@ -40,37 +40,49 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon { return d } +// run starts the actual agent daemon. This function will run forever. pub fn (mut d AgentDaemon) run() { // This is just so that the very first time the loop is ran, the jobs are // always polled mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency) + mut sleep_time := 1 * time.second + mut finished, mut empty := 0, 0 for { - free_builds := d.update_atomics() + finished, empty = d.update_atomics() - // All build slots are taken, so there's nothing to be done - if free_builds == 0 { + // No new finished builds and no free slots, so there's nothing to be + // done + if finished + empty == 0 { time.sleep(1 * time.second) continue } // Builds have finished, so old builder images might have freed up. - d.images.clean_old_images() + // TODO this might query the docker daemon too frequently. + if finished > 0 { + d.images.clean_old_images() + } - // Poll for new jobs - if time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) { - new_configs := d.client.poll_jobs(d.conf.arch, free_builds) or { + // The agent will always poll for new jobs after at most + // `polling_frequency` seconds. However, when jobs have finished, the + // agent will also poll for new jobs. This is because jobs are often + // clustered together (especially when mostly using the global cron + // schedule), so there's a much higher chance jobs are available. + if finished > 0 || time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) { + new_configs := d.client.poll_jobs(d.conf.arch, finished + empty) or { d.lerror('Failed to poll jobs: $err.msg()') + // TODO pick a better delay here time.sleep(5 * time.second) continue } last_poll_time = time.now() - // Schedule new jobs for config in new_configs { // TODO handle this better than to just skip the config - // Make sure a recent build base image is available for building the config + // Make sure a recent build base image is available for + // building the config d.images.refresh_image(config.base_image) or { d.lerror(err.msg()) continue @@ -78,40 +90,45 @@ pub fn (mut d AgentDaemon) run() { d.start_build(config) } - time.sleep(1 * time.second) - } - // Builds are running, so check again after one second - else if free_builds < d.conf.max_concurrent_builds { - time.sleep(1 * time.second) + // No new jobs were scheduled and the agent isn't doing anything, + // so we just wait until the next polling period. + if new_configs.len == 0 && finished + empty == d.conf.max_concurrent_builds { + sleep_time = time.now() - last_poll_time + } } // The agent is not doing anything, so we just wait until the next poll // time - else { - time_until_next_poll := time.now() - last_poll_time - time.sleep(time_until_next_poll) + else if finished + empty == d.conf.max_concurrent_builds { + sleep_time = time.now() - last_poll_time } + + time.sleep(sleep_time) } } // update_atomics checks for each build whether it's completed, and sets it to -// free again if so. The return value is how many build slots are currently -// free. -fn (mut d AgentDaemon) update_atomics() int { - mut count := 0 +// empty again if so. The return value is a tuple `(finished, empty)` where +// `finished` is how many builds were just finished and thus set to empty, and +// `empty` is how many build slots were already empty. The amount of running +// builds can then be calculated by substracting these two values from the +// total allowed concurrent builds. +fn (mut d AgentDaemon) update_atomics() (int, int) { + mut finished := 0 + mut empty := 0 for i in 0 .. d.atomics.len { if stdatomic.load_u64(&d.atomics[i]) == agent.build_done { stdatomic.store_u64(&d.atomics[i], agent.build_empty) - count++ + finished++ } else if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { - count++ + empty++ } } - return count + return finished, empty } -// start_build starts a build for the given BuildConfig object. +// start_build starts a build for the given BuildConfig. fn (mut d AgentDaemon) start_build(config BuildConfig) bool { for i in 0 .. d.atomics.len { if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { @@ -149,6 +166,7 @@ fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) { if status == 0 { d.linfo('finished build: $config.url -> $config.repo; uploading logs...') + // TODO use the arch value here build_arch := os.uname().machine d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch, res.exit_code, res.logs) or { diff --git a/src/agent/images.v b/src/agent/images.v index 78bf2d0..64a8f74 100644 --- a/src/agent/images.v +++ b/src/agent/images.v @@ -4,29 +4,42 @@ import time import docker import build +// An ImageManager is a utility that creates builder images from given base +// images, updating these builder images if they've become too old. This +// structure can manage images from any number of base images, paving the way +// for configurable base images per target/repository. struct ImageManager { mut: - refresh_frequency int - images map[string][]string [required] - timestamps map[string]time.Time [required] + max_image_age int [required] + // For each base images, one or more builder images can exist at the same + // time + images map[string][]string [required] + // For each base image, we track when its newest image was built + timestamps map[string]time.Time [required] } -fn new_image_manager(refresh_frequency int) ImageManager { +// new_image_manager initializes a new image manager. +fn new_image_manager(max_image_age int) ImageManager { return ImageManager{ - refresh_frequency: refresh_frequency + max_image_age: max_image_age images: map[string][]string{} timestamps: map[string]time.Time{} } } +// get returns the name of the newest image for the given base image. Note that +// this function should only be called *after* a first call to `refresh_image`. pub fn (m &ImageManager) get(base_image string) string { return m.images[base_image].last() } +// refresh_image builds a new builder image from the given base image if the +// previous builder image is too old or non-existent. This function will do +// nothing if these conditions aren't met, so it's safe to call it every time +// you want to ensure an image is up to date. fn (mut m ImageManager) refresh_image(base_image string) ! { - // No need to refresh the image if the previous one is still new enough if base_image in m.timestamps - && m.timestamps[base_image].add_seconds(m.refresh_frequency) > time.now() { + && m.timestamps[base_image].add_seconds(m.max_image_age) > time.now() { return } @@ -39,7 +52,9 @@ fn (mut m ImageManager) refresh_image(base_image string) ! { m.timestamps[base_image] = time.now() } -// clean_old_images tries to remove any old but still present builder images. +// clean_old_images removes all older builder images that are no longer in use. +// The function will always leave at least one builder image, namely the newest +// one. fn (mut m ImageManager) clean_old_images() { mut dd := docker.new_conn() or { return } diff --git a/src/agent/log.v b/src/agent/log.v index d47df0f..cd59207 100644 --- a/src/agent/log.v +++ b/src/agent/log.v @@ -2,7 +2,7 @@ module agent import log -// log reate a log message with the given level +// log a message with the given level pub fn (mut d AgentDaemon) log(msg string, level log.Level) { lock d.logger { d.logger.send_output(msg, level)