feat: further work on new repo & package implementation

concurrent-repos
Jef Roosens 2024-05-25 13:31:46 +02:00
parent 2e0c6d1fa6
commit c95feadca1
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 113 additions and 2 deletions

View File

@ -2,10 +2,12 @@ use super::WriteEntry;
use crate::error::ArchiveError; use crate::error::ArchiveError;
use crate::Entry; use crate::Entry;
use crate::Handle; use crate::Handle;
use core::ffi::c_void;
use libarchive3_sys::ffi; use libarchive3_sys::ffi;
use std::fs; use std::fs;
use std::io; use std::io;
use std::io::Read; use std::io::Read;
use std::io::Write;
use std::path::Path; use std::path::Path;
pub struct FileWriter { pub struct FileWriter {
@ -33,6 +35,17 @@ impl FileWriter {
} }
} }
/// Append the given entry to the archive. After successfully calling this function, writing to
/// the archive now writes to this entry.
pub fn append_entry(&mut self, entry: &mut WriteEntry) -> crate::Result<()> {
unsafe {
match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) {
ffi::ARCHIVE_OK => Ok(()),
_ => Err(ArchiveError::from(self as &dyn Handle).into()),
}
}
}
pub fn append_data<R: Read>(&mut self, entry: &mut WriteEntry, r: &mut R) -> crate::Result<()> { pub fn append_data<R: Read>(&mut self, entry: &mut WriteEntry, r: &mut R) -> crate::Result<()> {
unsafe { unsafe {
match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) { match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) {
@ -109,3 +122,23 @@ impl Drop for FileWriter {
} }
} }
} }
impl Write for FileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let res = unsafe {
ffi::archive_write_data(self.handle_mut(), buf.as_ptr() as *const c_void, buf.len())
} as isize;
if res < 0 {
Err(ArchiveError::from(self as &dyn Handle).into())
} else {
// Unwrap is safe as we check if the value is negative in the if statement
Ok(res.try_into().unwrap())
}
}
fn flush(&mut self) -> io::Result<()> {
// Libarchive doesn't seem to provide a flush mechanism
Ok(())
}
}

View File

@ -3,6 +3,7 @@ mod file;
use crate::Entry; use crate::Entry;
pub use builder::Builder; pub use builder::Builder;
pub use file::FileWriter;
use libarchive3_sys::ffi; use libarchive3_sys::ffi;
pub struct WriteEntry { pub struct WriteEntry {

View File

@ -1,4 +1,5 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use libarchive::write::{Builder, WriteEntry}; use libarchive::write::{Builder, WriteEntry};
use libarchive::{Entry, WriteFilter, WriteFormat}; use libarchive::{Entry, WriteFilter, WriteFormat};
@ -7,6 +8,7 @@ use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter};
use futures::StreamExt; use futures::StreamExt;
use super::package_new;
use crate::db; use crate::db;
use crate::error::Result; use crate::error::Result;
@ -37,7 +39,7 @@ impl MetaRepoMgr {
let parent_dir = self.repo_dir.join(&repo.name).join(arch); let parent_dir = self.repo_dir.join(&repo.name).join(arch);
let repo_name = repo.name.clone(); let repo_name = repo.name.clone();
let (mut ar_db, mut ar_files) = tokio::task::spawn_blocking(move || { let (ar_db, ar_files) = tokio::task::spawn_blocking(move || {
let mut ar_db = Builder::new(); let mut ar_db = Builder::new();
ar_db.add_filter(WriteFilter::Gzip)?; ar_db.add_filter(WriteFilter::Gzip)?;
ar_db.set_format(WriteFormat::PaxRestricted)?; ar_db.set_format(WriteFormat::PaxRestricted)?;
@ -69,12 +71,30 @@ impl MetaRepoMgr {
.stream(conn) .stream(conn)
.await?; .await?;
let ar_files = Arc::new(Mutex::new(ar_files));
let ar_db = Arc::new(Mutex::new(ar_db));
while let Some(pkg) = pkgs.next().await { while let Some(pkg) = pkgs.next().await {
let pkg = pkg?; let pkg = pkg?;
// TODO for each package, write entry to archive files package_new::append_files_entry(conn, &pkg, Arc::clone(&ar_files)).await?;
// TODO db archive
} }
// Close archives explicitely for better error handling
tokio::task::spawn_blocking(move || {
let r1 = ar_files.lock().unwrap().close();
let r2 = ar_db.lock().unwrap().close();
match (r1, r2) {
(Ok(_), Ok(_)) => Ok(()),
(Err(err), _) | (_, Err(err)) => Err(err),
}
})
.await
.unwrap()?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
mod manager; mod manager;
mod manager_new; mod manager_new;
pub mod package; pub mod package;
pub mod package_new;
pub use manager::RepoGroupManager; pub use manager::RepoGroupManager;

View File

@ -0,0 +1,56 @@
use sea_orm::{DbConn, ModelTrait};
use libarchive::write::{FileWriter, WriteEntry};
use libarchive::Entry;
use futures::StreamExt;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use crate::db;
/// Return the full name of the package, consisting of its package name, pkgver and pkgrel
fn full_pkg_name(pkg: &db::entities::package::Model) -> String {
format!("{}-{}", pkg.name, pkg.version)
}
pub async fn append_files_entry(
conn: &DbConn,
pkg: &db::entities::package::Model,
ar: Arc<Mutex<FileWriter>>,
) -> crate::Result<()> {
let full_name = full_pkg_name(pkg);
let ar_clone = Arc::clone(&ar);
tokio::task::spawn_blocking(move || {
let mut ar_entry = WriteEntry::new();
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_pathname(PathBuf::from(full_name).join("files"));
ar_entry.set_mode(0o100644);
// TODO set entry size?
ar_clone.lock().unwrap().append_entry(&mut ar_entry)
})
.await
.unwrap()?;
// Write first header line
let ar_clone = Arc::clone(&ar);
tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "%FILES%"))
.await
.unwrap()?;
let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;
while let Some(file) = files.next().await.transpose()? {
let ar_clone = Arc::clone(&ar);
tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "{}", file.path))
.await
.unwrap()?;
}
Ok(())
}