diff --git a/src/build/queue.v b/src/build/queue.v index 81d3fa9..65b279e 100644 --- a/src/build/queue.v +++ b/src/build/queue.v @@ -4,6 +4,7 @@ import models { Target } import cron.expression { CronExpression, parse_expression } import time import datatypes { MinHeap } +import util struct BuildJob { pub: @@ -23,6 +24,7 @@ pub struct BuildJobQueue { // Base image to use for targets without defined base image default_base_image string mut: + mutex shared util.Dummy // For each architecture, a priority queue is tracked queues map[string]MinHeap // Each queued build job is also stored in a map, with the keys being the @@ -39,32 +41,95 @@ pub fn new_job_queue(default_schedule CronExpression, default_base_image string) // insert a new job into the queue for a given target on an architecture. pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { - if arch !in q.queues { - q.queues[arch] = MinHeap{} - } - - ce := if target.schedule != '' { - parse_expression(target.schedule) or { - return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") + lock q.mutex { + if arch !in q.queues { + q.queues[arch] = MinHeap{} } - } else { - q.default_schedule - } - timestamp := ce.next_from_now()! - - job := BuildJob{ - timestamp: timestamp - config: BuildConfig{ - target_id: target.id - kind: target.kind - url: target.url - branch: target.branch - repo: target.repo - // TODO make this configurable - base_image: q.default_base_image + ce := if target.schedule != '' { + parse_expression(target.schedule) or { + return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()") + } + } else { + q.default_schedule } - } - q.queues[arch].insert(job) + timestamp := ce.next_from_now()! + + job := BuildJob{ + timestamp: timestamp + config: BuildConfig{ + target_id: target.id + kind: target.kind + url: target.url + branch: target.branch + repo: target.repo + // TODO make this configurable + base_image: q.default_base_image + } + } + + q.queues[arch].insert(job) + } +} + +// peek shows the first job for the given architecture that's ready to be +// executed, if present. +pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob { + rlock q.mutex { + if arch !in q.queues { + return none + } + + job := q.queues[arch].peek() or { return none } + + if job.timestamp < time.now() { + return job + } + } + + return none +} + +// pop removes the first job for the given architecture that's ready to be +// executed from the queue and returns it, if present. +pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob { + lock q.mutex { + if arch !in q.queues { + return none + } + + job := q.queues[arch].peek() or { return none } + + if job.timestamp < time.now() { + return q.queues[arch].pop() + } + } + + return none +} + +// pop_n tries to pop at most n available jobs for the given architecture. +pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob { + lock q.mutex { + if arch !in q.queues { + return [] + } + + mut out := []BuildJob{} + + for out.len < n { + job := q.queues[arch].peek() or { break } + + if job.timestamp < time.now() { + out << q.queues[arch].pop() or { break } + } else { + break + } + } + + return out + } + + return [] } diff --git a/src/server/api_builds.v b/src/server/api_builds.v index 888fe9d..62948cd 100644 --- a/src/server/api_builds.v +++ b/src/server/api_builds.v @@ -1,39 +1,23 @@ module server -/* import web */ -/* import web.response { new_data_response, new_response } */ -/* import time */ -/* import build { BuildConfig } */ -/* // import os */ -/* // import util */ -/* // import models { BuildLog, BuildLogFilter } */ +import web +import web.response { new_data_response, new_response } +// import os +// import util +// import models { BuildLog, BuildLogFilter } -/* ['/api/v1/builds/poll'; auth; get] */ -/* fn (mut app App) v1_poll_build_queue() web.Result { */ -/* arch := app.query['arch'] or { */ -/* return app.json(.bad_request, new_response('Missing arch query arg.')) */ -/* } */ +['/api/v1/builds/poll'; auth; get] +fn (mut app App) v1_poll_build_queue() web.Result { + arch := app.query['arch'] or { + return app.json(.bad_request, new_response('Missing arch query arg.')) + } -/* max_str := app.query['max'] or { */ -/* return app.json(.bad_request, new_response('Missing max query arg.')) */ -/* } */ -/* max := max_str.int() */ + max_str := app.query['max'] or { + return app.json(.bad_request, new_response('Missing max query arg.')) + } + max := max_str.int() -/* mut out := []BuildConfig{} */ + mut out := app.job_queue.pop_n(arch, max) -/* now := time.now() */ - -/* lock app.build_queues { */ -/* mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } */ - -/* for queue.len() > 0 && out.len < max { */ -/* next := queue.peek() or { return app.status(.internal_server_error) } */ - -/* if next.timestamp < now { */ -/* out << queue.pop() or { return app.status(.internal_server_error) }.config */ -/* } */ -/* } */ -/* } */ - -/* return app.json(.ok, new_data_response(out)) */ -/* } */ + return app.json(.ok, new_data_response(out)) +} diff --git a/src/server/server.v b/src/server/server.v index fb45e6d..e2c19c2 100644 --- a/src/server/server.v +++ b/src/server/server.v @@ -24,34 +24,25 @@ pub mut: repo repo.RepoGroupManager [required; web_global] // Keys are the various architectures for packages job_queue BuildJobQueue [required; web_global] - db db.VieterDb + db db.VieterDb } -// fn (mut app App) init_build_queues() { -// // Initialize build queues -// mut i := 0 -// mut targets := app.db.get_targets(limit: 25) +fn (mut app App) init_job_queue() ! { + // Initialize build queues + mut targets := app.db.get_targets(limit: 25) + mut i := u64(0) -// default_ce := expression.parse_expression(conf.global_schedule) or { return } + for targets.len > 0 { + for target in targets { + for arch in target.arch { + app.job_queue.insert(target, arch.value)! + } + } -// for targets.len > 0 { -// for t in targets { -// ce := parse_expression(t.schedule) or { default_ce } - -// for arch in t.arch { -// if arch !in app.build_queues { -// app.build_queues[arch] = Minheap{} -// } - -// build_config := BuildConfig{} -// app.build_queues[arch].push(ScheduledBuild{ -// timestamp: ce.next() -// config: build_config -// }) -// } -// } -// } -//} + i += 25 + targets = app.db.get_targets(limit: 25, offset: i) + } +} // server starts the web server & starts listening for requests pub fn server(conf Config) ! { @@ -105,14 +96,17 @@ pub fn server(conf Config) ! { util.exit_with_message(1, 'Failed to initialize database: $err.msg()') } - mut queue := build.new_job_queue(global_ce, conf.base_image) - - web.run(&App{ + mut app := &App{ logger: logger api_key: conf.api_key conf: conf repo: repo db: db - job_queue: queue - }, conf.port) + job_queue: build.new_job_queue(global_ce, conf.base_image) + } + app.init_job_queue() or { + util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()') + } + + web.run(app, conf.port) }