feat: clean up some queries; implement repo arch remove

concurrent-repos
Jef Roosens 2024-06-16 13:04:04 +02:00
parent 27afb3496d
commit e17269ac3b
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 82 additions and 106 deletions

View File

@ -2,7 +2,7 @@ use crate::db::{self, *};
use futures::Stream; use futures::Stream;
use sea_orm::{sea_query::IntoCondition, *}; use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement}; use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
use serde::Deserialize; use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -222,8 +222,8 @@ pub struct PkgToRemove {
pub id: i32, pub id: i32,
} }
fn max_pkg_ids_query() -> SelectStatement { fn max_pkg_ids_query(committed: bool) -> SelectStatement {
Query::select() let mut query = Query::select()
.from(db::package::Entity) .from(db::package::Entity)
.columns([ .columns([
db::package::Column::RepoId, db::package::Column::RepoId,
@ -236,39 +236,29 @@ fn max_pkg_ids_query() -> SelectStatement {
db::package::Column::Arch, db::package::Column::Arch,
db::package::Column::Name, db::package::Column::Name,
]) ])
.cond_where(
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
)
.to_owned()
}
pub fn pkgs_to_sync(
conn: &DbConn,
repo: i32,
arch: &str,
) -> SelectorRaw<SelectModel<package::Model>> {
let max_id_query = Query::select()
.columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
.from(db::package::Entity)
.group_by_columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.to_owned(); .to_owned();
if committed {
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
}
query
}
/// Query that returns all packages that should be included in a sync for the given repository and
/// arch.
pub fn pkgs_to_sync(
conn: &DbConn,
repo: i32,
arch: &str,
) -> SelectorRaw<SelectModel<package::Model>> {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let query = Query::select() let query = Query::select()
.column((p1.clone(), Asterisk)) .columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
.from_as(db::package::Entity, p1.clone()) .from_as(db::package::Entity, p1.clone())
.join_subquery( .join_subquery(
JoinType::InnerJoin, JoinType::InnerJoin,
max_id_query, max_pkg_ids_query(false),
p2.clone(), p2.clone(),
Expr::col((p1.clone(), db::package::Column::Id)) Expr::col((p1.clone(), db::package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))), .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
@ -276,13 +266,13 @@ pub fn pkgs_to_sync(
.cond_where( .cond_where(
Condition::all() Condition::all()
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
.add(
Expr::col((p1.clone(), db::package::Column::State))
.ne(db::PackageState::PendingDeletion),
)
.add( .add(
Expr::col((p1.clone(), db::package::Column::Arch)) Expr::col((p1.clone(), db::package::Column::Arch))
.is_in([arch, crate::ANY_ARCH]), .is_in([arch, crate::ANY_ARCH]),
)
.add(
Expr::col((p1.clone(), db::package::Column::State))
.ne(db::PackageState::PendingDeletion),
), ),
) )
.to_owned(); .to_owned();
@ -293,36 +283,10 @@ pub fn pkgs_to_sync(
} }
fn stale_pkgs_query(include_repo: bool) -> SelectStatement { fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
// In each repository, only one version of a package can exist for any given arch. Because ids
// are monotonically increasing, we know that the row that represents the actual package
// currently in the repository is the row with the largest id whose state is "committed". This
// query finds this id for each (repo, arch, name) tuple.
let mut max_id_query = Query::select();
max_id_query
.from(db::package::Entity)
.columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
.group_by_columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.cond_where(
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
);
let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let mut query = Query::select(); let mut query = Query::select()
.from_as(db::package::Entity, p1.clone())
// We then perform an inner join between the max id query above and the package table, where we .to_owned();
// filter on rows whose id is less than their respective package's max id or whose state is set
// to "pending deletion". This gives us all rows in the database that correspond to packages
// that are no longer needed, and can thus be removed.
query.from_as(db::package::Entity, p1.clone());
if include_repo { if include_repo {
query.columns([ query.columns([
@ -333,10 +297,13 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
query.column((p1.clone(), db::package::Column::Id)); query.column((p1.clone(), db::package::Column::Id));
} }
// We left join on the max pkgs query because a repository that has all its packages set to
// "pending deletion" doesn't show up in the query. These are also included with a where clause
// on the joined rows.
query query
.join_subquery( .join_subquery(
JoinType::InnerJoin, JoinType::LeftJoin,
max_id_query, max_pkg_ids_query(true),
p2.clone(), p2.clone(),
Condition::all() Condition::all()
.add( .add(
@ -359,11 +326,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
.lt(Expr::col((p2.clone(), Alias::new("max_id")))), .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
) )
.add( .add(
Expr::col((p1.clone(), db::package::Column::Id)) Expr::col((p1.clone(), db::package::Column::State))
.eq(db::PackageState::PendingDeletion), .eq(db::PackageState::PendingDeletion),
), ),
) );
.to_owned()
query
} }
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> { pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {

View File

@ -100,42 +100,6 @@ impl RepoMgr {
Ok(()) Ok(())
} }
/// Clean any remaining old package files from the database and file system
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
.stream(&self.conn)
.await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = pkgs.next().await.transpose()? {
// Failing to remove the package file isn't the biggest problem
let _ = tokio::fs::remove_file(
self.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
)
.await;
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
}
if removed_pkgs > 0 {
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
}
tracing::info!("Removed {removed_pkgs} stale package(s)");
Ok(())
}
/// Generate the archive databases for the given repository and architecture. /// Generate the archive databases for the given repository and architecture.
async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
@ -209,6 +173,42 @@ impl RepoMgr {
Ok(()) Ok(())
} }
/// Clean any remaining old package files from the database and file system
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
.stream(&self.conn)
.await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = pkgs.next().await.transpose()? {
// Failing to remove the package file isn't the biggest problem
let _ = tokio::fs::remove_file(
self.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
)
.await;
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
}
if removed_pkgs > 0 {
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
}
tracing::info!("Removed {removed_pkgs} stale package(s)");
Ok(())
}
pub async fn pkg_parse_task(&self) { pub async fn pkg_parse_task(&self) {
loop { loop {
// Receive the next message and immediately drop the mutex afterwards. As long as the // Receive the next message and immediately drop the mutex afterwards. As long as the

View File

@ -78,7 +78,7 @@ async fn post_package_archive(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>, Path((distro, repo)): Path<(String, String)>,
body: Body, body: Body,
) -> crate::Result<()> { ) -> crate::Result<StatusCode> {
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
let [tmp_path] = global.mgr.random_file_paths(); let [tmp_path] = global.mgr.random_file_paths();
@ -88,7 +88,7 @@ async fn post_package_archive(
global.mgr.queue_pkg(repo, tmp_path).await; global.mgr.queue_pkg(repo, tmp_path).await;
Ok(()) Ok(StatusCode::ACCEPTED)
} }
async fn delete_repo( async fn delete_repo(
@ -110,7 +110,15 @@ async fn delete_arch_repo(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo, arch)): Path<(String, String, String)>, Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> { ) -> crate::Result<StatusCode> {
Ok(StatusCode::NOT_FOUND) if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
global.mgr.remove_repo_arch(repo, &arch).await?;
tracing::info!("Removed architecture '{arch}' from repository {repo}");
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
//if let Some(mgr) = global.mgr.get_mgr(&distro).await { //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
// let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
// //