feat(build): start of server-side job queue

Jef Roosens 2022-12-12 19:03:31 +01:00 committed by Jef Roosens
parent 722acb8ab3
commit 05980cc09c
5 changed files with 137 additions and 78 deletions

View File

@ -18,7 +18,6 @@ const (
pub struct BuildConfig { pub struct BuildConfig {
pub: pub:
id int
target_id int target_id int
kind string kind string
url string url string

70
src/build/queue.v 100644
View File

@ -0,0 +1,70 @@
module build
import models { Target }
import cron.expression { CronExpression, parse_expression }
import time
import datatypes { MinHeap }
struct BuildJob {
pub:
// Earliest point this
timestamp time.Time
config BuildConfig
}
// Overloaded operator for comparing ScheduledBuild objects
fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp
}
pub struct BuildJobQueue {
// Schedule to use for targets without explicitely defined cron expression
default_schedule CronExpression
// Base image to use for targets without defined base image
default_base_image string
mut:
// For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob>
// Each queued build job is also stored in a map, with the keys being the
// target IDs. This is used when removing or editing targets.
// jobs map[int]BuildJob
}
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
return BuildJobQueue{
default_schedule: default_schedule
default_base_image: default_base_image
}
}
// insert a new job into the queue for a given target on an architecture.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
if arch !in q.queues {
q.queues[arch] = MinHeap<BuildJob>{}
}
ce := if target.schedule != '' {
parse_expression(target.schedule) or {
return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
}
} else {
q.default_schedule
}
timestamp := ce.next_from_now()!
job := BuildJob{
timestamp: timestamp
config: BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
// TODO make this configurable
base_image: q.default_base_image
}
}
q.queues[arch].insert(job)
}

View File

@ -1,39 +1,39 @@
module server module server
import web /* import web */
import web.response { new_data_response, new_response } /* import web.response { new_data_response, new_response } */
import time /* import time */
import build { BuildConfig } /* import build { BuildConfig } */
// import os /* // import os */
// import util /* // import util */
// import models { BuildLog, BuildLogFilter } /* // import models { BuildLog, BuildLogFilter } */
['/api/v1/builds/poll'; auth; get] /* ['/api/v1/builds/poll'; auth; get] */
fn (mut app App) v1_poll_build_queue() web.Result { /* fn (mut app App) v1_poll_build_queue() web.Result { */
arch := app.query['arch'] or { /* arch := app.query['arch'] or { */
return app.json(.bad_request, new_response('Missing arch query arg.')) /* return app.json(.bad_request, new_response('Missing arch query arg.')) */
} /* } */
max_str := app.query['max'] or { /* max_str := app.query['max'] or { */
return app.json(.bad_request, new_response('Missing max query arg.')) /* return app.json(.bad_request, new_response('Missing max query arg.')) */
} /* } */
max := max_str.int() /* max := max_str.int() */
mut out := []BuildConfig{} /* mut out := []BuildConfig{} */
now := time.now() /* now := time.now() */
lock app.build_queues { /* lock app.build_queues { */
mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } /* mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } */
for queue.len() > 0 && out.len < max { /* for queue.len() > 0 && out.len < max { */
next := queue.peek() or { return app.status(.internal_server_error) } /* next := queue.peek() or { return app.status(.internal_server_error) } */
if next.timestamp < now { /* if next.timestamp < now { */
out << queue.pop() or { return app.status(.internal_server_error) }.config /* out << queue.pop() or { return app.status(.internal_server_error) }.config */
} /* } */
} /* } */
} /* } */
return app.json(.ok, new_data_response(out)) /* return app.json(.ok, new_data_response(out)) */
} /* } */

View File

@ -5,13 +5,14 @@ import conf as vconf
struct Config { struct Config {
pub: pub:
log_level string = 'WARN' log_level string = 'WARN'
pkg_dir string pkg_dir string
data_dir string data_dir string
api_key string api_key string
default_arch string default_arch string
global_schedule string = '0 3' global_schedule string = '0 3'
port int = 8000 port int = 8000
base_image string = 'archlinux:base-devel'
} }
// cmd returns the cli submodule that handles starting the server // cmd returns the cli submodule that handles starting the server

View File

@ -6,9 +6,7 @@ import log
import repo import repo
import util import util
import db import db
import datatypes { MinHeap } import build { BuildJobQueue }
import build { BuildConfig }
import time
import cron.expression import cron.expression
const ( const (
@ -18,17 +16,6 @@ const (
logs_dir_name = 'logs' logs_dir_name = 'logs'
) )
struct ScheduledBuild {
pub:
timestamp time.Time
config BuildConfig
}
// Overloaded operator for comparing ScheduledBuild objects
fn (r1 ScheduledBuild) < (r2 ScheduledBuild) bool {
return r1.timestamp < r2.timestamp
}
struct App { struct App {
web.Context web.Context
pub: pub:
@ -36,39 +23,35 @@ pub:
pub mut: pub mut:
repo repo.RepoGroupManager [required; web_global] repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages // Keys are the various architectures for packages
build_queues shared map[string]MinHeap<ScheduledBuild> job_queue BuildJobQueue [required; web_global]
db db.VieterDb db db.VieterDb
} }
fn (mut app App) init_build_queues() { // fn (mut app App) init_build_queues() {
// Initialize build queues // // Initialize build queues
mut i := 0 // mut i := 0
mut targets := app.db.get_targets(limit: 25) // mut targets := app.db.get_targets(limit: 25)
default_ce := expression.parse_expression(conf.global_schedule) or { // default_ce := expression.parse_expression(conf.global_schedule) or { return }
return
}
for targets.len > 0 { // for targets.len > 0 {
for t in targets { // for t in targets {
ce := parse_expression(t.schedule) or { default_ce } // ce := parse_expression(t.schedule) or { default_ce }
for arch in t.arch { // for arch in t.arch {
if arch !in app.build_queues { // if arch !in app.build_queues {
app.build_queues[arch] = Minheap<ScheduledBuild>{} // app.build_queues[arch] = Minheap<ScheduledBuild>{}
} // }
build_config := BuildConfig{ // build_config := BuildConfig{}
// app.build_queues[arch].push(ScheduledBuild{
} // timestamp: ce.next()
app.build_queues[arch].push(ScheduledBuild{ // config: build_config
timestamp: ce.next() // })
config: build_config // }
}) // }
} // }
} //}
}
}
// server starts the web server & starts listening for requests // server starts the web server & starts listening for requests
pub fn server(conf Config) ! { pub fn server(conf Config) ! {
@ -77,6 +60,10 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
} }
global_ce := expression.parse_expression(conf.global_schedule) or {
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
}
// Configure logger // Configure logger
log_level := log.level_from_tag(conf.log_level) or { log_level := log.level_from_tag(conf.log_level) or {
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
@ -118,12 +105,14 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()') util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
} }
mut queue := build.new_job_queue(global_ce, conf.base_image)
web.run(&App{ web.run(&App{
logger: logger logger: logger
api_key: conf.api_key api_key: conf.api_key
conf: conf conf: conf
repo: repo repo: repo
db: db db: db
build_queues: map[string]MinHeap<ScheduledBuild>{} job_queue: queue
}, conf.port) }, conf.port)
} }