From 45f1abade39390332056e41c24ff1c2f2e4cf958 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 21 May 2024 09:16:45 +0200 Subject: [PATCH] refactor: restructure database query code --- server/src/api/mod.rs | 28 +-- server/src/cli.rs | 7 +- server/src/db/conn.rs | 61 ------ server/src/db/mod.rs | 29 +-- server/src/db/query/mod.rs | 3 - server/src/db/query/package.rs | 341 ++++++++++++++++----------------- server/src/db/query/repo.rs | 75 ++++---- server/src/main.rs | 2 +- server/src/repo/mod.rs | 51 ++--- 9 files changed, 243 insertions(+), 354 deletions(-) delete mode 100644 server/src/db/conn.rs diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index 2577718..09b6c95 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -1,6 +1,6 @@ mod pagination; -use sea_orm::{sea_query::IntoCondition, *}; +use sea_orm::{*}; use axum::extract::{Path, Query, State}; use axum::routing::get; @@ -49,10 +49,7 @@ async fn get_single_repo( State(global): State, Path(id): Path, ) -> crate::Result> { - let repo = global - .db - .repo - .by_id(id) + let repo = db::query::repo::by_id(&global.db, id) .await? .ok_or(axum::http::StatusCode::NOT_FOUND)?; @@ -64,15 +61,13 @@ async fn get_packages( Query(pagination): Query, Query(filter): Query, ) -> crate::Result>> { - let (total_pages, pkgs) = global - .db - .pkg - .page( - pagination.per_page.unwrap_or(25), - pagination.page.unwrap_or(1) - 1, - filter, - ) - .await?; + let (total_pages, pkgs) = db::query::package::page( + &global.db, + pagination.per_page.unwrap_or(25), + pagination.page.unwrap_or(1) - 1, + filter, + ) + .await?; Ok(Json(pagination.res(total_pages, pkgs))) } @@ -81,10 +76,7 @@ async fn get_single_package( State(global): State, Path(id): Path, ) -> crate::Result> { - let entry = global - .db - .pkg - .full(id) + let entry = db::query::package::full(&global.db, id) .await? .ok_or(axum::http::StatusCode::NOT_FOUND)?; diff --git a/server/src/cli.rs b/server/src/cli.rs index 14a5808..1ae6de4 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -10,6 +10,7 @@ use std::sync::{Arc, RwLock}; use tower_http::trace::TraceLayer; use tracing::debug; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use sea_orm_migration::MigratorTrait; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -75,10 +76,8 @@ impl Cli { debug!("Connecting to database with URL {}", db_url); - let db = crate::db::RieterDb::connect(db_url).await?; - // let db = crate::db::init("postgres://rieter:rieter@localhost:5432/rieter") - // .await - // .unwrap(); + let db = sea_orm::Database::connect(db_url).await?; + crate::db::Migrator::up(&db, None).await?; let config = Config { data_dir: self.data_dir.clone(), diff --git a/server/src/db/conn.rs b/server/src/db/conn.rs deleted file mode 100644 index 2756236..0000000 --- a/server/src/db/conn.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::RieterDb; -use sea_orm::{DbBackend, DbErr, ExecResult, QueryResult, Statement}; -use std::{future::Future, pin::Pin}; - -// Allows RieterDb objects to be passed to ORM functions -impl sea_orm::ConnectionTrait for RieterDb { - fn get_database_backend(&self) -> DbBackend { - self.conn.get_database_backend() - } - fn execute<'life0, 'async_trait>( - &'life0 self, - stmt: Statement, - ) -> Pin> + Send + 'async_trait>> - where - Self: 'async_trait, - 'life0: 'async_trait, - { - self.conn.execute(stmt) - } - fn execute_unprepared<'life0, 'life1, 'async_trait>( - &'life0 self, - sql: &'life1 str, - ) -> Pin> + Send + 'async_trait>> - where - Self: 'async_trait, - 'life0: 'async_trait, - 'life1: 'async_trait, - { - self.conn.execute_unprepared(sql) - } - fn query_one<'life0, 'async_trait>( - &'life0 self, - stmt: Statement, - ) -> Pin< - Box< - dyn Future, DbErr>> - + Send - + 'async_trait, - >, - > - where - Self: 'async_trait, - 'life0: 'async_trait, - { - self.conn.query_one(stmt) - } - fn query_all<'life0, 'async_trait>( - &'life0 self, - stmt: Statement, - ) -> Pin< - Box< - dyn Future, DbErr>> + Send + 'async_trait, - >, - > - where - Self: 'async_trait, - 'life0: 'async_trait, - { - self.conn.query_all(stmt) - } -} diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index a7a7f66..b29f3d3 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -1,14 +1,14 @@ -mod conn; pub mod entities; mod migrator; pub mod query; -use sea_orm::{ConnectOptions, Database, DatabaseConnection, DeriveActiveEnum, EnumIter}; -use sea_orm_migration::MigratorTrait; +use sea_orm::{DeriveActiveEnum, EnumIter}; + use serde::{Deserialize, Serialize}; pub use entities::{prelude::*, *}; -use migrator::Migrator; +pub use migrator::Migrator; + type Result = std::result::Result; @@ -41,24 +41,3 @@ pub struct FullPackage { related: Vec<(PackageRelatedEnum, String)>, files: Vec, } - -#[derive(Clone, Debug)] -pub struct RieterDb { - conn: DatabaseConnection, - pub pkg: query::PackageQuery, - pub repo: query::RepoQuery, -} - -impl RieterDb { - pub async fn connect>(opt: C) -> Result { - let db = Database::connect(opt).await?; - - Migrator::up(&db, None).await?; - - Ok(Self { - conn: db.clone(), - pkg: query::PackageQuery::new(db.clone()), - repo: query::RepoQuery::new(db.clone()), - }) - } -} diff --git a/server/src/db/query/mod.rs b/server/src/db/query/mod.rs index 32cc3c5..87d61e3 100644 --- a/server/src/db/query/mod.rs +++ b/server/src/db/query/mod.rs @@ -1,7 +1,4 @@ pub mod package; pub mod repo; -pub use package::PackageQuery; -pub use repo::RepoQuery; - type Result = std::result::Result; diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index 8649dea..9d7a9f2 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -3,11 +3,6 @@ use serde::Deserialize; use crate::db::*; -#[derive(Clone, Debug)] -pub struct PackageQuery { - conn: DatabaseConnection, -} - #[derive(Deserialize)] pub struct Filter { repo: Option, @@ -27,189 +22,183 @@ impl IntoCondition for Filter { } } -impl PackageQuery { - pub fn new(conn: DatabaseConnection) -> Self { - Self { conn } +pub async fn page( + conn: &DbConn, + per_page: u64, + page: u64, + filter: Filter, +) -> super::Result<(u64, Vec)> { + let paginator = Package::find() + .filter(filter) + .order_by_asc(package::Column::Id) + .paginate(conn, per_page); + let packages = paginator.fetch_page(page).await?; + let total_pages = paginator.num_pages().await?; + + Ok((total_pages, packages)) +} + +pub async fn by_id(conn: &DbConn, id: i32) -> Result> { + package::Entity::find_by_id(id).one(conn).await +} + +pub async fn by_fields( + conn: &DbConn, + repo_id: i32, + name: &str, + version: Option<&str>, + arch: &str, +) -> Result> { + let mut query = Package::find() + .filter(package::Column::RepoId.eq(repo_id)) + .filter(package::Column::Name.eq(name)) + .filter(package::Column::Arch.eq(arch)); + + if let Some(version) = version { + query = query.filter(package::Column::Version.eq(version)); } - pub async fn page( - &self, - per_page: u64, - page: u64, - filter: Filter, - ) -> super::Result<(u64, Vec)> { - let paginator = Package::find() - .filter(filter) - .order_by_asc(package::Column::Id) - .paginate(&self.conn, per_page); - let packages = paginator.fetch_page(page).await?; - let total_pages = paginator.num_pages().await?; + query.one(conn).await +} - Ok((total_pages, packages)) - } +pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result { + Package::delete_many() + .filter(package::Column::RepoId.eq(repo_id)) + .filter(package::Column::Arch.eq(arch)) + .exec(conn) + .await +} - pub async fn by_id(&self, id: i32) -> Result> { - package::Entity::find_by_id(id).one(&self.conn).await - } +pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { + let info = pkg.info; - pub async fn by_fields( - &self, - repo_id: i32, - name: &str, - version: Option<&str>, - arch: &str, - ) -> Result> { - let mut query = Package::find() - .filter(package::Column::RepoId.eq(repo_id)) - .filter(package::Column::Name.eq(name)) - .filter(package::Column::Arch.eq(arch)); + let model = package::ActiveModel { + id: NotSet, + repo_id: Set(repo_id), + base: Set(info.base), + name: Set(info.name), + version: Set(info.version), + arch: Set(info.arch), + size: Set(info.size), + c_size: Set(info.csize), + description: Set(info.description), + url: Set(info.url), + build_date: Set(info.build_date.to_string()), + packager: Set(info.packager), + pgp_sig: Set(info.pgpsig), + pgp_sig_size: Set(info.pgpsigsize), + sha256_sum: Set(info.sha256sum), + }; - if let Some(version) = version { - query = query.filter(package::Column::Version.eq(version)); - } + let pkg_entry = model.insert(conn).await?; - query.one(&self.conn).await - } + // Insert all the related tables + PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { + package_id: Set(pkg_entry.id), + name: Set(s.to_string()), + })) + .on_empty_do_nothing() + .exec(conn) + .await?; - pub async fn delete_with_arch(&self, repo_id: i32, arch: &str) -> Result { - Package::delete_many() - .filter(package::Column::RepoId.eq(repo_id)) - .filter(package::Column::Arch.eq(arch)) - .exec(&self.conn) - .await - } + PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { + package_id: Set(pkg_entry.id), + name: Set(s.to_string()), + })) + .on_empty_do_nothing() + .exec(conn) + .await?; - pub async fn insert(&self, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { - let info = pkg.info; + let related = info + .conflicts + .iter() + .map(|s| (PackageRelatedEnum::Conflicts, s)) + .chain( + info.replaces + .iter() + .map(|s| (PackageRelatedEnum::Replaces, s)), + ) + .chain( + info.provides + .iter() + .map(|s| (PackageRelatedEnum::Provides, s)), + ) + .chain(info.depends.iter().map(|s| (PackageRelatedEnum::Depend, s))) + .chain( + info.makedepends + .iter() + .map(|s| (PackageRelatedEnum::Depend, s)), + ) + .chain( + info.checkdepends + .iter() + .map(|s| (PackageRelatedEnum::Checkdepend, s)), + ) + .chain( + info.optdepends + .iter() + .map(|s| (PackageRelatedEnum::Optdepend, s)), + ); - let model = package::ActiveModel { - id: NotSet, - repo_id: Set(repo_id), - base: Set(info.base), - name: Set(info.name), - version: Set(info.version), - arch: Set(info.arch), - size: Set(info.size), - c_size: Set(info.csize), - description: Set(info.description), - url: Set(info.url), - build_date: Set(info.build_date.to_string()), - packager: Set(info.packager), - pgp_sig: Set(info.pgpsig), - pgp_sig_size: Set(info.pgpsigsize), - sha256_sum: Set(info.sha256sum), - }; + PackageRelated::insert_many(related.map(|(t, s)| package_related::ActiveModel { + package_id: Set(pkg_entry.id), + r#type: Set(t), + name: Set(s.to_string()), + })) + .on_empty_do_nothing() + .exec(conn) + .await?; - let pkg_entry = model.insert(&self.conn).await?; + PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { + package_id: Set(pkg_entry.id), + path: Set(s.display().to_string()), + })) + .on_empty_do_nothing() + .exec(conn) + .await?; - // Insert all the related tables - PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { - package_id: Set(pkg_entry.id), - name: Set(s.to_string()), + Ok(()) +} + +pub async fn full(conn: &DbConn, id: i32) -> Result> { + if let Some(entry) = by_id(conn, id).await? { + let licenses = entry + .find_related(PackageLicense) + .all(conn) + .await? + .into_iter() + .map(|e| e.name) + .collect(); + let groups = entry + .find_related(PackageGroup) + .all(conn) + .await? + .into_iter() + .map(|e| e.name) + .collect(); + let related = entry + .find_related(PackageRelated) + .all(conn) + .await? + .into_iter() + .map(|e| (e.r#type, e.name)) + .collect(); + let files = entry + .find_related(PackageFile) + .all(conn) + .await? + .into_iter() + .map(|e| e.path) + .collect(); + + Ok(Some(FullPackage { + entry, + licenses, + groups, + related, + files, })) - .on_empty_do_nothing() - .exec(&self.conn) - .await?; - - PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { - package_id: Set(pkg_entry.id), - name: Set(s.to_string()), - })) - .on_empty_do_nothing() - .exec(&self.conn) - .await?; - - let related = info - .conflicts - .iter() - .map(|s| (PackageRelatedEnum::Conflicts, s)) - .chain( - info.replaces - .iter() - .map(|s| (PackageRelatedEnum::Replaces, s)), - ) - .chain( - info.provides - .iter() - .map(|s| (PackageRelatedEnum::Provides, s)), - ) - .chain(info.depends.iter().map(|s| (PackageRelatedEnum::Depend, s))) - .chain( - info.makedepends - .iter() - .map(|s| (PackageRelatedEnum::Depend, s)), - ) - .chain( - info.checkdepends - .iter() - .map(|s| (PackageRelatedEnum::Checkdepend, s)), - ) - .chain( - info.optdepends - .iter() - .map(|s| (PackageRelatedEnum::Optdepend, s)), - ); - - PackageRelated::insert_many(related.map(|(t, s)| package_related::ActiveModel { - package_id: Set(pkg_entry.id), - r#type: Set(t), - name: Set(s.to_string()), - })) - .on_empty_do_nothing() - .exec(&self.conn) - .await?; - - PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { - package_id: Set(pkg_entry.id), - path: Set(s.display().to_string()), - })) - .on_empty_do_nothing() - .exec(&self.conn) - .await?; - - Ok(()) - } - - pub async fn full(&self, id: i32) -> Result> { - if let Some(entry) = self.by_id(id).await? { - let licenses = entry - .find_related(PackageLicense) - .all(&self.conn) - .await? - .into_iter() - .map(|e| e.name) - .collect(); - let groups = entry - .find_related(PackageGroup) - .all(&self.conn) - .await? - .into_iter() - .map(|e| e.name) - .collect(); - let related = entry - .find_related(PackageRelated) - .all(&self.conn) - .await? - .into_iter() - .map(|e| (e.r#type, e.name)) - .collect(); - let files = entry - .find_related(PackageFile) - .all(&self.conn) - .await? - .into_iter() - .map(|e| e.path) - .collect(); - - Ok(Some(FullPackage { - entry, - licenses, - groups, - related, - files, - })) - } else { - Ok(None) - } + } else { + Ok(None) } } diff --git a/server/src/db/query/repo.rs b/server/src/db/query/repo.rs index 65177df..34fbb81 100644 --- a/server/src/db/query/repo.rs +++ b/server/src/db/query/repo.rs @@ -2,11 +2,6 @@ use sea_orm::{sea_query::IntoCondition, *}; use crate::db::*; -#[derive(Clone, Debug)] -pub struct RepoQuery { - conn: DatabaseConnection, -} - #[derive(Deserialize)] pub struct Filter { name: Option, @@ -18,43 +13,37 @@ impl IntoCondition for Filter { } } -impl RepoQuery { - pub fn new(conn: DatabaseConnection) -> Self { - Self { conn } - } +pub async fn page(conn: &DbConn, per_page: u64, page: u64) -> Result<(u64, Vec)> { + let paginator = Repo::find() + .order_by_asc(repo::Column::Id) + .paginate(conn, per_page); + let repos = paginator.fetch_page(page).await?; + let total_pages = paginator.num_pages().await?; - pub async fn page(&self, per_page: u64, page: u64) -> Result<(u64, Vec)> { - let paginator = Repo::find() - .order_by_asc(repo::Column::Id) - .paginate(&self.conn, per_page); - let repos = paginator.fetch_page(page).await?; - let total_pages = paginator.num_pages().await?; - - Ok((total_pages, repos)) - } - - pub async fn by_id(&self, id: i32) -> Result> { - repo::Entity::find_by_id(id).one(&self.conn).await - } - - pub async fn by_name(&self, name: &str) -> Result> { - Repo::find() - .filter(repo::Column::Name.eq(name)) - .one(&self.conn) - .await - } - - pub async fn insert( - &self, - name: &str, - description: Option<&str>, - ) -> Result> { - let model = repo::ActiveModel { - id: NotSet, - name: Set(String::from(name)), - description: Set(description.map(String::from)), - }; - - Repo::insert(model).exec(&self.conn).await - } + Ok((total_pages, repos)) +} + +pub async fn by_id(conn: &DbConn, id: i32) -> Result> { + repo::Entity::find_by_id(id).one(conn).await +} + +pub async fn by_name(conn: &DbConn, name: &str) -> Result> { + Repo::find() + .filter(repo::Column::Name.eq(name)) + .one(conn) + .await +} + +pub async fn insert( + conn: &DbConn, + name: &str, + description: Option<&str>, +) -> Result> { + let model = repo::ActiveModel { + id: NotSet, + name: Set(String::from(name)), + description: Set(description.map(String::from)), + }; + + Repo::insert(model).exec(conn).await } diff --git a/server/src/main.rs b/server/src/main.rs index fc5c110..9068bd7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,7 +22,7 @@ pub struct Config { pub struct Global { config: Config, repo_manager: Arc>, - db: db::RieterDb, + db: sea_orm::DbConn, } #[tokio::main] diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 4681fb6..5bf00a6 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -5,7 +5,7 @@ pub use manager::RepoGroupManager; use std::path::PathBuf; -use axum::body::{Body, BodyDataStream}; +use axum::body::{Body}; use axum::extract::{Path, State}; use axum::http::Request; use axum::http::StatusCode; @@ -21,6 +21,8 @@ use tower_http::services::{ServeDir, ServeFile}; use tower_http::validate_request::ValidateRequestHeaderLayer; use uuid::Uuid; +use crate::db; + const DB_FILE_EXTS: [&str; 4] = [".db", ".files", ".db.tar.gz", ".files.tar.gz"]; pub fn router(api_key: &str) -> Router { @@ -128,26 +130,31 @@ async fn post_package_archive( tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo); // Query the repo for its ID, or create it if it does not already exist - let res = global.db.repo.by_name(&repo).await?; + let res = db::query::repo::by_name(&global.db, &repo).await?; let repo_id = if let Some(repo_entity) = res { repo_entity.id } else { - global.db.repo.insert(&repo, None).await?.last_insert_id + db::query::repo::insert(&global.db, &repo, None) + .await? + .last_insert_id }; // If the package already exists in the database, we remove it first - let res = global - .db - .pkg - .by_fields(repo_id, &pkg.info.name, None, &pkg.info.arch) - .await?; + let res = db::query::package::by_fields( + &global.db, + repo_id, + &pkg.info.name, + None, + &pkg.info.arch, + ) + .await?; if let Some(entry) = res { entry.delete(&global.db).await?; } - global.db.pkg.insert(repo_id, pkg).await?; + db::query::package::insert(&global.db, repo_id, pkg).await?; Ok(()) } @@ -172,7 +179,7 @@ async fn delete_repo( .await??; if repo_removed { - let res = global.db.repo.by_name(&repo).await?; + let res = db::query::repo::by_name(&global.db, &repo).await?; if let Some(repo_entry) = res { repo_entry.delete(&global.db).await?; @@ -203,10 +210,10 @@ async fn delete_arch_repo( .await??; if repo_removed { - let res = global.db.repo.by_name(&repo).await?; + let res = db::query::repo::by_name(&global.db, &repo).await?; if let Some(repo_entry) = res { - global.db.pkg.delete_with_arch(repo_entry.id, &arch).await?; + db::query::package::delete_with_arch(&global.db, repo_entry.id, &arch).await?; } tracing::info!("Removed architecture '{}' from repository '{}'", arch, repo); @@ -229,19 +236,17 @@ async fn delete_package( .await??; if let Some((name, version, release, arch)) = res { - let res = global.db.repo.by_name(&repo).await?; + let res = db::query::repo::by_name(&global.db, &repo).await?; if let Some(repo_entry) = res { - let res = global - .db - .pkg - .by_fields( - repo_entry.id, - &name, - Some(&format!("{}-{}", version, release)), - &arch, - ) - .await?; + let res = db::query::package::by_fields( + &global.db, + repo_entry.id, + &name, + Some(&format!("{}-{}", version, release)), + &arch, + ) + .await?; if let Some(entry) = res { entry.delete(&global.db).await?;