forked from vieter-v/vieter
				
			fix(cron): made Daemon.run infallible
							parent
							
								
									37c27ae84b
								
							
						
					
					
						commit
						325dcc27de
					
				|  | @ -25,5 +25,5 @@ pub fn cron(conf Config) ? { | |||
| 	mut d := daemon.init_daemon(logger, conf.address, conf.api_key, conf.base_image, ce, | ||||
| 		conf.max_concurrent_builds, conf.api_update_frequency, conf.image_rebuild_frequency) ? | ||||
| 
 | ||||
| 	d.run() ? | ||||
| 	d.run() | ||||
| } | ||||
|  |  | |||
|  | @ -12,7 +12,7 @@ const build_done = 2 | |||
| 
 | ||||
| // clean_finished_builds removes finished builds from the build slots & returns | ||||
| // them. | ||||
| fn (mut d Daemon) clean_finished_builds() ?[]ScheduledBuild { | ||||
| fn (mut d Daemon) clean_finished_builds() []ScheduledBuild { | ||||
| 	mut out := []ScheduledBuild{} | ||||
| 
 | ||||
| 	for i in 0 .. d.atomics.len { | ||||
|  | @ -26,12 +26,22 @@ fn (mut d Daemon) clean_finished_builds() ?[]ScheduledBuild { | |||
| } | ||||
| 
 | ||||
| // update_builds starts as many builds as possible. | ||||
| fn (mut d Daemon) start_new_builds() ? { | ||||
| fn (mut d Daemon) start_new_builds() { | ||||
| 	now := time.now() | ||||
| 
 | ||||
| 	for d.queue.len() > 0 { | ||||
| 		if d.queue.peek() ?.timestamp < now { | ||||
| 			sb := d.queue.pop() ? | ||||
| 		elem := d.queue.peek() or { | ||||
| 			d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") | ||||
| 
 | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		if elem.timestamp < now { | ||||
| 			sb := d.queue.pop() or { | ||||
| 				d.lerror("queue.pop() unexpectedly returned an error. This shouldn't happen.") | ||||
| 
 | ||||
| 				break | ||||
| 			} | ||||
| 
 | ||||
| 			// If this build couldn't be scheduled, no more will be possible. | ||||
| 			if !d.start_build(sb) { | ||||
|  | @ -61,10 +71,19 @@ fn (mut d Daemon) start_build(sb ScheduledBuild) bool { | |||
| } | ||||
| 
 | ||||
| // run_build actually starts the build process for a given repo. | ||||
| fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { | ||||
| fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) { | ||||
| 	d.linfo('started build: $sb.repo.url $sb.repo.branch') | ||||
| 
 | ||||
| 	build.build_repo(d.address, d.api_key, d.builder_images.last(), &sb.repo) ? | ||||
| 	// 0 means success, 1 means failure | ||||
| 	mut status := 0 | ||||
| 
 | ||||
| 	build.build_repo(d.address, d.api_key, d.builder_images.last(), &sb.repo) or { status = 1 } | ||||
| 
 | ||||
| 	if status == 0 { | ||||
| 		d.linfo('finished build: $sb.repo.url $sb.repo.branch') | ||||
| 	} else { | ||||
| 		d.linfo('failed build: $sb.repo.url $sb.repo.branch') | ||||
| 	} | ||||
| 
 | ||||
| 	stdatomic.store_u64(&d.atomics[build_index], daemon.build_done) | ||||
| } | ||||
|  |  | |||
|  | @ -9,6 +9,12 @@ import math | |||
| import build | ||||
| import docker | ||||
| 
 | ||||
| // How many seconds to wait before retrying to update API if failed | ||||
| const api_update_retry_timeout = 5 | ||||
| 
 | ||||
| // How many seconds to wait before retrying to rebuild image if failed | ||||
| const rebuild_base_image_retry_timout = 30 | ||||
| 
 | ||||
| struct ScheduledBuild { | ||||
| pub: | ||||
| 	repo_id   string | ||||
|  | @ -60,29 +66,31 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st | |||
| 	} | ||||
| 
 | ||||
| 	// Initialize the repos & queue | ||||
| 	d.renew_repos() ? | ||||
| 	d.renew_queue() ? | ||||
| 	d.rebuild_base_image() ? | ||||
| 	d.renew_repos() | ||||
| 	d.renew_queue() | ||||
| 	if !d.rebuild_base_image() { | ||||
| 		return error('The base image failed to build. The Vieter cron daemon cannot run without an initial builder image.') | ||||
| 	} | ||||
| 
 | ||||
| 	return d | ||||
| } | ||||
| 
 | ||||
| // run starts the actual daemon process. It runs builds when possible & | ||||
| // periodically refreshes the list of repositories to ensure we stay in sync. | ||||
| pub fn (mut d Daemon) run() ? { | ||||
| pub fn (mut d Daemon) run() { | ||||
| 	for { | ||||
| 		finished_builds := d.clean_finished_builds() ? | ||||
| 		finished_builds := d.clean_finished_builds() | ||||
| 
 | ||||
| 		// Update the API's contents if needed & renew the queue | ||||
| 		if time.now() >= d.api_update_timestamp { | ||||
| 			d.renew_repos() ? | ||||
| 			d.renew_queue() ? | ||||
| 			d.renew_repos() | ||||
| 			d.renew_queue() | ||||
| 		} | ||||
| 		// The finished builds should only be rescheduled if the API contents | ||||
| 		// haven't been renewed. | ||||
| 		else { | ||||
| 			for sb in finished_builds { | ||||
| 				d.schedule_build(sb.repo_id, sb.repo) ? | ||||
| 				d.schedule_build(sb.repo_id, sb.repo) | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
|  | @ -93,14 +101,14 @@ pub fn (mut d Daemon) run() ? { | |||
| 		// keep track of a list or something & remove an image once we have | ||||
| 		// made sure it isn't being used anymore. | ||||
| 		if time.now() >= d.image_build_timestamp { | ||||
| 			d.rebuild_base_image() ? | ||||
| 			d.rebuild_base_image() | ||||
| 			// In theory, executing this function here allows an old builder | ||||
| 			// image to exist for at most image_rebuild_frequency minutes. | ||||
| 			d.clean_old_base_images() | ||||
| 		} | ||||
| 
 | ||||
| 		// Schedules new builds when possible | ||||
| 		d.start_new_builds() ? | ||||
| 		d.start_new_builds() | ||||
| 
 | ||||
| 		// If there are builds currently running, the daemon should refresh | ||||
| 		// every second to clean up any finished builds & start new ones. | ||||
|  | @ -113,7 +121,17 @@ pub fn (mut d Daemon) run() ? { | |||
| 			delay = d.api_update_timestamp - now | ||||
| 
 | ||||
| 			if d.queue.len() > 0 { | ||||
| 				time_until_next_job := d.queue.peek() ?.timestamp - now | ||||
| 				elem := d.queue.peek() or { | ||||
| 					d.lerror("queue.peek() unexpectedly returned an error. This shouldn't happen.") | ||||
| 
 | ||||
| 					// This is just a fallback option. In theory, queue.peek() | ||||
| 					// should *never* return an error or none, because we check | ||||
| 					// its len beforehand. | ||||
| 					time.sleep(1) | ||||
| 					continue | ||||
| 				} | ||||
| 
 | ||||
| 				time_until_next_job := elem.timestamp - now | ||||
| 
 | ||||
| 				delay = math.min(delay, time_until_next_job) | ||||
| 			} | ||||
|  | @ -131,7 +149,7 @@ pub fn (mut d Daemon) run() ? { | |||
| } | ||||
| 
 | ||||
| // schedule_build adds the next occurence of the given repo build to the queue. | ||||
| fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { | ||||
| fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) { | ||||
| 	ce := if repo.schedule != '' { | ||||
| 		parse_expression(repo.schedule) or { | ||||
| 			// TODO This shouldn't return an error if the expression is empty. | ||||
|  | @ -144,7 +162,10 @@ fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { | |||
| 	} | ||||
| 
 | ||||
| 	// A repo that can't be scheduled will just be skipped for now | ||||
| 	timestamp := ce.next_from_now() ? | ||||
| 	timestamp := ce.next_from_now() or { | ||||
| 		d.lerror("Couldn't calculate next timestamp from '$repo.schedule'; skipping") | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	d.queue.insert(ScheduledBuild{ | ||||
| 		repo_id: repo_id | ||||
|  | @ -155,9 +176,15 @@ fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { | |||
| 
 | ||||
| // renew_repos requests the newest list of Git repos from the server & replaces | ||||
| // the old one. | ||||
| fn (mut d Daemon) renew_repos() ? { | ||||
| fn (mut d Daemon) renew_repos() { | ||||
| 	d.linfo('Renewing repos...') | ||||
| 	mut new_repos := git.get_repos(d.address, d.api_key) ? | ||||
| 
 | ||||
| 	mut new_repos := git.get_repos(d.address, d.api_key) or { | ||||
| 		d.lerror('Failed to renew repos. Retrying in ${daemon.api_update_retry_timeout}s...') | ||||
| 		d.api_update_timestamp = time.now().add_seconds(daemon.api_update_retry_timeout) | ||||
| 
 | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	d.repos_map = new_repos.move() | ||||
| 
 | ||||
|  | @ -166,7 +193,7 @@ fn (mut d Daemon) renew_repos() ? { | |||
| 
 | ||||
| // renew_queue replaces the old queue with a new one that reflects the newest | ||||
| // values in repos_map. | ||||
| fn (mut d Daemon) renew_queue() ? { | ||||
| fn (mut d Daemon) renew_queue() { | ||||
| 	d.linfo('Renewing queue...') | ||||
| 	mut new_queue := MinHeap<ScheduledBuild>{} | ||||
| 
 | ||||
|  | @ -181,8 +208,13 @@ fn (mut d Daemon) renew_queue() ? { | |||
| 	// here causes the function to prematurely just exit, without any errors or anything, very weird | ||||
| 	// https://github.com/vlang/v/issues/14042 | ||||
| 	for d.queue.len() > 0 { | ||||
| 		if d.queue.peek() ?.timestamp < now { | ||||
| 			new_queue.insert(d.queue.pop() ?) | ||||
| 		elem := d.queue.pop() or { | ||||
| 			d.lerror("queue.pop() returned an error. This shouldn't happen.") | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		if elem.timestamp < now { | ||||
| 			new_queue.insert(elem) | ||||
| 		} else { | ||||
| 			break | ||||
| 		} | ||||
|  | @ -193,16 +225,24 @@ fn (mut d Daemon) renew_queue() ? { | |||
| 	// For each repository in repos_map, parse their cron expression (or use | ||||
| 	// the default one if not present) & add them to the queue | ||||
| 	for id, repo in d.repos_map { | ||||
| 		d.schedule_build(id, repo) ? | ||||
| 		d.schedule_build(id, repo) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // rebuild_base_image recreates the builder image. | ||||
| fn (mut d Daemon) rebuild_base_image() ? { | ||||
| fn (mut d Daemon) rebuild_base_image() bool { | ||||
| 	d.linfo('Rebuilding builder image....') | ||||
| 
 | ||||
| 	d.builder_images << build.create_build_image(d.base_image) ? | ||||
| 	d.builder_images << build.create_build_image(d.base_image) or { | ||||
| 		d.lerror('Failed to rebuild base image. Retrying in ${daemon.rebuild_base_image_retry_timout}s...') | ||||
| 		d.image_build_timestamp = time.now().add_seconds(daemon.rebuild_base_image_retry_timout) | ||||
| 
 | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	d.image_build_timestamp = time.now().add_seconds(60 * d.image_rebuild_frequency) | ||||
| 
 | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // clean_old_base_images tries to remove any old but still present builder | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue