diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs
index 8e9c17b..2ba1996 100644
--- a/server/src/db/query/package.rs
+++ b/server/src/db/query/package.rs
@@ -2,7 +2,7 @@
 use crate::db::{self, *};
 use futures::Stream;
 use sea_orm::{sea_query::IntoCondition, *};
-use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
+use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
 use serde::Deserialize;
 
 #[derive(Deserialize)]
@@ -222,8 +222,8 @@ pub struct PkgToRemove {
     pub id: i32,
 }
 
-fn max_pkg_ids_query() -> SelectStatement {
-    Query::select()
+fn max_pkg_ids_query(committed: bool) -> SelectStatement {
+    let mut query = Query::select()
         .from(db::package::Entity)
         .columns([
             db::package::Column::RepoId,
@@ -236,39 +236,29 @@ fn max_pkg_ids_query() -> SelectStatement {
             db::package::Column::Arch,
             db::package::Column::Name,
         ])
-        .cond_where(
-            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
-        )
-        .to_owned()
-}
-
-pub fn pkgs_to_sync(
-    conn: &DbConn,
-    repo: i32,
-    arch: &str,
-) -> SelectorRaw<SelectModel<db::package::Model>> {
-    let max_id_query = Query::select()
-        .columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
-        .from(db::package::Entity)
-        .group_by_columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
         .to_owned();
 
+    if committed {
+        query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
+    }
+
+    query
+}
+
+/// Query that returns all packages that should be included in a sync for the given repository and
+/// arch.
+pub fn pkgs_to_sync(
+    conn: &DbConn,
+    repo: i32,
+    arch: &str,
+) -> SelectorRaw<SelectModel<db::package::Model>> {
     let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
     let query = Query::select()
-        .column((p1.clone(), Asterisk))
+        .columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
         .from_as(db::package::Entity, p1.clone())
         .join_subquery(
             JoinType::InnerJoin,
-            max_id_query,
+            max_pkg_ids_query(false),
             p2.clone(),
             Expr::col((p1.clone(), db::package::Column::Id))
                 .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
@@ -276,13 +266,13 @@ pub fn pkgs_to_sync(
         .cond_where(
             Condition::all()
                 .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
-                .add(
-                    Expr::col((p1.clone(), db::package::Column::State))
-                        .ne(db::PackageState::PendingDeletion),
-                )
                 .add(
                     Expr::col((p1.clone(), db::package::Column::Arch))
                         .is_in([arch, crate::ANY_ARCH]),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::State))
+                        .ne(db::PackageState::PendingDeletion),
                 ),
         )
         .to_owned();
@@ -293,36 +283,10 @@ pub fn pkgs_to_sync(
 }
 
 fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
-    // In each repository, only one version of a package can exist for any given arch. Because ids
-    // are monotonically increasing, we know that the row that represents the actual package
-    // currently in the repository is the row with the largest id whose state is "committed". This
-    // query finds this id for each (repo, arch, name) tuple.
-    let mut max_id_query = Query::select();
-    max_id_query
-        .from(db::package::Entity)
-        .columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
-        .group_by_columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .cond_where(
-            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
-        );
-
     let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
-    let mut query = Query::select();
-
-    // We then perform an inner join between the max id query above and the package table, where we
-    // filter on rows whose id is less than their respective package's max id or whose state is set
-    // to "pending deletion". This gives us all rows in the database that correspond to packages
-    // that are no longer needed, and can thus be removed.
-    query.from_as(db::package::Entity, p1.clone());
+    let mut query = Query::select()
+        .from_as(db::package::Entity, p1.clone())
+        .to_owned();
 
     if include_repo {
         query.columns([
@@ -333,10 +297,13 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
         query.column((p1.clone(), db::package::Column::Id));
     }
 
+    // We left join on the max pkg ids query because a repository whose packages are all set to
+    // "pending deletion" doesn't show up in that query at all. Those rows are instead matched by
+    // the where clause on the joined rows.
     query
         .join_subquery(
-            JoinType::InnerJoin,
-            max_id_query,
+            JoinType::LeftJoin,
+            max_pkg_ids_query(true),
             p2.clone(),
             Condition::all()
                 .add(
@@ -359,11 +326,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
                         .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
                 )
                 .add(
-                    Expr::col((p1.clone(), db::package::Column::Id))
+                    Expr::col((p1.clone(), db::package::Column::State))
                         .eq(db::PackageState::PendingDeletion),
                 ),
-        )
-        .to_owned()
+        );
+
+    query
 }
 
 pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs
index f91ab69..266eeee 100644
--- a/server/src/repo/manager2.rs
+++ b/server/src/repo/manager2.rs
@@ -100,42 +100,6 @@ impl RepoMgr {
         Ok(())
     }
 
-    /// Clean any remaining old package files from the database and file system
-    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
-        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
-            .stream(&self.conn)
-            .await?;
-
-        // Ids are monotonically increasing, so the max id suffices to know which packages to
-        // remove later
-        let mut max_id = -1;
-        let mut removed_pkgs = 0;
-
-        while let Some(pkg) = pkgs.next().await.transpose()? {
-            // Failing to remove the package file isn't the biggest problem
-            let _ = tokio::fs::remove_file(
-                self.repos_dir
-                    .join(pkg.repo_id.to_string())
-                    .join(pkg.id.to_string()),
-            )
-            .await;
-
-            if pkg.id > max_id {
-                max_id = pkg.id;
-            }
-
-            removed_pkgs += 1;
-        }
-
-        if removed_pkgs > 0 {
-            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
-        }
-
-        tracing::info!("Removed {removed_pkgs} stale package(s)");
-
-        Ok(())
-    }
-
     /// Generate the archive databases for the given repository and architecture.
     async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
         let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
@@ -209,6 +173,42 @@ impl RepoMgr {
         Ok(())
     }
 
+    /// Clean any remaining old package files from the database and file system
+    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
+        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
+            .stream(&self.conn)
+            .await?;
+
+        // Ids are monotonically increasing, so the max id suffices to know which packages to
+        // remove later
+        let mut max_id = -1;
+        let mut removed_pkgs = 0;
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            // Failing to remove the package file isn't the biggest problem
+            let _ = tokio::fs::remove_file(
+                self.repos_dir
+                    .join(pkg.repo_id.to_string())
+                    .join(pkg.id.to_string()),
+            )
+            .await;
+
+            if pkg.id > max_id {
+                max_id = pkg.id;
+            }
+
+            removed_pkgs += 1;
+        }
+
+        if removed_pkgs > 0 {
+            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
+        }
+
+        tracing::info!("Removed {removed_pkgs} stale package(s)");
+
+        Ok(())
+    }
+
     pub async fn pkg_parse_task(&self) {
         loop {
             // Receive the next message and immediately drop the mutex afterwards. As long as the
diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs
index 290f9a7..d088095 100644
--- a/server/src/repo/mod.rs
+++ b/server/src/repo/mod.rs
@@ -78,7 +78,7 @@ async fn post_package_archive(
     State(global): State<crate::Global>,
     Path((distro, repo)): Path<(String, String)>,
     body: Body,
-) -> crate::Result<()> {
+) -> crate::Result<StatusCode> {
     let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
     let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
     let [tmp_path] = global.mgr.random_file_paths();
@@ -88,7 +88,7 @@ async fn post_package_archive(
 
     global.mgr.queue_pkg(repo, tmp_path).await;
 
-    Ok(())
+    Ok(StatusCode::ACCEPTED)
 }
 
 async fn delete_repo(
@@ -110,7 +110,15 @@ async fn delete_arch_repo(
     State(global): State<crate::Global>,
     Path((distro, repo, arch)): Path<(String, String, String)>,
 ) -> crate::Result<StatusCode> {
-    Ok(StatusCode::NOT_FOUND)
+    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
+        global.mgr.remove_repo_arch(repo, &arch).await?;
+
+        tracing::info!("Removed architecture '{arch}' from repository {repo}");
+
+        Ok(StatusCode::OK)
+    } else {
+        Ok(StatusCode::NOT_FOUND)
+    }
     //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
     //    let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
     //