refactor: use shared state struct

agents
Jef Roosens 2024-06-25 16:53:30 +02:00
parent 8864925e58
commit 656df06b4e
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 42 additions and 40 deletions

View File

@ -11,43 +11,60 @@ use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
}; };
use tokio::{runtime, sync::mpsc::UnboundedReceiver}; use tokio::{
runtime,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
pub enum RepoCommand { pub enum RepoCommand {
ParsePkg(i32, PathBuf), ParsePkg(i32, PathBuf),
} }
pub struct RepoSharedState {
repos_dir: PathBuf,
conn: DbConn,
rx: Mutex<UnboundedReceiver<RepoCommand>>,
tx: UnboundedSender<RepoCommand>,
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoSharedState {
pub fn new(
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel();
Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
rx: Mutex::new(rx),
tx,
repos: RwLock::new(repos),
}
}
}
/// The actor is responsible for mutating the repositories. They receive their commands their /// The actor is responsible for mutating the repositories. They receive their commands their
/// messages and process these commands in both a synchronous and asynchronous way. /// messages and process these commands in both a synchronous and asynchronous way.
pub struct RepoActor { pub struct RepoActor {
repos_dir: PathBuf,
conn: DbConn,
rt: runtime::Handle, rt: runtime::Handle,
rx: Arc<Mutex<UnboundedReceiver<RepoCommand>>>, state: Arc<RepoSharedState>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
} }
impl RepoActor { impl RepoActor {
pub fn new( pub fn new(rt: runtime::Handle, state: Arc<RepoSharedState>) -> Self {
repos_dir: PathBuf,
conn: DbConn,
rt: runtime::Handle,
rx: Arc<Mutex<UnboundedReceiver<RepoCommand>>>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
) -> Self {
Self { Self {
repos_dir,
conn,
rt, rt,
rx, state: Arc::clone(&state),
repos,
} }
} }
/// Run the main actor loop /// Run the main actor loop
pub fn run(self) { pub fn run(self) {
while let Some(msg) = { while let Some(msg) = {
let mut rx = self.rx.lock().unwrap(); let mut rx = self.state.rx.lock().unwrap();
rx.blocking_recv() rx.blocking_recv()
} { } {
match msg { match msg {
@ -62,8 +79,9 @@ impl RepoActor {
let pkg = package::Package::open(&path)?; let pkg = package::Package::open(&path)?;
let pkg = self let pkg = self
.rt .rt
.block_on(db::query::package::insert(&self.conn, repo, pkg))?; .block_on(db::query::package::insert(&self.state.conn, repo, pkg))?;
let dest_path = self let dest_path = self
.state
.repos_dir .repos_dir
.join(repo.to_string()) .join(repo.to_string())
.join(pkg.id.to_string()); .join(pkg.id.to_string());

View File

@ -1,3 +1,4 @@
use super::RepoSharedState;
use crate::db; use crate::db;
use std::{ use std::{
@ -17,10 +18,7 @@ use tokio::{
#[derive(Clone)] #[derive(Clone)]
pub struct RepoHandle { pub struct RepoHandle {
repos_dir: PathBuf, state: Arc<RepoSharedState>,
conn: DbConn,
tx: UnboundedSender<super::RepoCommand>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
} }
impl RepoHandle { impl RepoHandle {
@ -32,8 +30,6 @@ impl RepoHandle {
) -> crate::Result<Self> { ) -> crate::Result<Self> {
std::fs::create_dir_all(repos_dir.as_ref())?; std::fs::create_dir_all(repos_dir.as_ref())?;
let (tx, rx) = unbounded_channel();
let mut repos = HashMap::new(); let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on( let repo_ids: Vec<i32> = rt.block_on(
db::Repo::find() db::Repo::find()
@ -47,26 +43,14 @@ impl RepoHandle {
repos.insert(id, Default::default()); repos.insert(id, Default::default());
} }
let rx = Arc::new(Mutex::new(rx)); let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos));
let repos = Arc::new(RwLock::new(repos));
for _ in 0..actors { for _ in 0..actors {
let actor = super::RepoActor::new( let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state));
repos_dir.as_ref().to_path_buf(),
conn.clone(),
rt.clone(),
Arc::clone(&rx),
Arc::clone(&repos),
);
std::thread::spawn(|| actor.run()); std::thread::spawn(|| actor.run());
} }
Ok(Self { Ok(Self { state })
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
tx,
repos,
})
} }
} }

View File

@ -4,7 +4,7 @@ mod handle;
mod manager; mod manager;
pub mod package; pub mod package;
pub use actor::{RepoActor, RepoCommand}; pub use actor::{RepoActor, RepoCommand, RepoSharedState};
pub use handle::RepoHandle; pub use handle::RepoHandle;
pub use manager::RepoMgr; pub use manager::RepoMgr;