diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs
index c1a2c73..b90fcee 100644
--- a/server/src/repo/actor.rs
+++ b/server/src/repo/actor.rs
@@ -10,17 +10,20 @@ use std::{
     },
 };
 
+use futures::StreamExt;
 use sea_orm::{
     ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
     ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 };
+use sea_query::{Alias, Expr, Query};
 use tokio::{
     runtime,
     sync::{
-        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
+        mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
         RwLock,
     },
 };
+use uuid::Uuid;
 
 pub enum RepoCommand {
     ParsePkg(i32, PathBuf),
@@ -67,6 +70,13 @@ impl RepoActor {
         }
     }
 
+    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
+        std::array::from_fn(|_| {
+            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+            self.state.repos_dir.join(uuid.to_string())
+        })
+    }
+
     /// Run the main actor loop
     pub fn run(self) {
         while let Some(msg) = {
@@ -85,7 +95,7 @@
                         .map(|n| n.0.load(Ordering::SeqCst))
                         == Some(0)
                     {
-                        // TODO sync
+                        let _ = self.sync_repo(repo);
                     }
                 }
             }
@@ -143,6 +153,74 @@
     }
 
     fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
+        let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths();
+
+        let mut ars = archive::RepoArchivesWriter::new(
+            &tmp_ar_db_path,
+            &tmp_ar_files_path,
+            self.random_file_paths(),
+            &self.rt,
+            &self.state.conn,
+        )?;
+
+        let (tx, mut rx) = mpsc::channel(1);
+
+        let conn = self.state.conn.clone();
+        let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch);
+
+        // sea_orm needs its connections to be dropped inside an async context, so we spawn a task
+        // that streams the responses to the synchronous context via message passing
+        self.rt.spawn(async move {
+            let stream = query.stream(&conn).await;
+
+            if let Err(err) = stream {
+                let _ = tx.send(Err(err)).await;
+
+                return;
+            }
+
+            let mut stream = stream.unwrap();
+
+            while let Some(res) = stream.next().await {
+                let is_err = res.is_err();
+                let _ = tx.send(res).await;
+
+                if is_err {
+                    return;
+                }
+            }
+        });
+
+        let mut committed_ids: Vec<i32> = Vec::new();
+
+        while let Some(pkg) = rx.blocking_recv().transpose()? {
+            committed_ids.push(pkg.id);
+            ars.append_pkg(&pkg)?;
+        }
+
+        ars.close()?;
+
+        // Move newly generated package archives to their correct place
+        let repo_dir = self.state.repos_dir.join(repo.to_string());
+        std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?;
+        std::fs::rename(
+            tmp_ar_files_path,
+            repo_dir.join(format!("{}.files.tar.gz", arch)),
+        )?;
+
+        // Update the state for the newly committed packages
+        self.rt.block_on(
+            db::Package::update_many()
+                .col_expr(
+                    db::package::Column::State,
+                    Expr::value(db::PackageState::Committed),
+                )
+                .filter(db::package::Column::Id.is_in(committed_ids))
+                .exec(&self.state.conn),
+        )?;
+
+        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
+
         Ok(())
     }
 }
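Note on the pattern in `generate_archives` above: the sea_orm stream must be created and dropped inside an async context, while the actor loop runs on a plain blocking thread, so the change bridges the two with a bounded channel. Below is a minimal self-contained sketch of that bridge, assuming only `tokio` and `futures`; the integer stream is a hypothetical stand-in for the `pkgs_to_sync` query stream.

use futures::StreamExt;
use tokio::sync::mpsc;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // Producer side: lives entirely on the async runtime, like the task
    // spawned in generate_archives.
    rt.spawn(async move {
        let mut stream = futures::stream::iter(0..3);

        while let Some(item) = stream.next().await {
            // A send error means the receiver hung up; stop producing.
            if tx.send(item).await.is_err() {
                return;
            }
        }
    });

    // Consumer side: a synchronous thread drains the channel without await,
    // mirroring rx.blocking_recv() in the actor. Once the producer finishes,
    // the sender is dropped and the loop ends.
    while let Some(item) = rx.blocking_recv() {
        println!("got row {}", item);
    }
}

The capacity-1 channel keeps backpressure on the query: at most one row is buffered while the blocking side is busy writing archive entries.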
diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs
index a979c09..973a395 100644
--- a/server/src/repo/archive.rs
+++ b/server/src/repo/archive.rs
@@ -1,78 +1,222 @@
+use crate::db;
 use std::{
-    io,
+    io::{self, Write},
     path::{Path, PathBuf},
-    sync::{Arc, Mutex},
 };
 
+use futures::StreamExt;
 use libarchive::{
     write::{Builder, FileWriter, WriteEntry},
     Entry, WriteFilter, WriteFormat,
 };
+use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
+use tokio::{runtime, sync::mpsc};
 
-/// Struct to abstract away the intrinsics of writing entries to an archive file
-pub struct RepoArchiveWriter {
-    ar: Arc<Mutex<FileWriter>>,
+pub struct RepoArchivesWriter {
+    ar_db: FileWriter,
+    ar_files: FileWriter,
+    rt: runtime::Handle,
+    conn: DbConn,
+    tmp_paths: [PathBuf; 2],
 }
 
-impl RepoArchiveWriter {
-    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        let path = PathBuf::from(path.as_ref());
-
-        // Open the archive file
-        let ar = tokio::task::spawn_blocking(move || {
-            let mut builder = Builder::new();
-            builder.add_filter(WriteFilter::Gzip)?;
-            builder.set_format(WriteFormat::PaxRestricted)?;
-
-            builder.open_file(path)
-        })
-        .await
-        .unwrap()?;
+impl RepoArchivesWriter {
+    pub fn new(
+        ar_db_path: impl AsRef<Path>,
+        ar_files_path: impl AsRef<Path>,
+        tmp_paths: [impl AsRef<Path>; 2],
+        rt: &runtime::Handle,
+        conn: &sea_orm::DbConn,
+    ) -> crate::Result<Self> {
+        let ar_db = Self::open_ar(ar_db_path)?;
+        let ar_files = Self::open_ar(ar_files_path)?;
 
         Ok(Self {
-            // In practice, mutex is only ever used by one thread at a time. It's simply here so we
-            // can use spawn_blocking without issues.
-            ar: Arc::new(Mutex::new(ar)),
+            ar_db,
+            ar_files,
+            rt: rt.clone(),
+            conn: conn.clone(),
+            tmp_paths: [
+                tmp_paths[0].as_ref().to_path_buf(),
+                tmp_paths[1].as_ref().to_path_buf(),
+            ],
         })
     }
 
-    /// Add either a "desc" or "files" entry to the archive
-    pub async fn add_entry<P: AsRef<Path>>(
-        &self,
-        full_name: &str,
-        path: P,
-        desc: bool,
-    ) -> io::Result<()> {
-        let metadata = tokio::fs::metadata(&path).await?;
+    fn open_ar(path: impl AsRef<Path>) -> crate::Result<FileWriter> {
+        let mut builder = Builder::new();
+        builder.add_filter(WriteFilter::Gzip)?;
+        builder.set_format(WriteFormat::PaxRestricted)?;
+
+        Ok(builder.open_file(path)?)
+    }
+
+    fn append_entry(
+        ar: &mut FileWriter,
+        src_path: impl AsRef<Path>,
+        dest_path: impl AsRef<Path>,
+    ) -> crate::Result<()> {
+        let metadata = std::fs::metadata(&src_path)?;
         let file_size = metadata.len();
 
-        let ar = Arc::clone(&self.ar);
-        let full_name = String::from(full_name);
-        let path = PathBuf::from(path.as_ref());
+        let mut ar_entry = WriteEntry::new();
+        ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
 
-        Ok(tokio::task::spawn_blocking(move || {
-            let mut ar_entry = WriteEntry::new();
-            ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
+        ar_entry.set_pathname(dest_path);
+        ar_entry.set_mode(0o100644);
+        ar_entry.set_size(file_size.try_into().unwrap());
 
-            let name = if desc { "desc" } else { "files" };
-
-            ar_entry.set_pathname(PathBuf::from(full_name).join(name));
-            ar_entry.set_mode(0o100644);
-            ar_entry.set_size(file_size.try_into().unwrap());
-
-            ar.lock().unwrap().append_path(&mut ar_entry, path)
-        })
-        .await
-        .unwrap()?)
+        Ok(ar.append_path(&mut ar_entry, src_path)?)
     }
 
-    pub async fn close(&self) -> io::Result<()> {
-        let ar = Arc::clone(&self.ar);
+    pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> {
+        self.write_desc(&self.tmp_paths[0], pkg)?;
+        self.write_files(&self.tmp_paths[1], pkg)?;
 
-        Ok(
-            tokio::task::spawn_blocking(move || ar.lock().unwrap().close())
-                .await
-                .unwrap()?,
-        )
+        let full_name = format!("{}-{}", pkg.name, pkg.version);
+        let dest_desc_path = format!("{}/desc", full_name);
+        let dest_files_path = format!("{}/files", full_name);
+
+        Self::append_entry(&mut self.ar_db, &self.tmp_paths[0], &dest_desc_path)?;
+        Self::append_entry(&mut self.ar_files, &self.tmp_paths[0], &dest_desc_path)?;
+        Self::append_entry(&mut self.ar_files, &self.tmp_paths[1], &dest_files_path)?;
+
+        Ok(())
+    }
+
+    /// Generate a "files" archive entry for the package in the given path
+    fn write_files(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
+        let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
+
+        writeln!(f, "%FILES%")?;
+
+        let (tx, mut rx) = mpsc::channel(1);
+
+        let conn = self.conn.clone();
+        let query = pkg.find_related(db::PackageFile);
+
+        self.rt.spawn(async move {
+            let files = query.stream(&conn).await;
+
+            if let Err(err) = files {
+                let _ = tx.send(Err(err)).await;
+
+                return;
+            }
+
+            let mut files = files.unwrap();
+
+            while let Some(res) = files.next().await {
+                let is_err = res.is_err();
+                let _ = tx.send(res).await;
+
+                if is_err {
+                    return;
+                }
+            }
+        });
+
+        while let Some(file) = rx.blocking_recv().transpose()? {
+            writeln!(f, "{}", file.path)?;
+        }
+
+        f.flush()?;
+        Ok(())
+    }
+
+    fn write_desc(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
+        let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
+
+        writeln!(f, "%FILENAME%\n{}", pkg.id)?;
+
+        let mut write_attr = |k: &str, v: &str| {
+            if !v.is_empty() {
+                writeln!(f, "\n%{}%\n{}", k, v)
+            } else {
+                Ok(())
+            }
+        };
+
+        write_attr("NAME", &pkg.name)?;
+        write_attr("BASE", &pkg.base)?;
+        write_attr("VERSION", &pkg.version)?;
+
+        if let Some(ref desc) = pkg.description {
+            write_attr("DESC", desc)?;
+        }
+
+        let groups: Vec<String> = self.rt.block_on(
+            pkg.find_related(db::PackageGroup)
+                .select_only()
+                .column(db::package_group::Column::Name)
+                .into_tuple()
+                .all(&self.conn),
+        )?;
+
+        write_attr("GROUPS", &groups.join("\n"))?;
+
+        write_attr("CSIZE", &pkg.c_size.to_string())?;
+        write_attr("ISIZE", &pkg.size.to_string())?;
+        write_attr("SHA256SUM", &pkg.sha256_sum)?;
+
+        if let Some(ref url) = pkg.url {
+            write_attr("URL", url)?;
+        }
+
+        let licenses: Vec<String> = self.rt.block_on(
+            pkg.find_related(db::PackageLicense)
+                .select_only()
+                .column(db::package_license::Column::Name)
+                .into_tuple()
+                .all(&self.conn),
+        )?;
+
+        write_attr("LICENSE", &licenses.join("\n"))?;
+
+        write_attr("ARCH", &pkg.arch)?;
+
+        // TODO build date
+        write_attr(
+            "BUILDDATE",
+            &pkg.build_date.and_utc().timestamp().to_string(),
+        )?;
+
+        if let Some(ref packager) = pkg.packager {
+            write_attr("PACKAGER", packager)?;
+        }
+
+        let related = [
+            ("REPLACES", db::PackageRelatedEnum::Replaces),
+            ("CONFLICTS", db::PackageRelatedEnum::Conflicts),
+            ("PROVIDES", db::PackageRelatedEnum::Provides),
+            ("DEPENDS", db::PackageRelatedEnum::Depend),
+            ("OPTDEPENDS", db::PackageRelatedEnum::Optdepend),
+            ("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend),
+            ("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend),
+        ];
+
+        for (key, attr) in related.into_iter() {
+            let items: Vec<String> = self.rt.block_on(
+                pkg.find_related(db::PackageRelated)
+                    .filter(db::package_related::Column::Type.eq(attr))
+                    .select_only()
+                    .column(db::package_related::Column::Name)
+                    .into_tuple()
+                    .all(&self.conn),
+            )?;
+
+            write_attr(key, &items.join("\n"))?;
+        }
+
+        f.flush()?;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> crate::Result<()> {
+        self.ar_db.close()?;
+        self.ar_files.close()?;
+
+        std::fs::remove_file(&self.tmp_paths[0])?;
+        std::fs::remove_file(&self.tmp_paths[1])?;
+
+        Ok(())
     }
 }
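With the writer now fully synchronous, a caller only needs a runtime handle for the internal metadata queries. A minimal usage sketch, assuming the crate's `Result` alias, `db::package::Model`, and an open `DbConn` from the diff above; the output and scratch paths are hypothetical:

fn write_archives(
    conn: &sea_orm::DbConn,
    rt: &tokio::runtime::Handle,
    pkgs: &[db::package::Model],
) -> crate::Result<()> {
    let mut ars = RepoArchivesWriter::new(
        "x86_64.db.tar.gz",        // database archive
        "x86_64.files.tar.gz",     // files archive
        ["desc.tmp", "files.tmp"], // scratch files, removed by close()
        rt,
        conn,
    )?;

    for pkg in pkgs {
        ars.append_pkg(pkg)?; // one desc/files pair per package
    }

    ars.close() // finalizes both archives and deletes the scratch files
}

This mirrors what `generate_archives` does, minus the row streaming and the database state update.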
diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs
index 6fe6650..e8b65e3 100644
--- a/server/src/repo/mod.rs
+++ b/server/src/repo/mod.rs
@@ -1,12 +1,10 @@
 mod actor;
 mod archive;
 mod handle;
-mod manager;
 pub mod package;
 
 pub use actor::{RepoActor, RepoCommand, RepoSharedState};
 pub use handle::Handle;
-pub use manager::RepoMgr;
 
 use crate::FsConfig;
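For reference, the entries produced by `write_desc` and `write_files` follow pacman's repository database format: the db archive contains `{name}-{version}/desc`, while the files archive contains both that `desc` and a `{name}-{version}/files` entry. For a hypothetical package with id 42, name foo, and version 1.2.3-1, the generated `desc` entry would look roughly like:

%FILENAME%
42

%NAME%
foo

%VERSION%
1.2.3-1

%CSIZE%
123456

%ISIZE%
654321

%SHA256SUM%
(hex digest)

%ARCH%
x86_64

%BUILDDATE%
1700000000

and the corresponding `files` entry lists one path per line:

%FILES%
usr/
usr/bin/
usr/bin/foo

Attributes with empty values (here BASE, GROUPS, LICENSE, and the *DEPENDS lists) are skipped entirely by the `write_attr` closure.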