From c5ef7c3c2817f74fc47ff8ef65061913ba9feec5 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 25 May 2024 18:55:02 +0200 Subject: [PATCH] feat: message-passing interface for archive structs; implement pkg add --- server/src/repo/archive.rs | 106 +++++++++++++++++++++++++++++++ server/src/repo/manager_new.rs | 113 ++++++++++++++++++++------------- server/src/repo/mod.rs | 2 +- server/src/repo/package_new.rs | 56 ---------------- 4 files changed, 176 insertions(+), 101 deletions(-) create mode 100644 server/src/repo/archive.rs delete mode 100644 server/src/repo/package_new.rs diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs new file mode 100644 index 0000000..a8471eb --- /dev/null +++ b/server/src/repo/archive.rs @@ -0,0 +1,106 @@ +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::sync::{mpsc, oneshot}; + +use libarchive::write::{Builder, FileWriter, WriteEntry}; +use libarchive::{Entry, WriteFilter, WriteFormat}; + +enum Message { + AppendFilesEntry(oneshot::Sender>, String), + AppendLine(oneshot::Sender>, String), + Close(oneshot::Sender>), +} + +/// Struct to abstract away the intrinsics of writing entries to an archive file +pub struct RepoArchiveWriter { + tx: mpsc::Sender, +} + +fn archive_manager_task(mut rx: mpsc::Receiver, mut ar: FileWriter) { + // Once the accompanying struct and the final sender get dropped, this will return None and the + // task will exit + while let Some(msg) = rx.blocking_recv() { + match msg { + Message::AppendFilesEntry(tx, full_name) => { + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_pathname(PathBuf::from(full_name).join("files")); + ar_entry.set_mode(0o100644); + // TODO set entry size? + + let mut res = ar.append_entry(&mut ar_entry).map_err(io::Error::from); + + if res.is_ok() { + res = writeln!(ar, "%FILES%"); + } + + tx.send(res); + + // All "files" entries start with this line + } + Message::AppendLine(tx, line) => { + let res = writeln!(ar, "{}", line); + + tx.send(res); + } + Message::Close(tx) => { + let res = ar.close(); + + tx.send(res.map_err(io::Error::from)); + } + } + } +} + +impl RepoArchiveWriter { + pub async fn open>(path: P) -> io::Result { + let path = PathBuf::from(path.as_ref()); + let (tx, rx) = mpsc::channel(1); + + // Open the archive file + let ar = tokio::task::spawn_blocking(move || { + let mut builder = Builder::new(); + builder.add_filter(WriteFilter::Gzip)?; + builder.set_format(WriteFormat::PaxRestricted)?; + + builder.open_file(path) + }) + .await + .unwrap()?; + + // Spawn blocking task to perform blocking actions + tokio::task::spawn_blocking(move || archive_manager_task(rx, ar)); + + Ok(Self { tx }) + } + + /// Set the current entry to be a new "files" list + pub async fn add_files_entry(&self, full_name: &str) -> io::Result<()> { + let full_name = String::from(full_name); + let (tx, rx) = oneshot::channel(); + + self.tx.send(Message::AppendFilesEntry(tx, full_name)).await; + + rx.await.unwrap() + } + // + ///// Append the given line to the currently active entry + pub async fn write_line(&self, line: &str) -> io::Result<()> { + let line = String::from(line); + let (tx, rx) = oneshot::channel(); + + self.tx.send(Message::AppendLine(tx, line)).await; + + rx.await.unwrap() + } + + pub async fn close(&self) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + + self.tx.send(Message::Close(tx)).await; + + rx.await.unwrap() + } +} diff --git a/server/src/repo/manager_new.rs b/server/src/repo/manager_new.rs index 5e3787e..a7f2f1b 100644 --- a/server/src/repo/manager_new.rs +++ b/server/src/repo/manager_new.rs @@ -5,10 +5,13 @@ use libarchive::write::{Builder, WriteEntry}; use libarchive::{Entry, WriteFilter, WriteFormat}; use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter}; +use uuid::Uuid; use futures::StreamExt; +use tokio::io::AsyncRead; -use super::package_new; +use super::archive; +use super::package; use crate::db; use crate::error::Result; @@ -38,30 +41,9 @@ impl MetaRepoMgr { let repo = repo.unwrap(); let parent_dir = self.repo_dir.join(&repo.name).join(arch); - let repo_name = repo.name.clone(); - let (ar_db, ar_files) = tokio::task::spawn_blocking(move || { - let mut ar_db = Builder::new(); - ar_db.add_filter(WriteFilter::Gzip)?; - ar_db.set_format(WriteFormat::PaxRestricted)?; - - let mut ar_files = Builder::new(); - ar_files.add_filter(WriteFilter::Gzip)?; - ar_files.set_format(WriteFormat::PaxRestricted)?; - - let ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", repo_name))); - let ar_files = - ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", repo_name))); - - match (ar_db, ar_files) { - (Ok(ar_db), Ok(ar_files)) => Ok((ar_db, ar_files)), - (Err(err), _) | (_, Err(err)) => Err(err), - } - }) - .await - .unwrap()?; - //let mut ar_db = ar_db.open_file(parent_dir.join(format!("{}.db.tar.gz", &repo.name)))?; - //let mut ar_files = - // ar_files.open_file(parent_dir.join(format!("{}.files.tar.gz", &repo.name)))?; + let ar_files = + archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", repo.name))) + .await?; // Query all packages in the repo that have the given architecture or the "any" // architecture @@ -71,29 +53,19 @@ impl MetaRepoMgr { .stream(conn) .await?; - let ar_files = Arc::new(Mutex::new(ar_files)); - let ar_db = Arc::new(Mutex::new(ar_db)); + while let Some(pkg) = pkgs.next().await.transpose()? { + ar_files + .add_files_entry(&format!("{}-{}", pkg.name, pkg.version)) + .await?; - while let Some(pkg) = pkgs.next().await { - let pkg = pkg?; + let mut files = pkg.find_related(db::PackageFile).stream(conn).await?; - package_new::append_files_entry(conn, &pkg, Arc::clone(&ar_files)).await?; - - // TODO db archive + while let Some(file) = files.next().await.transpose()? { + ar_files.write_line(&file.path).await?; + } } - // Close archives explicitely for better error handling - tokio::task::spawn_blocking(move || { - let r1 = ar_files.lock().unwrap().close(); - let r2 = ar_db.lock().unwrap().close(); - - match (r1, r2) { - (Ok(_), Ok(_)) => Ok(()), - (Err(err), _) | (_, Err(err)) => Err(err), - } - }) - .await - .unwrap()?; + ar_files.close().await?; Ok(()) } @@ -139,4 +111,57 @@ impl MetaRepoMgr { Ok(false) } } + + pub async fn add_pkg_from_reader(&self, conn: &DbConn, reader: &mut R, repo: &str) -> crate::Result<()> { + // Copy file contents to temporary path so libarchive can work with it + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + let path = self.pkg_dir.join(uuid.to_string()); + let mut temp_file = tokio::fs::File::create(&path).await?; + + tokio::io::copy(reader, &mut temp_file).await?; + + // Parse the package + let path_clone = path.clone(); + let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)).await.unwrap()?; + + // Query the repo for its ID, or create it if it does not already exist + let res = db::query::repo::by_name(conn, &repo).await?; + + let repo_id = if let Some(repo_entity) = res { + repo_entity.id + } else { + db::query::repo::insert(conn, repo, None) + .await? + .last_insert_id + }; + + // If the package already exists in the database, we remove it first + let res = + db::query::package::by_fields(conn, repo_id, &pkg.info.arch, &pkg.info.name) + .await?; + + if let Some(entry) = res { + entry.delete(conn).await?; + } + + let dest_pkg_path = self + .pkg_dir + .join(repo) + .join(&pkg.info.arch) + .join(pkg.file_name()); + + // Insert new package into database + let arch = pkg.info.arch.clone(); + db::query::package::insert(conn, repo_id, pkg).await?; + + // Move the package to its final resting place + tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?; + tokio::fs::rename(path, dest_pkg_path).await?; + + // Synchronize archive databases + // TODO account for "any" architecture here + self.generate_archives(conn, repo, &arch).await?; + + Ok(()) + } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index c6ca77c..4b3ff29 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,7 +1,7 @@ +mod archive; mod manager; mod manager_new; pub mod package; -pub mod package_new; pub use manager::RepoGroupManager; diff --git a/server/src/repo/package_new.rs b/server/src/repo/package_new.rs deleted file mode 100644 index 187d980..0000000 --- a/server/src/repo/package_new.rs +++ /dev/null @@ -1,56 +0,0 @@ -use sea_orm::{DbConn, ModelTrait}; - -use libarchive::write::{FileWriter, WriteEntry}; -use libarchive::Entry; - -use futures::StreamExt; - -use std::io::Write; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - -use crate::db; - -/// Return the full name of the package, consisting of its package name, pkgver and pkgrel -fn full_pkg_name(pkg: &db::entities::package::Model) -> String { - format!("{}-{}", pkg.name, pkg.version) -} - -pub async fn append_files_entry( - conn: &DbConn, - pkg: &db::entities::package::Model, - ar: Arc>, -) -> crate::Result<()> { - let full_name = full_pkg_name(pkg); - let ar_clone = Arc::clone(&ar); - - tokio::task::spawn_blocking(move || { - let mut ar_entry = WriteEntry::new(); - ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); - ar_entry.set_pathname(PathBuf::from(full_name).join("files")); - ar_entry.set_mode(0o100644); - // TODO set entry size? - - ar_clone.lock().unwrap().append_entry(&mut ar_entry) - }) - .await - .unwrap()?; - - // Write first header line - let ar_clone = Arc::clone(&ar); - tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "%FILES%")) - .await - .unwrap()?; - - let mut files = pkg.find_related(db::PackageFile).stream(conn).await?; - - while let Some(file) = files.next().await.transpose()? { - let ar_clone = Arc::clone(&ar); - - tokio::task::spawn_blocking(move || writeln!(ar_clone.lock().unwrap(), "{}", file.path)) - .await - .unwrap()?; - } - - Ok(()) -}