feat: upload new packages to queue

concurrent-repos
Jef Roosens 2024-06-03 09:46:02 +02:00
parent f9518d6b7d
commit fa6de9b035
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
6 changed files with 110 additions and 54 deletions

View File

@ -35,7 +35,7 @@ pub struct Cli {
#[arg( #[arg(
long, long,
value_name = "LOG_LEVEL", value_name = "LOG_LEVEL",
default_value = "tower_http=debug,rieterd=debug,sea_orm=debug", default_value = "tower_http=debug,rieterd=debug",
env = "RIETER_LOG" env = "RIETER_LOG"
)] )]
pub log: String, pub log: String,

View File

@ -4,6 +4,8 @@ use chrono::NaiveDateTime;
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::PackageState;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package")] #[sea_orm(table_name = "package")]
pub struct Model { pub struct Model {
@ -24,6 +26,7 @@ pub struct Model {
pub pgp_sig_size: Option<i64>, pub pgp_sig_size: Option<i64>,
pub sha256_sum: String, pub sha256_sum: String,
pub compression: String, pub compression: String,
pub state: PackageState,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -81,7 +81,12 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Package::PgpSig).string_len(255)) .col(ColumnDef::new(Package::PgpSig).string_len(255))
.col(ColumnDef::new(Package::PgpSigSize).big_integer()) .col(ColumnDef::new(Package::PgpSigSize).big_integer())
.col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null()) .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
.col(ColumnDef::new(Package::Compression).string_len(16).not_null()) .col(
ColumnDef::new(Package::Compression)
.string_len(16)
.not_null(),
)
.col(ColumnDef::new(Package::State).integer().not_null())
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
.name("fk-package-repo_id") .name("fk-package-repo_id")
@ -264,6 +269,7 @@ pub enum Package {
PgpSigSize, PgpSigSize,
Sha256Sum, Sha256Sum,
Compression, Compression,
State,
} }
#[derive(Iden)] #[derive(Iden)]

View File

@ -30,6 +30,17 @@ pub enum PackageRelatedEnum {
Optdepend, Optdepend,
} }
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum PackageState {
#[sea_orm(num_value = 0)]
PendingCommit,
#[sea_orm(num_value = 1)]
Committed,
#[sea_orm(num_value = 2)]
PendingDeletion,
}
#[derive(Serialize)] #[derive(Serialize)]
pub struct FullPackage { pub struct FullPackage {
#[serde(flatten)] #[serde(flatten)]

View File

@ -68,9 +68,17 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result
.await .await
} }
pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { pub async fn insert(
conn: &DbConn,
repo_id: i32,
pkg: crate::repo::package::Package,
) -> Result<package::Model> {
let info = pkg.info; let info = pkg.info;
// Doing this manually is not the recommended way, but the generic error type of the
// transaction function didn't play well with my current error handling
let txn = conn.begin().await?;
let model = package::ActiveModel { let model = package::ActiveModel {
id: NotSet, id: NotSet,
repo_id: Set(repo_id), repo_id: Set(repo_id),
@ -88,9 +96,10 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
pgp_sig_size: Set(info.pgpsigsize), pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum), sha256_sum: Set(info.sha256sum),
compression: Set(pkg.compression.extension().unwrap().to_string()), compression: Set(pkg.compression.extension().unwrap().to_string()),
state: Set(PackageState::PendingCommit),
}; };
let pkg_entry = model.insert(conn).await?; let pkg_entry = model.insert(&txn).await?;
// Insert all the related tables // Insert all the related tables
PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
@ -98,7 +107,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
@ -106,7 +115,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
let related = info let related = info
@ -146,7 +155,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
@ -154,10 +163,12 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
path: Set(s.display().to_string()), path: Set(s.display().to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
Ok(()) txn.commit().await?;
Ok(pkg_entry)
} }
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> { pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {

View File

@ -1,19 +1,25 @@
use super::{archive, package}; use super::{archive, package};
use crate::{db, error::Result}; use crate::{db, error::Result};
use std::path::{Path, PathBuf}; use std::{
path::{Path, PathBuf},
sync::Arc,
};
use futures::StreamExt; use futures::StreamExt;
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
use tokio::io::AsyncRead; use tokio::{io::AsyncRead, sync::Mutex};
use uuid::Uuid; use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any"; pub const ANY_ARCH: &'static str = "any";
pub const REPOS_DIR: &'static str = "repos";
pub const QUEUE_DIR: &'static str = "queue";
pub struct DistroMgr { pub struct DistroMgr {
distro_dir: PathBuf, distro_dir: PathBuf,
distro_id: i32, distro_id: i32,
conn: DbConn, conn: DbConn,
lock: Arc<Mutex<()>>,
} }
impl DistroMgr { impl DistroMgr {
@ -22,10 +28,23 @@ impl DistroMgr {
tokio::fs::create_dir(&distro_dir).await?; tokio::fs::create_dir(&distro_dir).await?;
} }
let repos_dir = distro_dir.as_ref().join(REPOS_DIR);
if !tokio::fs::try_exists(&repos_dir).await? {
tokio::fs::create_dir(repos_dir).await?;
}
let queue_dir = distro_dir.as_ref().join(QUEUE_DIR);
if !tokio::fs::try_exists(&queue_dir).await? {
tokio::fs::create_dir(queue_dir).await?;
}
Ok(Self { Ok(Self {
distro_dir: distro_dir.as_ref().to_path_buf(), distro_dir: distro_dir.as_ref().to_path_buf(),
distro_id, distro_id,
conn, conn,
lock: Arc::new(Mutex::new(())),
}) })
} }
@ -121,6 +140,18 @@ impl DistroMgr {
Ok(()) Ok(())
} }
async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> {
let _guard = self.lock.lock().await;
if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
Ok(repo)
} else {
tokio::fs::create_dir(self.distro_dir.join(repo)).await?;
Ok(db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?)
}
}
/// Remove the repo with the given name, if it existed /// Remove the repo with the given name, if it existed
pub async fn remove_repo(&self, repo: &str) -> Result<bool> { pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
let res = db::query::repo::by_name(&self.conn, repo).await?; let res = db::query::repo::by_name(&self.conn, repo).await?;
@ -220,62 +251,56 @@ impl DistroMgr {
reader: &mut R, reader: &mut R,
repo: &str, repo: &str,
) -> crate::Result<(String, String, String)> { ) -> crate::Result<(String, String, String)> {
let [path] = self.random_file_paths(); let [tmp_file_path] = self.random_file_paths();
let mut temp_file = tokio::fs::File::create(&path).await?; let mut temp_file = tokio::fs::File::create(&tmp_file_path).await?;
tokio::io::copy(reader, &mut temp_file).await?; tokio::io::copy(reader, &mut temp_file).await?;
let path_clone = path.clone(); let path_clone = tmp_file_path.clone();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await .await
.unwrap()?; .unwrap()?;
let repo_dir = self.distro_dir.join(repo); let repo = self.get_or_create_repo(repo).await?;
let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?;
let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? { let queue_path = self.distro_dir.join(QUEUE_DIR).join(pkg.id.to_string());
repo.id tokio::fs::rename(tmp_file_path, queue_path).await?;
} else {
tokio::fs::create_dir(&repo_dir).await?;
db::query::repo::insert(&self.conn, self.distro_id, repo, None)
.await?
.id
};
// If the package already exists in the database, we remove it first // If the package already exists in the database, we remove it first
let res = db::query::package::by_fields( //let res = db::query::package::by_fields(
&self.conn, // &self.conn,
repo_id, // repo.id,
&pkg.info.arch, // &pkg.info.arch,
&pkg.info.name, // &pkg.info.name,
None, // None,
None, // None,
) //)
.await?; //.await?;
//
if let Some(entry) = res { //if let Some(entry) = res {
entry.delete(&self.conn).await?; // entry.delete(&self.conn).await?;
} //}
let dest_pkg_path = repo_dir.join(pkg.file_name());
// Insert new package into database
let name = pkg.info.name.clone();
let version = pkg.info.version.clone();
let arch = pkg.info.arch.clone();
db::query::package::insert(&self.conn, repo_id, pkg).await?;
// Move the package to its final resting place
tokio::fs::rename(path, dest_pkg_path).await?;
//let dest_pkg_path = repo_dir.join(pkg.file_name());
//
//// Insert new package into database
//let name = pkg.info.name.clone();
//let version = pkg.info.version.clone();
//let arch = pkg.info.arch.clone();
//db::query::package::insert(&self.conn, repo.id, pkg).await?;
//
//// Move the package to its final resting place
//tokio::fs::rename(tmp_file_path, dest_pkg_path).await?;
//
// Synchronize archive databases // Synchronize archive databases
if arch == ANY_ARCH { //if arch == ANY_ARCH {
self.generate_archives_all(repo).await?; // self.generate_archives_all(&repo.name).await?;
} else { //} else {
self.generate_archives(repo, &arch).await?; // self.generate_archives(&repo.name, &arch).await?;
} //}
Ok((name, version, arch)) Ok((pkg.name, pkg.version, pkg.arch))
} }
/// Generate a path to a unique file that can be used as a temporary file /// Generate a path to a unique file that can be used as a temporary file