diff --git a/src/agent/agent.v b/src/agent/agent.v index 3affd21..1758c85 100644 --- a/src/agent/agent.v +++ b/src/agent/agent.v @@ -20,6 +20,6 @@ pub fn agent(conf Config) ! { logger.set_full_logpath(log_file) logger.log_to_console_too() - mut d := agent.agent_init(logger, conf) + mut d := agent_init(logger, conf) d.run() } diff --git a/src/agent/cli.v b/src/agent/cli.v index 46942ec..a375f08 100644 --- a/src/agent/cli.v +++ b/src/agent/cli.v @@ -5,15 +5,17 @@ import conf as vconf struct Config { pub: - log_level string = 'WARN' - api_key string - address string - data_dir string - max_concurrent_builds int = 1 - polling_frequency int = 30 + log_level string = 'WARN' + // Architecture that the agent represents + arch string + api_key string + address string + data_dir string + max_concurrent_builds int = 1 + polling_frequency int = 30 // Architecture of agent - /* arch string */ - /* image_rebuild_frequency int = 1440 */ + // arch string + image_rebuild_frequency int = 1440 } // cmd returns the cli module that handles the cron daemon. diff --git a/src/agent/daemon.v b/src/agent/daemon.v index 389a148..f060863 100644 --- a/src/agent/daemon.v +++ b/src/agent/daemon.v @@ -4,6 +4,8 @@ import log import sync.stdatomic import build { BuildConfig } import client +import time +import os const ( build_empty = 0 @@ -13,13 +15,15 @@ const ( struct AgentDaemon { logger shared log.Log - conf Config + conf Config +mut: + images ImageManager // Which builds are currently running; length is same as // conf.max_concurrent_builds builds []BuildConfig // Atomic variables used to detect when a build has finished; length is the // same as conf.max_concurrent_builds - client client.Client + client client.Client atomics []u64 } @@ -28,6 +32,7 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon { logger: logger client: client.new(conf.address, conf.api_key) conf: conf + images: new_image_manager(conf.image_rebuild_frequency) builds: []BuildConfig{len: conf.max_concurrent_builds} atomics: []u64{len: conf.max_concurrent_builds} } @@ -36,18 +41,60 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon { } pub fn (mut d AgentDaemon) run() { + // This is just so that the very first time the loop is ran, the jobs are + // always polled + mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency) + for { free_builds := d.update_atomics() - if free_builds > 0 { - - } - + // All build slots are taken, so there's nothing to be done + if free_builds == 0 { + time.sleep(1 * time.second) + continue + } + + // Builds have finished, so old builder images might have freed up. + d.images.clean_old_images() + + // Poll for new jobs + if time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) { + new_configs := d.client.poll_jobs(d.conf.arch, free_builds) or { + d.lerror('Failed to poll jobs: $err.msg()') + + time.sleep(5 * time.second) + continue + } + last_poll_time = time.now() + + // Schedule new jobs + for config in new_configs { + // TODO handle this better than to just skip the config + // Make sure a recent build base image is available for building the config + d.images.refresh_image(config.base_image) or { + d.lerror(err.msg()) + continue + } + d.start_build(config) + } + + time.sleep(1 * time.second) + } + // Builds are running, so check again after one second + else if free_builds < d.conf.max_concurrent_builds { + time.sleep(1 * time.second) + } + // The agent is not doing anything, so we just wait until the next poll + // time + else { + time_until_next_poll := time.now() - last_poll_time + time.sleep(time_until_next_poll) + } } } -// clean_finished_builds checks for each build whether it's completed, and sets -// it to free again if so. The return value is how many fields are now set to +// update_atomics checks for each build whether it's completed, and sets it to +// free again if so. The return value is how many build slots are currently // free. fn (mut d AgentDaemon) update_atomics() int { mut count := 0 @@ -63,3 +110,53 @@ fn (mut d AgentDaemon) update_atomics() int { return count } + +// start_build starts a build for the given BuildConfig object. +fn (mut d AgentDaemon) start_build(config BuildConfig) bool { + for i in 0 .. d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { + stdatomic.store_u64(&d.atomics[i], agent.build_running) + d.builds[i] = config + + go d.run_build(i, config) + + return true + } + } + + return false +} + +// run_build actually starts the build process for a given target. +fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) { + d.linfo('started build: $config.url -> $config.repo') + + // 0 means success, 1 means failure + mut status := 0 + + new_config := BuildConfig{ + ...config + base_image: d.images.get(config.base_image) + } + + res := build.build_config(d.client.address, d.client.api_key, new_config) or { + d.ldebug('build_config error: $err.msg()') + status = 1 + + build.BuildResult{} + } + + if status == 0 { + d.linfo('finished build: $config.url -> $config.repo; uploading logs...') + + build_arch := os.uname().machine + d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch, + res.exit_code, res.logs) or { + d.lerror('Failed to upload logs for build: $config.url -> $config.repo') + } + } else { + d.linfo('an error occured during build: $config.url -> $config.repo') + } + + stdatomic.store_u64(&d.atomics[build_index], agent.build_done) +} diff --git a/src/agent/images.v b/src/agent/images.v new file mode 100644 index 0000000..78bf2d0 --- /dev/null +++ b/src/agent/images.v @@ -0,0 +1,64 @@ +module agent + +import time +import docker +import build + +struct ImageManager { +mut: + refresh_frequency int + images map[string][]string [required] + timestamps map[string]time.Time [required] +} + +fn new_image_manager(refresh_frequency int) ImageManager { + return ImageManager{ + refresh_frequency: refresh_frequency + images: map[string][]string{} + timestamps: map[string]time.Time{} + } +} + +pub fn (m &ImageManager) get(base_image string) string { + return m.images[base_image].last() +} + +fn (mut m ImageManager) refresh_image(base_image string) ! { + // No need to refresh the image if the previous one is still new enough + if base_image in m.timestamps + && m.timestamps[base_image].add_seconds(m.refresh_frequency) > time.now() { + return + } + + // TODO use better image tags for built images + new_image := build.create_build_image(base_image) or { + return error('Failed to build builder image from base image $base_image') + } + + m.images[base_image] << new_image + m.timestamps[base_image] = time.now() +} + +// clean_old_images tries to remove any old but still present builder images. +fn (mut m ImageManager) clean_old_images() { + mut dd := docker.new_conn() or { return } + + defer { + dd.close() or {} + } + + mut i := 0 + + for image in m.images.keys() { + i = 0 + + for i < m.images[image].len - 1 { + // For each builder image, we try to remove it by calling the Docker + // API. If the function returns an error or false, that means the image + // wasn't deleted. Therefore, we move the index over. If the function + // returns true, the array's length has decreased by one so we don't + // move the index. + dd.remove_image(m.images[image][i]) or { i += 1 } + } + } +} diff --git a/src/agent/log.v b/src/agent/log.v new file mode 100644 index 0000000..d47df0f --- /dev/null +++ b/src/agent/log.v @@ -0,0 +1,35 @@ +module agent + +import log + +// log reate a log message with the given level +pub fn (mut d AgentDaemon) log(msg string, level log.Level) { + lock d.logger { + d.logger.send_output(msg, level) + } +} + +// lfatal create a log message with the fatal level +pub fn (mut d AgentDaemon) lfatal(msg string) { + d.log(msg, log.Level.fatal) +} + +// lerror create a log message with the error level +pub fn (mut d AgentDaemon) lerror(msg string) { + d.log(msg, log.Level.error) +} + +// lwarn create a log message with the warn level +pub fn (mut d AgentDaemon) lwarn(msg string) { + d.log(msg, log.Level.warn) +} + +// linfo create a log message with the info level +pub fn (mut d AgentDaemon) linfo(msg string) { + d.log(msg, log.Level.info) +} + +// ldebug create a log message with the debug level +pub fn (mut d AgentDaemon) ldebug(msg string) { + d.log(msg, log.Level.debug) +} diff --git a/src/build/build.v b/src/build/build.v index b7c5cb6..2d51156 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -18,11 +18,11 @@ const ( pub struct BuildConfig { pub: - id int - kind string - url string - branch string - repo string + target_id int + kind string + url string + branch string + repo string base_image string } @@ -103,10 +103,23 @@ pub: logs string } +pub fn build_target(address string, api_key string, base_image_id string, target &Target) !BuildResult { + config := BuildConfig{ + target_id: target.id + kind: target.kind + url: target.url + branch: target.branch + repo: target.repo + base_image: base_image_id + } + + return build_config(address, api_key, config) +} + // build_target builds, packages & publishes a given Arch package based on the // provided target. The base image ID should be of an image previously created // by create_build_image. It returns the logs of the container. -pub fn build_target(address string, api_key string, base_image_id string, target &Target) !BuildResult { +pub fn build_config(address string, api_key string, config BuildConfig) !BuildResult { mut dd := docker.new_conn()! defer { @@ -114,14 +127,14 @@ pub fn build_target(address string, api_key string, base_image_id string, target } build_arch := os.uname().machine - build_script := create_build_script(address, target, build_arch) + build_script := create_build_script(address, config, build_arch) // We convert the build script into a base64 string, which then gets passed // to the container as an env var base64_script := base64.encode_str(build_script) c := docker.NewContainer{ - image: '$base_image_id' + image: '$config.base_image' env: [ 'BUILD_SCRIPT=$base64_script', 'API_KEY=$api_key', diff --git a/src/build/queue.v b/src/build/queue.v new file mode 100644 index 0000000..29036e4 --- /dev/null +++ b/src/build/queue.v @@ -0,0 +1,168 @@ +module build + +import models { Target } +import cron.expression { CronExpression, parse_expression } +import time +import datatypes { MinHeap } +import util + +struct BuildJob { +pub: + // Next timestamp from which point this job is allowed to be executed + timestamp time.Time + // Required for calculating next timestamp after having pop'ed a job + ce CronExpression + // Actual build config sent to the agent + config BuildConfig +} + +// Allows BuildJob structs to be sorted according to their timestamp in +// MinHeaps +fn (r1 BuildJob) < (r2 BuildJob) bool { + return r1.timestamp < r2.timestamp +} + +pub struct BuildJobQueue { + // Schedule to use for targets without explicitely defined cron expression + default_schedule CronExpression + // Base image to use for targets without defined base image + default_base_image string +mut: + mutex shared util.Dummy + // For each architecture, a priority queue is tracked + queues map[string]MinHeap + // Each queued build job is also stored in a map, with the keys being the + // target IDs. This is used when removing or editing targets. + // jobs map[int]BuildJob +} + +pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { + return BuildJobQueue{ + default_schedule: default_schedule + default_base_image: default_base_image + } +} + +// insert a new target's job into the queue for the given architecture. This +// job will then be endlessly rescheduled after being pop'ed, unless removed +// explicitely. +pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { + lock q.mutex { + if arch !in q.queues { + q.queues[arch] = MinHeap{} + } + + ce := if target.schedule != '' { + parse_expression(target.schedule) or { + return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") + } + } else { + q.default_schedule + } + + timestamp := ce.next_from_now()! + + job := BuildJob{ + timestamp: timestamp + ce: ce + config: BuildConfig{ + target_id: target.id + kind: target.kind + url: target.url + branch: target.branch + repo: target.repo + // TODO make this configurable + base_image: q.default_base_image + } + } + + q.queues[arch].insert(job) + } +} + +// reschedule the given job by calculating the next timestamp and re-adding it +// to its respective queue. This function is called by the pop functions +// *after* having pop'ed the job. +fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! { + new_timestamp := job.ce.next_from_now()! + + new_job := BuildJob{ + ...job + timestamp: new_timestamp + } + + q.queues[arch].insert(new_job) +} + +// peek shows the first job for the given architecture that's ready to be +// executed, if present. +pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { + rlock q.mutex { + if arch !in q.queues { + return none + } + + job := q.queues[arch].peek() or { return none } + + if job.timestamp < time.now() { + return job + } + } + + return none +} + +// pop removes the first job for the given architecture that's ready to be +// executed from the queue and returns it, if present. +pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob { + lock q.mutex { + if arch !in q.queues { + return none + } + + mut job := q.queues[arch].peek() or { return none } + + if job.timestamp < time.now() { + job = q.queues[arch].pop()? + + // TODO how do we handle this properly? Is it even possible for a + // cron expression to not return a next time if it's already been + // used before? + q.reschedule(job, arch) or {} + + return job + } + } + + return none +} + +// pop_n tries to pop at most n available jobs for the given architecture. +pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { + lock q.mutex { + if arch !in q.queues { + return [] + } + + mut out := []BuildJob{} + + for out.len < n { + mut job := q.queues[arch].peek() or { break } + + if job.timestamp < time.now() { + job = q.queues[arch].pop() or { break } + + // TODO idem + q.reschedule(job, arch) or {} + + out << job + } else { + break + } + } + + return out + } + + return [] +} diff --git a/src/build/shell.v b/src/build/shell.v index e573d53..c2d0c9b 100644 --- a/src/build/shell.v +++ b/src/build/shell.v @@ -1,7 +1,5 @@ module build -import models { Target } - // escape_shell_string escapes any characters that could be interpreted // incorrectly by a shell. The resulting value should be safe to use inside an // echo statement. @@ -23,13 +21,13 @@ pub fn echo_commands(cmds []string) []string { } // create_build_script generates a shell script that builds a given Target. -fn create_build_script(address string, target &Target, build_arch string) string { - repo_url := '$address/$target.repo' +fn create_build_script(address string, config BuildConfig, build_arch string) string { + repo_url := '$address/$config.repo' mut commands := [ // This will later be replaced by a proper setting for changing the // mirrorlist - "echo -e '[$target.repo]\\nServer = $address/\$repo/\$arch\\nSigLevel = Optional' >> /etc/pacman.conf" + "echo -e '[$config.repo]\\nServer = $address/\$repo/\$arch\\nSigLevel = Optional' >> /etc/pacman.conf" // We need to update the package list of the repo we just added above. // This should however not pull in a lot of packages as long as the // builder image is rebuilt frequently. @@ -38,22 +36,22 @@ fn create_build_script(address string, target &Target, build_arch string) string 'su builder', ] - commands << match target.kind { + commands << match config.kind { 'git' { - if target.branch == '' { + if config.branch == '' { [ - "git clone --single-branch --depth 1 '$target.url' repo", + "git clone --single-branch --depth 1 '$config.url' repo", ] } else { [ - "git clone --single-branch --depth 1 --branch $target.branch '$target.url' repo", + "git clone --single-branch --depth 1 --branch $config.branch '$config.url' repo", ] } } 'url' { [ 'mkdir repo', - "curl -o repo/PKGBUILD -L '$target.url'", + "curl -o repo/PKGBUILD -L '$config.url'", ] } else { diff --git a/src/client/jobs.v b/src/client/jobs.v new file mode 100644 index 0000000..30f2531 --- /dev/null +++ b/src/client/jobs.v @@ -0,0 +1,12 @@ +module client + +import build { BuildConfig } + +pub fn (c &Client) poll_jobs(arch string, max int) ![]BuildConfig { + data := c.send_request<[]BuildConfig>(.get, '/api/v1/jobs/poll', { + 'arch': arch + 'max': max.str() + })! + + return data.data +} diff --git a/src/main.v b/src/main.v index 424e328..34387bf 100644 --- a/src/main.v +++ b/src/main.v @@ -41,7 +41,7 @@ fn main() { schedule.cmd(), man.cmd(), aur.cmd(), - agent.cmd() + agent.cmd(), ] } app.setup() diff --git a/src/server/api_builds.v b/src/server/api_builds.v new file mode 100644 index 0000000..ec3c8ec --- /dev/null +++ b/src/server/api_builds.v @@ -0,0 +1,23 @@ +module server + +import web +import web.response { new_data_response, new_response } +// import os +// import util +// import models { BuildLog, BuildLogFilter } + +['/api/v1/jobs/poll'; auth; get] +fn (mut app App) v1_poll_job_queue() web.Result { + arch := app.query['arch'] or { + return app.json(.bad_request, new_response('Missing arch query arg.')) + } + + max_str := app.query['max'] or { + return app.json(.bad_request, new_response('Missing max query arg.')) + } + max := max_str.int() + + mut out := app.job_queue.pop_n(arch, max).map(it.config) + + return app.json(.ok, new_data_response(out)) +} diff --git a/src/server/cli.v b/src/server/cli.v index a9644f3..2fede6c 100644 --- a/src/server/cli.v +++ b/src/server/cli.v @@ -5,12 +5,14 @@ import conf as vconf struct Config { pub: - log_level string = 'WARN' - pkg_dir string - data_dir string - api_key string - default_arch string - port int = 8000 + log_level string = 'WARN' + pkg_dir string + data_dir string + api_key string + default_arch string + global_schedule string = '0 3' + port int = 8000 + base_image string = 'archlinux:base-devel' } // cmd returns the cli submodule that handles starting the server diff --git a/src/server/server.v b/src/server/server.v index d5f6135..e2c19c2 100644 --- a/src/server/server.v +++ b/src/server/server.v @@ -6,6 +6,8 @@ import log import repo import util import db +import build { BuildJobQueue } +import cron.expression const ( log_file_name = 'vieter.log' @@ -20,7 +22,26 @@ pub: conf Config [required; web_global] pub mut: repo repo.RepoGroupManager [required; web_global] - db db.VieterDb + // Keys are the various architectures for packages + job_queue BuildJobQueue [required; web_global] + db db.VieterDb +} + +fn (mut app App) init_job_queue() ! { + // Initialize build queues + mut targets := app.db.get_targets(limit: 25) + mut i := u64(0) + + for targets.len > 0 { + for target in targets { + for arch in target.arch { + app.job_queue.insert(target, arch.value)! + } + } + + i += 25 + targets = app.db.get_targets(limit: 25, offset: i) + } } // server starts the web server & starts listening for requests @@ -30,6 +51,10 @@ pub fn server(conf Config) ! { util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") } + global_ce := expression.parse_expression(conf.global_schedule) or { + util.exit_with_message(1, 'Invalid global cron expression: $err.msg()') + } + // Configure logger log_level := log.level_from_tag(conf.log_level) or { util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') @@ -71,11 +96,17 @@ pub fn server(conf Config) ! { util.exit_with_message(1, 'Failed to initialize database: $err.msg()') } - web.run(&App{ + mut app := &App{ logger: logger api_key: conf.api_key conf: conf repo: repo db: db - }, conf.port) + job_queue: build.new_job_queue(global_ce, conf.base_image) + } + app.init_job_queue() or { + util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()') + } + + web.run(app, conf.port) } diff --git a/vieter.toml b/vieter.toml index d3922a4..9a68ae3 100644 --- a/vieter.toml +++ b/vieter.toml @@ -4,6 +4,7 @@ data_dir = "data" pkg_dir = "data/pkgs" log_level = "DEBUG" default_arch = "x86_64" +arch = "x86_64" address = "http://localhost:8000" @@ -11,4 +12,3 @@ global_schedule = '* *' api_update_frequency = 2 image_rebuild_frequency = 1 max_concurrent_builds = 3 -