feat(server): initialize job queue on start; api endpoint for polling

jobs
web-stuff
Jef Roosens 2022-12-12 20:33:51 +01:00 committed by Chewing_Bever
parent 9a49d96e20
commit c57de4d8ee
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 129 additions and 86 deletions

View File

@ -4,6 +4,7 @@ import models { Target }
import cron.expression { CronExpression, parse_expression } import cron.expression { CronExpression, parse_expression }
import time import time
import datatypes { MinHeap } import datatypes { MinHeap }
import util
struct BuildJob { struct BuildJob {
pub: pub:
@ -23,6 +24,7 @@ pub struct BuildJobQueue {
// Base image to use for targets without defined base image // Base image to use for targets without defined base image
default_base_image string default_base_image string
mut: mut:
mutex shared util.Dummy
// For each architecture, a priority queue is tracked // For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob> queues map[string]MinHeap<BuildJob>
// Each queued build job is also stored in a map, with the keys being the // Each queued build job is also stored in a map, with the keys being the
@ -39,32 +41,95 @@ pub fn new_job_queue(default_schedule CronExpression, default_base_image string)
// insert a new job into the queue for a given target on an architecture. // insert a new job into the queue for a given target on an architecture.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! { pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
if arch !in q.queues { lock q.mutex {
q.queues[arch] = MinHeap<BuildJob>{} if arch !in q.queues {
} q.queues[arch] = MinHeap<BuildJob>{}
ce := if target.schedule != '' {
parse_expression(target.schedule) or {
return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
} }
} else {
q.default_schedule
}
timestamp := ce.next_from_now()! ce := if target.schedule != '' {
parse_expression(target.schedule) or {
job := BuildJob{ return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
timestamp: timestamp }
config: BuildConfig{ } else {
target_id: target.id q.default_schedule
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
// TODO make this configurable
base_image: q.default_base_image
} }
}
q.queues[arch].insert(job) timestamp := ce.next_from_now()!
job := BuildJob{
timestamp: timestamp
config: BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
// TODO make this configurable
base_image: q.default_base_image
}
}
q.queues[arch].insert(job)
}
}
// peek shows the first job for the given architecture that's ready to be
// executed, if present.
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob {
rlock q.mutex {
if arch !in q.queues {
return none
}
job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() {
return job
}
}
return none
}
// pop removes the first job for the given architecture that's ready to be
// executed from the queue and returns it, if present.
pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
lock q.mutex {
if arch !in q.queues {
return none
}
job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() {
return q.queues[arch].pop()
}
}
return none
}
// pop_n tries to pop at most n available jobs for the given architecture.
pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
lock q.mutex {
if arch !in q.queues {
return []
}
mut out := []BuildJob{}
for out.len < n {
job := q.queues[arch].peek() or { break }
if job.timestamp < time.now() {
out << q.queues[arch].pop() or { break }
} else {
break
}
}
return out
}
return []
} }

View File

@ -1,39 +1,23 @@
module server module server
/* import web */ import web
/* import web.response { new_data_response, new_response } */ import web.response { new_data_response, new_response }
/* import time */ // import os
/* import build { BuildConfig } */ // import util
/* // import os */ // import models { BuildLog, BuildLogFilter }
/* // import util */
/* // import models { BuildLog, BuildLogFilter } */
/* ['/api/v1/builds/poll'; auth; get] */ ['/api/v1/builds/poll'; auth; get]
/* fn (mut app App) v1_poll_build_queue() web.Result { */ fn (mut app App) v1_poll_build_queue() web.Result {
/* arch := app.query['arch'] or { */ arch := app.query['arch'] or {
/* return app.json(.bad_request, new_response('Missing arch query arg.')) */ return app.json(.bad_request, new_response('Missing arch query arg.'))
/* } */ }
/* max_str := app.query['max'] or { */ max_str := app.query['max'] or {
/* return app.json(.bad_request, new_response('Missing max query arg.')) */ return app.json(.bad_request, new_response('Missing max query arg.'))
/* } */ }
/* max := max_str.int() */ max := max_str.int()
/* mut out := []BuildConfig{} */ mut out := app.job_queue.pop_n(arch, max)
/* now := time.now() */ return app.json(.ok, new_data_response(out))
}
/* lock app.build_queues { */
/* mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } */
/* for queue.len() > 0 && out.len < max { */
/* next := queue.peek() or { return app.status(.internal_server_error) } */
/* if next.timestamp < now { */
/* out << queue.pop() or { return app.status(.internal_server_error) }.config */
/* } */
/* } */
/* } */
/* return app.json(.ok, new_data_response(out)) */
/* } */

View File

@ -24,34 +24,25 @@ pub mut:
repo repo.RepoGroupManager [required; web_global] repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages // Keys are the various architectures for packages
job_queue BuildJobQueue [required; web_global] job_queue BuildJobQueue [required; web_global]
db db.VieterDb db db.VieterDb
} }
// fn (mut app App) init_build_queues() { fn (mut app App) init_job_queue() ! {
// // Initialize build queues // Initialize build queues
// mut i := 0 mut targets := app.db.get_targets(limit: 25)
// mut targets := app.db.get_targets(limit: 25) mut i := u64(0)
// default_ce := expression.parse_expression(conf.global_schedule) or { return } for targets.len > 0 {
for target in targets {
for arch in target.arch {
app.job_queue.insert(target, arch.value)!
}
}
// for targets.len > 0 { i += 25
// for t in targets { targets = app.db.get_targets(limit: 25, offset: i)
// ce := parse_expression(t.schedule) or { default_ce } }
}
// for arch in t.arch {
// if arch !in app.build_queues {
// app.build_queues[arch] = Minheap<ScheduledBuild>{}
// }
// build_config := BuildConfig{}
// app.build_queues[arch].push(ScheduledBuild{
// timestamp: ce.next()
// config: build_config
// })
// }
// }
// }
//}
// server starts the web server & starts listening for requests // server starts the web server & starts listening for requests
pub fn server(conf Config) ! { pub fn server(conf Config) ! {
@ -105,14 +96,17 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()') util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
} }
mut queue := build.new_job_queue(global_ce, conf.base_image) mut app := &App{
web.run(&App{
logger: logger logger: logger
api_key: conf.api_key api_key: conf.api_key
conf: conf conf: conf
repo: repo repo: repo
db: db db: db
job_queue: queue job_queue: build.new_job_queue(global_ce, conf.base_image)
}, conf.port) }
app.init_job_queue() or {
util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()')
}
web.run(app, conf.port)
} }