refactor: ensure repo dirs exist; rename some things

concurrent-repos
Jef Roosens 2024-05-30 10:49:44 +02:00
parent 58def483aa
commit f9518d6b7d
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
6 changed files with 61 additions and 66 deletions
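The recurring change in the diffs below is that each manager now creates its backing directory while it is being constructed, instead of relying on later `create_dir_all` calls. A minimal sketch of that pattern, with an illustrative struct name (only the `tokio::fs::try_exists` + `tokio::fs::create_dir` check is taken from the diff):

```rust
use std::path::{Path, PathBuf};

// Illustrative stand-in for the managers touched in this commit; the real
// constructors also take a database connection and (for DistroMgr) a distro id.
pub struct DirBackedMgr {
    dir: PathBuf,
}

impl DirBackedMgr {
    // Async constructor that guarantees its directory exists, mirroring the
    // try_exists + create_dir check added to both managers below.
    pub async fn new<P: AsRef<Path>>(dir: P) -> std::io::Result<Self> {
        if !tokio::fs::try_exists(&dir).await? {
            tokio::fs::create_dir(&dir).await?;
        }

        Ok(Self {
            dir: dir.as_ref().to_path_buf(),
        })
    }
}
```

Because the check runs in an async constructor that returns a Result, a missing or unwritable data directory surfaces at startup rather than on the first upload.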

View File

@@ -1,10 +1,9 @@
-use crate::{distro::MetaDistroMgr, repo::MetaRepoMgr, Config, Global};
-use axum::{extract::FromRef, Router};
+use crate::{distro::MetaDistroMgr, Config, Global};
+use axum::Router;
 use clap::Parser;
 use sea_orm_migration::MigratorTrait;
-use std::{io, path::PathBuf, sync::Arc};
-use tokio::sync::RwLock;
+use std::{io, path::PathBuf};
 use tower_http::trace::TraceLayer;
 use tracing::debug;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -76,8 +75,7 @@ impl Cli {
             data_dir: self.data_dir.clone(),
         };
-        let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone());
-        mgr.bootstrap().await?;
+        let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?;
         let global = Global { config, mgr, db };

View File

@@ -81,7 +81,7 @@ impl MigrationTrait for Migration {
                     .col(ColumnDef::new(Package::PgpSig).string_len(255))
                     .col(ColumnDef::new(Package::PgpSigSize).big_integer())
                     .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
-                    .col(ColumnDef::new(Package::Compression).char_len(16).not_null())
+                    .col(ColumnDef::new(Package::Compression).string_len(16).not_null())
                     .foreign_key(
                         ForeignKey::create()
                             .name("fk-package-repo_id")

View File

@@ -1,4 +1,4 @@
-use crate::{db, MetaRepoMgr};
+use crate::{db, DistroMgr};
 use std::{
     collections::HashMap,
@@ -13,42 +13,40 @@ use tokio::sync::Mutex;
 pub struct MetaDistroMgr {
     distro_dir: PathBuf,
     conn: DbConn,
-    distros: Arc<Mutex<HashMap<String, Arc<MetaRepoMgr>>>>,
+    distros: Arc<Mutex<HashMap<String, Arc<DistroMgr>>>>,
 }
 impl MetaDistroMgr {
-    pub fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> Self {
-        Self {
-            distro_dir: distro_dir.as_ref().to_path_buf(),
-            conn,
-            distros: Arc::new(Mutex::new(HashMap::new())),
+    pub async fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> crate::Result<Self> {
+        if !tokio::fs::try_exists(&distro_dir).await? {
+            tokio::fs::create_dir(&distro_dir).await?;
         }
-    }
-    /// Populate the manager with the currently known distros from the database.
-    pub async fn bootstrap(&self) -> crate::Result<()> {
-        let mut map = self.distros.lock().await;
-        let distros = db::Distro::find().all(&self.conn).await?;
+        let distro_dir = distro_dir.as_ref().to_path_buf();
+        let mut map: HashMap<String, Arc<DistroMgr>> = HashMap::new();
+        let distros = db::Distro::find().all(&conn).await?;
         for distro in distros {
-            let mgr = MetaRepoMgr::new(
-                self.distro_dir.join(&distro.name),
-                distro.id,
-                self.conn.clone(),
-            );
+            let mgr =
+                DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?;
             map.insert(distro.name, Arc::new(mgr));
         }
-        Ok(())
+        Ok(Self {
+            distro_dir,
+            conn,
+            distros: Arc::new(Mutex::new(map)),
+        })
     }
-    pub async fn get_mgr(&self, distro: &str) -> Option<Arc<MetaRepoMgr>> {
+    pub async fn get_mgr(&self, distro: &str) -> Option<Arc<DistroMgr>> {
         let map = self.distros.lock().await;
         map.get(distro).map(|mgr| Arc::clone(mgr))
     }
-    pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<MetaRepoMgr>> {
+    pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<DistroMgr>> {
         let mut map = self.distros.lock().await;
         if let Some(mgr) = map.get(distro) {
@@ -56,11 +54,14 @@ impl MetaDistroMgr {
         } else {
             let distro = db::query::distro::insert(&self.conn, distro, None).await?;
-            let mgr = Arc::new(MetaRepoMgr::new(
-                self.distro_dir.join(&distro.name),
-                distro.id,
-                self.conn.clone(),
-            ));
+            let mgr = Arc::new(
+                DistroMgr::new(
+                    self.distro_dir.join(&distro.name),
+                    distro.id,
+                    self.conn.clone(),
+                )
+                .await?,
+            );
             map.insert(distro.name, Arc::clone(&mgr));
             Ok(mgr)

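For orientation, the `distros` map above is what lets concurrent requests share per-distro managers: `get_or_create_mgr` locks the async map, reuses an existing `Arc<DistroMgr>`, or inserts a freshly created one. A simplified sketch of that get-or-create shape, with the database insert and error handling left out and placeholder type names:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Placeholder for the real DistroMgr, which also owns a database connection
// and a distro id, and whose constructor can fail.
struct Mgr;

struct MetaMgr {
    // One shared, async-locked map: concurrent callers asking for the same
    // distro either reuse the existing manager or create it exactly once.
    mgrs: Arc<Mutex<HashMap<String, Arc<Mgr>>>>,
}

impl MetaMgr {
    async fn get_or_create(&self, name: &str) -> Arc<Mgr> {
        let mut map = self.mgrs.lock().await;

        if let Some(mgr) = map.get(name) {
            Arc::clone(mgr)
        } else {
            let mgr = Arc::new(Mgr);
            map.insert(name.to_string(), Arc::clone(&mgr));
            mgr
        }
    }
}
```

Holding the lock across the whole check-then-insert keeps two concurrent requests for a new distro from racing to create the same manager.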
View File

@@ -6,11 +6,10 @@ mod error;
 mod repo;
 pub use error::{Result, ServerError};
-use repo::MetaRepoMgr;
+use repo::DistroMgr;
 use clap::Parser;
-use std::{path::PathBuf, sync::Arc};
-use tokio::sync::RwLock;
+use std::path::PathBuf;
 #[derive(Clone)]
 pub struct Config {

View File

@@ -10,19 +10,23 @@ use uuid::Uuid;
 pub const ANY_ARCH: &'static str = "any";
-pub struct MetaRepoMgr {
-    repo_dir: PathBuf,
+pub struct DistroMgr {
+    distro_dir: PathBuf,
     distro_id: i32,
     conn: DbConn,
 }
-impl MetaRepoMgr {
-    pub fn new<P: AsRef<Path>>(repo_dir: P, distro_id: i32, conn: DbConn) -> Self {
-        MetaRepoMgr {
-            repo_dir: repo_dir.as_ref().to_path_buf(),
+impl DistroMgr {
+    pub async fn new<P: AsRef<Path>>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result<Self> {
+        if !tokio::fs::try_exists(&distro_dir).await? {
+            tokio::fs::create_dir(&distro_dir).await?;
+        }
+
+        Ok(Self {
+            distro_dir: distro_dir.as_ref().to_path_buf(),
             distro_id,
             conn,
-        }
+        })
     }
     /// Generate archive databases for all known architectures in the repository, including the
@@ -62,9 +66,6 @@ impl MetaRepoMgr {
         let repo = repo.unwrap();
-        let parent_dir = self.repo_dir.join(&repo.name);
-        tokio::fs::create_dir_all(&parent_dir).await?;
         let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
             self.random_file_paths();
         let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
@@ -102,15 +103,13 @@ impl MetaRepoMgr {
         ar_db.close().await?;
         ar_files.close().await?;
+        let repo_dir = self.distro_dir.join(&repo.name);
         // Move the db archives to their respective places
-        tokio::fs::rename(
-            tmp_ar_db_path,
-            parent_dir.join(format!("{}.db.tar.gz", arch)),
-        )
-        .await?;
+        tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
         tokio::fs::rename(
             tmp_ar_files_path,
-            parent_dir.join(format!("{}.files.tar.gz", arch)),
+            repo_dir.join(format!("{}.files.tar.gz", arch)),
         )
         .await?;
@@ -131,7 +130,7 @@ impl MetaRepoMgr {
             repo_entry.delete(&self.conn).await?;
             // Remove files from file system
-            tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
+            tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?;
             Ok(true)
         } else {
@@ -152,7 +151,7 @@ impl MetaRepoMgr {
         while let Some(pkg) = pkgs.next().await.transpose()? {
             let path = self
-                .repo_dir
+                .distro_dir
                 .join(&repo.name)
                 .join(super::package::filename(&pkg));
             tokio::fs::remove_file(path).await?;
@@ -161,13 +160,13 @@ impl MetaRepoMgr {
         }
         tokio::fs::remove_file(
-            self.repo_dir
+            self.distro_dir
                 .join(&repo.name)
                 .join(format!("{}.db.tar.gz", arch)),
         )
         .await?;
         tokio::fs::remove_file(
-            self.repo_dir
+            self.distro_dir
                 .join(&repo.name)
                 .join(format!("{}.files.tar.gz", arch)),
         )
@@ -194,7 +193,7 @@ impl MetaRepoMgr {
         if let Some(pkg) = pkg {
             // Remove package from database & file system
             tokio::fs::remove_file(
-                self.repo_dir
+                self.distro_dir
                     .join(&repo.name)
                     .join(super::package::filename(&pkg)),
             )
@@ -221,24 +220,23 @@ impl MetaRepoMgr {
         reader: &mut R,
         repo: &str,
     ) -> crate::Result<(String, String, String)> {
-        // Copy file contents to temporary path so libarchive can work with it
         let [path] = self.random_file_paths();
         let mut temp_file = tokio::fs::File::create(&path).await?;
         tokio::io::copy(reader, &mut temp_file).await?;
-        // Parse the package
         let path_clone = path.clone();
         let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
             .await
             .unwrap()?;
-        // Query the repo for its ID, or create it if it does not already exist
-        let res = db::query::repo::by_name(&self.conn, &repo).await?;
-        let repo_id = if let Some(repo_entity) = res {
-            repo_entity.id
+        let repo_dir = self.distro_dir.join(repo);
+        let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? {
+            repo.id
         } else {
+            tokio::fs::create_dir(&repo_dir).await?;
             db::query::repo::insert(&self.conn, self.distro_id, repo, None)
                 .await?
                 .id
@@ -259,7 +257,7 @@ impl MetaRepoMgr {
             entry.delete(&self.conn).await?;
         }
-        let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
+        let dest_pkg_path = repo_dir.join(pkg.file_name());
         // Insert new package into database
         let name = pkg.info.name.clone();
@@ -268,7 +266,6 @@ impl MetaRepoMgr {
         db::query::package::insert(&self.conn, repo_id, pkg).await?;
         // Move the package to its final resting place
-        tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
         tokio::fs::rename(path, dest_pkg_path).await?;
         // Synchronize archive databases
@@ -285,7 +282,7 @@ impl MetaRepoMgr {
     pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
         std::array::from_fn(|_| {
            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
-            self.repo_dir.join(uuid.to_string())
+            self.distro_dir.join(uuid.to_string())
        })
    }
 }
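The `random_file_paths` helper kept at the end of this file hands back C fresh temporary paths in one call, sized by a const generic and destructured by the caller. A standalone sketch of the same idea, assuming the uuid crate with its v4 feature enabled:

```rust
use std::path::{Path, PathBuf};
use uuid::Uuid;

// Produce C unique paths under `base` in one call; the array length is fixed
// at compile time by the const generic parameter.
fn random_file_paths<const C: usize>(base: &Path) -> [PathBuf; C] {
    std::array::from_fn(|_| {
        let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
        base.join(uuid.to_string())
    })
}
```

A caller binds the result with an array pattern, e.g. `let [db_path, files_path] = random_file_paths(&base);`, so C is inferred and requesting the wrong number of paths becomes a compile-time error.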

View File

@@ -2,7 +2,7 @@ mod archive;
 mod manager;
 pub mod package;
-pub use manager::MetaRepoMgr;
+pub use manager::DistroMgr;
 use axum::{
     body::Body,