feat(agent): clean up code a bit; add frequent polling when active

web-stuff
Jef Roosens 2022-12-13 17:42:49 +01:00
parent 6342789921
commit 5cbfc0ebcb
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 83 additions and 50 deletions

View File

@ -2,12 +2,12 @@ module agent
import log import log
import os import os
import util
const log_file_name = 'vieter.agent.log' const log_file_name = 'vieter.agent.log'
// agent start an agent service // agent starts an agent service
pub fn agent(conf Config) ! { pub fn agent(conf Config) ! {
// Configure logger
log_level := log.level_from_tag(conf.log_level) or { log_level := log.level_from_tag(conf.log_level) or {
return error('Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') return error('Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
} }
@ -16,6 +16,8 @@ pub fn agent(conf Config) ! {
level: log_level level: log_level
} }
os.mkdir_all(conf.data_dir) or { util.exit_with_message(1, 'Failed to create data directory.') }
log_file := os.join_path_single(conf.data_dir, agent.log_file_name) log_file := os.join_path_single(conf.data_dir, agent.log_file_name)
logger.set_full_logpath(log_file) logger.set_full_logpath(log_file)
logger.log_to_console_too() logger.log_to_console_too()

View File

@ -13,8 +13,6 @@ pub:
data_dir string data_dir string
max_concurrent_builds int = 1 max_concurrent_builds int = 1
polling_frequency int = 30 polling_frequency int = 30
// Architecture of agent
// arch string
image_rebuild_frequency int = 1440 image_rebuild_frequency int = 1440
} }
@ -22,7 +20,7 @@ pub:
pub fn cmd() cli.Command { pub fn cmd() cli.Command {
return cli.Command{ return cli.Command{
name: 'agent' name: 'agent'
description: 'Start an agent service & start polling for new builds.' description: 'Start an agent daemon.'
execute: fn (cmd cli.Command) ! { execute: fn (cmd cli.Command) ! {
config_file := cmd.flags.get_string('config-file')! config_file := cmd.flags.get_string('config-file')!
conf := vconf.load<Config>(prefix: 'VIETER_', default_path: config_file)! conf := vconf.load<Config>(prefix: 'VIETER_', default_path: config_file)!

View File

@ -16,17 +16,17 @@ const (
struct AgentDaemon { struct AgentDaemon {
logger shared log.Log logger shared log.Log
conf Config conf Config
client client.Client
mut: mut:
images ImageManager images ImageManager
// Which builds are currently running; length is same as // Which builds are currently running; length is conf.max_concurrent_builds
// conf.max_concurrent_builds
builds []BuildConfig builds []BuildConfig
// Atomic variables used to detect when a build has finished; length is the // Atomic variables used to detect when a build has finished; length is
// same as conf.max_concurrent_builds // conf.max_concurrent_builds
client client.Client
atomics []u64 atomics []u64
} }
// agent_init initializes a new agent
fn agent_init(logger log.Log, conf Config) AgentDaemon { fn agent_init(logger log.Log, conf Config) AgentDaemon {
mut d := AgentDaemon{ mut d := AgentDaemon{
logger: logger logger: logger
@ -40,37 +40,49 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
return d return d
} }
// run starts the actual agent daemon. This function will run forever.
pub fn (mut d AgentDaemon) run() { pub fn (mut d AgentDaemon) run() {
// This is just so that the very first time the loop is ran, the jobs are // This is just so that the very first time the loop is ran, the jobs are
// always polled // always polled
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency) mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
mut sleep_time := 1 * time.second
mut finished, mut empty := 0, 0
for { for {
free_builds := d.update_atomics() finished, empty = d.update_atomics()
// All build slots are taken, so there's nothing to be done // No new finished builds and no free slots, so there's nothing to be
if free_builds == 0 { // done
if finished + empty == 0 {
time.sleep(1 * time.second) time.sleep(1 * time.second)
continue continue
} }
// Builds have finished, so old builder images might have freed up. // Builds have finished, so old builder images might have freed up.
// TODO this might query the docker daemon too frequently.
if finished > 0 {
d.images.clean_old_images() d.images.clean_old_images()
}
// Poll for new jobs // The agent will always poll for new jobs after at most
if time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) { // `polling_frequency` seconds. However, when jobs have finished, the
new_configs := d.client.poll_jobs(d.conf.arch, free_builds) or { // agent will also poll for new jobs. This is because jobs are often
// clustered together (especially when mostly using the global cron
// schedule), so there's a much higher chance jobs are available.
if finished > 0 || time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) {
new_configs := d.client.poll_jobs(d.conf.arch, finished + empty) or {
d.lerror('Failed to poll jobs: $err.msg()') d.lerror('Failed to poll jobs: $err.msg()')
// TODO pick a better delay here
time.sleep(5 * time.second) time.sleep(5 * time.second)
continue continue
} }
last_poll_time = time.now() last_poll_time = time.now()
// Schedule new jobs
for config in new_configs { for config in new_configs {
// TODO handle this better than to just skip the config // TODO handle this better than to just skip the config
// Make sure a recent build base image is available for building the config // Make sure a recent build base image is available for
// building the config
d.images.refresh_image(config.base_image) or { d.images.refresh_image(config.base_image) or {
d.lerror(err.msg()) d.lerror(err.msg())
continue continue
@ -78,40 +90,45 @@ pub fn (mut d AgentDaemon) run() {
d.start_build(config) d.start_build(config)
} }
time.sleep(1 * time.second) // No new jobs were scheduled and the agent isn't doing anything,
// so we just wait until the next polling period.
if new_configs.len == 0 && finished + empty == d.conf.max_concurrent_builds {
sleep_time = time.now() - last_poll_time
} }
// Builds are running, so check again after one second
else if free_builds < d.conf.max_concurrent_builds {
time.sleep(1 * time.second)
} }
// The agent is not doing anything, so we just wait until the next poll // The agent is not doing anything, so we just wait until the next poll
// time // time
else { else if finished + empty == d.conf.max_concurrent_builds {
time_until_next_poll := time.now() - last_poll_time sleep_time = time.now() - last_poll_time
time.sleep(time_until_next_poll)
} }
time.sleep(sleep_time)
} }
} }
// update_atomics checks for each build whether it's completed, and sets it to // update_atomics checks for each build whether it's completed, and sets it to
// free again if so. The return value is how many build slots are currently // empty again if so. The return value is a tuple `(finished, empty)` where
// free. // `finished` is how many builds were just finished and thus set to empty, and
fn (mut d AgentDaemon) update_atomics() int { // `empty` is how many build slots were already empty. The amount of running
mut count := 0 // builds can then be calculated by substracting these two values from the
// total allowed concurrent builds.
fn (mut d AgentDaemon) update_atomics() (int, int) {
mut finished := 0
mut empty := 0
for i in 0 .. d.atomics.len { for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_done { if stdatomic.load_u64(&d.atomics[i]) == agent.build_done {
stdatomic.store_u64(&d.atomics[i], agent.build_empty) stdatomic.store_u64(&d.atomics[i], agent.build_empty)
count++ finished++
} else if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { } else if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
count++ empty++
} }
} }
return count return finished, empty
} }
// start_build starts a build for the given BuildConfig object. // start_build starts a build for the given BuildConfig.
fn (mut d AgentDaemon) start_build(config BuildConfig) bool { fn (mut d AgentDaemon) start_build(config BuildConfig) bool {
for i in 0 .. d.atomics.len { for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
@ -149,6 +166,7 @@ fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
if status == 0 { if status == 0 {
d.linfo('finished build: $config.url -> $config.repo; uploading logs...') d.linfo('finished build: $config.url -> $config.repo; uploading logs...')
// TODO use the arch value here
build_arch := os.uname().machine build_arch := os.uname().machine
d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch, d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch,
res.exit_code, res.logs) or { res.exit_code, res.logs) or {

View File

@ -4,29 +4,42 @@ import time
import docker import docker
import build import build
// An ImageManager is a utility that creates builder images from given base
// images, updating these builder images if they've become too old. This
// structure can manage images from any number of base images, paving the way
// for configurable base images per target/repository.
struct ImageManager { struct ImageManager {
mut: mut:
refresh_frequency int max_image_age int [required]
// For each base images, one or more builder images can exist at the same
// time
images map[string][]string [required] images map[string][]string [required]
// For each base image, we track when its newest image was built
timestamps map[string]time.Time [required] timestamps map[string]time.Time [required]
} }
fn new_image_manager(refresh_frequency int) ImageManager { // new_image_manager initializes a new image manager.
fn new_image_manager(max_image_age int) ImageManager {
return ImageManager{ return ImageManager{
refresh_frequency: refresh_frequency max_image_age: max_image_age
images: map[string][]string{} images: map[string][]string{}
timestamps: map[string]time.Time{} timestamps: map[string]time.Time{}
} }
} }
// get returns the name of the newest image for the given base image. Note that
// this function should only be called *after* a first call to `refresh_image`.
pub fn (m &ImageManager) get(base_image string) string { pub fn (m &ImageManager) get(base_image string) string {
return m.images[base_image].last() return m.images[base_image].last()
} }
// refresh_image builds a new builder image from the given base image if the
// previous builder image is too old or non-existent. This function will do
// nothing if these conditions aren't met, so it's safe to call it every time
// you want to ensure an image is up to date.
fn (mut m ImageManager) refresh_image(base_image string) ! { fn (mut m ImageManager) refresh_image(base_image string) ! {
// No need to refresh the image if the previous one is still new enough
if base_image in m.timestamps if base_image in m.timestamps
&& m.timestamps[base_image].add_seconds(m.refresh_frequency) > time.now() { && m.timestamps[base_image].add_seconds(m.max_image_age) > time.now() {
return return
} }
@ -39,7 +52,9 @@ fn (mut m ImageManager) refresh_image(base_image string) ! {
m.timestamps[base_image] = time.now() m.timestamps[base_image] = time.now()
} }
// clean_old_images tries to remove any old but still present builder images. // clean_old_images removes all older builder images that are no longer in use.
// The function will always leave at least one builder image, namely the newest
// one.
fn (mut m ImageManager) clean_old_images() { fn (mut m ImageManager) clean_old_images() {
mut dd := docker.new_conn() or { return } mut dd := docker.new_conn() or { return }

View File

@ -2,7 +2,7 @@ module agent
import log import log
// log reate a log message with the given level // log a message with the given level
pub fn (mut d AgentDaemon) log(msg string, level log.Level) { pub fn (mut d AgentDaemon) log(msg string, level log.Level) {
lock d.logger { lock d.logger {
d.logger.send_output(msg, level) d.logger.send_output(msg, level)