From d375df0ff4fde9fc8095e41e712c5cef27a86867 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 26 Jun 2024 22:00:43 +0200 Subject: [PATCH] refactor(repo): put some more code in its place --- server/src/main.rs | 2 +- server/src/repo/actor.rs | 66 ++++++------------------------- server/src/repo/handle.rs | 52 +++++------------------- server/src/repo/mod.rs | 83 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 105 insertions(+), 98 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 337ba2e..5a91fdb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -53,7 +53,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { let repo = match &config.fs { FsConfig::Local { data_dir } => { - crate::repo::Handle::start( + crate::repo::start( data_dir.join("repos"), db.clone(), rt.clone(), diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index 57f1b93..2c2fd74 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -1,68 +1,26 @@ -use super::{archive, package}; +use super::{archive, package, Command, SharedState}; use crate::db; use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, - }, + path::PathBuf, + sync::{atomic::Ordering, Arc}, }; use futures::StreamExt; -use sea_orm::{ColumnTrait, DbConn, EntityTrait, QueryFilter, QuerySelect}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; use sea_query::Expr; -use tokio::{ - runtime, - sync::{ - mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, - RwLock, - }, -}; +use tokio::{runtime, sync::mpsc}; use uuid::Uuid; -pub enum RepoCommand { - ParsePkg(i32, PathBuf), - SyncRepo(i32), - Clean, -} - -pub struct RepoSharedState { - pub repos_dir: PathBuf, - pub conn: DbConn, - pub rx: Mutex>, - pub tx: UnboundedSender, - pub repos: RwLock>)>>, -} - -impl RepoSharedState { - pub fn new( - repos_dir: impl AsRef, - conn: DbConn, - repos: HashMap>)>, - ) -> Self { - let (tx, rx) = unbounded_channel(); - - Self { - repos_dir: repos_dir.as_ref().to_path_buf(), - conn, - rx: Mutex::new(rx), - tx, - repos: RwLock::new(repos), - } - } -} - /// The actor is responsible for mutating the repositories. They receive their commands their /// messages and process these commands in both a synchronous and asynchronous way. -pub struct RepoActor { +pub struct Actor { rt: runtime::Handle, - state: Arc, + state: Arc, } -impl RepoActor { - pub fn new(rt: runtime::Handle, state: Arc) -> Self { +impl Actor { + pub fn new(rt: runtime::Handle, state: Arc) -> Self { Self { rt, state: Arc::clone(&state), @@ -83,7 +41,7 @@ impl RepoActor { rx.blocking_recv() } { match msg { - RepoCommand::ParsePkg(repo, path) => { + Command::ParsePkg(repo, path) => { let _ = self.parse_pkg(repo, path); if self @@ -98,10 +56,10 @@ impl RepoActor { let _ = self.clean(); } } - RepoCommand::SyncRepo(repo) => { + Command::SyncRepo(repo) => { let _ = self.sync_repo(repo); } - RepoCommand::Clean => { + Command::Clean => { let _ = self.clean(); } } diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index 4cec237..bbcc153 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,56 +1,27 @@ -use super::{RepoCommand, RepoSharedState}; +use super::{Command, SharedState}; use crate::db; use std::{ - collections::HashMap, - path::{Path, PathBuf}, + path::PathBuf, sync::{atomic::Ordering, Arc}, }; use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, NotSet, QueryFilter, - QuerySelect, Set, + ActiveModelTrait, ColumnTrait, Condition, EntityTrait, NotSet, QueryFilter, QuerySelect, Set, }; use sea_query::Expr; -use tokio::runtime; use uuid::Uuid; #[derive(Clone)] pub struct Handle { - state: Arc, + state: Arc, } impl Handle { - pub fn start( - repos_dir: impl AsRef, - conn: DbConn, - rt: runtime::Handle, - actors: u32, - ) -> crate::Result { - std::fs::create_dir_all(repos_dir.as_ref())?; - - let mut repos = HashMap::new(); - let repo_ids: Vec = rt.block_on( - db::Repo::find() - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .all(&conn), - )?; - - for id in repo_ids { - repos.insert(id, Default::default()); + pub fn new(state: &Arc) -> Self { + Self { + state: Arc::clone(state), } - - let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos)); - - for _ in 0..actors { - let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state)); - - std::thread::spawn(|| actor.run()); - } - - Ok(Self { state }) } pub fn random_file_paths(&self) -> [PathBuf; C] { @@ -157,20 +128,17 @@ impl Handle { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state - .tx - .send(RepoCommand::ParsePkg(repo, path)) - .unwrap(); + self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); self.state.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); } async fn queue_sync(&self, repo: i32) { - self.state.tx.send(RepoCommand::SyncRepo(repo)).unwrap(); + self.state.tx.send(Command::SyncRepo(repo)).unwrap(); } async fn queue_clean(&self) { - self.state.tx.send(RepoCommand::Clean).unwrap(); + self.state.tx.send(Command::Clean).unwrap(); } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 16c368e..9920326 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -3,5 +3,86 @@ mod archive; mod handle; pub mod package; -pub use actor::{RepoActor, RepoCommand, RepoSharedState}; +pub use actor::Actor; pub use handle::Handle; + +use crate::db; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{atomic::AtomicU32, Arc, Mutex}, +}; + +use sea_orm::{DbConn, EntityTrait, QuerySelect}; +use tokio::{ + runtime, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + RwLock, + }, +}; + +pub enum Command { + ParsePkg(i32, PathBuf), + SyncRepo(i32), + Clean, +} + +pub struct SharedState { + pub repos_dir: PathBuf, + pub conn: DbConn, + pub rx: Mutex>, + pub tx: UnboundedSender, + pub repos: RwLock>)>>, +} + +impl SharedState { + pub fn new( + repos_dir: impl AsRef, + conn: DbConn, + repos: HashMap>)>, + ) -> Self { + let (tx, rx) = unbounded_channel(); + + Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + rx: Mutex::new(rx), + tx, + repos: RwLock::new(repos), + } + } +} + +pub fn start( + repos_dir: impl AsRef, + conn: DbConn, + rt: runtime::Handle, + actors: u32, +) -> crate::Result { + std::fs::create_dir_all(repos_dir.as_ref())?; + + let mut repos = HashMap::new(); + let repo_ids: Vec = rt.block_on( + db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn), + )?; + + for id in repo_ids { + repos.insert(id, Default::default()); + } + + let state = Arc::new(SharedState::new(repos_dir, conn, repos)); + + for _ in 0..actors { + let actor = Actor::new(rt.clone(), Arc::clone(&state)); + + std::thread::spawn(|| actor.run()); + } + + Ok(Handle::new(&state)) +}