feat: implement archive writer just mutex and spawn_blocking

concurrent-repos
Jef Roosens 2024-05-26 17:51:40 +02:00
parent f209c81759
commit 2d4cfee27a
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
3 changed files with 63 additions and 68 deletions

View File

@ -77,7 +77,10 @@ impl Cli {
debug!("Connecting to database with URL {}", db_url);
let db = sea_orm::Database::connect(db_url).await?;
let mut options = sea_orm::ConnectOptions::new(db_url);
options.max_connections(16);
let db = sea_orm::Database::connect(options).await?;
crate::db::Migrator::up(&db, None).await?;
let config = Config {

View File

@ -1,7 +1,6 @@
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use libarchive::write::{Builder, FileWriter, WriteEntry};
@ -15,49 +14,12 @@ enum Message {
/// Struct to abstract away the intrinsics of writing entries to an archive file
pub struct RepoArchiveWriter {
tx: mpsc::Sender<Message>,
}
fn archive_manager_task(mut rx: mpsc::Receiver<Message>, mut ar: FileWriter) {
// Once the accompanying struct and the final sender get dropped, this will return None and the
// task will exit
while let Some(msg) = rx.blocking_recv() {
match msg {
Message::AppendFilesEntry(tx, full_name) => {
let mut ar_entry = WriteEntry::new();
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_pathname(PathBuf::from(full_name).join("files"));
ar_entry.set_mode(0o100644);
// TODO set entry size?
let mut res = ar.append_entry(&mut ar_entry).map_err(io::Error::from);
if res.is_ok() {
res = writeln!(ar, "%FILES%");
}
tx.send(res);
// All "files" entries start with this line
}
Message::AppendLine(tx, line) => {
let res = writeln!(ar, "{}", line);
tx.send(res);
}
Message::Close(tx) => {
let res = ar.close();
tx.send(res.map_err(io::Error::from));
}
}
}
ar: Arc<Mutex<FileWriter>>,
}
impl RepoArchiveWriter {
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = PathBuf::from(path.as_ref());
let (tx, rx) = mpsc::channel(1);
// Open the archive file
let ar = tokio::task::spawn_blocking(move || {
@ -70,37 +32,55 @@ impl RepoArchiveWriter {
.await
.unwrap()?;
// Spawn blocking task to perform blocking actions
tokio::task::spawn_blocking(move || archive_manager_task(rx, ar));
Ok(Self { tx })
Ok(Self {
ar: Arc::new(Mutex::new(ar)),
})
}
/// Set the current entry to be a new "files" list
pub async fn add_files_entry(&self, full_name: &str) -> io::Result<()> {
pub async fn add_files_entry<P: AsRef<Path>>(
&self,
full_name: &str,
path: P,
) -> io::Result<()> {
let metadata = tokio::fs::metadata(&path).await?;
let file_size = metadata.len();
let ar = Arc::clone(&self.ar);
let full_name = String::from(full_name);
let (tx, rx) = oneshot::channel();
let path = PathBuf::from(path.as_ref());
self.tx.send(Message::AppendFilesEntry(tx, full_name)).await;
Ok(tokio::task::spawn_blocking(move || {
let mut ar_entry = WriteEntry::new();
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_pathname(PathBuf::from(full_name).join("files"));
ar_entry.set_mode(0o100644);
ar_entry.set_size(file_size.try_into().unwrap());
rx.await.unwrap()
}
//
///// Append the given line to the currently active entry
pub async fn write_line(&self, line: &str) -> io::Result<()> {
let line = String::from(line);
let (tx, rx) = oneshot::channel();
self.tx.send(Message::AppendLine(tx, line)).await;
rx.await.unwrap()
ar.lock().unwrap().append_path(&mut ar_entry, path)
})
.await
.unwrap()?)
}
pub async fn close(&self) -> io::Result<()> {
let (tx, rx) = oneshot::channel();
let ar = Arc::clone(&self.ar);
self.tx.send(Message::Close(tx)).await;
rx.await.unwrap()
Ok(
tokio::task::spawn_blocking(move || ar.lock().unwrap().close())
.await
.unwrap()?,
)
}
//
///// Append the given line to the currently active entry
//pub async fn write_line(&self, line: &str) -> io::Result<()> {
// let line = String::from(line);
// let (tx, rx) = oneshot::channel();
//
// self.tx.send(Message::AppendLine(tx, line)).await;
//
// rx.await.unwrap()
//}
}

View File

@ -9,6 +9,7 @@ use uuid::Uuid;
use futures::StreamExt;
use tokio::io::AsyncRead;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use super::archive;
use super::package;
@ -54,16 +55,27 @@ impl MetaRepoMgr {
.stream(conn)
.await?;
while let Some(pkg) = pkgs.next().await.transpose()? {
ar_files
.add_files_entry(&format!("{}-{}", pkg.name, pkg.version))
.await?;
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let tmp_file_path = self.pkg_dir.join(uuid.to_string());
while let Some(pkg) = pkgs.next().await.transpose()? {
let mut tmp_file = tokio::fs::File::create(&tmp_file_path).await?;
let line = "%FILES%\n";
tmp_file.write_all(line.as_bytes()).await?;
// Generate the files list for the package
let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;
while let Some(file) = files.next().await.transpose()? {
ar_files.write_line(&file.path).await?;
tmp_file
.write_all(format!("{}\n", file.path).as_bytes())
.await?;
}
ar_files
.add_files_entry(&format!("{}-{}", pkg.name, pkg.version), &tmp_file_path)
.await?;
}
ar_files.close().await?;