From 731ad37a2a64d2f20f0ad2e06f4996054f9913ec Mon Sep 17 00:00:00 2001
From: Chewing_Bever
Date: Thu, 3 Aug 2023 21:25:42 +0200
Subject: [PATCH] feat(server): properly sync database with repo operations

---
 server/src/api/pagination.rs |   2 +-
 server/src/repo/manager.rs   |  45 +++++++
 server/src/repo/mod.rs       | 227 +++++++++++++++++++++--------------
 3 files changed, 185 insertions(+), 89 deletions(-)

diff --git a/server/src/api/pagination.rs b/server/src/api/pagination.rs
index 3904ea4..376e06c 100644
--- a/server/src/api/pagination.rs
+++ b/server/src/api/pagination.rs
@@ -24,7 +24,7 @@ impl Query {
     pub fn res<T: Serialize>(self, items: Vec<T>) -> PaginatedResponse<T> {
         PaginatedResponse {
             page: self.page.unwrap_or(DEFAULT_PAGE),
-            per_page: self.page.unwrap_or(DEFAULT_PER_PAGE),
+            per_page: self.per_page.unwrap_or(DEFAULT_PER_PAGE),
             count: items.len(),
             items,
         }
diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs
index 411cf98..5846b5d 100644
--- a/server/src/repo/manager.rs
+++ b/server/src/repo/manager.rs
@@ -259,4 +259,49 @@ impl RepoGroupManager {
 
         Ok(false)
     }
+
+    /// Wrapper around `remove_pkg` that accepts a path relative to the package directory to a
+    /// package archive.
+    pub fn remove_pkg_from_path<P: AsRef<Path>>(
+        &mut self,
+        path: P,
+        sync: bool,
+    ) -> io::Result<Option<(String, String, String, String)>> {
+        let path = path.as_ref();
+        let components: Vec<_> = path.iter().collect();
+
+        if let [repo, _arch, file_name] = components[..] {
+            let full_path = self.pkg_dir.join(path);
+
+            if full_path.try_exists()? {
+                let file_name = file_name.to_string_lossy();
+                let (name, version, release, arch) = parse_pkg_filename(&file_name);
+
+                let metadata_dir_name = format!("{}-{}-{}", name, version, release);
+
+                // Remove package archive and entry in database
+                fs::remove_file(full_path)?;
+                fs::remove_dir_all(self.repo_dir.join(repo).join(arch).join(metadata_dir_name))?;
+
+                if sync {
+                    if arch == ANY_ARCH {
+                        self.sync_all(&repo.to_string_lossy())?;
+                    } else {
+                        self.sync(&repo.to_string_lossy(), arch)?;
+                    }
+                }
+
+                Ok(Some((
+                    name,
+                    version.to_string(),
+                    release.to_string(),
+                    arch.to_string(),
+                )))
+            } else {
+                Ok(None)
+            }
+        } else {
+            Ok(None)
+        }
+    }
 }
diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs
index 0c2b327..d300ffe 100644
--- a/server/src/repo/mod.rs
+++ b/server/src/repo/mod.rs
@@ -3,6 +3,8 @@ mod package;
 
 pub use manager::RepoGroupManager;
 
+use std::path::PathBuf;
+
 use crate::db::entities::{package as db_package, repo as db_repo};
 use axum::body::Body;
 use axum::extract::{BodyStream, Path, State};
@@ -12,7 +14,7 @@ use axum::response::IntoResponse;
 use axum::routing::{delete, post};
 use axum::Router;
 use futures::StreamExt;
-use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
+use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, ModelTrait, QueryFilter};
 use std::sync::Arc;
 use tokio::{fs, io::AsyncWriteExt};
 use tower::util::ServiceExt;
@@ -44,73 +46,6 @@ pub fn router(api_key: &str) -> Router {
     )
 }
 
-async fn post_package_archive(
-    State(global): State,
-    Path(repo): Path<String>,
-    mut body: BodyStream,
-) -> crate::Result<()> {
-    // We first stream the uploaded file to disk
-    let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
-    let path = global.config.pkg_dir.join(uuid.to_string());
-    let mut f = fs::File::create(&path).await?;
-
-    while let Some(chunk) = body.next().await {
-        f.write_all(&chunk?).await?;
-    }
-
-    let clone = Arc::clone(&global.repo_manager);
-    let path_clone = path.clone();
-    let repo_clone = repo.clone();
-    let res = tokio::task::spawn_blocking(move || {
-        clone
-            .write()
-            .unwrap()
-            .add_pkg_from_path(&repo_clone, &path_clone)
-    })
-    .await?;
-
-    match res {
-        // Insert the newly added package into the database
-        Ok(pkg) => {
-            tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo);
-
-            // Query the repo for its ID, or create it if it does not already exist
-            let repo_entity = db_repo::Entity::find()
-                .filter(db_repo::Column::Name.eq(&repo))
-                .one(&global.db)
-                .await?;
-
-            let repo_id = if let Some(repo_entity) = repo_entity {
-                repo_entity.id
-            } else {
-                let model = db_repo::ActiveModel {
-                    name: sea_orm::Set(repo.clone()),
-                    ..Default::default()
-                };
-
-                db_repo::Entity::insert(model)
-                    .exec(&global.db)
-                    .await?
-                    .last_insert_id
-            };
-
-            // Insert the package's data into the database
-            let mut model: db_package::ActiveModel = pkg.into();
-            model.repo_id = sea_orm::Set(repo_id);
-
-            model.insert(&global.db).await?;
-
-            Ok(())
-        }
-        // Remove the uploaded file and return the error
-        Err(err) => {
-            tokio::fs::remove_file(path).await?;
-
-            Err(err.into())
-        }
-    }
-}
-
 /// Serve the package archive files and database archives. If files are requested for an
 /// architecture that does not have any explicit packages, a repository containing only "any" files
 /// is returned.
@@ -161,6 +96,85 @@ async fn get_file(
     Ok(res)
 }
 
+async fn post_package_archive(
+    State(global): State,
+    Path(repo): Path<String>,
+    mut body: BodyStream,
+) -> crate::Result<()> {
+    // We first stream the uploaded file to disk
+    let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+    let path = global.config.pkg_dir.join(uuid.to_string());
+    let mut f = fs::File::create(&path).await?;
+
+    while let Some(chunk) = body.next().await {
+        f.write_all(&chunk?).await?;
+    }
+
+    let clone = Arc::clone(&global.repo_manager);
+    let path_clone = path.clone();
+    let repo_clone = repo.clone();
+    let res = tokio::task::spawn_blocking(move || {
+        clone
+            .write()
+            .unwrap()
+            .add_pkg_from_path(&repo_clone, &path_clone)
+    })
+    .await?;
+
+    match res {
+        // Insert the newly added package into the database
+        Ok(pkg) => {
+            tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo);
+
+            // Query the repo for its ID, or create it if it does not already exist
+            let res = db_repo::Entity::find()
+                .filter(db_repo::Column::Name.eq(&repo))
+                .one(&global.db)
+                .await?;
+
+            let repo_id = if let Some(repo_entity) = res {
+                repo_entity.id
+            } else {
+                let model = db_repo::ActiveModel {
+                    name: sea_orm::Set(repo.clone()),
+                    ..Default::default()
+                };
+
+                db_repo::Entity::insert(model)
+                    .exec(&global.db)
+                    .await?
+                    .last_insert_id
+            };
+
+            // If the package already exists in the database, we remove it first
+            let res = db_package::Entity::find()
+                .filter(db_package::Column::RepoId.eq(repo_id))
+                .filter(db_package::Column::Name.eq(&pkg.info.name))
+                .filter(db_package::Column::Arch.eq(&pkg.info.arch))
+                .one(&global.db)
+                .await?;
+
+            if let Some(entry) = res {
+                entry.delete(&global.db).await?;
+            }
+
+            // Insert the package's data into the database
+            let mut model: db_package::ActiveModel = pkg.into();
+            model.repo_id = sea_orm::Set(repo_id);
+
+            model.insert(&global.db).await?;
+
+            Ok(())
+        }
+        // Remove the uploaded file and return the error
+        Err(err) => {
+            tokio::fs::remove_file(path).await?;
+
+            Err(err.into())
+        }
+    }
+}
+
 async fn delete_repo(
     State(global): State,
     Path(repo): Path<String>,
@@ -173,6 +187,15 @@ async fn delete_repo(
             .await??;
 
     if repo_removed {
+        let res = db_repo::Entity::find()
+            .filter(db_repo::Column::Name.eq(&repo))
+            .one(&global.db)
+            .await?;
+
+        if let Some(repo_entry) = res {
+            repo_entry.delete(&global.db).await?;
+        }
+
         tracing::info!("Removed repository '{}'", repo);
 
         Ok(StatusCode::OK)
@@ -187,13 +210,31 @@ async fn delete_arch_repo(
 ) -> crate::Result<StatusCode> {
     let clone = Arc::clone(&global.repo_manager);
 
-    let log = format!("Removed architecture '{}' from repository '{}'", arch, repo);
-    let repo_removed =
-        tokio::task::spawn_blocking(move || clone.write().unwrap().remove_repo_arch(&repo, &arch))
-            .await??;
+    let arch_clone = arch.clone();
+    let repo_clone = repo.clone();
+    let repo_removed = tokio::task::spawn_blocking(move || {
+        clone
+            .write()
+            .unwrap()
+            .remove_repo_arch(&repo_clone, &arch_clone)
+    })
+    .await??;
 
     if repo_removed {
-        tracing::info!(log);
+        let res = db_repo::Entity::find()
+            .filter(db_repo::Column::Name.eq(&repo))
+            .one(&global.db)
+            .await?;
+
+        if let Some(repo_entry) = res {
+            // Also remove all packages for that architecture from database
+            db_package::Entity::delete_many()
+                .filter(db_package::Column::RepoId.eq(repo_entry.id))
+                .filter(db_package::Column::Arch.eq(&arch))
+                .exec(&global.db)
+                .await?;
+        }
+        tracing::info!("Removed architecture '{}' from repository '{}'", arch, repo);
 
         Ok(StatusCode::OK)
     } else {
@@ -205,26 +246,36 @@ async fn delete_package(
     State(global): State,
     Path((repo, arch, file_name)): Path<(String, String, String)>,
 ) -> crate::Result<StatusCode> {
-    let name_parts = file_name.split('-').collect::<Vec<_>>();
-
-    // Package archive files use the naming scheme pkgname-pkgver-pkgrel-arch, so a valid
-    // name contains at least 4 dash-separated sections
-    if name_parts.len() < 4 {
-        return Ok(StatusCode::NOT_FOUND);
-    }
-
-    let name = name_parts[..name_parts.len() - 3].join("-");
-    let log = format!("Removed '{}' from repository '{}'", file_name, repo);
-
-    let clone = Arc::clone(&global.repo_manager);
+    let clone = Arc::clone(&global.repo_manager);
+    let path = PathBuf::from(&repo).join(arch).join(&file_name);
 
-    let pkg_removed = tokio::task::spawn_blocking(move || {
-        clone.write().unwrap().remove_pkg(&repo, &arch, &name, true)
+    let res = tokio::task::spawn_blocking(move || {
+        clone.write().unwrap().remove_pkg_from_path(path, true)
     })
     .await??;
 
-    if pkg_removed {
-        tracing::info!(log);
+    if let Some((name, version, release, arch)) = res {
+        let res = db_repo::Entity::find()
+            .filter(db_repo::Column::Name.eq(&repo))
+            .one(&global.db)
+            .await?;
+
+        if let Some(repo_entry) = res {
+            // Also remove entry from database
+            let res = db_package::Entity::find()
+                .filter(db_package::Column::RepoId.eq(repo_entry.id))
+                .filter(db_package::Column::Name.eq(name))
+                .filter(db_package::Column::Version.eq(format!("{}-{}", version, release)))
+                .filter(db_package::Column::Arch.eq(arch))
+                .one(&global.db)
+                .await?;
+
+            if let Some(entry) = res {
+                entry.delete(&global.db).await?;
+            }
+        }
+
+        tracing::info!("Removed '{}' from repository '{}'", file_name, repo);
 
         Ok(StatusCode::OK)
     } else {