From e17269ac3b0f31f70ee032be4eb862dc938bcb20 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sun, 16 Jun 2024 13:04:04 +0200 Subject: [PATCH 1/2] feat: clean up some queries; implement repo arch remove --- server/src/db/query/package.rs | 102 +++++++++++---------------------- server/src/repo/manager2.rs | 72 +++++++++++------------ server/src/repo/mod.rs | 14 ++++- 3 files changed, 82 insertions(+), 106 deletions(-) diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 8e9c17b..2ba1996 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -2,7 +2,7 @@ use crate::db::{self, *}; use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement}; +use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -222,8 +222,8 @@ pub struct PkgToRemove { pub id: i32, } -fn max_pkg_ids_query() -> SelectStatement { - Query::select() +fn max_pkg_ids_query(committed: bool) -> SelectStatement { + let mut query = Query::select() .from(db::package::Entity) .columns([ db::package::Column::RepoId, @@ -236,39 +236,29 @@ fn max_pkg_ids_query() -> SelectStatement { db::package::Column::Arch, db::package::Column::Name, ]) - .cond_where( - Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), - ) - .to_owned() -} - -pub fn pkgs_to_sync( - conn: &DbConn, - repo: i32, - arch: &str, -) -> SelectorRaw> { - let max_id_query = Query::select() - .columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) - .from(db::package::Entity) - .group_by_columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) .to_owned(); + if committed { + query.cond_where(db::package::Column::State.eq(db::PackageState::Committed)); + } + + query +} + +/// Query that returns all packages that should be included in a sync for the given repository and +/// arch. +pub fn pkgs_to_sync( + conn: &DbConn, + repo: i32, + arch: &str, +) -> SelectorRaw> { let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let query = Query::select() - .column((p1.clone(), Asterisk)) + .columns(db::package::Column::iter().map(|c| (p1.clone(), c))) .from_as(db::package::Entity, p1.clone()) .join_subquery( JoinType::InnerJoin, - max_id_query, + max_pkg_ids_query(false), p2.clone(), Expr::col((p1.clone(), db::package::Column::Id)) .eq(Expr::col((p2.clone(), Alias::new("max_id")))), @@ -276,13 +266,13 @@ pub fn pkgs_to_sync( .cond_where( Condition::all() .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) - .add( - Expr::col((p1.clone(), db::package::Column::State)) - .ne(db::PackageState::PendingDeletion), - ) .add( Expr::col((p1.clone(), db::package::Column::Arch)) .is_in([arch, crate::ANY_ARCH]), + ) + .add( + Expr::col((p1.clone(), db::package::Column::State)) + .ne(db::PackageState::PendingDeletion), ), ) .to_owned(); @@ -293,36 +283,10 @@ pub fn pkgs_to_sync( } fn stale_pkgs_query(include_repo: bool) -> SelectStatement { - // In each repository, only one version of a package can exist for any given arch. Because ids - // are monotonically increasing, we know that the row that represents the actual package - // currently in the repository is the row with the largest id whose state is "committed". This - // query finds this id for each (repo, arch, name) tuple. - let mut max_id_query = Query::select(); - max_id_query - .from(db::package::Entity) - .columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) - .group_by_columns([ - db::package::Column::RepoId, - db::package::Column::Arch, - db::package::Column::Name, - ]) - .cond_where( - Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), - ); - let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); - let mut query = Query::select(); - - // We then perform an inner join between the max id query above and the package table, where we - // filter on rows whose id is less than their respective package's max id or whose state is set - // to "pending deletion". This gives us all rows in the database that correspond to packages - // that are no longer needed, and can thus be removed. - query.from_as(db::package::Entity, p1.clone()); + let mut query = Query::select() + .from_as(db::package::Entity, p1.clone()) + .to_owned(); if include_repo { query.columns([ @@ -333,10 +297,13 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { query.column((p1.clone(), db::package::Column::Id)); } + // We left join on the max pkgs query because a repository that has all its packages set to + // "pending deletion" doesn't show up in the query. These are also included with a where clause + // on the joined rows. query .join_subquery( - JoinType::InnerJoin, - max_id_query, + JoinType::LeftJoin, + max_pkg_ids_query(true), p2.clone(), Condition::all() .add( @@ -359,11 +326,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { .lt(Expr::col((p2.clone(), Alias::new("max_id")))), ) .add( - Expr::col((p1.clone(), db::package::Column::Id)) + Expr::col((p1.clone(), db::package::Column::State)) .eq(db::PackageState::PendingDeletion), ), - ) - .to_owned() + ); + + query } pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw> { diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index f91ab69..266eeee 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -100,42 +100,6 @@ impl RepoMgr { Ok(()) } - /// Clean any remaining old package files from the database and file system - pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { - let mut pkgs = db::query::package::stale_pkgs(&self.conn) - .stream(&self.conn) - .await?; - - // Ids are monotonically increasing, so the max id suffices to know which packages to - // remove later - let mut max_id = -1; - let mut removed_pkgs = 0; - - while let Some(pkg) = pkgs.next().await.transpose()? { - // Failing to remove the package file isn't the biggest problem - let _ = tokio::fs::remove_file( - self.repos_dir - .join(pkg.repo_id.to_string()) - .join(pkg.id.to_string()), - ) - .await; - - if pkg.id > max_id { - max_id = pkg.id; - } - - removed_pkgs += 1; - } - - if removed_pkgs > 0 { - db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; - } - - tracing::info!("Removed {removed_pkgs} stale package(s)"); - - Ok(()) - } - /// Generate the archive databases for the given repository and architecture. async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = @@ -209,6 +173,42 @@ impl RepoMgr { Ok(()) } + /// Clean any remaining old package files from the database and file system + pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::stale_pkgs(&self.conn) + .stream(&self.conn) + .await?; + + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later + let mut max_id = -1; + let mut removed_pkgs = 0; + + while let Some(pkg) = pkgs.next().await.transpose()? { + // Failing to remove the package file isn't the biggest problem + let _ = tokio::fs::remove_file( + self.repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ) + .await; + + if pkg.id > max_id { + max_id = pkg.id; + } + + removed_pkgs += 1; + } + + if removed_pkgs > 0 { + db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; + } + + tracing::info!("Removed {removed_pkgs} stale package(s)"); + + Ok(()) + } + pub async fn pkg_parse_task(&self) { loop { // Receive the next message and immediately drop the mutex afterwards. As long as the diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 290f9a7..d088095 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -78,7 +78,7 @@ async fn post_package_archive( State(global): State, Path((distro, repo)): Path<(String, String)>, body: Body, -) -> crate::Result<()> { +) -> crate::Result { let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; let [tmp_path] = global.mgr.random_file_paths(); @@ -88,7 +88,7 @@ async fn post_package_archive( global.mgr.queue_pkg(repo, tmp_path).await; - Ok(()) + Ok(StatusCode::ACCEPTED) } async fn delete_repo( @@ -110,7 +110,15 @@ async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo_arch(repo, &arch).await?; + + tracing::info!("Removed architecture '{arch}' from repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } //if let Some(mgr) = global.mgr.get_mgr(&distro).await { // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; // From 97e42588ed6f912175b8dadfcd32034ef6df6eb9 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sun, 16 Jun 2024 18:14:56 +0200 Subject: [PATCH 2/2] feat: switch to proper config file --- server/rieterd.toml | 12 +++-- server/src/cli.rs | 90 ++++++++++--------------------------- server/src/config.rs | 59 ++++++++++++++++++++---- server/src/db/mod.rs | 51 ++++++++++++++++++++- server/src/main.rs | 8 +--- server/src/repo/manager2.rs | 3 +- server/src/repo/mod.rs | 37 ++++++++------- 7 files changed, 158 insertions(+), 102 deletions(-) diff --git a/server/rieterd.toml b/server/rieterd.toml index 781a055..9cc56bf 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,11 +1,17 @@ api_key = "test" -port = 8000 -log_level = "tower_http=debug,rieterd=debug" +pkg_workers = 2 +log_level = "rieterd=debug" [fs] -type = "locl" +type = "local" data_dir = "./data" [db] type = "sqlite" db_dir = "./data" +# [db] +# type = "postgres" +# host = "localhost" +# db = "rieter" +# user = "rieter" +# password = "rieter" diff --git a/server/src/cli.rs b/server/src/cli.rs index 1ceaf27..73dc9f2 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,4 +1,4 @@ -use crate::{distro::MetaDistroMgr, Config, Global}; +use crate::{distro::MetaDistroMgr, Config, FsConfig, Global}; use std::{io, path::PathBuf, sync::Arc}; @@ -12,13 +12,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] pub struct Cli { - /// Directory where repository metadata & SQLite database is stored - #[arg(env = "RIETER_DATA_DIR")] - pub data_dir: PathBuf, - /// API key to authenticate private routes with - #[arg(env = "RIETER_API_KEY")] - pub api_key: String, - #[arg( short, long, @@ -26,89 +19,54 @@ pub struct Cli { default_value = "./rieterd.toml" )] pub config_file: PathBuf, - - /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the - /// data directory - #[arg(short, long, env = "RIETER_DATABASE_URL")] - pub database_url: Option, - /// Port the server will listen on - #[arg( - short, - long, - value_name = "PORT", - default_value_t = 8000, - env = "RIETER_PORT" - )] - pub port: u16, - /// Log levels for the tracing - #[arg( - long, - value_name = "LOG_LEVEL", - default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", - env = "RIETER_LOG" - )] - pub log: String, } impl Cli { - pub fn init_tracing(&self) { + pub async fn run(&self) -> crate::Result<()> { + let config: Config = Config::figment(&self.config_file) + .extract() + .inspect_err(|e| tracing::error!("{}", e))?; + tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(self.log.clone())) + .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) .with(tracing_subscriber::fmt::layer()) .init(); - } - pub async fn run(&self) -> crate::Result<()> { - self.init_tracing(); + tracing::info!("Connecting to database"); + let db = crate::db::connect(&config.db).await?; - //tracing::debug!("{:?}", &self.config_file); - //let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err( - // |e| tracing::error!("{}", e) - //)?; - //tracing::debug!("{:?}", new_config); - - let db_url = if let Some(url) = &self.database_url { - url.clone() - } else { - format!( - "sqlite://{}?mode=rwc", - self.data_dir.join("rieter.sqlite").to_string_lossy() - ) - }; - - debug!("Connecting to database with URL {}", db_url); - - let mut options = sea_orm::ConnectOptions::new(db_url); - options.max_connections(16); - - let db = sea_orm::Database::connect(options).await?; crate::db::Migrator::up(&db, None).await?; - debug!("Successfully applied migrations"); - - let config = Config { - data_dir: self.data_dir.clone(), + let mgr = match &config.fs { + FsConfig::Local { data_dir } => { + crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? + } }; - let mgr = - Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?); + let mgr = Arc::new(mgr); - for _ in 0..1 { + for _ in 0..config.pkg_workers { let clone = Arc::clone(&mgr); tokio::spawn(async move { clone.pkg_parse_task().await }); } - let global = Global { config, mgr, db }; + let global = Global { + config: config.clone(), + mgr, + db, + }; // build our application with a single route let app = Router::new() .nest("/api", crate::api::router()) - .merge(crate::repo::router(&self.api_key)) + .merge(crate::repo::router(&config.api_key)) .with_state(global) .layer(TraceLayer::new_for_http()); - let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap(); + let domain: String = format!("{}:{}", config.domain, config.port) + .parse() + .unwrap(); let listener = tokio::net::TcpListener::bind(domain).await?; // run it with hyper on localhost:3000 Ok(axum::serve(listener, app.into_make_service()) diff --git a/server/src/config.rs b/server/src/config.rs index a639362..e165fdc 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -6,34 +6,49 @@ use figment::{ }; use serde::Deserialize; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum FsConfig { Local { data_dir: PathBuf }, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum DbConfig { Sqlite { db_dir: PathBuf, + #[serde(default = "default_db_sqlite_max_connections")] + max_connections: u32, }, Postgres { host: String, + #[serde(default = "default_db_postgres_port")] + port: u16, user: String, password: String, + db: String, + #[serde(default)] + schema: String, + #[serde(default = "default_db_postgres_max_connections")] + max_connections: u32, }, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct Config { - api_key: String, - port: u16, - log_level: String, - fs: FsConfig, - db: DbConfig, + pub api_key: String, + #[serde(default = "default_domain")] + pub domain: String, + #[serde(default = "default_port")] + pub port: u16, + #[serde(default = "default_log_level")] + pub log_level: String, + pub fs: FsConfig, + pub db: DbConfig, + #[serde(default = "default_pkg_workers")] + pub pkg_workers: u32, } impl Config { @@ -43,3 +58,31 @@ impl Config { .merge(Env::prefixed("RIETER_")) } } + +fn default_domain() -> String { + String::from("0.0.0.0") +} + +fn default_port() -> u16 { + 8000 +} + +fn default_log_level() -> String { + String::from("tower_http=debug,rieterd=debug,sea_orm=debug") +} + +fn default_db_sqlite_max_connections() -> u32 { + 16 +} + +fn default_db_postgres_port() -> u16 { + 5432 +} + +fn default_db_postgres_max_connections() -> u32 { + 16 +} + +fn default_pkg_workers() -> u32 { + 1 +} diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 98f42a4..a1b7476 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -2,10 +2,12 @@ pub mod entities; mod migrator; pub mod query; +use crate::config::DbConfig; + pub use entities::{prelude::*, *}; pub use migrator::Migrator; -use sea_orm::{DeriveActiveEnum, EnumIter}; +use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter}; use serde::{Deserialize, Serialize}; type Result = std::result::Result; @@ -50,3 +52,50 @@ pub struct FullPackage { related: Vec<(PackageRelatedEnum, String)>, files: Vec, } + +pub async fn connect(conn: &DbConfig) -> crate::Result { + match conn { + DbConfig::Sqlite { + db_dir, + max_connections, + } => { + let url = format!( + "sqlite://{}?mode=rwc", + db_dir.join("rieter.sqlite").to_string_lossy() + ); + let options = sea_orm::ConnectOptions::new(url) + .max_connections(*max_connections) + .to_owned(); + + let conn = Database::connect(options).await?; + + // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs + // https://www.sqlite.org/pragma.html#pragma_synchronous + conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?; + conn.execute_unprepared("PRAGMA synchronous=NORMAL;") + .await?; + + Ok(conn) + } + DbConfig::Postgres { + host, + port, + db, + user, + password, + schema, + max_connections, + } => { + let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db); + + if schema != "" { + url = format!("{url}?currentSchema={schema}"); + } + + let options = sea_orm::ConnectOptions::new(url) + .max_connections(*max_connections) + .to_owned(); + Ok(Database::connect(options).await?) + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index c3237cf..f7e1a95 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,6 +6,7 @@ mod distro; mod error; mod repo; +pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; use repo::DistroMgr; @@ -14,14 +15,9 @@ use std::{path::PathBuf, sync::Arc}; pub const ANY_ARCH: &'static str = "any"; -#[derive(Clone)] -pub struct Config { - data_dir: PathBuf, -} - #[derive(Clone)] pub struct Global { - config: Config, + config: crate::config::Config, mgr: Arc, db: sea_orm::DbConn, } diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index 266eeee..2f66cfe 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -248,7 +248,7 @@ impl RepoMgr { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo }); + self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap(); self.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); @@ -291,6 +291,7 @@ impl RepoMgr { }; let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::DistroId.eq(distro_id)) .filter(db::repo::Column::Name.eq(repo)) .select_only() .column(db::repo::Column::Id) diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index d088095..16c62a5 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -6,6 +6,8 @@ pub mod package; pub use manager::DistroMgr; pub use manager2::RepoMgr; +use crate::FsConfig; + use axum::{ body::Body, extract::{Path, State}, @@ -50,25 +52,26 @@ async fn get_file( req: Request, ) -> crate::Result { if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - let repo_dir = global - .config - .data_dir - .join("repos") - .join(repo_id.to_string()); + match global.config.fs { + FsConfig::Local { data_dir } => { + let repo_dir = data_dir.join("repos").join(repo_id.to_string()); - let file_name = - if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = if file_name == format!("{}.db", repo) + || file_name == format!("{}.db.tar.gz", repo) + { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) + } + } } else { Err(StatusCode::NOT_FOUND.into()) }