forked from vieter-v/vieter
				
			feat(agent): use worker thread approach
							parent
							
								
									3b24ad0f2c
								
							
						
					
					
						commit
						094634084b
					
				|  | @ -20,11 +20,13 @@ struct AgentDaemon { | ||||||
| 	client client.Client | 	client client.Client | ||||||
| mut: | mut: | ||||||
| 	images ImageManager | 	images ImageManager | ||||||
| 	// Which builds are currently running; length is conf.max_concurrent_builds |  | ||||||
| 	builds []BuildConfig |  | ||||||
| 	// Atomic variables used to detect when a build has finished; length is | 	// Atomic variables used to detect when a build has finished; length is | ||||||
| 	// conf.max_concurrent_builds | 	// conf.max_concurrent_builds. This approach is used as the difference | ||||||
|  | 	// between a recently finished build and an empty build slot is important | ||||||
|  | 	// for knowing whether the agent is currently "active". | ||||||
| 	atomics []u64 | 	atomics []u64 | ||||||
|  | 	// Channel used to send builds to worker threads | ||||||
|  | 	build_channel chan BuildConfig | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // agent_init initializes a new agent | // agent_init initializes a new agent | ||||||
|  | @ -34,8 +36,8 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon { | ||||||
| 		client: client.new(conf.address, conf.api_key) | 		client: client.new(conf.address, conf.api_key) | ||||||
| 		conf: conf | 		conf: conf | ||||||
| 		images: new_image_manager(conf.image_rebuild_frequency * 60) | 		images: new_image_manager(conf.image_rebuild_frequency * 60) | ||||||
| 		builds: []BuildConfig{len: conf.max_concurrent_builds} |  | ||||||
| 		atomics: []u64{len: conf.max_concurrent_builds} | 		atomics: []u64{len: conf.max_concurrent_builds} | ||||||
|  | 		build_channel: chan BuildConfig{cap: conf.max_concurrent_builds} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return d | 	return d | ||||||
|  | @ -43,6 +45,11 @@ fn agent_init(logger log.Log, conf Config) AgentDaemon { | ||||||
| 
 | 
 | ||||||
| // run starts the actual agent daemon. This function will run forever. | // run starts the actual agent daemon. This function will run forever. | ||||||
| pub fn (mut d AgentDaemon) run() { | pub fn (mut d AgentDaemon) run() { | ||||||
|  | 	// Spawn worker threads | ||||||
|  | 	for builder_index in 0 .. d.conf.max_concurrent_builds { | ||||||
|  | 		spawn d.builder_thread(d.build_channel, builder_index) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// This is just so that the very first time the loop is ran, the jobs are | 	// This is just so that the very first time the loop is ran, the jobs are | ||||||
| 	// always polled | 	// always polled | ||||||
| 	mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency) | 	mut last_poll_time := time.now().add_seconds(-d.conf.polling_frequency) | ||||||
|  | @ -107,10 +114,10 @@ pub fn (mut d AgentDaemon) run() { | ||||||
| 				// It's technically still possible that the build image is | 				// It's technically still possible that the build image is | ||||||
| 				// removed in the very short period between building the | 				// removed in the very short period between building the | ||||||
| 				// builder image and starting a build container with it. If | 				// builder image and starting a build container with it. If | ||||||
| 				// this happens, faith really just didn't want you to do this | 				// this happens, fate really just didn't want you to do this | ||||||
| 				// build. | 				// build. | ||||||
| 
 | 
 | ||||||
| 				d.start_build(config) | 				d.build_channel <- config | ||||||
| 				running++ | 				running++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | @ -147,22 +154,6 @@ fn (mut d AgentDaemon) update_atomics() (int, int) { | ||||||
| 	return finished, empty | 	return finished, empty | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // start_build starts a build for the given BuildConfig. |  | ||||||
| fn (mut d AgentDaemon) start_build(config BuildConfig) bool { |  | ||||||
| 	for i in 0 .. d.atomics.len { |  | ||||||
| 		if stdatomic.load_u64(&d.atomics[i]) == agent.build_empty { |  | ||||||
| 			stdatomic.store_u64(&d.atomics[i], agent.build_running) |  | ||||||
| 			d.builds[i] = config |  | ||||||
| 
 |  | ||||||
| 			spawn d.run_build(i, config) |  | ||||||
| 
 |  | ||||||
| 			return true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // run_build actually starts the build process for a given target. | // run_build actually starts the build process for a given target. | ||||||
| fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) { | fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) { | ||||||
| 	d.linfo('started build: ${config}') | 	d.linfo('started build: ${config}') | ||||||
|  | @ -195,3 +186,12 @@ fn (mut d AgentDaemon) run_build(build_index int, config BuildConfig) { | ||||||
| 
 | 
 | ||||||
| 	stdatomic.store_u64(&d.atomics[build_index], agent.build_done) | 	stdatomic.store_u64(&d.atomics[build_index], agent.build_done) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // builder_thread is a thread that constantly listens for builds to process | ||||||
|  | fn (mut d AgentDaemon) builder_thread(ch chan BuildConfig, builder_index int) { | ||||||
|  | 	for { | ||||||
|  | 		build_config := <-ch or { break } | ||||||
|  | 
 | ||||||
|  | 		d.run_build(builder_index, build_config) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue