Compare commits
	
		
			15 Commits 
		
	
	
		
			f9518d6b7d
			...
			76395afb10
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								
									
								
								 | 
						76395afb10 | |
| 
							
							
								
									
								
								 | 
						730ae009b0 | |
| 
							
							
								
									
								
								 | 
						cc8848d3ae | |
| 
							
							
								
									
								
								 | 
						97e42588ed | |
| 
							
							
								
									
								
								 | 
						e17269ac3b | |
| 
							
							
								
									
								
								 | 
						27afb3496d | |
| 
							
							
								
									
								
								 | 
						5d7832c43a | |
| 
							
							
								
									
								
								 | 
						67b4640e56 | |
| 
							
							
								
									
								
								 | 
						a408c14ab1 | |
| 
							
							
								
									
								
								 | 
						be2ce7bf45 | |
| 
							
							
								
									
								
								 | 
						6dff65f30d | |
| 
							
							
								
									
								
								 | 
						5073855696 | |
| 
							
							
								
									
								
								 | 
						5839d66213 | |
| 
							
							
								
									
								
								 | 
						97612e1af6 | |
| 
							
							
								
									
								
								 | 
						fa6de9b035 | 
| 
						 | 
				
			
			@ -174,6 +174,15 @@ dependencies = [
 | 
			
		|||
 "num-traits",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "atomic"
 | 
			
		||||
version = "0.6.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "bytemuck",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "atomic-waker"
 | 
			
		||||
version = "1.1.2"
 | 
			
		||||
| 
						 | 
				
			
			@ -380,6 +389,12 @@ dependencies = [
 | 
			
		|||
 "syn 1.0.109",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "bytemuck"
 | 
			
		||||
version = "1.16.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "byteorder"
 | 
			
		||||
version = "1.5.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -630,6 +645,20 @@ version = "2.1.0"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "figment"
 | 
			
		||||
version = "0.10.19"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "atomic",
 | 
			
		||||
 "pear",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "toml",
 | 
			
		||||
 "uncased",
 | 
			
		||||
 "version_check",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "flume"
 | 
			
		||||
version = "0.11.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -1037,6 +1066,12 @@ dependencies = [
 | 
			
		|||
 "syn 2.0.66",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "inlinable_string"
 | 
			
		||||
version = "0.1.15"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "is_terminal_polyfill"
 | 
			
		||||
version = "1.70.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -1386,6 +1421,29 @@ version = "1.0.15"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "pear"
 | 
			
		||||
version = "0.2.9"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "inlinable_string",
 | 
			
		||||
 "pear_codegen",
 | 
			
		||||
 "yansi",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "pear_codegen"
 | 
			
		||||
version = "0.2.9"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "proc-macro2",
 | 
			
		||||
 "proc-macro2-diagnostics",
 | 
			
		||||
 "quote",
 | 
			
		||||
 "syn 2.0.66",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "pem-rfc7468"
 | 
			
		||||
version = "0.7.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -1478,7 +1536,7 @@ version = "3.1.0"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "toml_edit",
 | 
			
		||||
 "toml_edit 0.21.1",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
| 
						 | 
				
			
			@ -1514,6 +1572,19 @@ dependencies = [
 | 
			
		|||
 "unicode-ident",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "proc-macro2-diagnostics"
 | 
			
		||||
version = "0.10.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "proc-macro2",
 | 
			
		||||
 "quote",
 | 
			
		||||
 "syn 2.0.66",
 | 
			
		||||
 "version_check",
 | 
			
		||||
 "yansi",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "ptr_meta"
 | 
			
		||||
version = "0.1.4"
 | 
			
		||||
| 
						 | 
				
			
			@ -1657,11 +1728,13 @@ dependencies = [
 | 
			
		|||
 "axum",
 | 
			
		||||
 "chrono",
 | 
			
		||||
 "clap",
 | 
			
		||||
 "figment",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "http-body-util",
 | 
			
		||||
 "libarchive",
 | 
			
		||||
 "sea-orm",
 | 
			
		||||
 "sea-orm-migration",
 | 
			
		||||
 "sea-query",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "sha256",
 | 
			
		||||
 "tokio",
 | 
			
		||||
| 
						 | 
				
			
			@ -2036,6 +2109,15 @@ dependencies = [
 | 
			
		|||
 "serde",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "serde_spanned"
 | 
			
		||||
version = "0.6.6"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "serde",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "serde_urlencoded"
 | 
			
		||||
version = "0.7.1"
 | 
			
		||||
| 
						 | 
				
			
			@ -2623,11 +2705,26 @@ dependencies = [
 | 
			
		|||
 "tokio",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "toml"
 | 
			
		||||
version = "0.8.14"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "serde",
 | 
			
		||||
 "serde_spanned",
 | 
			
		||||
 "toml_datetime",
 | 
			
		||||
 "toml_edit 0.22.14",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "toml_datetime"
 | 
			
		||||
version = "0.6.6"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "serde",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "toml_edit"
 | 
			
		||||
| 
						 | 
				
			
			@ -2637,7 +2734,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
 | 
			
		|||
dependencies = [
 | 
			
		||||
 "indexmap",
 | 
			
		||||
 "toml_datetime",
 | 
			
		||||
 "winnow",
 | 
			
		||||
 "winnow 0.5.40",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "toml_edit"
 | 
			
		||||
version = "0.22.14"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "indexmap",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "serde_spanned",
 | 
			
		||||
 "toml_datetime",
 | 
			
		||||
 "winnow 0.6.13",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
| 
						 | 
				
			
			@ -2762,6 +2872,15 @@ version = "1.17.0"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "uncased"
 | 
			
		||||
version = "0.9.10"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "version_check",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "unicase"
 | 
			
		||||
version = "2.7.0"
 | 
			
		||||
| 
						 | 
				
			
			@ -3128,6 +3247,15 @@ dependencies = [
 | 
			
		|||
 "memchr",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "winnow"
 | 
			
		||||
version = "0.6.13"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "memchr",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "wyz"
 | 
			
		||||
version = "0.5.1"
 | 
			
		||||
| 
						 | 
				
			
			@ -3137,6 +3265,12 @@ dependencies = [
 | 
			
		|||
 "tap",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "yansi"
 | 
			
		||||
version = "1.0.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "zerocopy"
 | 
			
		||||
version = "0.7.34"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,10 +10,12 @@ authors = ["Jef Roosens"]
 | 
			
		|||
axum = { version = "0.7.5", features = ["http2", "macros"] }
 | 
			
		||||
chrono = { version = "0.4.26", features = ["serde"] }
 | 
			
		||||
clap = { version = "4.3.12", features = ["env", "derive"] }
 | 
			
		||||
figment = { version = "0.10.19", features = ["env", "toml"] }
 | 
			
		||||
futures = "0.3.28"
 | 
			
		||||
http-body-util = "0.1.1"
 | 
			
		||||
libarchive = { path = "../libarchive" }
 | 
			
		||||
sea-orm-migration = "0.12.1"
 | 
			
		||||
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
 | 
			
		||||
serde = { version = "1.0.178", features = ["derive"] }
 | 
			
		||||
sha256 = "1.1.4"
 | 
			
		||||
tokio = { version = "1.29.1", features = ["full"] }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
api_key = "test"
 | 
			
		||||
pkg_workers = 2
 | 
			
		||||
log_level = "rieterd=debug"
 | 
			
		||||
 | 
			
		||||
[fs]
 | 
			
		||||
type = "local"
 | 
			
		||||
data_dir = "./data"
 | 
			
		||||
 | 
			
		||||
[db]
 | 
			
		||||
type = "sqlite"
 | 
			
		||||
db_dir = "./data"
 | 
			
		||||
# [db]
 | 
			
		||||
# type = "postgres"
 | 
			
		||||
# host = "localhost"
 | 
			
		||||
# db = "rieter"
 | 
			
		||||
# user = "rieter"
 | 
			
		||||
# password = "rieter"
 | 
			
		||||
| 
						 | 
				
			
			@ -22,10 +22,10 @@ async fn get_repos(
 | 
			
		|||
    Query(pagination): Query<pagination::Query>,
 | 
			
		||||
    Query(filter): Query<db::query::repo::Filter>,
 | 
			
		||||
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> {
 | 
			
		||||
    let (total_pages, items) =
 | 
			
		||||
    let items =
 | 
			
		||||
        db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
 | 
			
		||||
 | 
			
		||||
    Ok(Json(pagination.res(total_pages, items)))
 | 
			
		||||
    Ok(Json(pagination.res(items)))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn get_single_repo(
 | 
			
		||||
| 
						 | 
				
			
			@ -44,11 +44,11 @@ async fn get_packages(
 | 
			
		|||
    Query(pagination): Query<pagination::Query>,
 | 
			
		||||
    Query(filter): Query<db::query::package::Filter>,
 | 
			
		||||
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> {
 | 
			
		||||
    let (total_pages, pkgs) =
 | 
			
		||||
    let items =
 | 
			
		||||
        db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
    Ok(Json(pagination.res(total_pages, pkgs)))
 | 
			
		||||
    Ok(Json(pagination.res(items)))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn get_single_package(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,19 +1,19 @@
 | 
			
		|||
use serde::{Deserialize, Serialize};
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize)]
 | 
			
		||||
#[serde(default)]
 | 
			
		||||
pub struct Query {
 | 
			
		||||
    #[serde(default = "default_page")]
 | 
			
		||||
    pub page: u64,
 | 
			
		||||
    #[serde(default = "default_per_page")]
 | 
			
		||||
    pub per_page: u64,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for Query {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        Query {
 | 
			
		||||
            page: 1,
 | 
			
		||||
            per_page: 25,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
fn default_page() -> u64 {
 | 
			
		||||
    1
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_per_page() -> u64 {
 | 
			
		||||
    25
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Serialize)]
 | 
			
		||||
| 
						 | 
				
			
			@ -23,21 +23,15 @@ where
 | 
			
		|||
{
 | 
			
		||||
    pub page: u64,
 | 
			
		||||
    pub per_page: u64,
 | 
			
		||||
    pub total_pages: u64,
 | 
			
		||||
    pub count: usize,
 | 
			
		||||
    pub items: Vec<T>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Query {
 | 
			
		||||
    pub fn res<T: for<'de> Serialize>(
 | 
			
		||||
        self,
 | 
			
		||||
        total_pages: u64,
 | 
			
		||||
        items: Vec<T>,
 | 
			
		||||
    ) -> PaginatedResponse<T> {
 | 
			
		||||
    pub fn res<T: for<'de> Serialize>(self, items: Vec<T>) -> PaginatedResponse<T> {
 | 
			
		||||
        PaginatedResponse {
 | 
			
		||||
            page: self.page,
 | 
			
		||||
            per_page: self.per_page,
 | 
			
		||||
            total_pages,
 | 
			
		||||
            count: items.len(),
 | 
			
		||||
            items,
 | 
			
		||||
        }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,92 +1,71 @@
 | 
			
		|||
use crate::{distro::MetaDistroMgr, Config, Global};
 | 
			
		||||
use crate::{Config, FsConfig, Global};
 | 
			
		||||
 | 
			
		||||
use std::{io, path::PathBuf, sync::Arc};
 | 
			
		||||
 | 
			
		||||
use axum::Router;
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
use sea_orm_migration::MigratorTrait;
 | 
			
		||||
use std::{io, path::PathBuf};
 | 
			
		||||
use tower_http::trace::TraceLayer;
 | 
			
		||||
use tracing::debug;
 | 
			
		||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 | 
			
		||||
 | 
			
		||||
#[derive(Parser)]
 | 
			
		||||
#[command(author, version, about, long_about = None)]
 | 
			
		||||
pub struct Cli {
 | 
			
		||||
    /// Directory where repository metadata & SQLite database is stored
 | 
			
		||||
    #[arg(env = "RIETER_DATA_DIR")]
 | 
			
		||||
    pub data_dir: PathBuf,
 | 
			
		||||
    /// API key to authenticate private routes with
 | 
			
		||||
    #[arg(env = "RIETER_API_KEY")]
 | 
			
		||||
    pub api_key: String,
 | 
			
		||||
 | 
			
		||||
    /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
 | 
			
		||||
    /// data directory
 | 
			
		||||
    #[arg(short, long, env = "RIETER_DATABASE_URL")]
 | 
			
		||||
    pub database_url: Option<String>,
 | 
			
		||||
    /// Port the server will listen on
 | 
			
		||||
    #[arg(
 | 
			
		||||
        short,
 | 
			
		||||
        long,
 | 
			
		||||
        value_name = "PORT",
 | 
			
		||||
        default_value_t = 8000,
 | 
			
		||||
        env = "RIETER_PORT"
 | 
			
		||||
        env = "RIETER_CONFIG_FILE",
 | 
			
		||||
        default_value = "./rieterd.toml"
 | 
			
		||||
    )]
 | 
			
		||||
    pub port: u16,
 | 
			
		||||
    /// Log levels for the tracing
 | 
			
		||||
    #[arg(
 | 
			
		||||
        long,
 | 
			
		||||
        value_name = "LOG_LEVEL",
 | 
			
		||||
        default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
 | 
			
		||||
        env = "RIETER_LOG"
 | 
			
		||||
    )]
 | 
			
		||||
    pub log: String,
 | 
			
		||||
    pub config_file: PathBuf,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Cli {
 | 
			
		||||
    pub fn init_tracing(&self) {
 | 
			
		||||
    pub async fn run(&self) -> crate::Result<()> {
 | 
			
		||||
        let config: Config = Config::figment(&self.config_file)
 | 
			
		||||
            .extract()
 | 
			
		||||
            .inspect_err(|e| tracing::error!("{}", e))?;
 | 
			
		||||
 | 
			
		||||
        tracing_subscriber::registry()
 | 
			
		||||
            .with(tracing_subscriber::EnvFilter::new(self.log.clone()))
 | 
			
		||||
            .with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
 | 
			
		||||
            .with(tracing_subscriber::fmt::layer())
 | 
			
		||||
            .init();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn run(&self) -> crate::Result<()> {
 | 
			
		||||
        self.init_tracing();
 | 
			
		||||
        tracing::info!("Connecting to database");
 | 
			
		||||
        let db = crate::db::connect(&config.db).await?;
 | 
			
		||||
 | 
			
		||||
        let db_url = if let Some(url) = &self.database_url {
 | 
			
		||||
            url.clone()
 | 
			
		||||
        } else {
 | 
			
		||||
            format!(
 | 
			
		||||
                "sqlite://{}?mode=rwc",
 | 
			
		||||
                self.data_dir.join("rieter.sqlite").to_string_lossy()
 | 
			
		||||
            )
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        debug!("Connecting to database with URL {}", db_url);
 | 
			
		||||
 | 
			
		||||
        let mut options = sea_orm::ConnectOptions::new(db_url);
 | 
			
		||||
        options.max_connections(16);
 | 
			
		||||
 | 
			
		||||
        let db = sea_orm::Database::connect(options).await?;
 | 
			
		||||
        crate::db::Migrator::up(&db, None).await?;
 | 
			
		||||
 | 
			
		||||
        debug!("Successfully applied migrations");
 | 
			
		||||
 | 
			
		||||
        let config = Config {
 | 
			
		||||
            data_dir: self.data_dir.clone(),
 | 
			
		||||
        let mgr = match &config.fs {
 | 
			
		||||
            FsConfig::Local { data_dir } => {
 | 
			
		||||
                crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?;
 | 
			
		||||
        let mgr = Arc::new(mgr);
 | 
			
		||||
 | 
			
		||||
        let global = Global { config, mgr, db };
 | 
			
		||||
        for _ in 0..config.pkg_workers {
 | 
			
		||||
            let clone = Arc::clone(&mgr);
 | 
			
		||||
 | 
			
		||||
            tokio::spawn(async move { clone.pkg_parse_task().await });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let global = Global {
 | 
			
		||||
            config: config.clone(),
 | 
			
		||||
            mgr,
 | 
			
		||||
            db,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // build our application with a single route
 | 
			
		||||
        let app = Router::new()
 | 
			
		||||
            .nest("/api", crate::api::router())
 | 
			
		||||
            .merge(crate::repo::router(&self.api_key))
 | 
			
		||||
            .merge(crate::repo::router(&config.api_key))
 | 
			
		||||
            .with_state(global)
 | 
			
		||||
            .layer(TraceLayer::new_for_http());
 | 
			
		||||
 | 
			
		||||
        let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap();
 | 
			
		||||
        let domain: String = format!("{}:{}", config.domain, config.port)
 | 
			
		||||
            .parse()
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(domain).await?;
 | 
			
		||||
        // run it with hyper on localhost:3000
 | 
			
		||||
        Ok(axum::serve(listener, app.into_make_service())
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,88 @@
 | 
			
		|||
use std::path::{Path, PathBuf};
 | 
			
		||||
 | 
			
		||||
use figment::{
 | 
			
		||||
    providers::{Env, Format, Toml},
 | 
			
		||||
    Figment,
 | 
			
		||||
};
 | 
			
		||||
use serde::Deserialize;
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize, Debug, Clone)]
 | 
			
		||||
#[serde(rename_all = "lowercase")]
 | 
			
		||||
#[serde(tag = "type")]
 | 
			
		||||
pub enum FsConfig {
 | 
			
		||||
    Local { data_dir: PathBuf },
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize, Debug, Clone)]
 | 
			
		||||
#[serde(rename_all = "lowercase")]
 | 
			
		||||
#[serde(tag = "type")]
 | 
			
		||||
pub enum DbConfig {
 | 
			
		||||
    Sqlite {
 | 
			
		||||
        db_dir: PathBuf,
 | 
			
		||||
        #[serde(default = "default_db_sqlite_max_connections")]
 | 
			
		||||
        max_connections: u32,
 | 
			
		||||
    },
 | 
			
		||||
    Postgres {
 | 
			
		||||
        host: String,
 | 
			
		||||
        #[serde(default = "default_db_postgres_port")]
 | 
			
		||||
        port: u16,
 | 
			
		||||
        user: String,
 | 
			
		||||
        password: String,
 | 
			
		||||
        db: String,
 | 
			
		||||
        #[serde(default)]
 | 
			
		||||
        schema: String,
 | 
			
		||||
        #[serde(default = "default_db_postgres_max_connections")]
 | 
			
		||||
        max_connections: u32,
 | 
			
		||||
    },
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize, Debug, Clone)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    pub api_key: String,
 | 
			
		||||
    #[serde(default = "default_domain")]
 | 
			
		||||
    pub domain: String,
 | 
			
		||||
    #[serde(default = "default_port")]
 | 
			
		||||
    pub port: u16,
 | 
			
		||||
    #[serde(default = "default_log_level")]
 | 
			
		||||
    pub log_level: String,
 | 
			
		||||
    pub fs: FsConfig,
 | 
			
		||||
    pub db: DbConfig,
 | 
			
		||||
    #[serde(default = "default_pkg_workers")]
 | 
			
		||||
    pub pkg_workers: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Config {
 | 
			
		||||
    pub fn figment(config_file: impl AsRef<Path>) -> Figment {
 | 
			
		||||
        Figment::new()
 | 
			
		||||
            .merge(Toml::file(config_file))
 | 
			
		||||
            .merge(Env::prefixed("RIETER_"))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_domain() -> String {
 | 
			
		||||
    String::from("0.0.0.0")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_port() -> u16 {
 | 
			
		||||
    8000
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_log_level() -> String {
 | 
			
		||||
    String::from("tower_http=debug,rieterd=debug,sea_orm=debug")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_db_sqlite_max_connections() -> u32 {
 | 
			
		||||
    16
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_db_postgres_port() -> u16 {
 | 
			
		||||
    5432
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_db_postgres_max_connections() -> u32 {
 | 
			
		||||
    16
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_pkg_workers() -> u32 {
 | 
			
		||||
    1
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -4,6 +4,8 @@ use chrono::NaiveDateTime;
 | 
			
		|||
use sea_orm::entity::prelude::*;
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
 | 
			
		||||
use crate::db::PackageState;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
 | 
			
		||||
#[sea_orm(table_name = "package")]
 | 
			
		||||
pub struct Model {
 | 
			
		||||
| 
						 | 
				
			
			@ -24,6 +26,8 @@ pub struct Model {
 | 
			
		|||
    pub pgp_sig_size: Option<i64>,
 | 
			
		||||
    pub sha256_sum: String,
 | 
			
		||||
    pub compression: String,
 | 
			
		||||
    #[serde(skip_serializing)]
 | 
			
		||||
    pub state: PackageState,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -81,7 +81,12 @@ impl MigrationTrait for Migration {
 | 
			
		|||
                    .col(ColumnDef::new(Package::PgpSig).string_len(255))
 | 
			
		||||
                    .col(ColumnDef::new(Package::PgpSigSize).big_integer())
 | 
			
		||||
                    .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
 | 
			
		||||
                    .col(ColumnDef::new(Package::Compression).string_len(16).not_null())
 | 
			
		||||
                    .col(
 | 
			
		||||
                        ColumnDef::new(Package::Compression)
 | 
			
		||||
                            .string_len(16)
 | 
			
		||||
                            .not_null(),
 | 
			
		||||
                    )
 | 
			
		||||
                    .col(ColumnDef::new(Package::State).integer().not_null())
 | 
			
		||||
                    .foreign_key(
 | 
			
		||||
                        ForeignKey::create()
 | 
			
		||||
                            .name("fk-package-repo_id")
 | 
			
		||||
| 
						 | 
				
			
			@ -264,6 +269,7 @@ pub enum Package {
 | 
			
		|||
    PgpSigSize,
 | 
			
		||||
    Sha256Sum,
 | 
			
		||||
    Compression,
 | 
			
		||||
    State,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Iden)]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,10 +2,12 @@ pub mod entities;
 | 
			
		|||
mod migrator;
 | 
			
		||||
pub mod query;
 | 
			
		||||
 | 
			
		||||
use crate::config::DbConfig;
 | 
			
		||||
 | 
			
		||||
pub use entities::{prelude::*, *};
 | 
			
		||||
pub use migrator::Migrator;
 | 
			
		||||
 | 
			
		||||
use sea_orm::{DeriveActiveEnum, EnumIter};
 | 
			
		||||
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
 | 
			
		||||
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
 | 
			
		||||
| 
						 | 
				
			
			@ -30,6 +32,17 @@ pub enum PackageRelatedEnum {
 | 
			
		|||
    Optdepend,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
 | 
			
		||||
#[sea_orm(rs_type = "i32", db_type = "Integer")]
 | 
			
		||||
pub enum PackageState {
 | 
			
		||||
    #[sea_orm(num_value = 0)]
 | 
			
		||||
    PendingCommit,
 | 
			
		||||
    #[sea_orm(num_value = 1)]
 | 
			
		||||
    Committed,
 | 
			
		||||
    #[sea_orm(num_value = 2)]
 | 
			
		||||
    PendingDeletion,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Serialize)]
 | 
			
		||||
pub struct FullPackage {
 | 
			
		||||
    #[serde(flatten)]
 | 
			
		||||
| 
						 | 
				
			
			@ -39,3 +52,50 @@ pub struct FullPackage {
 | 
			
		|||
    related: Vec<(PackageRelatedEnum, String)>,
 | 
			
		||||
    files: Vec<String>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
 | 
			
		||||
    match conn {
 | 
			
		||||
        DbConfig::Sqlite {
 | 
			
		||||
            db_dir,
 | 
			
		||||
            max_connections,
 | 
			
		||||
        } => {
 | 
			
		||||
            let url = format!(
 | 
			
		||||
                "sqlite://{}?mode=rwc",
 | 
			
		||||
                db_dir.join("rieter.sqlite").to_string_lossy()
 | 
			
		||||
            );
 | 
			
		||||
            let options = sea_orm::ConnectOptions::new(url)
 | 
			
		||||
                .max_connections(*max_connections)
 | 
			
		||||
                .to_owned();
 | 
			
		||||
 | 
			
		||||
            let conn = Database::connect(options).await?;
 | 
			
		||||
 | 
			
		||||
            // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
 | 
			
		||||
            // https://www.sqlite.org/pragma.html#pragma_synchronous
 | 
			
		||||
            conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
 | 
			
		||||
            conn.execute_unprepared("PRAGMA synchronous=NORMAL;")
 | 
			
		||||
                .await?;
 | 
			
		||||
 | 
			
		||||
            Ok(conn)
 | 
			
		||||
        }
 | 
			
		||||
        DbConfig::Postgres {
 | 
			
		||||
            host,
 | 
			
		||||
            port,
 | 
			
		||||
            db,
 | 
			
		||||
            user,
 | 
			
		||||
            password,
 | 
			
		||||
            schema,
 | 
			
		||||
            max_connections,
 | 
			
		||||
        } => {
 | 
			
		||||
            let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db);
 | 
			
		||||
 | 
			
		||||
            if schema != "" {
 | 
			
		||||
                url = format!("{url}?currentSchema={schema}");
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let options = sea_orm::ConnectOptions::new(url)
 | 
			
		||||
                .max_connections(*max_connections)
 | 
			
		||||
                .to_owned();
 | 
			
		||||
            Ok(Database::connect(options).await?)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,15 +21,14 @@ pub async fn page(
 | 
			
		|||
    per_page: u64,
 | 
			
		||||
    page: u64,
 | 
			
		||||
    filter: Filter,
 | 
			
		||||
) -> Result<(u64, Vec<distro::Model>)> {
 | 
			
		||||
) -> Result<Vec<distro::Model>> {
 | 
			
		||||
    let paginator = Distro::find()
 | 
			
		||||
        .filter(filter)
 | 
			
		||||
        .order_by_asc(distro::Column::Id)
 | 
			
		||||
        .paginate(conn, per_page);
 | 
			
		||||
    let repos = paginator.fetch_page(page).await?;
 | 
			
		||||
    let total_pages = paginator.num_pages().await?;
 | 
			
		||||
 | 
			
		||||
    Ok((total_pages, repos))
 | 
			
		||||
    Ok(repos)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<distro::Model>> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,8 @@
 | 
			
		|||
use crate::db::{self, *};
 | 
			
		||||
 | 
			
		||||
use futures::Stream;
 | 
			
		||||
use sea_orm::{sea_query::IntoCondition, *};
 | 
			
		||||
use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
 | 
			
		||||
use serde::Deserialize;
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize)]
 | 
			
		||||
| 
						 | 
				
			
			@ -15,10 +17,7 @@ impl IntoCondition for Filter {
 | 
			
		|||
        Condition::all()
 | 
			
		||||
            .add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo)))
 | 
			
		||||
            .add_option(self.arch.map(|arch| package::Column::Arch.eq(arch)))
 | 
			
		||||
            .add_option(
 | 
			
		||||
                self.name
 | 
			
		||||
                    .map(|name| package::Column::Name.like(format!("%{}%", name))),
 | 
			
		||||
            )
 | 
			
		||||
            .add_option(self.name.map(|name| package::Column::Name.contains(name)))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -27,15 +26,29 @@ pub async fn page(
 | 
			
		|||
    per_page: u64,
 | 
			
		||||
    page: u64,
 | 
			
		||||
    filter: Filter,
 | 
			
		||||
) -> super::Result<(u64, Vec<package::Model>)> {
 | 
			
		||||
    let paginator = Package::find()
 | 
			
		||||
        .filter(filter)
 | 
			
		||||
        .order_by_asc(package::Column::Id)
 | 
			
		||||
        .paginate(conn, per_page);
 | 
			
		||||
    let packages = paginator.fetch_page(page).await?;
 | 
			
		||||
    let total_pages = paginator.num_pages().await?;
 | 
			
		||||
) -> crate::Result<Vec<package::Model>> {
 | 
			
		||||
    let p2 = Alias::new("p2");
 | 
			
		||||
    let query = Query::select()
 | 
			
		||||
        .columns(db::package::Column::iter().map(|c| (db::package::Entity, c)))
 | 
			
		||||
        .from(db::package::Entity)
 | 
			
		||||
        .join_subquery(
 | 
			
		||||
            JoinType::InnerJoin,
 | 
			
		||||
            max_pkg_ids_query(true),
 | 
			
		||||
            p2.clone(),
 | 
			
		||||
            Expr::col((db::package::Entity, db::package::Column::Id))
 | 
			
		||||
                .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
 | 
			
		||||
        )
 | 
			
		||||
        .cond_where(filter)
 | 
			
		||||
        .order_by((db::package::Entity, db::package::Column::Id), Order::Asc)
 | 
			
		||||
        .to_owned();
 | 
			
		||||
    let builder = conn.get_database_backend();
 | 
			
		||||
    let sql = builder.build(&query);
 | 
			
		||||
 | 
			
		||||
    Ok((total_pages, packages))
 | 
			
		||||
    Ok(db::Package::find()
 | 
			
		||||
        .from_raw_sql(sql)
 | 
			
		||||
        .paginate(conn, per_page)
 | 
			
		||||
        .fetch_page(page)
 | 
			
		||||
        .await?)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<package::Model>> {
 | 
			
		||||
| 
						 | 
				
			
			@ -68,9 +81,17 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result
 | 
			
		|||
        .await
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> {
 | 
			
		||||
pub async fn insert(
 | 
			
		||||
    conn: &DbConn,
 | 
			
		||||
    repo_id: i32,
 | 
			
		||||
    pkg: crate::repo::package::Package,
 | 
			
		||||
) -> Result<package::Model> {
 | 
			
		||||
    let info = pkg.info;
 | 
			
		||||
 | 
			
		||||
    // Doing this manually is not the recommended way, but the generic error type of the
 | 
			
		||||
    // transaction function didn't play well with my current error handling
 | 
			
		||||
    let txn = conn.begin().await?;
 | 
			
		||||
 | 
			
		||||
    let model = package::ActiveModel {
 | 
			
		||||
        id: NotSet,
 | 
			
		||||
        repo_id: Set(repo_id),
 | 
			
		||||
| 
						 | 
				
			
			@ -88,9 +109,10 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 | 
			
		|||
        pgp_sig_size: Set(info.pgpsigsize),
 | 
			
		||||
        sha256_sum: Set(info.sha256sum),
 | 
			
		||||
        compression: Set(pkg.compression.extension().unwrap().to_string()),
 | 
			
		||||
        state: Set(PackageState::PendingCommit),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let pkg_entry = model.insert(conn).await?;
 | 
			
		||||
    let pkg_entry = model.insert(&txn).await?;
 | 
			
		||||
 | 
			
		||||
    // Insert all the related tables
 | 
			
		||||
    PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
 | 
			
		||||
| 
						 | 
				
			
			@ -98,7 +120,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 | 
			
		|||
        name: Set(s.to_string()),
 | 
			
		||||
    }))
 | 
			
		||||
    .on_empty_do_nothing()
 | 
			
		||||
    .exec(conn)
 | 
			
		||||
    .exec(&txn)
 | 
			
		||||
    .await?;
 | 
			
		||||
 | 
			
		||||
    PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
 | 
			
		||||
| 
						 | 
				
			
			@ -106,7 +128,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 | 
			
		|||
        name: Set(s.to_string()),
 | 
			
		||||
    }))
 | 
			
		||||
    .on_empty_do_nothing()
 | 
			
		||||
    .exec(conn)
 | 
			
		||||
    .exec(&txn)
 | 
			
		||||
    .await?;
 | 
			
		||||
 | 
			
		||||
    let related = info
 | 
			
		||||
| 
						 | 
				
			
			@ -146,7 +168,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 | 
			
		|||
        name: Set(s.to_string()),
 | 
			
		||||
    }))
 | 
			
		||||
    .on_empty_do_nothing()
 | 
			
		||||
    .exec(conn)
 | 
			
		||||
    .exec(&txn)
 | 
			
		||||
    .await?;
 | 
			
		||||
 | 
			
		||||
    PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
 | 
			
		||||
| 
						 | 
				
			
			@ -154,10 +176,12 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 | 
			
		|||
        path: Set(s.display().to_string()),
 | 
			
		||||
    }))
 | 
			
		||||
    .on_empty_do_nothing()
 | 
			
		||||
    .exec(conn)
 | 
			
		||||
    .exec(&txn)
 | 
			
		||||
    .await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
    txn.commit().await?;
 | 
			
		||||
 | 
			
		||||
    Ok(pkg_entry)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
 | 
			
		||||
| 
						 | 
				
			
			@ -202,3 +226,138 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
 | 
			
		|||
        Ok(None)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(FromQueryResult)]
 | 
			
		||||
pub struct PkgToRemove {
 | 
			
		||||
    pub repo_id: i32,
 | 
			
		||||
    pub id: i32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
 | 
			
		||||
    let mut query = Query::select()
 | 
			
		||||
        .from(db::package::Entity)
 | 
			
		||||
        .columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
 | 
			
		||||
        .group_by_columns([
 | 
			
		||||
            db::package::Column::RepoId,
 | 
			
		||||
            db::package::Column::Arch,
 | 
			
		||||
            db::package::Column::Name,
 | 
			
		||||
        ])
 | 
			
		||||
        .to_owned();
 | 
			
		||||
 | 
			
		||||
    if committed {
 | 
			
		||||
        query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    query
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Query that returns all packages that should be included in a sync for the given repository and
 | 
			
		||||
/// arch.
 | 
			
		||||
pub fn pkgs_to_sync(
 | 
			
		||||
    conn: &DbConn,
 | 
			
		||||
    repo: i32,
 | 
			
		||||
    arch: &str,
 | 
			
		||||
) -> SelectorRaw<SelectModel<package::Model>> {
 | 
			
		||||
    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
 | 
			
		||||
    let query = Query::select()
 | 
			
		||||
        .columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
 | 
			
		||||
        .from_as(db::package::Entity, p1.clone())
 | 
			
		||||
        .join_subquery(
 | 
			
		||||
            JoinType::InnerJoin,
 | 
			
		||||
            max_pkg_ids_query(false),
 | 
			
		||||
            p2.clone(),
 | 
			
		||||
            Expr::col((p1.clone(), db::package::Column::Id))
 | 
			
		||||
                .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
 | 
			
		||||
        )
 | 
			
		||||
        .cond_where(
 | 
			
		||||
            Condition::all()
 | 
			
		||||
                .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::Arch))
 | 
			
		||||
                        .is_in([arch, crate::ANY_ARCH]),
 | 
			
		||||
                )
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::State))
 | 
			
		||||
                        .ne(db::PackageState::PendingDeletion),
 | 
			
		||||
                ),
 | 
			
		||||
        )
 | 
			
		||||
        .to_owned();
 | 
			
		||||
    let builder = conn.get_database_backend();
 | 
			
		||||
    let sql = builder.build(&query);
 | 
			
		||||
 | 
			
		||||
    db::Package::find().from_raw_sql(sql)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
 | 
			
		||||
    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
 | 
			
		||||
    let mut query = Query::select()
 | 
			
		||||
        .from_as(db::package::Entity, p1.clone())
 | 
			
		||||
        .to_owned();
 | 
			
		||||
 | 
			
		||||
    if include_repo {
 | 
			
		||||
        query.columns([
 | 
			
		||||
            (p1.clone(), db::package::Column::RepoId),
 | 
			
		||||
            (p1.clone(), db::package::Column::Id),
 | 
			
		||||
        ]);
 | 
			
		||||
    } else {
 | 
			
		||||
        query.column((p1.clone(), db::package::Column::Id));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // We left join on the max pkgs query because a repository that has all its packages set to
 | 
			
		||||
    // "pending deletion" doesn't show up in the query. These are also included with a where clause
 | 
			
		||||
    // on the joined rows.
 | 
			
		||||
    query
 | 
			
		||||
        .join_subquery(
 | 
			
		||||
            JoinType::LeftJoin,
 | 
			
		||||
            max_pkg_ids_query(true),
 | 
			
		||||
            p2.clone(),
 | 
			
		||||
            Condition::all()
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::RepoId))
 | 
			
		||||
                        .eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
 | 
			
		||||
                )
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::Arch))
 | 
			
		||||
                        .eq(Expr::col((p2.clone(), db::package::Column::Arch))),
 | 
			
		||||
                )
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::Name))
 | 
			
		||||
                        .eq(Expr::col((p2.clone(), db::package::Column::Name))),
 | 
			
		||||
                ),
 | 
			
		||||
        )
 | 
			
		||||
        .cond_where(
 | 
			
		||||
            Condition::any()
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::Id))
 | 
			
		||||
                        .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
 | 
			
		||||
                )
 | 
			
		||||
                .add(
 | 
			
		||||
                    Expr::col((p1.clone(), db::package::Column::State))
 | 
			
		||||
                        .eq(db::PackageState::PendingDeletion),
 | 
			
		||||
                ),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
    query
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
 | 
			
		||||
    let query = stale_pkgs_query(true);
 | 
			
		||||
    let builder = conn.get_database_backend();
 | 
			
		||||
    let sql = builder.build(&query);
 | 
			
		||||
 | 
			
		||||
    PkgToRemove::find_by_statement(sql)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
 | 
			
		||||
    Ok(db::Package::delete_many()
 | 
			
		||||
        .filter(db::package::Column::Id.lte(max_id))
 | 
			
		||||
        .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
 | 
			
		||||
        .exec(conn)
 | 
			
		||||
        .await
 | 
			
		||||
        .map(|_| ())?)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,15 +21,14 @@ pub async fn page(
 | 
			
		|||
    per_page: u64,
 | 
			
		||||
    page: u64,
 | 
			
		||||
    filter: Filter,
 | 
			
		||||
) -> Result<(u64, Vec<repo::Model>)> {
 | 
			
		||||
) -> Result<Vec<repo::Model>> {
 | 
			
		||||
    let paginator = Repo::find()
 | 
			
		||||
        .filter(filter)
 | 
			
		||||
        .order_by_asc(repo::Column::Id)
 | 
			
		||||
        .paginate(conn, per_page);
 | 
			
		||||
    let repos = paginator.fetch_page(page).await?;
 | 
			
		||||
    let total_pages = paginator.num_pages().await?;
 | 
			
		||||
 | 
			
		||||
    Ok((total_pages, repos))
 | 
			
		||||
    Ok(repos)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<repo::Model>> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,70 +0,0 @@
 | 
			
		|||
use crate::{db, DistroMgr};
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
    path::{Path, PathBuf},
 | 
			
		||||
    sync::Arc,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use sea_orm::{DbConn, EntityTrait};
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct MetaDistroMgr {
 | 
			
		||||
    distro_dir: PathBuf,
 | 
			
		||||
    conn: DbConn,
 | 
			
		||||
    distros: Arc<Mutex<HashMap<String, Arc<DistroMgr>>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MetaDistroMgr {
 | 
			
		||||
    pub async fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> crate::Result<Self> {
 | 
			
		||||
        if !tokio::fs::try_exists(&distro_dir).await? {
 | 
			
		||||
            tokio::fs::create_dir(&distro_dir).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let distro_dir = distro_dir.as_ref().to_path_buf();
 | 
			
		||||
        let mut map: HashMap<String, Arc<DistroMgr>> = HashMap::new();
 | 
			
		||||
 | 
			
		||||
        let distros = db::Distro::find().all(&conn).await?;
 | 
			
		||||
 | 
			
		||||
        for distro in distros {
 | 
			
		||||
            let mgr =
 | 
			
		||||
                DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?;
 | 
			
		||||
            map.insert(distro.name, Arc::new(mgr));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            distro_dir,
 | 
			
		||||
            conn,
 | 
			
		||||
            distros: Arc::new(Mutex::new(map)),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_mgr(&self, distro: &str) -> Option<Arc<DistroMgr>> {
 | 
			
		||||
        let map = self.distros.lock().await;
 | 
			
		||||
 | 
			
		||||
        map.get(distro).map(|mgr| Arc::clone(mgr))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<DistroMgr>> {
 | 
			
		||||
        let mut map = self.distros.lock().await;
 | 
			
		||||
 | 
			
		||||
        if let Some(mgr) = map.get(distro) {
 | 
			
		||||
            Ok(Arc::clone(mgr))
 | 
			
		||||
        } else {
 | 
			
		||||
            let distro = db::query::distro::insert(&self.conn, distro, None).await?;
 | 
			
		||||
 | 
			
		||||
            let mgr = Arc::new(
 | 
			
		||||
                DistroMgr::new(
 | 
			
		||||
                    self.distro_dir.join(&distro.name),
 | 
			
		||||
                    distro.id,
 | 
			
		||||
                    self.conn.clone(),
 | 
			
		||||
                )
 | 
			
		||||
                .await?,
 | 
			
		||||
            );
 | 
			
		||||
            map.insert(distro.name, Arc::clone(&mgr));
 | 
			
		||||
 | 
			
		||||
            Ok(mgr)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -14,6 +14,8 @@ pub enum ServerError {
 | 
			
		|||
    Db(sea_orm::DbErr),
 | 
			
		||||
    Status(StatusCode),
 | 
			
		||||
    Archive(libarchive::error::ArchiveError),
 | 
			
		||||
    Figment(figment::Error),
 | 
			
		||||
    Unit,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl fmt::Display for ServerError {
 | 
			
		||||
| 
						 | 
				
			
			@ -24,6 +26,8 @@ impl fmt::Display for ServerError {
 | 
			
		|||
            ServerError::Status(status) => write!(fmt, "{}", status),
 | 
			
		||||
            ServerError::Db(err) => write!(fmt, "{}", err),
 | 
			
		||||
            ServerError::Archive(err) => write!(fmt, "{}", err),
 | 
			
		||||
            ServerError::Figment(err) => write!(fmt, "{}", err),
 | 
			
		||||
            ServerError::Unit => Ok(()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -41,9 +45,10 @@ impl IntoResponse for ServerError {
 | 
			
		|||
            ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
 | 
			
		||||
                StatusCode::NOT_FOUND.into_response()
 | 
			
		||||
            }
 | 
			
		||||
            ServerError::Db(_) | ServerError::Archive(_) => {
 | 
			
		||||
                StatusCode::INTERNAL_SERVER_ERROR.into_response()
 | 
			
		||||
            }
 | 
			
		||||
            ServerError::Db(_)
 | 
			
		||||
            | ServerError::Archive(_)
 | 
			
		||||
            | ServerError::Figment(_)
 | 
			
		||||
            | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -83,3 +88,9 @@ impl From<libarchive::error::ArchiveError> for ServerError {
 | 
			
		|||
        ServerError::Archive(err)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<figment::Error> for ServerError {
 | 
			
		||||
    fn from(err: figment::Error) -> Self {
 | 
			
		||||
        ServerError::Figment(err)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,25 +1,23 @@
 | 
			
		|||
mod api;
 | 
			
		||||
mod cli;
 | 
			
		||||
mod config;
 | 
			
		||||
pub mod db;
 | 
			
		||||
mod distro;
 | 
			
		||||
mod error;
 | 
			
		||||
mod repo;
 | 
			
		||||
 | 
			
		||||
pub use config::{Config, DbConfig, FsConfig};
 | 
			
		||||
pub use error::{Result, ServerError};
 | 
			
		||||
use repo::DistroMgr;
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
use std::path::PathBuf;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    data_dir: PathBuf,
 | 
			
		||||
}
 | 
			
		||||
pub const ANY_ARCH: &'static str = "any";
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Global {
 | 
			
		||||
    config: Config,
 | 
			
		||||
    mgr: distro::MetaDistroMgr,
 | 
			
		||||
    config: crate::config::Config,
 | 
			
		||||
    mgr: Arc<repo::RepoMgr>,
 | 
			
		||||
    db: sea_orm::DbConn,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,71 +1,107 @@
 | 
			
		|||
use super::{archive, package};
 | 
			
		||||
use crate::{db, error::Result};
 | 
			
		||||
use crate::db::{self, query::package::delete_stale_pkgs};
 | 
			
		||||
 | 
			
		||||
use std::path::{Path, PathBuf};
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::HashMap,
 | 
			
		||||
    path::{Path, PathBuf},
 | 
			
		||||
    sync::{
 | 
			
		||||
        atomic::{AtomicU32, Ordering},
 | 
			
		||||
        Arc,
 | 
			
		||||
    },
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use futures::StreamExt;
 | 
			
		||||
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
 | 
			
		||||
use tokio::io::AsyncRead;
 | 
			
		||||
use sea_orm::{
 | 
			
		||||
    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
 | 
			
		||||
    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 | 
			
		||||
};
 | 
			
		||||
use sea_query::{Alias, Expr, Query};
 | 
			
		||||
use tokio::sync::{
 | 
			
		||||
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
 | 
			
		||||
    Mutex, RwLock,
 | 
			
		||||
};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
pub const ANY_ARCH: &'static str = "any";
 | 
			
		||||
 | 
			
		||||
pub struct DistroMgr {
 | 
			
		||||
    distro_dir: PathBuf,
 | 
			
		||||
    distro_id: i32,
 | 
			
		||||
    conn: DbConn,
 | 
			
		||||
struct PkgQueueMsg {
 | 
			
		||||
    repo: i32,
 | 
			
		||||
    path: PathBuf,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl DistroMgr {
 | 
			
		||||
    pub async fn new<P: AsRef<Path>>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result<Self> {
 | 
			
		||||
        if !tokio::fs::try_exists(&distro_dir).await? {
 | 
			
		||||
            tokio::fs::create_dir(&distro_dir).await?;
 | 
			
		||||
/// A single instance of this struct orchestrates everything related to managing packages files on
 | 
			
		||||
/// disk for all repositories in the server
 | 
			
		||||
pub struct RepoMgr {
 | 
			
		||||
    repos_dir: PathBuf,
 | 
			
		||||
    conn: DbConn,
 | 
			
		||||
    pkg_queue: (
 | 
			
		||||
        UnboundedSender<PkgQueueMsg>,
 | 
			
		||||
        Mutex<UnboundedReceiver<PkgQueueMsg>>,
 | 
			
		||||
    ),
 | 
			
		||||
    repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RepoMgr {
 | 
			
		||||
    pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
 | 
			
		||||
        if !tokio::fs::try_exists(&repos_dir).await? {
 | 
			
		||||
            tokio::fs::create_dir(&repos_dir).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let (tx, rx) = unbounded_channel();
 | 
			
		||||
 | 
			
		||||
        let mut repos = HashMap::new();
 | 
			
		||||
        let repo_ids: Vec<i32> = db::Repo::find()
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::repo::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .all(&conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        for id in repo_ids {
 | 
			
		||||
            repos.insert(id, Default::default());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            distro_dir: distro_dir.as_ref().to_path_buf(),
 | 
			
		||||
            distro_id,
 | 
			
		||||
            repos_dir: repos_dir.as_ref().to_path_buf(),
 | 
			
		||||
            conn,
 | 
			
		||||
            pkg_queue: (tx, Mutex::new(rx)),
 | 
			
		||||
            repos: RwLock::new(repos),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Generate archive databases for all known architectures in the repository, including the
 | 
			
		||||
    /// "any" architecture.
 | 
			
		||||
    pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
 | 
			
		||||
        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
 | 
			
		||||
    pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        let lock = self
 | 
			
		||||
            .repos
 | 
			
		||||
            .read()
 | 
			
		||||
            .await
 | 
			
		||||
            .get(&repo)
 | 
			
		||||
            .map(|(_, lock)| Arc::clone(lock));
 | 
			
		||||
 | 
			
		||||
        if repo.is_none() {
 | 
			
		||||
        if lock.is_none() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let repo = repo.unwrap();
 | 
			
		||||
        let lock = lock.unwrap();
 | 
			
		||||
        let _guard = lock.lock().await;
 | 
			
		||||
 | 
			
		||||
        let mut archs = repo
 | 
			
		||||
            .find_related(crate::db::Package)
 | 
			
		||||
        let archs: Vec<String> = db::Package::find()
 | 
			
		||||
            .filter(db::package::Column::RepoId.eq(repo))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(crate::db::package::Column::Arch)
 | 
			
		||||
            .column(db::package::Column::Arch)
 | 
			
		||||
            .distinct()
 | 
			
		||||
            .into_tuple::<String>()
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .all(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        while let Some(arch) = archs.next().await.transpose()? {
 | 
			
		||||
            self.generate_archives(&repo.name, &arch).await?;
 | 
			
		||||
        for arch in archs {
 | 
			
		||||
            self.generate_archives(repo, &arch).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Generate the archive databases for the given repository and architecture.
 | 
			
		||||
    pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
 | 
			
		||||
        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
 | 
			
		||||
 | 
			
		||||
        if repo.is_none() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let repo = repo.unwrap();
 | 
			
		||||
 | 
			
		||||
    async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
 | 
			
		||||
        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
 | 
			
		||||
            self.random_file_paths();
 | 
			
		||||
        let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
 | 
			
		||||
| 
						 | 
				
			
			@ -73,13 +109,15 @@ impl DistroMgr {
 | 
			
		|||
 | 
			
		||||
        // Query all packages in the repo that have the given architecture or the "any"
 | 
			
		||||
        // architecture
 | 
			
		||||
        let mut pkgs = repo
 | 
			
		||||
            .find_related(crate::db::Package)
 | 
			
		||||
            .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
 | 
			
		||||
        let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let mut commited_ids: Vec<i32> = Vec::new();
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            commited_ids.push(pkg.id);
 | 
			
		||||
 | 
			
		||||
            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
 | 
			
		||||
            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -103,7 +141,7 @@ impl DistroMgr {
 | 
			
		|||
        ar_db.close().await?;
 | 
			
		||||
        ar_files.close().await?;
 | 
			
		||||
 | 
			
		||||
        let repo_dir = self.distro_dir.join(&repo.name);
 | 
			
		||||
        let repo_dir = self.repos_dir.join(repo.to_string());
 | 
			
		||||
 | 
			
		||||
        // Move the db archives to their respective places
 | 
			
		||||
        tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
 | 
			
		||||
| 
						 | 
				
			
			@ -113,176 +151,235 @@ impl DistroMgr {
 | 
			
		|||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
        // Only after we have successfully written everything to disk do we update the database.
 | 
			
		||||
        // This order ensures any failure can be recovered, as the database is our single source of
 | 
			
		||||
        // truth.
 | 
			
		||||
        db::Package::update_many()
 | 
			
		||||
            .col_expr(
 | 
			
		||||
                db::package::Column::State,
 | 
			
		||||
                Expr::value(db::PackageState::Committed),
 | 
			
		||||
            )
 | 
			
		||||
            .filter(db::package::Column::Id.is_in(commited_ids))
 | 
			
		||||
            .exec(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        // If this fails there's no point in failing the function + if there were no packages in
 | 
			
		||||
        // the repo, this fails anyway because the temp file doesn't exist
 | 
			
		||||
        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
 | 
			
		||||
        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove the repo with the given name, if it existed
 | 
			
		||||
    pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
 | 
			
		||||
        let res = db::query::repo::by_name(&self.conn, repo).await?;
 | 
			
		||||
 | 
			
		||||
        if let Some(repo_entry) = res {
 | 
			
		||||
            // Remove repository from database
 | 
			
		||||
            repo_entry.delete(&self.conn).await?;
 | 
			
		||||
 | 
			
		||||
            // Remove files from file system
 | 
			
		||||
            tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?;
 | 
			
		||||
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(false)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove all packages from the repository with the given arch.
 | 
			
		||||
    pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
 | 
			
		||||
        let repo = db::query::repo::by_name(&self.conn, repo).await?;
 | 
			
		||||
 | 
			
		||||
        if let Some(repo) = repo {
 | 
			
		||||
            let mut pkgs = repo
 | 
			
		||||
                .find_related(db::Package)
 | 
			
		||||
                .filter(db::package::Column::Arch.eq(arch))
 | 
			
		||||
                .stream(&self.conn)
 | 
			
		||||
                .await?;
 | 
			
		||||
 | 
			
		||||
            while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
                let path = self
 | 
			
		||||
                    .distro_dir
 | 
			
		||||
                    .join(&repo.name)
 | 
			
		||||
                    .join(super::package::filename(&pkg));
 | 
			
		||||
                tokio::fs::remove_file(path).await?;
 | 
			
		||||
 | 
			
		||||
                pkg.delete(&self.conn).await?;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            tokio::fs::remove_file(
 | 
			
		||||
                self.distro_dir
 | 
			
		||||
                    .join(&repo.name)
 | 
			
		||||
                    .join(format!("{}.db.tar.gz", arch)),
 | 
			
		||||
            )
 | 
			
		||||
            .await?;
 | 
			
		||||
            tokio::fs::remove_file(
 | 
			
		||||
                self.distro_dir
 | 
			
		||||
                    .join(&repo.name)
 | 
			
		||||
                    .join(format!("{}.files.tar.gz", arch)),
 | 
			
		||||
            )
 | 
			
		||||
    /// Clean any remaining old package files from the database and file system
 | 
			
		||||
    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
 | 
			
		||||
        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
 | 
			
		||||
            .stream(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
            // If we removed all "any" packages, we need to resync all databases
 | 
			
		||||
            if arch == ANY_ARCH {
 | 
			
		||||
                self.generate_archives_all(&repo.name).await?;
 | 
			
		||||
        // Ids are monotonically increasing, so the max id suffices to know which packages to
 | 
			
		||||
        // remove later
 | 
			
		||||
        let mut max_id = -1;
 | 
			
		||||
        let mut removed_pkgs = 0;
 | 
			
		||||
 | 
			
		||||
        while let Some(pkg) = pkgs.next().await.transpose()? {
 | 
			
		||||
            // Failing to remove the package file isn't the biggest problem
 | 
			
		||||
            let _ = tokio::fs::remove_file(
 | 
			
		||||
                self.repos_dir
 | 
			
		||||
                    .join(pkg.repo_id.to_string())
 | 
			
		||||
                    .join(pkg.id.to_string()),
 | 
			
		||||
            )
 | 
			
		||||
            .await;
 | 
			
		||||
 | 
			
		||||
            if pkg.id > max_id {
 | 
			
		||||
                max_id = pkg.id;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            Ok(true)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(false)
 | 
			
		||||
            removed_pkgs += 1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if removed_pkgs > 0 {
 | 
			
		||||
            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tracing::info!("Removed {removed_pkgs} stale package(s)");
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> {
 | 
			
		||||
        let repo = db::query::repo::by_name(&self.conn, repo).await?;
 | 
			
		||||
    pub async fn pkg_parse_task(&self) {
 | 
			
		||||
        loop {
 | 
			
		||||
            // Receive the next message and immediately drop the mutex afterwards. As long as the
 | 
			
		||||
            // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
 | 
			
		||||
            // as soon as a message is received, so another worker can pick up the mutex.
 | 
			
		||||
            let msg = {
 | 
			
		||||
                let mut recv = self.pkg_queue.1.lock().await;
 | 
			
		||||
                recv.recv().await
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
        if let Some(repo) = repo {
 | 
			
		||||
            let pkg =
 | 
			
		||||
                db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?;
 | 
			
		||||
            if let Some(msg) = msg {
 | 
			
		||||
                // TODO better handle this error (retry if failure wasn't because the package is
 | 
			
		||||
                // faulty)
 | 
			
		||||
                let _ = self
 | 
			
		||||
                    .add_pkg_from_path(msg.path, msg.repo)
 | 
			
		||||
                    .await
 | 
			
		||||
                    .inspect_err(|e| tracing::error!("{:?}", e));
 | 
			
		||||
 | 
			
		||||
            if let Some(pkg) = pkg {
 | 
			
		||||
                // Remove package from database & file system
 | 
			
		||||
                tokio::fs::remove_file(
 | 
			
		||||
                    self.distro_dir
 | 
			
		||||
                        .join(&repo.name)
 | 
			
		||||
                        .join(super::package::filename(&pkg)),
 | 
			
		||||
                )
 | 
			
		||||
                .await?;
 | 
			
		||||
                pkg.delete(&self.conn).await?;
 | 
			
		||||
                let old = self
 | 
			
		||||
                    .repos
 | 
			
		||||
                    .read()
 | 
			
		||||
                    .await
 | 
			
		||||
                    .get(&msg.repo)
 | 
			
		||||
                    .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
 | 
			
		||||
 | 
			
		||||
                if arch == ANY_ARCH {
 | 
			
		||||
                    self.generate_archives_all(&repo.name).await?;
 | 
			
		||||
                } else {
 | 
			
		||||
                    self.generate_archives(&repo.name, arch).await?;
 | 
			
		||||
                // Every time the queue for a repo becomes empty, we run a sync job
 | 
			
		||||
                if old == Some(1) {
 | 
			
		||||
                    // TODO error handling
 | 
			
		||||
                    let _ = self.sync_repo(msg.repo).await;
 | 
			
		||||
 | 
			
		||||
                    // TODO move this so that we only clean if entire queue is empty, not just
 | 
			
		||||
                    // queue for specific repo
 | 
			
		||||
                    let _ = self.remove_stale_pkgs().await;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                Ok(true)
 | 
			
		||||
            } else {
 | 
			
		||||
                Ok(false)
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(false)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
 | 
			
		||||
        &self,
 | 
			
		||||
        reader: &mut R,
 | 
			
		||||
        repo: &str,
 | 
			
		||||
    ) -> crate::Result<(String, String, String)> {
 | 
			
		||||
        let [path] = self.random_file_paths();
 | 
			
		||||
        let mut temp_file = tokio::fs::File::create(&path).await?;
 | 
			
		||||
    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
 | 
			
		||||
        self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
 | 
			
		||||
        self.repos.read().await.get(&repo).inspect(|n| {
 | 
			
		||||
            n.0.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        tokio::io::copy(reader, &mut temp_file).await?;
 | 
			
		||||
    pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
 | 
			
		||||
        Ok(db::Repo::find()
 | 
			
		||||
            .find_also_related(db::Distro)
 | 
			
		||||
            .filter(
 | 
			
		||||
                Condition::all()
 | 
			
		||||
                    .add(db::repo::Column::Name.eq(repo))
 | 
			
		||||
                    .add(db::distro::Column::Name.eq(distro)),
 | 
			
		||||
            )
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await
 | 
			
		||||
            .map(|res| res.map(|(repo, _)| repo.id))?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        let path_clone = path.clone();
 | 
			
		||||
    pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
 | 
			
		||||
        let mut repos = self.repos.write().await;
 | 
			
		||||
 | 
			
		||||
        let distro_id: Option<i32> = db::Distro::find()
 | 
			
		||||
            .filter(db::distro::Column::Name.eq(distro))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::distro::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let distro_id = if let Some(id) = distro_id {
 | 
			
		||||
            id
 | 
			
		||||
        } else {
 | 
			
		||||
            let new_distro = db::distro::ActiveModel {
 | 
			
		||||
                id: NotSet,
 | 
			
		||||
                name: Set(distro.to_string()),
 | 
			
		||||
                description: NotSet,
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            new_distro.insert(&self.conn).await?.id
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let repo_id: Option<i32> = db::Repo::find()
 | 
			
		||||
            .filter(db::repo::Column::DistroId.eq(distro_id))
 | 
			
		||||
            .filter(db::repo::Column::Name.eq(repo))
 | 
			
		||||
            .select_only()
 | 
			
		||||
            .column(db::repo::Column::Id)
 | 
			
		||||
            .into_tuple()
 | 
			
		||||
            .one(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        let repo_id = if let Some(id) = repo_id {
 | 
			
		||||
            id
 | 
			
		||||
        } else {
 | 
			
		||||
            let new_repo = db::repo::ActiveModel {
 | 
			
		||||
                id: NotSet,
 | 
			
		||||
                distro_id: Set(distro_id),
 | 
			
		||||
                name: Set(repo.to_string()),
 | 
			
		||||
                description: NotSet,
 | 
			
		||||
            };
 | 
			
		||||
            let id = new_repo.insert(&self.conn).await?.id;
 | 
			
		||||
 | 
			
		||||
            tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
 | 
			
		||||
            repos.insert(id, Default::default());
 | 
			
		||||
 | 
			
		||||
            id
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        Ok(repo_id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        let path_clone = path.as_ref().to_path_buf();
 | 
			
		||||
        let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
 | 
			
		||||
            .await
 | 
			
		||||
            .unwrap()?;
 | 
			
		||||
 | 
			
		||||
        let repo_dir = self.distro_dir.join(repo);
 | 
			
		||||
        // TODO prevent database from being updated but file failing to move to repo dir?
 | 
			
		||||
        let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
 | 
			
		||||
 | 
			
		||||
        let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? {
 | 
			
		||||
            repo.id
 | 
			
		||||
        } else {
 | 
			
		||||
            tokio::fs::create_dir(&repo_dir).await?;
 | 
			
		||||
        let dest_path = self
 | 
			
		||||
            .repos_dir
 | 
			
		||||
            .join(repo.to_string())
 | 
			
		||||
            .join(pkg.id.to_string());
 | 
			
		||||
        tokio::fs::rename(path.as_ref(), dest_path).await?;
 | 
			
		||||
 | 
			
		||||
            db::query::repo::insert(&self.conn, self.distro_id, repo, None)
 | 
			
		||||
                .await?
 | 
			
		||||
                .id
 | 
			
		||||
        };
 | 
			
		||||
        tracing::info!(
 | 
			
		||||
            "Added '{}-{}-{}' to repository {}",
 | 
			
		||||
            pkg.name,
 | 
			
		||||
            pkg.version,
 | 
			
		||||
            pkg.arch,
 | 
			
		||||
            repo,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        // If the package already exists in the database, we remove it first
 | 
			
		||||
        let res = db::query::package::by_fields(
 | 
			
		||||
            &self.conn,
 | 
			
		||||
            repo_id,
 | 
			
		||||
            &pkg.info.arch,
 | 
			
		||||
            &pkg.info.name,
 | 
			
		||||
            None,
 | 
			
		||||
            None,
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
        if let Some(entry) = res {
 | 
			
		||||
            entry.delete(&self.conn).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let dest_pkg_path = repo_dir.join(pkg.file_name());
 | 
			
		||||
 | 
			
		||||
        // Insert new package into database
 | 
			
		||||
        let name = pkg.info.name.clone();
 | 
			
		||||
        let version = pkg.info.version.clone();
 | 
			
		||||
        let arch = pkg.info.arch.clone();
 | 
			
		||||
        db::query::package::insert(&self.conn, repo_id, pkg).await?;
 | 
			
		||||
 | 
			
		||||
        // Move the package to its final resting place
 | 
			
		||||
        tokio::fs::rename(path, dest_pkg_path).await?;
 | 
			
		||||
 | 
			
		||||
        // Synchronize archive databases
 | 
			
		||||
        if arch == ANY_ARCH {
 | 
			
		||||
            self.generate_archives_all(repo).await?;
 | 
			
		||||
        } else {
 | 
			
		||||
            self.generate_archives(repo, &arch).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok((name, version, arch))
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
 | 
			
		||||
        self.repos.write().await.remove(&repo);
 | 
			
		||||
        db::Repo::delete_by_id(repo).exec(&self.conn).await?;
 | 
			
		||||
        let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove all packages in the repository that have a given arch. This method marks all
 | 
			
		||||
    /// packages with the given architecture as "pending deletion", before performing a manual sync
 | 
			
		||||
    /// & removal of stale packages.
 | 
			
		||||
    pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
 | 
			
		||||
        db::Package::update_many()
 | 
			
		||||
            .col_expr(
 | 
			
		||||
                db::package::Column::State,
 | 
			
		||||
                Expr::value(db::PackageState::PendingDeletion),
 | 
			
		||||
            )
 | 
			
		||||
            .filter(
 | 
			
		||||
                Condition::all()
 | 
			
		||||
                    .add(db::package::Column::RepoId.eq(repo))
 | 
			
		||||
                    .add(db::package::Column::Arch.eq(arch)),
 | 
			
		||||
            )
 | 
			
		||||
            .exec(&self.conn)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        self.sync_repo(repo).await?;
 | 
			
		||||
        self.remove_stale_pkgs().await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ///  Generate a path to a unique file that can be used as a temporary file
 | 
			
		||||
    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
 | 
			
		||||
        std::array::from_fn(|_| {
 | 
			
		||||
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
 | 
			
		||||
            self.distro_dir.join(uuid.to_string())
 | 
			
		||||
            self.repos_dir.join(uuid.to_string())
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,9 @@ mod archive;
 | 
			
		|||
mod manager;
 | 
			
		||||
pub mod package;
 | 
			
		||||
 | 
			
		||||
pub use manager::DistroMgr;
 | 
			
		||||
pub use manager::RepoMgr;
 | 
			
		||||
 | 
			
		||||
use crate::FsConfig;
 | 
			
		||||
 | 
			
		||||
use axum::{
 | 
			
		||||
    body::Body,
 | 
			
		||||
| 
						 | 
				
			
			@ -47,61 +49,59 @@ async fn get_file(
 | 
			
		|||
    Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
 | 
			
		||||
    req: Request<Body>,
 | 
			
		||||
) -> crate::Result<impl IntoResponse> {
 | 
			
		||||
    let repo_dir = global
 | 
			
		||||
        .config
 | 
			
		||||
        .data_dir
 | 
			
		||||
        .join("distros")
 | 
			
		||||
        .join(&distro)
 | 
			
		||||
        .join(&repo);
 | 
			
		||||
    if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
 | 
			
		||||
        match global.config.fs {
 | 
			
		||||
            FsConfig::Local { data_dir } => {
 | 
			
		||||
                let repo_dir = data_dir.join("repos").join(repo_id.to_string());
 | 
			
		||||
 | 
			
		||||
    let file_name =
 | 
			
		||||
        if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
 | 
			
		||||
            format!("{}.db.tar.gz", arch)
 | 
			
		||||
        } else if file_name == format!("{}.files", repo)
 | 
			
		||||
            || file_name == format!("{}.files.tar.gz", repo)
 | 
			
		||||
        {
 | 
			
		||||
            format!("{}.files.tar.gz", arch)
 | 
			
		||||
        } else {
 | 
			
		||||
            file_name
 | 
			
		||||
        };
 | 
			
		||||
                let file_name = if file_name == format!("{}.db", repo)
 | 
			
		||||
                    || file_name == format!("{}.db.tar.gz", repo)
 | 
			
		||||
                {
 | 
			
		||||
                    format!("{}.db.tar.gz", arch)
 | 
			
		||||
                } else if file_name == format!("{}.files", repo)
 | 
			
		||||
                    || file_name == format!("{}.files.tar.gz", repo)
 | 
			
		||||
                {
 | 
			
		||||
                    format!("{}.files.tar.gz", arch)
 | 
			
		||||
                } else {
 | 
			
		||||
                    file_name
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
    Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await)
 | 
			
		||||
                let path = repo_dir.join(file_name);
 | 
			
		||||
                Ok(ServeFile::new(path).oneshot(req).await)
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    } else {
 | 
			
		||||
        Err(StatusCode::NOT_FOUND.into())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn post_package_archive(
 | 
			
		||||
    State(global): State<crate::Global>,
 | 
			
		||||
    Path((distro, repo)): Path<(String, String)>,
 | 
			
		||||
    body: Body,
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
) -> crate::Result<StatusCode> {
 | 
			
		||||
    let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
 | 
			
		||||
    let mgr = global.mgr.get_or_create_mgr(&distro).await?;
 | 
			
		||||
    let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?;
 | 
			
		||||
    let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
 | 
			
		||||
    let [tmp_path] = global.mgr.random_file_paths();
 | 
			
		||||
 | 
			
		||||
    tracing::info!(
 | 
			
		||||
        "Added '{}-{}' to repository '{}' ({})",
 | 
			
		||||
        name,
 | 
			
		||||
        version,
 | 
			
		||||
        repo,
 | 
			
		||||
        arch
 | 
			
		||||
    );
 | 
			
		||||
    let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
 | 
			
		||||
    tokio::io::copy(&mut body, &mut tmp_file).await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
    global.mgr.queue_pkg(repo, tmp_path).await;
 | 
			
		||||
 | 
			
		||||
    Ok(StatusCode::ACCEPTED)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn delete_repo(
 | 
			
		||||
    State(global): State<crate::Global>,
 | 
			
		||||
    Path((distro, repo)): Path<(String, String)>,
 | 
			
		||||
) -> crate::Result<StatusCode> {
 | 
			
		||||
    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
        let repo_removed = mgr.remove_repo(&repo).await?;
 | 
			
		||||
    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
 | 
			
		||||
        global.mgr.remove_repo(repo).await?;
 | 
			
		||||
 | 
			
		||||
        if repo_removed {
 | 
			
		||||
            tracing::info!("Removed repository '{}'", repo);
 | 
			
		||||
        tracing::info!("Removed repository {repo}");
 | 
			
		||||
 | 
			
		||||
            Ok(StatusCode::OK)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
        }
 | 
			
		||||
        Ok(StatusCode::OK)
 | 
			
		||||
    } else {
 | 
			
		||||
        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -111,41 +111,51 @@ async fn delete_arch_repo(
 | 
			
		|||
    State(global): State<crate::Global>,
 | 
			
		||||
    Path((distro, repo, arch)): Path<(String, String, String)>,
 | 
			
		||||
) -> crate::Result<StatusCode> {
 | 
			
		||||
    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
        let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
 | 
			
		||||
    if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
 | 
			
		||||
        global.mgr.remove_repo_arch(repo, &arch).await?;
 | 
			
		||||
 | 
			
		||||
        if repo_removed {
 | 
			
		||||
            tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
 | 
			
		||||
        tracing::info!("Removed architecture '{arch}' from repository {repo}");
 | 
			
		||||
 | 
			
		||||
            Ok(StatusCode::OK)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
        }
 | 
			
		||||
        Ok(StatusCode::OK)
 | 
			
		||||
    } else {
 | 
			
		||||
        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    }
 | 
			
		||||
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
    //    let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
 | 
			
		||||
    //
 | 
			
		||||
    //    if repo_removed {
 | 
			
		||||
    //        tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
 | 
			
		||||
    //
 | 
			
		||||
    //        Ok(StatusCode::OK)
 | 
			
		||||
    //    } else {
 | 
			
		||||
    //        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //    }
 | 
			
		||||
    //} else {
 | 
			
		||||
    //    Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn delete_package(
 | 
			
		||||
    State(global): State<crate::Global>,
 | 
			
		||||
    Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
 | 
			
		||||
) -> crate::Result<StatusCode> {
 | 
			
		||||
    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
        let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
 | 
			
		||||
 | 
			
		||||
        if pkg_removed {
 | 
			
		||||
            tracing::info!(
 | 
			
		||||
                "Removed package '{}' ({}) from repository '{}'",
 | 
			
		||||
                pkg_name,
 | 
			
		||||
                arch,
 | 
			
		||||
                repo
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            Ok(StatusCode::OK)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
        }
 | 
			
		||||
    } else {
 | 
			
		||||
        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    }
 | 
			
		||||
    Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
 | 
			
		||||
    //    let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
 | 
			
		||||
    //
 | 
			
		||||
    //    if pkg_removed {
 | 
			
		||||
    //        tracing::info!(
 | 
			
		||||
    //            "Removed package '{}' ({}) from repository '{}'",
 | 
			
		||||
    //            pkg_name,
 | 
			
		||||
    //            arch,
 | 
			
		||||
    //            repo
 | 
			
		||||
    //        );
 | 
			
		||||
    //
 | 
			
		||||
    //        Ok(StatusCode::OK)
 | 
			
		||||
    //    } else {
 | 
			
		||||
    //        Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //    }
 | 
			
		||||
    //} else {
 | 
			
		||||
    //    Ok(StatusCode::NOT_FOUND)
 | 
			
		||||
    //}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -323,7 +323,7 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
 | 
			
		|||
    pkg: &package::Model,
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
    writer
 | 
			
		||||
        .write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes())
 | 
			
		||||
        .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    write_attribute(writer, "NAME", &pkg.name).await?;
 | 
			
		||||
| 
						 | 
				
			
			@ -397,6 +397,8 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
 | 
			
		|||
        write_attribute(writer, key, &items.join("\n")).await?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    writer.flush().await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -417,5 +419,7 @@ pub async fn write_files<W: AsyncWrite + std::marker::Unpin>(
 | 
			
		|||
            .await?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    writer.flush().await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue