diff --git a/server/src/cli.rs b/server/src/cli.rs index 4fc94f1..2df7f7c 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -35,7 +35,7 @@ pub struct Cli { #[arg( long, value_name = "LOG_LEVEL", - default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", + default_value = "tower_http=debug,rieterd=debug", env = "RIETER_LOG" )] pub log: String, diff --git a/server/src/db/entities/package.rs b/server/src/db/entities/package.rs index 112cde4..08ac2ab 100644 --- a/server/src/db/entities/package.rs +++ b/server/src/db/entities/package.rs @@ -4,6 +4,8 @@ use chrono::NaiveDateTime; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; +use crate::db::PackageState; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "package")] pub struct Model { @@ -24,6 +26,7 @@ pub struct Model { pub pgp_sig_size: Option, pub sha256_sum: String, pub compression: String, + pub state: PackageState, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs index 2deb05f..f76e639 100644 --- a/server/src/db/migrator/m20230730_000001_create_repo_tables.rs +++ b/server/src/db/migrator/m20230730_000001_create_repo_tables.rs @@ -81,7 +81,12 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Package::PgpSig).string_len(255)) .col(ColumnDef::new(Package::PgpSigSize).big_integer()) .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null()) - .col(ColumnDef::new(Package::Compression).string_len(16).not_null()) + .col( + ColumnDef::new(Package::Compression) + .string_len(16) + .not_null(), + ) + .col(ColumnDef::new(Package::State).integer().not_null()) .foreign_key( ForeignKey::create() .name("fk-package-repo_id") @@ -264,6 +269,7 @@ pub enum Package { PgpSigSize, Sha256Sum, Compression, + State, } #[derive(Iden)] diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 597cf20..98f42a4 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -30,6 +30,17 @@ pub enum PackageRelatedEnum { Optdepend, } +#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)] +#[sea_orm(rs_type = "i32", db_type = "Integer")] +pub enum PackageState { + #[sea_orm(num_value = 0)] + PendingCommit, + #[sea_orm(num_value = 1)] + Committed, + #[sea_orm(num_value = 2)] + PendingDeletion, +} + #[derive(Serialize)] pub struct FullPackage { #[serde(flatten)] diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index c76e532..abbfb9c 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -68,9 +68,17 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result .await } -pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { +pub async fn insert( + conn: &DbConn, + repo_id: i32, + pkg: crate::repo::package::Package, +) -> Result { let info = pkg.info; + // Doing this manually is not the recommended way, but the generic error type of the + // transaction function didn't play well with my current error handling + let txn = conn.begin().await?; + let model = package::ActiveModel { id: NotSet, repo_id: Set(repo_id), @@ -88,9 +96,10 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack pgp_sig_size: Set(info.pgpsigsize), sha256_sum: Set(info.sha256sum), compression: Set(pkg.compression.extension().unwrap().to_string()), + state: Set(PackageState::PendingCommit), }; - let pkg_entry = model.insert(conn).await?; + let pkg_entry = model.insert(&txn).await?; // Insert all the related tables PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { @@ -98,7 +107,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { @@ -106,7 +115,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; let related = info @@ -146,7 +155,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack name: Set(s.to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { @@ -154,10 +163,12 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack path: Set(s.display().to_string()), })) .on_empty_do_nothing() - .exec(conn) + .exec(&txn) .await?; - Ok(()) + txn.commit().await?; + + Ok(pkg_entry) } pub async fn full(conn: &DbConn, id: i32) -> Result> { diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index 23d693d..691f6b0 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -1,19 +1,25 @@ use super::{archive, package}; use crate::{db, error::Result}; -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use futures::StreamExt; use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; -use tokio::io::AsyncRead; +use tokio::{io::AsyncRead, sync::Mutex}; use uuid::Uuid; pub const ANY_ARCH: &'static str = "any"; +pub const REPOS_DIR: &'static str = "repos"; +pub const QUEUE_DIR: &'static str = "queue"; pub struct DistroMgr { distro_dir: PathBuf, distro_id: i32, conn: DbConn, + lock: Arc>, } impl DistroMgr { @@ -22,10 +28,23 @@ impl DistroMgr { tokio::fs::create_dir(&distro_dir).await?; } + let repos_dir = distro_dir.as_ref().join(REPOS_DIR); + + if !tokio::fs::try_exists(&repos_dir).await? { + tokio::fs::create_dir(repos_dir).await?; + } + + let queue_dir = distro_dir.as_ref().join(QUEUE_DIR); + + if !tokio::fs::try_exists(&queue_dir).await? { + tokio::fs::create_dir(queue_dir).await?; + } + Ok(Self { distro_dir: distro_dir.as_ref().to_path_buf(), distro_id, conn, + lock: Arc::new(Mutex::new(())), }) } @@ -121,6 +140,18 @@ impl DistroMgr { Ok(()) } + async fn get_or_create_repo(&self, repo: &str) -> Result { + let _guard = self.lock.lock().await; + + if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? { + Ok(repo) + } else { + tokio::fs::create_dir(self.distro_dir.join(repo)).await?; + + Ok(db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?) + } + } + /// Remove the repo with the given name, if it existed pub async fn remove_repo(&self, repo: &str) -> Result { let res = db::query::repo::by_name(&self.conn, repo).await?; @@ -220,62 +251,56 @@ impl DistroMgr { reader: &mut R, repo: &str, ) -> crate::Result<(String, String, String)> { - let [path] = self.random_file_paths(); - let mut temp_file = tokio::fs::File::create(&path).await?; + let [tmp_file_path] = self.random_file_paths(); + let mut temp_file = tokio::fs::File::create(&tmp_file_path).await?; tokio::io::copy(reader, &mut temp_file).await?; - let path_clone = path.clone(); + let path_clone = tmp_file_path.clone(); let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) .await .unwrap()?; - let repo_dir = self.distro_dir.join(repo); + let repo = self.get_or_create_repo(repo).await?; + let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?; - let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? { - repo.id - } else { - tokio::fs::create_dir(&repo_dir).await?; - - db::query::repo::insert(&self.conn, self.distro_id, repo, None) - .await? - .id - }; + let queue_path = self.distro_dir.join(QUEUE_DIR).join(pkg.id.to_string()); + tokio::fs::rename(tmp_file_path, queue_path).await?; // If the package already exists in the database, we remove it first - let res = db::query::package::by_fields( - &self.conn, - repo_id, - &pkg.info.arch, - &pkg.info.name, - None, - None, - ) - .await?; - - if let Some(entry) = res { - entry.delete(&self.conn).await?; - } - - let dest_pkg_path = repo_dir.join(pkg.file_name()); - - // Insert new package into database - let name = pkg.info.name.clone(); - let version = pkg.info.version.clone(); - let arch = pkg.info.arch.clone(); - db::query::package::insert(&self.conn, repo_id, pkg).await?; - - // Move the package to its final resting place - tokio::fs::rename(path, dest_pkg_path).await?; + //let res = db::query::package::by_fields( + // &self.conn, + // repo.id, + // &pkg.info.arch, + // &pkg.info.name, + // None, + // None, + //) + //.await?; + // + //if let Some(entry) = res { + // entry.delete(&self.conn).await?; + //} + //let dest_pkg_path = repo_dir.join(pkg.file_name()); + // + //// Insert new package into database + //let name = pkg.info.name.clone(); + //let version = pkg.info.version.clone(); + //let arch = pkg.info.arch.clone(); + //db::query::package::insert(&self.conn, repo.id, pkg).await?; + // + //// Move the package to its final resting place + //tokio::fs::rename(tmp_file_path, dest_pkg_path).await?; + // // Synchronize archive databases - if arch == ANY_ARCH { - self.generate_archives_all(repo).await?; - } else { - self.generate_archives(repo, &arch).await?; - } + //if arch == ANY_ARCH { + // self.generate_archives_all(&repo.name).await?; + //} else { + // self.generate_archives(&repo.name, &arch).await?; + //} - Ok((name, version, arch)) + Ok((pkg.name, pkg.version, pkg.arch)) } /// Generate a path to a unique file that can be used as a temporary file