From 2d4cfee27a215dec18bd34a873477bb0b02d9a80 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 26 May 2024 17:51:40 +0200 Subject: [PATCH] feat: implement archive writer just mutex and spawn_blocking --- server/src/cli.rs | 5 +- server/src/repo/archive.rs | 104 +++++++++++++-------------------- server/src/repo/manager_new.rs | 22 +++++-- 3 files changed, 63 insertions(+), 68 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 91569de..0c802f2 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -77,7 +77,10 @@ impl Cli { debug!("Connecting to database with URL {}", db_url); - let db = sea_orm::Database::connect(db_url).await?; + let mut options = sea_orm::ConnectOptions::new(db_url); + options.max_connections(16); + + let db = sea_orm::Database::connect(options).await?; crate::db::Migrator::up(&db, None).await?; let config = Config { diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index a8471eb..2e31fb7 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -1,7 +1,6 @@ use std::io::{self, Write}; use std::path::{Path, PathBuf}; -use std::sync::Arc; -use tokio::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::sync::{mpsc, oneshot}; use libarchive::write::{Builder, FileWriter, WriteEntry}; @@ -15,49 +14,12 @@ enum Message { /// Struct to abstract away the intrinsics of writing entries to an archive file pub struct RepoArchiveWriter { - tx: mpsc::Sender, -} - -fn archive_manager_task(mut rx: mpsc::Receiver, mut ar: FileWriter) { - // Once the accompanying struct and the final sender get dropped, this will return None and the - // task will exit - while let Some(msg) = rx.blocking_recv() { - match msg { - Message::AppendFilesEntry(tx, full_name) => { - let mut ar_entry = WriteEntry::new(); - ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); - ar_entry.set_pathname(PathBuf::from(full_name).join("files")); - ar_entry.set_mode(0o100644); - // TODO set entry size? - - let mut res = ar.append_entry(&mut ar_entry).map_err(io::Error::from); - - if res.is_ok() { - res = writeln!(ar, "%FILES%"); - } - - tx.send(res); - - // All "files" entries start with this line - } - Message::AppendLine(tx, line) => { - let res = writeln!(ar, "{}", line); - - tx.send(res); - } - Message::Close(tx) => { - let res = ar.close(); - - tx.send(res.map_err(io::Error::from)); - } - } - } + ar: Arc>, } impl RepoArchiveWriter { pub async fn open>(path: P) -> io::Result { let path = PathBuf::from(path.as_ref()); - let (tx, rx) = mpsc::channel(1); // Open the archive file let ar = tokio::task::spawn_blocking(move || { @@ -70,37 +32,55 @@ impl RepoArchiveWriter { .await .unwrap()?; - // Spawn blocking task to perform blocking actions - tokio::task::spawn_blocking(move || archive_manager_task(rx, ar)); - - Ok(Self { tx }) + Ok(Self { + ar: Arc::new(Mutex::new(ar)), + }) } /// Set the current entry to be a new "files" list - pub async fn add_files_entry(&self, full_name: &str) -> io::Result<()> { + pub async fn add_files_entry>( + &self, + full_name: &str, + path: P, + ) -> io::Result<()> { + let metadata = tokio::fs::metadata(&path).await?; + let file_size = metadata.len(); + + let ar = Arc::clone(&self.ar); let full_name = String::from(full_name); - let (tx, rx) = oneshot::channel(); + let path = PathBuf::from(path.as_ref()); - self.tx.send(Message::AppendFilesEntry(tx, full_name)).await; + Ok(tokio::task::spawn_blocking(move || { + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_pathname(PathBuf::from(full_name).join("files")); + ar_entry.set_mode(0o100644); + ar_entry.set_size(file_size.try_into().unwrap()); - rx.await.unwrap() - } - // - ///// Append the given line to the currently active entry - pub async fn write_line(&self, line: &str) -> io::Result<()> { - let line = String::from(line); - let (tx, rx) = oneshot::channel(); - - self.tx.send(Message::AppendLine(tx, line)).await; - - rx.await.unwrap() + ar.lock().unwrap().append_path(&mut ar_entry, path) + }) + .await + .unwrap()?) } pub async fn close(&self) -> io::Result<()> { - let (tx, rx) = oneshot::channel(); + let ar = Arc::clone(&self.ar); - self.tx.send(Message::Close(tx)).await; - - rx.await.unwrap() + Ok( + tokio::task::spawn_blocking(move || ar.lock().unwrap().close()) + .await + .unwrap()?, + ) } + + // + ///// Append the given line to the currently active entry + //pub async fn write_line(&self, line: &str) -> io::Result<()> { + // let line = String::from(line); + // let (tx, rx) = oneshot::channel(); + // + // self.tx.send(Message::AppendLine(tx, line)).await; + // + // rx.await.unwrap() + //} } diff --git a/server/src/repo/manager_new.rs b/server/src/repo/manager_new.rs index f7c0fb1..8b3a1bd 100644 --- a/server/src/repo/manager_new.rs +++ b/server/src/repo/manager_new.rs @@ -9,6 +9,7 @@ use uuid::Uuid; use futures::StreamExt; use tokio::io::AsyncRead; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use super::archive; use super::package; @@ -54,16 +55,27 @@ impl MetaRepoMgr { .stream(conn) .await?; - while let Some(pkg) = pkgs.next().await.transpose()? { - ar_files - .add_files_entry(&format!("{}-{}", pkg.name, pkg.version)) - .await?; + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + let tmp_file_path = self.pkg_dir.join(uuid.to_string()); + while let Some(pkg) = pkgs.next().await.transpose()? { + let mut tmp_file = tokio::fs::File::create(&tmp_file_path).await?; + + let line = "%FILES%\n"; + tmp_file.write_all(line.as_bytes()).await?; + + // Generate the files list for the package let mut files = pkg.find_related(db::PackageFile).stream(conn).await?; while let Some(file) = files.next().await.transpose()? { - ar_files.write_line(&file.path).await?; + tmp_file + .write_all(format!("{}\n", file.path).as_bytes()) + .await?; } + + ar_files + .add_files_entry(&format!("{}-{}", pkg.name, pkg.version), &tmp_file_path) + .await?; } ar_files.close().await?;