diff --git a/libarchive/src/lib.rs b/libarchive/src/lib.rs index 8ffc5ff..e8a91f2 100644 --- a/libarchive/src/lib.rs +++ b/libarchive/src/lib.rs @@ -5,6 +5,6 @@ pub mod write; pub use archive::{ Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat, - WriteFilter, WriteFormat, FileType, + WriteFilter, WriteFormat, }; pub use error::Result; diff --git a/libarchive/src/read/mod.rs b/libarchive/src/read/mod.rs index 596595b..579b3e5 100644 --- a/libarchive/src/read/mod.rs +++ b/libarchive/src/read/mod.rs @@ -6,10 +6,9 @@ pub use builder::Builder; use crate::archive::Handle; use crate::ReadFilter; -pub use entries::{Entries, ReadEntry}; +use entries::Entries; use libarchive3_sys::ffi; use std::path::Path; -pub use file::FileReader; // Represents a read view of an archive pub trait Archive: Handle + Sized { diff --git a/server/rieterd.toml b/server/rieterd.toml index 8efa678..9cc56bf 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,10 +1,7 @@ api_key = "test" +pkg_workers = 2 log_level = "rieterd=debug" -[repo] -sync_workers = 2 -async_workers = 1 - [fs] type = "local" data_dir = "./data" diff --git a/server/src/config.rs b/server/src/config.rs index 02f1182..e165fdc 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -36,14 +36,6 @@ pub enum DbConfig { }, } -#[derive(Deserialize, Debug, Clone)] -pub struct RepoConfig { - #[serde(default = "default_repo_sync_workers")] - pub sync_workers: u32, - #[serde(default = "default_repo_async_workers")] - pub async_workers: u32, -} - #[derive(Deserialize, Debug, Clone)] pub struct Config { pub api_key: String, @@ -55,7 +47,8 @@ pub struct Config { pub log_level: String, pub fs: FsConfig, pub db: DbConfig, - pub repo: RepoConfig, + #[serde(default = "default_pkg_workers")] + pub pkg_workers: u32, } impl Config { @@ -90,10 +83,6 @@ fn default_db_postgres_max_connections() -> u32 { 16 } -fn default_repo_sync_workers() -> u32 { - 1 -} - -fn default_repo_async_workers() -> u32 { +fn default_pkg_workers() -> u32 { 1 } diff --git a/server/src/main.rs b/server/src/main.rs index 1921d2e..f99ba18 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -60,8 +60,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { data_dir.join("repos"), db.clone(), rt.clone(), - config.repo.sync_workers, - config.repo.async_workers, + config.pkg_workers, )? //rt.block_on(crate::repo::RepoMgr::new( // data_dir.join("repos"), diff --git a/server/src/repo/actor/sync.rs b/server/src/repo/actor.rs similarity index 96% rename from server/src/repo/actor/sync.rs rename to server/src/repo/actor.rs index e7b6e44..3aa04de 100644 --- a/server/src/repo/actor/sync.rs +++ b/server/src/repo/actor.rs @@ -1,7 +1,5 @@ -use crate::{ - db, - repo::{archive, package, Command, SharedState}, -}; +use super::{archive, package, Command, SharedState}; +use crate::db; use std::{ path::PathBuf, @@ -22,10 +20,10 @@ pub struct Actor { } impl Actor { - pub fn new(rt: runtime::Handle, state: &Arc) -> Self { + pub fn new(rt: runtime::Handle, state: Arc) -> Self { Self { rt, - state: Arc::clone(state), + state: Arc::clone(&state), } } @@ -39,7 +37,7 @@ impl Actor { /// Run the main actor loop pub fn run(self) { while let Some(msg) = { - let mut rx = self.state.sync_queue.1.lock().unwrap(); + let mut rx = self.state.rx.lock().unwrap(); rx.blocking_recv() } { match msg { diff --git a/server/src/repo/actor/async.rs b/server/src/repo/actor/async.rs deleted file mode 100644 index 2690d36..0000000 --- a/server/src/repo/actor/async.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::{ - db, - repo::{archive, package, AsyncCommand, SharedState}, -}; - -use std::{ - path::PathBuf, - sync::{atomic::Ordering, Arc}, -}; - -pub struct AsyncActor { - state: Arc, -} - -impl AsyncActor { - pub fn new(state: &Arc) -> Self { - Self { - state: Arc::clone(state) - } - } - - pub async fn run(self) { - while let Some(msg) = { - let mut rx = self.state.async_queue.1.lock().await; - rx.recv().await - } { - match msg { - - } - } - } -} diff --git a/server/src/repo/actor/mod.rs b/server/src/repo/actor/mod.rs deleted file mode 100644 index f30fa57..0000000 --- a/server/src/repo/actor/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod sync; -mod r#async; - -pub use sync::Actor; -pub use r#async::AsyncActor; diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index 127542b..6f07a7a 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -31,11 +31,7 @@ impl Handle { } pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> { - let repo_dir = self.state.repos_dir.join(repo_id.to_string()); - - if !tokio::fs::try_exists(repo_dir).await? { - tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; - } + tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?; let mut repos = self.state.repos.write().await; repos.insert(repo_id, Default::default()); @@ -140,25 +136,17 @@ impl Handle { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state - .sync_queue - .0 - .send(Command::ParsePkg(repo, path)) - .unwrap(); + self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); self.state.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); } async fn queue_sync(&self, repo: i32) { - self.state - .sync_queue - .0 - .send(Command::SyncRepo(repo)) - .unwrap(); + self.state.tx.send(Command::SyncRepo(repo)).unwrap(); } async fn queue_clean(&self) { - self.state.sync_queue.0.send(Command::Clean).unwrap(); + self.state.tx.send(Command::Clean).unwrap(); } } diff --git a/server/src/repo/mirror/mod.rs b/server/src/repo/mirror/mod.rs deleted file mode 100644 index 61b0a0b..0000000 --- a/server/src/repo/mirror/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod parser; - -pub use parser::{DbArchiveEntry, DbArchiveParser}; diff --git a/server/src/repo/mirror/parser.rs b/server/src/repo/mirror/parser.rs deleted file mode 100644 index d5270f4..0000000 --- a/server/src/repo/mirror/parser.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::{ - io::{self, BufRead}, - path::Path, -}; - -use libarchive::{ - read::{Archive, Builder, Entries, FileReader, ReadEntry}, - Entry, -}; - -pub struct DbArchiveParser<'a, T: 'a + Archive> { - entries: Entries<'a, T>, -} - -pub struct DbArchiveEntry { - name: String, - version: String, - filename: String, -} - -impl<'a, T: Archive> DbArchiveParser<'a, T> { - pub fn new(ar: &'a mut T) -> Self { - Self { - entries: Entries::new(ar), - } - } - - // parse a given entry. If the entry's not a regular file, the function returns None. - fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result { - let reader = io::BufReader::new(entry); - let mut lines = reader.lines(); - - let mut name: Option = None; - let mut version: Option = None; - let mut filename: Option = None; - - while let Some(line) = lines.next().transpose()? { - match line.as_str() { - "%NAME%" => name = lines.next().transpose()?, - "%VERSION%" => version = lines.next().transpose()?, - "%FILENAME%" => filename = lines.next().transpose()?, - _ => {} - } - } - - if name.is_some() && version.is_some() && filename.is_some() { - Ok(DbArchiveEntry { - name: name.unwrap(), - version: version.unwrap(), - filename: filename.unwrap(), - }) - } else { - Err(io::Error::other("Missing fields in entry file")) - } - } -} - -impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> { - type Item = io::Result; - - fn next(&mut self) -> Option { - while let Some(entry) = self.entries.next() { - match entry { - Ok(entry) => { - if entry.filetype() == libarchive::FileType::RegularFile { - return Some(Self::parse_entry(entry)); - } - } - Err(err) => return Some(Err(err.into())), - } - } - - None - } -} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 393afa7..cfccb4a 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,10 +1,9 @@ mod actor; mod archive; mod handle; -mod mirror; pub mod package; -pub use actor::{Actor, AsyncActor}; +pub use actor::Actor; pub use handle::Handle; use std::{ @@ -28,33 +27,30 @@ pub enum Command { Clean, } -pub enum AsyncCommand { -} - type RepoState = (AtomicU32, Arc>); pub struct SharedState { pub repos_dir: PathBuf, pub conn: DbConn, - pub sync_queue: (UnboundedSender, Mutex>), - pub async_queue: ( - UnboundedSender, - tokio::sync::Mutex>, - ), + pub rx: Mutex>, + pub tx: UnboundedSender, pub repos: RwLock>, } impl SharedState { - pub fn new(repos_dir: impl AsRef, conn: DbConn) -> Self { + pub fn new( + repos_dir: impl AsRef, + conn: DbConn, + repos: HashMap>)>, + ) -> Self { let (tx, rx) = unbounded_channel(); - let (async_tx, async_rx) = unbounded_channel(); Self { repos_dir: repos_dir.as_ref().to_path_buf(), conn, - sync_queue: (tx, Mutex::new(rx)), - async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)), - repos: RwLock::new(Default::default()), + rx: Mutex::new(rx), + tx, + repos: RwLock::new(repos), } } } @@ -63,11 +59,11 @@ pub fn start( repos_dir: impl AsRef, conn: DbConn, rt: runtime::Handle, - sync_actors: u32, - async_actors: u32, + actors: u32, ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; + let mut repos = HashMap::new(); let repo_ids: Vec = rt.block_on( entity::prelude::Repo::find() .select_only() @@ -76,25 +72,17 @@ pub fn start( .all(&conn), )?; - let state = Arc::new(SharedState::new(repos_dir, conn)); + for id in repo_ids { + repos.insert(id, Default::default()); + } - for _ in 0..sync_actors { - let actor = Actor::new(rt.clone(), &state); + let state = Arc::new(SharedState::new(repos_dir, conn, repos)); + + for _ in 0..actors { + let actor = Actor::new(rt.clone(), Arc::clone(&state)); std::thread::spawn(|| actor.run()); } - for _ in 0..async_actors { - let actor = AsyncActor::new(&state); - - rt.spawn(actor.run()); - } - - let handle = Handle::new(&state); - - for id in repo_ids { - rt.block_on(handle.register_repo(id))?; - } - - Ok(handle) + Ok(Handle::new(&state)) } diff --git a/server/src/web/api/mod.rs b/server/src/web/api/mod.rs index 07395fc..dc99e53 100644 --- a/server/src/web/api/mod.rs +++ b/server/src/web/api/mod.rs @@ -1,4 +1,5 @@ mod pagination; +mod repo; use crate::db; use pagination::PaginatedResponse;