feat: message-passing interface for archive structs; implement pkg add
parent
c95feadca1
commit
c5ef7c3c28
|
@ -0,0 +1,106 @@
|
||||||
|
use std::io::{self, Write};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
|
use libarchive::write::{Builder, FileWriter, WriteEntry};
|
||||||
|
use libarchive::{Entry, WriteFilter, WriteFormat};
|
||||||
|
|
||||||
|
enum Message {
|
||||||
|
AppendFilesEntry(oneshot::Sender<io::Result<()>>, String),
|
||||||
|
AppendLine(oneshot::Sender<io::Result<()>>, String),
|
||||||
|
Close(oneshot::Sender<io::Result<()>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Struct to abstract away the intrinsics of writing entries to an archive file
|
||||||
|
pub struct RepoArchiveWriter {
|
||||||
|
tx: mpsc::Sender<Message>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn archive_manager_task(mut rx: mpsc::Receiver<Message>, mut ar: FileWriter) {
|
||||||
|
// Once the accompanying struct and the final sender get dropped, this will return None and the
|
||||||
|
// task will exit
|
||||||
|
while let Some(msg) = rx.blocking_recv() {
|
||||||
|
match msg {
|
||||||
|
Message::AppendFilesEntry(tx, full_name) => {
|
||||||
|
let mut ar_entry = WriteEntry::new();
|
||||||
|
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
|
||||||
|
ar_entry.set_pathname(PathBuf::from(full_name).join("files"));
|
||||||
|
ar_entry.set_mode(0o100644);
|
||||||
|
// TODO set entry size?
|
||||||
|
|
||||||
|
let mut res = ar.append_entry(&mut ar_entry).map_err(io::Error::from);
|
||||||
|
|
||||||
|
if res.is_ok() {
|
||||||
|
res = writeln!(ar, "%FILES%");
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.send(res);
|
||||||
|
|
||||||
|
// All "files" entries start with this line
|
||||||
|
}
|
||||||
|
Message::AppendLine(tx, line) => {
|
||||||
|
let res = writeln!(ar, "{}", line);
|
||||||
|
|
||||||
|
tx.send(res);
|
||||||
|
}
|
||||||
|
Message::Close(tx) => {
|
||||||
|
let res = ar.close();
|
||||||
|
|
||||||
|
tx.send(res.map_err(io::Error::from));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RepoArchiveWriter {
|
||||||
|
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
|
||||||
|
let path = PathBuf::from(path.as_ref());
|
||||||
|
let (tx, rx) = mpsc::channel(1);
|
||||||
|
|
||||||
|
// Open the archive file
|
||||||
|
let ar = tokio::task::spawn_blocking(move || {
|
||||||
|
let mut builder = Builder::new();
|
||||||
|
builder.add_filter(WriteFilter::Gzip)?;
|
||||||
|
builder.set_format(WriteFormat::PaxRestricted)?;
|
||||||
|
|
||||||
|
builder.open_file(path)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
|
// Spawn blocking task to perform blocking actions
|
||||||
|
tokio::task::spawn_blocking(move || archive_manager_task(rx, ar));
|
||||||
|
|
||||||
|
Ok(Self { tx })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the current entry to be a new "files" list
|
||||||
|
pub async fn add_files_entry(&self, full_name: &str) -> io::Result<()> {
|
||||||
|
let full_name = String::from(full_name);
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.tx.send(Message::AppendFilesEntry(tx, full_name)).await;
|
||||||
|
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
|
//
|
||||||
|
///// Append the given line to the currently active entry
|
||||||
|
pub async fn write_line(&self, line: &str) -> io::Result<()> {
|
||||||
|
let line = String::from(line);
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.tx.send(Message::AppendLine(tx, line)).await;
|
||||||
|
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn close(&self) -> io::Result<()> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.tx.send(Message::Close(tx)).await;
|
||||||
|
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,10 +5,13 @@ use libarchive::write::{Builder, WriteEntry};
|
||||||
use libarchive::{Entry, WriteFilter, WriteFormat};
|
use libarchive::{Entry, WriteFilter, WriteFormat};
|
||||||
|
|
||||||
use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter};
|
use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
use super::package_new;
|
use super::archive;
|
||||||
|
use super::package;
|
||||||
use crate::db;
|
use crate::db;
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
|
||||||
|
@ -38,30 +41,9 @@ impl MetaRepoMgr {
|
||||||
let repo = repo.unwrap();
|
let repo = repo.unwrap();
|
||||||
let parent_dir = self.repo_dir.join(&repo.name).join(arch);
|
let parent_dir = self.repo_dir.join(&repo.name).join(arch);
|
||||||
|
|
||||||
let repo_name = repo.name.clone();
|
let ar_files =
|
||||||
let (ar_db, ar_files) = tokio::task::spawn_blocking(move || {
|
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", repo.name)))
|
||||||
let mut ar_db = Builder::new();
|
.await?;
|
||||||
ar_db.add_filter(WriteFilter::Gzip)?;
|
|
||||||
ar_db.set_format(WriteFormat::PaxRestricted)?;
|
|
||||||
|
|
||||||
let mut ar_files = Builder::new();
|
|
||||||
ar_files.add_filter(WriteFilter::Gzip)?;
|
|
||||||
ar_files.set_format(WriteFormat::PaxRestricted)?;
|
|
||||||
|
|
||||||
let ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", repo_name)));
|
|
||||||
let ar_files =
|
|
||||||
ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", repo_name)));
|
|
||||||
|
|
||||||
match (ar_db, ar_files) {
|
|
||||||
(Ok(ar_db), Ok(ar_files)) => Ok((ar_db, ar_files)),
|
|
||||||
(Err(err), _) | (_, Err(err)) => Err(err),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
//let mut ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", &repo.name)))?;
|
|
||||||
//let mut ar_files =
|
|
||||||
// ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", &repo.name)))?;
|
|
||||||
|
|
||||||
// Query all packages in the repo that have the given architecture or the "any"
|
// Query all packages in the repo that have the given architecture or the "any"
|
||||||
// architecture
|
// architecture
|
||||||
|
@ -71,29 +53,19 @@ impl MetaRepoMgr {
|
||||||
.stream(conn)
|
.stream(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let ar_files = Arc::new(Mutex::new(ar_files));
|
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||||
let ar_db = Arc::new(Mutex::new(ar_db));
|
ar_files
|
||||||
|
.add_files_entry(&format!("{}-{}", pkg.name, pkg.version))
|
||||||
|
.await?;
|
||||||
|
|
||||||
while let Some(pkg) = pkgs.next().await {
|
let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;
|
||||||
let pkg = pkg?;
|
|
||||||
|
|
||||||
package_new::append_files_entry(conn, &pkg, Arc::clone(&ar_files)).await?;
|
while let Some(file) = files.next().await.transpose()? {
|
||||||
|
ar_files.write_line(&file.path).await?;
|
||||||
// TODO db archive
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close archives explicitely for better error handling
|
ar_files.close().await?;
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
let r1 = ar_files.lock().unwrap().close();
|
|
||||||
let r2 = ar_db.lock().unwrap().close();
|
|
||||||
|
|
||||||
match (r1, r2) {
|
|
||||||
(Ok(_), Ok(_)) => Ok(()),
|
|
||||||
(Err(err), _) | (_, Err(err)) => Err(err),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -139,4 +111,57 @@ impl MetaRepoMgr {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(&self, conn: &DbConn, reader: &mut R, repo: &str) -> crate::Result<()> {
|
||||||
|
// Copy file contents to temporary path so libarchive can work with it
|
||||||
|
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||||
|
let path = self.pkg_dir.join(uuid.to_string());
|
||||||
|
let mut temp_file = tokio::fs::File::create(&path).await?;
|
||||||
|
|
||||||
|
tokio::io::copy(reader, &mut temp_file).await?;
|
||||||
|
|
||||||
|
// Parse the package
|
||||||
|
let path_clone = path.clone();
|
||||||
|
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)).await.unwrap()?;
|
||||||
|
|
||||||
|
// Query the repo for its ID, or create it if it does not already exist
|
||||||
|
let res = db::query::repo::by_name(conn, &repo).await?;
|
||||||
|
|
||||||
|
let repo_id = if let Some(repo_entity) = res {
|
||||||
|
repo_entity.id
|
||||||
|
} else {
|
||||||
|
db::query::repo::insert(conn, repo, None)
|
||||||
|
.await?
|
||||||
|
.last_insert_id
|
||||||
|
};
|
||||||
|
|
||||||
|
// If the package already exists in the database, we remove it first
|
||||||
|
let res =
|
||||||
|
db::query::package::by_fields(conn, repo_id, &pkg.info.arch, &pkg.info.name)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(entry) = res {
|
||||||
|
entry.delete(conn).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let dest_pkg_path = self
|
||||||
|
.pkg_dir
|
||||||
|
.join(repo)
|
||||||
|
.join(&pkg.info.arch)
|
||||||
|
.join(pkg.file_name());
|
||||||
|
|
||||||
|
// Insert new package into database
|
||||||
|
let arch = pkg.info.arch.clone();
|
||||||
|
db::query::package::insert(conn, repo_id, pkg).await?;
|
||||||
|
|
||||||
|
// Move the package to its final resting place
|
||||||
|
tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
|
||||||
|
tokio::fs::rename(path, dest_pkg_path).await?;
|
||||||
|
|
||||||
|
// Synchronize archive databases
|
||||||
|
// TODO account for "any" architecture here
|
||||||
|
self.generate_archives(conn, repo, &arch).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
|
mod archive;
|
||||||
mod manager;
|
mod manager;
|
||||||
mod manager_new;
|
mod manager_new;
|
||||||
pub mod package;
|
pub mod package;
|
||||||
pub mod package_new;
|
|
||||||
|
|
||||||
pub use manager::RepoGroupManager;
|
pub use manager::RepoGroupManager;
|
||||||
|
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
use sea_orm::{DbConn, ModelTrait};
|
|
||||||
|
|
||||||
use libarchive::write::{FileWriter, WriteEntry};
|
|
||||||
use libarchive::Entry;
|
|
||||||
|
|
||||||
use futures::StreamExt;
|
|
||||||
|
|
||||||
use std::io::Write;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use crate::db;
|
|
||||||
|
|
||||||
/// Return the full name of the package, consisting of its package name, pkgver and pkgrel
|
|
||||||
fn full_pkg_name(pkg: &db::entities::package::Model) -> String {
|
|
||||||
format!("{}-{}", pkg.name, pkg.version)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn append_files_entry(
|
|
||||||
conn: &DbConn,
|
|
||||||
pkg: &db::entities::package::Model,
|
|
||||||
ar: Arc<Mutex<FileWriter>>,
|
|
||||||
) -> crate::Result<()> {
|
|
||||||
let full_name = full_pkg_name(pkg);
|
|
||||||
let ar_clone = Arc::clone(&ar);
|
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
let mut ar_entry = WriteEntry::new();
|
|
||||||
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
|
|
||||||
ar_entry.set_pathname(PathBuf::from(full_name).join("files"));
|
|
||||||
ar_entry.set_mode(0o100644);
|
|
||||||
// TODO set entry size?
|
|
||||||
|
|
||||||
ar_clone.lock().unwrap().append_entry(&mut ar_entry)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
// Write first header line
|
|
||||||
let ar_clone = Arc::clone(&ar);
|
|
||||||
tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "%FILES%"))
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;
|
|
||||||
|
|
||||||
while let Some(file) = files.next().await.transpose()? {
|
|
||||||
let ar_clone = Arc::clone(&ar);
|
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "{}", file.path))
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
Loading…
Reference in New Issue