Polling-based agent-server architecture #301

Merged
Jef Roosens merged 22 commits from Chewing_Bever/vieter:agent-server-polling into dev 2022-12-14 17:34:48 +01:00
26 changed files with 874 additions and 68 deletions


@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Migrated codebase to V 0.3.2
* Cron expression parser now uses bitfields instead of bool arrays
* Added option to deploy using agent-server architecture instead of cron daemon
* Allow force-building packages, meaning the build won't check if the
repository is already up to date
* Allow scheduling builds on the server from the CLI tool instead of building
them locally
### Fixed
@ -19,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* CLI no longer exits with non-zero status code when removing/patching
  target
* Allow NULL values for branch in database
* Endpoint for adding targets now returns the correct id
## [0.4.0](https://git.rustybever.be/vieter-v/vieter/src/tag/0.4.0)


@ -21,7 +21,8 @@ quicker.
I chose [V](https://vlang.io/) as I've been very intrigued by this language for
a while now. I wanted a fast language that I could code while relaxing, without
having to exert too much mental effort & V seemed like the right choice for
that. Sadly, this didn't quite turn out the way I expected, but I'm sticking
with it anyways ;p
## Features
@ -49,7 +50,7 @@ update`.
I used to maintain a mirror that tracked the latest master, but nowadays, I
maintain a Docker image containing the specific compiler version that Vieter
builds with. Currently, this is V 0.3.2.
## Contributing


@ -97,3 +97,25 @@ configuration variable required for each command.
build`.
* Default: `archlinux:base-devel`
### `vieter agent`
* `log_level`: log verbosity level. Value should be one of `FATAL`, `ERROR`,
`WARN`, `INFO` or `DEBUG`.
* Default: `WARN`
* `address`: *public* URL of the Vieter repository server to build for. Jobs
  are retrieved from this server, and all built packages are published to it.
* `api_key`: API key of the above server.
* `data_dir`: directory to store the log file in.
* `max_concurrent_builds`: how many builds to run at the same time.
* Default: `1`
* `polling_frequency`: how often (in seconds) to poll the server for new
builds. Note that the agent might poll more frequently when it's actively
processing builds.
* `image_rebuild_frequency`: Vieter periodically builds images that are then
used as a basis for running build containers. This is to prevent each build
from downloading an entire repository worth of dependencies. This setting
defines how frequently (in minutes) to rebuild these images.
* Default: `1440` (every 24 hours)
* `arch`: architecture for which this agent should pull down builds (e.g.
`x86_64`)
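
A minimal agent configuration might then look like this (the values below are
only examples; every setting can also be passed as a `VIETER_`-prefixed
environment variable):

```toml
log_level = "INFO"
# Server to poll jobs from & publish built packages to
address = "https://arch.example.com"
api_key = "secret-api-key"
data_dir = "/var/lib/vieter"
arch = "x86_64"
max_concurrent_builds = 2
polling_frequency = 30
image_rebuild_frequency = 1440
```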


@ -21,7 +21,7 @@ branch. This branch will be the most up to date, but does not give any
guarantees about stability, so beware!
Thanks to the single-binary design of Vieter, this image can be used for the
repository server, the cron daemon and the agent.
Below is an example compose file to set up both the repository server & the
cron daemon:
@ -76,7 +76,7 @@ architectures will build on both.
## Binary
On the
[releases](https://git.rustybever.be/vieter-v/vieter/releases)
page, you can find statically compiled binaries for all
released versions. This is the same binary as used inside
the Docker images.
@ -106,5 +106,5 @@ guarantee that a compiler update won't temporarily break them.
## Building from source
The project [README](https://git.rustybever.be/vieter-v/vieter#building)
contains instructions for building Vieter from source.


@ -37,6 +37,6 @@ Each section can consist of as many of these parts as necessary.
## CLI tool
The Vieter binary contains a command that shows you the next matching times for
a given expression. This can be useful for understanding the syntax. For more
information, see
[vieter-schedule(1)](https://rustybever.be/man/vieter/vieter-schedule.1.html). [vieter-schedule(1)](https://rustybever.be/man/vieter/vieter-schedule.1.html).

src/agent/agent.v (new file, 27 lines)

@ -0,0 +1,27 @@
module agent
import log
import os
import util
const log_file_name = 'vieter.agent.log'
// agent starts an agent service
pub fn agent(conf Config) ! {
log_level := log.level_from_tag(conf.log_level) or {
return error('Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
}
mut logger := log.Log{
level: log_level
}
os.mkdir_all(conf.data_dir) or { util.exit_with_message(1, 'Failed to create data directory.') }
log_file := os.join_path_single(conf.data_dir, agent.log_file_name)
logger.set_full_logpath(log_file)
logger.log_to_console_too()
mut d := agent_init(logger, conf)
d.run()
}

src/agent/cli.v (new file, 31 lines)

@ -0,0 +1,31 @@
module agent
import cli
import conf as vconf
struct Config {
pub:
log_level string = 'WARN'
// Architecture that the agent represents
arch string
api_key string
address string
data_dir string
max_concurrent_builds int = 1
polling_frequency int = 30
image_rebuild_frequency int = 1440
}
// cmd returns the cli module that handles the agent daemon.
pub fn cmd() cli.Command {
return cli.Command{
name: 'agent'
description: 'Start an agent daemon.'
execute: fn (cmd cli.Command) ! {
config_file := cmd.flags.get_string('config-file')!
conf := vconf.load<Config>(prefix: 'VIETER_', default_path: config_file)!
agent(conf)!
}
}
}

src/agent/daemon.v (new file, 178 lines)

@ -0,0 +1,178 @@
module agent
import log
import sync.stdatomic
import build { BuildConfig }
import client
import time
import os
const (
build_empty = 0
build_running = 1
build_done = 2
)
struct AgentDaemon {
logger shared log.Log
conf Config
client client.Client
mut:
images ImageManager
// Which builds are currently running; length is conf.max_concurrent_builds
builds []BuildConfig
// Atomic variables used to detect when a build has finished; length is
// conf.max_concurrent_builds
atomics []u64
}
// agent_init initializes a new agent
fn agent_init(logger log.Log, conf Config) AgentDaemon {
mut d := AgentDaemon{
logger: logger
client: client.new(conf.address, conf.api_key)
conf: conf
images: new_image_manager(conf.image_rebuild_frequency * 60)
builds: []BuildConfig{len: conf.max_concurrent_builds}
atomics: []u64{len: conf.max_concurrent_builds}
}
return d
}
// run starts the actual agent daemon. This function will run forever.
pub fn (mut d AgentDaemon) run() {
// This is just so that the very first time the loop is run, the jobs are
// always polled
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
mut sleep_time := 1 * time.second
mut finished, mut empty := 0, 0
for {
finished, empty = d.update_atomics()
// No new finished builds and no free slots, so there's nothing to be
// done
if finished + empty == 0 {
time.sleep(1 * time.second)
continue
}
// Builds have finished, so old builder images might have freed up.
// TODO this might query the docker daemon too frequently.
if finished > 0 {
d.images.clean_old_images()
}
// The agent will always poll for new jobs after at most
// `polling_frequency` seconds. However, when jobs have finished, the
// agent will also poll for new jobs. This is because jobs are often
// clustered together (especially when mostly using the global cron
// schedule), so there's a much higher chance jobs are available.
if finished > 0 || time.now() >= last_poll_time.add_seconds(d.conf.polling_frequency) {
new_configs := d.client.poll_jobs(d.conf.arch, finished + empty) or {
d.lerror('Failed to poll jobs: $err.msg()')
// TODO pick a better delay here
time.sleep(5 * time.second)
continue
}
last_poll_time = time.now()
for config in new_configs {
// TODO handle this better than to just skip the config
// Make sure a recent build base image is available for
// building the config
d.images.refresh_image(config.base_image) or {
d.lerror(err.msg())
continue
}
d.start_build(config)
}
// No new jobs were scheduled and the agent isn't doing anything,
// so we just wait until the next polling period.
if new_configs.len == 0 && finished + empty == d.conf.max_concurrent_builds {
sleep_time = last_poll_time.add_seconds(d.conf.polling_frequency) - time.now()
}
}
// The agent is not doing anything, so we just wait until the next poll
// time
else if finished + empty == d.conf.max_concurrent_builds {
sleep_time = last_poll_time.add_seconds(d.conf.polling_frequency) - time.now()
}
time.sleep(sleep_time)
}
}
// update_atomics checks for each build whether it's completed, and sets it to
// empty again if so. The return value is a tuple `(finished, empty)` where
// `finished` is how many builds were just finished and thus set to empty, and
// `empty` is how many build slots were already empty. The number of running
// builds can then be calculated by subtracting these two values from the
// total allowed concurrent builds.
fn (mut d AgentDaemon) update_atomics() (int, int) {
mut finished := 0
mut empty := 0
for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_done {
stdatomic.store_u64(&d.atomics[i], agent.build_empty)
finished++
} else if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
empty++
}
}
return finished, empty
}
// start_build starts a build for the given BuildConfig.
fn (mut d AgentDaemon) start_build(config BuildConfig) bool {
for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
stdatomic.store_u64(&d.atomics[i], agent.build_running)
d.builds[i] = config
go d.run_build(i, config)
return true
}
}
return false
}
// run_build actually starts the build process for a given target.
fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
d.linfo('started build: $config')
// 0 means success, 1 means failure
mut status := 0
new_config := BuildConfig{
...config
base_image: d.images.get(config.base_image)
}
res := build.build_config(d.client.address, d.client.api_key, new_config) or {
d.ldebug('build_config error: $err.msg()')
status = 1
build.BuildResult{}
}
if status == 0 {
d.linfo('Uploading build logs for $config')
// TODO use the arch value here
build_arch := os.uname().machine
d.client.add_build_log(config.target_id, res.start_time, res.end_time, build_arch,
res.exit_code, res.logs) or { d.lerror('Failed to upload logs for $config') }
} else {
d.lwarn('an error occurred during build: $config')
}
stdatomic.store_u64(&d.atomics[build_index], agent.build_done)
}

src/agent/images.v (new file, 79 lines)

@ -0,0 +1,79 @@
module agent
import time
import docker
import build
// An ImageManager is a utility that creates builder images from given base
// images, updating these builder images if they've become too old. This
// structure can manage images from any number of base images, paving the way
// for configurable base images per target/repository.
struct ImageManager {
max_image_age int [required]
mut:
// For each base image, one or more builder images can exist at the same
// time
images map[string][]string [required]
// For each base image, we track when its newest image was built
timestamps map[string]time.Time [required]
}
// new_image_manager initializes a new image manager.
fn new_image_manager(max_image_age int) ImageManager {
return ImageManager{
max_image_age: max_image_age
images: map[string][]string{}
timestamps: map[string]time.Time{}
}
}
// get returns the name of the newest image for the given base image. Note that
// this function should only be called *after* a first call to `refresh_image`.
pub fn (m &ImageManager) get(base_image string) string {
return m.images[base_image].last()
}
// refresh_image builds a new builder image from the given base image if the
// previous builder image is too old or non-existent. This function will do
// nothing if these conditions aren't met, so it's safe to call it every time
// you want to ensure an image is up to date.
fn (mut m ImageManager) refresh_image(base_image string) ! {
if base_image in m.timestamps
&& m.timestamps[base_image].add_seconds(m.max_image_age) > time.now() {
return
}
// TODO use better image tags for built images
new_image := build.create_build_image(base_image) or {
return error('Failed to build builder image from base image $base_image')
}
m.images[base_image] << new_image
m.timestamps[base_image] = time.now()
}
// clean_old_images removes all older builder images that are no longer in use.
// The function will always leave at least one builder image, namely the newest
// one.
fn (mut m ImageManager) clean_old_images() {
mut dd := docker.new_conn() or { return }
defer {
dd.close() or {}
}
mut i := 0
for image in m.images.keys() {
i = 0
for i < m.images[image].len - 1 {
// For each builder image, we try to remove it by calling the Docker
// API. If the function returns an error or false, that means the image
// wasn't deleted. Therefore, we move the index over. If the function
// returns true, the array's length has decreased by one so we don't
// move the index.
dd.remove_image(m.images[image][i]) or { i += 1 }
}
}
}
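
The intended call pattern, as a small sketch (these functions are
module-private, so this only applies inside the `agent` module; the one-hour
age is an arbitrary example):

```v
// Rebuild builder images once they're older than an hour (3600 seconds).
mut m := new_image_manager(3600)

// Make sure a sufficiently recent builder image exists, then fetch its
// tag to hand to a build container.
m.refresh_image('archlinux:base-devel') or { eprintln(err.msg()) }
image_tag := m.get('archlinux:base-devel')
println(image_tag)

// Once builds have finished, drop superseded builder images. The newest
// image per base image is always kept.
m.clean_old_images()
```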

src/agent/log.v (new file, 35 lines)

@ -0,0 +1,35 @@
module agent
import log
// log a message with the given level
pub fn (mut d AgentDaemon) log(msg string, level log.Level) {
lock d.logger {
d.logger.send_output(msg, level)
}
}
// lfatal creates a log message with the fatal level
pub fn (mut d AgentDaemon) lfatal(msg string) {
d.log(msg, log.Level.fatal)
}
// lerror creates a log message with the error level
pub fn (mut d AgentDaemon) lerror(msg string) {
d.log(msg, log.Level.error)
}
// lwarn creates a log message with the warn level
pub fn (mut d AgentDaemon) lwarn(msg string) {
d.log(msg, log.Level.warn)
}
// linfo creates a log message with the info level
pub fn (mut d AgentDaemon) linfo(msg string) {
d.log(msg, log.Level.info)
}
// ldebug creates a log message with the debug level
pub fn (mut d AgentDaemon) ldebug(msg string) {
d.log(msg, log.Level.debug)
}


@ -16,6 +16,22 @@ const (
'/usr/local/bin', '/usr/bin/site_perl', '/usr/bin/vendor_perl', '/usr/bin/core_perl']
)
pub struct BuildConfig {
pub:
target_id int
kind string
url string
branch string
repo string
base_image string
force bool
}
// str returns a single-line string representation of a BuildConfig
pub fn (c BuildConfig) str() string {
return '{ target: $c.target_id, kind: $c.kind, url: $c.url, branch: $c.branch, repo: $c.repo, base_image: $c.base_image, force: $c.force }'
}
// create_build_image creates a builder image given some base image which can
// then be used to build & package Arch images. It mostly just updates the
// system, installs some necessary packages & creates a non-root user to run
@ -93,10 +109,25 @@ pub:
logs string
}
// build_target builds the given target. Internally it calls `build_config`.
pub fn build_target(address string, api_key string, base_image_id string, target &Target, force bool) !BuildResult {
config := BuildConfig{
target_id: target.id
kind: target.kind
url: target.url
branch: target.branch
repo: target.repo
base_image: base_image_id
force: force
}
return build_config(address, api_key, config)
}
// build_config builds, packages & publishes a given Arch package based on the
// provided target. The base image ID should be of an image previously created
// by create_build_image. It returns the logs of the container.
pub fn build_config(address string, api_key string, config BuildConfig) !BuildResult {
mut dd := docker.new_conn()!
defer {
@ -104,14 +135,14 @@ pub fn build_target(address string, api_key string, base_image_id string, target
}
build_arch := os.uname().machine
build_script := create_build_script(address, config, build_arch)
// We convert the build script into a base64 string, which then gets passed
// to the container as an env var
base64_script := base64.encode_str(build_script)
c := docker.NewContainer{
image: '$config.base_image'
env: [
'BUILD_SCRIPT=$base64_script',
'API_KEY=$api_key',

src/build/queue.v (new file, 229 lines)

@ -0,0 +1,229 @@
module build
import models { Target }
import cron.expression { CronExpression, parse_expression }
import time
import datatypes { MinHeap }
import util
struct BuildJob {
pub mut:
// Time at which this build job was created/queued
created time.Time
// Next timestamp from which point this job is allowed to be executed
timestamp time.Time
// Required for calculating next timestamp after having pop'ed a job
ce CronExpression
// Actual build config sent to the agent
config BuildConfig
// Whether this is a one-time job
single bool
}
// Allows BuildJob structs to be sorted according to their timestamp in
// MinHeaps
fn (r1 BuildJob) < (r2 BuildJob) bool {
return r1.timestamp < r2.timestamp
}
// The build job queue is responsible for managing the list of scheduled builds
// for each architecture. Agents receive jobs from this queue.
pub struct BuildJobQueue {
// Schedule to use for targets without explicitly defined cron expression
default_schedule CronExpression
// Base image to use for targets without defined base image
default_base_image string
mut:
mutex shared util.Dummy
// For each architecture, a priority queue is tracked
queues map[string]MinHeap<BuildJob>
// When a target is removed from the server or edited, its previous build
// configs will be invalid. This map allows for those to be simply skipped
// by ignoring any build configs created before this timestamp.
invalidated map[int]time.Time
}
// new_job_queue initializes a new job queue
pub fn new_job_queue(default_schedule CronExpression, default_base_image string) BuildJobQueue {
return BuildJobQueue{
default_schedule: default_schedule
default_base_image: default_base_image
invalidated: map[int]time.Time{}
}
}
// insert_all executes insert for each architecture of the given Target.
pub fn (mut q BuildJobQueue) insert_all(target Target) ! {
for arch in target.arch {
q.insert(target: target, arch: arch.value)!
}
}
[params]
pub struct InsertConfig {
target Target [required]
arch string [required]
single bool
force bool
now bool
}
// insert a new target's job into the queue for the given architecture. This
// job will then be endlessly rescheduled after being pop'ed, unless removed
// explicitly.
pub fn (mut q BuildJobQueue) insert(input InsertConfig) ! {
lock q.mutex {
if input.arch !in q.queues {
q.queues[input.arch] = MinHeap<BuildJob>{}
}
mut job := BuildJob{
created: time.now()
single: input.single
config: BuildConfig{
target_id: input.target.id
kind: input.target.kind
url: input.target.url
branch: input.target.branch
repo: input.target.repo
// TODO make this configurable
base_image: q.default_base_image
force: input.force
}
}
if !input.now {
ce := if input.target.schedule != '' {
parse_expression(input.target.schedule) or {
return error("Error while parsing cron expression '$input.target.schedule' (id $input.target.id): $err.msg()")
}
} else {
q.default_schedule
}
job.timestamp = ce.next_from_now()!
job.ce = ce
} else {
job.timestamp = time.now()
}
q.queues[input.arch].insert(job)
}
}
// reschedule the given job by calculating the next timestamp and re-adding it
// to its respective queue. This function is called by the pop functions
// *after* having pop'ed the job.
fn (mut q BuildJobQueue) reschedule(job BuildJob, arch string) ! {
new_timestamp := job.ce.next_from_now()!
new_job := BuildJob{
...job
created: time.now()
timestamp: new_timestamp
}
q.queues[arch].insert(new_job)
}
// pop_invalid pops all invalid jobs.
fn (mut q BuildJobQueue) pop_invalid(arch string) {
for {
job := q.queues[arch].peek() or { return }
if job.config.target_id in q.invalidated
&& job.created < q.invalidated[job.config.target_id] {
// This pop *should* never fail according to the source code
q.queues[arch].pop() or {}
} else {
break
}
}
}
// peek shows the first job for the given architecture that's ready to be
// executed, if present.
pub fn (mut q BuildJobQueue) peek(arch string) ?BuildJob {
// Even peek requires a write lock, because pop_invalid can modify the data
// structure
lock q.mutex {
if arch !in q.queues {
return none
}
q.pop_invalid(arch)
job := q.queues[arch].peek()?
if job.timestamp < time.now() {
return job
}
}
return none
}
// pop removes the first job for the given architecture that's ready to be
// executed from the queue and returns it, if present.
pub fn (mut q BuildJobQueue) pop(arch string) ?BuildJob {
lock q.mutex {
if arch !in q.queues {
return none
}
q.pop_invalid(arch)
mut job := q.queues[arch].peek()?
if job.timestamp < time.now() {
job = q.queues[arch].pop()?
if !job.single {
// TODO how do we handle this properly? Is it even possible for a
// cron expression to not return a next time if it's already been
// used before?
q.reschedule(job, arch) or {}
}
return job
}
}
return none
}
// pop_n tries to pop at most n available jobs for the given architecture.
pub fn (mut q BuildJobQueue) pop_n(arch string, n int) []BuildJob {
lock q.mutex {
if arch !in q.queues {
return []
}
mut out := []BuildJob{}
for out.len < n {
q.pop_invalid(arch)
mut job := q.queues[arch].peek() or { break }
if job.timestamp < time.now() {
job = q.queues[arch].pop() or { break }
if !job.single {
// TODO idem
q.reschedule(job, arch) or {}
}
out << job
} else {
break
}
}
return out
}
return []
}
// invalidate a target's old build jobs.
pub fn (mut q BuildJobQueue) invalidate(target_id int) {
q.invalidated[target_id] = time.now()
}
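
A sketch of how the server drives this queue, using only the API defined
above (the target values are made up for illustration):

```v
import build
import cron.expression { parse_expression }
import models { Target }

fn example() ! {
	// Fall back to a daily 03:00 schedule for targets without their own
	// cron expression.
	ce := parse_expression('0 3')!
	mut q := build.new_job_queue(ce, 'archlinux:base-devel')

	target := Target{
		id: 1
		kind: 'git'
		url: 'https://examplerepo.com'
		repo: 'vieter'
	}

	// Queue a one-time job that may run immediately, the same way the
	// /api/v1/jobs/queue endpoint does.
	q.insert(target: target, arch: 'x86_64', single: true, now: true)!

	// An agent's poll then pops any job whose timestamp has passed;
	// non-single jobs would be rescheduled automatically.
	if job := q.pop('x86_64') {
		println(job.config)
	}
}
```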


@ -1,7 +1,5 @@
module build
import models { Target }
// escape_shell_string escapes any characters that could be interpreted // escape_shell_string escapes any characters that could be interpreted
// incorrectly by a shell. The resulting value should be safe to use inside an // incorrectly by a shell. The resulting value should be safe to use inside an
// echo statement. // echo statement.
@ -23,13 +21,13 @@ pub fn echo_commands(cmds []string) []string {
}
// create_build_script generates a shell script that builds a given Target.
fn create_build_script(address string, config BuildConfig, build_arch string) string {
repo_url := '$address/$config.repo'
mut commands := [
// This will later be replaced by a proper setting for changing the
// mirrorlist
"echo -e '[$config.repo]\\nServer = $address/\$repo/\$arch\\nSigLevel = Optional' >> /etc/pacman.conf"
// We need to update the package list of the repo we just added above.
// This should however not pull in a lot of packages as long as the
// builder image is rebuilt frequently.
@ -38,22 +36,22 @@ fn create_build_script(address string, target &Target, build_arch string) string
'su builder',
]
commands << match config.kind {
'git' {
if config.branch == '' {
[
"git clone --single-branch --depth 1 '$config.url' repo",
]
} else {
[
"git clone --single-branch --depth 1 --branch $config.branch '$config.url' repo",
]
}
}
'url' {
[
'mkdir repo',
"curl -o repo/PKGBUILD -L '$config.url'",
]
}
else {
@ -65,14 +63,22 @@ fn create_build_script(address string, target &Target, build_arch string) string
'cd repo',
'makepkg --nobuild --syncdeps --needed --noconfirm',
'source PKGBUILD',
]
if !config.force {
// The build container checks whether the package is already present on
// the server.
commands << [
'curl -s --head --fail $repo_url/$build_arch/\$pkgname-\$pkgver-\$pkgrel && exit 0',
// If the above curl command succeeds, we don't need to rebuild the
// package. However, because we're in a su shell, the exit command will
// drop us back into the root shell. Therefore, we must check whether
// we're in root so we don't proceed.
'[ "\$(id -u)" == 0 ] && exit 0',
]
}
commands << [
'MAKEFLAGS="-j\$(nproc)" makepkg -s --noconfirm --needed && for pkg in \$(ls -1 *.pkg*); do curl -XPOST -T "\$pkg" -H "X-API-KEY: \$API_KEY" $repo_url/publish; done',
]


@ -1,42 +1,46 @@
module build
import models { Target }
fn test_create_build_script_git_branch() {
config := BuildConfig{
target_id: 1
kind: 'git'
url: 'https://examplerepo.com'
branch: 'main'
repo: 'vieter'
base_image: 'not-used:latest'
}
build_script := create_build_script('https://example.com', config, 'x86_64')
expected := $embed_file('build_script_git_branch.sh')
assert build_script == expected.to_string().trim_space()
}
fn test_create_build_script_git() {
config := BuildConfig{
target_id: 1
kind: 'git'
url: 'https://examplerepo.com'
repo: 'vieter'
base_image: 'not-used:latest'
}
build_script := create_build_script('https://example.com', config, 'x86_64')
expected := $embed_file('build_script_git.sh')
assert build_script == expected.to_string().trim_space()
}
fn test_create_build_script_url() {
config := BuildConfig{
target_id: 1
kind: 'url'
url: 'https://examplerepo.com'
repo: 'vieter'
base_image: 'not-used:latest'
}
build_script := create_build_script('https://example.com', config, 'x86_64')
expected := $embed_file('build_script_url.sh')
assert build_script == expected.to_string().trim_space()
}

src/client/jobs.v (new file, 26 lines)

@ -0,0 +1,26 @@
module client
import build { BuildConfig }
import web.response { Response }
// poll_jobs requests a list of new build jobs from the server.
pub fn (c &Client) poll_jobs(arch string, max int) ![]BuildConfig {
data := c.send_request<[]BuildConfig>(.get, '/api/v1/jobs/poll', {
'arch': arch
'max': max.str()
})!
return data.data
}
// queue_job adds a new one-time build job for the given target to the job
// queue.
pub fn (c &Client) queue_job(target_id int, arch string, force bool) !Response<string> {
data := c.send_request<string>(.post, '/api/v1/jobs/queue', {
'target': target_id.str()
'arch': arch
'force': force.str()
})!
return data
}
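
A sketch of how these two calls work together (the address and API key are
placeholders; `client.new` is the same constructor the agent daemon uses):

```v
import client

fn example() ! {
	c := client.new('https://arch.example.com', 'secret-api-key')

	// Ask the server to schedule a one-time, forced build of target 3.
	res := c.queue_job(3, 'x86_64', true)!
	println(res.message)

	// What an agent does on every poll: request up to two jobs for its
	// architecture. The returned BuildConfigs can be built directly.
	configs := c.poll_jobs('x86_64', 2)!
	for config in configs {
		println(config)
	}
}
```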


@ -6,7 +6,7 @@ import os
import build
// build locally builds the target with the given id.
fn build(conf Config, target_id int, force bool) ! {
c := client.new(conf.address, conf.api_key)
target := c.get_target(target_id)!
@ -16,7 +16,7 @@ fn build(conf Config, target_id int) ! {
image_id := build.create_build_image(conf.base_image)!
println('Running build...')
res := build.build_target(conf.address, conf.api_key, image_id, target, force)!
println('Removing build image...')


@ -182,11 +182,44 @@ pub fn cmd() cli.Command {
required_args: 1
usage: 'id'
description: 'Build the target with the given id & publish it.'
flags: [
cli.Flag{
name: 'force'
description: 'Build the target without checking whether it needs to be renewed.'
flag: cli.FlagType.bool
},
cli.Flag{
name: 'remote'
description: 'Schedule the build on the server instead of running it locally.'
flag: cli.FlagType.bool
},
cli.Flag{
name: 'arch'
description: 'Architecture to schedule build for. Required when using -remote.'
flag: cli.FlagType.string
},
]
execute: fn (cmd cli.Command) ! {
config_file := cmd.flags.get_string('config-file')!
conf := vconf.load<Config>(prefix: 'VIETER_', default_path: config_file)!
remote := cmd.flags.get_bool('remote')!
force := cmd.flags.get_bool('force')!
target_id := cmd.args[0].int()
if remote {
arch := cmd.flags.get_string('arch')!
if arch == '' {
return error('When scheduling the build remotely, you have to specify an architecture.')
}
c := client.new(conf.address, conf.api_key)
res := c.queue_job(target_id, arch, force)!
println(res.message)
} else {
build(conf, target_id, force)!
}
}
},
]
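
Together, these flags allow invocations along these lines (a sketch; this
assumes the command is mounted as `vieter targets build` and that `5` is an
existing target id):

```sh
# Force a local build of target 5, skipping the up-to-date check
vieter targets build -force 5

# Schedule the same build on the server for its x86_64 agents instead
vieter targets build -remote -arch x86_64 5
```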


@ -79,7 +79,7 @@ fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) {
mut status := 0
res := build.build_target(d.client.address, d.client.api_key, d.builder_images.last(),
&sb.target, false) or {
d.ldebug('build_target error: $err.msg()')
status = 1


@ -84,6 +84,8 @@ pub fn (db &VieterDb) add_build_log(log BuildLog) int {
insert log into BuildLog
}
// Here, this does work because a log doesn't contain any foreign keys,
// meaning the ORM only has to do a single add
inserted_id := db.conn.last_id() as int
return inserted_id


@ -38,14 +38,17 @@ pub fn (db &VieterDb) get_target(target_id int) ?Target {
}
// add_target inserts the given target into the database.
pub fn (db &VieterDb) add_target(target Target) int {
sql db.conn {
insert target into Target
}
// ID of inserted target is the largest id
inserted_target := sql db.conn {
select from Target order by id desc limit 1
}
return inserted_target.id
}
// delete_target deletes the target with the given id from the database. // delete_target deletes the target with the given id from the database.


@ -9,6 +9,7 @@ import console.schedule
import console.man
import console.aur
import cron
import agent
fn main() {
mut app := cli.Command{
@ -40,6 +41,7 @@ fn main() {
schedule.cmd(),
man.cmd(),
aur.cmd(),
agent.cmd(),
]
}
app.setup()


@ -0,0 +1,49 @@
module server
import web
import web.response { new_data_response, new_response }
// v1_poll_job_queue allows agents to poll for new build jobs.
['/api/v1/jobs/poll'; auth; get]
fn (mut app App) v1_poll_job_queue() web.Result {
arch := app.query['arch'] or {
return app.json(.bad_request, new_response('Missing arch query arg.'))
}
max_str := app.query['max'] or {
return app.json(.bad_request, new_response('Missing max query arg.'))
}
max := max_str.int()
mut out := app.job_queue.pop_n(arch, max).map(it.config)
return app.json(.ok, new_data_response(out))
}
// v1_queue_job allows queueing a new one-time build job for the given target.
['/api/v1/jobs/queue'; auth; post]
fn (mut app App) v1_queue_job() web.Result {
target_id := app.query['target'] or {
return app.json(.bad_request, new_response('Missing target query arg.'))
}.int()
arch := app.query['arch'] or {
return app.json(.bad_request, new_response('Missing arch query arg.'))
}
if arch == '' {
return app.json(.bad_request, new_response('Empty arch query arg.'))
}
force := 'force' in app.query
target := app.db.get_target(target_id) or {
return app.json(.bad_request, new_response('Unknown target id.'))
}
app.job_queue.insert(target: target, arch: arch, single: true, now: true, force: force) or {
return app.status(.internal_server_error)
}
return app.status(.ok)
}
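
Over HTTP, the endpoints look roughly as follows (example host and key; this
assumes the `auth` attribute checks the same `X-API-KEY` header that the
publish endpoint uses):

```sh
# Queue a one-time, forced build of target 3 for x86_64
curl -XPOST -H 'X-API-KEY: secret-api-key' \
    'https://arch.example.com/api/v1/jobs/queue?target=3&arch=x86_64&force'

# Poll up to 2 pending jobs for x86_64, as an agent would
curl -H 'X-API-KEY: secret-api-key' \
    'https://arch.example.com/api/v1/jobs/poll?arch=x86_64&max=2'
```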


@ -12,17 +12,17 @@ fn (mut app App) v1_get_targets() web.Result {
filter := models.from_params<TargetFilter>(app.query) or {
return app.json(http.Status.bad_request, new_response('Invalid query parameters.'))
}
targets := app.db.get_targets(filter)
return app.json(.ok, new_data_response(targets))
}
// v1_get_single_target returns the information for a single target.
['/api/v1/targets/:id'; auth; get]
fn (mut app App) v1_get_single_target(id int) web.Result {
target := app.db.get_target(id) or { return app.not_found() }
return app.json(.ok, new_data_response(target))
}
// v1_post_target creates a new target from the provided query string.
@ -30,22 +30,27 @@ fn (mut app App) v1_get_single_target(id int) web.Result {
fn (mut app App) v1_post_target() web.Result {
mut params := app.query.clone()
// If a target is created without specifying the arch, we assume it's meant
// for the default architecture.
if 'arch' !in params || params['arch'] == '' {
params['arch'] = app.conf.default_arch
}
mut new_target := models.from_params<Target>(params) or {
return app.json(http.Status.bad_request, new_response(err.msg()))
}
// Ensure someone doesn't submit an invalid kind
if new_target.kind !in models.valid_kinds {
return app.json(http.Status.bad_request, new_response('Invalid kind.'))
}
id := app.db.add_target(new_target)
new_target.id = id
// Add the target to the job queue
// TODO return better error here if it's the cron schedule that's incorrect
app.job_queue.insert_all(new_target) or { return app.status(.internal_server_error) }
return app.json(.ok, new_data_response(id))
}
@ -54,6 +59,7 @@ fn (mut app App) v1_post_target() web.Result {
['/api/v1/targets/:id'; auth; delete]
fn (mut app App) v1_delete_target(id int) web.Result {
app.db.delete_target(id)
app.job_queue.invalidate(id)
return app.json(.ok, new_response(''))
}
@ -69,7 +75,10 @@ fn (mut app App) v1_patch_target(id int) web.Result {
app.db.update_target_archs(id, arch_objs)
}
target := app.db.get_target(id) or { return app.status(.internal_server_error) }
app.job_queue.invalidate(id)
app.job_queue.insert_all(target) or { return app.status(.internal_server_error) }
return app.json(.ok, new_data_response(target))
}


@ -10,7 +10,9 @@ pub:
data_dir string
api_key string
default_arch string
global_schedule string = '0 3'
port int = 8000
base_image string = 'archlinux:base-devel'
}
// cmd returns the cli submodule that handles starting the server // cmd returns the cli submodule that handles starting the server


@ -6,6 +6,8 @@ import log
import repo
import util
import db
import build { BuildJobQueue }
import cron.expression
const (
log_file_name = 'vieter.log'
@ -20,9 +22,28 @@ pub:
conf Config [required; web_global]
pub mut:
repo repo.RepoGroupManager [required; web_global]
// Keys are the various architectures for packages
job_queue BuildJobQueue [required; web_global]
db db.VieterDb
}
// init_job_queue populates a fresh job queue with all the targets currently
// stored in the database.
fn (mut app App) init_job_queue() ! {
// Initialize build queues
mut targets := app.db.get_targets(limit: 25)
mut i := u64(0)
for targets.len > 0 {
for target in targets {
app.job_queue.insert_all(target)!
}
i += 25
targets = app.db.get_targets(limit: 25, offset: i)
}
}
// server starts the web server & starts listening for requests
pub fn server(conf Config) ! {
// Prevent using 'any' as the default arch
@ -30,6 +51,10 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, "'any' is not allowed as the value for default_arch.") util.exit_with_message(1, "'any' is not allowed as the value for default_arch.")
} }
global_ce := expression.parse_expression(conf.global_schedule) or {
util.exit_with_message(1, 'Invalid global cron expression: $err.msg()')
}
// Configure logger
log_level := log.level_from_tag(conf.log_level) or {
util.exit_with_message(1, 'Invalid log level. The allowed values are FATAL, ERROR, WARN, INFO & DEBUG.')
@ -71,11 +96,17 @@ pub fn server(conf Config) ! {
util.exit_with_message(1, 'Failed to initialize database: $err.msg()')
}
mut app := &App{
logger: logger
api_key: conf.api_key
conf: conf
repo: repo
db: db
job_queue: build.new_job_queue(global_ce, conf.base_image)
}
app.init_job_queue() or {
util.exit_with_message(1, 'Failed to initialize job queue: $err.msg()')
}
web.run(app, conf.port)
}


@ -4,11 +4,11 @@ data_dir = "data"
pkg_dir = "data/pkgs"
log_level = "DEBUG"
default_arch = "x86_64"
arch = "x86_64"
address = "http://localhost:8000"
# global_schedule = '* *'
api_update_frequency = 2
image_rebuild_frequency = 1
max_concurrent_builds = 3