forked from vieter-v/vieter
				
			feat(cron): improve sleep calculation; prevent invalid rescheduling of
finished buildsmain
							parent
							
								
									a1c308f29d
								
							
						
					
					
						commit
						caee56efd4
					
				|  | @ -2,6 +2,7 @@ module daemon | |||
| 
 | ||||
| import time | ||||
| import sync.stdatomic | ||||
| import rand | ||||
| 
 | ||||
| const build_empty = 0 | ||||
| 
 | ||||
|  | @ -9,21 +10,23 @@ const build_running = 1 | |||
| 
 | ||||
| const build_done = 2 | ||||
| 
 | ||||
| // reschedule_builds looks for any builds with status code 2 & re-adds them to | ||||
| // the queue. | ||||
| fn (mut d Daemon) reschedule_builds() ? { | ||||
| // clean_finished_builds removes finished builds from the build slots & returns | ||||
| // them. | ||||
| fn (mut d Daemon) clean_finished_builds() ?[]ScheduledBuild { | ||||
| 	mut out := []ScheduledBuild{} | ||||
| 
 | ||||
| 	for i in 0 .. d.atomics.len { | ||||
| 		if stdatomic.load_u64(&d.atomics[i]) == daemon.build_done { | ||||
| 			stdatomic.store_u64(&d.atomics[i], daemon.build_empty) | ||||
| 			sb := d.builds[i] | ||||
| 
 | ||||
| 			d.schedule_build(sb.repo_id, sb.repo) ? | ||||
| 			out << d.builds[i] | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return out | ||||
| } | ||||
| 
 | ||||
| // update_builds starts as many builds as possible. | ||||
| fn (mut d Daemon) update_builds() ? { | ||||
| fn (mut d Daemon) start_new_builds() ? { | ||||
| 	now := time.now() | ||||
| 
 | ||||
| 	for d.queue.len() > 0 { | ||||
|  | @ -31,8 +34,8 @@ fn (mut d Daemon) update_builds() ? { | |||
| 			sb := d.queue.pop() ? | ||||
| 
 | ||||
| 			// If this build couldn't be scheduled, no more will be possible. | ||||
| 			// TODO a build that couldn't be scheduled should be re-added to the queue. | ||||
| 			if !d.start_build(sb) { | ||||
| 				d.queue.insert(sb) | ||||
| 				break | ||||
| 			} | ||||
| 		} else { | ||||
|  | @ -60,7 +63,20 @@ fn (mut d Daemon) start_build(sb ScheduledBuild) bool { | |||
| // run_build actually starts the build process for a given repo. | ||||
| fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { | ||||
| 	d.linfo('build $sb.repo.url') | ||||
| 	time.sleep(10 * time.second) | ||||
| 	time.sleep(rand.int_in_range(1, 6) ? * time.second) | ||||
| 
 | ||||
| 	stdatomic.store_u64(&d.atomics[build_index], daemon.build_done) | ||||
| } | ||||
| 
 | ||||
| // current_build_count returns how many builds are currently running. | ||||
| fn (mut d Daemon) current_build_count() int { | ||||
| 	mut res := 0 | ||||
| 
 | ||||
| 	for i in 0 .. d.atomics.len { | ||||
| 		if stdatomic.load_u64(&d.atomics[i]) == daemon.build_running { | ||||
| 			res += 1 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return res | ||||
| } | ||||
|  |  | |||
|  | @ -6,7 +6,6 @@ import log | |||
| import datatypes { MinHeap } | ||||
| import cron.expression { CronExpression, parse_expression } | ||||
| import math | ||||
| import arrays | ||||
| 
 | ||||
| struct ScheduledBuild { | ||||
| pub: | ||||
|  | @ -64,40 +63,51 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st | |||
| // periodically refreshes the list of repositories to ensure we stay in sync. | ||||
| pub fn (mut d Daemon) run() ? { | ||||
| 	for { | ||||
| 		finished_builds := d.clean_finished_builds() ? | ||||
| 
 | ||||
| 		// Update the API's contents if needed & renew the queue | ||||
| 		if time.now() >= d.api_update_timestamp { | ||||
| 			d.renew_repos() ? | ||||
| 			d.renew_queue() ? | ||||
| 		} | ||||
| 
 | ||||
| 		// Cleans up finished builds, opening up spots for new builds | ||||
| 		d.reschedule_builds() ? | ||||
| 		// The finished builds should only be rescheduled if the API contents | ||||
| 		// haven't been renewed. | ||||
| 		else { | ||||
| 			for sb in finished_builds { | ||||
| 				d.schedule_build(sb.repo_id, sb.repo) ? | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		// TODO rebuild builder image when needed | ||||
| 
 | ||||
| 		// Schedules new builds when possible | ||||
| 		d.update_builds() ? | ||||
| 		d.start_new_builds() ? | ||||
| 
 | ||||
| 		// If there are builds currently running, the daemon should refresh | ||||
| 		// every second to clean up any finished builds & start new ones. | ||||
| 		mut delay := time.Duration(1 * time.second) | ||||
| 
 | ||||
| 		// Sleep either until we have to refresh the repos or when the next | ||||
| 		// build has to start, with a minimum of 1 second. | ||||
| 		now := time.now() | ||||
| 		if d.current_build_count() == 0 { | ||||
| 			now := time.now() | ||||
| 			delay = d.api_update_timestamp - now | ||||
| 
 | ||||
| 		mut delay := d.api_update_timestamp - now | ||||
| 			if d.queue.len() > 0 { | ||||
| 				time_until_next_job := d.queue.peek() ?.timestamp - now | ||||
| 
 | ||||
| 		if d.queue.len() > 0 { | ||||
| 			time_until_next_job := d.queue.peek() ?.timestamp - now | ||||
| 
 | ||||
| 			delay = math.min(delay, time_until_next_job) | ||||
| 				delay = math.min(delay, time_until_next_job) | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		d.ldebug('Sleeping for ${delay}...') | ||||
| 
 | ||||
| 		// TODO if there are builds active, the sleep time should be much lower to clean up the builds when they're finished. | ||||
| 
 | ||||
| 		// We sleep for at least one second. This is to prevent the program | ||||
| 		// from looping agressively when a cronjob can be scheduled, but | ||||
| 		// there's no spots free for it to be started. | ||||
| 		time.sleep(math.max(delay, 1 * time.second)) | ||||
| 		delay = math.max(delay, 1 * time.second) | ||||
| 
 | ||||
| 		d.ldebug('Sleeping for ${delay}...') | ||||
| 
 | ||||
| 		time.sleep(delay) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -114,6 +114,8 @@ pub fn (ce &CronExpression) next(ref time.Time) ?time.Time { | |||
| 	}) | ||||
| } | ||||
| 
 | ||||
| // next_from_now returns the result of ce.next(ref) where ref is the result of | ||||
| // time.now(). | ||||
| pub fn (ce &CronExpression) next_from_now() ?time.Time { | ||||
| 	return ce.next(time.now()) | ||||
| } | ||||
|  |  | |||
|  | @ -10,4 +10,5 @@ default_arch = "x86_64" | |||
| address = "http://localhost:8000" | ||||
| 
 | ||||
| global_schedule = '* *' | ||||
| 
 | ||||
| api_update_frequency = 2 | ||||
| max_concurrent_builds = 3 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue