Compare commits

...

4 Commits

8 changed files with 290 additions and 69 deletions

View File

@ -14,6 +14,8 @@ const (
struct AgentDaemon {
logger shared log.Log
conf Config
// List of last built builder images
builder_images []string
// Which builds are currently running; length is same as
// conf.max_concurrent_builds
builds []BuildConfig
@ -44,9 +46,8 @@ pub fn (mut d AgentDaemon) run() {
}
}
// clean_finished_builds checks for each build whether it's completed, and sets
// it to free again if so. The return value is how many fields are now set to
// free.
// update_atomics checks for each build whether it's completed, and sets it to
// free again if so. The return value is how many fields are now set to free.
fn (mut d AgentDaemon) update_atomics() int {
mut count := 0

49
src/agent/images.v 100644
View File

@ -0,0 +1,49 @@
module agent
import time
import docker
// ImageManager bundles the data needed to track builder images.
// NOTE(review): not referenced anywhere in this view yet — presumably
// `images` maps a base image name to its latest builder image and
// `timestamps` records when each was built; confirm once this struct is
// wired into AgentDaemon.
struct ImageManager {
	images     map[string]string
	timestamps map[string]time.Time
}
// clean_old_base_images tries to remove any old but still present builder
// images. The last image in the list is the most recently built one, so it's
// kept; all earlier ones are attempted for removal.
fn (mut d AgentDaemon) clean_old_base_images() {
	mut i := 0

	mut dd := docker.new_conn() or {
		d.lerror('Failed to connect to Docker socket.')
		return
	}

	defer {
		dd.close() or {}
	}

	for i < d.builder_images.len - 1 {
		// For each older builder image, try to remove it by calling the
		// Docker API. If the call errors, the image wasn't deleted (e.g.
		// still in use), so we move the index over to the next one.
		dd.remove_image(d.builder_images[i]) or {
			i += 1
			continue
		}

		// On success, drop the entry from the list ourselves so the loop
		// makes progress and the same image isn't removed twice. (The
		// original relied on the array shrinking, but remove_image never
		// mutated d.builder_images, leaving successfully removed images in
		// the list to be retried.)
		d.builder_images.delete(i)
	}
}
// rebuild_base_image builds a builder image from the given base image.
/* fn (mut d AgentDaemon) build_base_image(base_image string) bool { */
/* d.linfo('Rebuilding builder image....') */
/* d.builder_images << build.create_build_image(d.base_image) or { */
/* d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') */
/* d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) */
/* return false */
/* } */
/* d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) */
/* return true */
/* } */

35
src/agent/log.v 100644
View File

@ -0,0 +1,35 @@
module agent
import log
// log creates a log message with the given level, synchronized through the
// shared logger lock.
pub fn (mut d AgentDaemon) log(msg string, level log.Level) {
	lock d.logger {
		d.logger.send_output(msg, level)
	}
}
// lfatal creates a log message with the fatal level
pub fn (mut d AgentDaemon) lfatal(msg string) {
	d.log(msg, log.Level.fatal)
}
// lerror creates a log message with the error level
pub fn (mut d AgentDaemon) lerror(msg string) {
	d.log(msg, log.Level.error)
}
// lwarn creates a log message with the warn level
pub fn (mut d AgentDaemon) lwarn(msg string) {
	d.log(msg, log.Level.warn)
}
// linfo creates a log message with the info level
pub fn (mut d AgentDaemon) linfo(msg string) {
	d.log(msg, log.Level.info)
}
// ldebug creates a log message with the debug level
pub fn (mut d AgentDaemon) ldebug(msg string) {
	d.log(msg, log.Level.debug)
}

View File

@ -18,7 +18,6 @@ const (
pub struct BuildConfig {
pub:
id int
target_id int
kind string
url string

169
src/build/queue.v 100644
View File

@ -0,0 +1,169 @@
module build
import models { Target }
import cron.expression { CronExpression, parse_expression }
import time
import datatypes { MinHeap }
import util
// BuildJob is a single queued build for a target, ordered by the next time
// it's allowed to run.
struct BuildJob {
pub:
	// Next timestamp from which point this job is allowed to be executed
	timestamp time.Time
	// Required for calculating next timestamp after having pop'ed a job
	ce CronExpression
	// Actual build config sent to the agent
	config BuildConfig
}
// Allows BuildJob structs to be sorted according to their timestamp in
// MinHeaps, which yield the job with the earliest timestamp first.
fn (r1 BuildJob) < (r2 BuildJob) bool {
	return r1.timestamp < r2.timestamp
}
// BuildJobQueue is a thread-safe collection of per-architecture priority
// queues of pending build jobs.
pub struct BuildJobQueue {
	// Schedule to use for targets without explicitly defined cron expression
	default_schedule CronExpression
	// Base image to use for targets without defined base image
	default_base_image string
mut:
	// Dummy object used solely to synchronize access to the queues below
	mutex shared util.Dummy
	// For each architecture, a priority queue is tracked
	queues map[string]MinHeap<BuildJob>
	// Each queued build job is also stored in a map, with the keys being the
	// target IDs. This is used when removing or editing targets.
	// jobs map[int]BuildJob
}
// new_job_queue initializes a new job queue with the given default cron
// schedule & builder base image.
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
	return BuildJobQueue{
		default_schedule: default_schedule
		default_base_image: default_base_image
	}
}
// insert a new target's job into the queue for the given architecture. This
// job will then be endlessly rescheduled after being pop'ed, unless removed
// explicitly. Returns an error if the target's cron expression is invalid or
// no next timestamp could be calculated.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
	lock q.mutex {
		// Lazily create the per-architecture heap on first use
		if arch !in q.queues {
			q.queues[arch] = MinHeap<BuildJob>{}
		}

		// A target-specific schedule overrides the queue-wide default
		ce := if target.schedule != '' {
			parse_expression(target.schedule) or {
				return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
			}
		} else {
			q.default_schedule
		}

		timestamp := ce.next_from_now()!

		job := BuildJob{
			timestamp: timestamp
			ce: ce
			config: BuildConfig{
				target_id: target.id
				kind: target.kind
				url: target.url
				branch: target.branch
				repo: target.repo
				// TODO make this configurable
				base_image: q.default_base_image
			}
		}

		// NOTE(review): removed a leftover `dump(job)` debug statement that
		// printed every inserted job to stderr.
		q.queues[arch].insert(job)
	}
}
// reschedule the given job by calculating the next timestamp and re-adding it
// to its respective queue. This function is called by the pop functions
// *after* having pop'ed the job. NOTE(review): callers already hold q.mutex,
// so this function must not lock it again.
fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
	new_timestamp := job.ce.next_from_now()!

	// Copy the job, only replacing its scheduled timestamp
	new_job := BuildJob{
		...job
		timestamp: new_timestamp
	}

	q.queues[arch].insert(new_job)
}
// peek shows the first job for the given architecture that's ready to be
// executed, if present. The job is left in the queue.
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob {
	rlock q.mutex {
		if arch !in q.queues {
			return none
		}

		job := q.queues[arch].peek() or { return none }

		// Only a job whose scheduled timestamp has already passed is "ready"
		if job.timestamp < time.now() {
			return job
		}
	}

	return none
}
// pop removes the first job for the given architecture that's ready to be
// executed from the queue and returns it, if present. The pop'ed job is
// immediately rescheduled for its next timestamp.
pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
	lock q.mutex {
		if arch !in q.queues {
			return none
		}

		mut job := q.queues[arch].peek() or { return none }

		// Only pop the job if its scheduled timestamp has already passed
		if job.timestamp < time.now() {
			job = q.queues[arch].pop()?

			// TODO how do we handle this properly? Is it even possible for a
			// cron expression to not return a next time if it's already been
			// used before?
			q.reschedule(job, arch) or {}

			return job
		}
	}

	return none
}
// pop_n tries to pop at most n available jobs for the given architecture.
// Fewer (or zero) jobs are returned if not enough are ready yet; each pop'ed
// job is immediately rescheduled for its next timestamp.
pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
	lock q.mutex {
		if arch !in q.queues {
			return []
		}

		mut out := []BuildJob{}

		for out.len < n {
			mut job := q.queues[arch].peek() or { break }

			// Stop as soon as the earliest remaining job isn't due yet
			if job.timestamp < time.now() {
				job = q.queues[arch].pop() or { break }

				// TODO idem
				q.reschedule(job, arch) or {}

				out << job
			} else {
				break
			}
		}

		return out
	}

	return []
}

View File

@ -2,14 +2,12 @@ module server
import web
import web.response { new_data_response, new_response }
import time
import build { BuildConfig }
// import os
// import util
// import models { BuildLog, BuildLogFilter }
['/api/v1/builds/poll'; auth; get]
fn (mut app App) v1_poll_build_queue() web.Result {
['/api/v1/jobs/poll'; auth; get]
fn (mut app App) v1_poll_job_queue() web.Result {
arch := app.query['arch'] or {
return app.json(.bad_request, new_response('Missing arch query arg.'))
}
@ -19,21 +17,7 @@ fn (mut app App) v1_poll_build_queue() web.Result {
}
max := max_str.int()
mut out := []BuildConfig{}
now := time.now()
lock app.build_queues {
mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) }
for queue.len() > 0 && out.len < max {
next := queue.peek() or { return app.status(.internal_server_error) }
if next.timestamp < now {
out << queue.pop() or { return app.status(.internal_server_error) }.config
}
}
}
mut out := app.job_queue.pop_n(arch, max).map(it.config)
return app.json(.ok, new_data_response(out))
}

View File

@ -5,13 +5,14 @@ import conf as vconf
struct Config {
pub:
log_level string = 'WARN'
pkg_dir string
data_dir string
api_key string
default_arch string
log_level string = 'WARN'
pkg_dir string
data_dir string
api_key string
default_arch string
global_schedule string = '0 3'
port int = 8000
port int = 8000
base_image string = 'archlinux:base-devel'
}
// cmd returns the cli submodule that handles starting the server

View File

@ -6,9 +6,7 @@ import log
import repo
import util
import db
import datatypes { MinHeap }
import build { BuildConfig }
import time
import build { BuildJobQueue }
import cron.expression
const (
@ -18,17 +16,6 @@ const (
logs_dir_name = 'logs'
)
struct ScheduledBuild {
pub:
timestamp time.Time
config BuildConfig
}
// Overloaded operator for comparing ScheduledBuild objects
fn (r1 ScheduledBuild) < (r2 ScheduledBuild) bool {
return r1.timestamp < r2.timestamp
}
struct App {
web.Context
pub:
@ -36,37 +23,24 @@ pub:
pub mut:
repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages
build_queues shared map[string]MinHeap<ScheduledBuild>
db db.VieterDb
job_queue BuildJobQueue [required; web_global]
db db.VieterDb
}
fn (mut app App) init_build_queues() {
fn (mut app App) init_job_queue() ! {
// Initialize build queues
mut i := 0
mut targets := app.db.get_targets(limit: 25)
default_ce := expression.parse_expression(conf.global_schedule) or {
return
}
mut i := u64(0)
for targets.len > 0 {
for t in targets {
ce := parse_expression(t.schedule) or { default_ce }
for arch in t.arch {
if arch !in app.build_queues {
app.build_queues[arch] = Minheap<ScheduledBuild>{}
}
build_config := BuildConfig{
}
app.build_queues[arch].push(ScheduledBuild{
timestamp: ce.next()
config: build_config
})
for target in targets {
for arch in target.arch {
app.job_queue.insert(target, arch.value)!
}
}
i += 25
targets = app.db.get_targets(limit: 25, offset: i)
}
}
@ -77,6 +51,10 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
}
global_ce := expression.parse_expression(conf.global_schedule) or {
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
}
// Configure logger
log_level := log.level_from_tag(conf.log_level) or {
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
@ -118,12 +96,17 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
}
web.run(&App{
mut app := &App{
logger: logger
api_key: conf.api_key
conf: conf
repo: repo
db: db
build_queues: map[string]MinHeap<ScheduledBuild>{}
}, conf.port)
job_queue: build.new_job_queue(global_ce, conf.base_image)
}
app.init_job_queue() or {
util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()')
}
web.run(app, conf.port)
}