refactor(repo): put some more code in its place

agents
Jef Roosens 2024-06-26 22:00:43 +02:00
parent a6de2c3c14
commit d375df0ff4
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 105 additions and 98 deletions

View File

@ -53,7 +53,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
let repo = match &config.fs { let repo = match &config.fs {
FsConfig::Local { data_dir } => { FsConfig::Local { data_dir } => {
crate::repo::Handle::start( crate::repo::start(
data_dir.join("repos"), data_dir.join("repos"),
db.clone(), db.clone(),
rt.clone(), rt.clone(),

View File

@ -1,68 +1,26 @@
use super::{archive, package}; use super::{archive, package, Command, SharedState};
use crate::db; use crate::db;
use std::{ use std::{
collections::HashMap, path::PathBuf,
path::{Path, PathBuf}, sync::{atomic::Ordering, Arc},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
}; };
use futures::StreamExt; use futures::StreamExt;
use sea_orm::{ColumnTrait, DbConn, EntityTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use sea_query::Expr; use sea_query::Expr;
use tokio::{ use tokio::{runtime, sync::mpsc};
runtime,
sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
};
use uuid::Uuid; use uuid::Uuid;
pub enum RepoCommand {
ParsePkg(i32, PathBuf),
SyncRepo(i32),
Clean,
}
pub struct RepoSharedState {
pub repos_dir: PathBuf,
pub conn: DbConn,
pub rx: Mutex<UnboundedReceiver<RepoCommand>>,
pub tx: UnboundedSender<RepoCommand>,
pub repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoSharedState {
pub fn new(
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel();
Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
rx: Mutex::new(rx),
tx,
repos: RwLock::new(repos),
}
}
}
/// The actor is responsible for mutating the repositories. They receive their commands through their /// The actor is responsible for mutating the repositories. They receive their commands through their
/// messages and process these commands in both a synchronous and asynchronous way. /// messages and process these commands in both a synchronous and asynchronous way.
pub struct RepoActor { pub struct Actor {
rt: runtime::Handle, rt: runtime::Handle,
state: Arc<RepoSharedState>, state: Arc<SharedState>,
} }
impl RepoActor { impl Actor {
pub fn new(rt: runtime::Handle, state: Arc<RepoSharedState>) -> Self { pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
Self { Self {
rt, rt,
state: Arc::clone(&state), state: Arc::clone(&state),
@ -83,7 +41,7 @@ impl RepoActor {
rx.blocking_recv() rx.blocking_recv()
} { } {
match msg { match msg {
RepoCommand::ParsePkg(repo, path) => { Command::ParsePkg(repo, path) => {
let _ = self.parse_pkg(repo, path); let _ = self.parse_pkg(repo, path);
if self if self
@ -98,10 +56,10 @@ impl RepoActor {
let _ = self.clean(); let _ = self.clean();
} }
} }
RepoCommand::SyncRepo(repo) => { Command::SyncRepo(repo) => {
let _ = self.sync_repo(repo); let _ = self.sync_repo(repo);
} }
RepoCommand::Clean => { Command::Clean => {
let _ = self.clean(); let _ = self.clean();
} }
} }

View File

@ -1,56 +1,27 @@
use super::{RepoCommand, RepoSharedState}; use super::{Command, SharedState};
use crate::db; use crate::db;
use std::{ use std::{
collections::HashMap, path::PathBuf,
path::{Path, PathBuf},
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc},
}; };
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, NotSet, QueryFilter, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, NotSet, QueryFilter, QuerySelect, Set,
QuerySelect, Set,
}; };
use sea_query::Expr; use sea_query::Expr;
use tokio::runtime;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone)] #[derive(Clone)]
pub struct Handle { pub struct Handle {
state: Arc<RepoSharedState>, state: Arc<SharedState>,
} }
impl Handle { impl Handle {
pub fn start( pub fn new(state: &Arc<SharedState>) -> Self {
repos_dir: impl AsRef<Path>, Self {
conn: DbConn, state: Arc::clone(state),
rt: runtime::Handle,
actors: u32,
) -> crate::Result<Self> {
std::fs::create_dir_all(repos_dir.as_ref())?;
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on(
db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn),
)?;
for id in repo_ids {
repos.insert(id, Default::default());
} }
let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos));
for _ in 0..actors {
let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state));
std::thread::spawn(|| actor.run());
}
Ok(Self { state })
} }
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] { pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
@ -157,20 +128,17 @@ impl Handle {
} }
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.state self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
.tx
.send(RepoCommand::ParsePkg(repo, path))
.unwrap();
self.state.repos.read().await.get(&repo).inspect(|n| { self.state.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst); n.0.fetch_add(1, Ordering::SeqCst);
}); });
} }
async fn queue_sync(&self, repo: i32) { async fn queue_sync(&self, repo: i32) {
self.state.tx.send(RepoCommand::SyncRepo(repo)).unwrap(); self.state.tx.send(Command::SyncRepo(repo)).unwrap();
} }
async fn queue_clean(&self) { async fn queue_clean(&self) {
self.state.tx.send(RepoCommand::Clean).unwrap(); self.state.tx.send(Command::Clean).unwrap();
} }
} }

View File

@ -3,5 +3,86 @@ mod archive;
mod handle; mod handle;
pub mod package; pub mod package;
pub use actor::{RepoActor, RepoCommand, RepoSharedState}; pub use actor::Actor;
pub use handle::Handle; pub use handle::Handle;
use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex},
};
use sea_orm::{DbConn, EntityTrait, QuerySelect};
use tokio::{
runtime,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
};
/// Commands that can be sent to the repository actors via the channel stored
/// in [`SharedState`].
pub enum Command {
    /// Parse the package archive at the given path and add it to the
    /// repository with the given id (see `Handle::queue_pkg`)
    ParsePkg(i32, PathBuf),
    /// Synchronize the repository with the given id
    SyncRepo(i32),
    /// Run a clean-up pass
    Clean,
}
/// State shared between every repository [`Actor`] and every [`Handle`] that
/// submits commands to them.
pub struct SharedState {
    /// Directory under which the repositories are stored on disk
    pub repos_dir: PathBuf,
    /// Database connection used by the actors
    pub conn: DbConn,
    /// Receiving half of the command channel; wrapped in a mutex so that
    /// multiple actor threads can share a single receiver
    pub rx: Mutex<UnboundedReceiver<Command>>,
    /// Sending half of the command channel, used by handles to queue commands
    pub tx: UnboundedSender<Command>,
    /// Per-repository state, keyed by repository id: a counter
    /// (NOTE(review): appears to count queued packages — incremented in
    /// `Handle::queue_pkg`; confirm against the actor) and a lock guarding
    /// mutation of that repository
    pub repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl SharedState {
    /// Create a new shared state instance.
    ///
    /// Opens a fresh unbounded command channel and stores both of its halves,
    /// alongside the repository directory, the database connection, and the
    /// initial per-repository bookkeeping map.
    pub fn new(
        repos_dir: impl AsRef<Path>,
        conn: DbConn,
        repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
    ) -> Self {
        let (sender, receiver) = unbounded_channel();

        Self {
            repos_dir: repos_dir.as_ref().to_path_buf(),
            conn,
            tx: sender,
            rx: Mutex::new(receiver),
            repos: RwLock::new(repos),
        }
    }
}
/// Start the repository subsystem and return a [`Handle`] for interacting
/// with it.
///
/// Ensures the repository directory exists, loads the ids of all known
/// repositories from the database, initializes the shared state, and spawns
/// `actors` dedicated OS threads, each running its own [`Actor`].
///
/// # Errors
///
/// Returns an error if the repository directory cannot be created or if the
/// repository ids cannot be queried from the database.
pub fn start(
    repos_dir: impl AsRef<Path>,
    conn: DbConn,
    rt: runtime::Handle,
    actors: u32,
) -> crate::Result<Handle> {
    std::fs::create_dir_all(repos_dir.as_ref())?;

    // Collect the ids of every repository currently known to the database
    let repo_ids: Vec<i32> = rt.block_on(
        db::Repo::find()
            .select_only()
            .column(db::repo::Column::Id)
            .into_tuple()
            .all(&conn),
    )?;

    // Initialize default per-repository bookkeeping for each known id
    let repos: HashMap<_, _> = repo_ids
        .into_iter()
        .map(|id| (id, Default::default()))
        .collect();

    let state = Arc::new(SharedState::new(repos_dir, conn, repos));

    // Each actor gets its own OS thread and a clone of the shared state
    for _ in 0..actors {
        let actor = Actor::new(rt.clone(), Arc::clone(&state));
        std::thread::spawn(|| actor.run());
    }

    Ok(Handle::new(&state))
}