From c95feadca104f39bbf25045844b764f82a821828 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 25 May 2024 13:31:46 +0200 Subject: [PATCH] feat: further work on new repo & package implementation --- libarchive/src/write/file.rs | 33 ++++++++++++++++++++ libarchive/src/write/mod.rs | 1 + server/src/repo/manager_new.rs | 24 +++++++++++++-- server/src/repo/mod.rs | 1 + server/src/repo/package_new.rs | 56 ++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 server/src/repo/package_new.rs diff --git a/libarchive/src/write/file.rs b/libarchive/src/write/file.rs index 7f81915..fa39a13 100644 --- a/libarchive/src/write/file.rs +++ b/libarchive/src/write/file.rs @@ -2,10 +2,12 @@ use super::WriteEntry; use crate::error::ArchiveError; use crate::Entry; use crate::Handle; +use core::ffi::c_void; use libarchive3_sys::ffi; use std::fs; use std::io; use std::io::Read; +use std::io::Write; use std::path::Path; pub struct FileWriter { @@ -33,6 +35,17 @@ impl FileWriter { } } + /// Append the given entry to the archive. After successfully calling this function, writing to + /// the archive now writes to this entry. + pub fn append_entry(&mut self, entry: &mut WriteEntry) -> crate::Result<()> { + unsafe { + match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) { + ffi::ARCHIVE_OK => Ok(()), + _ => Err(ArchiveError::from(self as &dyn Handle).into()), + } + } + } + pub fn append_data(&mut self, entry: &mut WriteEntry, r: &mut R) -> crate::Result<()> { unsafe { match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) { @@ -109,3 +122,23 @@ impl Drop for FileWriter { } } } + +impl Write for FileWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let res = unsafe { + ffi::archive_write_data(self.handle_mut(), buf.as_ptr() as *const c_void, buf.len()) + } as isize; + + if res < 0 { + Err(ArchiveError::from(self as &dyn Handle).into()) + } else { + // Unwrap is safe as we check if the value is negative in the if statement + Ok(res.try_into().unwrap()) + } + } + + fn flush(&mut self) -> io::Result<()> { + // Libarchive doesn't seem to provide a flush mechanism + Ok(()) + } +} diff --git a/libarchive/src/write/mod.rs b/libarchive/src/write/mod.rs index 642fc18..5f583e0 100644 --- a/libarchive/src/write/mod.rs +++ b/libarchive/src/write/mod.rs @@ -3,6 +3,7 @@ mod file; use crate::Entry; pub use builder::Builder; +pub use file::FileWriter; use libarchive3_sys::ffi; pub struct WriteEntry { diff --git a/server/src/repo/manager_new.rs b/server/src/repo/manager_new.rs index 9e7fb91..5e3787e 100644 --- a/server/src/repo/manager_new.rs +++ b/server/src/repo/manager_new.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use libarchive::write::{Builder, WriteEntry}; use libarchive::{Entry, WriteFilter, WriteFormat}; @@ -7,6 +8,7 @@ use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter}; use futures::StreamExt; +use super::package_new; use crate::db; use crate::error::Result; @@ -37,7 +39,7 @@ impl MetaRepoMgr { let parent_dir = self.repo_dir.join(&repo.name).join(arch); let repo_name = repo.name.clone(); - let (mut ar_db, mut ar_files) = tokio::task::spawn_blocking(move || { + let (ar_db, ar_files) = tokio::task::spawn_blocking(move || { let mut ar_db = Builder::new(); ar_db.add_filter(WriteFilter::Gzip)?; ar_db.set_format(WriteFormat::PaxRestricted)?; @@ -69,12 +71,30 @@ impl MetaRepoMgr { .stream(conn) .await?; + let ar_files = Arc::new(Mutex::new(ar_files)); + let ar_db = Arc::new(Mutex::new(ar_db)); + while let Some(pkg) = pkgs.next().await { let pkg = pkg?; - // TODO for each package, write entry to archive files + package_new::append_files_entry(conn, &pkg, Arc::clone(&ar_files)).await?; + + // TODO db archive } + // Close archives explicitely for better error handling + tokio::task::spawn_blocking(move || { + let r1 = ar_files.lock().unwrap().close(); + let r2 = ar_db.lock().unwrap().close(); + + match (r1, r2) { + (Ok(_), Ok(_)) => Ok(()), + (Err(err), _) | (_, Err(err)) => Err(err), + } + }) + .await + .unwrap()?; + Ok(()) } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 419aa61..c6ca77c 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,6 +1,7 @@ mod manager; mod manager_new; pub mod package; +pub mod package_new; pub use manager::RepoGroupManager; diff --git a/server/src/repo/package_new.rs b/server/src/repo/package_new.rs new file mode 100644 index 0000000..187d980 --- /dev/null +++ b/server/src/repo/package_new.rs @@ -0,0 +1,56 @@ +use sea_orm::{DbConn, ModelTrait}; + +use libarchive::write::{FileWriter, WriteEntry}; +use libarchive::Entry; + +use futures::StreamExt; + +use std::io::Write; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use crate::db; + +/// Return the full name of the package, consisting of its package name, pkgver and pkgrel +fn full_pkg_name(pkg: &db::entities::package::Model) -> String { + format!("{}-{}", pkg.name, pkg.version) +} + +pub async fn append_files_entry( + conn: &DbConn, + pkg: &db::entities::package::Model, + ar: Arc>, +) -> crate::Result<()> { + let full_name = full_pkg_name(pkg); + let ar_clone = Arc::clone(&ar); + + tokio::task::spawn_blocking(move || { + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_pathname(PathBuf::from(full_name).join("files")); + ar_entry.set_mode(0o100644); + // TODO set entry size? + + ar_clone.lock().unwrap().append_entry(&mut ar_entry) + }) + .await + .unwrap()?; + + // Write first header line + let ar_clone = Arc::clone(&ar); + tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "%FILES%")) + .await + .unwrap()?; + + let mut files = pkg.find_related(db::PackageFile).stream(conn).await?; + + while let Some(file) = files.next().await.transpose()? { + let ar_clone = Arc::clone(&ar); + + tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "{}", file.path)) + .await + .unwrap()?; + } + + Ok(()) +}