Compare commits
	
		
			3 Commits 
		
	
	
		
			a408c14ab1
			...
			27afb3496d
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								
									
								
								 | 
						27afb3496d | |
| 
							
							
								
									
								
								 | 
						5d7832c43a | |
| 
							
							
								
									
								
								 | 
						67b4640e56 | 
| 
						 | 
				
			
			@ -44,7 +44,7 @@ pub struct Cli {
 | 
			
		|||
    #[arg(
 | 
			
		||||
        long,
 | 
			
		||||
        value_name = "LOG_LEVEL",
 | 
			
		||||
        default_value = "tower_http=debug,rieterd=debug",
 | 
			
		||||
        default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
 | 
			
		||||
        env = "RIETER_LOG"
 | 
			
		||||
    )]
 | 
			
		||||
    pub log: String,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,7 @@ use crate::db::{self, *};
 | 
			
		|||
 | 
			
		||||
use futures::Stream;
 | 
			
		||||
use sea_orm::{sea_query::IntoCondition, *};
 | 
			
		||||
use sea_query::{Alias, Expr, Query};
 | 
			
		||||
use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
 | 
			
		||||
use serde::Deserialize;
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize)]
 | 
			
		||||
| 
						 | 
				
			
			@ -218,11 +218,85 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
 | 
			
		|||
 | 
			
		||||
#[derive(FromQueryResult)]
 | 
			
		||||
pub struct PkgToRemove {
 | 
			
		||||
    repo_id: i32,
 | 
			
		||||
    id: i32,
 | 
			
		||||
    pub repo_id: i32,
 | 
			
		||||
    pub id: i32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
 | 
			
		||||
fn max_pkg_ids_query() -> SelectStatement {
 | 
			
		||||
    Query::select()
 | 
			
		||||
        .from(db::package::Entity)
 | 
			
		||||
        .columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
 | 
			
		||||
        .group_by_columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .cond_where(
 | 
			
		||||
            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
 | 
			
		||||
        )
 | 
			
		||||
        .to_owned()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn pkgs_to_sync(
 | 
			
		||||
    conn: &DbConn,
 | 
			
		||||
    repo: i32,
 | 
			
		||||
    arch: &str,
 | 
			
		||||
) -> SelectorRaw<SelectModel<package::Model>> {
 | 
			
		||||
    let max_id_query = Query::select()
 | 
			
		||||
        .columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
 | 
			
		||||
        .from(db::package::Entity)
 | 
			
		||||
        .group_by_columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .to_owned();
 | 
			
		||||
 | 
			
		||||
    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
 | 
			
		||||
    let query = Query::select()
 | 
			
		||||
        .column((p1.clone(), Asterisk))
 | 
			
		||||
        .from_as(db::package::Entity, p1.clone())
 | 
			
		||||
        .join_subquery(
 | 
			
		||||
            JoinType::InnerJoin,
 | 
			
		||||
            max_id_query,
 | 
			
		||||
            p2.clone(),
 | 
			
		||||
            Expr::col((p1.clone(), db::package::Column::Id))
 | 
			
		||||
                .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
 | 
			
		||||
        )
 | 
			
		||||
        .cond_where(
 | 
			
		||||
            Condition::all()
 | 
			
		||||
                .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::State))
 | 
			
		||||
                        .ne(db::PackageState::PendingDeletion),
 | 
			
		||||
                )
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::Arch))
 | 
			
		||||
                        .is_in([arch, crate::ANY_ARCH]),
 | 
			
		||||
                ),
 | 
			
		||||
        )
 | 
			
		||||
        .to_owned();
 | 
			
		||||
    let builder = conn.get_database_backend();
 | 
			
		||||
    let sql = builder.build(&query);
 | 
			
		||||
 | 
			
		||||
    db::Package::find().from_raw_sql(sql)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
 | 
			
		||||
    // In each repository, only one version of a package can exist for any given arch. Because ids
 | 
			
		||||
    // are monotonically increasing, we know that the row that represents the actual package
 | 
			
		||||
    // currently in the repository is the row with the largest id whose state is "committed". This
 | 
			
		||||
    // query finds this id for each (repo, arch, name) tuple.
 | 
			
		||||
    let mut max_id_query = Query::select();
 | 
			
		||||
    max_id_query
 | 
			
		||||
        .from(db::package::Entity)
 | 
			
		||||
| 
						 | 
				
			
			@ -243,12 +317,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
 | 
			
		|||
 | 
			
		||||
    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
 | 
			
		||||
    let mut query = Query::select();
 | 
			
		||||
    query
 | 
			
		||||
        .from_as(db::package::Entity, p1.clone())
 | 
			
		||||
        .columns([
 | 
			
		||||
 | 
			
		||||
    // We then perform an inner join between the max id query above and the package table, where we
 | 
			
		||||
    // filter on rows whose id is less than their respective package's max id or whose state is set
 | 
			
		||||
    // to "pending deletion". This gives us all rows in the database that correspond to packages
 | 
			
		||||
    // that are no longer needed, and can thus be removed.
 | 
			
		||||
    query.from_as(db::package::Entity, p1.clone());
 | 
			
		||||
 | 
			
		||||
    if include_repo {
 | 
			
		||||
        query.columns([
 | 
			
		||||
            (p1.clone(), db::package::Column::RepoId),
 | 
			
		||||
            (p1.clone(), db::package::Column::Id),
 | 
			
		||||
        ])
 | 
			
		||||
        ]);
 | 
			
		||||
    } else {
 | 
			
		||||
        query.column((p1.clone(), db::package::Column::Id));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    query
 | 
			
		||||
        .join_subquery(
 | 
			
		||||
            JoinType::InnerJoin,
 | 
			
		||||
            max_id_query,
 | 
			
		||||
| 
						 | 
				
			
			@ -277,9 +362,23 @@ pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove
 | 
			
		|||
                    Expr::col((p1.clone(), db::package::Column::Id))
 | 
			
		||||
                        .eq(db::PackageState::PendingDeletion),
 | 
			
		||||
                ),
 | 
			
		||||
        );
 | 
			
		||||
        )
 | 
			
		||||
        .to_owned()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
 | 
			
		||||
    let query = stale_pkgs_query(true);
 | 
			
		||||
    let builder = conn.get_database_backend();
 | 
			
		||||
    let sql = builder.build(&query);
 | 
			
		||||
 | 
			
		||||
    PkgToRemove::find_by_statement(sql)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
 | 
			
		||||
    Ok(db::Package::delete_many()
 | 
			
		||||
        .filter(db::package::Column::Id.lte(max_id))
 | 
			
		||||
        .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
 | 
			
		||||
        .exec(conn)
 | 
			
		||||
        .await
 | 
			
		||||
        .map(|_| ())?)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,6 +12,8 @@ use repo::DistroMgr;
 | 
			
		|||
use clap::Parser;
 | 
			
		||||
use std::{path::PathBuf, sync::Arc};
 | 
			
		||||
 | 
			
		||||
pub const ANY_ARCH: &'static str = "any";
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    data_dir: PathBuf,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,5 @@
 | 
			
		|||
use super::{archive, package};
 | 
			
		||||
use crate::db;
 | 
			
		||||
use crate::db::{self, query::package::delete_stale_pkgs};
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
| 
						 | 
				
			
			@ -22,8 +22,6 @@ use tokio::sync::{
 | 
			
		|||
};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
pub const ANY_ARCH: &'static str = "any";
 | 
			
		||||
 | 
			
		||||
struct PkgQueueMsg {
 | 
			
		||||
    repo: i32,
 | 
			
		||||
    path: PathBuf,
 | 
			
		||||
| 
						 | 
				
			
			@ -103,16 +101,37 @@ impl RepoMgr {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    /// Clean any remaining old package files from the database and file system
 | 
			
		||||
    pub async fn clean(&self) -> crate::Result<()> {
 | 
			
		||||
        let mut pkgs = db::query::package::to_be_removed_query(&self.conn)
 | 
			
		||||
    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
 | 
			
		||||
        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // Ids are monotonically increasing, so the max id suffices to know which packages to
 | 
			
		||||
        // remove later
 | 
			
		||||
        let mut max_id = -1;
 | 
			
		||||
        let mut removed_pkgs = 0;
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            // TODO remove package from file system and database
 | 
			
		||||
            // Failing to remove the package file isn't the biggest problem
 | 
			
		||||
            let _ = tokio::fs::remove_file(
 | 
			
		||||
                self.repos_dir
 | 
			
		||||
                    .join(pkg.repo_id.to_string())
 | 
			
		||||
                    .join(pkg.id.to_string()),
 | 
			
		||||
            )
 | 
			
		||||
            .await;
 | 
			
		||||
 | 
			
		||||
            if pkg.id > max_id {
 | 
			
		||||
                max_id = pkg.id;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            removed_pkgs += 1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // TODO log indicating how many packages were cleaned
 | 
			
		||||
        if removed_pkgs > 0 {
 | 
			
		||||
            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Removed {removed_pkgs} stale package(s)");
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -126,18 +145,7 @@ impl RepoMgr {
 | 
			
		|||
 | 
			
		||||
        // Query all packages in the repo that have the given architecture or the "any"
 | 
			
		||||
        // architecture
 | 
			
		||||
        let mut pkgs = db::Package::find()
 | 
			
		||||
            .filter(db::package::Column::RepoId.eq(repo))
 | 
			
		||||
            .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
 | 
			
		||||
            .filter(
 | 
			
		||||
                db::package::Column::Id.in_subquery(
 | 
			
		||||
                    Query::select()
 | 
			
		||||
                        .expr(db::package::Column::Id.max())
 | 
			
		||||
                        .from(db::package::Entity)
 | 
			
		||||
                        .group_by_columns([db::package::Column::Arch, db::package::Column::Name])
 | 
			
		||||
                        .to_owned(),
 | 
			
		||||
                ),
 | 
			
		||||
            )
 | 
			
		||||
        let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -233,7 +241,7 @@ impl RepoMgr {
 | 
			
		|||
 | 
			
		||||
                    // TODO move this so that we only clean if entire queue is empty, not just
 | 
			
		||||
                    // queue for specific repo
 | 
			
		||||
                    let _ = self.clean().await;
 | 
			
		||||
                    let _ = self.remove_stale_pkgs().await;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
| 
						 | 
				
			
			@ -246,6 +254,19 @@ impl RepoMgr {
 | 
			
		|||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
 | 
			
		||||
        Ok(db::Repo::find()
 | 
			
		||||
            .find_also_related(db::Distro)
 | 
			
		||||
            .filter(
 | 
			
		||||
                Condition::all()
 | 
			
		||||
                    .add(db::repo::Column::Name.eq(repo))
 | 
			
		||||
                    .add(db::distro::Column::Name.eq(distro)),
 | 
			
		||||
            )
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await
 | 
			
		||||
            .map(|res| res.map(|(repo, _)| repo.id))?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
 | 
			
		||||
        let mut repos = self.repos.write().await;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -323,6 +344,37 @@ impl RepoMgr {
 | 
			
		|||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        self.repos.write().await.remove(&repo);
 | 
			
		||||
        db::Repo::delete_by_id(repo).exec(&self.conn).await?;
 | 
			
		||||
        let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove all packages in the repository that have a given arch. This method marks all
 | 
			
		||||
    /// packages with the given architecture as "pending deletion", before performing a manual sync
 | 
			
		||||
    /// & removal of stale packages.
 | 
			
		||||
    pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
 | 
			
		||||
        db::Package::update_many()
 | 
			
		||||
            .col_expr(
 | 
			
		||||
                db::package::Column::State,
 | 
			
		||||
                Expr::value(db::PackageState::PendingDeletion),
 | 
			
		||||
            )
 | 
			
		||||
            .filter(
 | 
			
		||||
                Condition::all()
 | 
			
		||||
                    .add(db::package::Column::RepoId.eq(repo))
 | 
			
		||||
                    .add(db::package::Column::Arch.eq(arch)),
 | 
			
		||||
            )
 | 
			
		||||
            .exec(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        self.sync_repo(repo).await?;
 | 
			
		||||
        self.remove_stale_pkgs().await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
 | 
			
		||||
        std::array::from_fn(|_| {
 | 
			
		||||
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -49,25 +49,29 @@ async fn get_file(
 | 
			
		|||
    Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
 | 
			
		||||
    req: Request<Body>,
 | 
			
		||||
) -> crate::Result<impl IntoResponse> {
 | 
			
		||||
    let repo_dir = global
 | 
			
		||||
        .config
 | 
			
		||||
        .data_dir
 | 
			
		||||
        .join("distros")
 | 
			
		||||
        .join(&distro)
 | 
			
		||||
        .join(&repo);
 | 
			
		||||
    if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
 | 
			
		||||
        let repo_dir = global
 | 
			
		||||
            .config
 | 
			
		||||
            .data_dir
 | 
			
		||||
            .join("repos")
 | 
			
		||||
            .join(repo_id.to_string());
 | 
			
		||||
 | 
			
		||||
    let file_name =
 | 
			
		||||
        if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
 | 
			
		||||
            format!("{}.db.tar.gz", arch)
 | 
			
		||||
        } else if file_name == format!("{}.files", repo)
 | 
			
		||||
            || file_name == format!("{}.files.tar.gz", repo)
 | 
			
		||||
        {
 | 
			
		||||
            format!("{}.files.tar.gz", arch)
 | 
			
		||||
        } else {
 | 
			
		||||
            file_name
 | 
			
		||||
        };
 | 
			
		||||
        let file_name =
 | 
			
		||||
            if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
 | 
			
		||||
                format!("{}.db.tar.gz", arch)
 | 
			
		||||
            } else if file_name == format!("{}.files", repo)
 | 
			
		||||
                || file_name == format!("{}.files.tar.gz", repo)
 | 
			
		||||
            {
 | 
			
		||||
                format!("{}.files.tar.gz", arch)
 | 
			
		||||
            } else {
 | 
			
		||||
                file_name
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
    Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await)
 | 
			
		||||
        let path = repo_dir.join(file_name);
 | 
			
		||||
        Ok(ServeFile::new(path).oneshot(req).await)
 | 
			
		||||
    } else {
 | 
			
		||||
        Err(StatusCode::NOT_FOUND.into())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn post_package_archive(
 | 
			
		||||
| 
						 | 
				
			
			@ -84,18 +88,6 @@ async fn post_package_archive(
 | 
			
		|||
 | 
			
		||||
    global.mgr.queue_pkg(repo, tmp_path).await;
 | 
			
		||||
 | 
			
		||||
    //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
 | 
			
		||||
    //
 | 
			
		||||
    //tracing::info!(
 | 
			
		||||
    //    "Added '{}-{}' to repository '{}' ({})",
 | 
			
		||||
    //    name,
 | 
			
		||||
    //    version,
 | 
			
		||||
    //    repo,
 | 
			
		||||
    //    arch
 | 
			
		||||
    //);
 | 
			
		||||
 | 
			
		||||
    //tokio::spawn(async move { mgr.sync_repo(&repo).await });
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -103,20 +95,15 @@ async fn delete_repo(
 | 
			
		|||
    State(global): State<crate::Global>,
 | 
			
		||||
    Path((distro, repo)): Path<(String, String)>,
 | 
			
		||||
) -> crate::Result<StatusCode> {
 | 
			
		||||
    Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
    //    let repo_removed = mgr.remove_repo(&repo).await?;
 | 
			
		||||
    //
 | 
			
		||||
    //    if repo_removed {
 | 
			
		||||
    //        tracing::info!("Removed repository '{}'", repo);
 | 
			
		||||
    //
 | 
			
		||||
    //        Ok(StatusCode::OK)
 | 
			
		||||
    //    } else {
 | 
			
		||||
    //        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //    }
 | 
			
		||||
    //} else {
 | 
			
		||||
    //    Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //}
 | 
			
		||||
    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
 | 
			
		||||
        global.mgr.remove_repo(repo).await?;
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Removed repository {repo}");
 | 
			
		||||
 | 
			
		||||
        Ok(StatusCode::OK)
 | 
			
		||||
    } else {
 | 
			
		||||
        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn delete_arch_repo(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue