diff --git a/src/agent/daemon.v b/src/agent/daemon.v index 71f4780..fd5fe04 100644 --- a/src/agent/daemon.v +++ b/src/agent/daemon.v @@ -14,8 +14,6 @@ const ( struct AgentDaemon { logger shared log.Log conf Config - // List of last built builder images - builder_images []string // Which builds are currently running; length is same as // conf.max_concurrent_builds builds []BuildConfig @@ -46,8 +44,9 @@ pub fn (mut d AgentDaemon) run() { } } -// update_atomics checks for each build whether it's completed, and sets it to -// free again if so. The return value is how many fields are now set to free. +// clean_finished_builds checks for each build whether it's completed, and sets +// it to free again if so. The return value is how many fields are now set to +// free. fn (mut d AgentDaemon) update_atomics() int { mut count := 0 diff --git a/src/agent/images.v b/src/agent/images.v deleted file mode 100644 index 454f85f..0000000 --- a/src/agent/images.v +++ /dev/null @@ -1,49 +0,0 @@ -module agent - -import time -import docker - -struct ImageManager { - images map[string]string - timestamps map[string]time.Time -} - -// clean_old_base_images tries to remove any old but still present builder -// images. -fn (mut d AgentDaemon) clean_old_base_images() { - mut i := 0 - - mut dd := docker.new_conn() or { - d.lerror('Failed to connect to Docker socket.') - return - } - - defer { - dd.close() or {} - } - - for i < d.builder_images.len - 1 { - // For each builder image, we try to remove it by calling the Docker - // API. If the function returns an error or false, that means the image - // wasn't deleted. Therefore, we move the index over. If the function - // returns true, the array's length has decreased by one so we don't - // move the index. - dd.remove_image(d.builder_images[i]) or { i += 1 } - } -} - -// rebuild_base_image builds a builder image from the given base image. -/* fn (mut d AgentDaemon) build_base_image(base_image string) bool { */ -/* d.linfo('Rebuilding builder image....') */ - -/* d.builder_images << build.create_build_image(d.base_image) or { */ -/* d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') */ -/* d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) */ - -/* return false */ -/* } */ - -/* d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) */ - -/* return true */ -/* } */ diff --git a/src/agent/log.v b/src/agent/log.v deleted file mode 100644 index d47df0f..0000000 --- a/src/agent/log.v +++ /dev/null @@ -1,35 +0,0 @@ -module agent - -import log - -// log reate a log message with the given level -pub fn (mut d AgentDaemon) log(msg string, level log.Level) { - lock d.logger { - d.logger.send_output(msg, level) - } -} - -// lfatal create a log message with the fatal level -pub fn (mut d AgentDaemon) lfatal(msg string) { - d.log(msg, log.Level.fatal) -} - -// lerror create a log message with the error level -pub fn (mut d AgentDaemon) lerror(msg string) { - d.log(msg, log.Level.error) -} - -// lwarn create a log message with the warn level -pub fn (mut d AgentDaemon) lwarn(msg string) { - d.log(msg, log.Level.warn) -} - -// linfo create a log message with the info level -pub fn (mut d AgentDaemon) linfo(msg string) { - d.log(msg, log.Level.info) -} - -// ldebug create a log message with the debug level -pub fn (mut d AgentDaemon) ldebug(msg string) { - d.log(msg, log.Level.debug) -} diff --git a/src/build/queue.v b/src/build/queue.v index b704926..d6a832d 100644 --- a/src/build/queue.v +++ b/src/build/queue.v @@ -4,20 +4,15 @@ import models { Target } import cron.expression { CronExpression, parse_expression } import time import datatypes { MinHeap } -import util struct BuildJob { pub: - // Next timestamp from which point this job is allowed to be executed + // Earliest point this timestamp time.Time - // Required for calculating next timestamp after having pop'ed a job - ce CronExpression - // Actual build config sent to the agent - config BuildConfig + config BuildConfig } -// Allows BuildJob structs to be sorted according to their timestamp in -// MinHeaps +// Overloaded operator for comparing ScheduledBuild objects fn (r1 BuildJob) < (r2 BuildJob) bool { return r1.timestamp < r2.timestamp } @@ -28,12 +23,11 @@ pub struct BuildJobQueue { // Base image to use for targets without defined base image default_base_image string mut: - mutex shared util.Dummy // For each architecture, a priority queue is tracked queues map[string]MinHeap // Each queued build job is also stored in a map, with the keys being the // target IDs. This is used when removing or editing targets. - // jobs map[int]BuildJob + /* jobs map[int]BuildJob */ } pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue { @@ -43,127 +37,33 @@ pub fn new_job_queue(default_schedule CronExpression, default_base_image string) } } -// insert a new target's job into the queue for the given architecture. This -// job will then be endlessly rescheduled after being pop'ed, unless removed -// explicitely. -pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { - lock q.mutex { - if arch !in q.queues { - q.queues[arch] = MinHeap{} - } - - ce := if target.schedule != '' { - parse_expression(target.schedule) or { - return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") - } - } else { - q.default_schedule - } - - timestamp := ce.next_from_now()! - - job := BuildJob{ - timestamp: timestamp - ce: ce - config: BuildConfig{ - target_id: target.id - kind: target.kind - url: target.url - branch: target.branch - repo: target.repo - // TODO make this configurable - base_image: q.default_base_image - } - } - - dump(job) - q.queues[arch].insert(job) - } -} - -// reschedule the given job by calculating the next timestamp and re-adding it -// to its respective queue. This function is called by the pop functions -// *after* having pop'ed the job. -fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! { - new_timestamp := job.ce.next_from_now()! - - new_job := BuildJob{ - ...job - timestamp: new_timestamp +// insert a new job into the queue for a given target on an architecture. +pub fn (mut q BuildJobQueue) insert(target &Target, arch string) ! { + if arch !in q.queues { + q.queues[arch] = MinHeap{} } - q.queues[arch].insert(new_job) -} - -// peek shows the first job for the given architecture that's ready to be -// executed, if present. -pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { - rlock q.mutex { - if arch !in q.queues { - return none - } - - job := q.queues[arch].peek() or { return none } - - if job.timestamp < time.now() { - return job + ce := if target.schedule != '' { + parse_expression(target.schedule) or { + return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") } + } else { + q.default_schedule } - return none -} - -// pop removes the first job for the given architecture that's ready to be -// executed from the queue and returns it, if present. -pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob { - lock q.mutex { - if arch !in q.queues { - return none - } - - mut job := q.queues[arch].peek() or { return none } - - if job.timestamp < time.now() { - job = q.queues[arch].pop()? - - // TODO how do we handle this properly? Is it even possible for a - // cron expression to not return a next time if it's already been - // used before? - q.reschedule(job, arch) or {} - - return job - } - } - - return none -} - -// pop_n tries to pop at most n available jobs for the given architecture. -pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { - lock q.mutex { - if arch !in q.queues { - return [] - } - - mut out := []BuildJob{} - - for out.len < n { - mut job := q.queues[arch].peek() or { break } - - if job.timestamp < time.now() { - job = q.queues[arch].pop() or { break } - - // TODO idem - q.reschedule(job, arch) or {} - - out << job - } else { - break - } - } - - return out - } - - return [] + timestamp := ce.next_from_now()! + +job := BuildJob{ + timestamp: timestamp + config: BuildJob{ + target_id: target.id + kind: target.kind + url: target.url + branch: target.branch + repo: target.repo + // TODO make this configurable + base_image: q.default_base_image + }} + + q.queues[arch].insert(job) } diff --git a/src/server/api_builds.v b/src/server/api_builds.v index ec3c8ec..34b34dc 100644 --- a/src/server/api_builds.v +++ b/src/server/api_builds.v @@ -2,12 +2,14 @@ module server import web import web.response { new_data_response, new_response } +import time +import build { BuildConfig } // import os // import util // import models { BuildLog, BuildLogFilter } -['/api/v1/jobs/poll'; auth; get] -fn (mut app App) v1_poll_job_queue() web.Result { +['/api/v1/builds/poll'; auth; get] +fn (mut app App) v1_poll_build_queue() web.Result { arch := app.query['arch'] or { return app.json(.bad_request, new_response('Missing arch query arg.')) } @@ -17,7 +19,21 @@ fn (mut app App) v1_poll_job_queue() web.Result { } max := max_str.int() - mut out := app.job_queue.pop_n(arch, max).map(it.config) + mut out := []BuildConfig{} + + now := time.now() + + lock app.build_queues { + mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } + + for queue.len() > 0 && out.len < max { + next := queue.peek() or { return app.status(.internal_server_error) } + + if next.timestamp < now { + out << queue.pop() or { return app.status(.internal_server_error) }.config + } + } + } return app.json(.ok, new_data_response(out)) } diff --git a/src/server/cli.v b/src/server/cli.v index 2fede6c..067725c 100644 --- a/src/server/cli.v +++ b/src/server/cli.v @@ -5,14 +5,13 @@ import conf as vconf struct Config { pub: - log_level string = 'WARN' - pkg_dir string - data_dir string - api_key string - default_arch string + log_level string = 'WARN' + pkg_dir string + data_dir string + api_key string + default_arch string global_schedule string = '0 3' - port int = 8000 - base_image string = 'archlinux:base-devel' + port int = 8000 } // cmd returns the cli submodule that handles starting the server diff --git a/src/server/server.v b/src/server/server.v index e2c19c2..e985052 100644 --- a/src/server/server.v +++ b/src/server/server.v @@ -6,7 +6,8 @@ import log import repo import util import db -import build { BuildJobQueue } +import datatypes { MinHeap } +import build { BuildConfig } import cron.expression const ( @@ -23,24 +24,37 @@ pub: pub mut: repo repo.RepoGroupManager [required; web_global] // Keys are the various architectures for packages - job_queue BuildJobQueue [required; web_global] - db db.VieterDb + build_queues shared map[string]MinHeap + db db.VieterDb } -fn (mut app App) init_job_queue() ! { +fn (mut app App) init_build_queues() { // Initialize build queues + mut i := 0 mut targets := app.db.get_targets(limit: 25) - mut i := u64(0) + + default_ce := expression.parse_expression(conf.global_schedule) or { + return + } for targets.len > 0 { - for target in targets { - for arch in target.arch { - app.job_queue.insert(target, arch.value)! + for t in targets { + ce := parse_expression(t.schedule) or { default_ce } + + for arch in t.arch { + if arch !in app.build_queues { + app.build_queues[arch] = Minheap{} + } + +build_config := BuildConfig{ + + } + app.build_queues[arch].push(ScheduledBuild{ + timestamp: ce.next() + config: build_config + }) } } - - i += 25 - targets = app.db.get_targets(limit: 25, offset: i) } } @@ -51,10 +65,6 @@ pub fn server(conf Config) ! { util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") } - global_ce := expression.parse_expression(conf.global_schedule) or { - util.exit_with_message(1, 'Invalid global cron expression: $err.msg()') - } - // Configure logger log_level := log.level_from_tag(conf.log_level) or { util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') @@ -96,17 +106,12 @@ pub fn server(conf Config) ! { util.exit_with_message(1, 'Failed to initialize database: $err.msg()') } - mut app := &App{ + web.run(&App{ logger: logger api_key: conf.api_key conf: conf repo: repo db: db - job_queue: build.new_job_queue(global_ce, conf.base_image) - } - app.init_job_queue() or { - util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()') - } - - web.run(app, conf.port) + build_queues: map[string]MinHeap{} + }, conf.port) }