diff --git a/server/rieterd.toml b/server/rieterd.toml index 9cc56bf..781a055 100644 --- a/server/rieterd.toml +++ b/server/rieterd.toml @@ -1,17 +1,11 @@ api_key = "test" -pkg_workers = 2 -log_level = "rieterd=debug" +port = 8000 +log_level = "tower_http=debug,rieterd=debug" [fs] -type = "local" +type = "locl" data_dir = "./data" [db] type = "sqlite" db_dir = "./data" -# [db] -# type = "postgres" -# host = "localhost" -# db = "rieter" -# user = "rieter" -# password = "rieter" diff --git a/server/src/cli.rs b/server/src/cli.rs index 73dc9f2..1ceaf27 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,4 +1,4 @@ -use crate::{distro::MetaDistroMgr, Config, FsConfig, Global}; +use crate::{distro::MetaDistroMgr, Config, Global}; use std::{io, path::PathBuf, sync::Arc}; @@ -12,6 +12,13 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] pub struct Cli { + /// Directory where repository metadata & SQLite database is stored + #[arg(env = "RIETER_DATA_DIR")] + pub data_dir: PathBuf, + /// API key to authenticate private routes with + #[arg(env = "RIETER_API_KEY")] + pub api_key: String, + #[arg( short, long, @@ -19,54 +26,89 @@ pub struct Cli { default_value = "./rieterd.toml" )] pub config_file: PathBuf, + + /// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the + /// data directory + #[arg(short, long, env = "RIETER_DATABASE_URL")] + pub database_url: Option, + /// Port the server will listen on + #[arg( + short, + long, + value_name = "PORT", + default_value_t = 8000, + env = "RIETER_PORT" + )] + pub port: u16, + /// Log levels for the tracing + #[arg( + long, + value_name = "LOG_LEVEL", + default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", + env = "RIETER_LOG" + )] + pub log: String, } impl Cli { - pub async fn run(&self) -> crate::Result<()> { - let config: Config = Config::figment(&self.config_file) - .extract() - .inspect_err(|e| tracing::error!("{}", e))?; - + pub fn init_tracing(&self) { tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) + .with(tracing_subscriber::EnvFilter::new(self.log.clone())) .with(tracing_subscriber::fmt::layer()) .init(); + } - tracing::info!("Connecting to database"); - let db = crate::db::connect(&config.db).await?; + pub async fn run(&self) -> crate::Result<()> { + self.init_tracing(); - crate::db::Migrator::up(&db, None).await?; + //tracing::debug!("{:?}", &self.config_file); + //let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err( + // |e| tracing::error!("{}", e) + //)?; + //tracing::debug!("{:?}", new_config); - let mgr = match &config.fs { - FsConfig::Local { data_dir } => { - crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? - } + let db_url = if let Some(url) = &self.database_url { + url.clone() + } else { + format!( + "sqlite://{}?mode=rwc", + self.data_dir.join("rieter.sqlite").to_string_lossy() + ) }; - let mgr = Arc::new(mgr); + debug!("Connecting to database with URL {}", db_url); - for _ in 0..config.pkg_workers { + let mut options = sea_orm::ConnectOptions::new(db_url); + options.max_connections(16); + + let db = sea_orm::Database::connect(options).await?; + crate::db::Migrator::up(&db, None).await?; + + debug!("Successfully applied migrations"); + + let config = Config { + data_dir: self.data_dir.clone(), + }; + + let mgr = + Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?); + + for _ in 0..1 { let clone = Arc::clone(&mgr); tokio::spawn(async move { clone.pkg_parse_task().await }); } - let global = Global { - config: config.clone(), - mgr, - db, - }; + let global = Global { config, mgr, db }; // build our application with a single route let app = Router::new() .nest("/api", crate::api::router()) - .merge(crate::repo::router(&config.api_key)) + .merge(crate::repo::router(&self.api_key)) .with_state(global) .layer(TraceLayer::new_for_http()); - let domain: String = format!("{}:{}", config.domain, config.port) - .parse() - .unwrap(); + let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap(); let listener = tokio::net::TcpListener::bind(domain).await?; // run it with hyper on localhost:3000 Ok(axum::serve(listener, app.into_make_service()) diff --git a/server/src/config.rs b/server/src/config.rs index e165fdc..a639362 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -6,49 +6,34 @@ use figment::{ }; use serde::Deserialize; -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum FsConfig { Local { data_dir: PathBuf }, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug)] #[serde(rename_all = "lowercase")] #[serde(tag = "type")] pub enum DbConfig { Sqlite { db_dir: PathBuf, - #[serde(default = "default_db_sqlite_max_connections")] - max_connections: u32, }, Postgres { host: String, - #[serde(default = "default_db_postgres_port")] - port: u16, user: String, password: String, - db: String, - #[serde(default)] - schema: String, - #[serde(default = "default_db_postgres_max_connections")] - max_connections: u32, }, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug)] pub struct Config { - pub api_key: String, - #[serde(default = "default_domain")] - pub domain: String, - #[serde(default = "default_port")] - pub port: u16, - #[serde(default = "default_log_level")] - pub log_level: String, - pub fs: FsConfig, - pub db: DbConfig, - #[serde(default = "default_pkg_workers")] - pub pkg_workers: u32, + api_key: String, + port: u16, + log_level: String, + fs: FsConfig, + db: DbConfig, } impl Config { @@ -58,31 +43,3 @@ impl Config { .merge(Env::prefixed("RIETER_")) } } - -fn default_domain() -> String { - String::from("0.0.0.0") -} - -fn default_port() -> u16 { - 8000 -} - -fn default_log_level() -> String { - String::from("tower_http=debug,rieterd=debug,sea_orm=debug") -} - -fn default_db_sqlite_max_connections() -> u32 { - 16 -} - -fn default_db_postgres_port() -> u16 { - 5432 -} - -fn default_db_postgres_max_connections() -> u32 { - 16 -} - -fn default_pkg_workers() -> u32 { - 1 -} diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index a1b7476..98f42a4 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -2,12 +2,10 @@ pub mod entities; mod migrator; pub mod query; -use crate::config::DbConfig; - pub use entities::{prelude::*, *}; pub use migrator::Migrator; -use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter}; +use sea_orm::{DeriveActiveEnum, EnumIter}; use serde::{Deserialize, Serialize}; type Result = std::result::Result; @@ -52,50 +50,3 @@ pub struct FullPackage { related: Vec<(PackageRelatedEnum, String)>, files: Vec, } - -pub async fn connect(conn: &DbConfig) -> crate::Result { - match conn { - DbConfig::Sqlite { - db_dir, - max_connections, - } => { - let url = format!( - "sqlite://{}?mode=rwc", - db_dir.join("rieter.sqlite").to_string_lossy() - ); - let options = sea_orm::ConnectOptions::new(url) - .max_connections(*max_connections) - .to_owned(); - - let conn = Database::connect(options).await?; - - // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs - // https://www.sqlite.org/pragma.html#pragma_synchronous - conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?; - conn.execute_unprepared("PRAGMA synchronous=NORMAL;") - .await?; - - Ok(conn) - } - DbConfig::Postgres { - host, - port, - db, - user, - password, - schema, - max_connections, - } => { - let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db); - - if schema != "" { - url = format!("{url}?currentSchema={schema}"); - } - - let options = sea_orm::ConnectOptions::new(url) - .max_connections(*max_connections) - .to_owned(); - Ok(Database::connect(options).await?) - } - } -} diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 2ba1996..8e9c17b 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -2,7 +2,7 @@ use crate::db::{self, *}; use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement}; +use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] @@ -222,8 +222,8 @@ pub struct PkgToRemove { pub id: i32, } -fn max_pkg_ids_query(committed: bool) -> SelectStatement { - let mut query = Query::select() +fn max_pkg_ids_query() -> SelectStatement { + Query::select() .from(db::package::Entity) .columns([ db::package::Column::RepoId, @@ -236,29 +236,39 @@ fn max_pkg_ids_query(committed: bool) -> SelectStatement { db::package::Column::Arch, db::package::Column::Name, ]) - .to_owned(); - - if committed { - query.cond_where(db::package::Column::State.eq(db::PackageState::Committed)); - } - - query + .cond_where( + Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), + ) + .to_owned() } -/// Query that returns all packages that should be included in a sync for the given repository and -/// arch. pub fn pkgs_to_sync( conn: &DbConn, repo: i32, arch: &str, ) -> SelectorRaw> { + let max_id_query = Query::select() + .columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) + .from(db::package::Entity) + .group_by_columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .to_owned(); + let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let query = Query::select() - .columns(db::package::Column::iter().map(|c| (p1.clone(), c))) + .column((p1.clone(), Asterisk)) .from_as(db::package::Entity, p1.clone()) .join_subquery( JoinType::InnerJoin, - max_pkg_ids_query(false), + max_id_query, p2.clone(), Expr::col((p1.clone(), db::package::Column::Id)) .eq(Expr::col((p2.clone(), Alias::new("max_id")))), @@ -266,13 +276,13 @@ pub fn pkgs_to_sync( .cond_where( Condition::all() .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) - .add( - Expr::col((p1.clone(), db::package::Column::Arch)) - .is_in([arch, crate::ANY_ARCH]), - ) .add( Expr::col((p1.clone(), db::package::Column::State)) .ne(db::PackageState::PendingDeletion), + ) + .add( + Expr::col((p1.clone(), db::package::Column::Arch)) + .is_in([arch, crate::ANY_ARCH]), ), ) .to_owned(); @@ -283,10 +293,36 @@ pub fn pkgs_to_sync( } fn stale_pkgs_query(include_repo: bool) -> SelectStatement { + // In each repository, only one version of a package can exist for any given arch. Because ids + // are monotonically increasing, we know that the row that represents the actual package + // currently in the repository is the row with the largest id whose state is "committed". This + // query finds this id for each (repo, arch, name) tuple. + let mut max_id_query = Query::select(); + max_id_query + .from(db::package::Entity) + .columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .expr_as(db::package::Column::Id.max(), Alias::new("max_id")) + .group_by_columns([ + db::package::Column::RepoId, + db::package::Column::Arch, + db::package::Column::Name, + ]) + .cond_where( + Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)), + ); + let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); - let mut query = Query::select() - .from_as(db::package::Entity, p1.clone()) - .to_owned(); + let mut query = Query::select(); + + // We then perform an inner join between the max id query above and the package table, where we + // filter on rows whose id is less than their respective package's max id or whose state is set + // to "pending deletion". This gives us all rows in the database that correspond to packages + // that are no longer needed, and can thus be removed. + query.from_as(db::package::Entity, p1.clone()); if include_repo { query.columns([ @@ -297,13 +333,10 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { query.column((p1.clone(), db::package::Column::Id)); } - // We left join on the max pkgs query because a repository that has all its packages set to - // "pending deletion" doesn't show up in the query. These are also included with a where clause - // on the joined rows. query .join_subquery( - JoinType::LeftJoin, - max_pkg_ids_query(true), + JoinType::InnerJoin, + max_id_query, p2.clone(), Condition::all() .add( @@ -326,12 +359,11 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement { .lt(Expr::col((p2.clone(), Alias::new("max_id")))), ) .add( - Expr::col((p1.clone(), db::package::Column::State)) + Expr::col((p1.clone(), db::package::Column::Id)) .eq(db::PackageState::PendingDeletion), ), - ); - - query + ) + .to_owned() } pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw> { diff --git a/server/src/main.rs b/server/src/main.rs index f7e1a95..c3237cf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,7 +6,6 @@ mod distro; mod error; mod repo; -pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; use repo::DistroMgr; @@ -15,9 +14,14 @@ use std::{path::PathBuf, sync::Arc}; pub const ANY_ARCH: &'static str = "any"; +#[derive(Clone)] +pub struct Config { + data_dir: PathBuf, +} + #[derive(Clone)] pub struct Global { - config: crate::config::Config, + config: Config, mgr: Arc, db: sea_orm::DbConn, } diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs index 2f66cfe..f91ab69 100644 --- a/server/src/repo/manager2.rs +++ b/server/src/repo/manager2.rs @@ -100,6 +100,42 @@ impl RepoMgr { Ok(()) } + /// Clean any remaining old package files from the database and file system + pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::stale_pkgs(&self.conn) + .stream(&self.conn) + .await?; + + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later + let mut max_id = -1; + let mut removed_pkgs = 0; + + while let Some(pkg) = pkgs.next().await.transpose()? { + // Failing to remove the package file isn't the biggest problem + let _ = tokio::fs::remove_file( + self.repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ) + .await; + + if pkg.id > max_id { + max_id = pkg.id; + } + + removed_pkgs += 1; + } + + if removed_pkgs > 0 { + db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; + } + + tracing::info!("Removed {removed_pkgs} stale package(s)"); + + Ok(()) + } + /// Generate the archive databases for the given repository and architecture. async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = @@ -173,42 +209,6 @@ impl RepoMgr { Ok(()) } - /// Clean any remaining old package files from the database and file system - pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { - let mut pkgs = db::query::package::stale_pkgs(&self.conn) - .stream(&self.conn) - .await?; - - // Ids are monotonically increasing, so the max id suffices to know which packages to - // remove later - let mut max_id = -1; - let mut removed_pkgs = 0; - - while let Some(pkg) = pkgs.next().await.transpose()? { - // Failing to remove the package file isn't the biggest problem - let _ = tokio::fs::remove_file( - self.repos_dir - .join(pkg.repo_id.to_string()) - .join(pkg.id.to_string()), - ) - .await; - - if pkg.id > max_id { - max_id = pkg.id; - } - - removed_pkgs += 1; - } - - if removed_pkgs > 0 { - db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; - } - - tracing::info!("Removed {removed_pkgs} stale package(s)"); - - Ok(()) - } - pub async fn pkg_parse_task(&self) { loop { // Receive the next message and immediately drop the mutex afterwards. As long as the @@ -248,7 +248,7 @@ impl RepoMgr { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap(); + let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo }); self.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); @@ -291,7 +291,6 @@ impl RepoMgr { }; let repo_id: Option = db::Repo::find() - .filter(db::repo::Column::DistroId.eq(distro_id)) .filter(db::repo::Column::Name.eq(repo)) .select_only() .column(db::repo::Column::Id) diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 16c62a5..290f9a7 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -6,8 +6,6 @@ pub mod package; pub use manager::DistroMgr; pub use manager2::RepoMgr; -use crate::FsConfig; - use axum::{ body::Body, extract::{Path, State}, @@ -52,26 +50,25 @@ async fn get_file( req: Request, ) -> crate::Result { if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - match global.config.fs { - FsConfig::Local { data_dir } => { - let repo_dir = data_dir.join("repos").join(repo_id.to_string()); + let repo_dir = global + .config + .data_dir + .join("repos") + .join(repo_id.to_string()); - let file_name = if file_name == format!("{}.db", repo) - || file_name == format!("{}.db.tar.gz", repo) - { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; + let file_name = + if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) - } - } + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) } else { Err(StatusCode::NOT_FOUND.into()) } @@ -81,7 +78,7 @@ async fn post_package_archive( State(global): State, Path((distro, repo)): Path<(String, String)>, body: Body, -) -> crate::Result { +) -> crate::Result<()> { let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; let [tmp_path] = global.mgr.random_file_paths(); @@ -91,7 +88,7 @@ async fn post_package_archive( global.mgr.queue_pkg(repo, tmp_path).await; - Ok(StatusCode::ACCEPTED) + Ok(()) } async fn delete_repo( @@ -113,15 +110,7 @@ async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - global.mgr.remove_repo_arch(repo, &arch).await?; - - tracing::info!("Removed architecture '{arch}' from repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } + Ok(StatusCode::NOT_FOUND) //if let Some(mgr) = global.mgr.get_mgr(&distro).await { // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; //