Compare commits

2 Commits: 27afb3496d...97e42588ed
| Author | SHA1 | Date |
|---|---|---|
|  | 97e42588ed |  |
|  | e17269ac3b |  |
```diff
@@ -1,11 +1,17 @@
 api_key = "test"
-port = 8000
-log_level = "tower_http=debug,rieterd=debug"
+pkg_workers = 2
+log_level = "rieterd=debug"
 
 [fs]
-type = "locl"
+type = "local"
 data_dir = "./data"
 
 [db]
 type = "sqlite"
 db_dir = "./data"
+# [db]
+# type = "postgres"
+# host = "localhost"
+# db = "rieter"
+# user = "rieter"
+# password = "rieter"
```
```diff
@@ -1,4 +1,4 @@
-use crate::{distro::MetaDistroMgr, Config, Global};
+use crate::{distro::MetaDistroMgr, Config, FsConfig, Global};
 
 use std::{io, path::PathBuf, sync::Arc};
 
```
```diff
@@ -12,13 +12,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 pub struct Cli {
-    /// Directory where repository metadata & SQLite database is stored
-    #[arg(env = "RIETER_DATA_DIR")]
-    pub data_dir: PathBuf,
-    /// API key to authenticate private routes with
-    #[arg(env = "RIETER_API_KEY")]
-    pub api_key: String,
-
     #[arg(
         short,
         long,
```
```diff
@@ -26,89 +19,54 @@ pub struct Cli {
         default_value = "./rieterd.toml"
     )]
     pub config_file: PathBuf,
-
-    /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
-    /// data directory
-    #[arg(short, long, env = "RIETER_DATABASE_URL")]
-    pub database_url: Option<String>,
-    /// Port the server will listen on
-    #[arg(
-        short,
-        long,
-        value_name = "PORT",
-        default_value_t = 8000,
-        env = "RIETER_PORT"
-    )]
-    pub port: u16,
-    /// Log levels for the tracing
-    #[arg(
-        long,
-        value_name = "LOG_LEVEL",
-        default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
-        env = "RIETER_LOG"
-    )]
-    pub log: String,
 }
 
 impl Cli {
-    pub fn init_tracing(&self) {
+    pub async fn run(&self) -> crate::Result<()> {
+        let config: Config = Config::figment(&self.config_file)
+            .extract()
+            .inspect_err(|e| tracing::error!("{}", e))?;
+
         tracing_subscriber::registry()
-            .with(tracing_subscriber::EnvFilter::new(self.log.clone()))
+            .with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
             .with(tracing_subscriber::fmt::layer())
             .init();
-    }
 
-    pub async fn run(&self) -> crate::Result<()> {
-        self.init_tracing();
+        tracing::info!("Connecting to database");
+        let db = crate::db::connect(&config.db).await?;
 
-        //tracing::debug!("{:?}", &self.config_file);
-        //let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err(
-        //    |e| tracing::error!("{}", e)
-        //)?;
-        //tracing::debug!("{:?}", new_config);
-
-        let db_url = if let Some(url) = &self.database_url {
-            url.clone()
-        } else {
-            format!(
-                "sqlite://{}?mode=rwc",
-                self.data_dir.join("rieter.sqlite").to_string_lossy()
-            )
-        };
-
-        debug!("Connecting to database with URL {}", db_url);
-
-        let mut options = sea_orm::ConnectOptions::new(db_url);
-        options.max_connections(16);
-
-        let db = sea_orm::Database::connect(options).await?;
         crate::db::Migrator::up(&db, None).await?;
 
-        debug!("Successfully applied migrations");
-
-        let config = Config {
-            data_dir: self.data_dir.clone(),
+        let mgr = match &config.fs {
+            FsConfig::Local { data_dir } => {
+                crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
+            }
         };
 
-        let mgr =
-            Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
+        let mgr = Arc::new(mgr);
 
-        for _ in 0..1 {
+        for _ in 0..config.pkg_workers {
             let clone = Arc::clone(&mgr);
 
             tokio::spawn(async move { clone.pkg_parse_task().await });
         }
 
-        let global = Global { config, mgr, db };
+        let global = Global {
+            config: config.clone(),
+            mgr,
+            db,
+        };
 
         // build our application with a single route
         let app = Router::new()
             .nest("/api", crate::api::router())
-            .merge(crate::repo::router(&self.api_key))
+            .merge(crate::repo::router(&config.api_key))
             .with_state(global)
             .layer(TraceLayer::new_for_http());
 
-        let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap();
+        let domain: String = format!("{}:{}", config.domain, config.port)
+            .parse()
+            .unwrap();
         let listener = tokio::net::TcpListener::bind(domain).await?;
         // run it with hyper on localhost:3000
         Ok(axum::serve(listener, app.into_make_service())
```
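The rewritten run() above loads everything through Config::figment, whose body is mostly outside this diff; only the `.merge(Env::prefixed("RIETER_"))` call shows up further down. A minimal sketch of what such a figment-based loader could look like, assuming the figment crate's Toml and Env providers (the provider list and ordering here are assumptions, not shown in the diff):

```rust
use figment::{
    providers::{Env, Format, Toml},
    Figment,
};
use std::path::Path;

impl Config {
    // Sketch: read the TOML config file, then let RIETER_-prefixed environment
    // variables override it, matching the `.merge(Env::prefixed("RIETER_"))`
    // line that is visible further down in this compare.
    pub fn figment(conf_file: &Path) -> Figment {
        Figment::new()
            .merge(Toml::file(conf_file))
            .merge(Env::prefixed("RIETER_"))
    }
}
```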
```diff
@@ -6,34 +6,49 @@ use figment::{
 };
 use serde::Deserialize;
 
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone)]
 #[serde(rename_all = "lowercase")]
+#[serde(tag = "type")]
 pub enum FsConfig {
     Local { data_dir: PathBuf },
 }
 
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone)]
 #[serde(rename_all = "lowercase")]
+#[serde(tag = "type")]
 pub enum DbConfig {
     Sqlite {
         db_dir: PathBuf,
+        #[serde(default = "default_db_sqlite_max_connections")]
+        max_connections: u32,
     },
     Postgres {
         host: String,
+        #[serde(default = "default_db_postgres_port")]
         port: u16,
         user: String,
         password: String,
         db: String,
+        #[serde(default)]
+        schema: String,
+        #[serde(default = "default_db_postgres_max_connections")]
+        max_connections: u32,
     },
 }
 
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone)]
 pub struct Config {
-    api_key: String,
-    port: u16,
-    log_level: String,
-    fs: FsConfig,
-    db: DbConfig,
+    pub api_key: String,
+    #[serde(default = "default_domain")]
+    pub domain: String,
+    #[serde(default = "default_port")]
+    pub port: u16,
+    #[serde(default = "default_log_level")]
+    pub log_level: String,
+    pub fs: FsConfig,
+    pub db: DbConfig,
+    #[serde(default = "default_pkg_workers")]
+    pub pkg_workers: u32,
 }
 
 impl Config {
```
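The #[serde(tag = "type")] attribute added to both enums is what lets a `type = "local"` or `type = "sqlite"` key in the TOML config pick the enum variant, matching the example config at the top of this compare. A small self-contained sketch of that mapping, assuming the serde and toml crates:

```rust
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "type")]
pub enum FsConfig {
    Local { data_dir: PathBuf },
}

fn main() {
    // `type = "local"` selects the Local variant (tag = "type" plus lowercase renaming),
    // and the remaining keys fill in that variant's fields.
    let fs: FsConfig = toml::from_str("type = \"local\"\ndata_dir = \"./data\"").unwrap();
    println!("{fs:?}");
}
```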
```diff
@@ -43,3 +58,31 @@ impl Config {
             .merge(Env::prefixed("RIETER_"))
     }
 }
+
+fn default_domain() -> String {
+    String::from("0.0.0.0")
+}
+
+fn default_port() -> u16 {
+    8000
+}
+
+fn default_log_level() -> String {
+    String::from("tower_http=debug,rieterd=debug,sea_orm=debug")
+}
+
+fn default_db_sqlite_max_connections() -> u32 {
+    16
+}
+
+fn default_db_postgres_port() -> u16 {
+    5432
+}
+
+fn default_db_postgres_max_connections() -> u32 {
+    16
+}
+
+fn default_pkg_workers() -> u32 {
+    1
+}
```
```diff
@@ -2,10 +2,12 @@ pub mod entities;
 mod migrator;
 pub mod query;
 
+use crate::config::DbConfig;
+
 pub use entities::{prelude::*, *};
 pub use migrator::Migrator;
 
-use sea_orm::{DeriveActiveEnum, EnumIter};
+use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
 use serde::{Deserialize, Serialize};
 
 type Result<T> = std::result::Result<T, sea_orm::DbErr>;
```
```diff
@@ -50,3 +52,50 @@ pub struct FullPackage {
     related: Vec<(PackageRelatedEnum, String)>,
     files: Vec<String>,
 }
+
+pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
+    match conn {
+        DbConfig::Sqlite {
+            db_dir,
+            max_connections,
+        } => {
+            let url = format!(
+                "sqlite://{}?mode=rwc",
+                db_dir.join("rieter.sqlite").to_string_lossy()
+            );
+            let options = sea_orm::ConnectOptions::new(url)
+                .max_connections(*max_connections)
+                .to_owned();
+
+            let conn = Database::connect(options).await?;
+
+            // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
+            // https://www.sqlite.org/pragma.html#pragma_synchronous
+            conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
+            conn.execute_unprepared("PRAGMA synchronous=NORMAL;")
+                .await?;
+
+            Ok(conn)
+        }
+        DbConfig::Postgres {
+            host,
+            port,
+            db,
+            user,
+            password,
+            schema,
+            max_connections,
+        } => {
+            let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db);
+
+            if schema != "" {
+                url = format!("{url}?currentSchema={schema}");
+            }
+
+            let options = sea_orm::ConnectOptions::new(url)
+                .max_connections(*max_connections)
+                .to_owned();
+            Ok(Database::connect(options).await?)
+        }
+    }
+}
```
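A hedged usage sketch, not part of the diff: handing a DbConfig to the new connect() function. The values mirror the sqlite section of the example config; the open_db name is made up for illustration.

```rust
use std::path::PathBuf;

// Sketch only: assumes this lives inside the rieterd crate, where crate::config::DbConfig
// and crate::db::connect are the items shown in this compare.
async fn open_db() -> crate::Result<sea_orm::DbConn> {
    let cfg = crate::config::DbConfig::Sqlite {
        db_dir: PathBuf::from("./data"),
        max_connections: 16,
    };

    // connect() builds the sqlite:// URL, applies the pool size and the
    // WAL/synchronous PRAGMAs, and returns the sea-orm connection.
    crate::db::connect(&cfg).await
}
```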
```diff
@@ -2,7 +2,7 @@ use crate::db::{self, *};
 
 use futures::Stream;
 use sea_orm::{sea_query::IntoCondition, *};
-use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
+use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
 use serde::Deserialize;
 
 #[derive(Deserialize)]
```
```diff
@@ -222,8 +222,8 @@ pub struct PkgToRemove {
     pub id: i32,
 }
 
-fn max_pkg_ids_query() -> SelectStatement {
-    Query::select()
+fn max_pkg_ids_query(committed: bool) -> SelectStatement {
+    let mut query = Query::select()
         .from(db::package::Entity)
         .columns([
             db::package::Column::RepoId,
```
```diff
@@ -236,39 +236,29 @@ fn max_pkg_ids_query() -> SelectStatement {
             db::package::Column::Arch,
             db::package::Column::Name,
         ])
-        .cond_where(
-            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
-        )
-        .to_owned()
-}
-
-pub fn pkgs_to_sync(
-    conn: &DbConn,
-    repo: i32,
-    arch: &str,
-) -> SelectorRaw<SelectModel<package::Model>> {
-    let max_id_query = Query::select()
-        .columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
-        .from(db::package::Entity)
-        .group_by_columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
         .to_owned();
 
+    if committed {
+        query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
+    }
+
+    query
+}
+
+/// Query that returns all packages that should be included in a sync for the given repository and
+/// arch.
+pub fn pkgs_to_sync(
+    conn: &DbConn,
+    repo: i32,
+    arch: &str,
+) -> SelectorRaw<SelectModel<package::Model>> {
     let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
     let query = Query::select()
-        .column((p1.clone(), Asterisk))
+        .columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
         .from_as(db::package::Entity, p1.clone())
         .join_subquery(
             JoinType::InnerJoin,
-            max_id_query,
+            max_pkg_ids_query(false),
             p2.clone(),
             Expr::col((p1.clone(), db::package::Column::Id))
                 .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
```
```diff
@@ -276,13 +266,13 @@ pub fn pkgs_to_sync(
         .cond_where(
             Condition::all()
                 .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
                 .add(
-                    Expr::col((p1.clone(), db::package::Column::State))
-                        .ne(db::PackageState::PendingDeletion),
-                )
-                .add(
                     Expr::col((p1.clone(), db::package::Column::Arch))
                         .is_in([arch, crate::ANY_ARCH]),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::State))
+                        .ne(db::PackageState::PendingDeletion),
                 ),
         )
         .to_owned();
```
```diff
@@ -293,36 +283,10 @@ pub fn pkgs_to_sync(
 }
 
 fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
-    // In each repository, only one version of a package can exist for any given arch. Because ids
-    // are monotonically increasing, we know that the row that represents the actual package
-    // currently in the repository is the row with the largest id whose state is "committed". This
-    // query finds this id for each (repo, arch, name) tuple.
-    let mut max_id_query = Query::select();
-    max_id_query
-        .from(db::package::Entity)
-        .columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
-        .group_by_columns([
-            db::package::Column::RepoId,
-            db::package::Column::Arch,
-            db::package::Column::Name,
-        ])
-        .cond_where(
-            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
-        );
-
     let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
-    let mut query = Query::select();
-
-    // We then perform an inner join between the max id query above and the package table, where we
-    // filter on rows whose id is less than their respective package's max id or whose state is set
-    // to "pending deletion". This gives us all rows in the database that correspond to packages
-    // that are no longer needed, and can thus be removed.
-    query.from_as(db::package::Entity, p1.clone());
+    let mut query = Query::select()
+        .from_as(db::package::Entity, p1.clone())
+        .to_owned();
 
     if include_repo {
         query.columns([
```
```diff
@@ -333,10 +297,13 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
         query.column((p1.clone(), db::package::Column::Id));
     }
 
+    // We left join on the max pkgs query because a repository that has all its packages set to
+    // "pending deletion" doesn't show up in the query. These are also included with a where clause
+    // on the joined rows.
     query
         .join_subquery(
-            JoinType::InnerJoin,
-            max_id_query,
+            JoinType::LeftJoin,
+            max_pkg_ids_query(true),
             p2.clone(),
             Condition::all()
                 .add(
```
```diff
@@ -359,11 +326,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
                         .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
                 )
                 .add(
-                    Expr::col((p1.clone(), db::package::Column::Id))
+                    Expr::col((p1.clone(), db::package::Column::State))
                         .eq(db::PackageState::PendingDeletion),
                 ),
-        )
-        .to_owned()
+        );
+
+    query
 }
 
 pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
```
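Both selectors in this module are consumed as streams elsewhere in this compare (remove_stale_pkgs drains stale_pkgs exactly this way). A sketch of the same pattern applied to pkgs_to_sync; the architecture string and the printed fields are illustrative assumptions rather than part of the change:

```rust
use futures::StreamExt;

// Sketch only: `conn` is the sea_orm::DbConn used throughout this module, and
// `repo` is a repository id. The "x86_64" arch and the printed fields are assumptions.
async fn print_sync_set(conn: &sea_orm::DbConn, repo: i32) -> Result<(), sea_orm::DbErr> {
    let mut pkgs = pkgs_to_sync(conn, repo, "x86_64").stream(conn).await?;

    // Same next()/transpose() loop that remove_stale_pkgs uses for stale packages.
    while let Some(pkg) = pkgs.next().await.transpose()? {
        println!("{} {}", pkg.id, pkg.name);
    }

    Ok(())
}
```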
```diff
@@ -6,6 +6,7 @@ mod distro;
 mod error;
 mod repo;
 
+pub use config::{Config, DbConfig, FsConfig};
 pub use error::{Result, ServerError};
 use repo::DistroMgr;
 
```
```diff
@@ -14,14 +15,9 @@ use std::{path::PathBuf, sync::Arc};
 
 pub const ANY_ARCH: &'static str = "any";
 
-#[derive(Clone)]
-pub struct Config {
-    data_dir: PathBuf,
-}
-
 #[derive(Clone)]
 pub struct Global {
-    config: Config,
+    config: crate::config::Config,
     mgr: Arc<repo::RepoMgr>,
     db: sea_orm::DbConn,
 }
```
			
			@ -100,42 +100,6 @@ impl RepoMgr {
 | 
			
		|||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Clean any remaining old package files from the database and file system
 | 
			
		||||
    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
 | 
			
		||||
        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // Ids are monotonically increasing, so the max id suffices to know which packages to
 | 
			
		||||
        // remove later
 | 
			
		||||
        let mut max_id = -1;
 | 
			
		||||
        let mut removed_pkgs = 0;
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            // Failing to remove the package file isn't the biggest problem
 | 
			
		||||
            let _ = tokio::fs::remove_file(
 | 
			
		||||
                self.repos_dir
 | 
			
		||||
                    .join(pkg.repo_id.to_string())
 | 
			
		||||
                    .join(pkg.id.to_string()),
 | 
			
		||||
            )
 | 
			
		||||
            .await;
 | 
			
		||||
 | 
			
		||||
            if pkg.id > max_id {
 | 
			
		||||
                max_id = pkg.id;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            removed_pkgs += 1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if removed_pkgs > 0 {
 | 
			
		||||
            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Removed {removed_pkgs} stale package(s)");
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Generate the archive databases for the given repository and architecture.
 | 
			
		||||
    async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
 | 
			
		||||
        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
 | 
			
		||||
| 
						 | 
				
			
```diff
@@ -209,6 +173,42 @@ impl RepoMgr {
         Ok(())
     }
 
+    /// Clean any remaining old package files from the database and file system
+    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
+        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
+            .stream(&self.conn)
+            .await?;
+
+        // Ids are monotonically increasing, so the max id suffices to know which packages to
+        // remove later
+        let mut max_id = -1;
+        let mut removed_pkgs = 0;
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            // Failing to remove the package file isn't the biggest problem
+            let _ = tokio::fs::remove_file(
+                self.repos_dir
+                    .join(pkg.repo_id.to_string())
+                    .join(pkg.id.to_string()),
+            )
+            .await;
+
+            if pkg.id > max_id {
+                max_id = pkg.id;
+            }
+
+            removed_pkgs += 1;
+        }
+
+        if removed_pkgs > 0 {
+            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
+        }
+
+        tracing::info!("Removed {removed_pkgs} stale package(s)");
+
+        Ok(())
+    }
+
     pub async fn pkg_parse_task(&self) {
         loop {
             // Receive the next message and immediately drop the mutex afterwards. As long as the
```
```diff
@@ -248,7 +248,7 @@ impl RepoMgr {
     }
 
     pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
-        let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
+        self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
         self.repos.read().await.get(&repo).inspect(|n| {
             n.0.fetch_add(1, Ordering::SeqCst);
         });
```
```diff
@@ -291,6 +291,7 @@ impl RepoMgr {
         };
 
         let repo_id: Option<i32> = db::Repo::find()
             .filter(db::repo::Column::DistroId.eq(distro_id))
+            .filter(db::repo::Column::Name.eq(repo))
             .select_only()
             .column(db::repo::Column::Id)
```
```diff
@@ -6,6 +6,8 @@ pub mod package;
 pub use manager::DistroMgr;
 pub use manager2::RepoMgr;
 
+use crate::FsConfig;
+
 use axum::{
     body::Body,
     extract::{Path, State},
```
```diff
@@ -50,25 +52,26 @@ async fn get_file(
     req: Request<Body>,
 ) -> crate::Result<impl IntoResponse> {
     if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
-        let repo_dir = global
-            .config
-            .data_dir
-            .join("repos")
-            .join(repo_id.to_string());
+        match global.config.fs {
+            FsConfig::Local { data_dir } => {
+                let repo_dir = data_dir.join("repos").join(repo_id.to_string());
 
-        let file_name =
-            if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
-                format!("{}.db.tar.gz", arch)
-            } else if file_name == format!("{}.files", repo)
-                || file_name == format!("{}.files.tar.gz", repo)
-            {
-                format!("{}.files.tar.gz", arch)
-            } else {
-                file_name
-            };
+                let file_name = if file_name == format!("{}.db", repo)
+                    || file_name == format!("{}.db.tar.gz", repo)
+                {
+                    format!("{}.db.tar.gz", arch)
+                } else if file_name == format!("{}.files", repo)
+                    || file_name == format!("{}.files.tar.gz", repo)
+                {
+                    format!("{}.files.tar.gz", arch)
+                } else {
+                    file_name
+                };
 
-        let path = repo_dir.join(file_name);
-        Ok(ServeFile::new(path).oneshot(req).await)
+                let path = repo_dir.join(file_name);
+                Ok(ServeFile::new(path).oneshot(req).await)
+            }
+        }
     } else {
         Err(StatusCode::NOT_FOUND.into())
     }
```
```diff
@@ -78,7 +81,7 @@ async fn post_package_archive(
     State(global): State<crate::Global>,
     Path((distro, repo)): Path<(String, String)>,
     body: Body,
-) -> crate::Result<()> {
+) -> crate::Result<StatusCode> {
     let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
     let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
     let [tmp_path] = global.mgr.random_file_paths();
```
```diff
@@ -88,7 +91,7 @@ async fn post_package_archive(
 
     global.mgr.queue_pkg(repo, tmp_path).await;
 
-    Ok(())
+    Ok(StatusCode::ACCEPTED)
 }
 
 async fn delete_repo(
```
```diff
@@ -110,7 +113,15 @@ async fn delete_arch_repo(
     State(global): State<crate::Global>,
     Path((distro, repo, arch)): Path<(String, String, String)>,
 ) -> crate::Result<StatusCode> {
-    Ok(StatusCode::NOT_FOUND)
+    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
+        global.mgr.remove_repo_arch(repo, &arch).await?;
+
+        tracing::info!("Removed architecture '{arch}' from repository {repo}");
+
+        Ok(StatusCode::OK)
+    } else {
+        Ok(StatusCode::NOT_FOUND)
+    }
     //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
     //    let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
     //
```