forked from vieter-v/vieter
Merge pull request 'Agent worker threads' (#355) from Chewing_Bever/vieter:agent-threads into dev
Reviewed-on: vieter-v/vieter#355
commit
37f368b769
|
@ -20,11 +20,13 @@ struct AgentDaemon {
|
||||||
client client.Client
|
client client.Client
|
||||||
mut:
|
mut:
|
||||||
images ImageManager
|
images ImageManager
|
||||||
// Which builds are currently running; length is conf.max_concurrent_builds
|
|
||||||
builds []BuildConfig
|
|
||||||
// Atomic variables used to detect when a build has finished; length is
|
// Atomic variables used to detect when a build has finished; length is
|
||||||
// conf.max_concurrent_builds
|
// conf.max_concurrent_builds. This approach is used as the difference
|
||||||
|
// between a recently finished build and an empty build slot is important
|
||||||
|
// for knowing whether the agent is currently "active".
|
||||||
atomics []u64
|
atomics []u64
|
||||||
|
// Channel used to send builds to worker threads
|
||||||
|
build_channel chan BuildConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// agent_init initializes a new agent
|
// agent_init initializes a new agent
|
||||||
|
@ -34,8 +36,8 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
|
||||||
client: client.new(conf.address, conf.api_key)
|
client: client.new(conf.address, conf.api_key)
|
||||||
conf: conf
|
conf: conf
|
||||||
images: new_image_manager(conf.image_rebuild_frequency * 60)
|
images: new_image_manager(conf.image_rebuild_frequency * 60)
|
||||||
builds: []BuildConfig{len: conf.max_concurrent_builds}
|
|
||||||
atomics: []u64{len: conf.max_concurrent_builds}
|
atomics: []u64{len: conf.max_concurrent_builds}
|
||||||
|
build_channel: chan BuildConfig{cap: conf.max_concurrent_builds}
|
||||||
}
|
}
|
||||||
|
|
||||||
return d
|
return d
|
||||||
|
@ -43,6 +45,11 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon {
|
||||||
|
|
||||||
// run starts the actual agent daemon. This function will run forever.
|
// run starts the actual agent daemon. This function will run forever.
|
||||||
pub fn (mut d AgentDaemon) run() {
|
pub fn (mut d AgentDaemon) run() {
|
||||||
|
// Spawn worker threads
|
||||||
|
for builder_index in 0 .. d.conf.max_concurrent_builds {
|
||||||
|
spawn d.builder_thread(d.build_channel, builder_index)
|
||||||
|
}
|
||||||
|
|
||||||
// This is just so that the very first time the loop is ran, the jobs are
|
// This is just so that the very first time the loop is ran, the jobs are
|
||||||
// always polled
|
// always polled
|
||||||
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
|
mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency)
|
||||||
|
@ -107,10 +114,10 @@ pub fn (mut d AgentDaemon) run() {
|
||||||
// It's technically still possible that the build image is
|
// It's technically still possible that the build image is
|
||||||
// removed in the very short period between building the
|
// removed in the very short period between building the
|
||||||
// builder image and starting a build container with it. If
|
// builder image and starting a build container with it. If
|
||||||
// this happens, faith really just didn't want you to do this
|
// this happens, fate really just didn't want you to do this
|
||||||
// build.
|
// build.
|
||||||
|
|
||||||
d.start_build(config)
|
d.build_channel <- config
|
||||||
running++
|
running++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,22 +154,6 @@ fn (mut d AgentDaemon) update_atomics() (int, int) {
|
||||||
return finished, empty
|
return finished, empty
|
||||||
}
|
}
|
||||||
|
|
||||||
// start_build starts a build for the given BuildConfig.
|
|
||||||
fn (mut d AgentDaemon) start_build(config BuildConfig) bool {
|
|
||||||
for i in 0 .. d.atomics.len {
|
|
||||||
if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty {
|
|
||||||
stdatomic.store_u64(&d.atomics[i], agent.build_running)
|
|
||||||
d.builds[i] = config
|
|
||||||
|
|
||||||
spawn d.run_build(i, config)
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// run_build actually starts the build process for a given target.
|
// run_build actually starts the build process for a given target.
|
||||||
fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
|
fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
|
||||||
d.linfo('started build: ${config}')
|
d.linfo('started build: ${config}')
|
||||||
|
@ -195,3 +186,12 @@ fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) {
|
||||||
|
|
||||||
stdatomic.store_u64(&d.atomics[build_index], agent.build_done)
|
stdatomic.store_u64(&d.atomics[build_index], agent.build_done)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// builder_thread is a thread that constantly listens for builds to process
|
||||||
|
fn (mut d AgentDaemon) builder_thread(ch chan BuildConfig, builder_index int) {
|
||||||
|
for {
|
||||||
|
build_config := <-ch or { break }
|
||||||
|
|
||||||
|
d.run_build(builder_index, build_config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue