feat: start reimplementing package removals; some fixes

concurrent-repos
Jef Roosens 2024-06-15 21:59:58 +02:00
parent 5d7832c43a
commit 27afb3496d
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 116 additions and 32 deletions

View File

@ -2,7 +2,7 @@ use crate::db::{self, *};
use futures::Stream; use futures::Stream;
use sea_orm::{sea_query::IntoCondition, *}; use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Expr, Query, SelectStatement}; use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
use serde::Deserialize; use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -222,6 +222,76 @@ pub struct PkgToRemove {
pub id: i32, pub id: i32,
} }
fn max_pkg_ids_query() -> SelectStatement {
Query::select()
.from(db::package::Entity)
.columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
.group_by_columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.cond_where(
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
)
.to_owned()
}
pub fn pkgs_to_sync(
conn: &DbConn,
repo: i32,
arch: &str,
) -> SelectorRaw<SelectModel<package::Model>> {
let max_id_query = Query::select()
.columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
.from(db::package::Entity)
.group_by_columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.to_owned();
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let query = Query::select()
.column((p1.clone(), Asterisk))
.from_as(db::package::Entity, p1.clone())
.join_subquery(
JoinType::InnerJoin,
max_id_query,
p2.clone(),
Expr::col((p1.clone(), db::package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.cond_where(
Condition::all()
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
.add(
Expr::col((p1.clone(), db::package::Column::State))
.ne(db::PackageState::PendingDeletion),
)
.add(
Expr::col((p1.clone(), db::package::Column::Arch))
.is_in([arch, crate::ANY_ARCH]),
),
)
.to_owned();
let builder = conn.get_database_backend();
let sql = builder.build(&query);
db::Package::find().from_raw_sql(sql)
}
fn stale_pkgs_query(include_repo: bool) -> SelectStatement { fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
// In each repository, only one version of a package can exist for any given arch. Because ids // In each repository, only one version of a package can exist for any given arch. Because ids
// are monotonically increasing, we know that the row that represents the actual package // are monotonically increasing, we know that the row that represents the actual package

View File

@ -12,6 +12,8 @@ use repo::DistroMgr;
use clap::Parser; use clap::Parser;
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
pub const ANY_ARCH: &'static str = "any";
#[derive(Clone)] #[derive(Clone)]
pub struct Config { pub struct Config {
data_dir: PathBuf, data_dir: PathBuf,

View File

@ -22,8 +22,6 @@ use tokio::sync::{
}; };
use uuid::Uuid; use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any";
struct PkgQueueMsg { struct PkgQueueMsg {
repo: i32, repo: i32,
path: PathBuf, path: PathBuf,
@ -108,12 +106,11 @@ impl RepoMgr {
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1; let mut max_id = -1;
let mut removed_pkgs = 0; let mut removed_pkgs = 0;
// TODO track largest ID seen, then perform similar query to above except we remove the
// matched IDs, but only if they're smaller than or equal to the largest seen ID so we
// don't remove newly added packages
while let Some(pkg) = pkgs.next().await.transpose()? { while let Some(pkg) = pkgs.next().await.transpose()? {
// Failing to remove the package file isn't the biggest problem // Failing to remove the package file isn't the biggest problem
let _ = tokio::fs::remove_file( let _ = tokio::fs::remove_file(
@ -148,18 +145,7 @@ impl RepoMgr {
// Query all packages in the repo that have the given architecture or the "any" // Query all packages in the repo that have the given architecture or the "any"
// architecture // architecture
let mut pkgs = db::Package::find() let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
.filter(db::package::Column::RepoId.eq(repo))
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
.filter(
db::package::Column::Id.in_subquery(
Query::select()
.expr(db::package::Column::Id.max())
.from(db::package::Entity)
.group_by_columns([db::package::Column::Arch, db::package::Column::Name])
.to_owned(),
),
)
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
@ -358,6 +344,37 @@ impl RepoMgr {
Ok(()) Ok(())
} }
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
self.repos.write().await.remove(&repo);
db::Repo::delete_by_id(repo).exec(&self.conn).await?;
let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
Ok(())
}
/// Remove all packages in the repository that have a given arch. This method marks all
/// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::PendingDeletion),
)
.filter(
Condition::all()
.add(db::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)),
)
.exec(&self.conn)
.await?;
self.sync_repo(repo).await?;
self.remove_stale_pkgs().await?;
Ok(())
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] { pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| { std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); let uuid: uuid::fmt::Simple = Uuid::new_v4().into();

View File

@ -95,20 +95,15 @@ async fn delete_repo(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>, Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> { ) -> crate::Result<StatusCode> {
Ok(StatusCode::NOT_FOUND) if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
//if let Some(mgr) = global.mgr.get_mgr(&distro).await { global.mgr.remove_repo(repo).await?;
// let repo_removed = mgr.remove_repo(&repo).await?;
// tracing::info!("Removed repository {repo}");
// if repo_removed {
// tracing::info!("Removed repository '{}'", repo); Ok(StatusCode::OK)
// } else {
// Ok(StatusCode::OK) Ok(StatusCode::NOT_FOUND)
// } else { }
// Ok(StatusCode::NOT_FOUND)
// }
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
} }
async fn delete_arch_repo( async fn delete_arch_repo(