From ecc33f01534a4db31e4b3e233f8e32805d65934b Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 29 May 2024 15:04:20 +0200 Subject: [PATCH] feat: atomatically update db archives --- server/src/cli.rs | 2 +- server/src/repo/archive.rs | 15 +---- server/src/repo/manager.rs | 112 +++++++++++++++++++------------------ server/src/repo/mod.rs | 13 ++--- 4 files changed, 67 insertions(+), 75 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 036c70b..2419606 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -81,7 +81,7 @@ impl Cli { let config = Config { data_dir: self.data_dir.clone(), }; - let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos")); + let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"), db.clone()); let global = Global { config, diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index d103f13..a979c09 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -30,11 +30,13 @@ impl RepoArchiveWriter { .unwrap()?; Ok(Self { + // In practice, mutex is only ever used by one thread at a time. It's simply here so we + // can use spawn_blocking without issues. ar: Arc::new(Mutex::new(ar)), }) } - /// Set the current entry to be a new "files" list + /// Add either a "desc" or "files" entry to the archive pub async fn add_entry>( &self, full_name: &str, @@ -73,15 +75,4 @@ impl RepoArchiveWriter { .unwrap()?, ) } - - // - ///// Append the given line to the currently active entry - //pub async fn write_line(&self, line: &str) -> io::Result<()> { - // let line = String::from(line); - // let (tx, rx) = oneshot::channel(); - // - // self.tx.send(Message::AppendLine(tx, line)).await; - // - // rx.await.unwrap() - //} } diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index ebfd01c..e2d9c4d 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -12,19 +12,21 @@ pub const ANY_ARCH: &'static str = "any"; pub struct MetaRepoMgr { repo_dir: PathBuf, + conn: DbConn, } impl MetaRepoMgr { - pub fn new>(repo_dir: P) -> Self { + pub fn new>(repo_dir: P, conn: DbConn) -> Self { MetaRepoMgr { repo_dir: repo_dir.as_ref().to_path_buf(), + conn, } } /// Generate archive databases for all known architectures in the repository, including the /// "any" architecture. - pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(conn, repo).await?; + pub async fn generate_archives_all(&self, repo: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; if repo.is_none() { return Ok(()); @@ -38,19 +40,19 @@ impl MetaRepoMgr { .column(crate::db::package::Column::Arch) .distinct() .into_tuple::() - .stream(conn) + .stream(&self.conn) .await?; while let Some(arch) = archs.next().await.transpose()? { - self.generate_archives(conn, &repo.name, &arch).await?; + self.generate_archives(&repo.name, &arch).await?; } Ok(()) } /// Generate the archive databases for the given repository and architecture. - pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(conn, repo).await?; + pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; if repo.is_none() { return Ok(()); @@ -61,34 +63,25 @@ impl MetaRepoMgr { let parent_dir = self.repo_dir.join(&repo.name); tokio::fs::create_dir_all(&parent_dir).await?; - let ar_db = - archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch))) - .await?; - let ar_files = - archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch))) - .await?; + let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = + self.random_file_paths(); + let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; + let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?; // Query all packages in the repo that have the given architecture or the "any" // architecture let mut pkgs = repo .find_related(crate::db::Package) .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) - .stream(conn) + .stream(&self.conn) .await?; - // Create two temp file paths to write our entries to - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - let files_tmp_file_path = self.repo_dir.join(uuid.to_string()); - - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - let desc_tmp_file_path = self.repo_dir.join(uuid.to_string()); - while let Some(pkg) = pkgs.next().await.transpose()? { let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; - package::write_files(conn, &mut files_tmp_file, &pkg).await?; - package::write_desc(conn, &mut desc_tmp_file, &pkg).await?; + package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?; + package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?; let full_name = format!("{}-{}", pkg.name, pkg.version); @@ -107,6 +100,18 @@ impl MetaRepoMgr { ar_db.close().await?; ar_files.close().await?; + // Move the db archives to their respective places + tokio::fs::rename( + tmp_ar_db_path, + parent_dir.join(format!("{}.db.tar.gz", arch)), + ) + .await?; + tokio::fs::rename( + tmp_ar_files_path, + parent_dir.join(format!("{}.files.tar.gz", arch)), + ) + .await?; + // If this fails there's no point in failing the function + if there were no packages in // the repo, this fails anyway because the temp file doesn't exist let _ = tokio::fs::remove_file(desc_tmp_file_path).await; @@ -116,12 +121,12 @@ impl MetaRepoMgr { } /// Remove the repo with the given name, if it existed - pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result { - let res = db::query::repo::by_name(conn, repo).await?; + pub async fn remove_repo(&self, repo: &str) -> Result { + let res = db::query::repo::by_name(&self.conn, repo).await?; if let Some(repo_entry) = res { // Remove repository from database - repo_entry.delete(conn).await?; + repo_entry.delete(&self.conn).await?; // Remove files from file system tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?; @@ -133,14 +138,14 @@ impl MetaRepoMgr { } /// Remove all packages from the repository with the given arch. - pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result { - let repo = db::query::repo::by_name(conn, repo).await?; + pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result { + let repo = db::query::repo::by_name(&self.conn, repo).await?; if let Some(repo) = repo { let mut pkgs = repo .find_related(db::Package) .filter(db::package::Column::Arch.eq(arch)) - .stream(conn) + .stream(&self.conn) .await?; while let Some(pkg) = pkgs.next().await.transpose()? { @@ -150,7 +155,7 @@ impl MetaRepoMgr { .join(super::package::filename(&pkg)); tokio::fs::remove_file(path).await?; - pkg.delete(conn).await?; + pkg.delete(&self.conn).await?; } tokio::fs::remove_file( @@ -168,7 +173,7 @@ impl MetaRepoMgr { // If we removed all "any" packages, we need to resync all databases if arch == ANY_ARCH { - self.generate_archives_all(conn, &repo.name).await?; + self.generate_archives_all(&repo.name).await?; } Ok(true) @@ -177,17 +182,12 @@ impl MetaRepoMgr { } } - pub async fn remove_pkg( - &self, - conn: &DbConn, - repo: &str, - arch: &str, - name: &str, - ) -> Result { - let repo = db::query::repo::by_name(conn, repo).await?; + pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result { + let repo = db::query::repo::by_name(&self.conn, repo).await?; if let Some(repo) = repo { - let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?; + let pkg = + db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; if let Some(pkg) = pkg { // Remove package from database & file system @@ -197,12 +197,12 @@ impl MetaRepoMgr { .join(super::package::filename(&pkg)), ) .await?; - pkg.delete(conn).await?; + pkg.delete(&self.conn).await?; if arch == ANY_ARCH { - self.generate_archives_all(conn, &repo.name).await?; + self.generate_archives_all(&repo.name).await?; } else { - self.generate_archives(conn, &repo.name, arch).await?; + self.generate_archives(&repo.name, arch).await?; } Ok(true) @@ -216,13 +216,11 @@ impl MetaRepoMgr { pub async fn add_pkg_from_reader( &self, - conn: &DbConn, reader: &mut R, repo: &str, ) -> crate::Result<(String, String, String)> { // Copy file contents to temporary path so libarchive can work with it - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - let path = self.repo_dir.join(uuid.to_string()); + let [path] = self.random_file_paths(); let mut temp_file = tokio::fs::File::create(&path).await?; tokio::io::copy(reader, &mut temp_file).await?; @@ -234,17 +232,17 @@ impl MetaRepoMgr { .unwrap()?; // Query the repo for its ID, or create it if it does not already exist - let res = db::query::repo::by_name(conn, &repo).await?; + let res = db::query::repo::by_name(&self.conn, &repo).await?; let repo_id = if let Some(repo_entity) = res { repo_entity.id } else { - db::query::repo::insert(conn, repo, None).await?.id + db::query::repo::insert(&self.conn, repo, None).await?.id }; // If the package already exists in the database, we remove it first let res = db::query::package::by_fields( - conn, + &self.conn, repo_id, &pkg.info.arch, &pkg.info.name, @@ -254,7 +252,7 @@ impl MetaRepoMgr { .await?; if let Some(entry) = res { - entry.delete(conn).await?; + entry.delete(&self.conn).await?; } let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name()); @@ -263,7 +261,7 @@ impl MetaRepoMgr { let name = pkg.info.name.clone(); let version = pkg.info.version.clone(); let arch = pkg.info.arch.clone(); - db::query::package::insert(conn, repo_id, pkg).await?; + db::query::package::insert(&self.conn, repo_id, pkg).await?; // Move the package to its final resting place tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?; @@ -271,11 +269,19 @@ impl MetaRepoMgr { // Synchronize archive databases if arch == ANY_ARCH { - self.generate_archives_all(conn, repo).await?; + self.generate_archives_all(repo).await?; } else { - self.generate_archives(conn, repo, &arch).await?; + self.generate_archives(repo, &arch).await?; } Ok((name, version, arch)) } + + /// Generate a path to a unique file that can be used as a temporary file + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.repo_dir.join(uuid.to_string()) + }) + } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 9721451..2f2dacb 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -73,7 +73,7 @@ async fn post_package_archive( .repo_manager .write() .await - .add_pkg_from_reader(&global.db, &mut body, &repo) + .add_pkg_from_reader(&mut body, &repo) .await?; tracing::info!( @@ -91,12 +91,7 @@ async fn delete_repo( State(global): State, Path(repo): Path, ) -> crate::Result { - let repo_removed = global - .repo_manager - .write() - .await - .remove_repo(&global.db, &repo) - .await?; + let repo_removed = global.repo_manager.write().await.remove_repo(&repo).await?; if repo_removed { tracing::info!("Removed repository '{}'", repo); @@ -115,7 +110,7 @@ async fn delete_arch_repo( .repo_manager .write() .await - .remove_repo_arch(&global.db, &repo, &arch) + .remove_repo_arch(&repo, &arch) .await?; if repo_removed { @@ -135,7 +130,7 @@ async fn delete_package( .repo_manager .write() .await - .remove_pkg(&global.db, &repo, &arch, &pkg_name) + .remove_pkg(&repo, &arch, &pkg_name) .await?; if pkg_removed {