From be2ce7bf4552c28e7f9380feb6c701b35e6ad586 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Thu, 13 Jun 2024 18:40:24 +0200 Subject: [PATCH] wip: mpsc-based pkg queue --- server/rieterd.toml | 8 ++ server/src/cli.rs | 18 ++-- server/src/config.rs | 8 +- server/src/error.rs | 4 +- server/src/main.rs | 4 +- server/src/repo/manager2.rs | 160 +++++++++++++++++++++++++++++++++--- server/src/repo/mod.rs | 104 ++++++++++++----------- 7 files changed, 231 insertions(+), 75 deletions(-) diff --git a/server/rieterd.toml b/server/rieterd.toml index 5e3b8b7..781a055 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,3 +1,11 @@ api_key = "test" port = 8000 log_level = "tower_http=debug,rieterd=debug" + +[fs] +type = "locl" +data_dir = "./data" + +[db] +type = "sqlite" +db_dir = "./data" diff --git a/server/src/cli.rs b/server/src/cli.rs index c0867b9..550d7dc 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,6 +1,6 @@ use crate::{distro::MetaDistroMgr, Config, Global}; -use std::{io, path::PathBuf}; +use std::{io, path::PathBuf, sync::Arc}; use axum::Router; use clap::Parser; @@ -61,9 +61,11 @@ impl Cli { pub async fn run(&self) -> crate::Result<()> { self.init_tracing(); - tracing::debug!("{:?}", &self.config_file); - let new_config = crate::config::Config::figment(&self.config_file).extract()?; - tracing::debug!("{:?}", new_config); + //tracing::debug!("{:?}", &self.config_file); + //let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err( + // |e| tracing::error!("{}", e) + //)?; + //tracing::debug!("{:?}", new_config); let db_url = if let Some(url) = &self.database_url { url.clone() @@ -88,7 +90,13 @@ impl Cli { data_dir: self.data_dir.clone(), }; - let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?; + let mgr = Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?); + + for _ in 0..1 { + let clone = Arc::clone(&mgr); + + 
tokio::spawn(async move { clone.pkg_parse_task().await }); + } let global = Global { config, mgr, db }; diff --git a/server/src/config.rs b/server/src/config.rs index 99e61d3..a639362 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -6,19 +6,19 @@ use figment::{ }; use serde::Deserialize; -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum FsConfig { Local { data_dir: PathBuf }, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum DbConfig { Sqlite { - path: PathBuf, + db_dir: PathBuf, }, Postgres { host: String, @@ -27,7 +27,7 @@ pub enum DbConfig { }, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] pub struct Config { api_key: String, port: u16, diff --git a/server/src/error.rs b/server/src/error.rs index cc86445..26bfc60 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -15,6 +15,7 @@ pub enum ServerError { Status(StatusCode), Archive(libarchive::error::ArchiveError), Figment(figment::Error), + Unit, } impl fmt::Display for ServerError { @@ -26,6 +27,7 @@ impl fmt::Display for ServerError { ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Archive(err) => write!(fmt, "{}", err), ServerError::Figment(err) => write!(fmt, "{}", err), + ServerError::Unit => Ok(()), } } } @@ -43,7 +45,7 @@ impl IntoResponse for ServerError { ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => { StatusCode::NOT_FOUND.into_response() } - ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) => { + ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) | ServerError::Unit => { StatusCode::INTERNAL_SERVER_ERROR.into_response() } } diff --git a/server/src/main.rs b/server/src/main.rs index 5df9e18..f1e70f9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -10,7 +10,7 @@ pub use error::{Result, ServerError}; use repo::DistroMgr; use clap::Parser; -use 
std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; #[derive(Clone)] pub struct Config { @@ -20,7 +20,7 @@ pub struct Config { #[derive(Clone)] pub struct Global { config: Config, - mgr: distro::MetaDistroMgr, + mgr: Arc, db: sea_orm::DbConn, } diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index 76f7ab0..070b822 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -1,16 +1,21 @@ use super::{archive, package}; use crate::db; -use std::path::{Path, PathBuf}; +use std::{path::{Path, PathBuf}, sync::{atomic::{Ordering, AtomicU32}, Arc}, collections::HashMap}; +use futures::StreamExt; use sea_orm::{ ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, }; +use sea_query::{Expr, Query}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex, + Mutex, RwLock, }; +use uuid::Uuid; + +pub const ANY_ARCH: &'static str = "any"; struct PkgQueueMsg { repo: i32, @@ -26,7 +31,7 @@ pub struct RepoMgr { UnboundedSender, Mutex>, ), - repos_lock: Mutex<()>, + repos: RwLock>, } impl RepoMgr { @@ -37,22 +42,137 @@ impl RepoMgr { let (tx, rx) = unbounded_channel(); + let mut repos = HashMap::new(); + let repo_ids: Vec = db::Repo::find().select_only().column(db::repo::Column::Id).into_tuple().all(&conn).await?; + + for id in repo_ids { + repos.insert(id, AtomicU32::new(0)); + } + Ok(Self { repos_dir: repos_dir.as_ref().to_path_buf(), conn, pkg_queue: (tx, Mutex::new(rx)), - repos_lock: Mutex::new(()), + repos: RwLock::new(repos) }) } + /// Generate archive databases for all known architectures in the repository, including the + /// "any" architecture. 
+ pub async fn sync_repo(&self, repo_id: i32) -> crate::Result<()> { + let mut archs = db::Package::find() + .filter(db::package::Column::RepoId.eq(repo_id)) + .select_only() + .column(db::package::Column::Arch) + .distinct() + .into_tuple::() + .stream(&self.conn) + .await?; + + while let Some(arch) = archs.next().await.transpose()? { + self.generate_archives(repo_id, &arch).await?; + } + + Ok(()) + } + + /// Generate the archive databases for the given repository and architecture. + async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { + let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = + self.random_file_paths(); + let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; + let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?; + + // Query all packages in the repo that have the given architecture or the "any" + // architecture + let mut pkgs = db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) + .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) + .filter( + db::package::Column::Id.in_subquery( + Query::select() + .expr(db::package::Column::Id.max()) + .from(db::package::Entity) + .group_by_columns([db::package::Column::Arch, db::package::Column::Name]) + .to_owned(), + ), + ) + .stream(&self.conn) + .await?; + + let mut commited_ids: Vec = Vec::new(); + + while let Some(pkg) = pkgs.next().await.transpose()? 
{ + commited_ids.push(pkg.id); + + let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; + let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; + + package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?; + package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?; + + let full_name = format!("{}-{}", pkg.name, pkg.version); + + ar_db + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &files_tmp_file_path, false) + .await?; + } + + // Cleanup + ar_db.close().await?; + ar_files.close().await?; + + let repo_dir = self.repos_dir.join(repo.to_string()); + + // Move the db archives to their respective places + tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; + tokio::fs::rename( + tmp_ar_files_path, + repo_dir.join(format!("{}.files.tar.gz", arch)), + ) + .await?; + + // Only after we have successfully written everything to disk do we update the database. + // This order ensures any failure can be recovered, as the database is our single source of + // truth. + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::Committed), + ) + .filter(db::package::Column::Id.is_in(commited_ids)) + .exec(&self.conn) + .await?; + + // If this fails there's no point in failing the function + if there were no packages in + // the repo, this fails anyway because the temp file doesn't exist + let _ = tokio::fs::remove_file(desc_tmp_file_path).await; + let _ = tokio::fs::remove_file(files_tmp_file_path).await; + + tracing::info!( + "Package archives generated for repo {} ('{}')", + repo, + arch + ); + + Ok(()) + } + pub async fn pkg_parse_task(&self) { loop { // Receive the next message and immediately drop the mutex afterwards. As long as the // quue is empty, this will lock the mutex. 
This is okay, as the mutex will be unlocked // as soon as a message is received, so another worker can pick up the mutex. - let mut recv = self.pkg_queue.1.lock().await; - let msg = recv.recv().await; - drop(recv); + let msg = { + let mut recv = self.pkg_queue.1.lock().await; + recv.recv().await + }; if let Some(msg) = msg { // TODO better handle this error (retry if failure wasn't because the package is @@ -61,16 +181,25 @@ impl RepoMgr { .add_pkg_from_path(msg.path, msg.repo) .await .inspect_err(|e| tracing::error!("{:?}", e)); + + let old = self.repos.read().await.get(&msg.repo).map(|n| n.fetch_sub(1, Ordering::SeqCst) ); + + // Every time the queue for a repo becomes empty, we run a sync job + if old == Some(1) { + // TODO error handling + let _ = self.sync_repo(msg.repo).await; + } } } } - pub fn queue_pkg(&self, repo: i32, path: PathBuf) { + pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo }); + self.repos.read().await.get(&repo).inspect(|n| { n.fetch_add(1, Ordering::SeqCst); }); } pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { - let _guard = self.repos_lock.lock().await; + let mut repos = self.repos.write().await; let distro_id: Option = db::Distro::find() .filter(db::distro::Column::Name.eq(distro)) @@ -109,8 +238,12 @@ impl RepoMgr { name: Set(repo.to_string()), description: NotSet, }; + let id = new_repo.insert(&self.conn).await?.id; - new_repo.insert(&self.conn).await?.id + tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?; + repos.insert(id, AtomicU32::new(0)); + + id }; Ok(repo_id) @@ -141,4 +274,11 @@ impl RepoMgr { Ok(()) } + + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.repos_dir.join(uuid.to_string()) + }) + } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index a0d4c15..c5549ef 100644 --- a/server/src/repo/mod.rs +++ 
b/server/src/repo/mod.rs @@ -4,6 +4,7 @@ mod manager2; pub mod package; pub use manager::DistroMgr; +pub use manager2::RepoMgr; use axum::{ body::Body, @@ -75,19 +76,13 @@ async fn post_package_archive( body: Body, ) -> crate::Result<()> { let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - let mgr = global.mgr.get_or_create_mgr(&distro).await?; - let [tmp_path] = mgr.random_file_paths(); + let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; + let [tmp_path] = global.mgr.random_file_paths(); let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; tokio::io::copy(&mut body, &mut tmp_file).await?; - tokio::spawn(async move { - if let Ok((repo, _, _, _)) = mgr.add_pkg_from_path(tmp_path, &repo).await { - tracing::debug!("starting schedule_sync"); - let _ = mgr.schedule_sync(repo).await; - tracing::debug!("finished schedule_sync"); - }; - }); + global.mgr.queue_pkg(repo, tmp_path).await; //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?; // @@ -108,60 +103,63 @@ async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - if let Some(mgr) = global.mgr.get_mgr(&distro).await { - let repo_removed = mgr.remove_repo(&repo).await?; - - if repo_removed { - tracing::info!("Removed repository '{}'", repo); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let repo_removed = mgr.remove_repo(&repo).await?; + // + // if repo_removed { + // tracing::info!("Removed repository '{}'", repo); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - if let Some(mgr) = 
global.mgr.get_mgr(&distro).await { - let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; - - if repo_removed { - tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; + // + // if repo_removed { + // tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } async fn delete_package( State(global): State, Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, ) -> crate::Result { - if let Some(mgr) = global.mgr.get_mgr(&distro).await { - let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; - - if pkg_removed { - tracing::info!( - "Removed package '{}' ({}) from repository '{}'", - pkg_name, - arch, - repo - ); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; + // + // if pkg_removed { + // tracing::info!( + // "Removed package '{}' ({}) from repository '{}'", + // pkg_name, + // arch, + // repo + // ); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} }