Compare commits

...

9 Commits

14 changed files with 491 additions and 46 deletions

View File

@ -20,6 +20,6 @@ pub fn agent(conf Config) ! {
logger.set_full_logpath(log_file) logger.set_full_logpath(log_file)
logger.log_to_console_too() logger.log_to_console_too()
mut d := agent.agent_init(logger, conf) mut d := agent_init(logger, conf)
d.run() d.run()
} }

View File

@ -6,14 +6,16 @@ import conf as vconf
struct Config { struct Config {
pub: pub:
log_level string = 'WARN' log_level string = 'WARN'
// Architecture that the agent represents
arch string
api_key string api_key string
address string address string
data_dir string data_dir string
max_concurrent_builds int = 1 max_concurrent_builds int = 1
polling_frequency int = 30 polling_frequency int = 30
// Architecture of agent // Architecture of agent
/* arch string */ // arch string
/* image_rebuild_frequency int = 1440 */ image_rebuild_frequency int = 1440
} }
// cmd returns the cli module that handles the cron daemon. // cmd returns the cli module that handles the cron daemon.

View File

@ -4,6 +4,8 @@ import log
import sync.stdatomic import sync.stdatomic
import build { BuildConfig } import build { BuildConfig }
import client import client
import time
import os
const ( const (
build_empty = 0 build_empty = 0
@ -14,6 +16,8 @@ const (
struct AgentDaemon { struct AgentDaemon {
logger shared log.Log logger shared log.Log
conf Config conf Config
mut:
images ImageManager
// Which builds are currently running; length is same as // Which builds are currently running; length is same as
// conf.max_concurrent_builds // conf.max_concurrent_builds
builds []BuildConfig builds []BuildConfig
@ -28,6 +32,7 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
logger: logger logger: logger
client: client.new(conf.address, conf.api_key) client: client.new(conf.address, conf.api_key)
conf: conf conf: conf
images: new_image_manager(conf.image_rebuild_frequency)
builds: []BuildConfig{len: conf.max_concurrent_builds} builds: []BuildConfig{len: conf.max_concurrent_builds}
atomics: []u64{len: conf.max_concurrent_builds} atomics: []u64{len: conf.max_concurrent_builds}
} }
@ -36,18 +41,60 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
} }
pub fn (mut d AgentDaemon) run() { pub fn (mut d AgentDaemon) run() {
// This is just so that the very first time the loop is ran, the jobs are
// always polled
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
for { for {
free_builds := d.update_atomics() free_builds := d.update_atomics()
if free_builds > 0 { // All build slots are taken, so there's nothing to be done
if free_builds == 0 {
time.sleep(1 * time.second)
continue
} }
// Builds have finished, so old builder images might have freed up.
d.images.clean_old_images()
// Poll for new jobs
if time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) {
new_configs := d.client.poll_jobs(d.conf.arch, free_builds) or {
d.lerror('Failed to poll jobs: $err.msg()')
time.sleep(5 * time.second)
continue
}
last_poll_time = time.now()
// Schedule new jobs
for config in new_configs {
// TODO handle this better than to just skip the config
// Make sure a recent build base image is available for building the config
d.images.refresh_image(config.base_image) or {
d.lerror(err.msg())
continue
}
d.start_build(config)
}
time.sleep(1 * time.second)
}
// Builds are running, so check again after one second
else if free_builds < d.conf.max_concurrent_builds {
time.sleep(1 * time.second)
}
// The agent is not doing anything, so we just wait until the next poll
// time
else {
time_until_next_poll := time.now() - last_poll_time
time.sleep(time_until_next_poll)
}
} }
} }
// clean_finished_builds checks for each build whether it's completed, and sets // update_atomics checks for each build whether it's completed, and sets it to
// it to free again if so. The return value is how many fields are now set to // free again if so. The return value is how many build slots are currently
// free. // free.
fn (mut d AgentDaemon) update_atomics() int { fn (mut d AgentDaemon) update_atomics() int {
mut count := 0 mut count := 0
@ -63,3 +110,53 @@ fn (mut d AgentDaemon) update_atomics() int {
return count return count
} }
// start_build starts a build for the given BuildConfig object.
fn (mut d AgentDaemon) start_build(config BuildConfig) bool {
for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
stdatomic.store_u64(&d.atomics[i], agent.build_running)
d.builds[i] = config
go d.run_build(i, config)
return true
}
}
return false
}
// run_build actually starts the build process for a given target.
fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
d.linfo('started build: $config.url -> $config.repo')
// 0 means success, 1 means failure
mut status := 0
new_config := BuildConfig{
...config
base_image: d.images.get(config.base_image)
}
res := build.build_config(d.client.address, d.client.api_key, new_config) or {
d.ldebug('build_config error: $err.msg()')
status = 1
build.BuildResult{}
}
if status == 0 {
d.linfo('finished build: $config.url -> $config.repo; uploading logs...')
build_arch := os.uname().machine
d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch,
res.exit_code, res.logs) or {
d.lerror('Failed to upload logs for build: $config.url -> $config.repo')
}
} else {
d.linfo('an error occured during build: $config.url -> $config.repo')
}
stdatomic.store_u64(&d.atomics[build_index], agent.build_done)
}

64
src/agent/images.v 100644
View File

@ -0,0 +1,64 @@
module agent
import time
import docker
import build
struct ImageManager {
mut:
refresh_frequency int
images map[string][]string [required]
timestamps map[string]time.Time [required]
}
fn new_image_manager(refresh_frequency int) ImageManager {
return ImageManager{
refresh_frequency: refresh_frequency
images: map[string][]string{}
timestamps: map[string]time.Time{}
}
}
pub fn (m &ImageManager) get(base_image string) string {
return m.images[base_image].last()
}
fn (mut m ImageManager) refresh_image(base_image string) ! {
// No need to refresh the image if the previous one is still new enough
if base_image in m.timestamps
&& m.timestamps[base_image].add_seconds(m.refresh_frequency) > time.now() {
return
}
// TODO use better image tags for built images
new_image := build.create_build_image(base_image) or {
return error('Failed to build builder image from base image $base_image')
}
m.images[base_image] << new_image
m.timestamps[base_image] = time.now()
}
// clean_old_images tries to remove any old but still present builder images.
fn (mut m ImageManager) clean_old_images() {
mut dd := docker.new_conn() or { return }
defer {
dd.close() or {}
}
mut i := 0
for image in m.images.keys() {
i = 0
for i < m.images[image].len - 1 {
// For each builder image, we try to remove it by calling the Docker
// API. If the function returns an error or false, that means the image
// wasn't deleted. Therefore, we move the index over. If the function
// returns true, the array's length has decreased by one so we don't
// move the index.
dd.remove_image(m.images[image][i]) or { i += 1 }
}
}
}

35
src/agent/log.v 100644
View File

@ -0,0 +1,35 @@
module agent
import log
// log reate a log message with the given level
pub fn (mut d AgentDaemon) log(msg string, level log.Level) {
lock d.logger {
d.logger.send_output(msg, level)
}
}
// lfatal create a log message with the fatal level
pub fn (mut d AgentDaemon) lfatal(msg string) {
d.log(msg, log.Level.fatal)
}
// lerror create a log message with the error level
pub fn (mut d AgentDaemon) lerror(msg string) {
d.log(msg, log.Level.error)
}
// lwarn create a log message with the warn level
pub fn (mut d AgentDaemon) lwarn(msg string) {
d.log(msg, log.Level.warn)
}
// linfo create a log message with the info level
pub fn (mut d AgentDaemon) linfo(msg string) {
d.log(msg, log.Level.info)
}
// ldebug create a log message with the debug level
pub fn (mut d AgentDaemon) ldebug(msg string) {
d.log(msg, log.Level.debug)
}

View File

@ -18,7 +18,7 @@ const (
pub struct BuildConfig { pub struct BuildConfig {
pub: pub:
id int target_id int
kind string kind string
url string url string
branch string branch string
@ -103,10 +103,23 @@ pub:
logs string logs string
} }
pub fn build_target(address string, api_key string, base_image_id string, target &Target) !BuildResult {
config := BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
base_image: base_image_id
}
return build_config(address, api_key, config)
}
// build_target builds, packages & publishes a given Arch package based on the // build_target builds, packages & publishes a given Arch package based on the
// provided target. The base image ID should be of an image previously created // provided target. The base image ID should be of an image previously created
// by create_build_image. It returns the logs of the container. // by create_build_image. It returns the logs of the container.
pub fn build_target(address string, api_key string, base_image_id string, target &Target) !BuildResult { pub fn build_config(address string, api_key string, config BuildConfig) !BuildResult {
mut dd := docker.new_conn()! mut dd := docker.new_conn()!
defer { defer {
@ -114,14 +127,14 @@ pub fn build_target(address string, api_key string, base_image_id string, target
} }
build_arch := os.uname().machine build_arch := os.uname().machine
build_script := create_build_script(address, target, build_arch) build_script := create_build_script(address, config, build_arch)
// We convert the build script into a base64 string, which then gets passed // We convert the build script into a base64 string, which then gets passed
// to the container as an env var // to the container as an env var
base64_script := base64.encode_str(build_script) base64_script := base64.encode_str(build_script)
c := docker.NewContainer{ c := docker.NewContainer{
image: '$base_image_id' image: '$config.base_image'
env: [ env: [
'BUILD_SCRIPT=$base64_script', 'BUILD_SCRIPT=$base64_script',
'API_KEY=$api_key', 'API_KEY=$api_key',

168
src/build/queue.v 100644
View File

@ -0,0 +1,168 @@
module build
import models { Target }
import cron.expression { CronExpression, parse_expression }
import time
import datatypes { MinHeap }
import util
struct BuildJob {
pub:
// Next timestamp from which point this job is allowed to be executed
timestamp time.Time
// Required for calculating next timestamp after having pop'ed a job
ce CronExpression
// Actual build config sent to the agent
config BuildConfig
}
// Allows BuildJob structs to be sorted according to their timestamp in
// MinHeaps
fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp
}
pub struct BuildJobQueue {
// Schedule to use for targets without explicitely defined cron expression
default_schedule CronExpression
// Base image to use for targets without defined base image
default_base_image string
mut:
mutex shared util.Dummy
// For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob>
// Each queued build job is also stored in a map, with the keys being the
// target IDs. This is used when removing or editing targets.
// jobs map[int]BuildJob
}
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
return BuildJobQueue{
default_schedule: default_schedule
default_base_image: default_base_image
}
}
// insert a new target's job into the queue for the given architecture. This
// job will then be endlessly rescheduled after being pop'ed, unless removed
// explicitely.
pub fn (mut q BuildJobQueue) insert(target Target, arch string) ! {
lock q.mutex {
if arch !in q.queues {
q.queues[arch] = MinHeap<BuildJob>{}
}
ce := if target.schedule != '' {
parse_expression(target.schedule) or {
return error("Error while parsing cron expression '$target.schedule' (id $target.id): $err.msg()")
}
} else {
q.default_schedule
}
timestamp := ce.next_from_now()!
job := BuildJob{
timestamp: timestamp
ce: ce
config: BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
// TODO make this configurable
base_image: q.default_base_image
}
}
q.queues[arch].insert(job)
}
}
// reschedule the given job by calculating the next timestamp and re-adding it
// to its respective queue. This function is called by the pop functions
// *after* having pop'ed the job.
fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
new_timestamp := job.ce.next_from_now()!
new_job := BuildJob{
...job
timestamp: new_timestamp
}
q.queues[arch].insert(new_job)
}
// peek shows the first job for the given architecture that's ready to be
// executed, if present.
pub fn (q &BuildJobQueue) peek(arch string) ?BuildJob {
rlock q.mutex {
if arch !in q.queues {
return none
}
job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() {
return job
}
}
return none
}
// pop removes the first job for the given architecture that's ready to be
// executed from the queue and returns it, if present.
pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
lock q.mutex {
if arch !in q.queues {
return none
}
mut job := q.queues[arch].peek() or { return none }
if job.timestamp < time.now() {
job = q.queues[arch].pop()?
// TODO how do we handle this properly? Is it even possible for a
// cron expression to not return a next time if it's already been
// used before?
q.reschedule(job, arch) or {}
return job
}
}
return none
}
// pop_n tries to pop at most n available jobs for the given architecture.
pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
lock q.mutex {
if arch !in q.queues {
return []
}
mut out := []BuildJob{}
for out.len < n {
mut job := q.queues[arch].peek() or { break }
if job.timestamp < time.now() {
job = q.queues[arch].pop() or { break }
// TODO idem
q.reschedule(job, arch) or {}
out << job
} else {
break
}
}
return out
}
return []
}

View File

@ -1,7 +1,5 @@
module build module build
import models { Target }
// escape_shell_string escapes any characters that could be interpreted // escape_shell_string escapes any characters that could be interpreted
// incorrectly by a shell. The resulting value should be safe to use inside an // incorrectly by a shell. The resulting value should be safe to use inside an
// echo statement. // echo statement.
@ -23,13 +21,13 @@ pub fn echo_commands(cmds []string) []string {
} }
// create_build_script generates a shell script that builds a given Target. // create_build_script generates a shell script that builds a given Target.
fn create_build_script(address string, target &Target, build_arch string) string { fn create_build_script(address string, config BuildConfig, build_arch string) string {
repo_url := '$address/$target.repo' repo_url := '$address/$config.repo'
mut commands := [ mut commands := [
// This will later be replaced by a proper setting for changing the // This will later be replaced by a proper setting for changing the
// mirrorlist // mirrorlist
"echo -e '[$target.repo]\\nServer = $address/\$repo/\$arch\\nSigLevel = Optional' >> /etc/pacman.conf" "echo -e '[$config.repo]\\nServer = $address/\$repo/\$arch\\nSigLevel = Optional' >> /etc/pacman.conf"
// We need to update the package list of the repo we just added above. // We need to update the package list of the repo we just added above.
// This should however not pull in a lot of packages as long as the // This should however not pull in a lot of packages as long as the
// builder image is rebuilt frequently. // builder image is rebuilt frequently.
@ -38,22 +36,22 @@ fn create_build_script(address string, target &Target, build_arch string) string
'su builder', 'su builder',
] ]
commands << match target.kind { commands << match config.kind {
'git' { 'git' {
if target.branch == '' { if config.branch == '' {
[ [
"git clone --single-branch --depth 1 '$target.url' repo", "git clone --single-branch --depth 1 '$config.url' repo",
] ]
} else { } else {
[ [
"git clone --single-branch --depth 1 --branch $target.branch '$target.url' repo", "git clone --single-branch --depth 1 --branch $config.branch '$config.url' repo",
] ]
} }
} }
'url' { 'url' {
[ [
'mkdir repo', 'mkdir repo',
"curl -o repo/PKGBUILD -L '$target.url'", "curl -o repo/PKGBUILD -L '$config.url'",
] ]
} }
else { else {

12
src/client/jobs.v 100644
View File

@ -0,0 +1,12 @@
module client
import build { BuildConfig }
pub fn (c &Client) poll_jobs(arch string, max int) ![]BuildConfig {
data := c.send_request<[]BuildConfig>(.get, '/api/v1/jobs/poll', {
'arch': arch
'max': max.str()
})!
return data.data
}

View File

@ -41,7 +41,7 @@ fn main() {
schedule.cmd(), schedule.cmd(),
man.cmd(), man.cmd(),
aur.cmd(), aur.cmd(),
agent.cmd() agent.cmd(),
] ]
} }
app.setup() app.setup()

View File

@ -0,0 +1,23 @@
module server
import web
import web.response { new_data_response, new_response }
// import os
// import util
// import models { BuildLog, BuildLogFilter }
['/api/v1/jobs/poll'; auth; get]
fn (mut app App) v1_poll_job_queue() web.Result {
arch := app.query['arch'] or {
return app.json(.bad_request, new_response('Missing arch query arg.'))
}
max_str := app.query['max'] or {
return app.json(.bad_request, new_response('Missing max query arg.'))
}
max := max_str.int()
mut out := app.job_queue.pop_n(arch, max).map(it.config)
return app.json(.ok, new_data_response(out))
}

View File

@ -10,7 +10,9 @@ pub:
data_dir string data_dir string
api_key string api_key string
default_arch string default_arch string
global_schedule string = '0 3'
port int = 8000 port int = 8000
base_image string = 'archlinux:base-devel'
} }
// cmd returns the cli submodule that handles starting the server // cmd returns the cli submodule that handles starting the server

View File

@ -6,6 +6,8 @@ import log
import repo import repo
import util import util
import db import db
import build { BuildJobQueue }
import cron.expression
const ( const (
log_file_name = 'vieter.log' log_file_name = 'vieter.log'
@ -20,9 +22,28 @@ pub:
conf Config [required; web_global] conf Config [required; web_global]
pub mut: pub mut:
repo repo.RepoGroupManager [required; web_global] repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages
job_queue BuildJobQueue [required; web_global]
db db.VieterDb db db.VieterDb
} }
fn (mut app App) init_job_queue() ! {
// Initialize build queues
mut targets := app.db.get_targets(limit: 25)
mut i := u64(0)
for targets.len > 0 {
for target in targets {
for arch in target.arch {
app.job_queue.insert(target, arch.value)!
}
}
i += 25
targets = app.db.get_targets(limit: 25, offset: i)
}
}
// server starts the web server & starts listening for requests // server starts the web server & starts listening for requests
pub fn server(conf Config) ! { pub fn server(conf Config) ! {
// Prevent using 'any' as the default arch // Prevent using 'any' as the default arch
@ -30,6 +51,10 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
} }
global_ce := expression.parse_expression(conf.global_schedule) or {
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
}
// Configure logger // Configure logger
log_level := log.level_from_tag(conf.log_level) or { log_level := log.level_from_tag(conf.log_level) or {
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.') util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
@ -71,11 +96,17 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()') util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
} }
web.run(&App{ mut app := &App{
logger: logger logger: logger
api_key: conf.api_key api_key: conf.api_key
conf: conf conf: conf
repo: repo repo: repo
db: db db: db
}, conf.port) job_queue: build.new_job_queue(global_ce, conf.base_image)
}
app.init_job_queue() or {
util.exit_with_message(1, 'Failed to inialize job queue: $err.msg()')
}
web.run(app, conf.port)
} }

View File

@ -4,6 +4,7 @@ data_dir = "data"
pkg_dir = "data/pkgs" pkg_dir = "data/pkgs"
log_level = "DEBUG" log_level = "DEBUG"
default_arch = "x86_64" default_arch = "x86_64"
arch = "x86_64"
address = "http://localhost:8000" address = "http://localhost:8000"
@ -11,4 +12,3 @@ global_schedule = '* *'
api_update_frequency = 2 api_update_frequency = 2
image_rebuild_frequency = 1 image_rebuild_frequency = 1
max_concurrent_builds = 3 max_concurrent_builds = 3