feat(build): start of server-side job queue

Jef Roosens 2022-12-06 14:11:17 +01:00
parent 6281ef7607
commit 9a49d96e20
Signed by untrusted user: Jef Roosens
GPG Key ID: B75D4F293C7052DB
9 changed files with 174 additions and 28 deletions

View File

@ -20,6 +20,6 @@ pub fn agent(conf Config) ! {
logger.set_full_logpath(log_file) logger.set_full_logpath(log_file)
logger.log_to_console_too() logger.log_to_console_too()
mut d := agent.agent_init(logger, conf) mut d := agent_init(logger, conf)
d.run() d.run()
} }

View File

@ -12,8 +12,8 @@ pub:
max_concurrent_builds int = 1 max_concurrent_builds int = 1
polling_frequency int = 30 polling_frequency int = 30
// Architecture of agent // Architecture of agent
/* arch string */ // arch string
/* image_rebuild_frequency int = 1440 */ // image_rebuild_frequency int = 1440
} }
// cmd returns the cli module that handles the cron daemon. // cmd returns the cli module that handles the cron daemon.

View File

@ -40,9 +40,7 @@ pub fn (mut d AgentDaemon) run() {
free_builds := d.update_atomics() free_builds := d.update_atomics()
if free_builds > 0 { if free_builds > 0 {
} }
} }
} }

View File

@ -18,7 +18,7 @@ const (
pub struct BuildConfig { pub struct BuildConfig {
pub: pub:
id int target_id int
kind string kind string
url string url string
branch string branch string

70
src/build/queue.v 100644
View File

@ -0,0 +1,70 @@
module build
import models { Target }
import cron.expression { CronExpression, parse_expression }
import time
import datatypes { MinHeap }
struct BuildJob {
pub:
// Earliest point this
timestamp time.Time
config BuildConfig
}
// Overloaded operator for comparing ScheduledBuild objects
fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp
}
pub struct BuildJobQueue {
// Schedule to use for targets without explicitely defined cron expression
default_schedule CronExpression
// Base image to use for targets without defined base image
default_base_image string
mut:
// For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob>
// Each queued build job is also stored in a map, with the keys being the
// target IDs. This is used when removing or editing targets.
// jobs map[int]BuildJob
}
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
return BuildJobQueue{
default_schedule: default_schedule
default_base_image: default_base_image
}
}
// insert a new job into the queue for a given target on an architecture.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
if arch !in q.queues {
q.queues[arch] = MinHeap<BuildJob>{}
}
ce := if target.schedule != '' {
parse_expression(target.schedule) or {
return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
}
} else {
q.default_schedule
}
timestamp := ce.next_from_now()!
job := BuildJob{
timestamp: timestamp
config: BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
// TODO make this configurable
base_image: q.default_base_image
}
}
q.queues[arch].insert(job)
}

View File

@ -41,7 +41,7 @@ fn main() {
schedule.cmd(), schedule.cmd(),
man.cmd(), man.cmd(),
aur.cmd(), aur.cmd(),
agent.cmd() agent.cmd(),
] ]
} }
app.setup() app.setup()

View File

@ -0,0 +1,39 @@
module server
/* import web */
/* import web.response { new_data_response, new_response } */
/* import time */
/* import build { BuildConfig } */
/* // import os */
/* // import util */
/* // import models { BuildLog, BuildLogFilter } */
/* ['/api/v1/builds/poll'; auth; get] */
/* fn (mut app App) v1_poll_build_queue() web.Result { */
/* arch := app.query['arch'] or { */
/* return app.json(.bad_request, new_response('Missing arch query arg.')) */
/* } */
/* max_str := app.query['max'] or { */
/* return app.json(.bad_request, new_response('Missing max query arg.')) */
/* } */
/* max := max_str.int() */
/* mut out := []BuildConfig{} */
/* now := time.now() */
/* lock app.build_queues { */
/* mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) } */
/* for queue.len() > 0 && out.len < max { */
/* next := queue.peek() or { return app.status(.internal_server_error) } */
/* if next.timestamp < now { */
/* out << queue.pop() or { return app.status(.internal_server_error) }.config */
/* } */
/* } */
/* } */
/* return app.json(.ok, new_data_response(out)) */
/* } */

View File

@ -10,7 +10,9 @@ pub:
data_dir string data_dir string
api_key string api_key string
default_arch string default_arch string
global_schedule string = '0 3'
port int = 8000 port int = 8000
base_image string = 'archlinux:base-devel'
} }
// cmd returns the cli submodule that handles starting the server // cmd returns the cli submodule that handles starting the server

View File

@ -6,6 +6,8 @@ import log
import repo import repo
import util import util
import db import db
import build { BuildJobQueue }
import cron.expression
const ( const (
log_file_name = 'vieter.log' log_file_name = 'vieter.log'
@ -20,9 +22,37 @@ pub:
conf Config [required; web_global] conf Config [required; web_global]
pub mut: pub mut:
repo repo.RepoGroupManager [required; web_global] repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages
job_queue BuildJobQueue [required; web_global]
db db.VieterDb db db.VieterDb
} }
// fn (mut app App) init_build_queues() {
// // Initialize build queues
// mut i := 0
// mut targets := app.db.get_targets(limit: 25)
// default_ce := expression.parse_expression(conf.global_schedule) or { return }
// for targets.len > 0 {
// for t in targets {
// ce := parse_expression(t.schedule) or { default_ce }
// for arch in t.arch {
// if arch !in app.build_queues {
// app.build_queues[arch] = Minheap<ScheduledBuild>{}
// }
// build_config := BuildConfig{}
// app.build_queues[arch].push(ScheduledBuild{
// timestamp: ce.next()
// config: build_config
// })
// }
// }
// }
//}
// server starts the web server & starts listening for requests // server starts the web server & starts listening for requests
pub fn server(conf Config) ! { pub fn server(conf Config) ! {
// Prevent using 'any' as the default arch // Prevent using 'any' as the default arch
@ -30,6 +60,10 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
} }
global_ce := expression.parse_expression(conf.global_schedule) or {
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
}
// Configure logger // Configure logger
log_level := log.level_from_tag(conf.log_level) or { log_level := log.level_from_tag(conf.log_level) or {
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
@ -71,11 +105,14 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()') util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
} }
mut queue := build.new_job_queue(global_ce, conf.base_image)
web.run(&App{ web.run(&App{
logger: logger logger: logger
api_key: conf.api_key api_key: conf.api_key
conf: conf conf: conf
repo: repo repo: repo
db: db db: db
job_queue: queue
}, conf.port) }, conf.port)
} }