diff --git a/Cargo.lock b/Cargo.lock index 333bc72..8520e63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,6 +174,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -380,6 +389,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" + [[package]] name = "byteorder" version = "1.5.0" @@ -630,6 +645,20 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "figment" +version = "0.10.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3" +dependencies = [ + "atomic", + "pear", + "serde", + "toml", + "uncased", + "version_check", +] + [[package]] name = "flume" version = "0.11.0" @@ -1037,6 +1066,12 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1386,6 +1421,29 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pear" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi", +] + +[[package]] +name = "pear_codegen" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.66", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -1478,7 +1536,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit", + "toml_edit 0.21.1", ] [[package]] @@ -1514,6 +1572,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", + "version_check", + "yansi", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -1657,11 +1728,13 @@ dependencies = [ "axum", "chrono", "clap", + "figment", "futures", "http-body-util", "libarchive", "sea-orm", "sea-orm-migration", + "sea-query", "serde", "sha256", "tokio", @@ -2036,6 +2109,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2623,11 +2705,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.22.14", +] + [[package]] name = "toml_datetime" version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -2637,7 +2734,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap", "toml_datetime", - "winnow", + "winnow 0.5.40", +] + +[[package]] +name = "toml_edit" +version = "0.22.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow 0.6.13", ] [[package]] @@ -2762,6 +2872,15 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.7.0" @@ -3128,6 +3247,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" +dependencies = [ + "memchr", +] + [[package]] name = "wyz" version = "0.5.1" @@ -3137,6 +3265,12 @@ dependencies = [ "tap", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "zerocopy" version = "0.7.34" diff --git a/server/Cargo.toml b/server/Cargo.toml index cd86713..b1fc688 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,10 +10,12 @@ authors = ["Jef Roosens"] axum = { version = "0.7.5", features = ["http2", "macros"] } chrono = { version = "0.4.26", features = ["serde"] } clap = { version = "4.3.12", features = ["env", "derive"] } +figment = { version = "0.10.19", features = ["env", "toml"] } futures = "0.3.28" http-body-util = "0.1.1" libarchive = { path = "../libarchive" } sea-orm-migration = "0.12.1" +sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] } serde = { version = "1.0.178", features = ["derive"] } sha256 = "1.1.4" tokio = { version = "1.29.1", features = ["full"] } diff --git a/server/rieterd.toml b/server/rieterd.toml new file mode 100644 index 0000000..9cc56bf --- /dev/null +++ b/server/rieterd.toml @@ -0,0 +1,17 @@ +api_key = "test" +pkg_workers = 2 +log_level = "rieterd=debug" + +[fs] +type = "local" +data_dir = "./data" + +[db] +type = "sqlite" +db_dir = "./data" +# [db] +# type = "postgres" +# host = "localhost" +# db = "rieter" +# user = "rieter" +# password = "rieter" diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index 0a0a56e..4678257 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -22,10 +22,10 @@ async fn get_repos( Query(pagination): Query, Query(filter): Query, ) -> crate::Result>> { - let (total_pages, items) = + let items = db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?; - Ok(Json(pagination.res(total_pages, items))) + Ok(Json(pagination.res(items))) } async fn get_single_repo( @@ -44,11 +44,11 @@ async fn get_packages( Query(pagination): Query, Query(filter): Query, ) -> crate::Result>> { - let (total_pages, pkgs) = + let items = db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter) .await?; - Ok(Json(pagination.res(total_pages, pkgs))) + Ok(Json(pagination.res(items))) } async fn get_single_package( diff --git a/server/src/api/pagination.rs b/server/src/api/pagination.rs index 02e32dc..3ede5bf 100644 --- a/server/src/api/pagination.rs +++ b/server/src/api/pagination.rs @@ -1,19 +1,19 @@ use serde::{Deserialize, Serialize}; #[derive(Deserialize)] -#[serde(default)] pub struct Query { + #[serde(default = "default_page")] pub page: u64, + #[serde(default = "default_per_page")] pub per_page: u64, } -impl Default for Query { - fn default() -> Self { - Query { - page: 1, - per_page: 25, - } - } +fn default_page() -> u64 { + 1 +} + +fn default_per_page() -> u64 { + 25 } #[derive(Serialize)] @@ -23,21 +23,15 @@ where { pub page: u64, pub per_page: u64, - pub total_pages: u64, pub count: usize, pub items: Vec, } impl Query { - pub fn res Serialize>( - self, - total_pages: u64, - items: Vec, - ) -> PaginatedResponse { + pub fn res Serialize>(self, items: Vec) -> PaginatedResponse { PaginatedResponse { page: self.page, per_page: self.per_page, - total_pages, count: items.len(), items, } diff --git a/server/src/cli.rs b/server/src/cli.rs index 4fc94f1..c6998eb 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,92 +1,71 @@ -use crate::{distro::MetaDistroMgr, Config, Global}; +use crate::{Config, FsConfig, Global}; + +use std::{io, path::PathBuf, sync::Arc}; use axum::Router; use clap::Parser; use sea_orm_migration::MigratorTrait; -use std::{io, path::PathBuf}; use tower_http::trace::TraceLayer; -use tracing::debug; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] pub struct Cli { - /// Directory where repository metadata & SQLite database is stored - #[arg(env = "RIETER_DATA_DIR")] - pub data_dir: PathBuf, - /// API key to authenticate private routes with - #[arg(env = "RIETER_API_KEY")] - pub api_key: String, - - /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the - /// data directory - #[arg(short, long, env = "RIETER_DATABASE_URL")] - pub database_url: Option, - /// Port the server will listen on #[arg( short, long, - value_name = "PORT", - default_value_t = 8000, - env = "RIETER_PORT" + env = "RIETER_CONFIG_FILE", + default_value = "./rieterd.toml" )] - pub port: u16, - /// Log levels for the tracing - #[arg( - long, - value_name = "LOG_LEVEL", - default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", - env = "RIETER_LOG" - )] - pub log: String, + pub config_file: PathBuf, } impl Cli { - pub fn init_tracing(&self) { + pub async fn run(&self) -> crate::Result<()> { + let config: Config = Config::figment(&self.config_file) + .extract() + .inspect_err(|e| tracing::error!("{}", e))?; + tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(self.log.clone())) + .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) .with(tracing_subscriber::fmt::layer()) .init(); - } - pub async fn run(&self) -> crate::Result<()> { - self.init_tracing(); + tracing::info!("Connecting to database"); + let db = crate::db::connect(&config.db).await?; - let db_url = if let Some(url) = &self.database_url { - url.clone() - } else { - format!( - "sqlite://{}?mode=rwc", - self.data_dir.join("rieter.sqlite").to_string_lossy() - ) - }; - - debug!("Connecting to database with URL {}", db_url); - - let mut options = sea_orm::ConnectOptions::new(db_url); - options.max_connections(16); - - let db = sea_orm::Database::connect(options).await?; crate::db::Migrator::up(&db, None).await?; - debug!("Successfully applied migrations"); - - let config = Config { - data_dir: self.data_dir.clone(), + let mgr = match &config.fs { + FsConfig::Local { data_dir } => { + crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? + } }; - let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?; + let mgr = Arc::new(mgr); - let global = Global { config, mgr, db }; + for _ in 0..config.pkg_workers { + let clone = Arc::clone(&mgr); + + tokio::spawn(async move { clone.pkg_parse_task().await }); + } + + let global = Global { + config: config.clone(), + mgr, + db, + }; // build our application with a single route let app = Router::new() .nest("/api", crate::api::router()) - .merge(crate::repo::router(&self.api_key)) + .merge(crate::repo::router(&config.api_key)) .with_state(global) .layer(TraceLayer::new_for_http()); - let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap(); + let domain: String = format!("{}:{}", config.domain, config.port) + .parse() + .unwrap(); let listener = tokio::net::TcpListener::bind(domain).await?; // run it with hyper on localhost:3000 Ok(axum::serve(listener, app.into_make_service()) diff --git a/server/src/config.rs b/server/src/config.rs new file mode 100644 index 0000000..e165fdc --- /dev/null +++ b/server/src/config.rs @@ -0,0 +1,88 @@ +use std::path::{Path, PathBuf}; + +use figment::{ + providers::{Env, Format, Toml}, + Figment, +}; +use serde::Deserialize; + +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "lowercase")] +#[serde(tag = "type")] +pub enum FsConfig { + Local { data_dir: PathBuf }, +} + +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "lowercase")] +#[serde(tag = "type")] +pub enum DbConfig { + Sqlite { + db_dir: PathBuf, + #[serde(default = "default_db_sqlite_max_connections")] + max_connections: u32, + }, + Postgres { + host: String, + #[serde(default = "default_db_postgres_port")] + port: u16, + user: String, + password: String, + db: String, + #[serde(default)] + schema: String, + #[serde(default = "default_db_postgres_max_connections")] + max_connections: u32, + }, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct Config { + pub api_key: String, + #[serde(default = "default_domain")] + pub domain: String, + #[serde(default = "default_port")] + pub port: u16, + #[serde(default = "default_log_level")] + pub log_level: String, + pub fs: FsConfig, + pub db: DbConfig, + #[serde(default = "default_pkg_workers")] + pub pkg_workers: u32, +} + +impl Config { + pub fn figment(config_file: impl AsRef) -> Figment { + Figment::new() + .merge(Toml::file(config_file)) + .merge(Env::prefixed("RIETER_")) + } +} + +fn default_domain() -> String { + String::from("0.0.0.0") +} + +fn default_port() -> u16 { + 8000 +} + +fn default_log_level() -> String { + String::from("tower_http=debug,rieterd=debug,sea_orm=debug") +} + +fn default_db_sqlite_max_connections() -> u32 { + 16 +} + +fn default_db_postgres_port() -> u16 { + 5432 +} + +fn default_db_postgres_max_connections() -> u32 { + 16 +} + +fn default_pkg_workers() -> u32 { + 1 +} diff --git a/server/src/db/entities/package.rs b/server/src/db/entities/package.rs index 112cde4..4ef90a4 100644 --- a/server/src/db/entities/package.rs +++ b/server/src/db/entities/package.rs @@ -4,6 +4,8 @@ use chrono::NaiveDateTime; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; +use crate::db::PackageState; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "package")] pub struct Model { @@ -24,6 +26,8 @@ pub struct Model { pub pgp_sig_size: Option, pub sha256_sum: String, pub compression: String, + #[serde(skip_serializing)] + pub state: PackageState, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs index 2deb05f..f76e639 100644 --- a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs +++ b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs @@ -81,7 +81,12 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Package::PgpSig).string_len(255)) .col(ColumnDef::new(Package::PgpSigSize).big_integer()) .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null()) - .col(ColumnDef::new(Package::Compression).string_len(16).not_null()) + .col( + ColumnDef::new(Package::Compression) + .string_len(16) + .not_null(), + ) + .col(ColumnDef::new(Package::State).integer().not_null()) .foreign_key( ForeignKey::create() .name("fk-package-repo_id") @@ -264,6 +269,7 @@ pub enum Package { PgpSigSize, Sha256Sum, Compression, + State, } #[derive(Iden)] diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 597cf20..a1b7476 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -2,10 +2,12 @@ pub mod entities; mod migrator; pub mod query; +use crate::config::DbConfig; + pub use entities::{prelude::*, *}; pub use migrator::Migrator; -use sea_orm::{DeriveActiveEnum, EnumIter}; +use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter}; use serde::{Deserialize, Serialize}; type Result = std::result::Result; @@ -30,6 +32,17 @@ pub enum PackageRelatedEnum { Optdepend, } +#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)] +#[sea_orm(rs_type = "i32", db_type = "Integer")] +pub enum PackageState { + #[sea_orm(num_value = 0)] + PendingCommit, + #[sea_orm(num_value = 1)] + Committed, + #[sea_orm(num_value = 2)] + PendingDeletion, +} + #[derive(Serialize)] pub struct FullPackage { #[serde(flatten)] @@ -39,3 +52,50 @@ pub struct FullPackage { related: Vec<(PackageRelatedEnum, String)>, files: Vec, } + +pub async fn connect(conn: &DbConfig) -> crate::Result { + match conn { + DbConfig::Sqlite { + db_dir, + max_connections, + } => { + let url = format!( + "sqlite://{}?mode=rwc", + db_dir.join("rieter.sqlite").to_string_lossy() + ); + let options = sea_orm::ConnectOptions::new(url) + .max_connections(*max_connections) + .to_owned(); + + let conn = Database::connect(options).await?; + + // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs + // https://www.sqlite.org/pragma.html#pragma_synchronous + conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?; + conn.execute_unprepared("PRAGMA synchronous=NORMAL;") + .await?; + + Ok(conn) + } + DbConfig::Postgres { + host, + port, + db, + user, + password, + schema, + max_connections, + } => { + let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db); + + if schema != "" { + url = format!("{url}?currentSchema={schema}"); + } + + let options = sea_orm::ConnectOptions::new(url) + .max_connections(*max_connections) + .to_owned(); + Ok(Database::connect(options).await?) + } + } +} diff --git a/server/src/db/query/distro.rs b/server/src/db/query/distro.rs index c4fc70f..8647f2a 100644 --- a/server/src/db/query/distro.rs +++ b/server/src/db/query/distro.rs @@ -21,15 +21,14 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> Result<(u64, Vec)> { +) -> Result> { let paginator = Distro::find() .filter(filter) .order_by_asc(distro::Column::Id) .paginate(conn, per_page); let repos = paginator.fetch_page(page).await?; - let total_pages = paginator.num_pages().await?; - Ok((total_pages, repos)) + Ok(repos) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index c76e532..bfdad73 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -1,6 +1,8 @@ use crate::db::{self, *}; +use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; +use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -15,10 +17,7 @@ impl IntoCondition for Filter { Condition::all() .add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo))) .add_option(self.arch.map(|arch| package::Column::Arch.eq(arch))) - .add_option( - self.name - .map(|name| package::Column::Name.like(format!("%{}%", name))), - ) + .add_option(self.name.map(|name| package::Column::Name.contains(name))) } } @@ -27,15 +26,29 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> super::Result<(u64, Vec)> { - let paginator = Package::find() - .filter(filter) - .order_by_asc(package::Column::Id) - .paginate(conn, per_page); - let packages = paginator.fetch_page(page).await?; - let total_pages = paginator.num_pages().await?; +) -> crate::Result> { + let p2 = Alias::new("p2"); + let query = Query::select() + .columns(db::package::Column::iter().map(|c| (db::package::Entity, c))) + .from(db::package::Entity) + .join_subquery( + JoinType::InnerJoin, + max_pkg_ids_query(true), + p2.clone(), + Expr::col((db::package::Entity, db::package::Column::Id)) + .eq(Expr::col((p2.clone(), Alias::new("max_id")))), + ) + .cond_where(filter) + .order_by((db::package::Entity, db::package::Column::Id), Order::Asc) + .to_owned(); + let builder = conn.get_database_backend(); + let sql = builder.build(&query); - Ok((total_pages, packages)) + Ok(db::Package::find() + .from_raw_sql(sql) + .paginate(conn, per_page) + .fetch_page(page) + .await?) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { @@ -68,9 +81,17 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result .await } -pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { +pub async fn insert( + conn: &DbConn, + repo_id: i32, + pkg: crate::repo::package::Package, +) -> Result { let info = pkg.info; + // Doing this manually is not the recommended way, but the generic error type of the + // transaction function didn't play well with my current error handling + let txn = conn.begin().await?; + let model = package::ActiveModel { id: NotSet, repo_id: Set(repo_id), @@ -88,9 +109,10 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack pgp_sig_size: Set(info.pgpsigsize), sha256_sum: Set(info.sha256sum), compression: Set(pkg.compression.extension().unwrap().to_string()), + state: Set(PackageState::PendingCommit), }; - let pkg_entry = model.insert(conn).await?; + let pkg_entry = model.insert(&txn).await?; // Insert all the related tables PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { @@ -98,7 +120,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { @@ -106,7 +128,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; let related = info @@ -146,7 +168,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { @@ -154,10 +176,12 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack path: Set(s.display().to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; - Ok(()) + txn.commit().await?; + + Ok(pkg_entry) } pub async fn full(conn: &DbConn, id: i32) -> Result> { @@ -202,3 +226,138 @@ pub async fn full(conn: &DbConn, id: i32) -> Result> { Ok(None) } } + +#[derive(FromQueryResult)] +pub struct PkgToRemove { + pub repo_id: i32, + pub id: i32, +} + +fn max_pkg_ids_query(committed: bool) -> SelectStatement { + let mut query = Query::select() + .from(db::package::Entity) + .columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) + .group_by_columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .to_owned(); + + if committed { + query.cond_where(db::package::Column::State.eq(db::PackageState::Committed)); + } + + query +} + +/// Query that returns all packages that should be included in a sync for the given repository and +/// arch. +pub fn pkgs_to_sync( + conn: &DbConn, + repo: i32, + arch: &str, +) -> SelectorRaw> { + let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); + let query = Query::select() + .columns(db::package::Column::iter().map(|c| (p1.clone(), c))) + .from_as(db::package::Entity, p1.clone()) + .join_subquery( + JoinType::InnerJoin, + max_pkg_ids_query(false), + p2.clone(), + Expr::col((p1.clone(), db::package::Column::Id)) + .eq(Expr::col((p2.clone(), Alias::new("max_id")))), + ) + .cond_where( + Condition::all() + .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) + .add( + Expr::col((p1.clone(), db::package::Column::Arch)) + .is_in([arch, crate::ANY_ARCH]), + ) + .add( + Expr::col((p1.clone(), db::package::Column::State)) + .ne(db::PackageState::PendingDeletion), + ), + ) + .to_owned(); + let builder = conn.get_database_backend(); + let sql = builder.build(&query); + + db::Package::find().from_raw_sql(sql) +} + +fn stale_pkgs_query(include_repo: bool) -> SelectStatement { + let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); + let mut query = Query::select() + .from_as(db::package::Entity, p1.clone()) + .to_owned(); + + if include_repo { + query.columns([ + (p1.clone(), db::package::Column::RepoId), + (p1.clone(), db::package::Column::Id), + ]); + } else { + query.column((p1.clone(), db::package::Column::Id)); + } + + // We left join on the max pkgs query because a repository that has all its packages set to + // "pending deletion" doesn't show up in the query. These are also included with a where clause + // on the joined rows. + query + .join_subquery( + JoinType::LeftJoin, + max_pkg_ids_query(true), + p2.clone(), + Condition::all() + .add( + Expr::col((p1.clone(), db::package::Column::RepoId)) + .eq(Expr::col((p2.clone(), db::package::Column::RepoId))), + ) + .add( + Expr::col((p1.clone(), db::package::Column::Arch)) + .eq(Expr::col((p2.clone(), db::package::Column::Arch))), + ) + .add( + Expr::col((p1.clone(), db::package::Column::Name)) + .eq(Expr::col((p2.clone(), db::package::Column::Name))), + ), + ) + .cond_where( + Condition::any() + .add( + Expr::col((p1.clone(), db::package::Column::Id)) + .lt(Expr::col((p2.clone(), Alias::new("max_id")))), + ) + .add( + Expr::col((p1.clone(), db::package::Column::State)) + .eq(db::PackageState::PendingDeletion), + ), + ); + + query +} + +pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw> { + let query = stale_pkgs_query(true); + let builder = conn.get_database_backend(); + let sql = builder.build(&query); + + PkgToRemove::find_by_statement(sql) +} + +pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> { + Ok(db::Package::delete_many() + .filter(db::package::Column::Id.lte(max_id)) + .filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false))) + .exec(conn) + .await + .map(|_| ())?) +} diff --git a/server/src/db/query/repo.rs b/server/src/db/query/repo.rs index 2ad54bf..a2daa26 100644 --- a/server/src/db/query/repo.rs +++ b/server/src/db/query/repo.rs @@ -21,15 +21,14 @@ pub async fn page( per_page: u64, page: u64, filter: Filter, -) -> Result<(u64, Vec)> { +) -> Result> { let paginator = Repo::find() .filter(filter) .order_by_asc(repo::Column::Id) .paginate(conn, per_page); let repos = paginator.fetch_page(page).await?; - let total_pages = paginator.num_pages().await?; - Ok((total_pages, repos)) + Ok(repos) } pub async fn by_id(conn: &DbConn, id: i32) -> Result> { diff --git a/server/src/distro.rs b/server/src/distro.rs deleted file mode 100644 index 22563ff..0000000 --- a/server/src/distro.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::{db, DistroMgr}; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::Arc, -}; - -use sea_orm::{DbConn, EntityTrait}; -use tokio::sync::Mutex; - -#[derive(Clone)] -pub struct MetaDistroMgr { - distro_dir: PathBuf, - conn: DbConn, - distros: Arc>>>, -} - -impl MetaDistroMgr { - pub async fn new>(distro_dir: P, conn: DbConn) -> crate::Result { - if !tokio::fs::try_exists(&distro_dir).await? { - tokio::fs::create_dir(&distro_dir).await?; - } - - let distro_dir = distro_dir.as_ref().to_path_buf(); - let mut map: HashMap> = HashMap::new(); - - let distros = db::Distro::find().all(&conn).await?; - - for distro in distros { - let mgr = - DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?; - map.insert(distro.name, Arc::new(mgr)); - } - - Ok(Self { - distro_dir, - conn, - distros: Arc::new(Mutex::new(map)), - }) - } - - pub async fn get_mgr(&self, distro: &str) -> Option> { - let map = self.distros.lock().await; - - map.get(distro).map(|mgr| Arc::clone(mgr)) - } - - pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result> { - let mut map = self.distros.lock().await; - - if let Some(mgr) = map.get(distro) { - Ok(Arc::clone(mgr)) - } else { - let distro = db::query::distro::insert(&self.conn, distro, None).await?; - - let mgr = Arc::new( - DistroMgr::new( - self.distro_dir.join(&distro.name), - distro.id, - self.conn.clone(), - ) - .await?, - ); - map.insert(distro.name, Arc::clone(&mgr)); - - Ok(mgr) - } - } -} diff --git a/server/src/error.rs b/server/src/error.rs index 5c3e920..e0626d4 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -14,6 +14,8 @@ pub enum ServerError { Db(sea_orm::DbErr), Status(StatusCode), Archive(libarchive::error::ArchiveError), + Figment(figment::Error), + Unit, } impl fmt::Display for ServerError { @@ -24,6 +26,8 @@ impl fmt::Display for ServerError { ServerError::Status(status) => write!(fmt, "{}", status), ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Archive(err) => write!(fmt, "{}", err), + ServerError::Figment(err) => write!(fmt, "{}", err), + ServerError::Unit => Ok(()), } } } @@ -41,9 +45,10 @@ impl IntoResponse for ServerError { ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => { StatusCode::NOT_FOUND.into_response() } - ServerError::Db(_) | ServerError::Archive(_) => { - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } + ServerError::Db(_) + | ServerError::Archive(_) + | ServerError::Figment(_) + | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } } @@ -83,3 +88,9 @@ impl From for ServerError { ServerError::Archive(err) } } + +impl From for ServerError { + fn from(err: figment::Error) -> Self { + ServerError::Figment(err) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index d3cbdf0..eb1c3d0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,25 +1,23 @@ mod api; mod cli; +mod config; pub mod db; -mod distro; mod error; mod repo; +pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; -use repo::DistroMgr; + +use std::sync::Arc; use clap::Parser; -use std::path::PathBuf; -#[derive(Clone)] -pub struct Config { - data_dir: PathBuf, -} +pub const ANY_ARCH: &'static str = "any"; #[derive(Clone)] pub struct Global { - config: Config, - mgr: distro::MetaDistroMgr, + config: crate::config::Config, + mgr: Arc, db: sea_orm::DbConn, } diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index 23d693d..e4f0581 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -1,71 +1,107 @@ use super::{archive, package}; -use crate::{db, error::Result}; +use crate::db::{self, query::package::delete_stale_pkgs}; -use std::path::{Path, PathBuf}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; use futures::StreamExt; -use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; -use tokio::io::AsyncRead; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use sea_query::{Alias, Expr, Query}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Mutex, RwLock, +}; use uuid::Uuid; -pub const ANY_ARCH: &'static str = "any"; - -pub struct DistroMgr { - distro_dir: PathBuf, - distro_id: i32, - conn: DbConn, +struct PkgQueueMsg { + repo: i32, + path: PathBuf, } -impl DistroMgr { - pub async fn new>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result { - if !tokio::fs::try_exists(&distro_dir).await? { - tokio::fs::create_dir(&distro_dir).await?; +/// A single instance of this struct orchestrates everything related to managing packages files on +/// disk for all repositories in the server +pub struct RepoMgr { + repos_dir: PathBuf, + conn: DbConn, + pkg_queue: ( + UnboundedSender, + Mutex>, + ), + repos: RwLock>)>>, +} + +impl RepoMgr { + pub async fn new>(repos_dir: P, conn: DbConn) -> crate::Result { + if !tokio::fs::try_exists(&repos_dir).await? { + tokio::fs::create_dir(&repos_dir).await?; + } + + let (tx, rx) = unbounded_channel(); + + let mut repos = HashMap::new(); + let repo_ids: Vec = db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn) + .await?; + + for id in repo_ids { + repos.insert(id, Default::default()); } Ok(Self { - distro_dir: distro_dir.as_ref().to_path_buf(), - distro_id, + repos_dir: repos_dir.as_ref().to_path_buf(), conn, + pkg_queue: (tx, Mutex::new(rx)), + repos: RwLock::new(repos), }) } /// Generate archive databases for all known architectures in the repository, including the /// "any" architecture. - pub async fn generate_archives_all(&self, repo: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; + pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> { + let lock = self + .repos + .read() + .await + .get(&repo) + .map(|(_, lock)| Arc::clone(lock)); - if repo.is_none() { + if lock.is_none() { return Ok(()); } - let repo = repo.unwrap(); + let lock = lock.unwrap(); + let _guard = lock.lock().await; - let mut archs = repo - .find_related(crate::db::Package) + let archs: Vec = db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) .select_only() - .column(crate::db::package::Column::Arch) + .column(db::package::Column::Arch) .distinct() - .into_tuple::() - .stream(&self.conn) + .into_tuple() + .all(&self.conn) .await?; - while let Some(arch) = archs.next().await.transpose()? { - self.generate_archives(&repo.name, &arch).await?; + for arch in archs { + self.generate_archives(repo, &arch).await?; } Ok(()) } /// Generate the archive databases for the given repository and architecture. - pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; - - if repo.is_none() { - return Ok(()); - } - - let repo = repo.unwrap(); - + async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = self.random_file_paths(); let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; @@ -73,13 +109,15 @@ impl DistroMgr { // Query all packages in the repo that have the given architecture or the "any" // architecture - let mut pkgs = repo - .find_related(crate::db::Package) - .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) + let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch) .stream(&self.conn) .await?; + let mut commited_ids: Vec = Vec::new(); + while let Some(pkg) = pkgs.next().await.transpose()? { + commited_ids.push(pkg.id); + let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; @@ -103,7 +141,7 @@ impl DistroMgr { ar_db.close().await?; ar_files.close().await?; - let repo_dir = self.distro_dir.join(&repo.name); + let repo_dir = self.repos_dir.join(repo.to_string()); // Move the db archives to their respective places tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; @@ -113,176 +151,235 @@ impl DistroMgr { ) .await?; + // Only after we have successfully written everything to disk do we update the database. + // This order ensures any failure can be recovered, as the database is our single source of + // truth. + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::Committed), + ) + .filter(db::package::Column::Id.is_in(commited_ids)) + .exec(&self.conn) + .await?; + // If this fails there's no point in failing the function + if there were no packages in // the repo, this fails anyway because the temp file doesn't exist let _ = tokio::fs::remove_file(desc_tmp_file_path).await; let _ = tokio::fs::remove_file(files_tmp_file_path).await; + tracing::info!("Package archives generated for repo {} ('{}')", repo, arch); + Ok(()) } - /// Remove the repo with the given name, if it existed - pub async fn remove_repo(&self, repo: &str) -> Result { - let res = db::query::repo::by_name(&self.conn, repo).await?; - - if let Some(repo_entry) = res { - // Remove repository from database - repo_entry.delete(&self.conn).await?; - - // Remove files from file system - tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?; - - Ok(true) - } else { - Ok(false) - } - } - - /// Remove all packages from the repository with the given arch. - pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result { - let repo = db::query::repo::by_name(&self.conn, repo).await?; - - if let Some(repo) = repo { - let mut pkgs = repo - .find_related(db::Package) - .filter(db::package::Column::Arch.eq(arch)) - .stream(&self.conn) - .await?; - - while let Some(pkg) = pkgs.next().await.transpose()? { - let path = self - .distro_dir - .join(&repo.name) - .join(super::package::filename(&pkg)); - tokio::fs::remove_file(path).await?; - - pkg.delete(&self.conn).await?; - } - - tokio::fs::remove_file( - self.distro_dir - .join(&repo.name) - .join(format!("{}.db.tar.gz", arch)), - ) - .await?; - tokio::fs::remove_file( - self.distro_dir - .join(&repo.name) - .join(format!("{}.files.tar.gz", arch)), - ) + /// Clean any remaining old package files from the database and file system + pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::stale_pkgs(&self.conn) + .stream(&self.conn) .await?; - // If we removed all "any" packages, we need to resync all databases - if arch == ANY_ARCH { - self.generate_archives_all(&repo.name).await?; + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later + let mut max_id = -1; + let mut removed_pkgs = 0; + + while let Some(pkg) = pkgs.next().await.transpose()? { + // Failing to remove the package file isn't the biggest problem + let _ = tokio::fs::remove_file( + self.repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ) + .await; + + if pkg.id > max_id { + max_id = pkg.id; } - Ok(true) - } else { - Ok(false) + removed_pkgs += 1; } + + if removed_pkgs > 0 { + db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; + } + + tracing::info!("Removed {removed_pkgs} stale package(s)"); + + Ok(()) } - pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result { - let repo = db::query::repo::by_name(&self.conn, repo).await?; + pub async fn pkg_parse_task(&self) { + loop { + // Receive the next message and immediately drop the mutex afterwards. As long as the + // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked + // as soon as a message is received, so another worker can pick up the mutex. + let msg = { + let mut recv = self.pkg_queue.1.lock().await; + recv.recv().await + }; - if let Some(repo) = repo { - let pkg = - db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; + if let Some(msg) = msg { + // TODO better handle this error (retry if failure wasn't because the package is + // faulty) + let _ = self + .add_pkg_from_path(msg.path, msg.repo) + .await + .inspect_err(|e| tracing::error!("{:?}", e)); - if let Some(pkg) = pkg { - // Remove package from database & file system - tokio::fs::remove_file( - self.distro_dir - .join(&repo.name) - .join(super::package::filename(&pkg)), - ) - .await?; - pkg.delete(&self.conn).await?; + let old = self + .repos + .read() + .await + .get(&msg.repo) + .map(|n| n.0.fetch_sub(1, Ordering::SeqCst)); - if arch == ANY_ARCH { - self.generate_archives_all(&repo.name).await?; - } else { - self.generate_archives(&repo.name, arch).await?; + // Every time the queue for a repo becomes empty, we run a sync job + if old == Some(1) { + // TODO error handling + let _ = self.sync_repo(msg.repo).await; + + // TODO move this so that we only clean if entire queue is empty, not just + // queue for specific repo + let _ = self.remove_stale_pkgs().await; } - - Ok(true) - } else { - Ok(false) } - } else { - Ok(false) } } - pub async fn add_pkg_from_reader( - &self, - reader: &mut R, - repo: &str, - ) -> crate::Result<(String, String, String)> { - let [path] = self.random_file_paths(); - let mut temp_file = tokio::fs::File::create(&path).await?; + pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { + self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap(); + self.repos.read().await.get(&repo).inspect(|n| { + n.0.fetch_add(1, Ordering::SeqCst); + }); + } - tokio::io::copy(reader, &mut temp_file).await?; + pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result> { + Ok(db::Repo::find() + .find_also_related(db::Distro) + .filter( + Condition::all() + .add(db::repo::Column::Name.eq(repo)) + .add(db::distro::Column::Name.eq(distro)), + ) + .one(&self.conn) + .await + .map(|res| res.map(|(repo, _)| repo.id))?) + } - let path_clone = path.clone(); + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { + let mut repos = self.repos.write().await; + + let distro_id: Option = db::Distro::find() + .filter(db::distro::Column::Name.eq(distro)) + .select_only() + .column(db::distro::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let distro_id = if let Some(id) = distro_id { + id + } else { + let new_distro = db::distro::ActiveModel { + id: NotSet, + name: Set(distro.to_string()), + description: NotSet, + }; + + new_distro.insert(&self.conn).await?.id + }; + + let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::DistroId.eq(distro_id)) + .filter(db::repo::Column::Name.eq(repo)) + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let repo_id = if let Some(id) = repo_id { + id + } else { + let new_repo = db::repo::ActiveModel { + id: NotSet, + distro_id: Set(distro_id), + name: Set(repo.to_string()), + description: NotSet, + }; + let id = new_repo.insert(&self.conn).await?.id; + + tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?; + repos.insert(id, Default::default()); + + id + }; + + Ok(repo_id) + } + + async fn add_pkg_from_path>(&self, path: P, repo: i32) -> crate::Result<()> { + let path_clone = path.as_ref().to_path_buf(); let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) .await .unwrap()?; - let repo_dir = self.distro_dir.join(repo); + // TODO prevent database from being updated but file failing to move to repo dir? + let pkg = db::query::package::insert(&self.conn, repo, pkg).await?; - let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? { - repo.id - } else { - tokio::fs::create_dir(&repo_dir).await?; + let dest_path = self + .repos_dir + .join(repo.to_string()) + .join(pkg.id.to_string()); + tokio::fs::rename(path.as_ref(), dest_path).await?; - db::query::repo::insert(&self.conn, self.distro_id, repo, None) - .await? - .id - }; + tracing::info!( + "Added '{}-{}-{}' to repository {}", + pkg.name, + pkg.version, + pkg.arch, + repo, + ); - // If the package already exists in the database, we remove it first - let res = db::query::package::by_fields( - &self.conn, - repo_id, - &pkg.info.arch, - &pkg.info.name, - None, - None, - ) - .await?; - - if let Some(entry) = res { - entry.delete(&self.conn).await?; - } - - let dest_pkg_path = repo_dir.join(pkg.file_name()); - - // Insert new package into database - let name = pkg.info.name.clone(); - let version = pkg.info.version.clone(); - let arch = pkg.info.arch.clone(); - db::query::package::insert(&self.conn, repo_id, pkg).await?; - - // Move the package to its final resting place - tokio::fs::rename(path, dest_pkg_path).await?; - - // Synchronize archive databases - if arch == ANY_ARCH { - self.generate_archives_all(repo).await?; - } else { - self.generate_archives(repo, &arch).await?; - } - - Ok((name, version, arch)) + Ok(()) + } + + pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { + self.repos.write().await.remove(&repo); + db::Repo::delete_by_id(repo).exec(&self.conn).await?; + let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await; + + Ok(()) + } + + /// Remove all packages in the repository that have a given arch. This method marks all + /// packages with the given architecture as "pending deletion", before performing a manual sync + /// & removal of stale packages. + pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::PendingDeletion), + ) + .filter( + Condition::all() + .add(db::package::Column::RepoId.eq(repo)) + .add(db::package::Column::Arch.eq(arch)), + ) + .exec(&self.conn) + .await?; + + self.sync_repo(repo).await?; + self.remove_stale_pkgs().await?; + + Ok(()) } - /// Generate a path to a unique file that can be used as a temporary file pub fn random_file_paths(&self) -> [PathBuf; C] { std::array::from_fn(|_| { let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - self.distro_dir.join(uuid.to_string()) + self.repos_dir.join(uuid.to_string()) }) } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 7d7e321..953b631 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -2,7 +2,9 @@ mod archive; mod manager; pub mod package; -pub use manager::DistroMgr; +pub use manager::RepoMgr; + +use crate::FsConfig; use axum::{ body::Body, @@ -47,61 +49,59 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - let repo_dir = global - .config - .data_dir - .join("distros") - .join(&distro) - .join(&repo); + if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { + match global.config.fs { + FsConfig::Local { data_dir } => { + let repo_dir = data_dir.join("repos").join(repo_id.to_string()); - let file_name = - if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = if file_name == format!("{}.db", repo) + || file_name == format!("{}.db.tar.gz", repo) + { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await) + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) + } + } + } else { + Err(StatusCode::NOT_FOUND.into()) + } } async fn post_package_archive( State(global): State, Path((distro, repo)): Path<(String, String)>, body: Body, -) -> crate::Result<()> { +) -> crate::Result { let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - let mgr = global.mgr.get_or_create_mgr(&distro).await?; - let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?; + let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; + let [tmp_path] = global.mgr.random_file_paths(); - tracing::info!( - "Added '{}-{}' to repository '{}' ({})", - name, - version, - repo, - arch - ); + let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; + tokio::io::copy(&mut body, &mut tmp_file).await?; - Ok(()) + global.mgr.queue_pkg(repo, tmp_path).await; + + Ok(StatusCode::ACCEPTED) } async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - if let Some(mgr) = global.mgr.get_mgr(&distro).await { - let repo_removed = mgr.remove_repo(&repo).await?; + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo(repo).await?; - if repo_removed { - tracing::info!("Removed repository '{}'", repo); + tracing::info!("Removed repository {repo}"); - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::OK) } else { Ok(StatusCode::NOT_FOUND) } @@ -111,41 +111,51 @@ async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - if let Some(mgr) = global.mgr.get_mgr(&distro).await { - let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo_arch(repo, &arch).await?; - if repo_removed { - tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); + tracing::info!("Removed architecture '{arch}' from repository {repo}"); - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::OK) } else { Ok(StatusCode::NOT_FOUND) } + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; + // + // if repo_removed { + // tracing::info!("Removed arch '{}' from repository '{}'", arch, repo); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } async fn delete_package( State(global): State, Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, ) -> crate::Result { - if let Some(mgr) = global.mgr.get_mgr(&distro).await { - let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; - - if pkg_removed { - tracing::info!( - "Removed package '{}' ({}) from repository '{}'", - pkg_name, - arch, - repo - ); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; + // + // if pkg_removed { + // tracing::info!( + // "Removed package '{}' ({}) from repository '{}'", + // pkg_name, + // arch, + // repo + // ); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } diff --git a/server/src/repo/package.rs b/server/src/repo/package.rs index 24979eb..df98559 100644 --- a/server/src/repo/package.rs +++ b/server/src/repo/package.rs @@ -323,7 +323,7 @@ pub async fn write_desc( pkg: &package::Model, ) -> crate::Result<()> { writer - .write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes()) + .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes()) .await?; write_attribute(writer, "NAME", &pkg.name).await?; @@ -397,6 +397,8 @@ pub async fn write_desc( write_attribute(writer, key, &items.join("\n")).await?; } + writer.flush().await?; + Ok(()) } @@ -417,5 +419,7 @@ pub async fn write_files( .await?; } + writer.flush().await?; + Ok(()) }