From aa0aae41ab465d1d2de83ff872ee7b0416f014d5 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 21 Jul 2024 22:00:38 +0200 Subject: [PATCH] feat(repo): implement async workers --- server/rieterd.toml | 5 +++- server/src/config.rs | 17 +++++++++-- server/src/main.rs | 3 +- server/src/repo/actor/async.rs | 32 +++++++++++++++++++++ server/src/repo/actor/mod.rs | 5 ++++ server/src/repo/{actor.rs => actor/sync.rs} | 12 ++++---- server/src/repo/handle.rs | 20 ++++++++++--- server/src/repo/mirror/mod.rs | 2 +- server/src/repo/mod.rs | 30 +++++++++++++------ 9 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 server/src/repo/actor/async.rs create mode 100644 server/src/repo/actor/mod.rs rename server/src/repo/{actor.rs => actor/sync.rs} (96%) diff --git a/server/rieterd.toml b/server/rieterd.toml index 9cc56bf..8efa678 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,7 +1,10 @@ api_key = "test" -pkg_workers = 2 log_level = "rieterd=debug" +[repo] +sync_workers = 2 +async_workers = 1 + [fs] type = "local" data_dir = "./data" diff --git a/server/src/config.rs b/server/src/config.rs index e165fdc..02f1182 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -36,6 +36,14 @@ pub enum DbConfig { }, } +#[derive(Deserialize, Debug, Clone)] +pub struct RepoConfig { + #[serde(default = "default_repo_sync_workers")] + pub sync_workers: u32, + #[serde(default = "default_repo_async_workers")] + pub async_workers: u32, +} + #[derive(Deserialize, Debug, Clone)] pub struct Config { pub api_key: String, @@ -47,8 +55,7 @@ pub struct Config { pub log_level: String, pub fs: FsConfig, pub db: DbConfig, - #[serde(default = "default_pkg_workers")] - pub pkg_workers: u32, + pub repo: RepoConfig, } impl Config { @@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 { 16 } -fn default_pkg_workers() -> u32 { +fn default_repo_sync_workers() -> u32 { + 1 +} + +fn default_repo_async_workers() -> u32 { 1 } diff --git a/server/src/main.rs b/server/src/main.rs index f99ba18..1921d2e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { data_dir.join("repos"), db.clone(), rt.clone(), - config.pkg_workers, + config.repo.sync_workers, + config.repo.async_workers, )? //rt.block_on(crate::repo::RepoMgr::new( // data_dir.join("repos"), diff --git a/server/src/repo/actor/async.rs b/server/src/repo/actor/async.rs new file mode 100644 index 0000000..2690d36 --- /dev/null +++ b/server/src/repo/actor/async.rs @@ -0,0 +1,32 @@ +use crate::{ + db, + repo::{archive, package, AsyncCommand, SharedState}, +}; + +use std::{ + path::PathBuf, + sync::{atomic::Ordering, Arc}, +}; + +pub struct AsyncActor { + state: Arc, +} + +impl AsyncActor { + pub fn new(state: &Arc) -> Self { + Self { + state: Arc::clone(state) + } + } + + pub async fn run(self) { + while let Some(msg) = { + let mut rx = self.state.async_queue.1.lock().await; + rx.recv().await + } { + match msg { + + } + } + } +} diff --git a/server/src/repo/actor/mod.rs b/server/src/repo/actor/mod.rs new file mode 100644 index 0000000..f30fa57 --- /dev/null +++ b/server/src/repo/actor/mod.rs @@ -0,0 +1,5 @@ +mod sync; +mod r#async; + +pub use sync::Actor; +pub use r#async::AsyncActor; diff --git a/server/src/repo/actor.rs b/server/src/repo/actor/sync.rs similarity index 96% rename from server/src/repo/actor.rs rename to server/src/repo/actor/sync.rs index 3aa04de..e7b6e44 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor/sync.rs @@ -1,5 +1,7 @@ -use super::{archive, package, Command, SharedState}; -use crate::db; +use crate::{ + db, + repo::{archive, package, Command, SharedState}, +}; use std::{ path::PathBuf, @@ -20,10 +22,10 @@ pub struct Actor { } impl Actor { - pub fn new(rt: runtime::Handle, state: Arc) -> Self { + pub fn new(rt: runtime::Handle, state: &Arc) -> Self { Self { rt, - state: Arc::clone(&state), + state: Arc::clone(state), } } @@ -37,7 +39,7 @@ impl Actor { /// Run the main actor loop pub fn run(self) { while let Some(msg) = { - let mut rx = self.state.rx.lock().unwrap(); + let mut rx = self.state.sync_queue.1.lock().unwrap(); rx.blocking_recv() } { match msg { diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index 6f07a7a..127542b 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -31,7 +31,11 @@ impl Handle { } pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> { - tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; + let repo_dir = self.state.repos_dir.join(repo_id.to_string()); + + if !tokio::fs::try_exists(repo_dir).await? { + tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; + } let mut repos = self.state.repos.write().await; repos.insert(repo_id, Default::default()); @@ -136,17 +140,25 @@ impl Handle { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); + self.state + .sync_queue + .0 + .send(Command::ParsePkg(repo, path)) + .unwrap(); self.state.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); } async fn queue_sync(&self, repo: i32) { - self.state.tx.send(Command::SyncRepo(repo)).unwrap(); + self.state + .sync_queue + .0 + .send(Command::SyncRepo(repo)) + .unwrap(); } async fn queue_clean(&self) { - self.state.tx.send(Command::Clean).unwrap(); + self.state.sync_queue.0.send(Command::Clean).unwrap(); } } diff --git a/server/src/repo/mirror/mod.rs b/server/src/repo/mirror/mod.rs index 02a0aab..61b0a0b 100644 --- a/server/src/repo/mirror/mod.rs +++ b/server/src/repo/mirror/mod.rs @@ -1,3 +1,3 @@ mod parser; -pub use parser::{DbArchiveParser, DbArchiveEntry}; +pub use parser::{DbArchiveEntry, DbArchiveParser}; diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 96d4bfa..393afa7 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -4,7 +4,7 @@ mod handle; mod mirror; pub mod package; -pub use actor::Actor; +pub use actor::{Actor, AsyncActor}; pub use handle::Handle; use std::{ @@ -28,25 +28,32 @@ pub enum Command { Clean, } +pub enum AsyncCommand { +} + type RepoState = (AtomicU32, Arc>); pub struct SharedState { pub repos_dir: PathBuf, pub conn: DbConn, - pub rx: Mutex>, - pub tx: UnboundedSender, + pub sync_queue: (UnboundedSender, Mutex>), + pub async_queue: ( + UnboundedSender, + tokio::sync::Mutex>, + ), pub repos: RwLock>, } impl SharedState { pub fn new(repos_dir: impl AsRef, conn: DbConn) -> Self { let (tx, rx) = unbounded_channel(); + let (async_tx, async_rx) = unbounded_channel(); Self { repos_dir: repos_dir.as_ref().to_path_buf(), conn, - rx: Mutex::new(rx), - tx, + sync_queue: (tx, Mutex::new(rx)), + async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)), repos: RwLock::new(Default::default()), } } @@ -56,7 +63,8 @@ pub fn start( repos_dir: impl AsRef, conn: DbConn, rt: runtime::Handle, - actors: u32, + sync_actors: u32, + async_actors: u32, ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; @@ -70,12 +78,18 @@ pub fn start( let state = Arc::new(SharedState::new(repos_dir, conn)); - for _ in 0..actors { - let actor = Actor::new(rt.clone(), Arc::clone(&state)); + for _ in 0..sync_actors { + let actor = Actor::new(rt.clone(), &state); std::thread::spawn(|| actor.run()); } + for _ in 0..async_actors { + let actor = AsyncActor::new(&state); + + rt.spawn(actor.run()); + } + let handle = Handle::new(&state); for id in repo_ids {