feat: add package cleaning

concurrent-repos
Jef Roosens 2024-06-15 18:12:14 +02:00
parent a408c14ab1
commit 67b4640e56
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
2 changed files with 66 additions and 15 deletions

View File

@ -2,7 +2,7 @@ use crate::db::{self, *};
use futures::Stream; use futures::Stream;
use sea_orm::{sea_query::IntoCondition, *}; use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Expr, Query}; use sea_query::{Alias, Expr, Query, SelectStatement};
use serde::Deserialize; use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -218,11 +218,15 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
/// Row type for the raw stale-package query: the id of a package row together with the id of
/// the repository it belongs to.
#[derive(FromQueryResult)] #[derive(FromQueryResult)]
pub struct PkgToRemove { pub struct PkgToRemove {
repo_id: i32, pub repo_id: i32,
id: i32, pub id: i32,
} }
pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> { fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
// In each repository, only one version of a package can exist for any given arch. Because ids
// are monotonically increasing, we know that the row that represents the actual package
// currently in the repository is the row with the largest id whose state is "committed". This
// query finds this id for each (repo, arch, name) tuple.
let mut max_id_query = Query::select(); let mut max_id_query = Query::select();
max_id_query max_id_query
.from(db::package::Entity) .from(db::package::Entity)
@ -243,12 +247,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let mut query = Query::select(); let mut query = Query::select();
query
.from_as(db::package::Entity, p1.clone()) // We then perform an inner join between the max id query above and the package table, where we
.columns([ // filter on rows whose id is less than their respective package's max id or whose state is set
// to "pending deletion". This gives us all rows in the database that correspond to packages
// that are no longer needed, and can thus be removed.
query.from_as(db::package::Entity, p1.clone());
if include_repo {
query.columns([
(p1.clone(), db::package::Column::RepoId), (p1.clone(), db::package::Column::RepoId),
(p1.clone(), db::package::Column::Id), (p1.clone(), db::package::Column::Id),
]) ]);
} else {
query.column((p1.clone(), db::package::Column::Id));
}
query
.join_subquery( .join_subquery(
JoinType::InnerJoin, JoinType::InnerJoin,
max_id_query, max_id_query,
@ -277,9 +292,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
Expr::col((p1.clone(), db::package::Column::Id)) Expr::col((p1.clone(), db::package::Column::Id))
.eq(db::PackageState::PendingDeletion), .eq(db::PackageState::PendingDeletion),
), ),
); )
.to_owned()
}
/// Build the stale-package selection (with the repository id column included) as a raw
/// statement for the connection's database backend, yielding `PkgToRemove` rows.
///
/// Nothing is executed here; the caller runs the returned selector.
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
let query = stale_pkgs_query(true);
let builder = conn.get_database_backend(); let builder = conn.get_database_backend();
let sql = builder.build(&query); let sql = builder.build(&query);
PkgToRemove::find_by_statement(sql) PkgToRemove::find_by_statement(sql)
} }
/// Delete every package row that the stale-package query selects and whose id is at most
/// `max_id`.
///
/// `max_id` is an inclusive upper bound on the ids that may be deleted; rows with a larger id
/// are left untouched even if the subquery matches them.
///
/// # Errors
///
/// Returns the database error, converted into the crate error type, if the delete fails.
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
    // Both conditions are applied as separate filters, which combine with AND.
    let within_bound = db::package::Column::Id.lte(max_id);
    let is_stale = db::package::Column::Id.in_subquery(stale_pkgs_query(false));

    // The delete result (affected row count) is not needed by callers.
    db::Package::delete_many()
        .filter(within_bound)
        .filter(is_stale)
        .exec(conn)
        .await?;

    Ok(())
}

View File

@ -1,5 +1,5 @@
use super::{archive, package}; use super::{archive, package};
use crate::db; use crate::db::{self, query::package::delete_stale_pkgs};
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -103,16 +103,38 @@ impl RepoMgr {
} }
/// Clean any remaining old package files from the database and file system /// Clean any remaining old package files from the database and file system
pub async fn clean(&self) -> crate::Result<()> { pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let mut pkgs = db::query::package::to_be_removed_query(&self.conn) let mut pkgs = db::query::package::stale_pkgs(&self.conn)
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
let mut max_id = -1;
let mut removed_pkgs = 0;
// TODO track largest ID seen, then perform similar query to above except we remove the
// matched IDs, but only if they're smaller than or equal to the largest seen ID so we
// don't remove newly added packages
while let Some(pkg) = pkgs.next().await.transpose()? { while let Some(pkg) = pkgs.next().await.transpose()? {
// TODO remove package from file system and database // Failing to remove the package file isn't the biggest problem
let _ = tokio::fs::remove_file(
self.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
)
.await;
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
} }
// TODO log indicating how many packages were cleaned if removed_pkgs > 0 {
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
}
tracing::info!("Removed {removed_pkgs} stale package(s)");
Ok(()) Ok(())
} }
@ -233,7 +255,7 @@ impl RepoMgr {
// TODO move this so that we only clean if entire queue is empty, not just // TODO move this so that we only clean if entire queue is empty, not just
// queue for specific repo // queue for specific repo
let _ = self.clean().await; let _ = self.remove_stale_pkgs().await;
} }
} }
} }