forked from vieter-v/vieter
				
			feat(cron): start of working loop
							parent
							
								
									4d26797453
								
							
						
					
					
						commit
						6f9e1b5f3c
					
				|  | @ -1,9 +1,25 @@ | |||
| module daemon | ||||
| 
 | ||||
| import git | ||||
| import time | ||||
| import sync.stdatomic | ||||
| 
 | ||||
| const build_empty = 0 | ||||
| const build_running = 1 | ||||
| const build_done = 2 | ||||
| 
 | ||||
| // reschedule_builds looks for any builds with status code 2 & re-adds them to | ||||
| // the queue. | ||||
| fn (mut d Daemon) reschedule_builds() ? { | ||||
| 	for i in 0..d.atomics.len { | ||||
| 		if stdatomic.load_u64(&d.atomics[i]) == build_done { | ||||
| 			stdatomic.store_u64(&d.atomics[i], build_empty) | ||||
| 			sb := d.builds[i] | ||||
| 
 | ||||
| 			d.schedule_build(sb.repo_id, sb.repo) ? | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // update_builds starts as many builds as possible. | ||||
| fn (mut d Daemon) update_builds() ? { | ||||
| 	now := time.now() | ||||
|  | @ -13,7 +29,7 @@ fn (mut d Daemon) update_builds() ? { | |||
| 			sb := d.queue.pop() ? | ||||
| 
 | ||||
| 			// If this build couldn't be scheduled, no more will be possible. | ||||
| 			if !d.start_build(sb.repo_id)? { | ||||
| 			if !d.start_build(sb)? { | ||||
| 				break | ||||
| 			} | ||||
| 		} else { | ||||
|  | @ -22,13 +38,14 @@ fn (mut d Daemon) update_builds() ? { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // start_build starts a build for the given repo_id. | ||||
| fn (mut d Daemon) start_build(repo_id string) ?bool { | ||||
| // start_build starts a build for the given ScheduledBuild object. | ||||
| fn (mut d Daemon) start_build(sb ScheduledBuild) ?bool { | ||||
| 	for i in 0..d.atomics.len { | ||||
| 		if stdatomic.load_u64(&d.atomics[i]) == 0 { | ||||
| 			stdatomic.store_u64(&d.atomics[i], 1) | ||||
| 		if stdatomic.load_u64(&d.atomics[i]) == build_empty { | ||||
| 			stdatomic.store_u64(&d.atomics[i], build_running) | ||||
| 			d.builds[i] = sb | ||||
| 
 | ||||
| 			go d.run_build(i, d.repos_map[repo_id]) | ||||
| 			go d.run_build(i, sb) | ||||
| 
 | ||||
| 			return true | ||||
| 		} | ||||
|  | @ -37,9 +54,10 @@ fn (mut d Daemon) start_build(repo_id string) ?bool { | |||
| 	return false | ||||
| } | ||||
| 
 | ||||
| fn (mut d Daemon) run_build(build_index int, repo git.GitRepo) ? { | ||||
| // run_build actually starts the build process for a given repo. | ||||
| fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { | ||||
| 	time.sleep(10 * time.second) | ||||
| 
 | ||||
| 	stdatomic.store_u64(&d.atomics[build_index], 2) | ||||
| 	stdatomic.store_u64(&d.atomics[build_index], build_done) | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -30,7 +30,7 @@ mut: | |||
| 	api_update_timestamp time.Time | ||||
| 	queue                MinHeap<ScheduledBuild> | ||||
| 	// Which builds are currently running | ||||
| 	builds []git.GitRepo | ||||
| 	builds []ScheduledBuild | ||||
| 	// Atomic variables used to detect when a build has finished; length is the | ||||
| 	// same as builds | ||||
| 	atomics []u64 | ||||
|  | @ -47,7 +47,7 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st | |||
| 		global_schedule: global_schedule | ||||
| 		api_update_frequency: api_update_frequency | ||||
| 		atomics: []u64{len: max_concurrent_builds} | ||||
| 		builds: []git.GitRepo{len: max_concurrent_builds} | ||||
| 		builds: []ScheduledBuild{len: max_concurrent_builds} | ||||
| 		logger: logger | ||||
| 	} | ||||
| 
 | ||||
|  | @ -62,14 +62,37 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st | |||
| // periodically refreshes the list of repositories to ensure we stay in sync. | ||||
| pub fn (mut d Daemon) run() ? { | ||||
| 	for { | ||||
| 		println('1') | ||||
| 		// Cleans up finished builds, opening up spots for new builds | ||||
| 		d.reschedule_builds() ? | ||||
| 		println('2') | ||||
| 		// Schedules new builds when possible | ||||
| 		d.update_builds() ? | ||||
| 
 | ||||
| 		println(d.queue) | ||||
| 		println(d.atomics) | ||||
| 
 | ||||
| 		time.sleep(60 * time.second) | ||||
| 		time.sleep(10 * time.second) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // schedule_build adds the next occurence of the given repo build to the queue. | ||||
| fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { | ||||
| 	ce := parse_expression(repo.schedule) or { | ||||
| 		d.lerror("Error while parsing cron expression '$repo.schedule' ($repo_id): $err.msg()") | ||||
| 
 | ||||
| 		d.global_schedule | ||||
| 	} | ||||
| 	// A repo that can't be scheduled will just be skipped for now | ||||
| 	timestamp := ce.next_from_now() ? | ||||
| 
 | ||||
| 	d.queue.insert(ScheduledBuild{ | ||||
| 		repo_id: repo_id | ||||
| 		repo: repo | ||||
| 		timestamp: timestamp | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| fn (mut d Daemon) renew_repos() ? { | ||||
| 	mut new_repos := git.get_repos(d.address, d.api_key) ? | ||||
| 
 | ||||
|  | @ -101,19 +124,11 @@ fn (mut d Daemon) renew_queue() ? { | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	d.queue = new_queue | ||||
| 
 | ||||
| 	// For each repository in repos_map, parse their cron expression (or use | ||||
| 	// the default one if not present) & add them to the queue | ||||
| 	for id, repo in d.repos_map { | ||||
| 		ce := parse_expression(repo.schedule) or { d.global_schedule } | ||||
| 		// A repo that can't be scheduled will just be skipped for now | ||||
| 		timestamp := ce.next(now) or { continue } | ||||
| 
 | ||||
| 		new_queue.insert(ScheduledBuild{ | ||||
| 			repo_id: id | ||||
| 			repo: repo | ||||
| 			timestamp: timestamp | ||||
| 		}) | ||||
| 		d.schedule_build(id, repo) ? | ||||
| 	} | ||||
| 
 | ||||
| 	d.queue = new_queue | ||||
| } | ||||
|  |  | |||
|  | @ -114,7 +114,7 @@ pub fn (ce &CronExpression) next(ref time.Time) ?time.Time { | |||
| 	}) | ||||
| } | ||||
| 
 | ||||
| fn (ce &CronExpression) next_from_now() ?time.Time { | ||||
| pub fn (ce &CronExpression) next_from_now() ?time.Time { | ||||
| 	return ce.next(time.now()) | ||||
| } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue