Compare commits

10 Commits: 76395afb10 ... d375df0ff4
| Author | SHA1 | Date |
|---|---|---|
| | d375df0ff4 | |
| | a6de2c3c14 | |
| | 412d1e65f1 | |
| | bde3b90711 | |
| | 042f1ecbd3 | |
| | 9237add869 | |
| | a7c0d3e062 | |
| | 80d5291508 | |
| | 656df06b4e | |
| | 8864925e58 | |

@@ -1,12 +1,6 @@
use crate::{Config, FsConfig, Global};
use std::path::PathBuf;

use std::{io, path::PathBuf, sync::Arc};

use axum::Router;
use clap::Parser;
use sea_orm_migration::MigratorTrait;
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]

@@ -19,57 +13,3 @@ pub struct Cli {
    )]
    pub config_file: PathBuf,
}

impl Cli {
    pub async fn run(&self) -> crate::Result<()> {
        let config: Config = Config::figment(&self.config_file)
            .extract()
            .inspect_err(|e| tracing::error!("{}", e))?;

        tracing_subscriber::registry()
            .with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
            .with(tracing_subscriber::fmt::layer())
            .init();

        tracing::info!("Connecting to database");
        let db = crate::db::connect(&config.db).await?;

        crate::db::Migrator::up(&db, None).await?;

        let mgr = match &config.fs {
            FsConfig::Local { data_dir } => {
                crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
            }
        };

        let mgr = Arc::new(mgr);

        for _ in 0..config.pkg_workers {
            let clone = Arc::clone(&mgr);

            tokio::spawn(async move { clone.pkg_parse_task().await });
        }

        let global = Global {
            config: config.clone(),
            mgr,
            db,
        };

        // build our application with a single route
        let app = Router::new()
            .nest("/api", crate::api::router())
            .merge(crate::repo::router(&config.api_key))
            .with_state(global)
            .layer(TraceLayer::new_for_http());

        let domain: String = format!("{}:{}", config.domain, config.port)
            .parse()
            .unwrap();
        let listener = tokio::net::TcpListener::bind(domain).await?;
        // run it with hyper on localhost:3000
        Ok(axum::serve(listener, app.into_make_service())
            .await
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?)
    }
}

@@ -1,5 +1,3 @@
pub mod distro;
pub mod package;
pub mod repo;

type Result<T> = std::result::Result<T, sea_orm::DbErr>;

@@ -1,8 +1,7 @@
use crate::db::{self, *};

use futures::Stream;
use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
use sea_query::{Alias, Expr, Query, SelectStatement};
use serde::Deserialize;

#[derive(Deserialize)]

@@ -1,28 +1,96 @@
mod api;
mod cli;
mod config;
pub mod db;
mod error;
mod repo;
mod web;

pub use config::{Config, DbConfig, FsConfig};
pub use error::{Result, ServerError};

use std::sync::Arc;
use std::{io, path::PathBuf};

use clap::Parser;
use sea_orm_migration::MigratorTrait;
use tokio::runtime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub const ANY_ARCH: &'static str = "any";

#[derive(Clone)]
pub struct Global {
    config: crate::config::Config,
    mgr: Arc<repo::RepoMgr>,
    repo: repo::Handle,
    db: sea_orm::DbConn,
}

#[tokio::main]
async fn main() -> crate::Result<()> {
fn main() -> crate::Result<()> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let handle = rt.handle();

    let cli = cli::Cli::parse();
    cli.run().await
    let global = setup(handle, cli.config_file)?;

    handle.block_on(run(global))
}

fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
    let config: Config = Config::figment(config_file)
        .extract()
        .inspect_err(|e| tracing::error!("{}", e))?;

    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
        .with(tracing_subscriber::fmt::layer())
        .init();

    tracing::info!("Connecting to database");
    let db = rt.block_on(crate::db::connect(&config.db))?;
    rt.block_on(crate::db::Migrator::up(&db, None))?;

    let repo = match &config.fs {
        FsConfig::Local { data_dir } => {
            crate::repo::start(
                data_dir.join("repos"),
                db.clone(),
                rt.clone(),
                config.pkg_workers,
            )?
            //rt.block_on(crate::repo::RepoMgr::new(
            //    data_dir.join("repos"),
            //    db.clone(),
            //))?
            //RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())?
        }
    };
    //let mgr = Arc::new(mgr);
    //
    //for _ in 0..config.pkg_workers {
    //    let clone = Arc::clone(&mgr);
    //
    //    rt.spawn(async move { clone.pkg_parse_task().await });
    //}

    Ok(Global {
        config: config.clone(),
        repo,
        db,
    })
}

async fn run(global: Global) -> crate::Result<()> {
    let domain: String = format!("{}:{}", &global.config.domain, global.config.port)
        .parse()
        .unwrap();
    let listener = tokio::net::TcpListener::bind(domain).await?;

    let app = web::router(global);

    // run it with hyper on localhost:3000
    Ok(axum::serve(listener, app.into_make_service())
        .await
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?)
}
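
The new `main` above drops `#[tokio::main]` and owns the runtime explicitly, so synchronous setup code can drive individual async calls through `runtime::Handle::block_on` before handing control to the final async entry point. A minimal, self-contained sketch of that pattern (illustrative only, not the project's code):

```rust
use tokio::runtime;

fn main() {
    // Build the multi-threaded runtime by hand instead of via #[tokio::main].
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let handle = rt.handle();

    // Synchronous setup code can block on individual async calls...
    let db_ready = handle.block_on(async { true });
    assert!(db_ready);

    // ...and the final async entry point runs on the same runtime.
    handle.block_on(async {
        println!("server would run here");
    });
}
```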

@@ -0,0 +1,245 @@
use super::{archive, package, Command, SharedState};
use crate::db;

use std::{
    path::PathBuf,
    sync::{atomic::Ordering, Arc},
};

use futures::StreamExt;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use sea_query::Expr;
use tokio::{runtime, sync::mpsc};
use uuid::Uuid;

/// The actor is responsible for mutating the repositories. It receives commands as messages and
/// processes them in both a synchronous and an asynchronous way.
pub struct Actor {
    rt: runtime::Handle,
    state: Arc<SharedState>,
}

impl Actor {
    pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
        Self {
            rt,
            state: Arc::clone(&state),
        }
    }

    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
        std::array::from_fn(|_| {
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
            self.state.repos_dir.join(uuid.to_string())
        })
    }

    /// Run the main actor loop
    pub fn run(self) {
        while let Some(msg) = {
            let mut rx = self.state.rx.lock().unwrap();
            rx.blocking_recv()
        } {
            match msg {
                Command::ParsePkg(repo, path) => {
                    let _ = self.parse_pkg(repo, path);

                    if self
                        .state
                        .repos
                        .blocking_read()
                        .get(&repo)
                        .map(|n| n.0.load(Ordering::SeqCst))
                        == Some(0)
                    {
                        let _ = self.sync_repo(repo);
                        let _ = self.clean();
                    }
                }
                Command::SyncRepo(repo) => {
                    let _ = self.sync_repo(repo);
                }
                Command::Clean => {
                    let _ = self.clean();
                }
            }
        }
    }

    /// Parse a queued package for the given repository.
    fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> {
        let pkg = package::Package::open(&path)?;
        let pkg = self
            .rt
            .block_on(db::query::package::insert(&self.state.conn, repo, pkg))?;
        let dest_path = self
            .state
            .repos_dir
            .join(repo.to_string())
            .join(pkg.id.to_string());
        std::fs::rename(path, dest_path)?;

        tracing::info!(
            "Added '{}-{}-{}' to repository {}",
            pkg.name,
            pkg.version,
            pkg.arch,
            repo,
        );

        self.state.repos.blocking_read().get(&repo).inspect(|n| {
            n.0.fetch_sub(1, Ordering::SeqCst);
        });

        Ok(())
    }

    fn sync_repo(&self, repo: i32) -> crate::Result<()> {
        let repos = self.state.repos.blocking_read();

        if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
            let archs: Vec<String> = self.rt.block_on(
                db::Package::find()
                    .filter(db::package::Column::RepoId.eq(repo))
                    .select_only()
                    .column(db::package::Column::Arch)
                    .distinct()
                    .into_tuple()
                    .all(&self.state.conn),
            )?;

            for arch in archs {
                self.generate_archives(repo, &arch)?;
            }
        }

        Ok(())
    }

    fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
        let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths();

        let mut ars = archive::RepoArchivesWriter::new(
            &tmp_ar_db_path,
            &tmp_ar_files_path,
            self.random_file_paths(),
            &self.rt,
            &self.state.conn,
        )?;

        let (tx, mut rx) = mpsc::channel(1);

        let conn = self.state.conn.clone();
        let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch);

        // sea_orm needs its connections to be dropped inside an async context, so we spawn a task
        // that streams the responses to the synchronous context via message passing
        self.rt.spawn(async move {
            match query.stream(&conn).await {
                Ok(mut stream) => {
                    while let Some(res) = stream.next().await {
                        let is_err = res.is_err();
                        let _ = tx.send(res).await;

                        if is_err {
                            return;
                        }
                    }
                }
                Err(err) => {
                    let _ = tx.send(Err(err)).await;
                }
            }
        });

        let mut committed_ids: Vec<i32> = Vec::new();

        while let Some(pkg) = rx.blocking_recv().transpose()? {
            committed_ids.push(pkg.id);
            ars.append_pkg(&pkg)?;
        }

        ars.close()?;

        // Move newly generated package archives to their correct place
        let repo_dir = self.state.repos_dir.join(repo.to_string());
        std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?;
        std::fs::rename(
            tmp_ar_files_path,
            repo_dir.join(format!("{}.files.tar.gz", arch)),
        )?;

        // Update the state for the newly committed packages
        self.rt.block_on(
            db::Package::update_many()
                .col_expr(
                    db::package::Column::State,
                    Expr::value(db::PackageState::Committed),
                )
                .filter(db::package::Column::Id.is_in(committed_ids))
                .exec(&self.state.conn),
        )?;

        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);

        Ok(())
    }

    fn clean(&self) -> crate::Result<()> {
        let (tx, mut rx) = mpsc::channel(1);
        let conn = self.state.conn.clone();
        let query = db::query::package::stale_pkgs(&self.state.conn);

        // sea_orm needs its connections to be dropped inside an async context, so we spawn a task
        // that streams the responses to the synchronous context via message passing
        self.rt.spawn(async move {
            match query.stream(&conn).await {
                Ok(mut stream) => {
                    while let Some(res) = stream.next().await {
                        let is_err = res.is_err();
                        let _ = tx.send(res).await;

                        if is_err {
                            return;
                        }
                    }
                }
                Err(err) => {
                    let _ = tx.send(Err(err)).await;
                }
            }
        });

        // Ids are monotonically increasing, so the max id suffices to know which packages to
        // remove later
        let mut max_id = -1;
        let mut removed_pkgs = 0;

        while let Some(pkg) = rx.blocking_recv().transpose()? {
            // Failing to remove the package file isn't the biggest problem
            let _ = std::fs::remove_file(
                self.state
                    .repos_dir
                    .join(pkg.repo_id.to_string())
                    .join(pkg.id.to_string()),
            );

            if pkg.id > max_id {
                max_id = pkg.id;
            }

            removed_pkgs += 1;
        }

        if removed_pkgs > 0 {
            self.rt.block_on(db::query::package::delete_stale_pkgs(
                &self.state.conn,
                max_id,
            ))?;
        }

        tracing::info!("Cleaned up {removed_pkgs} old package(s)");

        Ok(())
    }
}
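
The actor's `generate_archives` and `clean` both bridge an async sea_orm stream into synchronous code: a task spawned on the runtime forwards each streamed row into a bounded channel, and the actor thread pulls rows out with `blocking_recv`. A stripped-down sketch of that bridge, assuming only tokio (no sea_orm, simplified row type):

```rust
use tokio::{runtime, sync::mpsc};

fn main() {
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    // Capacity 1 keeps the async producer in lockstep with the
    // synchronous consumer, as in the actor's archive generation.
    let (tx, mut rx) = mpsc::channel::<Result<i32, String>>(1);

    rt.spawn(async move {
        for i in 0..3 {
            // Forward each "streamed row"; a send error means the
            // consumer hung up, so the producer just stops.
            if tx.send(Ok(i)).await.is_err() {
                return;
            }
        }
    });

    // Synchronous side: blocking_recv is allowed outside async contexts.
    while let Some(row) = rx.blocking_recv() {
        println!("got row: {row:?}");
    }
}
```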

@@ -1,78 +1,220 @@
use crate::db;
use std::{
    io,
    io::Write,
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use futures::StreamExt;
use libarchive::{
    write::{Builder, FileWriter, WriteEntry},
    Entry, WriteFilter, WriteFormat,
};
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
use tokio::{runtime, sync::mpsc};

/// Struct to abstract away the intrinsics of writing entries to an archive file
pub struct RepoArchiveWriter {
    ar: Arc<Mutex<FileWriter>>,
pub struct RepoArchivesWriter {
    ar_db: FileWriter,
    ar_files: FileWriter,
    rt: runtime::Handle,
    conn: DbConn,
    tmp_paths: [PathBuf; 2],
}

impl RepoArchiveWriter {
    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let path = PathBuf::from(path.as_ref());
impl RepoArchivesWriter {
    pub fn new(
        ar_db_path: impl AsRef<Path>,
        ar_files_path: impl AsRef<Path>,
        tmp_paths: [impl AsRef<Path>; 2],
        rt: &runtime::Handle,
        conn: &sea_orm::DbConn,
    ) -> crate::Result<Self> {
        let ar_db = Self::open_ar(ar_db_path)?;
        let ar_files = Self::open_ar(ar_files_path)?;

        // Open the archive file
        let ar = tokio::task::spawn_blocking(move || {
        Ok(Self {
            ar_db,
            ar_files,
            rt: rt.clone(),
            conn: conn.clone(),
            tmp_paths: [
                tmp_paths[0].as_ref().to_path_buf(),
                tmp_paths[1].as_ref().to_path_buf(),
            ],
        })
    }

    fn open_ar(path: impl AsRef<Path>) -> crate::Result<FileWriter> {
        let mut builder = Builder::new();
        builder.add_filter(WriteFilter::Gzip)?;
        builder.set_format(WriteFormat::PaxRestricted)?;

            builder.open_file(path)
        })
        .await
        .unwrap()?;

        Ok(Self {
            // In practice, mutex is only ever used by one thread at a time. It's simply here so we
            // can use spawn_blocking without issues.
            ar: Arc::new(Mutex::new(ar)),
        })
        Ok(builder.open_file(path)?)
    }

    /// Add either a "desc" or "files" entry to the archive
    pub async fn add_entry<P: AsRef<Path>>(
        &self,
        full_name: &str,
        path: P,
        desc: bool,
    ) -> io::Result<()> {
        let metadata = tokio::fs::metadata(&path).await?;
    fn append_entry(
        ar: &mut FileWriter,
        src_path: impl AsRef<Path>,
        dest_path: impl AsRef<Path>,
    ) -> crate::Result<()> {
        let metadata = std::fs::metadata(&src_path)?;
        let file_size = metadata.len();

        let ar = Arc::clone(&self.ar);
        let full_name = String::from(full_name);
        let path = PathBuf::from(path.as_ref());

        Ok(tokio::task::spawn_blocking(move || {
        let mut ar_entry = WriteEntry::new();
        ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);

            let name = if desc { "desc" } else { "files" };

            ar_entry.set_pathname(PathBuf::from(full_name).join(name));
        ar_entry.set_pathname(dest_path);
        ar_entry.set_mode(0o100644);
        ar_entry.set_size(file_size.try_into().unwrap());

            ar.lock().unwrap().append_path(&mut ar_entry, path)
        })
        .await
        .unwrap()?)
        Ok(ar.append_path(&mut ar_entry, src_path)?)
    }

    pub async fn close(&self) -> io::Result<()> {
        let ar = Arc::clone(&self.ar);
    pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> {
        self.write_desc(&self.tmp_paths[0], pkg)?;
        self.write_files(&self.tmp_paths[1], pkg)?;

        Ok(
            tokio::task::spawn_blocking(move || ar.lock().unwrap().close())
                .await
                .unwrap()?,
        )
        let full_name = format!("{}-{}", pkg.name, pkg.version);
        let dest_desc_path = format!("{}/desc", full_name);
        let dest_files_path = format!("{}/files", full_name);

        Self::append_entry(&mut self.ar_db, &self.tmp_paths[0], &dest_desc_path)?;
        Self::append_entry(&mut self.ar_files, &self.tmp_paths[0], &dest_desc_path)?;
        Self::append_entry(&mut self.ar_files, &self.tmp_paths[1], &dest_files_path)?;

        Ok(())
    }

    /// Generate a "files" archive entry for the package in the given path
    fn write_files(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
        let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);

        writeln!(f, "%FILES%")?;

        let (tx, mut rx) = mpsc::channel(1);

        let conn = self.conn.clone();
        let query = pkg.find_related(db::PackageFile);

        self.rt.spawn(async move {
            match query.stream(&conn).await {
                Ok(mut stream) => {
                    while let Some(res) = stream.next().await {
                        let is_err = res.is_err();
                        let _ = tx.send(res).await;

                        if is_err {
                            return;
                        }
                    }
                }
                Err(err) => {
                    let _ = tx.send(Err(err)).await;
                }
            }
        });

        while let Some(file) = rx.blocking_recv().transpose()? {
            writeln!(f, "{}", file.path)?;
        }

        f.flush()?;
        Ok(())
    }

    fn write_desc(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
        let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);

        writeln!(f, "%FILENAME%\n{}", pkg.id)?;

        let mut write_attr = |k: &str, v: &str| {
            if !v.is_empty() {
                writeln!(f, "\n%{}%\n{}", k, v)
            } else {
                Ok(())
            }
        };

        write_attr("NAME", &pkg.name)?;
        write_attr("BASE", &pkg.base)?;
        write_attr("VERSION", &pkg.version)?;

        if let Some(ref desc) = pkg.description {
            write_attr("DESC", desc)?;
        }

        let groups: Vec<String> = self.rt.block_on(
            pkg.find_related(db::PackageGroup)
                .select_only()
                .column(db::package_group::Column::Name)
                .into_tuple()
                .all(&self.conn),
        )?;

        write_attr("GROUPS", &groups.join("\n"))?;

        write_attr("CSIZE", &pkg.c_size.to_string())?;
        write_attr("ISIZE", &pkg.size.to_string())?;
        write_attr("SHA256SUM", &pkg.sha256_sum)?;

        if let Some(ref url) = pkg.url {
            write_attr("URL", url)?;
        }

        let licenses: Vec<String> = self.rt.block_on(
            pkg.find_related(db::PackageLicense)
                .select_only()
                .column(db::package_license::Column::Name)
                .into_tuple()
                .all(&self.conn),
        )?;
        write_attr("LICENSE", &licenses.join("\n"))?;

        write_attr("ARCH", &pkg.arch)?;

        // TODO build date
        write_attr(
            "BUILDDATE",
            &pkg.build_date.and_utc().timestamp().to_string(),
        )?;

        if let Some(ref packager) = pkg.packager {
            write_attr("PACKAGER", packager)?;
        }

        let related = [
            ("REPLACES", db::PackageRelatedEnum::Replaces),
            ("CONFLICTS", db::PackageRelatedEnum::Conflicts),
            ("PROVIDES", db::PackageRelatedEnum::Provides),
            ("DEPENDS", db::PackageRelatedEnum::Depend),
            ("OPTDEPENDS", db::PackageRelatedEnum::Optdepend),
            ("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend),
            ("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend),
        ];

        for (key, attr) in related.into_iter() {
            let items: Vec<String> = self.rt.block_on(
                pkg.find_related(db::PackageRelated)
                    .filter(db::package_related::Column::Type.eq(attr))
                    .select_only()
                    .column(db::package_related::Column::Name)
                    .into_tuple()
                    .all(&self.conn),
            )?;

            write_attr(key, &items.join("\n"))?;
        }

        f.flush()?;
        Ok(())
    }

    pub fn close(&mut self) -> crate::Result<()> {
        self.ar_db.close()?;
        self.ar_files.close()?;

        let _ = std::fs::remove_file(&self.tmp_paths[0])?;
        let _ = std::fs::remove_file(&self.tmp_paths[1])?;

        Ok(())
    }
}
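
`write_desc` and `write_files` emit the plain-text format used by pacman-style repository databases: a `%KEY%` header line followed by the value(s), with a blank line between fields, and empty fields omitted. A hedged sketch of just that formatting logic (made-up package values; the `write_attr` helper mirrors the closure above):

```rust
use std::io::Write;

fn main() -> std::io::Result<()> {
    let mut out: Vec<u8> = Vec::new();

    // The desc entry starts with the package's file name (here: its id).
    writeln!(out, "%FILENAME%\n{}", 42)?;

    // Mirrors the write_attr closure: empty values are skipped entirely.
    fn write_attr(out: &mut Vec<u8>, k: &str, v: &str) -> std::io::Result<()> {
        if !v.is_empty() {
            writeln!(out, "\n%{}%\n{}", k, v)?;
        }
        Ok(())
    }

    write_attr(&mut out, "NAME", "hello")?; // hypothetical package
    write_attr(&mut out, "VERSION", "1.0-1")?;
    write_attr(&mut out, "GROUPS", "")?; // empty: field omitted

    print!("{}", String::from_utf8(out).unwrap());
    Ok(())
}
```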

@@ -0,0 +1,144 @@
use super::{Command, SharedState};
use crate::db;

use std::{
    path::PathBuf,
    sync::{atomic::Ordering, Arc},
};

use sea_orm::{
    ActiveModelTrait, ColumnTrait, Condition, EntityTrait, NotSet, QueryFilter, QuerySelect, Set,
};
use sea_query::Expr;
use uuid::Uuid;

#[derive(Clone)]
pub struct Handle {
    state: Arc<SharedState>,
}

impl Handle {
    pub fn new(state: &Arc<SharedState>) -> Self {
        Self {
            state: Arc::clone(state),
        }
    }

    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
        std::array::from_fn(|_| {
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
            self.state.repos_dir.join(uuid.to_string())
        })
    }

    pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
        let mut repos = self.state.repos.write().await;

        let distro_id: Option<i32> = db::Distro::find()
            .filter(db::distro::Column::Name.eq(distro))
            .select_only()
            .column(db::distro::Column::Id)
            .into_tuple()
            .one(&self.state.conn)
            .await?;

        let distro_id = if let Some(id) = distro_id {
            id
        } else {
            let new_distro = db::distro::ActiveModel {
                id: NotSet,
                name: Set(distro.to_string()),
                description: NotSet,
            };

            new_distro.insert(&self.state.conn).await?.id
        };

        let repo_id: Option<i32> = db::Repo::find()
            .filter(db::repo::Column::DistroId.eq(distro_id))
            .filter(db::repo::Column::Name.eq(repo))
            .select_only()
            .column(db::repo::Column::Id)
            .into_tuple()
            .one(&self.state.conn)
            .await?;

        let repo_id = if let Some(id) = repo_id {
            id
        } else {
            let new_repo = db::repo::ActiveModel {
                id: NotSet,
                distro_id: Set(distro_id),
                name: Set(repo.to_string()),
                description: NotSet,
            };
            let id = new_repo.insert(&self.state.conn).await?.id;

            tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?;
            repos.insert(id, Default::default());

            id
        };

        Ok(repo_id)
    }

    pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
        Ok(db::Repo::find()
            .find_also_related(db::Distro)
            .filter(
                Condition::all()
                    .add(db::repo::Column::Name.eq(repo))
                    .add(db::distro::Column::Name.eq(distro)),
            )
            .one(&self.state.conn)
            .await
            .map(|res| res.map(|(repo, _)| repo.id))?)
    }

    pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
        self.state.repos.write().await.remove(&repo);
        db::Repo::delete_by_id(repo).exec(&self.state.conn).await?;
        let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await;

        Ok(())
    }

    /// Remove all packages in the repository that have a given arch. This method marks all
    /// packages with the given architecture as "pending deletion", before performing a manual sync
    /// & removal of stale packages.
    pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
        db::Package::update_many()
            .col_expr(
                db::package::Column::State,
                Expr::value(db::PackageState::PendingDeletion),
            )
            .filter(
                Condition::all()
                    .add(db::package::Column::RepoId.eq(repo))
                    .add(db::package::Column::Arch.eq(arch)),
            )
            .exec(&self.state.conn)
            .await?;

        self.queue_sync(repo).await;
        self.queue_clean().await;

        Ok(())
    }

    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
        self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
        self.state.repos.read().await.get(&repo).inspect(|n| {
            n.0.fetch_add(1, Ordering::SeqCst);
        });
    }

    async fn queue_sync(&self, repo: i32) {
        self.state.tx.send(Command::SyncRepo(repo)).unwrap();
    }

    async fn queue_clean(&self) {
        self.state.tx.send(Command::Clean).unwrap();
    }
}
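
`Handle` is the cloneable, async-facing half of the actor pair: it performs the database bookkeeping itself, then pushes `Command` values onto the shared unbounded channel for the actor threads to drain. A minimal sketch of that handle/command-channel split, with stand-in types for the crate's `Command` and `SharedState` (assumed shapes, not the project's definitions):

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

// Stand-ins for the crate's Command and SharedState.
enum Command {
    SyncRepo(i32),
    Clean,
}

struct SharedState {
    tx: mpsc::UnboundedSender<Command>,
}

#[derive(Clone)]
struct Handle {
    state: Arc<SharedState>,
}

impl Handle {
    fn queue_sync(&self, repo: i32) {
        // Sending never blocks on an unbounded channel; the actor
        // threads pick the command up whenever they are free.
        self.state.tx.send(Command::SyncRepo(repo)).unwrap();
    }
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let handle = Handle { state: Arc::new(SharedState { tx }) };

    handle.clone().queue_sync(1);
    handle.state.tx.send(Command::Clean).unwrap();
    drop(handle); // all senders dropped: the receiver drains, then ends

    while let Some(cmd) = rx.blocking_recv() {
        match cmd {
            Command::SyncRepo(id) => println!("sync repo {id}"),
            Command::Clean => println!("clean"),
        }
    }
}
```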
			@ -1,385 +0,0 @@
 | 
			
		|||
use super::{archive, package};
 | 
			
		||||
use crate::db::{self, query::package::delete_stale_pkgs};
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
    path::{Path, PathBuf},
 | 
			
		||||
    sync::{
 | 
			
		||||
        atomic::{AtomicU32, Ordering},
 | 
			
		||||
        Arc,
 | 
			
		||||
    },
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use futures::StreamExt;
 | 
			
		||||
use sea_orm::{
 | 
			
		||||
    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
 | 
			
		||||
    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 | 
			
		||||
};
 | 
			
		||||
use sea_query::{Alias, Expr, Query};
 | 
			
		||||
use tokio::sync::{
 | 
			
		||||
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
 | 
			
		||||
    Mutex, RwLock,
 | 
			
		||||
};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
struct PkgQueueMsg {
 | 
			
		||||
    repo: i32,
 | 
			
		||||
    path: PathBuf,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// A single instance of this struct orchestrates everything related to managing packages files on
 | 
			
		||||
/// disk for all repositories in the server
 | 
			
		||||
pub struct RepoMgr {
 | 
			
		||||
    repos_dir: PathBuf,
 | 
			
		||||
    conn: DbConn,
 | 
			
		||||
    pkg_queue: (
 | 
			
		||||
        UnboundedSender<PkgQueueMsg>,
 | 
			
		||||
        Mutex<UnboundedReceiver<PkgQueueMsg>>,
 | 
			
		||||
    ),
 | 
			
		||||
    repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RepoMgr {
 | 
			
		||||
    pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
 | 
			
		||||
        if !tokio::fs::try_exists(&repos_dir).await? {
 | 
			
		||||
            tokio::fs::create_dir(&repos_dir).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let (tx, rx) = unbounded_channel();
 | 
			
		||||
 | 
			
		||||
        let mut repos = HashMap::new();
 | 
			
		||||
        let repo_ids: Vec<i32> = db::Repo::find()
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::repo::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .all(&conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        for id in repo_ids {
 | 
			
		||||
            repos.insert(id, Default::default());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            repos_dir: repos_dir.as_ref().to_path_buf(),
 | 
			
		||||
            conn,
 | 
			
		||||
            pkg_queue: (tx, Mutex::new(rx)),
 | 
			
		||||
            repos: RwLock::new(repos),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Generate archive databases for all known architectures in the repository, including the
 | 
			
		||||
    /// "any" architecture.
 | 
			
		||||
    pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        let lock = self
 | 
			
		||||
            .repos
 | 
			
		||||
            .read()
 | 
			
		||||
            .await
 | 
			
		||||
            .get(&repo)
 | 
			
		||||
            .map(|(_, lock)| Arc::clone(lock));
 | 
			
		||||
 | 
			
		||||
        if lock.is_none() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let lock = lock.unwrap();
 | 
			
		||||
        let _guard = lock.lock().await;
 | 
			
		||||
 | 
			
		||||
        let archs: Vec<String> = db::Package::find()
 | 
			
		||||
            .filter(db::package::Column::RepoId.eq(repo))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::package::Column::Arch)
 | 
			
		||||
            .distinct()
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .all(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        for arch in archs {
 | 
			
		||||
            self.generate_archives(repo, &arch).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Generate the archive databases for the given repository and architecture.
 | 
			
		||||
    async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
 | 
			
		||||
        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
 | 
			
		||||
            self.random_file_paths();
 | 
			
		||||
        let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
 | 
			
		||||
        let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
 | 
			
		||||
 | 
			
		||||
        // Query all packages in the repo that have the given architecture or the "any"
 | 
			
		||||
        // architecture
 | 
			
		||||
        let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let mut commited_ids: Vec<i32> = Vec::new();
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            commited_ids.push(pkg.id);
 | 
			
		||||
 | 
			
		||||
            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
 | 
			
		||||
            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
 | 
			
		||||
 | 
			
		||||
            package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
 | 
			
		||||
            package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
 | 
			
		||||
 | 
			
		||||
            let full_name = format!("{}-{}", pkg.name, pkg.version);
 | 
			
		||||
 | 
			
		||||
            ar_db
 | 
			
		||||
                .add_entry(&full_name, &desc_tmp_file_path, true)
 | 
			
		||||
                .await?;
 | 
			
		||||
            ar_files
 | 
			
		||||
                .add_entry(&full_name, &desc_tmp_file_path, true)
 | 
			
		||||
                .await?;
 | 
			
		||||
            ar_files
 | 
			
		||||
                .add_entry(&full_name, &files_tmp_file_path, false)
 | 
			
		||||
                .await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Cleanup
 | 
			
		||||
        ar_db.close().await?;
 | 
			
		||||
        ar_files.close().await?;
 | 
			
		||||
 | 
			
		||||
        let repo_dir = self.repos_dir.join(repo.to_string());
 | 
			
		||||
 | 
			
		||||
        // Move the db archives to their respective places
 | 
			
		||||
        tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
 | 
			
		||||
        tokio::fs::rename(
 | 
			
		||||
            tmp_ar_files_path,
 | 
			
		||||
            repo_dir.join(format!("{}.files.tar.gz", arch)),
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
        // Only after we have successfully written everything to disk do we update the database.
 | 
			
		||||
        // This order ensures any failure can be recovered, as the database is our single source of
 | 
			
		||||
        // truth.
 | 
			
		||||
        db::Package::update_many()
 | 
			
		||||
            .col_expr(
 | 
			
		||||
                db::package::Column::State,
 | 
			
		||||
                Expr::value(db::PackageState::Committed),
 | 
			
		||||
            )
 | 
			
		||||
            .filter(db::package::Column::Id.is_in(commited_ids))
 | 
			
		||||
            .exec(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // If this fails there's no point in failing the function + if there were no packages in
 | 
			
		||||
        // the repo, this fails anyway because the temp file doesn't exist
 | 
			
		||||
        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
 | 
			
		||||
        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Clean any remaining old package files from the database and file system
 | 
			
		||||
    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
 | 
			
		||||
        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // Ids are monotonically increasing, so the max id suffices to know which packages to
 | 
			
		||||
        // remove later
 | 
			
		||||
        let mut max_id = -1;
 | 
			
		||||
        let mut removed_pkgs = 0;
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            // Failing to remove the package file isn't the biggest problem
 | 
			
		||||
            let _ = tokio::fs::remove_file(
 | 
			
		||||
                self.repos_dir
 | 
			
		||||
                    .join(pkg.repo_id.to_string())
 | 
			
		||||
                    .join(pkg.id.to_string()),
 | 
			
		||||
            )
 | 
			
		||||
            .await;
 | 
			
		||||
 | 
			
		||||
            if pkg.id > max_id {
 | 
			
		||||
                max_id = pkg.id;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            removed_pkgs += 1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if removed_pkgs > 0 {
 | 
			
		||||
            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Removed {removed_pkgs} stale package(s)");
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn pkg_parse_task(&self) {
 | 
			
		||||
        loop {
 | 
			
		||||
            // Receive the next message and immediately drop the mutex afterwards. As long as the
 | 
			
		||||
            // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
 | 
			
		||||
            // as soon as a message is received, so another worker can pick up the mutex.
 | 
			
		||||
            let msg = {
 | 
			
		||||
                let mut recv = self.pkg_queue.1.lock().await;
 | 
			
		||||
                recv.recv().await
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            if let Some(msg) = msg {
 | 
			
		||||
                // TODO better handle this error (retry if failure wasn't because the package is
 | 
			
		||||
                // faulty)
 | 
			
		||||
                let _ = self
 | 
			
		||||
                    .add_pkg_from_path(msg.path, msg.repo)
 | 
			
		||||
                    .await
 | 
			
		||||
                    .inspect_err(|e| tracing::error!("{:?}", e));
 | 
			
		||||
 | 
			
		||||
                let old = self
 | 
			
		||||
                    .repos
 | 
			
		||||
                    .read()
 | 
			
		||||
                    .await
 | 
			
		||||
                    .get(&msg.repo)
 | 
			
		||||
                    .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
 | 
			
		||||
 | 
			
		||||
                // Every time the queue for a repo becomes empty, we run a sync job
 | 
			
		||||
                if old == Some(1) {
 | 
			
		||||
                    // TODO error handling
 | 
			
		||||
                    let _ = self.sync_repo(msg.repo).await;
 | 
			
		||||
 | 
			
		||||
                    // TODO move this so that we only clean if entire queue is empty, not just
 | 
			
		||||
                    // queue for specific repo
 | 
			
		||||
                    let _ = self.remove_stale_pkgs().await;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
 | 
			
		||||
        self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
 | 
			
		||||
        self.repos.read().await.get(&repo).inspect(|n| {
 | 
			
		||||
            n.0.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
 | 
			
		||||
        Ok(db::Repo::find()
 | 
			
		||||
            .find_also_related(db::Distro)
 | 
			
		||||
            .filter(
 | 
			
		||||
                Condition::all()
 | 
			
		||||
                    .add(db::repo::Column::Name.eq(repo))
 | 
			
		||||
                    .add(db::distro::Column::Name.eq(distro)),
 | 
			
		||||
            )
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await
 | 
			
		||||
            .map(|res| res.map(|(repo, _)| repo.id))?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
 | 
			
		||||
        let mut repos = self.repos.write().await;
 | 
			
		||||
 | 
			
		||||
        let distro_id: Option<i32> = db::Distro::find()
 | 
			
		||||
            .filter(db::distro::Column::Name.eq(distro))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::distro::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let distro_id = if let Some(id) = distro_id {
 | 
			
		||||
            id
 | 
			
		||||
        } else {
 | 
			
		||||
            let new_distro = db::distro::ActiveModel {
 | 
			
		||||
                id: NotSet,
 | 
			
		||||
                name: Set(distro.to_string()),
 | 
			
		||||
                description: NotSet,
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            new_distro.insert(&self.conn).await?.id
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let repo_id: Option<i32> = db::Repo::find()
 | 
			
		||||
            .filter(db::repo::Column::DistroId.eq(distro_id))
 | 
			
		||||
            .filter(db::repo::Column::Name.eq(repo))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::repo::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let repo_id = if let Some(id) = repo_id {
 | 
			
		||||
            id
 | 
			
		||||
        } else {
 | 
			
		||||
            let new_repo = db::repo::ActiveModel {
 | 
			
		||||
                id: NotSet,
 | 
			
		||||
                distro_id: Set(distro_id),
 | 
			
		||||
                name: Set(repo.to_string()),
 | 
			
		||||
                description: NotSet,
 | 
			
		||||
            };
 | 
			
		||||
            let id = new_repo.insert(&self.conn).await?.id;
 | 
			
		||||
 | 
			
		||||
            tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
 | 
			
		||||
            repos.insert(id, Default::default());
 | 
			
		||||
 | 
			
		||||
            id
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        Ok(repo_id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        let path_clone = path.as_ref().to_path_buf();
 | 
			
		||||
        let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
 | 
			
		||||
            .await
 | 
			
		||||
            .unwrap()?;
 | 
			
		||||
 | 
			
		||||
        // TODO prevent database from being updated but file failing to move to repo dir?
 | 
			
		||||
        let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
 | 
			
		||||
 | 
			
		||||
        let dest_path = self
 | 
			
		||||
            .repos_dir
 | 
			
		||||
            .join(repo.to_string())
 | 
			
		||||
            .join(pkg.id.to_string());
 | 
			
		||||
        tokio::fs::rename(path.as_ref(), dest_path).await?;
 | 
			
		||||
 | 
			
		||||
        tracing::info!(
 | 
			
		||||
            "Added '{}-{}-{}' to repository {}",
 | 
			
		||||
            pkg.name,
 | 
			
		||||
            pkg.version,
 | 
			
		||||
            pkg.arch,
 | 
			
		||||
            repo,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }

    pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
        self.repos.write().await.remove(&repo);
        db::Repo::delete_by_id(repo).exec(&self.conn).await?;
        let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;

        Ok(())
    }

    /// Remove all packages in the repository that have a given arch. This method marks all
    /// packages with the given architecture as "pending deletion", before performing a manual sync
    /// & removal of stale packages.
    pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
        db::Package::update_many()
            .col_expr(
                db::package::Column::State,
                Expr::value(db::PackageState::PendingDeletion),
            )
            .filter(
                Condition::all()
                    .add(db::package::Column::RepoId.eq(repo))
                    .add(db::package::Column::Arch.eq(arch)),
            )
            .exec(&self.conn)
            .await?;

        self.sync_repo(repo).await?;
        self.remove_stale_pkgs().await?;

        Ok(())
    }
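
    // Usage sketch (receiver and id are hypothetical): dropping the "aarch64"
    // packages of repository 3 marks them, regenerates the archives, then prunes:
    //
    //mgr.remove_repo_arch(3, "aarch64").await?;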

    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
        std::array::from_fn(|_| {
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
            self.repos_dir.join(uuid.to_string())
        })
    }
}
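
// Usage sketch (`mgr` is illustrative): the const parameter `C` is inferred
// from the destructuring pattern, so callers pick how many unique temp paths
// they get back:
//
//let [tmp_path] = mgr.random_file_paths();          // C = 1, inferred
//let [desc, files] = mgr.random_file_paths::<2>();  // C = 2, explicit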
@@ -1,161 +1,88 @@
mod actor;
mod archive;
mod manager;
mod handle;
pub mod package;

pub use manager::RepoMgr;
pub use actor::Actor;
pub use handle::Handle;

use crate::FsConfig;
use crate::db;

use axum::{
    body::Body,
    extract::{Path, State},
    http::{Request, StatusCode},
    response::IntoResponse,
    routing::{delete, post},
    Router,
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::{atomic::AtomicU32, Arc, Mutex},
};
use futures::TryStreamExt;
use tokio_util::io::StreamReader;
use tower::util::ServiceExt;
use tower_http::{services::ServeFile, validate_request::ValidateRequestHeaderLayer};

pub fn router(api_key: &str) -> Router<crate::Global> {
    Router::new()
        .route(
            "/:distro/:repo",
            post(post_package_archive)
                .delete(delete_repo)
                .route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
        )
        .route(
            "/:distro/:repo/:arch",
            delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
        )
        // Routes added after the layer do not get that layer applied, so the GET requests will not
        // be authorized
        .route(
            "/:distro/:repo/:arch/:filename",
            delete(delete_package)
                .route_layer(ValidateRequestHeaderLayer::bearer(api_key))
                .get(get_file),
        )
use sea_orm::{DbConn, EntityTrait, QuerySelect};
use tokio::{
    runtime,
    sync::{
        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
        RwLock,
    },
};

pub enum Command {
    ParsePkg(i32, PathBuf),
    SyncRepo(i32),
    Clean,
}
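
// Command-flow sketch (the send sites are inferred from this enum's shape and
// are not part of the change; e.g. a queueing helper would wrap the first send):
//
//state.tx.send(Command::ParsePkg(repo_id, tmp_path))?; // enqueue a package parse
//state.tx.send(Command::SyncRepo(repo_id))?;           // regenerate db archives
//state.tx.send(Command::Clean)?;                       // prune stale packages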

/// Serve the package archive files and database archives. If files are requested for an
/// architecture that does not have any explicit packages, a repository containing only "any" files
/// is returned.
async fn get_file(
    State(global): State<crate::Global>,
    Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
    req: Request<Body>,
) -> crate::Result<impl IntoResponse> {
    if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
        match global.config.fs {
            FsConfig::Local { data_dir } => {
                let repo_dir = data_dir.join("repos").join(repo_id.to_string());
pub struct SharedState {
    pub repos_dir: PathBuf,
    pub conn: DbConn,
    pub rx: Mutex<UnboundedReceiver<Command>>,
    pub tx: UnboundedSender<Command>,
    pub repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
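
// Reading of the per-repo tuple above (an inference from its type, not stated
// in this change): the AtomicU32 plausibly tracks queued work per repository,
// while the Arc<Mutex<()>> serializes syncs of one repository across actors.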

                let file_name = if file_name == format!("{}.db", repo)
                    || file_name == format!("{}.db.tar.gz", repo)
                {
                    format!("{}.db.tar.gz", arch)
                } else if file_name == format!("{}.files", repo)
                    || file_name == format!("{}.files.tar.gz", repo)
                {
                    format!("{}.files.tar.gz", arch)
                } else {
                    file_name
                };
impl SharedState {
    pub fn new(
        repos_dir: impl AsRef<Path>,
        conn: DbConn,
        repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
    ) -> Self {
        let (tx, rx) = unbounded_channel();

                let path = repo_dir.join(file_name);
                Ok(ServeFile::new(path).oneshot(req).await)
        Self {
            repos_dir: repos_dir.as_ref().to_path_buf(),
            conn,
            rx: Mutex::new(rx),
            tx,
            repos: RwLock::new(repos),
        }
    }
    } else {
        Err(StatusCode::NOT_FOUND.into())
}

pub fn start(
    repos_dir: impl AsRef<Path>,
    conn: DbConn,
    rt: runtime::Handle,
    actors: u32,
) -> crate::Result<Handle> {
    std::fs::create_dir_all(repos_dir.as_ref())?;

    let mut repos = HashMap::new();
    let repo_ids: Vec<i32> = rt.block_on(
        db::Repo::find()
            .select_only()
            .column(db::repo::Column::Id)
            .into_tuple()
            .all(&conn),
    )?;

    for id in repo_ids {
        repos.insert(id, Default::default());
    }
}

async fn post_package_archive(
    State(global): State<crate::Global>,
    Path((distro, repo)): Path<(String, String)>,
    body: Body,
) -> crate::Result<StatusCode> {
    let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
    let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
    let [tmp_path] = global.mgr.random_file_paths();
    let state = Arc::new(SharedState::new(repos_dir, conn, repos));

    let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
    tokio::io::copy(&mut body, &mut tmp_file).await?;
    for _ in 0..actors {
        let actor = Actor::new(rt.clone(), Arc::clone(&state));

    global.mgr.queue_pkg(repo, tmp_path).await;

    Ok(StatusCode::ACCEPTED)
}

async fn delete_repo(
    State(global): State<crate::Global>,
    Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
        global.mgr.remove_repo(repo).await?;

        tracing::info!("Removed repository {repo}");

        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::NOT_FOUND)
        std::thread::spawn(|| actor.run());
    }
}

async fn delete_arch_repo(
    State(global): State<crate::Global>,
    Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
        global.mgr.remove_repo_arch(repo, &arch).await?;

        tracing::info!("Removed architecture '{arch}' from repository {repo}");

        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::NOT_FOUND)
    }
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
    //    let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
    //
    //    if repo_removed {
    //        tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
    //
    //        Ok(StatusCode::OK)
    //    } else {
    //        Ok(StatusCode::NOT_FOUND)
    //    }
    //} else {
    //    Ok(StatusCode::NOT_FOUND)
    //}
}

async fn delete_package(
    State(global): State<crate::Global>,
    Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
) -> crate::Result<StatusCode> {
    Ok(StatusCode::NOT_FOUND)
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
    //    let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
    //
    //    if pkg_removed {
    //        tracing::info!(
    //            "Removed package '{}' ({}) from repository '{}'",
    //            pkg_name,
    //            arch,
    //            repo
    //        );
    //
    //        Ok(StatusCode::OK)
    //    } else {
    //        Ok(StatusCode::NOT_FOUND)
    //    }
    //} else {
    //    Ok(StatusCode::NOT_FOUND)
    //}

    Ok(Handle::new(&state))
}
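
// Startup sketch (variable names are illustrative, not from this change):
// `start` does one blocking query for the known repo ids, spawns `actors`
// dedicated OS threads, and hands back a Handle for queueing commands.
//
//let handle = repo::start(
//    data_dir.join("repos"),
//    conn.clone(),
//    tokio::runtime::Handle::current(),
//    num_actors,
//)?;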
@@ -1,19 +1,17 @@
use crate::db::{self, entities::package, PackageRelatedEnum};
use crate::db::entities::package;

use std::{
    fmt, fs,
    io::{self, BufRead, BufReader, BufWriter, Read, Write},
    io::{self, BufRead, BufReader, Read},
    path::{Path, PathBuf},
};

use chrono::NaiveDateTime;
use futures::StreamExt;
use libarchive::{
    read::{Archive, Builder},
    Entry, ReadFilter,
};
use sea_orm::{ActiveValue::Set, ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use sea_orm::ActiveValue::Set;

const IGNORED_FILES: [&str; 5] = [".BUILDINFO", ".INSTALL", ".MTREE", ".PKGINFO", ".CHANGELOG"];
@@ -204,74 +202,6 @@ impl Package {
            self.compression.extension().unwrap()
        )
    }

    /// Write the formatted desc file to the provided writer
    pub fn write_desc<W: Write>(&self, w: &mut W) -> io::Result<()> {
        // We write a lot of small strings to the writer, so wrapping it in a BufWriter is
        // beneficial
        let mut w = BufWriter::new(w);

        let info = &self.info;

        writeln!(w, "%FILENAME%\n{}", self.file_name())?;

        let mut write = |key: &str, value: &str| {
            if !value.is_empty() {
                writeln!(w, "\n%{}%\n{}", key, value)
            } else {
                Ok(())
            }
        };

        write("NAME", &info.name)?;
        write("BASE", &info.base)?;
        write("VERSION", &info.version)?;

        if let Some(ref description) = info.description {
            write("DESC", description)?;
        }
        write("GROUPS", &info.groups.join("\n"))?;
        write("CSIZE", &info.csize.to_string())?;
        write("ISIZE", &info.size.to_string())?;

        write("SHA256SUM", &info.sha256sum)?;

        if let Some(ref url) = info.url {
            write("URL", url)?;
        }

        write("LICENSE", &info.licenses.join("\n"))?;
        write("ARCH", &info.arch)?;
        write("BUILDDATE", &info.build_date.timestamp().to_string())?;

        if let Some(ref packager) = info.packager {
            write("PACKAGER", packager)?;
        }

        write("REPLACES", &info.replaces.join("\n"))?;
        write("CONFLICTS", &info.conflicts.join("\n"))?;
        write("PROVIDES", &info.provides.join("\n"))?;
        write("DEPENDS", &info.depends.join("\n"))?;
        write("OPTDEPENDS", &info.optdepends.join("\n"))?;
        write("MAKEDEPENDS", &info.makedepends.join("\n"))?;
        write("CHECKDEPENDS", &info.checkdepends.join("\n"))?;

        Ok(())
    }
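
    // Shape of the generated desc entry (values hypothetical), as produced by
    // the writes above: one %KEY% block per non-empty field, blocks separated
    // by a blank line, multi-valued fields newline-joined:
    //
    // %FILENAME%
    // foo-1.2.3-1-x86_64.pkg.tar.zst
    //
    // %NAME%
    // foo
    //
    // %DEPENDS%
    // glibc
    // openssl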

    pub fn write_files<W: Write>(&self, w: &mut W) -> io::Result<()> {
        // We write a lot of small strings to the writer, so wrapping it in a BufWriter is
        // beneficial
        let mut w = BufWriter::new(w);

        writeln!(w, "%FILES%")?;

        for file in &self.files {
            writeln!(w, "{}", file.to_string_lossy())?;
        }

        Ok(())
    }
}

impl From<Package> for package::ActiveModel {
@@ -303,123 +233,3 @@ pub fn filename(pkg: &package::Model) -> String {
        pkg.name, pkg.version, pkg.arch, pkg.compression
    )
}

async fn write_attribute<W: AsyncWrite + std::marker::Unpin>(
    writer: &mut W,
    key: &str,
    value: &str,
) -> io::Result<()> {
    if !value.is_empty() {
        let s = format!("\n%{}%\n{}\n", key, value);
        writer.write_all(s.as_bytes()).await?;
    }

    Ok(())
}

pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
    conn: &DbConn,
    writer: &mut W,
    pkg: &package::Model,
) -> crate::Result<()> {
    writer
        .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
        .await?;

    write_attribute(writer, "NAME", &pkg.name).await?;
    write_attribute(writer, "BASE", &pkg.base).await?;
    write_attribute(writer, "VERSION", &pkg.version).await?;

    if let Some(ref description) = pkg.description {
        write_attribute(writer, "DESC", description).await?;
    }

    let groups: Vec<String> = pkg
        .find_related(db::PackageGroup)
        .select_only()
        .column(db::package_group::Column::Name)
        .into_tuple()
        .all(conn)
        .await?;
    write_attribute(writer, "GROUPS", &groups.join("\n")).await?;

    write_attribute(writer, "CSIZE", &pkg.c_size.to_string()).await?;
    write_attribute(writer, "ISIZE", &pkg.size.to_string()).await?;
    write_attribute(writer, "SHA256SUM", &pkg.sha256_sum).await?;

    if let Some(ref url) = pkg.url {
        write_attribute(writer, "URL", url).await?;
    }

    let licenses: Vec<String> = pkg
        .find_related(db::PackageLicense)
        .select_only()
        .column(db::package_license::Column::Name)
        .into_tuple()
        .all(conn)
        .await?;
    write_attribute(writer, "LICENSE", &licenses.join("\n")).await?;

    write_attribute(writer, "ARCH", &pkg.arch).await?;

    // TODO build date
    write_attribute(
        writer,
        "BUILDDATE",
        &pkg.build_date.and_utc().timestamp().to_string(),
    )
    .await?;

    if let Some(ref packager) = pkg.packager {
        write_attribute(writer, "PACKAGER", packager).await?;
    }

    let related = [
        ("REPLACES", PackageRelatedEnum::Replaces),
        ("CONFLICTS", PackageRelatedEnum::Conflicts),
        ("PROVIDES", PackageRelatedEnum::Provides),
        ("DEPENDS", PackageRelatedEnum::Depend),
        ("OPTDEPENDS", PackageRelatedEnum::Optdepend),
        ("MAKEDEPENDS", PackageRelatedEnum::Makedepend),
        ("CHECKDEPENDS", PackageRelatedEnum::Checkdepend),
    ];

    for (key, attr) in related.into_iter() {
        let items: Vec<String> = pkg
            .find_related(db::PackageRelated)
            .filter(db::package_related::Column::Type.eq(attr))
            .select_only()
            .column(db::package_related::Column::Name)
            .into_tuple()
            .all(conn)
            .await?;

        write_attribute(writer, key, &items.join("\n")).await?;
    }

    writer.flush().await?;

    Ok(())
}

pub async fn write_files<W: AsyncWrite + std::marker::Unpin>(
    conn: &DbConn,
    writer: &mut W,
    pkg: &package::Model,
) -> crate::Result<()> {
    let line = "%FILES%\n";
    writer.write_all(line.as_bytes()).await?;

    // Generate the files list for the package
    let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;

    while let Some(file) = files.next().await.transpose()? {
        writer
            .write_all(format!("{}\n", file.path).as_bytes())
            .await?;
    }

    writer.flush().await?;

    Ok(())
}
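
// Note on the loop above: `files.next().await` yields Option<Result<Model, _>>,
// and `.transpose()?` flips it to Result<Option<Model>, _>, so `?` can bail out
// on a database error while `while let Some(..)` still ends cleanly at
// end-of-stream.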
@@ -0,0 +1,13 @@
mod api;
mod repo;

use axum::Router;
use tower_http::trace::TraceLayer;

pub fn router(global: crate::Global) -> Router {
    Router::new()
        .nest("/api", api::router())
        .merge(repo::router(&global.config.api_key))
        .with_state(global)
        .layer(TraceLayer::new_for_http())
}
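
// Wiring sketch (the listener address and surrounding setup are illustrative,
// not part of this change): the composed router is a plain axum app, served
// the usual way.
//
//let app = serve::router(global);
//let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await?;
//axum::serve(listener, app).await?;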
@@ -0,0 +1,142 @@
use crate::FsConfig;

use axum::{
    body::Body,
    extract::{Path, State},
    http::{Request, StatusCode},
    response::IntoResponse,
    routing::{delete, post},
    Router,
};
use futures::TryStreamExt;
use tokio_util::io::StreamReader;
use tower::util::ServiceExt;
use tower_http::{services::ServeFile, validate_request::ValidateRequestHeaderLayer};

pub fn router(api_key: &str) -> Router<crate::Global> {
    Router::new()
        .route(
            "/:distro/:repo",
            post(post_package_archive)
                .delete(delete_repo)
                .route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
        )
        .route(
            "/:distro/:repo/:arch",
            delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
        )
        // Routes added after the layer do not get that layer applied, so the GET requests will not
        // be authorized
        .route(
            "/:distro/:repo/:arch/:filename",
            delete(delete_package)
                .route_layer(ValidateRequestHeaderLayer::bearer(api_key))
                .get(get_file),
        )
}
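
// Ordering note, sketched from the comment above: `route_layer` wraps only the
// method handlers registered before it on that route, so in the last route the
// DELETE is bearer-protected while the GET added afterwards stays public:
//
//delete(delete_package)                                        // auth required
//    .route_layer(ValidateRequestHeaderLayer::bearer(api_key))
//    .get(get_file)                                            // no auth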

/// Serve the package archive files and database archives. If files are requested for an
/// architecture that does not have any explicit packages, a repository containing only "any" files
/// is returned.
async fn get_file(
    State(global): State<crate::Global>,
    Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
    req: Request<Body>,
) -> crate::Result<impl IntoResponse> {
    if let Some(repo_id) = global.repo.get_repo(&distro, &repo).await? {
        match global.config.fs {
            FsConfig::Local { data_dir } => {
                let repo_dir = data_dir.join("repos").join(repo_id.to_string());

                let file_name = if file_name == format!("{}.db", repo)
                    || file_name == format!("{}.db.tar.gz", repo)
                {
                    format!("{}.db.tar.gz", arch)
                } else if file_name == format!("{}.files", repo)
                    || file_name == format!("{}.files.tar.gz", repo)
                {
                    format!("{}.files.tar.gz", arch)
                } else {
                    file_name
                };

                let path = repo_dir.join(file_name);
                Ok(ServeFile::new(path).oneshot(req).await)
            }
        }
    } else {
        Err(StatusCode::NOT_FOUND.into())
    }
}
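
// Mapping sketch (distro/repo/arch values are hypothetical): the rewrite above
// resolves the repo-level archive names to the per-arch files on disk, e.g.
//
//   GET /somedistro/extra/x86_64/extra.db     -> serves x86_64.db.tar.gz
//   GET /somedistro/extra/x86_64/extra.files  -> serves x86_64.files.tar.gz
//   GET /somedistro/extra/x86_64/foo-1.0-1-x86_64.pkg.tar.zst  (served as-is)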

async fn post_package_archive(
    State(global): State<crate::Global>,
    Path((distro, repo)): Path<(String, String)>,
    body: Body,
) -> crate::Result<StatusCode> {
    let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?;

    let [tmp_path] = global.repo.random_file_paths();
    let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
    let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
    tokio::io::copy(&mut body, &mut tmp_file).await?;

    global.repo.queue_pkg(repo_id, tmp_path).await;

    Ok(StatusCode::ACCEPTED)
}
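
// Upload sketch (host, key, and file name are illustrative): any client that
// streams the archive as the raw request body works, e.g.
//
//   curl -H "Authorization: Bearer $API_KEY" \
//        --data-binary @foo-1.0-1-x86_64.pkg.tar.zst \
//        https://example.com/somedistro/somerepo
//
// The handler answers 202 Accepted immediately; parsing and the repo sync
// happen asynchronously via the queued Command.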

async fn delete_repo(
    State(global): State<crate::Global>,
    Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
    if let Some(repo) = global.repo.get_repo(&distro, &repo).await? {
        global.repo.remove_repo(repo).await?;

        tracing::info!("Removed repository {repo}");

        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::NOT_FOUND)
    }
}

async fn delete_arch_repo(
    State(global): State<crate::Global>,
    Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
    if let Some(repo) = global.repo.get_repo(&distro, &repo).await? {
        global.repo.remove_repo_arch(repo, &arch).await?;

        tracing::info!("Removed architecture '{arch}' from repository {repo}");

        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::NOT_FOUND)
    }
}

async fn delete_package(
    State(global): State<crate::Global>,
    Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
) -> crate::Result<StatusCode> {
    Ok(StatusCode::NOT_FOUND)
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
    //    let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
    //
    //    if pkg_removed {
    //        tracing::info!(
    //            "Removed package '{}' ({}) from repository '{}'",
    //            pkg_name,
    //            arch,
    //            repo
    //        );
    //
    //        Ok(StatusCode::OK)
    //    } else {
    //        Ok(StatusCode::NOT_FOUND)
    //    }
    //} else {
    //    Ok(StatusCode::NOT_FOUND)
    //}
}