feat: start of archive generation for new manager

concurrent-repos
Jef Roosens 2024-05-23 21:09:35 +02:00
parent cc2dc9b28f
commit 2e0c6d1fa6
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
7 changed files with 90 additions and 22 deletions

View File

@ -11,6 +11,8 @@ pub struct Builder {
consumed: bool, consumed: bool,
} }
unsafe impl Send for Builder {}
impl Builder { impl Builder {
pub fn new() -> Self { pub fn new() -> Self {
Builder::default() Builder::default()

View File

@ -13,6 +13,8 @@ pub struct FileWriter {
closed: bool, closed: bool,
} }
unsafe impl Send for FileWriter {}
impl Handle for FileWriter { impl Handle for FileWriter {
unsafe fn handle(&self) -> *const ffi::Struct_archive { unsafe fn handle(&self) -> *const ffi::Struct_archive {
self.handle as *const _ self.handle as *const _

View File

@ -9,6 +9,8 @@ pub struct WriteEntry {
entry: *mut ffi::Struct_archive_entry, entry: *mut ffi::Struct_archive_entry,
} }
unsafe impl Send for WriteEntry {}
impl WriteEntry { impl WriteEntry {
pub fn new() -> Self { pub fn new() -> Self {
let entry = unsafe { ffi::archive_entry_new() }; let entry = unsafe { ffi::archive_entry_new() };

View File

@ -83,7 +83,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
pgp_sig: Set(info.pgpsig), pgp_sig: Set(info.pgpsig),
pgp_sig_size: Set(info.pgpsigsize), pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum), sha256_sum: Set(info.sha256sum),
compression: Set(pkg.compression.extension().unwrap().to_string()) compression: Set(pkg.compression.extension().unwrap().to_string()),
}; };
let pkg_entry = model.insert(conn).await?; let pkg_entry = model.insert(conn).await?;

View File

@ -1,9 +1,10 @@
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::io; use std::io;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
pub type Result<T> = std::result::Result<T, ServerError>; pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(Debug)] #[derive(Debug)]
@ -12,6 +13,7 @@ pub enum ServerError {
Axum(axum::Error), Axum(axum::Error),
Db(sea_orm::DbErr), Db(sea_orm::DbErr),
Status(StatusCode), Status(StatusCode),
Archive(libarchive::error::ArchiveError),
} }
impl fmt::Display for ServerError { impl fmt::Display for ServerError {
@ -21,6 +23,7 @@ impl fmt::Display for ServerError {
ServerError::Axum(err) => write!(fmt, "{}", err), ServerError::Axum(err) => write!(fmt, "{}", err),
ServerError::Status(status) => write!(fmt, "{}", status), ServerError::Status(status) => write!(fmt, "{}", status),
ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Db(err) => write!(fmt, "{}", err),
ServerError::Archive(err) => write!(fmt, "{}", err),
} }
} }
} }
@ -38,7 +41,9 @@ impl IntoResponse for ServerError {
ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => { ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
StatusCode::NOT_FOUND.into_response() StatusCode::NOT_FOUND.into_response()
} }
ServerError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), ServerError::Db(_) | ServerError::Archive(_) => {
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
} }
} }
} }
@ -72,3 +77,9 @@ impl From<sea_orm::DbErr> for ServerError {
ServerError::Db(err) ServerError::Db(err)
} }
} }
impl From<libarchive::error::ArchiveError> for ServerError {
fn from(err: libarchive::error::ArchiveError) -> Self {
ServerError::Archive(err)
}
}

View File

@ -1,23 +1,83 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use sea_orm::{DbConn, ModelTrait}; use libarchive::write::{Builder, WriteEntry};
use libarchive::{Entry, WriteFilter, WriteFormat};
use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter};
use futures::StreamExt;
use crate::db; use crate::db;
use crate::error::Result; use crate::error::Result;
pub struct MetaRepoMngr { pub const ANY_ARCH: &str = "any";
pub struct MetaRepoMgr {
repo_dir: PathBuf, repo_dir: PathBuf,
pkg_dir: PathBuf, pkg_dir: PathBuf,
} }
impl MetaRepoMngr { impl MetaRepoMgr {
pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(repo_dir: P1, pkg_dir: P2) -> Self { pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(repo_dir: P1, pkg_dir: P2) -> Self {
MetaRepoMngr { MetaRepoMgr {
repo_dir: repo_dir.as_ref().to_path_buf(), repo_dir: repo_dir.as_ref().to_path_buf(),
pkg_dir: pkg_dir.as_ref().to_path_buf(), pkg_dir: pkg_dir.as_ref().to_path_buf(),
} }
} }
/// Generate the `db` and `files` archive files for the given repo and architecture.
pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> {
let repo = crate::db::query::repo::by_name(conn, repo).await?;
if repo.is_none() {
return Ok(());
}
let repo = repo.unwrap();
let parent_dir = self.repo_dir.join(&repo.name).join(arch);
let repo_name = repo.name.clone();
let (mut ar_db, mut ar_files) = tokio::task::spawn_blocking(move || {
let mut ar_db = Builder::new();
ar_db.add_filter(WriteFilter::Gzip)?;
ar_db.set_format(WriteFormat::PaxRestricted)?;
let mut ar_files = Builder::new();
ar_files.add_filter(WriteFilter::Gzip)?;
ar_files.set_format(WriteFormat::PaxRestricted)?;
let ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", repo_name)));
let ar_files =
ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", repo_name)));
match (ar_db, ar_files) {
(Ok(ar_db), Ok(ar_files)) => Ok((ar_db, ar_files)),
(Err(err), _) | (_, Err(err)) => Err(err),
}
})
.await
.unwrap()?;
//let mut ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", &repo.name)))?;
//let mut ar_files =
// ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", &repo.name)))?;
// Query all packages in the repo that have the given architecture or the "any"
// architecture
let mut pkgs = repo
.find_related(crate::db::Package)
.filter(db::package::Column::Arch.eq(arch).or(ANY_ARCH.into()))
.stream(conn)
.await?;
while let Some(pkg) = pkgs.next().await {
let pkg = pkg?;
// TODO for each package, write entry to archive files
}
Ok(())
}
/// Remove the repo with the given name, if it existed /// Remove the repo with the given name, if it existed
pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> { pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> {
let res = db::query::repo::by_name(conn, repo).await?; let res = db::query::repo::by_name(conn, repo).await?;

View File

@ -143,13 +143,9 @@ async fn post_package_archive(
}; };
// If the package already exists in the database, we remove it first // If the package already exists in the database, we remove it first
let res = db::query::package::by_fields( let res =
&global.db, db::query::package::by_fields(&global.db, repo_id, &pkg.info.arch, &pkg.info.name)
repo_id, .await?;
&pkg.info.arch,
&pkg.info.name,
)
.await?;
if let Some(entry) = res { if let Some(entry) = res {
entry.delete(&global.db).await?; entry.delete(&global.db).await?;
@ -240,13 +236,8 @@ async fn delete_package(
let res = db::query::repo::by_name(&global.db, &repo).await?; let res = db::query::repo::by_name(&global.db, &repo).await?;
if let Some(repo_entry) = res { if let Some(repo_entry) = res {
let res = db::query::package::by_fields( let res =
&global.db, db::query::package::by_fields(&global.db, repo_entry.id, &arch, &name).await?;
repo_entry.id,
&arch,
&name,
)
.await?;
if let Some(entry) = res { if let Some(entry) = res {
entry.delete(&global.db).await?; entry.delete(&global.db).await?;