From 97612e1af61dbcfcce1c136dc7b88eba11ff1bcf Mon Sep 17 00:00:00 2001
From: Chewing_Bever
Date: Sun, 9 Jun 2024 23:04:45 +0200
Subject: [PATCH] feat: better concurrent uploads with limited parallel parsing

---
 Cargo.lock                 |   1 +
 server/Cargo.toml          |   1 +
 server/src/repo/manager.rs | 116 ++++++++++++++++++++++++-------------
 server/src/repo/mod.rs     |  25 +++++---
 server/src/repo/package.rs |   2 +-
 5 files changed, 96 insertions(+), 49 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 333bc72..d6e9e55 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1662,6 +1662,7 @@ dependencies = [
  "libarchive",
  "sea-orm",
  "sea-orm-migration",
+ "sea-query",
  "serde",
  "sha256",
  "tokio",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index cd86713..75b5a09 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -14,6 +14,7 @@ futures = "0.3.28"
 http-body-util = "0.1.1"
 libarchive = { path = "../libarchive" }
 sea-orm-migration = "0.12.1"
+sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
 serde = { version = "1.0.178", features = ["derive"] }
 sha256 = "1.1.4"
 tokio = { version = "1.29.1", features = ["full"] }
diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs
index 691f6b0..d3c753c 100644
--- a/server/src/repo/manager.rs
+++ b/server/src/repo/manager.rs
@@ -7,19 +7,27 @@ use std::{
 };
 
 use futures::StreamExt;
-use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
-use tokio::{io::AsyncRead, sync::Mutex};
+use sea_orm::{
+    ActiveModelTrait, ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter, QuerySelect,
+    Related, RelationTrait, Set, TransactionTrait,
+};
+use sea_query::{Expr, Query};
+use tokio::{
+    io::AsyncRead,
+    sync::{Mutex, Semaphore},
+};
 use uuid::Uuid;
 
 pub const ANY_ARCH: &'static str = "any";
 pub const REPOS_DIR: &'static str = "repos";
-pub const QUEUE_DIR: &'static str = "queue";
 
 pub struct DistroMgr {
     distro_dir: PathBuf,
     distro_id: i32,
     conn: DbConn,
-    lock: Arc<Mutex<()>>,
+    repo_lock: Arc<Mutex<()>>,
+    sync_lock: Arc<Mutex<()>>,
+    pkg_sema: Arc<Semaphore>,
 }
 
 impl DistroMgr {
@@ -34,23 +42,21 @@ impl DistroMgr {
             tokio::fs::create_dir(repos_dir).await?;
         }
 
-        let queue_dir = distro_dir.as_ref().join(QUEUE_DIR);
-
-        if !tokio::fs::try_exists(&queue_dir).await? {
-            tokio::fs::create_dir(queue_dir).await?;
-        }
-
         Ok(Self {
            distro_dir: distro_dir.as_ref().to_path_buf(),
            distro_id,
            conn,
-            lock: Arc::new(Mutex::new(())),
+            repo_lock: Arc::new(Mutex::new(())),
+            sync_lock: Arc::new(Mutex::new(())),
+            pkg_sema: Arc::new(Semaphore::new(1)),
        })
    }
 
     /// Generate archive databases for all known architectures in the repository, including the
     /// "any" architecture.
-    pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
+    pub async fn sync_repo(&self, repo: &str) -> Result<()> {
+        let _guard = self.sync_lock.lock().await;
+
         let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
 
         if repo.is_none() {
@@ -60,31 +66,23 @@ impl DistroMgr {
         let repo = repo.unwrap();
 
         let mut archs = repo
-            .find_related(crate::db::Package)
+            .find_related(db::Package)
             .select_only()
-            .column(crate::db::package::Column::Arch)
+            .column(db::package::Column::Arch)
             .distinct()
             .into_tuple::<String>()
             .stream(&self.conn)
             .await?;
 
         while let Some(arch) = archs.next().await.transpose()? {
-            self.generate_archives(&repo.name, &arch).await?;
+            self.generate_archives(&repo, &arch).await?;
         }
 
         Ok(())
     }
 
     /// Generate the archive databases for the given repository and architecture.
-    pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
-        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
-
-        if repo.is_none() {
-            return Ok(());
-        }
-
-        let repo = repo.unwrap();
-
+    async fn generate_archives(&self, repo: &db::repo::Model, arch: &str) -> Result<()> {
         let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
             self.random_file_paths();
         let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
@@ -95,10 +93,23 @@ impl DistroMgr {
         let mut pkgs = repo
             .find_related(crate::db::Package)
             .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
+            .filter(
+                db::package::Column::Id.in_subquery(
+                    Query::select()
+                        .expr(db::package::Column::Id.max())
+                        .from(db::package::Entity)
+                        .group_by_columns([db::package::Column::Arch, db::package::Column::Name])
+                        .to_owned(),
+                ),
+            )
             .stream(&self.conn)
             .await?;
 
+        let mut committed_ids: Vec<i32> = Vec::new();
+
         while let Some(pkg) = pkgs.next().await.transpose()? {
+            committed_ids.push(pkg.id);
+
             let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
             let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
 
@@ -132,16 +143,34 @@ impl DistroMgr {
             )
             .await?;
 
+        // Only after we have successfully written everything to disk do we update the database.
+        // This order ensures any failure can be recovered, as the database is our single source of
+        // truth.
+        db::Package::update_many()
+            .col_expr(
+                db::package::Column::State,
+                Expr::value(db::PackageState::Committed),
+            )
+            .filter(db::package::Column::Id.is_in(committed_ids))
+            .exec(&self.conn)
+            .await?;
+
         // If this fails there's no point in failing the function + if there were no packages in
         // the repo, this fails anyway because the temp file doesn't exist
         let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
         let _ = tokio::fs::remove_file(files_tmp_file_path).await;
 
+        tracing::info!(
+            "Package archives generated for '{}' ('{}')",
+            &repo.name,
+            arch
+        );
+
         Ok(())
     }
 
     async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> {
-        let _guard = self.lock.lock().await;
+        let _guard = self.repo_lock.lock().await;
 
         if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
             Ok(repo)
@@ -205,7 +234,7 @@ impl DistroMgr {
 
         // If we removed all "any" packages, we need to resync all databases
         if arch == ANY_ARCH {
-            self.generate_archives_all(&repo.name).await?;
+            self.sync_repo(&repo.name).await?;
         }
 
         Ok(true)
@@ -231,11 +260,11 @@ impl DistroMgr {
            .await?;
            pkg.delete(&self.conn).await?;
 
-            if arch == ANY_ARCH {
-                self.generate_archives_all(&repo.name).await?;
-            } else {
-                self.generate_archives(&repo.name, arch).await?;
-            }
+            //if arch == ANY_ARCH {
+            //    self.sync_repo(&repo.name).await?;
+            //} else {
+            //    self.generate_archives(&repo.name, arch).await?;
+            //}
 
             Ok(true)
         } else {
@@ -246,26 +275,33 @@ impl DistroMgr {
         }
     }
 
-    pub async fn add_pkg_from_reader<R: AsyncRead + Unpin>(
+    pub async fn add_pkg_from_path<P: AsRef<Path>>(
         &self,
-        reader: &mut R,
+        path: P,
         repo: &str,
     ) -> crate::Result<(String, String, String)> {
-        let [tmp_file_path] = self.random_file_paths();
-        let mut temp_file = tokio::fs::File::create(&tmp_file_path).await?;
+        let _guard = self.pkg_sema.acquire().await.unwrap();
 
-        tokio::io::copy(reader, &mut temp_file).await?;
-
-        let path_clone = tmp_file_path.clone();
+        let path_clone = path.as_ref().to_path_buf();
         let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
             .await
             .unwrap()?;
 
         let repo = self.get_or_create_repo(repo).await?;
+
+        // TODO prevent database from being updated but file failing to move to repo dir?
         let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?;
 
-        let queue_path = self.distro_dir.join(QUEUE_DIR).join(pkg.id.to_string());
-        tokio::fs::rename(tmp_file_path, queue_path).await?;
+        let queue_path = self.distro_dir.join(&repo.name).join(pkg.id.to_string());
+        tokio::fs::rename(path.as_ref(), queue_path).await?;
+
+        tracing::info!(
+            "Added '{}-{}' to repository '{}' ({})",
+            pkg.name,
+            pkg.version,
+            repo.name,
+            pkg.arch
+        );
 
         // If the package already exists in the database, we remove it first
         //let res = db::query::package::by_fields(
diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs
index 7d7e321..cdc6c09 100644
--- a/server/src/repo/mod.rs
+++ b/server/src/repo/mod.rs
@@ -75,15 +75,24 @@ async fn post_package_archive(
 ) -> crate::Result<()> {
     let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
     let mgr = global.mgr.get_or_create_mgr(&distro).await?;
-    let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?;
+    let [tmp_path] = mgr.random_file_paths();
 
-    tracing::info!(
-        "Added '{}-{}' to repository '{}' ({})",
-        name,
-        version,
-        repo,
-        arch
-    );
+    let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
+    tokio::io::copy(&mut body, &mut tmp_file).await?;
+
+    tokio::spawn(async move { mgr.add_pkg_from_path(tmp_path, &repo).await });
+
+    //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
+    //
+    //tracing::info!(
+    //    "Added '{}-{}' to repository '{}' ({})",
+    //    name,
+    //    version,
+    //    repo,
+    //    arch
+    //);
+
+    //tokio::spawn(async move { mgr.sync_repo(&repo).await });
 
     Ok(())
 }
diff --git a/server/src/repo/package.rs b/server/src/repo/package.rs
index 24979eb..66c8fa1 100644
--- a/server/src/repo/package.rs
+++ b/server/src/repo/package.rs
@@ -323,7 +323,7 @@ pub async fn write_desc(
     pkg: &package::Model,
 ) -> crate::Result<()> {
     writer
-        .write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes())
+        .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
         .await?;
 
     write_attribute(writer, "NAME", &pkg.name).await?;
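
Note on the concurrency pattern this patch introduces: the HTTP handler now only streams the upload into a temporary file and hands the rest off to a background task, while pkg_sema (a tokio Semaphore created with a single permit) bounds how many blocking libarchive parses run at once. The sketch below is a minimal standalone illustration of that pattern under those assumptions; handle_upload and parse_package are hypothetical names, not code from this patch.

use std::path::{Path, PathBuf};
use std::sync::Arc;

use tokio::sync::Semaphore;

// Accepting an upload stays cheap: the heavy parse is funneled through the semaphore
// and pushed onto the blocking thread pool, mirroring add_pkg_from_path.
async fn handle_upload(sema: Arc<Semaphore>, path: PathBuf) -> std::io::Result<()> {
    // With Semaphore::new(1) this serializes parsing; a larger count allows limited parallelism.
    let _permit = sema.acquire_owned().await.expect("semaphore closed");

    // Run the CPU/IO-heavy work off the async runtime, like package::Package::open.
    tokio::task::spawn_blocking(move || parse_package(&path))
        .await
        .expect("blocking task panicked")
}

fn parse_package(path: &Path) -> std::io::Result<()> {
    // Stand-in for the real libarchive-based package parsing.
    let _ = std::fs::metadata(path)?;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let sema = Arc::new(Semaphore::new(1));

    // Each upload gets its own task; only one parse runs at a time.
    let handles: Vec<_> = (0..4)
        .map(|i| tokio::spawn(handle_upload(sema.clone(), PathBuf::from(format!("pkg-{i}.tar.zst")))))
        .collect();

    for handle in handles {
        // Per-file errors are ignored here; the real handler logs and continues.
        let _ = handle.await.expect("task panicked");
    }

    Ok(())
}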
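The new in_subquery filter in generate_archives keeps only the row with the highest id per (arch, name), so a sync never packs two versions of the same package into the generated archive databases. Below is a rough standalone sea-query equivalent of that shape of query; it assumes sea-query's default derive feature and uses an illustrative Package table definition rather than the server's actual entities.

use sea_query::{Expr, Iden, Query, SqliteQueryBuilder};

// Illustrative table/column identifiers; the real schema lives in the server's sea-orm entities.
#[derive(Iden)]
enum Package {
    Table,
    Id,
    Name,
    Arch,
}

fn main() {
    // Subquery: the newest (highest) id for every (arch, name) pair.
    let newest_ids = Query::select()
        .expr(Expr::col(Package::Id).max())
        .from(Package::Table)
        .group_by_columns([Package::Arch, Package::Name])
        .to_owned();

    // Outer query: select only those newest rows, mirroring the added filter.
    let sql = Query::select()
        .column(Package::Id)
        .column(Package::Name)
        .column(Package::Arch)
        .from(Package::Table)
        .and_where(Expr::col(Package::Id).in_subquery(newest_ids))
        .to_string(SqliteQueryBuilder);

    println!("{sql}");
}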