From 986162e926401020ec94de48a81c8f4ac94470a7 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 21 Jul 2024 19:25:03 +0200 Subject: [PATCH 1/3] refactor(repo): use register command for start --- server/src/repo/mod.rs | 18 +++++++++--------- server/src/web/api/mod.rs | 1 - 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index cfccb4a..4997b8a 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -41,7 +41,6 @@ impl SharedState { pub fn new( repos_dir: impl AsRef, conn: DbConn, - repos: HashMap>)>, ) -> Self { let (tx, rx) = unbounded_channel(); @@ -50,7 +49,7 @@ impl SharedState { conn, rx: Mutex::new(rx), tx, - repos: RwLock::new(repos), + repos: RwLock::new(Default::default()), } } } @@ -63,7 +62,6 @@ pub fn start( ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; - let mut repos = HashMap::new(); let repo_ids: Vec = rt.block_on( entity::prelude::Repo::find() .select_only() @@ -72,11 +70,7 @@ pub fn start( .all(&conn), )?; - for id in repo_ids { - repos.insert(id, Default::default()); - } - - let state = Arc::new(SharedState::new(repos_dir, conn, repos)); + let state = Arc::new(SharedState::new(repos_dir, conn)); for _ in 0..actors { let actor = Actor::new(rt.clone(), Arc::clone(&state)); @@ -84,5 +78,11 @@ pub fn start( std::thread::spawn(|| actor.run()); } - Ok(Handle::new(&state)) + let handle = Handle::new(&state); + + for id in repo_ids { + rt.block_on(handle.register_repo(id))?; + } + + Ok(handle) } diff --git a/server/src/web/api/mod.rs b/server/src/web/api/mod.rs index dc99e53..07395fc 100644 --- a/server/src/web/api/mod.rs +++ b/server/src/web/api/mod.rs @@ -1,5 +1,4 @@ mod pagination; -mod repo; use crate::db; use pagination::PaginatedResponse; From d38fd5ca748b104e953c682d2990e9d50a6a43ff Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 21 Jul 2024 21:09:05 +0200 Subject: [PATCH 2/3] feat(repo): write db archive parser --- libarchive/src/lib.rs | 2 +- libarchive/src/read/mod.rs | 3 +- server/src/repo/mirror/mod.rs | 3 ++ server/src/repo/mirror/parser.rs | 75 ++++++++++++++++++++++++++++++++ server/src/repo/mod.rs | 8 ++-- 5 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 server/src/repo/mirror/mod.rs create mode 100644 server/src/repo/mirror/parser.rs diff --git a/libarchive/src/lib.rs b/libarchive/src/lib.rs index e8a91f2..8ffc5ff 100644 --- a/libarchive/src/lib.rs +++ b/libarchive/src/lib.rs @@ -5,6 +5,6 @@ pub mod write; pub use archive::{ Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat, - WriteFilter, WriteFormat, + WriteFilter, WriteFormat, FileType, }; pub use error::Result; diff --git a/libarchive/src/read/mod.rs b/libarchive/src/read/mod.rs index 579b3e5..596595b 100644 --- a/libarchive/src/read/mod.rs +++ b/libarchive/src/read/mod.rs @@ -6,9 +6,10 @@ pub use builder::Builder; use crate::archive::Handle; use crate::ReadFilter; -use entries::Entries; +pub use entries::{Entries, ReadEntry}; use libarchive3_sys::ffi; use std::path::Path; +pub use file::FileReader; // Represents a read view of an archive pub trait Archive: Handle + Sized { diff --git a/server/src/repo/mirror/mod.rs b/server/src/repo/mirror/mod.rs new file mode 100644 index 0000000..02a0aab --- /dev/null +++ b/server/src/repo/mirror/mod.rs @@ -0,0 +1,3 @@ +mod parser; + +pub use parser::{DbArchiveParser, DbArchiveEntry}; diff --git a/server/src/repo/mirror/parser.rs b/server/src/repo/mirror/parser.rs new file mode 100644 index 0000000..d5270f4 --- /dev/null +++ b/server/src/repo/mirror/parser.rs @@ -0,0 +1,75 @@ +use std::{ + io::{self, BufRead}, + path::Path, +}; + +use libarchive::{ + read::{Archive, Builder, Entries, FileReader, ReadEntry}, + Entry, +}; + +pub struct DbArchiveParser<'a, T: 'a + Archive> { + entries: Entries<'a, T>, +} + +pub struct DbArchiveEntry { + name: String, + version: String, + filename: String, +} + +impl<'a, T: Archive> DbArchiveParser<'a, T> { + pub fn new(ar: &'a mut T) -> Self { + Self { + entries: Entries::new(ar), + } + } + + // parse a given entry. If the entry's not a regular file, the function returns None. + fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result { + let reader = io::BufReader::new(entry); + let mut lines = reader.lines(); + + let mut name: Option = None; + let mut version: Option = None; + let mut filename: Option = None; + + while let Some(line) = lines.next().transpose()? { + match line.as_str() { + "%NAME%" => name = lines.next().transpose()?, + "%VERSION%" => version = lines.next().transpose()?, + "%FILENAME%" => filename = lines.next().transpose()?, + _ => {} + } + } + + if name.is_some() && version.is_some() && filename.is_some() { + Ok(DbArchiveEntry { + name: name.unwrap(), + version: version.unwrap(), + filename: filename.unwrap(), + }) + } else { + Err(io::Error::other("Missing fields in entry file")) + } + } +} + +impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> { + type Item = io::Result; + + fn next(&mut self) -> Option { + while let Some(entry) = self.entries.next() { + match entry { + Ok(entry) => { + if entry.filetype() == libarchive::FileType::RegularFile { + return Some(Self::parse_entry(entry)); + } + } + Err(err) => return Some(Err(err.into())), + } + } + + None + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 4997b8a..96d4bfa 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,6 +1,7 @@ mod actor; mod archive; mod handle; +mod mirror; pub mod package; pub use actor::Actor; @@ -38,10 +39,7 @@ pub struct SharedState { } impl SharedState { - pub fn new( - repos_dir: impl AsRef, - conn: DbConn, - ) -> Self { + pub fn new(repos_dir: impl AsRef, conn: DbConn) -> Self { let (tx, rx) = unbounded_channel(); Self { @@ -78,7 +76,7 @@ pub fn start( std::thread::spawn(|| actor.run()); } - let handle = Handle::new(&state); + let handle = Handle::new(&state); for id in repo_ids { rt.block_on(handle.register_repo(id))?; From aa0aae41ab465d1d2de83ff872ee7b0416f014d5 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 21 Jul 2024 22:00:38 +0200 Subject: [PATCH 3/3] feat(repo): implement async workers --- server/rieterd.toml | 5 +++- server/src/config.rs | 17 +++++++++-- server/src/main.rs | 3 +- server/src/repo/actor/async.rs | 32 +++++++++++++++++++++ server/src/repo/actor/mod.rs | 5 ++++ server/src/repo/{actor.rs => actor/sync.rs} | 12 ++++---- server/src/repo/handle.rs | 20 ++++++++++--- server/src/repo/mirror/mod.rs | 2 +- server/src/repo/mod.rs | 30 +++++++++++++------ 9 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 server/src/repo/actor/async.rs create mode 100644 server/src/repo/actor/mod.rs rename server/src/repo/{actor.rs => actor/sync.rs} (96%) diff --git a/server/rieterd.toml b/server/rieterd.toml index 9cc56bf..8efa678 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,7 +1,10 @@ api_key = "test" -pkg_workers = 2 log_level = "rieterd=debug" +[repo] +sync_workers = 2 +async_workers = 1 + [fs] type = "local" data_dir = "./data" diff --git a/server/src/config.rs b/server/src/config.rs index e165fdc..02f1182 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -36,6 +36,14 @@ pub enum DbConfig { }, } +#[derive(Deserialize, Debug, Clone)] +pub struct RepoConfig { + #[serde(default = "default_repo_sync_workers")] + pub sync_workers: u32, + #[serde(default = "default_repo_async_workers")] + pub async_workers: u32, +} + #[derive(Deserialize, Debug, Clone)] pub struct Config { pub api_key: String, @@ -47,8 +55,7 @@ pub struct Config { pub log_level: String, pub fs: FsConfig, pub db: DbConfig, - #[serde(default = "default_pkg_workers")] - pub pkg_workers: u32, + pub repo: RepoConfig, } impl Config { @@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 { 16 } -fn default_pkg_workers() -> u32 { +fn default_repo_sync_workers() -> u32 { + 1 +} + +fn default_repo_async_workers() -> u32 { 1 } diff --git a/server/src/main.rs b/server/src/main.rs index f99ba18..1921d2e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { data_dir.join("repos"), db.clone(), rt.clone(), - config.pkg_workers, + config.repo.sync_workers, + config.repo.async_workers, )? //rt.block_on(crate::repo::RepoMgr::new( // data_dir.join("repos"), diff --git a/server/src/repo/actor/async.rs b/server/src/repo/actor/async.rs new file mode 100644 index 0000000..2690d36 --- /dev/null +++ b/server/src/repo/actor/async.rs @@ -0,0 +1,32 @@ +use crate::{ + db, + repo::{archive, package, AsyncCommand, SharedState}, +}; + +use std::{ + path::PathBuf, + sync::{atomic::Ordering, Arc}, +}; + +pub struct AsyncActor { + state: Arc, +} + +impl AsyncActor { + pub fn new(state: &Arc) -> Self { + Self { + state: Arc::clone(state) + } + } + + pub async fn run(self) { + while let Some(msg) = { + let mut rx = self.state.async_queue.1.lock().await; + rx.recv().await + } { + match msg { + + } + } + } +} diff --git a/server/src/repo/actor/mod.rs b/server/src/repo/actor/mod.rs new file mode 100644 index 0000000..f30fa57 --- /dev/null +++ b/server/src/repo/actor/mod.rs @@ -0,0 +1,5 @@ +mod sync; +mod r#async; + +pub use sync::Actor; +pub use r#async::AsyncActor; diff --git a/server/src/repo/actor.rs b/server/src/repo/actor/sync.rs similarity index 96% rename from server/src/repo/actor.rs rename to server/src/repo/actor/sync.rs index 3aa04de..e7b6e44 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor/sync.rs @@ -1,5 +1,7 @@ -use super::{archive, package, Command, SharedState}; -use crate::db; +use crate::{ + db, + repo::{archive, package, Command, SharedState}, +}; use std::{ path::PathBuf, @@ -20,10 +22,10 @@ pub struct Actor { } impl Actor { - pub fn new(rt: runtime::Handle, state: Arc) -> Self { + pub fn new(rt: runtime::Handle, state: &Arc) -> Self { Self { rt, - state: Arc::clone(&state), + state: Arc::clone(state), } } @@ -37,7 +39,7 @@ impl Actor { /// Run the main actor loop pub fn run(self) { while let Some(msg) = { - let mut rx = self.state.rx.lock().unwrap(); + let mut rx = self.state.sync_queue.1.lock().unwrap(); rx.blocking_recv() } { match msg { diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index 6f07a7a..127542b 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -31,7 +31,11 @@ impl Handle { } pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> { - tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; + let repo_dir = self.state.repos_dir.join(repo_id.to_string()); + + if !tokio::fs::try_exists(repo_dir).await? { + tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; + } let mut repos = self.state.repos.write().await; repos.insert(repo_id, Default::default()); @@ -136,17 +140,25 @@ impl Handle { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); + self.state + .sync_queue + .0 + .send(Command::ParsePkg(repo, path)) + .unwrap(); self.state.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); } async fn queue_sync(&self, repo: i32) { - self.state.tx.send(Command::SyncRepo(repo)).unwrap(); + self.state + .sync_queue + .0 + .send(Command::SyncRepo(repo)) + .unwrap(); } async fn queue_clean(&self) { - self.state.tx.send(Command::Clean).unwrap(); + self.state.sync_queue.0.send(Command::Clean).unwrap(); } } diff --git a/server/src/repo/mirror/mod.rs b/server/src/repo/mirror/mod.rs index 02a0aab..61b0a0b 100644 --- a/server/src/repo/mirror/mod.rs +++ b/server/src/repo/mirror/mod.rs @@ -1,3 +1,3 @@ mod parser; -pub use parser::{DbArchiveParser, DbArchiveEntry}; +pub use parser::{DbArchiveEntry, DbArchiveParser}; diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 96d4bfa..393afa7 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -4,7 +4,7 @@ mod handle; mod mirror; pub mod package; -pub use actor::Actor; +pub use actor::{Actor, AsyncActor}; pub use handle::Handle; use std::{ @@ -28,25 +28,32 @@ pub enum Command { Clean, } +pub enum AsyncCommand { +} + type RepoState = (AtomicU32, Arc>); pub struct SharedState { pub repos_dir: PathBuf, pub conn: DbConn, - pub rx: Mutex>, - pub tx: UnboundedSender, + pub sync_queue: (UnboundedSender, Mutex>), + pub async_queue: ( + UnboundedSender, + tokio::sync::Mutex>, + ), pub repos: RwLock>, } impl SharedState { pub fn new(repos_dir: impl AsRef, conn: DbConn) -> Self { let (tx, rx) = unbounded_channel(); + let (async_tx, async_rx) = unbounded_channel(); Self { repos_dir: repos_dir.as_ref().to_path_buf(), conn, - rx: Mutex::new(rx), - tx, + sync_queue: (tx, Mutex::new(rx)), + async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)), repos: RwLock::new(Default::default()), } } @@ -56,7 +63,8 @@ pub fn start( repos_dir: impl AsRef, conn: DbConn, rt: runtime::Handle, - actors: u32, + sync_actors: u32, + async_actors: u32, ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; @@ -70,12 +78,18 @@ pub fn start( let state = Arc::new(SharedState::new(repos_dir, conn)); - for _ in 0..actors { - let actor = Actor::new(rt.clone(), Arc::clone(&state)); + for _ in 0..sync_actors { + let actor = Actor::new(rt.clone(), &state); std::thread::spawn(|| actor.run()); } + for _ in 0..async_actors { + let actor = AsyncActor::new(&state); + + rt.spawn(actor.run()); + } + let handle = Handle::new(&state); for id in repo_ids {