From 67b4640e569e477196e7e5b2eddf18425e2e0a7a Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 15 Jun 2024 18:12:14 +0200 Subject: [PATCH 1/3] feat: add package cleaning --- server/src/db/query/package.rs | 47 +++++++++++++++++++++++++++------- server/src/repo/manager2.rs | 34 +++++++++++++++++++----- 2 files changed, 66 insertions(+), 15 deletions(-) diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 5e400ea..0115f5b 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -2,7 +2,7 @@ use crate::db::{self, *}; use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Expr, Query}; +use sea_query::{Alias, Expr, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -218,11 +218,15 @@ pub async fn full(conn: &DbConn, id: i32) -> Result> { #[derive(FromQueryResult)] pub struct PkgToRemove { - repo_id: i32, - id: i32, + pub repo_id: i32, + pub id: i32, } -pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw> { +fn stale_pkgs_query(include_repo: bool) -> SelectStatement { + // In each repository, only one version of a package can exist for any given arch. Because ids + // are monotonically increasing, we know that the row that represents the actual package + // currently in the repository is the row with the largest id whose state is "committed". This + // query finds this id for each (repo, arch, name) tuple. let mut max_id_query = Query::select(); max_id_query .from(db::package::Entity) @@ -243,12 +247,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw SelectorRaw SelectorRaw> { + let query = stale_pkgs_query(true); let builder = conn.get_database_backend(); let sql = builder.build(&query); PkgToRemove::find_by_statement(sql) } + +pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> { + Ok(db::Package::delete_many() + .filter(db::package::Column::Id.lte(max_id)) + .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false))) + .exec(conn) + .await + .map(|_| ())?) +} diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index b0df209..67d36eb 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -1,5 +1,5 @@ use super::{archive, package}; -use crate::db; +use crate::db::{self, query::package::delete_stale_pkgs}; use std::{ collections::HashMap, @@ -103,16 +103,38 @@ impl RepoMgr { } /// Clean any remaining old package files from the database and file system - pub async fn clean(&self) -> crate::Result<()> { - let mut pkgs = db::query::package::to_be_removed_query(&self.conn) + pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::stale_pkgs(&self.conn) .stream(&self.conn) .await?; + let mut max_id = -1; + let mut removed_pkgs = 0; + + // TODO track largest ID seen, then perform similar query to above except we remove the + // matched IDs, but only if they're smaller than or equal to the largest seen ID so we + // don't remove newly added packages while let Some(pkg) = pkgs.next().await.transpose()? { - // TODO remove package from file system and database + // Failing to remove the package file isn't the biggest problem + let _ = tokio::fs::remove_file( + self.repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ) + .await; + + if pkg.id > max_id { + max_id = pkg.id; + } + + removed_pkgs += 1; } - // TODO log indicating how many packages were cleaned + if removed_pkgs > 0 { + db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; + } + + tracing::info!("Removed {removed_pkgs} stale package(s)"); Ok(()) } @@ -233,7 +255,7 @@ impl RepoMgr { // TODO move this so that we only clean if entire queue is empty, not just // queue for specific repo - let _ = self.clean().await; + let _ = self.remove_stale_pkgs().await; } } } From 5d7832c43aa40ef8a6f9c239b1b6ec93beec88bf Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 15 Jun 2024 20:24:58 +0200 Subject: [PATCH 2/3] fix: fixed get_file route --- server/src/cli.rs | 2 +- server/src/repo/manager2.rs | 13 ++++++++++ server/src/repo/mod.rs | 50 ++++++++++++++++--------------------- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 68658ae..1ceaf27 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -44,7 +44,7 @@ pub struct Cli { #[arg( long, value_name = "LOG_LEVEL", - default_value = "tower_http=debug,rieterd=debug", + default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", env = "RIETER_LOG" )] pub log: String, diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index 67d36eb..9a10e0d 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -268,6 +268,19 @@ impl RepoMgr { }); } + pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result> { + Ok(db::Repo::find() + .find_also_related(db::Distro) + .filter( + Condition::all() + .add(db::repo::Column::Name.eq(repo)) + .add(db::distro::Column::Name.eq(distro)), + ) + .one(&self.conn) + .await + .map(|res| res.map(|(repo, _)| repo.id))?) + } + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { let mut repos = self.repos.write().await; diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index c5549ef..bb592c9 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -49,25 +49,29 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - let repo_dir = global - .config - .data_dir - .join("distros") - .join(&distro) - .join(&repo); + if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { + let repo_dir = global + .config + .data_dir + .join("repos") + .join(repo_id.to_string()); - let file_name = - if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = + if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await) + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) + } else { + Err(StatusCode::NOT_FOUND.into()) + } } async fn post_package_archive( @@ -84,18 +88,6 @@ async fn post_package_archive( global.mgr.queue_pkg(repo, tmp_path).await; - //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?; - // - //tracing::info!( - // "Added '{}-{}' to repository '{}' ({})", - // name, - // version, - // repo, - // arch - //); - - //tokio::spawn(async move { mgr.sync_repo(&repo).await }); - Ok(()) } From 27afb3496d6c1c1df4165426bfe2e8fe999b9a66 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 15 Jun 2024 21:59:58 +0200 Subject: [PATCH 3/3] feat: start reimplementing package removals; some fixes --- server/src/db/query/package.rs | 72 +++++++++++++++++++++++++++++++++- server/src/main.rs | 2 + server/src/repo/manager2.rs | 51 ++++++++++++++++-------- server/src/repo/mod.rs | 23 +++++------ 4 files changed, 116 insertions(+), 32 deletions(-) diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 0115f5b..8e9c17b 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -2,7 +2,7 @@ use crate::db::{self, *}; use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Expr, Query, SelectStatement}; +use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -222,6 +222,76 @@ pub struct PkgToRemove { pub id: i32, } +fn max_pkg_ids_query() -> SelectStatement { + Query::select() + .from(db::package::Entity) + .columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) + .group_by_columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .cond_where( + Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), + ) + .to_owned() +} + +pub fn pkgs_to_sync( + conn: &DbConn, + repo: i32, + arch: &str, +) -> SelectorRaw> { + let max_id_query = Query::select() + .columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) + .from(db::package::Entity) + .group_by_columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .to_owned(); + + let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); + let query = Query::select() + .column((p1.clone(), Asterisk)) + .from_as(db::package::Entity, p1.clone()) + .join_subquery( + JoinType::InnerJoin, + max_id_query, + p2.clone(), + Expr::col((p1.clone(), db::package::Column::Id)) + .eq(Expr::col((p2.clone(), Alias::new("max_id")))), + ) + .cond_where( + Condition::all() + .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) + .add( + Expr::col((p1.clone(), db::package::Column::State)) + .ne(db::PackageState::PendingDeletion), + ) + .add( + Expr::col((p1.clone(), db::package::Column::Arch)) + .is_in([arch, crate::ANY_ARCH]), + ), + ) + .to_owned(); + let builder = conn.get_database_backend(); + let sql = builder.build(&query); + + db::Package::find().from_raw_sql(sql) +} + fn stale_pkgs_query(include_repo: bool) -> SelectStatement { // In each repository, only one version of a package can exist for any given arch. Because ids // are monotonically increasing, we know that the row that represents the actual package diff --git a/server/src/main.rs b/server/src/main.rs index f1e70f9..c3237cf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -12,6 +12,8 @@ use repo::DistroMgr; use clap::Parser; use std::{path::PathBuf, sync::Arc}; +pub const ANY_ARCH: &'static str = "any"; + #[derive(Clone)] pub struct Config { data_dir: PathBuf, diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index 9a10e0d..f91ab69 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -22,8 +22,6 @@ use tokio::sync::{ }; use uuid::Uuid; -pub const ANY_ARCH: &'static str = "any"; - struct PkgQueueMsg { repo: i32, path: PathBuf, @@ -108,12 +106,11 @@ impl RepoMgr { .stream(&self.conn) .await?; + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later let mut max_id = -1; let mut removed_pkgs = 0; - // TODO track largest ID seen, then perform similar query to above except we remove the - // matched IDs, but only if they're smaller than or equal to the largest seen ID so we - // don't remove newly added packages while let Some(pkg) = pkgs.next().await.transpose()? { // Failing to remove the package file isn't the biggest problem let _ = tokio::fs::remove_file( @@ -148,18 +145,7 @@ impl RepoMgr { // Query all packages in the repo that have the given architecture or the "any" // architecture - let mut pkgs = db::Package::find() - .filter(db::package::Column::RepoId.eq(repo)) - .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) - .filter( - db::package::Column::Id.in_subquery( - Query::select() - .expr(db::package::Column::Id.max()) - .from(db::package::Entity) - .group_by_columns([db::package::Column::Arch, db::package::Column::Name]) - .to_owned(), - ), - ) + let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch) .stream(&self.conn) .await?; @@ -358,6 +344,37 @@ impl RepoMgr { Ok(()) } + pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { + self.repos.write().await.remove(&repo); + db::Repo::delete_by_id(repo).exec(&self.conn).await?; + let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await; + + Ok(()) + } + + /// Remove all packages in the repository that have a given arch. This method marks all + /// packages with the given architecture as "pending deletion", before performing a manual sync + /// & removal of stale packages. + pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::PendingDeletion), + ) + .filter( + Condition::all() + .add(db::package::Column::RepoId.eq(repo)) + .add(db::package::Column::Arch.eq(arch)), + ) + .exec(&self.conn) + .await?; + + self.sync_repo(repo).await?; + self.remove_stale_pkgs().await?; + + Ok(()) + } + pub fn random_file_paths(&self) -> [PathBuf; C] { std::array::from_fn(|_| { let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index bb592c9..290f9a7 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -95,20 +95,15 @@ async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(mgr) = global.mgr.get_mgr(&distro).await { - // let repo_removed = mgr.remove_repo(&repo).await?; - // - // if repo_removed { - // tracing::info!("Removed repository '{}'", repo); - // - // Ok(StatusCode::OK) - // } else { - // Ok(StatusCode::NOT_FOUND) - // } - //} else { - // Ok(StatusCode::NOT_FOUND) - //} + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo(repo).await?; + + tracing::info!("Removed repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } } async fn delete_arch_repo(