forked from vieter-v/vieter
Compare commits
4 Commits
d35b8826e6
...
ca92ae2a0e
| Author | SHA1 | Date |
|---|---|---|
|
|
ca92ae2a0e | |
|
|
4f1be62b1b | |
|
|
2777507434 | |
|
|
05980cc09c |
|
|
@ -14,6 +14,8 @@ const (
|
|||
struct AgentDaemon {
|
||||
logger shared log.Log
|
||||
conf Config
|
||||
// List of last built builder images
|
||||
builder_images []string
|
||||
// Which builds are currently running; length is same as
|
||||
// conf.max_concurrent_builds
|
||||
builds []BuildConfig
|
||||
|
|
@ -44,9 +46,8 @@ pub fn (mut d AgentDaemon) run() {
|
|||
}
|
||||
}
|
||||
|
||||
// clean_finished_builds checks for each build whether it's completed, and sets
|
||||
// it to free again if so. The return value is how many fields are now set to
|
||||
// free.
|
||||
// update_atomics checks for each build whether it's completed, and sets it to
|
||||
// free again if so. The return value is how many fields are now set to free.
|
||||
fn (mut d AgentDaemon) update_atomics() int {
|
||||
mut count := 0
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,49 @@
|
|||
module agent
|
||||
|
||||
import time
|
||||
import docker
|
||||
|
||||
struct ImageManager {
|
||||
images map[string]string
|
||||
timestamps map[string]time.Time
|
||||
}
|
||||
|
||||
// clean_old_base_images tries to remove any old but still present builder
|
||||
// images.
|
||||
fn (mut d AgentDaemon) clean_old_base_images() {
|
||||
mut i := 0
|
||||
|
||||
mut dd := docker.new_conn() or {
|
||||
d.lerror('Failed to connect to Docker socket.')
|
||||
return
|
||||
}
|
||||
|
||||
defer {
|
||||
dd.close() or {}
|
||||
}
|
||||
|
||||
for i < d.builder_images.len - 1 {
|
||||
// For each builder image, we try to remove it by calling the Docker
|
||||
// API. If the function returns an error or false, that means the image
|
||||
// wasn't deleted. Therefore, we move the index over. If the function
|
||||
// returns true, the array's length has decreased by one so we don't
|
||||
// move the index.
|
||||
dd.remove_image(d.builder_images[i]) or { i += 1 }
|
||||
}
|
||||
}
|
||||
|
||||
// rebuild_base_image builds a builder image from the given base image.
|
||||
/* fn (mut d AgentDaemon) build_base_image(base_image string) bool { */
|
||||
/* d.linfo('Rebuilding builder image....') */
|
||||
|
||||
/* d.builder_images << build.create_build_image(d.base_image) or { */
|
||||
/* d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') */
|
||||
/* d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) */
|
||||
|
||||
/* return false */
|
||||
/* } */
|
||||
|
||||
/* d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) */
|
||||
|
||||
/* return true */
|
||||
/* } */
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
module agent
|
||||
|
||||
import log
|
||||
|
||||
// log reate a log message with the given level
|
||||
pub fn (mut d AgentDaemon) log(msg string, level log.Level) {
|
||||
lock d.logger {
|
||||
d.logger.send_output(msg, level)
|
||||
}
|
||||
}
|
||||
|
||||
// lfatal create a log message with the fatal level
|
||||
pub fn (mut d AgentDaemon) lfatal(msg string) {
|
||||
d.log(msg, log.Level.fatal)
|
||||
}
|
||||
|
||||
// lerror create a log message with the error level
|
||||
pub fn (mut d AgentDaemon) lerror(msg string) {
|
||||
d.log(msg, log.Level.error)
|
||||
}
|
||||
|
||||
// lwarn create a log message with the warn level
|
||||
pub fn (mut d AgentDaemon) lwarn(msg string) {
|
||||
d.log(msg, log.Level.warn)
|
||||
}
|
||||
|
||||
// linfo create a log message with the info level
|
||||
pub fn (mut d AgentDaemon) linfo(msg string) {
|
||||
d.log(msg, log.Level.info)
|
||||
}
|
||||
|
||||
// ldebug create a log message with the debug level
|
||||
pub fn (mut d AgentDaemon) ldebug(msg string) {
|
||||
d.log(msg, log.Level.debug)
|
||||
}
|
||||
|
|
@ -18,7 +18,6 @@ const (
|
|||
|
||||
pub struct BuildConfig {
|
||||
pub:
|
||||
id int
|
||||
target_id int
|
||||
kind string
|
||||
url string
|
||||
|
|
|
|||
|
|
@ -0,0 +1,169 @@
|
|||
module build
|
||||
|
||||
import models { Target }
|
||||
import cron.expression { CronExpression, parse_expression }
|
||||
import time
|
||||
import datatypes { MinHeap }
|
||||
import util
|
||||
|
||||
struct BuildJob {
|
||||
pub:
|
||||
// Next timestamp from which point this job is allowed to be executed
|
||||
timestamp time.Time
|
||||
// Required for calculating next timestamp after having pop'ed a job
|
||||
ce CronExpression
|
||||
// Actual build config sent to the agent
|
||||
config BuildConfig
|
||||
}
|
||||
|
||||
// Allows BuildJob structs to be sorted according to their timestamp in
|
||||
// MinHeaps
|
||||
fn (r1 BuildJob) < (r2 BuildJob) bool {
|
||||
return r1.timestamp < r2.timestamp
|
||||
}
|
||||
|
||||
pub struct BuildJobQueue {
|
||||
// Schedule to use for targets without explicitely defined cron expression
|
||||
default_schedule CronExpression
|
||||
// Base image to use for targets without defined base image
|
||||
default_base_image string
|
||||
mut:
|
||||
mutex shared util.Dummy
|
||||
// For each architecture, a priority queue is tracked
|
||||
queues map[string]MinHeap<BuildJob>
|
||||
// Each queued build job is also stored in a map, with the keys being the
|
||||
// target IDs. This is used when removing or editing targets.
|
||||
// jobs map[int]BuildJob
|
||||
}
|
||||
|
||||
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
|
||||
return BuildJobQueue{
|
||||
default_schedule: default_schedule
|
||||
default_base_image: default_base_image
|
||||
}
|
||||
}
|
||||
|
||||
// insert a new target's job into the queue for the given architecture. This
|
||||
// job will then be endlessly rescheduled after being pop'ed, unless removed
|
||||
// explicitely.
|
||||
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
|
||||
lock q.mutex {
|
||||
if arch !in q.queues {
|
||||
q.queues[arch] = MinHeap<BuildJob>{}
|
||||
}
|
||||
|
||||
ce := if target.schedule != '' {
|
||||
parse_expression(target.schedule) or {
|
||||
return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
|
||||
}
|
||||
} else {
|
||||
q.default_schedule
|
||||
}
|
||||
|
||||
timestamp := ce.next_from_now()!
|
||||
|
||||
job := BuildJob{
|
||||
timestamp: timestamp
|
||||
ce: ce
|
||||
config: BuildConfig{
|
||||
target_id: target.id
|
||||
kind: target.kind
|
||||
url: target.url
|
||||
branch: target.branch
|
||||
repo: target.repo
|
||||
// TODO make this configurable
|
||||
base_image: q.default_base_image
|
||||
}
|
||||
}
|
||||
|
||||
dump(job)
|
||||
q.queues[arch].insert(job)
|
||||
}
|
||||
}
|
||||
|
||||
// reschedule the given job by calculating the next timestamp and re-adding it
|
||||
// to its respective queue. This function is called by the pop functions
|
||||
// *after* having pop'ed the job.
|
||||
fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
|
||||
new_timestamp := job.ce.next_from_now()!
|
||||
|
||||
new_job := BuildJob{
|
||||
...job
|
||||
timestamp: new_timestamp
|
||||
}
|
||||
|
||||
q.queues[arch].insert(new_job)
|
||||
}
|
||||
|
||||
// peek shows the first job for the given architecture that's ready to be
|
||||
// executed, if present.
|
||||
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob {
|
||||
rlock q.mutex {
|
||||
if arch !in q.queues {
|
||||
return none
|
||||
}
|
||||
|
||||
job := q.queues[arch].peek() or { return none }
|
||||
|
||||
if job.timestamp < time.now() {
|
||||
return job
|
||||
}
|
||||
}
|
||||
|
||||
return none
|
||||
}
|
||||
|
||||
// pop removes the first job for the given architecture that's ready to be
|
||||
// executed from the queue and returns it, if present.
|
||||
pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
|
||||
lock q.mutex {
|
||||
if arch !in q.queues {
|
||||
return none
|
||||
}
|
||||
|
||||
mut job := q.queues[arch].peek() or { return none }
|
||||
|
||||
if job.timestamp < time.now() {
|
||||
job = q.queues[arch].pop()?
|
||||
|
||||
// TODO how do we handle this properly? Is it even possible for a
|
||||
// cron expression to not return a next time if it's already been
|
||||
// used before?
|
||||
q.reschedule(job, arch) or {}
|
||||
|
||||
return job
|
||||
}
|
||||
}
|
||||
|
||||
return none
|
||||
}
|
||||
|
||||
// pop_n tries to pop at most n available jobs for the given architecture.
|
||||
pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
|
||||
lock q.mutex {
|
||||
if arch !in q.queues {
|
||||
return []
|
||||
}
|
||||
|
||||
mut out := []BuildJob{}
|
||||
|
||||
for out.len < n {
|
||||
mut job := q.queues[arch].peek() or { break }
|
||||
|
||||
if job.timestamp < time.now() {
|
||||
job = q.queues[arch].pop() or { break }
|
||||
|
||||
// TODO idem
|
||||
q.reschedule(job, arch) or {}
|
||||
|
||||
out << job
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
return []
|
||||
}
|
||||
|
|
@ -2,14 +2,12 @@ module server
|
|||
|
||||
import web
|
||||
import web.response { new_data_response, new_response }
|
||||
import time
|
||||
import build { BuildConfig }
|
||||
// import os
|
||||
// import util
|
||||
// import models { BuildLog, BuildLogFilter }
|
||||
|
||||
['/api/v1/builds/poll'; auth; get]
|
||||
fn (mut app App) v1_poll_build_queue() web.Result {
|
||||
['/api/v1/jobs/poll'; auth; get]
|
||||
fn (mut app App) v1_poll_job_queue() web.Result {
|
||||
arch := app.query['arch'] or {
|
||||
return app.json(.bad_request, new_response('Missing arch query arg.'))
|
||||
}
|
||||
|
|
@ -19,21 +17,7 @@ fn (mut app App) v1_poll_build_queue() web.Result {
|
|||
}
|
||||
max := max_str.int()
|
||||
|
||||
mut out := []BuildConfig{}
|
||||
|
||||
now := time.now()
|
||||
|
||||
lock app.build_queues {
|
||||
mut queue := app.build_queues[arch] or { return app.json(.ok, new_data_response(out)) }
|
||||
|
||||
for queue.len() > 0 && out.len < max {
|
||||
next := queue.peek() or { return app.status(.internal_server_error) }
|
||||
|
||||
if next.timestamp < now {
|
||||
out << queue.pop() or { return app.status(.internal_server_error) }.config
|
||||
}
|
||||
}
|
||||
}
|
||||
mut out := app.job_queue.pop_n(arch, max).map(it.config)
|
||||
|
||||
return app.json(.ok, new_data_response(out))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,13 +5,14 @@ import conf as vconf
|
|||
|
||||
struct Config {
|
||||
pub:
|
||||
log_level string = 'WARN'
|
||||
pkg_dir string
|
||||
data_dir string
|
||||
api_key string
|
||||
default_arch string
|
||||
log_level string = 'WARN'
|
||||
pkg_dir string
|
||||
data_dir string
|
||||
api_key string
|
||||
default_arch string
|
||||
global_schedule string = '0 3'
|
||||
port int = 8000
|
||||
port int = 8000
|
||||
base_image string = 'archlinux:base-devel'
|
||||
}
|
||||
|
||||
// cmd returns the cli submodule that handles starting the server
|
||||
|
|
|
|||
|
|
@ -6,9 +6,7 @@ import log
|
|||
import repo
|
||||
import util
|
||||
import db
|
||||
import datatypes { MinHeap }
|
||||
import build { BuildConfig }
|
||||
import time
|
||||
import build { BuildJobQueue }
|
||||
import cron.expression
|
||||
|
||||
const (
|
||||
|
|
@ -18,17 +16,6 @@ const (
|
|||
logs_dir_name = 'logs'
|
||||
)
|
||||
|
||||
struct ScheduledBuild {
|
||||
pub:
|
||||
timestamp time.Time
|
||||
config BuildConfig
|
||||
}
|
||||
|
||||
// Overloaded operator for comparing ScheduledBuild objects
|
||||
fn (r1 ScheduledBuild) < (r2 ScheduledBuild) bool {
|
||||
return r1.timestamp < r2.timestamp
|
||||
}
|
||||
|
||||
struct App {
|
||||
web.Context
|
||||
pub:
|
||||
|
|
@ -36,37 +23,24 @@ pub:
|
|||
pub mut:
|
||||
repo repo.RepoGroupManager [required; web_global]
|
||||
// Keys are the various architectures for packages
|
||||
build_queues shared map[string]MinHeap<ScheduledBuild>
|
||||
db db.VieterDb
|
||||
job_queue BuildJobQueue [required; web_global]
|
||||
db db.VieterDb
|
||||
}
|
||||
|
||||
fn (mut app App) init_build_queues() {
|
||||
fn (mut app App) init_job_queue() ! {
|
||||
// Initialize build queues
|
||||
mut i := 0
|
||||
mut targets := app.db.get_targets(limit: 25)
|
||||
|
||||
default_ce := expression.parse_expression(conf.global_schedule) or {
|
||||
return
|
||||
}
|
||||
mut i := u64(0)
|
||||
|
||||
for targets.len > 0 {
|
||||
for t in targets {
|
||||
ce := parse_expression(t.schedule) or { default_ce }
|
||||
|
||||
for arch in t.arch {
|
||||
if arch !in app.build_queues {
|
||||
app.build_queues[arch] = Minheap<ScheduledBuild>{}
|
||||
}
|
||||
|
||||
build_config := BuildConfig{
|
||||
|
||||
}
|
||||
app.build_queues[arch].push(ScheduledBuild{
|
||||
timestamp: ce.next()
|
||||
config: build_config
|
||||
})
|
||||
for target in targets {
|
||||
for arch in target.arch {
|
||||
app.job_queue.insert(target, arch.value)!
|
||||
}
|
||||
}
|
||||
|
||||
i += 25
|
||||
targets = app.db.get_targets(limit: 25, offset: i)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -77,6 +51,10 @@ pub fn server(conf Config) ! {
|
|||
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
|
||||
}
|
||||
|
||||
global_ce := expression.parse_expression(conf.global_schedule) or {
|
||||
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
|
||||
}
|
||||
|
||||
// Configure logger
|
||||
log_level := log.level_from_tag(conf.log_level) or {
|
||||
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
|
||||
|
|
@ -118,12 +96,17 @@ pub fn server(conf Config) ! {
|
|||
util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
|
||||
}
|
||||
|
||||
web.run(&App{
|
||||
mut app := &App{
|
||||
logger: logger
|
||||
api_key: conf.api_key
|
||||
conf: conf
|
||||
repo: repo
|
||||
db: db
|
||||
build_queues: map[string]MinHeap<ScheduledBuild>{}
|
||||
}, conf.port)
|
||||
job_queue: build.new_job_queue(global_ce, conf.base_image)
|
||||
}
|
||||
app.init_job_queue() or {
|
||||
util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()')
|
||||
}
|
||||
|
||||
web.run(app, conf.port)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue