diff --git a/Cargo.lock b/Cargo.lock index 8520e63..333bc72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,15 +174,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" -dependencies = [ - "bytemuck", -] - [[package]] name = "atomic-waker" version = "1.1.2" @@ -389,12 +380,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytemuck" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" - [[package]] name = "byteorder" version = "1.5.0" @@ -645,20 +630,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" -[[package]] -name = "figment" -version = "0.10.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3" -dependencies = [ - "atomic", - "pear", - "serde", - "toml", - "uncased", - "version_check", -] - [[package]] name = "flume" version = "0.11.0" @@ -1066,12 +1037,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "inlinable_string" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" - [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1421,29 +1386,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "pear" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467" -dependencies = [ - "inlinable_string", - "pear_codegen", - "yansi", -] - -[[package]] -name = "pear_codegen" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147" -dependencies = [ - "proc-macro2", - "proc-macro2-diagnostics", - "quote", - "syn 2.0.66", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -1536,7 +1478,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit 0.21.1", + "toml_edit", ] [[package]] @@ -1572,19 +1514,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "proc-macro2-diagnostics" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", - "version_check", - "yansi", -] - [[package]] name = "ptr_meta" version = "0.1.4" @@ -1728,13 +1657,11 @@ dependencies = [ "axum", "chrono", "clap", - "figment", "futures", "http-body-util", "libarchive", "sea-orm", "sea-orm-migration", - "sea-query", "serde", "sha256", "tokio", @@ -2109,15 +2036,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_spanned" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2705,26 +2623,11 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.8.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.22.14", -] - [[package]] name = "toml_datetime" version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -2734,20 +2637,7 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap", "toml_datetime", - "winnow 0.5.40", -] - -[[package]] -name = "toml_edit" -version = "0.22.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" -dependencies = [ - "indexmap", - "serde", - "serde_spanned", - "toml_datetime", - "winnow 0.6.13", + "winnow", ] [[package]] @@ -2872,15 +2762,6 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "uncased" -version = "0.9.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" -dependencies = [ - "version_check", -] - [[package]] name = "unicase" version = "2.7.0" @@ -3247,15 +3128,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winnow" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" -dependencies = [ - "memchr", -] - [[package]] name = "wyz" version = "0.5.1" @@ -3265,12 +3137,6 @@ dependencies = [ "tap", ] -[[package]] -name = "yansi" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" - [[package]] name = "zerocopy" version = "0.7.34" diff --git a/server/Cargo.toml b/server/Cargo.toml index b1fc688..cd86713 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,12 +10,10 @@ authors = ["Jef Roosens"] axum = { version = "0.7.5", features = ["http2", "macros"] } chrono = { version = "0.4.26", features = ["serde"] } clap = { version = "4.3.12", features = ["env", "derive"] } -figment = { version = "0.10.19", features = ["env", "toml"] } futures = "0.3.28" http-body-util = "0.1.1" libarchive = { path = "../libarchive" } sea-orm-migration = "0.12.1" -sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] } serde = { version = "1.0.178", features = ["derive"] } sha256 = "1.1.4" tokio = { version = "1.29.1", features = ["full"] } diff --git a/server/rieterd.toml b/server/rieterd.toml deleted file mode 100644 index 9cc56bf..0000000 --- a/server/rieterd.toml +++ /dev/null @@ -1,17 +0,0 @@ -api_key = "test" -pkg_workers = 2 -log_level = "rieterd=debug" - -[fs] -type = "local" -data_dir = "./data" - -[db] -type = "sqlite" -db_dir = "./data" -# [db] -# type = "postgres" -# host = "localhost" -# db = "rieter" -# user = "rieter" -# password = "rieter" diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index 4678257..0a0a56e 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -22,10 +22,10 @@ async fn get_repos( Query(pagination): Query, Query(filter): Query, ) -> crate::Result>> { - let items = + let (total_pages, items) = db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?; - Ok(Json(pagination.res(items))) + Ok(Json(pagination.res(total_pages, items))) } async fn get_single_repo( @@ -44,11 +44,11 @@ async fn get_packages( Query(pagination): Query, Query(filter): Query, ) -> crate::Result>> { - let items = + let (total_pages, pkgs) = db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter) .await?; - Ok(Json(pagination.res(items))) + Ok(Json(pagination.res(total_pages, pkgs))) } async fn get_single_package( diff --git a/server/src/api/pagination.rs b/server/src/api/pagination.rs index 3ede5bf..02e32dc 100644 --- a/server/src/api/pagination.rs +++ b/server/src/api/pagination.rs @@ -1,19 +1,19 @@ use serde::{Deserialize, Serialize}; #[derive(Deserialize)] +#[serde(default)] pub struct Query { - #[serde(default = "default_page")] pub page: u64, - #[serde(default = "default_per_page")] pub per_page: u64, } -fn default_page() -> u64 { - 1 -} - -fn default_per_page() -> u64 { - 25 +impl Default for Query { + fn default() -> Self { + Query { + page: 1, + per_page: 25, + } + } } #[derive(Serialize)] @@ -23,15 +23,21 @@ where { pub page: u64, pub per_page: u64, + pub total_pages: u64, pub count: usize, pub items: Vec, } impl Query { - pub fn res Serialize>(self, items: Vec) -> PaginatedResponse { + pub fn res Serialize>( + self, + total_pages: u64, + items: Vec, + ) -> PaginatedResponse { PaginatedResponse { page: self.page, per_page: self.per_page, + total_pages, count: items.len(), items, } diff --git a/server/src/cli.rs b/server/src/cli.rs index c6998eb..4fc94f1 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,71 +1,92 @@ -use crate::{Config, FsConfig, Global}; - -use std::{io, path::PathBuf, sync::Arc}; +use crate::{distro::MetaDistroMgr, Config, Global}; use axum::Router; use clap::Parser; use sea_orm_migration::MigratorTrait; +use std::{io, path::PathBuf}; use tower_http::trace::TraceLayer; +use tracing::debug; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] pub struct Cli { + /// Directory where repository metadata & SQLite database is stored + #[arg(env = "RIETER_DATA_DIR")] + pub data_dir: PathBuf, + /// API key to authenticate private routes with + #[arg(env = "RIETER_API_KEY")] + pub api_key: String, + + /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the + /// data directory + #[arg(short, long, env = "RIETER_DATABASE_URL")] + pub database_url: Option, + /// Port the server will listen on #[arg( short, long, - env = "RIETER_CONFIG_FILE", - default_value = "./rieterd.toml" + value_name = "PORT", + default_value_t = 8000, + env = "RIETER_PORT" )] - pub config_file: PathBuf, + pub port: u16, + /// Log levels for the tracing + #[arg( + long, + value_name = "LOG_LEVEL", + default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", + env = "RIETER_LOG" + )] + pub log: String, } impl Cli { - pub async fn run(&self) -> crate::Result<()> { - let config: Config = Config::figment(&self.config_file) - .extract() - .inspect_err(|e| tracing::error!("{}", e))?; - + pub fn init_tracing(&self) { tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) + .with(tracing_subscriber::EnvFilter::new(self.log.clone())) .with(tracing_subscriber::fmt::layer()) .init(); + } - tracing::info!("Connecting to database"); - let db = crate::db::connect(&config.db).await?; + pub async fn run(&self) -> crate::Result<()> { + self.init_tracing(); + let db_url = if let Some(url) = &self.database_url { + url.clone() + } else { + format!( + "sqlite://{}?mode=rwc", + self.data_dir.join("rieter.sqlite").to_string_lossy() + ) + }; + + debug!("Connecting to database with URL {}", db_url); + + let mut options = sea_orm::ConnectOptions::new(db_url); + options.max_connections(16); + + let db = sea_orm::Database::connect(options).await?; crate::db::Migrator::up(&db, None).await?; - let mgr = match &config.fs { - FsConfig::Local { data_dir } => { - crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? - } + debug!("Successfully applied migrations"); + + let config = Config { + data_dir: self.data_dir.clone(), }; - let mgr = Arc::new(mgr); + let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?; - for _ in 0..config.pkg_workers { - let clone = Arc::clone(&mgr); - - tokio::spawn(async move { clone.pkg_parse_task().await }); - } - - let global = Global { - config: config.clone(), - mgr, - db, - }; + let global = Global { config, mgr, db }; // build our application with a single route let app = Router::new() .nest("/api", crate::api::router()) - .merge(crate::repo::router(&config.api_key)) + .merge(crate::repo::router(&self.api_key)) .with_state(global) .layer(TraceLayer::new_for_http()); - let domain: String = format!("{}:{}", config.domain, config.port) - .parse() - .unwrap(); + let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap(); let listener = tokio::net::TcpListener::bind(domain).await?; // run it with hyper on localhost:3000 Ok(axum::serve(listener, app.into_make_service()) diff --git a/server/src/config.rs b/server/src/config.rs deleted file mode 100644 index e165fdc..0000000 --- a/server/src/config.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::path::{Path, PathBuf}; - -use figment::{ - providers::{Env, Format, Toml}, - Figment, -}; -use serde::Deserialize; - -#[derive(Deserialize, Debug, Clone)] -#[serde(rename_all = "lowercase")] -#[serde(tag = "type")] -pub enum FsConfig { - Local { data_dir: PathBuf }, -} - -#[derive(Deserialize, Debug, Clone)] -#[serde(rename_all = "lowercase")] -#[serde(tag = "type")] -pub enum DbConfig { - Sqlite { - db_dir: PathBuf, - #[serde(default = "default_db_sqlite_max_connections")] - max_connections: u32, - }, - Postgres { - host: String, - #[serde(default = "default_db_postgres_port")] - port: u16, - user: String, - password: String, - db: String, - #[serde(default)] - schema: String, - #[serde(default = "default_db_postgres_max_connections")] - max_connections: u32, - }, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct Config { - pub api_key: String, - #[serde(default = "default_domain")] - pub domain: String, - #[serde(default = "default_port")] - pub port: u16, - #[serde(default = "default_log_level")] - pub log_level: String, - pub fs: FsConfig, - pub db: DbConfig, - #[serde(default = "default_pkg_workers")] - pub pkg_workers: u32, -} - -impl Config { - pub fn figment(config_file: impl AsRef) -> Figment { - Figment::new() - .merge(Toml::file(config_file)) - .merge(Env::prefixed("RIETER_")) - } -} - -fn default_domain() -> String { - String::from("0.0.0.0") -} - -fn default_port() -> u16 { - 8000 -} - -fn default_log_level() -> String { - String::from("tower_http=debug,rieterd=debug,sea_orm=debug") -} - -fn default_db_sqlite_max_connections() -> u32 { - 16 -} - -fn default_db_postgres_port() -> u16 { - 5432 -} - -fn default_db_postgres_max_connections() -> u32 { - 16 -} - -fn default_pkg_workers() -> u32 { - 1 -} diff --git a/server/src/db/entities/package.rs b/server/src/db/entities/package.rs index 4ef90a4..112cde4 100644 --- a/server/src/db/entities/package.rs +++ b/server/src/db/entities/package.rs @@ -4,8 +4,6 @@ use chrono::NaiveDateTime; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -use crate::db::PackageState; - #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "package")] pub struct Model { @@ -26,8 +24,6 @@ pub struct Model { pub pgp_sig_size: Option, pub sha256_sum: String, pub compression: String, - #[serde(skip_serializing)] - pub state: PackageState, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs index f76e639..2deb05f 100644 --- a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs +++ b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs @@ -81,12 +81,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Package::PgpSig).string_len(255)) .col(ColumnDef::new(Package::PgpSigSize).big_integer()) .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null()) - .col( - ColumnDef::new(Package::Compression) - .string_len(16) - .not_null(), - ) - .col(ColumnDef::new(Package::State).integer().not_null()) + .col(ColumnDef::new(Package::Compression).string_len(16).not_null()) .foreign_key( ForeignKey::create() .name("fk-package-repo_id") @@ -269,7 +264,6 @@ pub enum Package { PgpSigSize, Sha256Sum, Compression, - State, } #[derive(Iden)] diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index a1b7476..597cf20 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -2,12 +2,10 @@ pub mod entities; mod migrator; pub mod query; -use crate::config::DbConfig; - pub use entities::{prelude::*, *}; pub use migrator::Migrator; -use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter}; +use sea_orm::{DeriveActiveEnum, EnumIter}; use serde::{Deserialize, Serialize}; type Result = std::result::Result; @@ -32,17 +30,6 @@ pub enum PackageRelatedEnum { Optdepend, } -#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)] -#[sea_orm(rs_type = "i32", db_type = "Integer")] -pub enum PackageState { - #[sea_orm(num_value = 0)] - PendingCommit, - #[sea_orm(num_value = 1)] - Committed, - #[sea_orm(num_value = 2)] - PendingDeletion, -} - #[derive(Serialize)] pub struct FullPackage { #[serde(flatten)] @@ -52,50 +39,3 @@ pub struct FullPackage { related: Vec<(PackageRelatedEnum, String)>, files: Vec, } - -pub async fn connect(conn: &DbConfig) -> crate::Result { - match conn { - DbConfig::Sqlite { - db_dir, - max_connections, - } => { - let url = format!( - "sqlite://{}?mode=rwc", - db_dir.join("rieter.sqlite").to_string_lossy() - ); - let options = sea_orm::ConnectOptions::new(url) - .max_connections(*max_connections) - .to_owned(); - - let conn = Database::connect(options).await?; - - // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs - // https://www.sqlite.org/pragma.html#pragma_synchronous - conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?; - conn.execute_unprepared("PRAGMA synchronous=NORMAL;") - .await?; - - Ok(conn) - } - DbConfig::Postgres { - host, - port, - db, - user, - password, - schema, - max_connections, - } => { - let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db); - - if schema != "" { - url = format!("{url}?currentSchema={schema}"); - } - - let options = sea_orm::ConnectOptions::new(url) - .max_connections(*max_connections) - .to_owned(); - Ok(Database::connect(options).await?) - } - } -} diff --git a/server/src/db/query/distro.rs b/server/src/db/query/distro.rs index 8647f2a..c4fc70f 100644 --- a/server/src/db/query/distro.rs +++ b/server/src/db/query/distro.rs @@ -21,14 +21,15 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> Result> { +) -> Result<(u64, Vec)> { let paginator = Distro::find() .filter(filter) .order_by_asc(distro::Column::Id) .paginate(conn, per_page); let repos = paginator.fetch_page(page).await?; + let total_pages = paginator.num_pages().await?; - Ok(repos) + Ok((total_pages, repos)) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index bfdad73..c76e532 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -1,8 +1,6 @@ use crate::db::{self, *}; -use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -17,7 +15,10 @@ impl IntoCondition for Filter { Condition::all() .add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo))) .add_option(self.arch.map(|arch| package::Column::Arch.eq(arch))) - .add_option(self.name.map(|name| package::Column::Name.contains(name))) + .add_option( + self.name + .map(|name| package::Column::Name.like(format!("%{}%", name))), + ) } } @@ -26,29 +27,15 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> crate::Result> { - let p2 = Alias::new("p2"); - let query = Query::select() - .columns(db::package::Column::iter().map(|c| (db::package::Entity, c))) - .from(db::package::Entity) - .join_subquery( - JoinType::InnerJoin, - max_pkg_ids_query(true), - p2.clone(), - Expr::col((db::package::Entity, db::package::Column::Id)) - .eq(Expr::col((p2.clone(), Alias::new("max_id")))), - ) - .cond_where(filter) - .order_by((db::package::Entity, db::package::Column::Id), Order::Asc) - .to_owned(); - let builder = conn.get_database_backend(); - let sql = builder.build(&query); +) -> super::Result<(u64, Vec)> { + let paginator = Package::find() + .filter(filter) + .order_by_asc(package::Column::Id) + .paginate(conn, per_page); + let packages = paginator.fetch_page(page).await?; + let total_pages = paginator.num_pages().await?; - Ok(db::Package::find() - .from_raw_sql(sql) - .paginate(conn, per_page) - .fetch_page(page) - .await?) + Ok((total_pages, packages)) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { @@ -81,17 +68,9 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result .await } -pub async fn insert( - conn: &DbConn, - repo_id: i32, - pkg: crate::repo::package::Package, -) -> Result { +pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { let info = pkg.info; - // Doing this manually is not the recommended way, but the generic error type of the - // transaction function didn't play well with my current error handling - let txn = conn.begin().await?; - let model = package::ActiveModel { id: NotSet, repo_id: Set(repo_id), @@ -109,10 +88,9 @@ pub async fn insert( pgp_sig_size: Set(info.pgpsigsize), sha256_sum: Set(info.sha256sum), compression: Set(pkg.compression.extension().unwrap().to_string()), - state: Set(PackageState::PendingCommit), }; - let pkg_entry = model.insert(&txn).await?; + let pkg_entry = model.insert(conn).await?; // Insert all the related tables PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { @@ -120,7 +98,7 @@ pub async fn insert( name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(&txn) + .exec(conn) .await?; PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { @@ -128,7 +106,7 @@ pub async fn insert( name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(&txn) + .exec(conn) .await?; let related = info @@ -168,7 +146,7 @@ pub async fn insert( name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(&txn) + .exec(conn) .await?; PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { @@ -176,12 +154,10 @@ pub async fn insert( path: Set(s.display().to_string()), })) .on_empty_do_nothing() - .exec(&txn) + .exec(conn) .await?; - txn.commit().await?; - - Ok(pkg_entry) + Ok(()) } pub async fn full(conn: &DbConn, id: i32) -> Result> { @@ -226,138 +202,3 @@ pub async fn full(conn: &DbConn, id: i32) -> Result> { Ok(None) } } - -#[derive(FromQueryResult)] -pub struct PkgToRemove { - pub repo_id: i32, - pub id: i32, -} - -fn max_pkg_ids_query(committed: bool) -> SelectStatement { - let mut query = Query::select() - .from(db::package::Entity) - .columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) - .group_by_columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .to_owned(); - - if committed { - query.cond_where(db::package::Column::State.eq(db::PackageState::Committed)); - } - - query -} - -/// Query that returns all packages that should be included in a sync for the given repository and -/// arch. -pub fn pkgs_to_sync( - conn: &DbConn, - repo: i32, - arch: &str, -) -> SelectorRaw> { - let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); - let query = Query::select() - .columns(db::package::Column::iter().map(|c| (p1.clone(), c))) - .from_as(db::package::Entity, p1.clone()) - .join_subquery( - JoinType::InnerJoin, - max_pkg_ids_query(false), - p2.clone(), - Expr::col((p1.clone(), db::package::Column::Id)) - .eq(Expr::col((p2.clone(), Alias::new("max_id")))), - ) - .cond_where( - Condition::all() - .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) - .add( - Expr::col((p1.clone(), db::package::Column::Arch)) - .is_in([arch, crate::ANY_ARCH]), - ) - .add( - Expr::col((p1.clone(), db::package::Column::State)) - .ne(db::PackageState::PendingDeletion), - ), - ) - .to_owned(); - let builder = conn.get_database_backend(); - let sql = builder.build(&query); - - db::Package::find().from_raw_sql(sql) -} - -fn stale_pkgs_query(include_repo: bool) -> SelectStatement { - let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); - let mut query = Query::select() - .from_as(db::package::Entity, p1.clone()) - .to_owned(); - - if include_repo { - query.columns([ - (p1.clone(), db::package::Column::RepoId), - (p1.clone(), db::package::Column::Id), - ]); - } else { - query.column((p1.clone(), db::package::Column::Id)); - } - - // We left join on the max pkgs query because a repository that has all its packages set to - // "pending deletion" doesn't show up in the query. These are also included with a where clause - // on the joined rows. - query - .join_subquery( - JoinType::LeftJoin, - max_pkg_ids_query(true), - p2.clone(), - Condition::all() - .add( - Expr::col((p1.clone(), db::package::Column::RepoId)) - .eq(Expr::col((p2.clone(), db::package::Column::RepoId))), - ) - .add( - Expr::col((p1.clone(), db::package::Column::Arch)) - .eq(Expr::col((p2.clone(), db::package::Column::Arch))), - ) - .add( - Expr::col((p1.clone(), db::package::Column::Name)) - .eq(Expr::col((p2.clone(), db::package::Column::Name))), - ), - ) - .cond_where( - Condition::any() - .add( - Expr::col((p1.clone(), db::package::Column::Id)) - .lt(Expr::col((p2.clone(), Alias::new("max_id")))), - ) - .add( - Expr::col((p1.clone(), db::package::Column::State)) - .eq(db::PackageState::PendingDeletion), - ), - ); - - query -} - -pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw> { - let query = stale_pkgs_query(true); - let builder = conn.get_database_backend(); - let sql = builder.build(&query); - - PkgToRemove::find_by_statement(sql) -} - -pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> { - Ok(db::Package::delete_many() - .filter(db::package::Column::Id.lte(max_id)) - .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false))) - .exec(conn) - .await - .map(|_| ())?) -} diff --git a/server/src/db/query/repo.rs b/server/src/db/query/repo.rs index a2daa26..2ad54bf 100644 --- a/server/src/db/query/repo.rs +++ b/server/src/db/query/repo.rs @@ -21,14 +21,15 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> Result> { +) -> Result<(u64, Vec)> { let paginator = Repo::find() .filter(filter) .order_by_asc(repo::Column::Id) .paginate(conn, per_page); let repos = paginator.fetch_page(page).await?; + let total_pages = paginator.num_pages().await?; - Ok(repos) + Ok((total_pages, repos)) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { diff --git a/server/src/distro.rs b/server/src/distro.rs new file mode 100644 index 0000000..22563ff --- /dev/null +++ b/server/src/distro.rs @@ -0,0 +1,70 @@ +use crate::{db, DistroMgr}; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; + +use sea_orm::{DbConn, EntityTrait}; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct MetaDistroMgr { + distro_dir: PathBuf, + conn: DbConn, + distros: Arc>>>, +} + +impl MetaDistroMgr { + pub async fn new>(distro_dir: P, conn: DbConn) -> crate::Result { + if !tokio::fs::try_exists(&distro_dir).await? { + tokio::fs::create_dir(&distro_dir).await?; + } + + let distro_dir = distro_dir.as_ref().to_path_buf(); + let mut map: HashMap> = HashMap::new(); + + let distros = db::Distro::find().all(&conn).await?; + + for distro in distros { + let mgr = + DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?; + map.insert(distro.name, Arc::new(mgr)); + } + + Ok(Self { + distro_dir, + conn, + distros: Arc::new(Mutex::new(map)), + }) + } + + pub async fn get_mgr(&self, distro: &str) -> Option> { + let map = self.distros.lock().await; + + map.get(distro).map(|mgr| Arc::clone(mgr)) + } + + pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result> { + let mut map = self.distros.lock().await; + + if let Some(mgr) = map.get(distro) { + Ok(Arc::clone(mgr)) + } else { + let distro = db::query::distro::insert(&self.conn, distro, None).await?; + + let mgr = Arc::new( + DistroMgr::new( + self.distro_dir.join(&distro.name), + distro.id, + self.conn.clone(), + ) + .await?, + ); + map.insert(distro.name, Arc::clone(&mgr)); + + Ok(mgr) + } + } +} diff --git a/server/src/error.rs b/server/src/error.rs index e0626d4..5c3e920 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -14,8 +14,6 @@ pub enum ServerError { Db(sea_orm::DbErr), Status(StatusCode), Archive(libarchive::error::ArchiveError), - Figment(figment::Error), - Unit, } impl fmt::Display for ServerError { @@ -26,8 +24,6 @@ impl fmt::Display for ServerError { ServerError::Status(status) => write!(fmt, "{}", status), ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Archive(err) => write!(fmt, "{}", err), - ServerError::Figment(err) => write!(fmt, "{}", err), - ServerError::Unit => Ok(()), } } } @@ -45,10 +41,9 @@ impl IntoResponse for ServerError { ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => { StatusCode::NOT_FOUND.into_response() } - ServerError::Db(_) - | ServerError::Archive(_) - | ServerError::Figment(_) - | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + ServerError::Db(_) | ServerError::Archive(_) => { + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } } } } @@ -88,9 +83,3 @@ impl From for ServerError { ServerError::Archive(err) } } - -impl From for ServerError { - fn from(err: figment::Error) -> Self { - ServerError::Figment(err) - } -} diff --git a/server/src/main.rs b/server/src/main.rs index eb1c3d0..d3cbdf0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,23 +1,25 @@ mod api; mod cli; -mod config; pub mod db; +mod distro; mod error; mod repo; -pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; - -use std::sync::Arc; +use repo::DistroMgr; use clap::Parser; +use std::path::PathBuf; -pub const ANY_ARCH: &'static str = "any"; +#[derive(Clone)] +pub struct Config { + data_dir: PathBuf, +} #[derive(Clone)] pub struct Global { - config: crate::config::Config, - mgr: Arc, + config: Config, + mgr: distro::MetaDistroMgr, db: sea_orm::DbConn, } diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index e4f0581..23d693d 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -1,107 +1,71 @@ use super::{archive, package}; -use crate::db::{self, query::package::delete_stale_pkgs}; +use crate::{db, error::Result}; -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, -}; +use std::path::{Path, PathBuf}; use futures::StreamExt; -use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, - ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, -}; -use sea_query::{Alias, Expr, Query}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex, RwLock, -}; +use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; +use tokio::io::AsyncRead; use uuid::Uuid; -struct PkgQueueMsg { - repo: i32, - path: PathBuf, -} +pub const ANY_ARCH: &'static str = "any"; -/// A single instance of this struct orchestrates everything related to managing packages files on -/// disk for all repositories in the server -pub struct RepoMgr { - repos_dir: PathBuf, +pub struct DistroMgr { + distro_dir: PathBuf, + distro_id: i32, conn: DbConn, - pkg_queue: ( - UnboundedSender, - Mutex>, - ), - repos: RwLock>)>>, } -impl RepoMgr { - pub async fn new>(repos_dir: P, conn: DbConn) -> crate::Result { - if !tokio::fs::try_exists(&repos_dir).await? { - tokio::fs::create_dir(&repos_dir).await?; - } - - let (tx, rx) = unbounded_channel(); - - let mut repos = HashMap::new(); - let repo_ids: Vec = db::Repo::find() - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .all(&conn) - .await?; - - for id in repo_ids { - repos.insert(id, Default::default()); +impl DistroMgr { + pub async fn new>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result { + if !tokio::fs::try_exists(&distro_dir).await? { + tokio::fs::create_dir(&distro_dir).await?; } Ok(Self { - repos_dir: repos_dir.as_ref().to_path_buf(), + distro_dir: distro_dir.as_ref().to_path_buf(), + distro_id, conn, - pkg_queue: (tx, Mutex::new(rx)), - repos: RwLock::new(repos), }) } /// Generate archive databases for all known architectures in the repository, including the /// "any" architecture. - pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> { - let lock = self - .repos - .read() - .await - .get(&repo) - .map(|(_, lock)| Arc::clone(lock)); + pub async fn generate_archives_all(&self, repo: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; - if lock.is_none() { + if repo.is_none() { return Ok(()); } - let lock = lock.unwrap(); - let _guard = lock.lock().await; + let repo = repo.unwrap(); - let archs: Vec = db::Package::find() - .filter(db::package::Column::RepoId.eq(repo)) + let mut archs = repo + .find_related(crate::db::Package) .select_only() - .column(db::package::Column::Arch) + .column(crate::db::package::Column::Arch) .distinct() - .into_tuple() - .all(&self.conn) + .into_tuple::() + .stream(&self.conn) .await?; - for arch in archs { - self.generate_archives(repo, &arch).await?; + while let Some(arch) = archs.next().await.transpose()? { + self.generate_archives(&repo.name, &arch).await?; } Ok(()) } /// Generate the archive databases for the given repository and architecture. - async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { + pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; + + if repo.is_none() { + return Ok(()); + } + + let repo = repo.unwrap(); + let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = self.random_file_paths(); let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; @@ -109,15 +73,13 @@ impl RepoMgr { // Query all packages in the repo that have the given architecture or the "any" // architecture - let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch) + let mut pkgs = repo + .find_related(crate::db::Package) + .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) .stream(&self.conn) .await?; - let mut commited_ids: Vec = Vec::new(); - while let Some(pkg) = pkgs.next().await.transpose()? { - commited_ids.push(pkg.id); - let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; @@ -141,7 +103,7 @@ impl RepoMgr { ar_db.close().await?; ar_files.close().await?; - let repo_dir = self.repos_dir.join(repo.to_string()); + let repo_dir = self.distro_dir.join(&repo.name); // Move the db archives to their respective places tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; @@ -151,235 +113,176 @@ impl RepoMgr { ) .await?; - // Only after we have successfully written everything to disk do we update the database. - // This order ensures any failure can be recovered, as the database is our single source of - // truth. - db::Package::update_many() - .col_expr( - db::package::Column::State, - Expr::value(db::PackageState::Committed), - ) - .filter(db::package::Column::Id.is_in(commited_ids)) - .exec(&self.conn) - .await?; - // If this fails there's no point in failing the function + if there were no packages in // the repo, this fails anyway because the temp file doesn't exist let _ = tokio::fs::remove_file(desc_tmp_file_path).await; let _ = tokio::fs::remove_file(files_tmp_file_path).await; - tracing::info!("Package archives generated for repo {} ('{}')", repo, arch); - Ok(()) } - /// Clean any remaining old package files from the database and file system - pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { - let mut pkgs = db::query::package::stale_pkgs(&self.conn) - .stream(&self.conn) - .await?; + /// Remove the repo with the given name, if it existed + pub async fn remove_repo(&self, repo: &str) -> Result { + let res = db::query::repo::by_name(&self.conn, repo).await?; - // Ids are monotonically increasing, so the max id suffices to know which packages to - // remove later - let mut max_id = -1; - let mut removed_pkgs = 0; + if let Some(repo_entry) = res { + // Remove repository from database + repo_entry.delete(&self.conn).await?; - while let Some(pkg) = pkgs.next().await.transpose()? { - // Failing to remove the package file isn't the biggest problem - let _ = tokio::fs::remove_file( - self.repos_dir - .join(pkg.repo_id.to_string()) - .join(pkg.id.to_string()), - ) - .await; + // Remove files from file system + tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?; - if pkg.id > max_id { - max_id = pkg.id; + Ok(true) + } else { + Ok(false) + } + } + + /// Remove all packages from the repository with the given arch. + pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result { + let repo = db::query::repo::by_name(&self.conn, repo).await?; + + if let Some(repo) = repo { + let mut pkgs = repo + .find_related(db::Package) + .filter(db::package::Column::Arch.eq(arch)) + .stream(&self.conn) + .await?; + + while let Some(pkg) = pkgs.next().await.transpose()? { + let path = self + .distro_dir + .join(&repo.name) + .join(super::package::filename(&pkg)); + tokio::fs::remove_file(path).await?; + + pkg.delete(&self.conn).await?; } - removed_pkgs += 1; + tokio::fs::remove_file( + self.distro_dir + .join(&repo.name) + .join(format!("{}.db.tar.gz", arch)), + ) + .await?; + tokio::fs::remove_file( + self.distro_dir + .join(&repo.name) + .join(format!("{}.files.tar.gz", arch)), + ) + .await?; + + // If we removed all "any" packages, we need to resync all databases + if arch == ANY_ARCH { + self.generate_archives_all(&repo.name).await?; + } + + Ok(true) + } else { + Ok(false) } - - if removed_pkgs > 0 { - db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; - } - - tracing::info!("Removed {removed_pkgs} stale package(s)"); - - Ok(()) } - pub async fn pkg_parse_task(&self) { - loop { - // Receive the next message and immediately drop the mutex afterwards. As long as the - // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked - // as soon as a message is received, so another worker can pick up the mutex. - let msg = { - let mut recv = self.pkg_queue.1.lock().await; - recv.recv().await - }; + pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result { + let repo = db::query::repo::by_name(&self.conn, repo).await?; - if let Some(msg) = msg { - // TODO better handle this error (retry if failure wasn't because the package is - // faulty) - let _ = self - .add_pkg_from_path(msg.path, msg.repo) - .await - .inspect_err(|e| tracing::error!("{:?}", e)); + if let Some(repo) = repo { + let pkg = + db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; - let old = self - .repos - .read() - .await - .get(&msg.repo) - .map(|n| n.0.fetch_sub(1, Ordering::SeqCst)); + if let Some(pkg) = pkg { + // Remove package from database & file system + tokio::fs::remove_file( + self.distro_dir + .join(&repo.name) + .join(super::package::filename(&pkg)), + ) + .await?; + pkg.delete(&self.conn).await?; - // Every time the queue for a repo becomes empty, we run a sync job - if old == Some(1) { - // TODO error handling - let _ = self.sync_repo(msg.repo).await; - - // TODO move this so that we only clean if entire queue is empty, not just - // queue for specific repo - let _ = self.remove_stale_pkgs().await; + if arch == ANY_ARCH { + self.generate_archives_all(&repo.name).await?; + } else { + self.generate_archives(&repo.name, arch).await?; } + + Ok(true) + } else { + Ok(false) } + } else { + Ok(false) } } - pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap(); - self.repos.read().await.get(&repo).inspect(|n| { - n.0.fetch_add(1, Ordering::SeqCst); - }); - } + pub async fn add_pkg_from_reader( + &self, + reader: &mut R, + repo: &str, + ) -> crate::Result<(String, String, String)> { + let [path] = self.random_file_paths(); + let mut temp_file = tokio::fs::File::create(&path).await?; - pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result> { - Ok(db::Repo::find() - .find_also_related(db::Distro) - .filter( - Condition::all() - .add(db::repo::Column::Name.eq(repo)) - .add(db::distro::Column::Name.eq(distro)), - ) - .one(&self.conn) - .await - .map(|res| res.map(|(repo, _)| repo.id))?) - } + tokio::io::copy(reader, &mut temp_file).await?; - pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { - let mut repos = self.repos.write().await; - - let distro_id: Option = db::Distro::find() - .filter(db::distro::Column::Name.eq(distro)) - .select_only() - .column(db::distro::Column::Id) - .into_tuple() - .one(&self.conn) - .await?; - - let distro_id = if let Some(id) = distro_id { - id - } else { - let new_distro = db::distro::ActiveModel { - id: NotSet, - name: Set(distro.to_string()), - description: NotSet, - }; - - new_distro.insert(&self.conn).await?.id - }; - - let repo_id: Option = db::Repo::find() - .filter(db::repo::Column::DistroId.eq(distro_id)) - .filter(db::repo::Column::Name.eq(repo)) - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .one(&self.conn) - .await?; - - let repo_id = if let Some(id) = repo_id { - id - } else { - let new_repo = db::repo::ActiveModel { - id: NotSet, - distro_id: Set(distro_id), - name: Set(repo.to_string()), - description: NotSet, - }; - let id = new_repo.insert(&self.conn).await?.id; - - tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?; - repos.insert(id, Default::default()); - - id - }; - - Ok(repo_id) - } - - async fn add_pkg_from_path>(&self, path: P, repo: i32) -> crate::Result<()> { - let path_clone = path.as_ref().to_path_buf(); + let path_clone = path.clone(); let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) .await .unwrap()?; - // TODO prevent database from being updated but file failing to move to repo dir? - let pkg = db::query::package::insert(&self.conn, repo, pkg).await?; + let repo_dir = self.distro_dir.join(repo); - let dest_path = self - .repos_dir - .join(repo.to_string()) - .join(pkg.id.to_string()); - tokio::fs::rename(path.as_ref(), dest_path).await?; + let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? { + repo.id + } else { + tokio::fs::create_dir(&repo_dir).await?; - tracing::info!( - "Added '{}-{}-{}' to repository {}", - pkg.name, - pkg.version, - pkg.arch, - repo, - ); + db::query::repo::insert(&self.conn, self.distro_id, repo, None) + .await? + .id + }; - Ok(()) - } - - pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { - self.repos.write().await.remove(&repo); - db::Repo::delete_by_id(repo).exec(&self.conn).await?; - let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await; - - Ok(()) - } - - /// Remove all packages in the repository that have a given arch. This method marks all - /// packages with the given architecture as "pending deletion", before performing a manual sync - /// & removal of stale packages. - pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { - db::Package::update_many() - .col_expr( - db::package::Column::State, - Expr::value(db::PackageState::PendingDeletion), - ) - .filter( - Condition::all() - .add(db::package::Column::RepoId.eq(repo)) - .add(db::package::Column::Arch.eq(arch)), - ) - .exec(&self.conn) - .await?; - - self.sync_repo(repo).await?; - self.remove_stale_pkgs().await?; - - Ok(()) + // If the package already exists in the database, we remove it first + let res = db::query::package::by_fields( + &self.conn, + repo_id, + &pkg.info.arch, + &pkg.info.name, + None, + None, + ) + .await?; + + if let Some(entry) = res { + entry.delete(&self.conn).await?; + } + + let dest_pkg_path = repo_dir.join(pkg.file_name()); + + // Insert new package into database + let name = pkg.info.name.clone(); + let version = pkg.info.version.clone(); + let arch = pkg.info.arch.clone(); + db::query::package::insert(&self.conn, repo_id, pkg).await?; + + // Move the package to its final resting place + tokio::fs::rename(path, dest_pkg_path).await?; + + // Synchronize archive databases + if arch == ANY_ARCH { + self.generate_archives_all(repo).await?; + } else { + self.generate_archives(repo, &arch).await?; + } + + Ok((name, version, arch)) } + /// Generate a path to a unique file that can be used as a temporary file pub fn random_file_paths(&self) -> [PathBuf; C] { std::array::from_fn(|_| { let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - self.repos_dir.join(uuid.to_string()) + self.distro_dir.join(uuid.to_string()) }) } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 953b631..7d7e321 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -2,9 +2,7 @@ mod archive; mod manager; pub mod package; -pub use manager::RepoMgr; - -use crate::FsConfig; +pub use manager::DistroMgr; use axum::{ body::Body, @@ -49,59 +47,61 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - match global.config.fs { - FsConfig::Local { data_dir } => { - let repo_dir = data_dir.join("repos").join(repo_id.to_string()); + let repo_dir = global + .config + .data_dir + .join("distros") + .join(&distro) + .join(&repo); - let file_name = if file_name == format!("{}.db", repo) - || file_name == format!("{}.db.tar.gz", repo) - { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = + if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) - } - } - } else { - Err(StatusCode::NOT_FOUND.into()) - } + Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await) } async fn post_package_archive( State(global): State, Path((distro, repo)): Path<(String, String)>, body: Body, -) -> crate::Result { +) -> crate::Result<()> { let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; - let [tmp_path] = global.mgr.random_file_paths(); + let mgr = global.mgr.get_or_create_mgr(&distro).await?; + let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?; - let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; - tokio::io::copy(&mut body, &mut tmp_file).await?; + tracing::info!( + "Added '{}-{}' to repository '{}' ({})", + name, + version, + repo, + arch + ); - global.mgr.queue_pkg(repo, tmp_path).await; - - Ok(StatusCode::ACCEPTED) + Ok(()) } async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo(repo).await?; + if let Some(mgr) = global.mgr.get_mgr(&distro).await { + let repo_removed = mgr.remove_repo(&repo).await?; - tracing::info!("Removed repository {repo}"); + if repo_removed { + tracing::info!("Removed repository '{}'", repo); - Ok(StatusCode::OK) + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } } else { Ok(StatusCode::NOT_FOUND) } @@ -111,51 +111,41 @@ async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo_arch(repo, &arch).await?; + if let Some(mgr) = global.mgr.get_mgr(&distro).await { + let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; - tracing::info!("Removed architecture '{arch}' from repository {repo}"); + if repo_removed { + tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); - Ok(StatusCode::OK) + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } } else { Ok(StatusCode::NOT_FOUND) } - //if let Some(mgr) = global.mgr.get_mgr(&distro).await { - // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; - // - // if repo_removed { - // tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); - // - // Ok(StatusCode::OK) - // } else { - // Ok(StatusCode::NOT_FOUND) - // } - //} else { - // Ok(StatusCode::NOT_FOUND) - //} } async fn delete_package( State(global): State, Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(mgr) = global.mgr.get_mgr(&distro).await { - // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; - // - // if pkg_removed { - // tracing::info!( - // "Removed package '{}' ({}) from repository '{}'", - // pkg_name, - // arch, - // repo - // ); - // - // Ok(StatusCode::OK) - // } else { - // Ok(StatusCode::NOT_FOUND) - // } - //} else { - // Ok(StatusCode::NOT_FOUND) - //} + if let Some(mgr) = global.mgr.get_mgr(&distro).await { + let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; + + if pkg_removed { + tracing::info!( + "Removed package '{}' ({}) from repository '{}'", + pkg_name, + arch, + repo + ); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } + } else { + Ok(StatusCode::NOT_FOUND) + } } diff --git a/server/src/repo/package.rs b/server/src/repo/package.rs index df98559..24979eb 100644 --- a/server/src/repo/package.rs +++ b/server/src/repo/package.rs @@ -323,7 +323,7 @@ pub async fn write_desc( pkg: &package::Model, ) -> crate::Result<()> { writer - .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes()) + .write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes()) .await?; write_attribute(writer, "NAME", &pkg.name).await?; @@ -397,8 +397,6 @@ pub async fn write_desc( write_attribute(writer, key, &items.join("\n")).await?; } - writer.flush().await?; - Ok(()) } @@ -419,7 +417,5 @@ pub async fn write_files( .await?; } - writer.flush().await?; - Ok(()) }