Agent worker threads #355

Merged
Jef Roosens merged 1 commits from Chewing_Bever/vieter:agent-threads into dev 2023-04-05 11:20:44 +02:00
1 changed files with 22 additions and 22 deletions

View File

@ -20,11 +20,13 @@ struct AgentDaemon {
client client.Client
mut:
images ImageManager
// Which builds are currently running; length is conf.max_concurrent_builds
builds []BuildConfig
// Atomic variables used to detect when a build has finished; length is
// conf.max_concurrent_builds
// conf.max_concurrent_builds. This approach is used as the difference
// between a recently finished build and an empty build slot is important
// for knowing whether the agent is currently "active".
atomics []u64
// Channel used to send builds to worker threads
build_channel chan BuildConfig
}
// agent_init initializes a new agent
@ -34,8 +36,8 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
client: client.new(conf.address, conf.api_key)
conf: conf
images: new_image_manager(conf.image_rebuild_frequency * 60)
builds: []BuildConfig{len: conf.max_concurrent_builds}
atomics: []u64{len: conf.max_concurrent_builds}
build_channel: chan BuildConfig{cap: conf.max_concurrent_builds}
}
return d
@ -43,6 +45,11 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
// run starts the actual agent daemon. This function will run forever.
pub fn (mut d AgentDaemon) run() {
// Spawn worker threads
for builder_index in 0 .. d.conf.max_concurrent_builds {
spawn d.builder_thread(d.build_channel, builder_index)
}
// This is just so that the very first time the loop is ran, the jobs are
// always polled
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
@ -107,10 +114,10 @@ pub fn (mut d AgentDaemon) run() {
// It's technically still possible that the build image is
// removed in the very short period between building the
// builder image and starting a build container with it. If
// this happens, faith really just didn't want you to do this
// this happens, fate really just didn't want you to do this
// build.
d.start_build(config)
d.build_channel <- config
running++
}
}
@ -147,22 +154,6 @@ fn (mut d AgentDaemon) update_atomics() (int, int) {
return finished, empty
}
// start_build starts a build for the given BuildConfig.
fn (mut d AgentDaemon) start_build(config BuildConfig) bool {
for i in 0 .. d.atomics.len {
if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
stdatomic.store_u64(&d.atomics[i], agent.build_running)
d.builds[i] = config
spawn d.run_build(i, config)
return true
}
}
return false
}
// run_build actually starts the build process for a given target.
fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
d.linfo('started build: ${config}')
@ -195,3 +186,12 @@ fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
stdatomic.store_u64(&d.atomics[build_index], agent.build_done)
}
// builder_thread is a thread that constantly listens for builds to process
fn (mut d AgentDaemon) builder_thread(ch chan BuildConfig, builder_index int) {
for {
build_config := <-ch or { break }
d.run_build(builder_index, build_config)
}
}