Compare commits
3 Commits
a408c14ab1
...
27afb3496d
Author | SHA1 | Date |
---|---|---|
Jef Roosens | 27afb3496d | |
Jef Roosens | 5d7832c43a | |
Jef Roosens | 67b4640e56 |
|
@ -44,7 +44,7 @@ pub struct Cli {
|
||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
value_name = "LOG_LEVEL",
|
value_name = "LOG_LEVEL",
|
||||||
default_value = "tower_http=debug,rieterd=debug",
|
default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
|
||||||
env = "RIETER_LOG"
|
env = "RIETER_LOG"
|
||||||
)]
|
)]
|
||||||
pub log: String,
|
pub log: String,
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::db::{self, *};
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use sea_orm::{sea_query::IntoCondition, *};
|
use sea_orm::{sea_query::IntoCondition, *};
|
||||||
use sea_query::{Alias, Expr, Query};
|
use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
@ -218,11 +218,85 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
|
||||||
|
|
||||||
#[derive(FromQueryResult)]
|
#[derive(FromQueryResult)]
|
||||||
pub struct PkgToRemove {
|
pub struct PkgToRemove {
|
||||||
repo_id: i32,
|
pub repo_id: i32,
|
||||||
id: i32,
|
pub id: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
fn max_pkg_ids_query() -> SelectStatement {
|
||||||
|
Query::select()
|
||||||
|
.from(db::package::Entity)
|
||||||
|
.columns([
|
||||||
|
db::package::Column::RepoId,
|
||||||
|
db::package::Column::Arch,
|
||||||
|
db::package::Column::Name,
|
||||||
|
])
|
||||||
|
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
||||||
|
.group_by_columns([
|
||||||
|
db::package::Column::RepoId,
|
||||||
|
db::package::Column::Arch,
|
||||||
|
db::package::Column::Name,
|
||||||
|
])
|
||||||
|
.cond_where(
|
||||||
|
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
|
||||||
|
)
|
||||||
|
.to_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pkgs_to_sync(
|
||||||
|
conn: &DbConn,
|
||||||
|
repo: i32,
|
||||||
|
arch: &str,
|
||||||
|
) -> SelectorRaw<SelectModel<package::Model>> {
|
||||||
|
let max_id_query = Query::select()
|
||||||
|
.columns([
|
||||||
|
db::package::Column::RepoId,
|
||||||
|
db::package::Column::Arch,
|
||||||
|
db::package::Column::Name,
|
||||||
|
])
|
||||||
|
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
||||||
|
.from(db::package::Entity)
|
||||||
|
.group_by_columns([
|
||||||
|
db::package::Column::RepoId,
|
||||||
|
db::package::Column::Arch,
|
||||||
|
db::package::Column::Name,
|
||||||
|
])
|
||||||
|
.to_owned();
|
||||||
|
|
||||||
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
|
let query = Query::select()
|
||||||
|
.column((p1.clone(), Asterisk))
|
||||||
|
.from_as(db::package::Entity, p1.clone())
|
||||||
|
.join_subquery(
|
||||||
|
JoinType::InnerJoin,
|
||||||
|
max_id_query,
|
||||||
|
p2.clone(),
|
||||||
|
Expr::col((p1.clone(), db::package::Column::Id))
|
||||||
|
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
|
)
|
||||||
|
.cond_where(
|
||||||
|
Condition::all()
|
||||||
|
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
|
||||||
|
.add(
|
||||||
|
Expr::col((p1.clone(), db::package::Column::State))
|
||||||
|
.ne(db::PackageState::PendingDeletion),
|
||||||
|
)
|
||||||
|
.add(
|
||||||
|
Expr::col((p1.clone(), db::package::Column::Arch))
|
||||||
|
.is_in([arch, crate::ANY_ARCH]),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.to_owned();
|
||||||
|
let builder = conn.get_database_backend();
|
||||||
|
let sql = builder.build(&query);
|
||||||
|
|
||||||
|
db::Package::find().from_raw_sql(sql)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
|
// In each repository, only one version of a package can exist for any given arch. Because ids
|
||||||
|
// are monotonically increasing, we know that the row that represents the actual package
|
||||||
|
// currently in the repository is the row with the largest id whose state is "committed". This
|
||||||
|
// query finds this id for each (repo, arch, name) tuple.
|
||||||
let mut max_id_query = Query::select();
|
let mut max_id_query = Query::select();
|
||||||
max_id_query
|
max_id_query
|
||||||
.from(db::package::Entity)
|
.from(db::package::Entity)
|
||||||
|
@ -243,12 +317,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
|
||||||
|
|
||||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
let mut query = Query::select();
|
let mut query = Query::select();
|
||||||
query
|
|
||||||
.from_as(db::package::Entity, p1.clone())
|
// We then perform an inner join between the max id query above and the package table, where we
|
||||||
.columns([
|
// filter on rows whose id is less than their respective package's max id or whose state is set
|
||||||
|
// to "pending deletion". This gives us all rows in the database that correspond to packages
|
||||||
|
// that are no longer needed, and can thus be removed.
|
||||||
|
query.from_as(db::package::Entity, p1.clone());
|
||||||
|
|
||||||
|
if include_repo {
|
||||||
|
query.columns([
|
||||||
(p1.clone(), db::package::Column::RepoId),
|
(p1.clone(), db::package::Column::RepoId),
|
||||||
(p1.clone(), db::package::Column::Id),
|
(p1.clone(), db::package::Column::Id),
|
||||||
])
|
]);
|
||||||
|
} else {
|
||||||
|
query.column((p1.clone(), db::package::Column::Id));
|
||||||
|
}
|
||||||
|
|
||||||
|
query
|
||||||
.join_subquery(
|
.join_subquery(
|
||||||
JoinType::InnerJoin,
|
JoinType::InnerJoin,
|
||||||
max_id_query,
|
max_id_query,
|
||||||
|
@ -277,9 +362,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
|
||||||
Expr::col((p1.clone(), db::package::Column::Id))
|
Expr::col((p1.clone(), db::package::Column::Id))
|
||||||
.eq(db::PackageState::PendingDeletion),
|
.eq(db::PackageState::PendingDeletion),
|
||||||
),
|
),
|
||||||
);
|
)
|
||||||
|
.to_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
||||||
|
let query = stale_pkgs_query(true);
|
||||||
let builder = conn.get_database_backend();
|
let builder = conn.get_database_backend();
|
||||||
let sql = builder.build(&query);
|
let sql = builder.build(&query);
|
||||||
|
|
||||||
PkgToRemove::find_by_statement(sql)
|
PkgToRemove::find_by_statement(sql)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
|
||||||
|
Ok(db::Package::delete_many()
|
||||||
|
.filter(db::package::Column::Id.lte(max_id))
|
||||||
|
.filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
|
||||||
|
.exec(conn)
|
||||||
|
.await
|
||||||
|
.map(|_| ())?)
|
||||||
|
}
|
||||||
|
|
|
@ -12,6 +12,8 @@ use repo::DistroMgr;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
|
pub const ANY_ARCH: &'static str = "any";
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
data_dir: PathBuf,
|
data_dir: PathBuf,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use super::{archive, package};
|
use super::{archive, package};
|
||||||
use crate::db;
|
use crate::db::{self, query::package::delete_stale_pkgs};
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
|
@ -22,8 +22,6 @@ use tokio::sync::{
|
||||||
};
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub const ANY_ARCH: &'static str = "any";
|
|
||||||
|
|
||||||
struct PkgQueueMsg {
|
struct PkgQueueMsg {
|
||||||
repo: i32,
|
repo: i32,
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
@ -103,16 +101,37 @@ impl RepoMgr {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clean any remaining old package files from the database and file system
|
/// Clean any remaining old package files from the database and file system
|
||||||
pub async fn clean(&self) -> crate::Result<()> {
|
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
|
||||||
let mut pkgs = db::query::package::to_be_removed_query(&self.conn)
|
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
|
||||||
.stream(&self.conn)
|
.stream(&self.conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Ids are monotonically increasing, so the max id suffices to know which packages to
|
||||||
|
// remove later
|
||||||
|
let mut max_id = -1;
|
||||||
|
let mut removed_pkgs = 0;
|
||||||
|
|
||||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||||
// TODO remove package from file system and database
|
// Failing to remove the package file isn't the biggest problem
|
||||||
|
let _ = tokio::fs::remove_file(
|
||||||
|
self.repos_dir
|
||||||
|
.join(pkg.repo_id.to_string())
|
||||||
|
.join(pkg.id.to_string()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if pkg.id > max_id {
|
||||||
|
max_id = pkg.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
removed_pkgs += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO log indicating how many packages were cleaned
|
if removed_pkgs > 0 {
|
||||||
|
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("Removed {removed_pkgs} stale package(s)");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -126,18 +145,7 @@ impl RepoMgr {
|
||||||
|
|
||||||
// Query all packages in the repo that have the given architecture or the "any"
|
// Query all packages in the repo that have the given architecture or the "any"
|
||||||
// architecture
|
// architecture
|
||||||
let mut pkgs = db::Package::find()
|
let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
|
||||||
.filter(db::package::Column::RepoId.eq(repo))
|
|
||||||
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
|
|
||||||
.filter(
|
|
||||||
db::package::Column::Id.in_subquery(
|
|
||||||
Query::select()
|
|
||||||
.expr(db::package::Column::Id.max())
|
|
||||||
.from(db::package::Entity)
|
|
||||||
.group_by_columns([db::package::Column::Arch, db::package::Column::Name])
|
|
||||||
.to_owned(),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.stream(&self.conn)
|
.stream(&self.conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -233,7 +241,7 @@ impl RepoMgr {
|
||||||
|
|
||||||
// TODO move this so that we only clean if entire queue is empty, not just
|
// TODO move this so that we only clean if entire queue is empty, not just
|
||||||
// queue for specific repo
|
// queue for specific repo
|
||||||
let _ = self.clean().await;
|
let _ = self.remove_stale_pkgs().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -246,6 +254,19 @@ impl RepoMgr {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
|
||||||
|
Ok(db::Repo::find()
|
||||||
|
.find_also_related(db::Distro)
|
||||||
|
.filter(
|
||||||
|
Condition::all()
|
||||||
|
.add(db::repo::Column::Name.eq(repo))
|
||||||
|
.add(db::distro::Column::Name.eq(distro)),
|
||||||
|
)
|
||||||
|
.one(&self.conn)
|
||||||
|
.await
|
||||||
|
.map(|res| res.map(|(repo, _)| repo.id))?)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
|
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
|
||||||
let mut repos = self.repos.write().await;
|
let mut repos = self.repos.write().await;
|
||||||
|
|
||||||
|
@ -323,6 +344,37 @@ impl RepoMgr {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
|
||||||
|
self.repos.write().await.remove(&repo);
|
||||||
|
db::Repo::delete_by_id(repo).exec(&self.conn).await?;
|
||||||
|
let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove all packages in the repository that have a given arch. This method marks all
|
||||||
|
/// packages with the given architecture as "pending deletion", before performing a manual sync
|
||||||
|
/// & removal of stale packages.
|
||||||
|
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
||||||
|
db::Package::update_many()
|
||||||
|
.col_expr(
|
||||||
|
db::package::Column::State,
|
||||||
|
Expr::value(db::PackageState::PendingDeletion),
|
||||||
|
)
|
||||||
|
.filter(
|
||||||
|
Condition::all()
|
||||||
|
.add(db::package::Column::RepoId.eq(repo))
|
||||||
|
.add(db::package::Column::Arch.eq(arch)),
|
||||||
|
)
|
||||||
|
.exec(&self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.sync_repo(repo).await?;
|
||||||
|
self.remove_stale_pkgs().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
|
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
|
||||||
std::array::from_fn(|_| {
|
std::array::from_fn(|_| {
|
||||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||||
|
|
|
@ -49,25 +49,29 @@ async fn get_file(
|
||||||
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
|
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> crate::Result<impl IntoResponse> {
|
) -> crate::Result<impl IntoResponse> {
|
||||||
let repo_dir = global
|
if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
|
||||||
.config
|
let repo_dir = global
|
||||||
.data_dir
|
.config
|
||||||
.join("distros")
|
.data_dir
|
||||||
.join(&distro)
|
.join("repos")
|
||||||
.join(&repo);
|
.join(repo_id.to_string());
|
||||||
|
|
||||||
let file_name =
|
let file_name =
|
||||||
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
|
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
|
||||||
format!("{}.db.tar.gz", arch)
|
format!("{}.db.tar.gz", arch)
|
||||||
} else if file_name == format!("{}.files", repo)
|
} else if file_name == format!("{}.files", repo)
|
||||||
|| file_name == format!("{}.files.tar.gz", repo)
|
|| file_name == format!("{}.files.tar.gz", repo)
|
||||||
{
|
{
|
||||||
format!("{}.files.tar.gz", arch)
|
format!("{}.files.tar.gz", arch)
|
||||||
} else {
|
} else {
|
||||||
file_name
|
file_name
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await)
|
let path = repo_dir.join(file_name);
|
||||||
|
Ok(ServeFile::new(path).oneshot(req).await)
|
||||||
|
} else {
|
||||||
|
Err(StatusCode::NOT_FOUND.into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn post_package_archive(
|
async fn post_package_archive(
|
||||||
|
@ -84,18 +88,6 @@ async fn post_package_archive(
|
||||||
|
|
||||||
global.mgr.queue_pkg(repo, tmp_path).await;
|
global.mgr.queue_pkg(repo, tmp_path).await;
|
||||||
|
|
||||||
//let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
|
|
||||||
//
|
|
||||||
//tracing::info!(
|
|
||||||
// "Added '{}-{}' to repository '{}' ({})",
|
|
||||||
// name,
|
|
||||||
// version,
|
|
||||||
// repo,
|
|
||||||
// arch
|
|
||||||
//);
|
|
||||||
|
|
||||||
//tokio::spawn(async move { mgr.sync_repo(&repo).await });
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,20 +95,15 @@ async fn delete_repo(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Path((distro, repo)): Path<(String, String)>,
|
Path((distro, repo)): Path<(String, String)>,
|
||||||
) -> crate::Result<StatusCode> {
|
) -> crate::Result<StatusCode> {
|
||||||
Ok(StatusCode::NOT_FOUND)
|
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
|
||||||
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
global.mgr.remove_repo(repo).await?;
|
||||||
// let repo_removed = mgr.remove_repo(&repo).await?;
|
|
||||||
//
|
tracing::info!("Removed repository {repo}");
|
||||||
// if repo_removed {
|
|
||||||
// tracing::info!("Removed repository '{}'", repo);
|
Ok(StatusCode::OK)
|
||||||
//
|
} else {
|
||||||
// Ok(StatusCode::OK)
|
Ok(StatusCode::NOT_FOUND)
|
||||||
// } else {
|
}
|
||||||
// Ok(StatusCode::NOT_FOUND)
|
|
||||||
// }
|
|
||||||
//} else {
|
|
||||||
// Ok(StatusCode::NOT_FOUND)
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_arch_repo(
|
async fn delete_arch_repo(
|
||||||
|
|
Loading…
Reference in New Issue