From 9a49d96e202208453169524585eb28882628f10f Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 6 Dec 2022 14:11:17 +0100 Subject: [PATCH] feat(build): start of server-side job queue --- src/agent/agent.v | 2 +- src/agent/cli.v | 16 +++++----- src/agent/daemon.v | 10 +++--- src/build/build.v | 10 +++--- src/build/queue.v | 70 +++++++++++++++++++++++++++++++++++++++++ src/main.v | 2 +- src/server/api_builds.v | 39 +++++++++++++++++++++++ src/server/cli.v | 14 +++++---- src/server/server.v | 39 ++++++++++++++++++++++- 9 files changed, 174 insertions(+), 28 deletions(-) create mode 100644 src/build/queue.v create mode 100644 src/server/api_builds.v diff --git a/src/agent/agent.v b/src/agent/agent.v index 3affd218..1758c85d 100644 --- a/src/agent/agent.v +++ b/src/agent/agent.v @@ -20,6 +20,6 @@ pub fn agent(conf Config) ! { logger.set_full_logpath(log_file) logger.log_to_console_too() - mut d := agent.agent_init(logger, conf) + mut d := agent_init(logger, conf) d.run() } diff --git a/src/agent/cli.v b/src/agent/cli.v index 46942ec8..a0a249cc 100644 --- a/src/agent/cli.v +++ b/src/agent/cli.v @@ -5,15 +5,15 @@ import conf as vconf struct Config { pub: - log_level string = 'WARN' - api_key string - address string - data_dir string - max_concurrent_builds int = 1 - polling_frequency int = 30 + log_level string = 'WARN' + api_key string + address string + data_dir string + max_concurrent_builds int = 1 + polling_frequency int = 30 // Architecture of agent - /* arch string */ - /* image_rebuild_frequency int = 1440 */ + // arch string + // image_rebuild_frequency int = 1440 } // cmd returns the cli module that handles the cron daemon. diff --git a/src/agent/daemon.v b/src/agent/daemon.v index 389a1485..fd5fe04c 100644 --- a/src/agent/daemon.v +++ b/src/agent/daemon.v @@ -13,13 +13,13 @@ const ( struct AgentDaemon { logger shared log.Log - conf Config + conf Config // Which builds are currently running; length is same as // conf.max_concurrent_builds builds []BuildConfig // Atomic variables used to detect when a build has finished; length is the // same as conf.max_concurrent_builds - client client.Client + client client.Client atomics []u64 } @@ -39,10 +39,8 @@ pub fn (mut d AgentDaemon) run() { for { free_builds := d.update_atomics() - if free_builds > 0 { - - } - + if free_builds > 0 { + } } } diff --git a/src/build/build.v b/src/build/build.v index b7c5cb6d..13d3e451 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -18,11 +18,11 @@ const ( pub struct BuildConfig { pub: - id int - kind string - url string - branch string - repo string + target_id int + kind string + url string + branch string + repo string base_image string } diff --git a/src/build/queue.v b/src/build/queue.v new file mode 100644 index 00000000..81d3fa91 --- /dev/null +++ b/src/build/queue.v @@ -0,0 +1,70 @@ +module build + +import models { Target } +import cron.expression { CronExpression, parse_expression } +import time +import datatypes { MinHeap } + +struct BuildJob { +pub: + // Earliest point this + timestamp time.Time + config BuildConfig +} + +// Overloaded operator for comparing ScheduledBuild objects +fn (r1 BuildJob) < (r2 BuildJob) bool { + return r1.timestamp < r2.timestamp +} + +pub struct BuildJobQueue { + // Schedule to use for targets without explicitely defined cron expression + default_schedule CronExpression + // Base image to use for targets without defined base image + default_base_image string +mut: + // For each architecture, a priority queue is tracked + queues map[string]MinHeap + // Each queued build job is also stored in a map, with the keys being the + // target IDs. This is used when removing or editing targets. + // jobs map[int]BuildJob +} + +pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { + return BuildJobQueue{ + default_schedule: default_schedule + default_base_image: default_base_image + } +} + +// insert a new job into the queue for a given target on an architecture. +pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { + if arch !in q.queues { + q.queues[arch] = MinHeap{} + } + + ce := if target.schedule != '' { + parse_expression(target.schedule) or { + return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") + } + } else { + q.default_schedule + } + + timestamp := ce.next_from_now()! + + job := BuildJob{ + timestamp: timestamp + config: BuildConfig{ + target_id: target.id + kind: target.kind + url: target.url + branch: target.branch + repo: target.repo + // TODO make this configurable + base_image: q.default_base_image + } + } + + q.queues[arch].insert(job) +} diff --git a/src/main.v b/src/main.v index 424e3288..34387bf4 100644 --- a/src/main.v +++ b/src/main.v @@ -41,7 +41,7 @@ fn main() { schedule.cmd(), man.cmd(), aur.cmd(), - agent.cmd() + agent.cmd(), ] } app.setup() diff --git a/src/server/api_builds.v b/src/server/api_builds.v new file mode 100644 index 00000000..888fe9df --- /dev/null +++ b/src/server/api_builds.v @@ -0,0 +1,39 @@ +module server + +/* import web */ +/* import web.response { new_data_response, new_response } */ +/* import time */ +/* import build { BuildConfig } */ +/* // import os */ +/* // import util */ +/* // import models { BuildLog, BuildLogFilter } */ + +/* ['/api/v1/builds/poll'; auth; get] */ +/* fn (mut app App) v1_poll_build_queue() web.Result { */ +/* arch := app.query['arch'] or { */ +/* return app.json(.bad_request, new_response('Missing arch query arg.')) */ +/* } */ + +/* max_str := app.query['max'] or { */ +/* return app.json(.bad_request, new_response('Missing max query arg.')) */ +/* } */ +/* max := max_str.int() */ + +/* mut out := []BuildConfig{} */ + +/* now := time.now() */ + +/* lock app.build_queues { */ +/* mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } */ + +/* for queue.len() > 0 && out.len < max { */ +/* next := queue.peek() or { return app.status(.internal_server_error) } */ + +/* if next.timestamp < now { */ +/* out << queue.pop() or { return app.status(.internal_server_error) }.config */ +/* } */ +/* } */ +/* } */ + +/* return app.json(.ok, new_data_response(out)) */ +/* } */ diff --git a/src/server/cli.v b/src/server/cli.v index a9644f35..2fede6ce 100644 --- a/src/server/cli.v +++ b/src/server/cli.v @@ -5,12 +5,14 @@ import conf as vconf struct Config { pub: - log_level string = 'WARN' - pkg_dir string - data_dir string - api_key string - default_arch string - port int = 8000 + log_level string = 'WARN' + pkg_dir string + data_dir string + api_key string + default_arch string + global_schedule string = '0 3' + port int = 8000 + base_image string = 'archlinux:base-devel' } // cmd returns the cli submodule that handles starting the server diff --git a/src/server/server.v b/src/server/server.v index d5f61359..fb45e6d7 100644 --- a/src/server/server.v +++ b/src/server/server.v @@ -6,6 +6,8 @@ import log import repo import util import db +import build { BuildJobQueue } +import cron.expression const ( log_file_name = 'vieter.log' @@ -20,9 +22,37 @@ pub: conf Config [required; web_global] pub mut: repo repo.RepoGroupManager [required; web_global] - db db.VieterDb + // Keys are the various architectures for packages + job_queue BuildJobQueue [required; web_global] + db db.VieterDb } +// fn (mut app App) init_build_queues() { +// // Initialize build queues +// mut i := 0 +// mut targets := app.db.get_targets(limit: 25) + +// default_ce := expression.parse_expression(conf.global_schedule) or { return } + +// for targets.len > 0 { +// for t in targets { +// ce := parse_expression(t.schedule) or { default_ce } + +// for arch in t.arch { +// if arch !in app.build_queues { +// app.build_queues[arch] = Minheap{} +// } + +// build_config := BuildConfig{} +// app.build_queues[arch].push(ScheduledBuild{ +// timestamp: ce.next() +// config: build_config +// }) +// } +// } +// } +//} + // server starts the web server & starts listening for requests pub fn server(conf Config) ! { // Prevent using 'any' as the default arch @@ -30,6 +60,10 @@ pub fn server(conf Config) ! { util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") } + global_ce := expression.parse_expression(conf.global_schedule) or { + util.exit_with_message(1, 'Invalid global cron expression: $err.msg()') + } + // Configure logger log_level := log.level_from_tag(conf.log_level) or { util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') @@ -71,11 +105,14 @@ pub fn server(conf Config) ! { util.exit_with_message(1, 'Failed to initialize database: $err.msg()') } + mut queue := build.new_job_queue(global_ce, conf.base_image) + web.run(&App{ logger: logger api_key: conf.api_key conf: conf repo: repo db: db + job_queue: queue }, conf.port) }