feat(repo): implement async workers
parent
d38fd5ca74
commit
aa0aae41ab
|
@ -1,7 +1,10 @@
|
|||
api_key = "test"
|
||||
pkg_workers = 2
|
||||
log_level = "rieterd=debug"
|
||||
|
||||
[repo]
|
||||
sync_workers = 2
|
||||
async_workers = 1
|
||||
|
||||
[fs]
|
||||
type = "local"
|
||||
data_dir = "./data"
|
||||
|
|
|
@ -36,6 +36,14 @@ pub enum DbConfig {
|
|||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct RepoConfig {
|
||||
#[serde(default = "default_repo_sync_workers")]
|
||||
pub sync_workers: u32,
|
||||
#[serde(default = "default_repo_async_workers")]
|
||||
pub async_workers: u32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub api_key: String,
|
||||
|
@ -47,8 +55,7 @@ pub struct Config {
|
|||
pub log_level: String,
|
||||
pub fs: FsConfig,
|
||||
pub db: DbConfig,
|
||||
#[serde(default = "default_pkg_workers")]
|
||||
pub pkg_workers: u32,
|
||||
pub repo: RepoConfig,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
|
@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 {
|
|||
16
|
||||
}
|
||||
|
||||
fn default_pkg_workers() -> u32 {
|
||||
fn default_repo_sync_workers() -> u32 {
|
||||
1
|
||||
}
|
||||
|
||||
fn default_repo_async_workers() -> u32 {
|
||||
1
|
||||
}
|
||||
|
|
|
@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
|
|||
data_dir.join("repos"),
|
||||
db.clone(),
|
||||
rt.clone(),
|
||||
config.pkg_workers,
|
||||
config.repo.sync_workers,
|
||||
config.repo.async_workers,
|
||||
)?
|
||||
//rt.block_on(crate::repo::RepoMgr::new(
|
||||
// data_dir.join("repos"),
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
use crate::{
|
||||
db,
|
||||
repo::{archive, package, AsyncCommand, SharedState},
|
||||
};
|
||||
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
};
|
||||
|
||||
pub struct AsyncActor {
|
||||
state: Arc<SharedState>,
|
||||
}
|
||||
|
||||
impl AsyncActor {
|
||||
pub fn new(state: &Arc<SharedState>) -> Self {
|
||||
Self {
|
||||
state: Arc::clone(state)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(self) {
|
||||
while let Some(msg) = {
|
||||
let mut rx = self.state.async_queue.1.lock().await;
|
||||
rx.recv().await
|
||||
} {
|
||||
match msg {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
mod sync;
|
||||
mod r#async;
|
||||
|
||||
pub use sync::Actor;
|
||||
pub use r#async::AsyncActor;
|
|
@ -1,5 +1,7 @@
|
|||
use super::{archive, package, Command, SharedState};
|
||||
use crate::db;
|
||||
use crate::{
|
||||
db,
|
||||
repo::{archive, package, Command, SharedState},
|
||||
};
|
||||
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
|
@ -20,10 +22,10 @@ pub struct Actor {
|
|||
}
|
||||
|
||||
impl Actor {
|
||||
pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
|
||||
pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self {
|
||||
Self {
|
||||
rt,
|
||||
state: Arc::clone(&state),
|
||||
state: Arc::clone(state),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,7 +39,7 @@ impl Actor {
|
|||
/// Run the main actor loop
|
||||
pub fn run(self) {
|
||||
while let Some(msg) = {
|
||||
let mut rx = self.state.rx.lock().unwrap();
|
||||
let mut rx = self.state.sync_queue.1.lock().unwrap();
|
||||
rx.blocking_recv()
|
||||
} {
|
||||
match msg {
|
|
@ -31,7 +31,11 @@ impl Handle {
|
|||
}
|
||||
|
||||
pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
|
||||
let repo_dir = self.state.repos_dir.join(repo_id.to_string());
|
||||
|
||||
if !tokio::fs::try_exists(repo_dir).await? {
|
||||
tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
|
||||
}
|
||||
|
||||
let mut repos = self.state.repos.write().await;
|
||||
repos.insert(repo_id, Default::default());
|
||||
|
@ -136,17 +140,25 @@ impl Handle {
|
|||
}
|
||||
|
||||
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
||||
self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
|
||||
self.state
|
||||
.sync_queue
|
||||
.0
|
||||
.send(Command::ParsePkg(repo, path))
|
||||
.unwrap();
|
||||
self.state.repos.read().await.get(&repo).inspect(|n| {
|
||||
n.0.fetch_add(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
|
||||
async fn queue_sync(&self, repo: i32) {
|
||||
self.state.tx.send(Command::SyncRepo(repo)).unwrap();
|
||||
self.state
|
||||
.sync_queue
|
||||
.0
|
||||
.send(Command::SyncRepo(repo))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn queue_clean(&self) {
|
||||
self.state.tx.send(Command::Clean).unwrap();
|
||||
self.state.sync_queue.0.send(Command::Clean).unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
mod parser;
|
||||
|
||||
pub use parser::{DbArchiveParser, DbArchiveEntry};
|
||||
pub use parser::{DbArchiveEntry, DbArchiveParser};
|
||||
|
|
|
@ -4,7 +4,7 @@ mod handle;
|
|||
mod mirror;
|
||||
pub mod package;
|
||||
|
||||
pub use actor::Actor;
|
||||
pub use actor::{Actor, AsyncActor};
|
||||
pub use handle::Handle;
|
||||
|
||||
use std::{
|
||||
|
@ -28,25 +28,32 @@ pub enum Command {
|
|||
Clean,
|
||||
}
|
||||
|
||||
pub enum AsyncCommand {
|
||||
}
|
||||
|
||||
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
||||
|
||||
pub struct SharedState {
|
||||
pub repos_dir: PathBuf,
|
||||
pub conn: DbConn,
|
||||
pub rx: Mutex<UnboundedReceiver<Command>>,
|
||||
pub tx: UnboundedSender<Command>,
|
||||
pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>),
|
||||
pub async_queue: (
|
||||
UnboundedSender<AsyncCommand>,
|
||||
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
|
||||
),
|
||||
pub repos: RwLock<HashMap<i32, RepoState>>,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
let (async_tx, async_rx) = unbounded_channel();
|
||||
|
||||
Self {
|
||||
repos_dir: repos_dir.as_ref().to_path_buf(),
|
||||
conn,
|
||||
rx: Mutex::new(rx),
|
||||
tx,
|
||||
sync_queue: (tx, Mutex::new(rx)),
|
||||
async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
|
||||
repos: RwLock::new(Default::default()),
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +63,8 @@ pub fn start(
|
|||
repos_dir: impl AsRef<Path>,
|
||||
conn: DbConn,
|
||||
rt: runtime::Handle,
|
||||
actors: u32,
|
||||
sync_actors: u32,
|
||||
async_actors: u32,
|
||||
) -> crate::Result<Handle> {
|
||||
std::fs::create_dir_all(repos_dir.as_ref())?;
|
||||
|
||||
|
@ -70,12 +78,18 @@ pub fn start(
|
|||
|
||||
let state = Arc::new(SharedState::new(repos_dir, conn));
|
||||
|
||||
for _ in 0..actors {
|
||||
let actor = Actor::new(rt.clone(), Arc::clone(&state));
|
||||
for _ in 0..sync_actors {
|
||||
let actor = Actor::new(rt.clone(), &state);
|
||||
|
||||
std::thread::spawn(|| actor.run());
|
||||
}
|
||||
|
||||
for _ in 0..async_actors {
|
||||
let actor = AsyncActor::new(&state);
|
||||
|
||||
rt.spawn(actor.run());
|
||||
}
|
||||
|
||||
let handle = Handle::new(&state);
|
||||
|
||||
for id in repo_ids {
|
||||
|
|
Loading…
Reference in New Issue