From 8864925e58188e0b56a3846a3fcfafd533313a3e Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Mon, 24 Jun 2024 13:02:26 +0200 Subject: [PATCH 1/5] feat: set up prober repo actors; refactor code; this commit is too large --- server/src/cli.rs | 62 +---------------------------- server/src/main.rs | 77 ++++++++++++++++++++++++++++++++++-- server/src/repo/actor.rs | 82 +++++++++++++++++++++++++++++++++++++++ server/src/repo/handle.rs | 72 ++++++++++++++++++++++++++++++++++ server/src/repo/mod.rs | 4 ++ 5 files changed, 232 insertions(+), 65 deletions(-) create mode 100644 server/src/repo/actor.rs create mode 100644 server/src/repo/handle.rs diff --git a/server/src/cli.rs b/server/src/cli.rs index c6998eb..5e8469e 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,12 +1,6 @@ -use crate::{Config, FsConfig, Global}; +use std::path::PathBuf; -use std::{io, path::PathBuf, sync::Arc}; - -use axum::Router; use clap::Parser; -use sea_orm_migration::MigratorTrait; -use tower_http::trace::TraceLayer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -19,57 +13,3 @@ pub struct Cli { )] pub config_file: PathBuf, } - -impl Cli { - pub async fn run(&self) -> crate::Result<()> { - let config: Config = Config::figment(&self.config_file) - .extract() - .inspect_err(|e| tracing::error!("{}", e))?; - - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) - .with(tracing_subscriber::fmt::layer()) - .init(); - - tracing::info!("Connecting to database"); - let db = crate::db::connect(&config.db).await?; - - crate::db::Migrator::up(&db, None).await?; - - let mgr = match &config.fs { - FsConfig::Local { data_dir } => { - crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? - } - }; - - let mgr = Arc::new(mgr); - - for _ in 0..config.pkg_workers { - let clone = Arc::clone(&mgr); - - tokio::spawn(async move { clone.pkg_parse_task().await }); - } - - let global = Global { - config: config.clone(), - mgr, - db, - }; - - // build our application with a single route - let app = Router::new() - .nest("/api", crate::api::router()) - .merge(crate::repo::router(&config.api_key)) - .with_state(global) - .layer(TraceLayer::new_for_http()); - - let domain: String = format!("{}:{}", config.domain, config.port) - .parse() - .unwrap(); - let listener = tokio::net::TcpListener::bind(domain).await?; - // run it with hyper on localhost:3000 - Ok(axum::serve(listener, app.into_make_service()) - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) - } -} diff --git a/server/src/main.rs b/server/src/main.rs index eb1c3d0..5c0ecac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,9 +8,15 @@ mod repo; pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; -use std::sync::Arc; +use std::{io, path::PathBuf, sync::Arc}; + +use axum::Router; +use tower_http::trace::TraceLayer; use clap::Parser; +use sea_orm_migration::MigratorTrait; +use tokio::runtime; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub const ANY_ARCH: &'static str = "any"; @@ -21,8 +27,71 @@ pub struct Global { db: sea_orm::DbConn, } -#[tokio::main] -async fn main() -> crate::Result<()> { +fn main() -> crate::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let handle = rt.handle(); + let cli = cli::Cli::parse(); - cli.run().await + let global = setup(handle, cli.config_file)?; + + handle.block_on(run(global)) +} + +fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { + let config: Config = Config::figment(config_file) + .extract() + .inspect_err(|e| tracing::error!("{}", e))?; + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) + .with(tracing_subscriber::fmt::layer()) + .init(); + + tracing::info!("Connecting to database"); + let db = rt.block_on(crate::db::connect(&config.db))?; + rt.block_on(crate::db::Migrator::up(&db, None))?; + + let mgr = match &config.fs { + FsConfig::Local { data_dir } => { + rt.block_on(crate::repo::RepoMgr::new( + data_dir.join("repos"), + db.clone(), + ))? + //RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())? + } + }; + let mgr = Arc::new(mgr); + + for _ in 0..config.pkg_workers { + let clone = Arc::clone(&mgr); + + rt.spawn(async move { clone.pkg_parse_task().await }); + } + + Ok(Global { + config: config.clone(), + mgr, + db, + }) +} + +async fn run(global: Global) -> crate::Result<()> { + let domain: String = format!("{}:{}", &global.config.domain, global.config.port) + .parse() + .unwrap(); + let listener = tokio::net::TcpListener::bind(domain).await?; + + // build our application with a single route + let app = Router::new() + .nest("/api", crate::api::router()) + .merge(crate::repo::router(&global.config.api_key)) + .with_state(global) + .layer(TraceLayer::new_for_http()); + // run it with hyper on localhost:3000 + Ok(axum::serve(listener, app.into_make_service()) + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) } diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs new file mode 100644 index 0000000..1291656 --- /dev/null +++ b/server/src/repo/actor.rs @@ -0,0 +1,82 @@ +use super::{archive, package, RepoHandle}; +use crate::db; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, +}; + +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use tokio::{runtime, sync::mpsc::UnboundedReceiver}; + +pub enum RepoCommand { + ParsePkg(i32, PathBuf), +} + +/// The actor is responsible for mutating the repositories. They receive their commands their +/// messages and process these commands in both a synchronous and asynchronous way. +pub struct RepoActor { + repos_dir: PathBuf, + conn: DbConn, + rt: runtime::Handle, + rx: Arc>>, + repos: Arc>)>>>, +} + +impl RepoActor { + pub fn new( + repos_dir: PathBuf, + conn: DbConn, + rt: runtime::Handle, + rx: Arc>>, + repos: Arc>)>>>, + ) -> Self { + Self { + repos_dir, + conn, + rt, + rx, + repos, + } + } + + /// Run the main actor loop + pub fn run(self) { + while let Some(msg) = { + let mut rx = self.rx.lock().unwrap(); + rx.blocking_recv() + } { + match msg { + RepoCommand::ParsePkg(repo, path) => { + let _ = self.parse_pkg(repo, path); + } + } + } + } + + fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> { + let pkg = package::Package::open(&path)?; + let pkg = self + .rt + .block_on(db::query::package::insert(&self.conn, repo, pkg))?; + let dest_path = self + .repos_dir + .join(repo.to_string()) + .join(pkg.id.to_string()); + std::fs::rename(path, dest_path)?; + + tracing::info!( + "Added '{}-{}-{}' to repository {}", + pkg.name, + pkg.version, + pkg.arch, + repo, + ); + + Ok(()) + } +} diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs new file mode 100644 index 0000000..b720390 --- /dev/null +++ b/server/src/repo/handle.rs @@ -0,0 +1,72 @@ +use crate::db; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, +}; + +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use tokio::{ + runtime, + sync::mpsc::{unbounded_channel, UnboundedSender}, +}; + +#[derive(Clone)] +pub struct RepoHandle { + repos_dir: PathBuf, + conn: DbConn, + tx: UnboundedSender, + repos: Arc>)>>>, +} + +impl RepoHandle { + pub fn start( + repos_dir: impl AsRef, + conn: DbConn, + actors: u32, + rt: runtime::Handle, + ) -> crate::Result { + std::fs::create_dir_all(repos_dir.as_ref())?; + + let (tx, rx) = unbounded_channel(); + + let mut repos = HashMap::new(); + let repo_ids: Vec = rt.block_on( + db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn), + )?; + + for id in repo_ids { + repos.insert(id, Default::default()); + } + + let rx = Arc::new(Mutex::new(rx)); + let repos = Arc::new(RwLock::new(repos)); + + for _ in 0..actors { + let actor = super::RepoActor::new( + repos_dir.as_ref().to_path_buf(), + conn.clone(), + rt.clone(), + Arc::clone(&rx), + Arc::clone(&repos), + ); + + std::thread::spawn(|| actor.run()); + } + + Ok(Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + tx, + repos, + }) + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 953b631..6ea74ab 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,7 +1,11 @@ +mod actor; mod archive; +mod handle; mod manager; pub mod package; +pub use actor::{RepoActor, RepoCommand}; +pub use handle::RepoHandle; pub use manager::RepoMgr; use crate::FsConfig; From 656df06b4e4c40827a5f09768784c5027d4102df Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 25 Jun 2024 16:53:30 +0200 Subject: [PATCH 2/5] refactor: use shared state struct --- server/src/repo/actor.rs | 54 ++++++++++++++++++++++++++------------- server/src/repo/handle.rs | 26 ++++--------------- server/src/repo/mod.rs | 2 +- 3 files changed, 42 insertions(+), 40 deletions(-) diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index 1291656..c232df2 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -11,43 +11,60 @@ use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, }; -use tokio::{runtime, sync::mpsc::UnboundedReceiver}; +use tokio::{ + runtime, + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, +}; pub enum RepoCommand { ParsePkg(i32, PathBuf), } +pub struct RepoSharedState { + repos_dir: PathBuf, + conn: DbConn, + rx: Mutex>, + tx: UnboundedSender, + repos: RwLock>)>>, +} + +impl RepoSharedState { + pub fn new( + repos_dir: impl AsRef, + conn: DbConn, + repos: HashMap>)>, + ) -> Self { + let (tx, rx) = unbounded_channel(); + + Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + rx: Mutex::new(rx), + tx, + repos: RwLock::new(repos), + } + } +} + /// The actor is responsible for mutating the repositories. They receive their commands their /// messages and process these commands in both a synchronous and asynchronous way. pub struct RepoActor { - repos_dir: PathBuf, - conn: DbConn, rt: runtime::Handle, - rx: Arc>>, - repos: Arc>)>>>, + state: Arc, } impl RepoActor { - pub fn new( - repos_dir: PathBuf, - conn: DbConn, - rt: runtime::Handle, - rx: Arc>>, - repos: Arc>)>>>, - ) -> Self { + pub fn new(rt: runtime::Handle, state: Arc) -> Self { Self { - repos_dir, - conn, rt, - rx, - repos, + state: Arc::clone(&state), } } /// Run the main actor loop pub fn run(self) { while let Some(msg) = { - let mut rx = self.rx.lock().unwrap(); + let mut rx = self.state.rx.lock().unwrap(); rx.blocking_recv() } { match msg { @@ -62,8 +79,9 @@ impl RepoActor { let pkg = package::Package::open(&path)?; let pkg = self .rt - .block_on(db::query::package::insert(&self.conn, repo, pkg))?; + .block_on(db::query::package::insert(&self.state.conn, repo, pkg))?; let dest_path = self + .state .repos_dir .join(repo.to_string()) .join(pkg.id.to_string()); diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index b720390..b918aaf 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,3 +1,4 @@ +use super::RepoSharedState; use crate::db; use std::{ @@ -17,10 +18,7 @@ use tokio::{ #[derive(Clone)] pub struct RepoHandle { - repos_dir: PathBuf, - conn: DbConn, - tx: UnboundedSender, - repos: Arc>)>>>, + state: Arc, } impl RepoHandle { @@ -32,8 +30,6 @@ impl RepoHandle { ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; - let (tx, rx) = unbounded_channel(); - let mut repos = HashMap::new(); let repo_ids: Vec = rt.block_on( db::Repo::find() @@ -47,26 +43,14 @@ impl RepoHandle { repos.insert(id, Default::default()); } - let rx = Arc::new(Mutex::new(rx)); - let repos = Arc::new(RwLock::new(repos)); + let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos)); for _ in 0..actors { - let actor = super::RepoActor::new( - repos_dir.as_ref().to_path_buf(), - conn.clone(), - rt.clone(), - Arc::clone(&rx), - Arc::clone(&repos), - ); + let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state)); std::thread::spawn(|| actor.run()); } - Ok(Self { - repos_dir: repos_dir.as_ref().to_path_buf(), - conn, - tx, - repos, - }) + Ok(Self { state }) } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 6ea74ab..8e9a627 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -4,7 +4,7 @@ mod handle; mod manager; pub mod package; -pub use actor::{RepoActor, RepoCommand}; +pub use actor::{RepoActor, RepoCommand, RepoSharedState}; pub use handle::RepoHandle; pub use manager::RepoMgr; From 80d52915089d5209cdfb9c97803a372072115235 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 25 Jun 2024 17:05:14 +0200 Subject: [PATCH 3/5] refactor: switch to new repo actors --- server/src/main.rs | 30 ++++++----- server/src/repo/actor.rs | 2 +- server/src/repo/handle.rs | 6 +-- server/src/repo/mod.rs | 110 ++++++++++++++++++++------------------ 4 files changed, 79 insertions(+), 69 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 5c0ecac..274d419 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -23,7 +23,7 @@ pub const ANY_ARCH: &'static str = "any"; #[derive(Clone)] pub struct Global { config: crate::config::Config, - mgr: Arc, + repo: repo::Handle, db: sea_orm::DbConn, } @@ -54,26 +54,32 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { let db = rt.block_on(crate::db::connect(&config.db))?; rt.block_on(crate::db::Migrator::up(&db, None))?; - let mgr = match &config.fs { + let repo = match &config.fs { FsConfig::Local { data_dir } => { - rt.block_on(crate::repo::RepoMgr::new( + crate::repo::Handle::start( data_dir.join("repos"), db.clone(), - ))? + rt.clone(), + config.pkg_workers, + )? + //rt.block_on(crate::repo::RepoMgr::new( + // data_dir.join("repos"), + // db.clone(), + //))? //RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())? } }; - let mgr = Arc::new(mgr); - - for _ in 0..config.pkg_workers { - let clone = Arc::clone(&mgr); - - rt.spawn(async move { clone.pkg_parse_task().await }); - } + //let mgr = Arc::new(mgr); + // + //for _ in 0..config.pkg_workers { + // let clone = Arc::clone(&mgr); + // + // rt.spawn(async move { clone.pkg_parse_task().await }); + //} Ok(Global { config: config.clone(), - mgr, + repo, db, }) } diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index c232df2..a24922f 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -1,4 +1,4 @@ -use super::{archive, package, RepoHandle}; +use super::{archive, package, Handle}; use crate::db; use std::{ diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index b918aaf..ff12f42 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -17,16 +17,16 @@ use tokio::{ }; #[derive(Clone)] -pub struct RepoHandle { +pub struct Handle { state: Arc, } -impl RepoHandle { +impl Handle { pub fn start( repos_dir: impl AsRef, conn: DbConn, - actors: u32, rt: runtime::Handle, + actors: u32, ) -> crate::Result { std::fs::create_dir_all(repos_dir.as_ref())?; diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 8e9a627..3bd2a1c 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -5,7 +5,7 @@ mod manager; pub mod package; pub use actor::{RepoActor, RepoCommand, RepoSharedState}; -pub use handle::RepoHandle; +pub use handle::Handle; pub use manager::RepoMgr; use crate::FsConfig; @@ -53,30 +53,31 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - match global.config.fs { - FsConfig::Local { data_dir } => { - let repo_dir = data_dir.join("repos").join(repo_id.to_string()); - - let file_name = if file_name == format!("{}.db", repo) - || file_name == format!("{}.db.tar.gz", repo) - { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; - - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) - } - } - } else { - Err(StatusCode::NOT_FOUND.into()) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { + // match global.config.fs { + // FsConfig::Local { data_dir } => { + // let repo_dir = data_dir.join("repos").join(repo_id.to_string()); + // + // let file_name = if file_name == format!("{}.db", repo) + // || file_name == format!("{}.db.tar.gz", repo) + // { + // format!("{}.db.tar.gz", arch) + // } else if file_name == format!("{}.files", repo) + // || file_name == format!("{}.files.tar.gz", repo) + // { + // format!("{}.files.tar.gz", arch) + // } else { + // file_name + // }; + // + // let path = repo_dir.join(file_name); + // Ok(ServeFile::new(path).oneshot(req).await) + // } + // } + //} else { + // Err(StatusCode::NOT_FOUND.into()) + //} } async fn post_package_archive( @@ -84,46 +85,49 @@ async fn post_package_archive( Path((distro, repo)): Path<(String, String)>, body: Body, ) -> crate::Result { - let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; - let [tmp_path] = global.mgr.random_file_paths(); - - let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; - tokio::io::copy(&mut body, &mut tmp_file).await?; - - global.mgr.queue_pkg(repo, tmp_path).await; - - Ok(StatusCode::ACCEPTED) + Ok(StatusCode::NOT_FOUND) + //let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); + //let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; + //let [tmp_path] = global.mgr.random_file_paths(); + // + //let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; + //tokio::io::copy(&mut body, &mut tmp_file).await?; + // + //global.mgr.queue_pkg(repo, tmp_path).await; + // + //Ok(StatusCode::ACCEPTED) } async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo(repo).await?; - - tracing::info!("Removed repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + // global.mgr.remove_repo(repo).await?; + // + // tracing::info!("Removed repository {repo}"); + // + // Ok(StatusCode::OK) + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo_arch(repo, &arch).await?; - - tracing::info!("Removed architecture '{arch}' from repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + // global.mgr.remove_repo_arch(repo, &arch).await?; + // + // tracing::info!("Removed architecture '{arch}' from repository {repo}"); + // + // Ok(StatusCode::OK) + //} else { + // Ok(StatusCode::NOT_FOUND) + //} //if let Some(mgr) = global.mgr.get_mgr(&distro).await { // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; // From a7c0d3e062b025eb3ca6f2bbdffb774c8741a4f9 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 26 Jun 2024 12:27:51 +0200 Subject: [PATCH 4/5] feat: start of sync reimplementation --- server/src/repo/actor.rs | 62 +++++++++++++++++++++++++++---- server/src/repo/handle.rs | 77 ++++++++++++++++++++++++++++++++++++++- server/src/repo/mod.rs | 21 +++++------ 3 files changed, 140 insertions(+), 20 deletions(-) diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index a24922f..c1a2c73 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -4,7 +4,10 @@ use crate::db; use std::{ collections::HashMap, path::{Path, PathBuf}, - sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, }; use sea_orm::{ @@ -13,7 +16,10 @@ use sea_orm::{ }; use tokio::{ runtime, - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + RwLock, + }, }; pub enum RepoCommand { @@ -21,11 +27,11 @@ pub enum RepoCommand { } pub struct RepoSharedState { - repos_dir: PathBuf, - conn: DbConn, - rx: Mutex>, - tx: UnboundedSender, - repos: RwLock>)>>, + pub repos_dir: PathBuf, + pub conn: DbConn, + pub rx: Mutex>, + pub tx: UnboundedSender, + pub repos: RwLock>)>>, } impl RepoSharedState { @@ -70,11 +76,23 @@ impl RepoActor { match msg { RepoCommand::ParsePkg(repo, path) => { let _ = self.parse_pkg(repo, path); + + if self + .state + .repos + .blocking_read() + .get(&repo) + .map(|n| n.0.load(Ordering::SeqCst)) + == Some(0) + { + // TODO sync + } } } } } + /// Parse a queued package for the given repository. fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> { let pkg = package::Package::open(&path)?; let pkg = self @@ -95,6 +113,36 @@ impl RepoActor { repo, ); + self.state.repos.blocking_read().get(&repo).inspect(|n| { + n.0.fetch_sub(1, Ordering::SeqCst); + }); + + Ok(()) + } + + fn sync_repo(&self, repo: i32) -> crate::Result<()> { + let repos = self.state.repos.blocking_read(); + + if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) { + let archs: Vec = self.rt.block_on( + db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) + .select_only() + .column(db::package::Column::Arch) + .distinct() + .into_tuple() + .all(&self.state.conn), + )?; + + for arch in archs { + self.generate_archives(repo, &arch)?; + } + } + + Ok(()) + } + + fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { Ok(()) } } diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index ff12f42..262f274 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,10 +1,13 @@ -use super::RepoSharedState; +use super::{RepoCommand, RepoSharedState}; use crate::db; use std::{ collections::HashMap, path::{Path, PathBuf}, - sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, RwLock, + }, }; use sea_orm::{ @@ -15,6 +18,7 @@ use tokio::{ runtime, sync::mpsc::{unbounded_channel, UnboundedSender}, }; +use uuid::Uuid; #[derive(Clone)] pub struct Handle { @@ -53,4 +57,73 @@ impl Handle { Ok(Self { state }) } + + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.state.repos_dir.join(uuid.to_string()) + }) + } + + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { + let mut repos = self.state.repos.write().await; + + let distro_id: Option = db::Distro::find() + .filter(db::distro::Column::Name.eq(distro)) + .select_only() + .column(db::distro::Column::Id) + .into_tuple() + .one(&self.state.conn) + .await?; + + let distro_id = if let Some(id) = distro_id { + id + } else { + let new_distro = db::distro::ActiveModel { + id: NotSet, + name: Set(distro.to_string()), + description: NotSet, + }; + + new_distro.insert(&self.state.conn).await?.id + }; + + let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::DistroId.eq(distro_id)) + .filter(db::repo::Column::Name.eq(repo)) + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .one(&self.state.conn) + .await?; + + let repo_id = if let Some(id) = repo_id { + id + } else { + let new_repo = db::repo::ActiveModel { + id: NotSet, + distro_id: Set(distro_id), + name: Set(repo.to_string()), + description: NotSet, + }; + let id = new_repo.insert(&self.state.conn).await?.id; + + tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?; + repos.insert(id, Default::default()); + + id + }; + + Ok(repo_id) + } + + pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { + self.state + .tx + .send(RepoCommand::ParsePkg(repo, path)) + .unwrap(); + self.state.repos.read().await.get(&repo).inspect(|n| { + n.0.fetch_add(1, Ordering::SeqCst); + }); + } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 3bd2a1c..6fe6650 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -85,17 +85,16 @@ async fn post_package_archive( Path((distro, repo)): Path<(String, String)>, body: Body, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - //let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; - //let [tmp_path] = global.mgr.random_file_paths(); - // - //let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; - //tokio::io::copy(&mut body, &mut tmp_file).await?; - // - //global.mgr.queue_pkg(repo, tmp_path).await; - // - //Ok(StatusCode::ACCEPTED) + let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?; + + let [tmp_path] = global.repo.random_file_paths(); + let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; + let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); + tokio::io::copy(&mut body, &mut tmp_file).await?; + + global.repo.queue_pkg(repo_id, tmp_path).await; + + Ok(StatusCode::ACCEPTED) } async fn delete_repo( From 9237add86967917fbef294ee4396ebf90c0ab458 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 26 Jun 2024 14:03:00 +0200 Subject: [PATCH 5/5] feat: reimplement synchronous package sync --- server/src/repo/actor.rs | 82 +++++++++++- server/src/repo/archive.rs | 250 +++++++++++++++++++++++++++++-------- server/src/repo/mod.rs | 2 - 3 files changed, 277 insertions(+), 57 deletions(-) diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index c1a2c73..b90fcee 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -10,17 +10,20 @@ use std::{ }, }; +use futures::StreamExt; use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, }; +use sea_query::{Alias, Expr, Query}; use tokio::{ runtime, sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, RwLock, }, }; +use uuid::Uuid; pub enum RepoCommand { ParsePkg(i32, PathBuf), @@ -67,6 +70,13 @@ impl RepoActor { } } + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.state.repos_dir.join(uuid.to_string()) + }) + } + /// Run the main actor loop pub fn run(self) { while let Some(msg) = { @@ -85,7 +95,7 @@ impl RepoActor { .map(|n| n.0.load(Ordering::SeqCst)) == Some(0) { - // TODO sync + let _ = self.sync_repo(repo); } } } @@ -143,6 +153,74 @@ impl RepoActor { } fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { + let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths(); + + let mut ars = archive::RepoArchivesWriter::new( + &tmp_ar_db_path, + &tmp_ar_files_path, + self.random_file_paths(), + &self.rt, + &self.state.conn, + )?; + + let (tx, mut rx) = mpsc::channel(1); + + let conn = self.state.conn.clone(); + let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch); + + // sea_orm needs its connections to be dropped inside an async context, so we spawn a task + // that streams the responses to the synchronous context via message passing + self.rt.spawn(async move { + let stream = query.stream(&conn).await; + + if let Err(err) = stream { + let _ = tx.send(Err(err)).await; + + return; + } + + let mut stream = stream.unwrap(); + + while let Some(res) = stream.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; + + if is_err { + return; + } + } + }); + + let mut committed_ids: Vec = Vec::new(); + + while let Some(pkg) = rx.blocking_recv().transpose()? { + committed_ids.push(pkg.id); + ars.append_pkg(&pkg)?; + } + + ars.close()?; + + // Move newly generated package archives to their correct place + let repo_dir = self.state.repos_dir.join(repo.to_string()); + std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?; + std::fs::rename( + tmp_ar_files_path, + repo_dir.join(format!("{}.files.tar.gz", arch)), + )?; + + // Update the state for the newly committed packages + self.rt.block_on( + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::Committed), + ) + .filter(db::package::Column::Id.is_in(committed_ids)) + .exec(&self.state.conn), + )?; + + tracing::info!("Package archives generated for repo {} ('{}')", repo, arch); + Ok(()) } } diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index a979c09..973a395 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -1,78 +1,222 @@ +use crate::db; use std::{ - io, + io::{self, Write}, path::{Path, PathBuf}, - sync::{Arc, Mutex}, }; +use futures::StreamExt; use libarchive::{ write::{Builder, FileWriter, WriteEntry}, Entry, WriteFilter, WriteFormat, }; +use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; +use tokio::{runtime, sync::mpsc}; -/// Struct to abstract away the intrinsics of writing entries to an archive file -pub struct RepoArchiveWriter { - ar: Arc>, +pub struct RepoArchivesWriter { + ar_db: FileWriter, + ar_files: FileWriter, + rt: runtime::Handle, + conn: DbConn, + tmp_paths: [PathBuf; 2], } -impl RepoArchiveWriter { - pub async fn open>(path: P) -> io::Result { - let path = PathBuf::from(path.as_ref()); - - // Open the archive file - let ar = tokio::task::spawn_blocking(move || { - let mut builder = Builder::new(); - builder.add_filter(WriteFilter::Gzip)?; - builder.set_format(WriteFormat::PaxRestricted)?; - - builder.open_file(path) - }) - .await - .unwrap()?; +impl RepoArchivesWriter { + pub fn new( + ar_db_path: impl AsRef, + ar_files_path: impl AsRef, + tmp_paths: [impl AsRef; 2], + rt: &runtime::Handle, + conn: &sea_orm::DbConn, + ) -> crate::Result { + let ar_db = Self::open_ar(ar_db_path)?; + let ar_files = Self::open_ar(ar_files_path)?; Ok(Self { - // In practice, mutex is only ever used by one thread at a time. It's simply here so we - // can use spawn_blocking without issues. - ar: Arc::new(Mutex::new(ar)), + ar_db, + ar_files, + rt: rt.clone(), + conn: conn.clone(), + tmp_paths: [ + tmp_paths[0].as_ref().to_path_buf(), + tmp_paths[1].as_ref().to_path_buf(), + ], }) } - /// Add either a "desc" or "files" entry to the archive - pub async fn add_entry>( - &self, - full_name: &str, - path: P, - desc: bool, - ) -> io::Result<()> { - let metadata = tokio::fs::metadata(&path).await?; + fn open_ar(path: impl AsRef) -> crate::Result { + let mut builder = Builder::new(); + builder.add_filter(WriteFilter::Gzip)?; + builder.set_format(WriteFormat::PaxRestricted)?; + + Ok(builder.open_file(path)?) + } + + fn append_entry( + ar: &mut FileWriter, + src_path: impl AsRef, + dest_path: impl AsRef, + ) -> crate::Result<()> { + let metadata = std::fs::metadata(&src_path)?; let file_size = metadata.len(); - let ar = Arc::clone(&self.ar); - let full_name = String::from(full_name); - let path = PathBuf::from(path.as_ref()); + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); - Ok(tokio::task::spawn_blocking(move || { - let mut ar_entry = WriteEntry::new(); - ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_pathname(dest_path); + ar_entry.set_mode(0o100644); + ar_entry.set_size(file_size.try_into().unwrap()); - let name = if desc { "desc" } else { "files" }; - - ar_entry.set_pathname(PathBuf::from(full_name).join(name)); - ar_entry.set_mode(0o100644); - ar_entry.set_size(file_size.try_into().unwrap()); - - ar.lock().unwrap().append_path(&mut ar_entry, path) - }) - .await - .unwrap()?) + Ok(ar.append_path(&mut ar_entry, src_path)?) } - pub async fn close(&self) -> io::Result<()> { - let ar = Arc::clone(&self.ar); + pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> { + self.write_desc(&self.tmp_paths[0], pkg)?; + self.write_files(&self.tmp_paths[1], pkg)?; - Ok( - tokio::task::spawn_blocking(move || ar.lock().unwrap().close()) - .await - .unwrap()?, - ) + let full_name = format!("{}-{}", pkg.name, pkg.version); + let dest_desc_path = format!("{}/desc", full_name); + let dest_files_path = format!("{}/files", full_name); + + Self::append_entry(&mut self.ar_db, &self.tmp_paths[0], &dest_desc_path)?; + Self::append_entry(&mut self.ar_files, &self.tmp_paths[0], &dest_desc_path)?; + Self::append_entry(&mut self.ar_files, &self.tmp_paths[1], &dest_files_path)?; + + Ok(()) + } + + /// Generate a "files" archive entry for the package in the given path + fn write_files(&self, path: impl AsRef, pkg: &db::package::Model) -> crate::Result<()> { + let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); + + writeln!(f, "%FILES%")?; + + let (tx, mut rx) = mpsc::channel(1); + + let conn = self.conn.clone(); + let query = pkg.find_related(db::PackageFile); + self.rt.spawn(async move { + let files = query.stream(&conn).await; + + if let Err(err) = files { + let _ = tx.send(Err(err)).await; + + return; + } + + let mut files = files.unwrap(); + + while let Some(res) = files.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; + + if is_err { + return; + } + } + }); + + while let Some(file) = rx.blocking_recv().transpose()? { + writeln!(f, "{}", file.path)?; + } + + f.flush()?; + Ok(()) + } + + fn write_desc(&self, path: impl AsRef, pkg: &db::package::Model) -> crate::Result<()> { + let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); + + writeln!(f, "%FILENAME%\n{}", pkg.id)?; + + let mut write_attr = |k: &str, v: &str| { + if !v.is_empty() { + writeln!(f, "\n%{}%\n{}", k, v) + } else { + Ok(()) + } + }; + + write_attr("NAME", &pkg.name)?; + write_attr("BASE", &pkg.base)?; + write_attr("VERSION", &pkg.version)?; + + if let Some(ref desc) = pkg.description { + write_attr("DESC", desc)?; + } + + let groups: Vec = self.rt.block_on( + pkg.find_related(db::PackageGroup) + .select_only() + .column(db::package_group::Column::Name) + .into_tuple() + .all(&self.conn), + )?; + + write_attr("GROUPS", &groups.join("\n"))?; + + write_attr("CSIZE", &pkg.c_size.to_string())?; + write_attr("ISIZE", &pkg.size.to_string())?; + write_attr("SHA256SUM", &pkg.sha256_sum)?; + + if let Some(ref url) = pkg.url { + write_attr("URL", url)?; + } + + let licenses: Vec = self.rt.block_on( + pkg.find_related(db::PackageLicense) + .select_only() + .column(db::package_license::Column::Name) + .into_tuple() + .all(&self.conn), + )?; + write_attr("LICENSE", &licenses.join("\n"))?; + + write_attr("ARCH", &pkg.arch)?; + + // TODO build date + write_attr( + "BUILDDATE", + &pkg.build_date.and_utc().timestamp().to_string(), + )?; + + if let Some(ref packager) = pkg.packager { + write_attr("PACKAGER", packager)?; + } + + let related = [ + ("REPLACES", db::PackageRelatedEnum::Replaces), + ("CONFLICTS", db::PackageRelatedEnum::Conflicts), + ("PROVIDES", db::PackageRelatedEnum::Provides), + ("DEPENDS", db::PackageRelatedEnum::Depend), + ("OPTDEPENDS", db::PackageRelatedEnum::Optdepend), + ("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend), + ("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend), + ]; + + for (key, attr) in related.into_iter() { + let items: Vec = self.rt.block_on( + pkg.find_related(db::PackageRelated) + .filter(db::package_related::Column::Type.eq(attr)) + .select_only() + .column(db::package_related::Column::Name) + .into_tuple() + .all(&self.conn), + )?; + + write_attr(key, &items.join("\n"))?; + } + + f.flush()?; + Ok(()) + } + + pub fn close(&mut self) -> crate::Result<()> { + self.ar_db.close()?; + self.ar_files.close()?; + + let _ = std::fs::remove_file(&self.tmp_paths[0])?; + let _ = std::fs::remove_file(&self.tmp_paths[1])?; + + Ok(()) } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 6fe6650..e8b65e3 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,12 +1,10 @@ mod actor; mod archive; mod handle; -mod manager; pub mod package; pub use actor::{RepoActor, RepoCommand, RepoSharedState}; pub use handle::Handle; -pub use manager::RepoMgr; use crate::FsConfig;