diff --git a/Makefile b/Makefile index cc9c829..8ddabbe 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ SRC_DIR := src SOURCES != find '$(SRC_DIR)' -iname '*.v' V_PATH ?= v -V := $(V_PATH) -showcc -gc boehm -skip-unused +V := $(V_PATH) -showcc -gc boehm all: vieter diff --git a/src/cron/daemon/build.v b/src/cron/daemon/build.v index e7e5ac3..73ba183 100644 --- a/src/cron/daemon/build.v +++ b/src/cron/daemon/build.v @@ -1,9 +1,25 @@ module daemon -import git import time import sync.stdatomic +const build_empty = 0 +const build_running = 1 +const build_done = 2 + +// reschedule_builds looks for any builds with status code 2 & re-adds them to +// the queue. +fn (mut d Daemon) reschedule_builds() ? { + for i in 0..d.atomics.len { + if stdatomic.load_u64(&d.atomics[i]) == build_done { + stdatomic.store_u64(&d.atomics[i], build_empty) + sb := d.builds[i] + + d.schedule_build(sb.repo_id, sb.repo) ? + } + } +} + // update_builds starts as many builds as possible. fn (mut d Daemon) update_builds() ? { now := time.now() @@ -13,7 +29,7 @@ fn (mut d Daemon) update_builds() ? { sb := d.queue.pop() ? // If this build couldn't be scheduled, no more will be possible. - if !d.start_build(sb.repo_id)? { + if !d.start_build(sb)? { break } } else { @@ -22,13 +38,14 @@ fn (mut d Daemon) update_builds() ? { } } -// start_build starts a build for the given repo_id. -fn (mut d Daemon) start_build(repo_id string) ?bool { +// start_build starts a build for the given ScheduledBuild object. +fn (mut d Daemon) start_build(sb ScheduledBuild) ?bool { for i in 0..d.atomics.len { - if stdatomic.load_u64(&d.atomics[i]) == 0 { - stdatomic.store_u64(&d.atomics[i], 1) + if stdatomic.load_u64(&d.atomics[i]) == build_empty { + stdatomic.store_u64(&d.atomics[i], build_running) + d.builds[i] = sb - go d.run_build(i, d.repos_map[repo_id]) + go d.run_build(i, sb) return true } @@ -37,9 +54,10 @@ fn (mut d Daemon) start_build(repo_id string) ?bool { return false } -fn (mut d Daemon) run_build(build_index int, repo git.GitRepo) ? { +// run_build actually starts the build process for a given repo. +fn (mut d Daemon) run_build(build_index int, sb ScheduledBuild) ? { time.sleep(10 * time.second) - stdatomic.store_u64(&d.atomics[build_index], 2) + stdatomic.store_u64(&d.atomics[build_index], build_done) } diff --git a/src/cron/daemon/daemon.v b/src/cron/daemon/daemon.v index fc917e4..816bc15 100644 --- a/src/cron/daemon/daemon.v +++ b/src/cron/daemon/daemon.v @@ -30,7 +30,7 @@ mut: api_update_timestamp time.Time queue MinHeap // Which builds are currently running - builds []git.GitRepo + builds []ScheduledBuild // Atomic variables used to detect when a build has finished; length is the // same as builds atomics []u64 @@ -47,7 +47,7 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st global_schedule: global_schedule api_update_frequency: api_update_frequency atomics: []u64{len: max_concurrent_builds} - builds: []git.GitRepo{len: max_concurrent_builds} + builds: []ScheduledBuild{len: max_concurrent_builds} logger: logger } @@ -62,14 +62,37 @@ pub fn init_daemon(logger log.Log, address string, api_key string, base_image st // periodically refreshes the list of repositories to ensure we stay in sync. pub fn (mut d Daemon) run() ? { for { + println('1') + // Cleans up finished builds, opening up spots for new builds + d.reschedule_builds() ? + println('2') + // Schedules new builds when possible d.update_builds() ? + println(d.queue) println(d.atomics) - time.sleep(60 * time.second) + time.sleep(10 * time.second) } } +// schedule_build adds the next occurence of the given repo build to the queue. +fn (mut d Daemon) schedule_build(repo_id string, repo git.GitRepo) ? { + ce := parse_expression(repo.schedule) or { + d.lerror("Error while parsing cron expression '$repo.schedule' ($repo_id): $err.msg()") + + d.global_schedule + } + // A repo that can't be scheduled will just be skipped for now + timestamp := ce.next_from_now() ? + + d.queue.insert(ScheduledBuild{ + repo_id: repo_id + repo: repo + timestamp: timestamp + }) +} + fn (mut d Daemon) renew_repos() ? { mut new_repos := git.get_repos(d.address, d.api_key) ? @@ -101,19 +124,11 @@ fn (mut d Daemon) renew_queue() ? { } } + d.queue = new_queue + // For each repository in repos_map, parse their cron expression (or use // the default one if not present) & add them to the queue for id, repo in d.repos_map { - ce := parse_expression(repo.schedule) or { d.global_schedule } - // A repo that can't be scheduled will just be skipped for now - timestamp := ce.next(now) or { continue } - - new_queue.insert(ScheduledBuild{ - repo_id: id - repo: repo - timestamp: timestamp - }) + d.schedule_build(id, repo) ? } - - d.queue = new_queue } diff --git a/src/cron/expression/expression.v b/src/cron/expression/expression.v index c122585..6e11da2 100644 --- a/src/cron/expression/expression.v +++ b/src/cron/expression/expression.v @@ -114,7 +114,7 @@ pub fn (ce &CronExpression) next(ref time.Time) ?time.Time { }) } -fn (ce &CronExpression) next_from_now() ?time.Time { +pub fn (ce &CronExpression) next_from_now() ?time.Time { return ce.next(time.now()) } diff --git a/src/server/git.v b/src/server/git.v index a7655fe..a9d6f50 100644 --- a/src/server/git.v +++ b/src/server/git.v @@ -8,7 +8,7 @@ import response { new_data_response, new_response } const repos_file = 'repos.json' -['/api/repos'; get; markused] +['/api/repos'; get] fn (mut app App) get_repos() web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.')) @@ -25,7 +25,7 @@ fn (mut app App) get_repos() web.Result { return app.json(http.Status.ok, new_data_response(repos)) } -['/api/repos/:id'; get; markused] +['/api/repos/:id'; get] fn (mut app App) get_single_repo(id string) web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.')) @@ -48,7 +48,7 @@ fn (mut app App) get_single_repo(id string) web.Result { return app.json(http.Status.ok, new_data_response(repo)) } -['/api/repos'; post; markused] +['/api/repos'; post] fn (mut app App) post_repo() web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.')) @@ -86,7 +86,7 @@ fn (mut app App) post_repo() web.Result { return app.json(http.Status.ok, new_response('Repo added successfully.')) } -['/api/repos/:id'; delete; markused] +['/api/repos/:id'; delete] fn (mut app App) delete_repo(id string) web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.')) @@ -113,7 +113,7 @@ fn (mut app App) delete_repo(id string) web.Result { return app.json(http.Status.ok, new_response('Repo removed successfully.')) } -['/api/repos/:id'; patch; markused] +['/api/repos/:id'; patch] fn (mut app App) patch_repo(id string) web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.')) diff --git a/src/server/routes.v b/src/server/routes.v index b048f76..4f6c4f0 100644 --- a/src/server/routes.v +++ b/src/server/routes.v @@ -11,12 +11,12 @@ import response { new_response } // healthcheck just returns a string, but can be used to quickly check if the // server is still responsive. -['/health'; get; markused] +['/health'; get] pub fn (mut app App) healthcheck() web.Result { return app.json(http.Status.ok, new_response('Healthy.')) } -['/:repo/:arch/:filename'; get; head; markused] +['/:repo/:arch/:filename'; get; head] fn (mut app App) get_repo_file(repo string, arch string, filename string) web.Result { mut full_path := '' @@ -54,7 +54,7 @@ fn (mut app App) get_repo_file(repo string, arch string, filename string) web.Re return app.file(full_path) } -['/:repo/publish'; post; markused] +['/:repo/publish'; post] fn (mut app App) put_package(repo string) web.Result { if !app.is_authorized() { return app.json(http.Status.unauthorized, new_response('Unauthorized.'))