diff --git a/server/src/cli.rs b/server/src/cli.rs index 1ceaf27..68658ae 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -44,7 +44,7 @@ pub struct Cli { #[arg( long, value_name = "LOG_LEVEL", - default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", + default_value = "tower_http=debug,rieterd=debug", env = "RIETER_LOG" )] pub log: String, diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 8e9c17b..5e400ea 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -2,7 +2,7 @@ use crate::db::{self, *}; use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement}; +use sea_query::{Alias, Expr, Query}; use serde::Deserialize; #[derive(Deserialize)] @@ -218,85 +218,11 @@ pub async fn full(conn: &DbConn, id: i32) -> Result> { #[derive(FromQueryResult)] pub struct PkgToRemove { - pub repo_id: i32, - pub id: i32, + repo_id: i32, + id: i32, } -fn max_pkg_ids_query() -> SelectStatement { - Query::select() - .from(db::package::Entity) - .columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) - .group_by_columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .cond_where( - Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), - ) - .to_owned() -} - -pub fn pkgs_to_sync( - conn: &DbConn, - repo: i32, - arch: &str, -) -> SelectorRaw> { - let max_id_query = Query::select() - .columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) - .from(db::package::Entity) - .group_by_columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .to_owned(); - - let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); - let query = Query::select() - .column((p1.clone(), Asterisk)) - .from_as(db::package::Entity, p1.clone()) - .join_subquery( - JoinType::InnerJoin, - max_id_query, - p2.clone(), - Expr::col((p1.clone(), db::package::Column::Id)) - .eq(Expr::col((p2.clone(), Alias::new("max_id")))), - ) - .cond_where( - Condition::all() - .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) - .add( - Expr::col((p1.clone(), db::package::Column::State)) - .ne(db::PackageState::PendingDeletion), - ) - .add( - Expr::col((p1.clone(), db::package::Column::Arch)) - .is_in([arch, crate::ANY_ARCH]), - ), - ) - .to_owned(); - let builder = conn.get_database_backend(); - let sql = builder.build(&query); - - db::Package::find().from_raw_sql(sql) -} - -fn stale_pkgs_query(include_repo: bool) -> SelectStatement { - // In each repository, only one version of a package can exist for any given arch. Because ids - // are monotonically increasing, we know that the row that represents the actual package - // currently in the repository is the row with the largest id whose state is "committed". This - // query finds this id for each (repo, arch, name) tuple. +pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw> { let mut max_id_query = Query::select(); max_id_query .from(db::package::Entity) @@ -317,23 +243,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let mut query = Query::select(); - - // We then perform an inner join between the max id query above and the package table, where we - // filter on rows whose id is less than their respective package's max id or whose state is set - // to "pending deletion". This gives us all rows in the database that correspond to packages - // that are no longer needed, and can thus be removed. - query.from_as(db::package::Entity, p1.clone()); - - if include_repo { - query.columns([ + query + .from_as(db::package::Entity, p1.clone()) + .columns([ (p1.clone(), db::package::Column::RepoId), (p1.clone(), db::package::Column::Id), - ]); - } else { - query.column((p1.clone(), db::package::Column::Id)); - } - - query + ]) .join_subquery( JoinType::InnerJoin, max_id_query, @@ -362,23 +277,9 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { Expr::col((p1.clone(), db::package::Column::Id)) .eq(db::PackageState::PendingDeletion), ), - ) - .to_owned() -} - -pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw> { - let query = stale_pkgs_query(true); + ); let builder = conn.get_database_backend(); let sql = builder.build(&query); PkgToRemove::find_by_statement(sql) } - -pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> { - Ok(db::Package::delete_many() - .filter(db::package::Column::Id.lte(max_id)) - .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false))) - .exec(conn) - .await - .map(|_| ())?) -} diff --git a/server/src/main.rs b/server/src/main.rs index c3237cf..f1e70f9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -12,8 +12,6 @@ use repo::DistroMgr; use clap::Parser; use std::{path::PathBuf, sync::Arc}; -pub const ANY_ARCH: &'static str = "any"; - #[derive(Clone)] pub struct Config { data_dir: PathBuf, diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index f91ab69..b0df209 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -1,5 +1,5 @@ use super::{archive, package}; -use crate::db::{self, query::package::delete_stale_pkgs}; +use crate::db; use std::{ collections::HashMap, @@ -22,6 +22,8 @@ use tokio::sync::{ }; use uuid::Uuid; +pub const ANY_ARCH: &'static str = "any"; + struct PkgQueueMsg { repo: i32, path: PathBuf, @@ -101,37 +103,16 @@ impl RepoMgr { } /// Clean any remaining old package files from the database and file system - pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { - let mut pkgs = db::query::package::stale_pkgs(&self.conn) + pub async fn clean(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::to_be_removed_query(&self.conn) .stream(&self.conn) .await?; - // Ids are monotonically increasing, so the max id suffices to know which packages to - // remove later - let mut max_id = -1; - let mut removed_pkgs = 0; - while let Some(pkg) = pkgs.next().await.transpose()? { - // Failing to remove the package file isn't the biggest problem - let _ = tokio::fs::remove_file( - self.repos_dir - .join(pkg.repo_id.to_string()) - .join(pkg.id.to_string()), - ) - .await; - - if pkg.id > max_id { - max_id = pkg.id; - } - - removed_pkgs += 1; + // TODO remove package from file system and database } - if removed_pkgs > 0 { - db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; - } - - tracing::info!("Removed {removed_pkgs} stale package(s)"); + // TODO log indicating how many packages were cleaned Ok(()) } @@ -145,7 +126,18 @@ impl RepoMgr { // Query all packages in the repo that have the given architecture or the "any" // architecture - let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch) + let mut pkgs = db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) + .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) + .filter( + db::package::Column::Id.in_subquery( + Query::select() + .expr(db::package::Column::Id.max()) + .from(db::package::Entity) + .group_by_columns([db::package::Column::Arch, db::package::Column::Name]) + .to_owned(), + ), + ) .stream(&self.conn) .await?; @@ -241,7 +233,7 @@ impl RepoMgr { // TODO move this so that we only clean if entire queue is empty, not just // queue for specific repo - let _ = self.remove_stale_pkgs().await; + let _ = self.clean().await; } } } @@ -254,19 +246,6 @@ impl RepoMgr { }); } - pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result> { - Ok(db::Repo::find() - .find_also_related(db::Distro) - .filter( - Condition::all() - .add(db::repo::Column::Name.eq(repo)) - .add(db::distro::Column::Name.eq(distro)), - ) - .one(&self.conn) - .await - .map(|res| res.map(|(repo, _)| repo.id))?) - } - pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { let mut repos = self.repos.write().await; @@ -344,37 +323,6 @@ impl RepoMgr { Ok(()) } - pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { - self.repos.write().await.remove(&repo); - db::Repo::delete_by_id(repo).exec(&self.conn).await?; - let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await; - - Ok(()) - } - - /// Remove all packages in the repository that have a given arch. This method marks all - /// packages with the given architecture as "pending deletion", before performing a manual sync - /// & removal of stale packages. - pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { - db::Package::update_many() - .col_expr( - db::package::Column::State, - Expr::value(db::PackageState::PendingDeletion), - ) - .filter( - Condition::all() - .add(db::package::Column::RepoId.eq(repo)) - .add(db::package::Column::Arch.eq(arch)), - ) - .exec(&self.conn) - .await?; - - self.sync_repo(repo).await?; - self.remove_stale_pkgs().await?; - - Ok(()) - } - pub fn random_file_paths(&self) -> [PathBuf; C] { std::array::from_fn(|_| { let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 290f9a7..c5549ef 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -49,29 +49,25 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - let repo_dir = global - .config - .data_dir - .join("repos") - .join(repo_id.to_string()); + let repo_dir = global + .config + .data_dir + .join("distros") + .join(&distro) + .join(&repo); - let file_name = - if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = + if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) - } else { - Err(StatusCode::NOT_FOUND.into()) - } + Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await) } async fn post_package_archive( @@ -88,6 +84,18 @@ async fn post_package_archive( global.mgr.queue_pkg(repo, tmp_path).await; + //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?; + // + //tracing::info!( + // "Added '{}-{}' to repository '{}' ({})", + // name, + // version, + // repo, + // arch + //); + + //tokio::spawn(async move { mgr.sync_repo(&repo).await }); + Ok(()) } @@ -95,15 +103,20 @@ async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo(repo).await?; - - tracing::info!("Removed repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let repo_removed = mgr.remove_repo(&repo).await?; + // + // if repo_removed { + // tracing::info!("Removed repository '{}'", repo); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } async fn delete_arch_repo(