From 656df06b4e4c40827a5f09768784c5027d4102df Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 25 Jun 2024 16:53:30 +0200 Subject: [PATCH] refactor: use shared state struct --- server/src/repo/actor.rs | 54 ++++++++++++++++++++++++++------------- server/src/repo/handle.rs | 26 ++++--------------- server/src/repo/mod.rs | 2 +- 3 files changed, 42 insertions(+), 40 deletions(-) diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index 1291656..c232df2 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -11,43 +11,60 @@ use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, }; -use tokio::{runtime, sync::mpsc::UnboundedReceiver}; +use tokio::{ + runtime, + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, +}; pub enum RepoCommand { ParsePkg(i32, PathBuf), } +pub struct RepoSharedState { + repos_dir: PathBuf, + conn: DbConn, + rx: Mutex>, + tx: UnboundedSender, + repos: RwLock>)>>, +} + +impl RepoSharedState { + pub fn new( + repos_dir: impl AsRef, + conn: DbConn, + repos: HashMap>)>, + ) -> Self { + let (tx, rx) = unbounded_channel(); + + Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + rx: Mutex::new(rx), + tx, + repos: RwLock::new(repos), + } + } +} + /// The actor is responsible for mutating the repositories. They receive their commands their /// messages and process these commands in both a synchronous and asynchronous way. pub struct RepoActor { - repos_dir: PathBuf, - conn: DbConn, rt: runtime::Handle, - rx: Arc>>, - repos: Arc>)>>>, + state: Arc, } impl RepoActor { - pub fn new( - repos_dir: PathBuf, - conn: DbConn, - rt: runtime::Handle, - rx: Arc>>, - repos: Arc>)>>>, - ) -> Self { + pub fn new(rt: runtime::Handle, state: Arc) -> Self { Self { - repos_dir, - conn, rt, - rx, - repos, + state: Arc::clone(&state), } } /// Run the main actor loop pub fn run(self) { while let Some(msg) = { - let mut rx = self.rx.lock().unwrap(); + let mut rx = self.state.rx.lock().unwrap(); rx.blocking_recv() } { match msg { @@ -62,8 +79,9 @@ impl RepoActor { let pkg = package::Package::open(&path)?; let pkg = self .rt - .block_on(db::query::package::insert(&self.conn, repo, pkg))?; + .block_on(db::query::package::insert(&self.state.conn, repo, pkg))?; let dest_path = self + .state .repos_dir .join(repo.to_string()) .join(pkg.id.to_string()); diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index b720390..b918aaf 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,3 +1,4 @@ +use super::RepoSharedState; use crate::db; use std::{ @@ -17,10 +18,7 @@ use tokio::{ #[derive(Clone)] pub struct RepoHandle { - repos_dir: PathBuf, - conn: DbConn, - tx: UnboundedSender, - repos: Arc>)>>>, + state: Arc, } impl RepoHandle { @@ -32,8 +30,6 @@ impl RepoHandle { ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; - let (tx, rx) = unbounded_channel(); - let mut repos = HashMap::new(); let repo_ids: Vec = rt.block_on( db::Repo::find() @@ -47,26 +43,14 @@ impl RepoHandle { repos.insert(id, Default::default()); } - let rx = Arc::new(Mutex::new(rx)); - let repos = Arc::new(RwLock::new(repos)); + let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos)); for _ in 0..actors { - let actor = super::RepoActor::new( - repos_dir.as_ref().to_path_buf(), - conn.clone(), - rt.clone(), - Arc::clone(&rx), - Arc::clone(&repos), - ); + let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state)); std::thread::spawn(|| actor.run()); } - Ok(Self { - repos_dir: repos_dir.as_ref().to_path_buf(), - conn, - tx, - repos, - }) + Ok(Self { state }) } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 6ea74ab..8e9a627 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -4,7 +4,7 @@ mod handle; mod manager; pub mod package; -pub use actor::{RepoActor, RepoCommand}; +pub use actor::{RepoActor, RepoCommand, RepoSharedState}; pub use handle::RepoHandle; pub use manager::RepoMgr;