diff --git a/server/src/cli.rs b/server/src/cli.rs index 2419606..036c70b 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -81,7 +81,7 @@ impl Cli { let config = Config { data_dir: self.data_dir.clone(), }; - let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"), db.clone()); + let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos")); let global = Global { config, diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index c76e532..6cd709f 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -1,4 +1,4 @@ -use crate::db::{self, *}; +use crate::db::*; use sea_orm::{sea_query::IntoCondition, *}; use serde::Deserialize; @@ -50,14 +50,20 @@ pub async fn by_fields( version: Option<&str>, compression: Option<&str>, ) -> Result> { - let cond = Condition::all() - .add(package::Column::RepoId.eq(repo_id)) - .add(package::Column::Name.eq(name)) - .add(package::Column::Arch.eq(arch)) - .add_option(version.map(|version| package::Column::Version.eq(version))) - .add_option(compression.map(|compression| package::Column::Compression.eq(compression))); + let mut query = Package::find() + .filter(package::Column::RepoId.eq(repo_id)) + .filter(package::Column::Name.eq(name)) + .filter(package::Column::Arch.eq(arch)); - Package::find().filter(cond).one(conn).await + if let Some(version) = version { + query = query.filter(package::Column::Version.eq(version)); + } + + if let Some(compression) = compression { + query = query.filter(package::Column::Compression.eq(compression)); + } + + query.one(conn).await } pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result { @@ -162,34 +168,34 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack pub async fn full(conn: &DbConn, id: i32) -> Result> { if let Some(entry) = by_id(conn, id).await? { - let licenses: Vec = entry + let licenses = entry .find_related(PackageLicense) - .select_only() - .column(package_license::Column::Name) - .into_tuple() .all(conn) - .await?; - let groups: Vec = entry + .await? + .into_iter() + .map(|e| e.name) + .collect(); + let groups = entry .find_related(PackageGroup) - .select_only() - .column(package_group::Column::Name) - .into_tuple() .all(conn) - .await?; - let related: Vec<(db::PackageRelatedEnum, String)> = entry + .await? + .into_iter() + .map(|e| e.name) + .collect(); + let related = entry .find_related(PackageRelated) - .select_only() - .columns([package_related::Column::Type, package_related::Column::Name]) - .into_tuple() .all(conn) - .await?; - let files: Vec = entry + .await? + .into_iter() + .map(|e| (e.r#type, e.name)) + .collect(); + let files = entry .find_related(PackageFile) - .select_only() - .column(package_file::Column::Path) - .into_tuple() .all(conn) - .await?; + .await? + .into_iter() + .map(|e| e.path) + .collect(); Ok(Some(FullPackage { entry, diff --git a/server/src/db/query/repo.rs b/server/src/db/query/repo.rs index 0370c2b..94627f7 100644 --- a/server/src/db/query/repo.rs +++ b/server/src/db/query/repo.rs @@ -43,12 +43,16 @@ pub async fn by_name(conn: &DbConn, name: &str) -> Result> { .await } -pub async fn insert(conn: &DbConn, name: &str, description: Option<&str>) -> Result { +pub async fn insert( + conn: &DbConn, + name: &str, + description: Option<&str>, +) -> Result> { let model = repo::ActiveModel { id: NotSet, name: Set(String::from(name)), description: Set(description.map(String::from)), }; - model.insert(conn).await + Repo::insert(model).exec(conn).await } diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index a979c09..d103f13 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -30,13 +30,11 @@ impl RepoArchiveWriter { .unwrap()?; Ok(Self { - // In practice, mutex is only ever used by one thread at a time. It's simply here so we - // can use spawn_blocking without issues. ar: Arc::new(Mutex::new(ar)), }) } - /// Add either a "desc" or "files" entry to the archive + /// Set the current entry to be a new "files" list pub async fn add_entry>( &self, full_name: &str, @@ -75,4 +73,15 @@ impl RepoArchiveWriter { .unwrap()?, ) } + + // + ///// Append the given line to the currently active entry + //pub async fn write_line(&self, line: &str) -> io::Result<()> { + // let line = String::from(line); + // let (tx, rx) = oneshot::channel(); + // + // self.tx.send(Message::AppendLine(tx, line)).await; + // + // rx.await.unwrap() + //} } diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index e2d9c4d..c288f30 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -1,287 +1,311 @@ -use super::{archive, package}; -use crate::{db, error::Result}; - +use super::package::Package; +use libarchive::write::{Builder, WriteEntry}; +use libarchive::{Entry, WriteFilter, WriteFormat}; +use std::fs; +use std::io; use std::path::{Path, PathBuf}; -use futures::StreamExt; -use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; -use tokio::io::AsyncRead; -use uuid::Uuid; +pub const ANY_ARCH: &str = "any"; -pub const ANY_ARCH: &'static str = "any"; - -pub struct MetaRepoMgr { +/// Overarching abstraction that orchestrates updating the repositories stored on the server +pub struct RepoGroupManager { repo_dir: PathBuf, - conn: DbConn, + pkg_dir: PathBuf, } -impl MetaRepoMgr { - pub fn new>(repo_dir: P, conn: DbConn) -> Self { - MetaRepoMgr { +fn parse_pkg_filename(file_name: &str) -> (String, &str, &str, &str) { + let name_parts = file_name.split('-').collect::>(); + let name = name_parts[..name_parts.len() - 3].join("-"); + let version = name_parts[name_parts.len() - 3]; + let release = name_parts[name_parts.len() - 2]; + let (arch, _) = name_parts[name_parts.len() - 1].split_once('.').unwrap(); + + (name, version, release, arch) +} + +impl RepoGroupManager { + pub fn new, P2: AsRef>(repo_dir: P1, pkg_dir: P2) -> Self { + RepoGroupManager { repo_dir: repo_dir.as_ref().to_path_buf(), - conn, + pkg_dir: pkg_dir.as_ref().to_path_buf(), } } - /// Generate archive databases for all known architectures in the repository, including the - /// "any" architecture. - pub async fn generate_archives_all(&self, repo: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; + pub fn sync(&mut self, repo: &str, arch: &str) -> io::Result<()> { + let subrepo_path = self.repo_dir.join(repo).join(arch); - if repo.is_none() { - return Ok(()); + let mut ar_db = Builder::new(); + ar_db.add_filter(WriteFilter::Gzip)?; + ar_db.set_format(WriteFormat::PaxRestricted)?; + + let mut ar_files = Builder::new(); + ar_files.add_filter(WriteFilter::Gzip)?; + ar_files.set_format(WriteFormat::PaxRestricted)?; + + let mut ar_db = ar_db.open_file(subrepo_path.join(format!("{}.db.tar.gz", repo)))?; + let mut ar_files = + ar_files.open_file(subrepo_path.join(format!("{}.files.tar.gz", repo)))?; + + // All architectures should also include the "any" architecture, except for the "any" + // architecture itself. + let repo_any_dir = self.repo_dir.join(repo).join(ANY_ARCH); + + let any_entries_iter = if arch != ANY_ARCH && repo_any_dir.try_exists()? { + Some(repo_any_dir.read_dir()?) + } else { + None + } + .into_iter() + .flatten(); + + for entry in subrepo_path.read_dir()?.chain(any_entries_iter) { + let entry = entry?; + + if entry.file_type()?.is_dir() { + // The desc file needs to be added to both archives + let path_in_tar = PathBuf::from(entry.file_name()).join("desc"); + let src_path = entry.path().join("desc"); + let metadata = src_path.metadata()?; + + let mut ar_entry = WriteEntry::new(); + ar_entry.set_pathname(&path_in_tar); + // These small text files will definitely fit inside an i64 + ar_entry.set_size(metadata.len().try_into().unwrap()); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_mode(0o100644); + + ar_db.append_path(&mut ar_entry, &src_path)?; + ar_files.append_path(&mut ar_entry, src_path)?; + + // The files file is only required in the files database + let path_in_tar = PathBuf::from(entry.file_name()).join("files"); + let src_path = entry.path().join("files"); + let metadata = src_path.metadata()?; + + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + ar_entry.set_pathname(&path_in_tar); + ar_entry.set_mode(0o100644); + // These small text files will definitely fit inside an i64 + ar_entry.set_size(metadata.len().try_into().unwrap()); + + ar_files.append_path(&mut ar_entry, src_path)?; + } } - let repo = repo.unwrap(); + ar_db.close()?; + ar_files.close()?; - let mut archs = repo - .find_related(crate::db::Package) - .select_only() - .column(crate::db::package::Column::Arch) - .distinct() - .into_tuple::() - .stream(&self.conn) - .await?; + Ok(()) + } - while let Some(arch) = archs.next().await.transpose()? { - self.generate_archives(&repo.name, &arch).await?; + /// Synchronize all present architectures' db archives in the given repository. + pub fn sync_all(&mut self, repo: &str) -> io::Result<()> { + for entry in self.repo_dir.join(repo).read_dir()? { + let entry = entry?; + + if entry.file_type()?.is_dir() { + self.sync(repo, &entry.file_name().to_string_lossy())?; + } } Ok(()) } - /// Generate the archive databases for the given repository and architecture. - pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> { - let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; + pub fn add_pkg_from_path>( + &mut self, + repo: &str, + path: P, + ) -> io::Result { + let pkg = Package::open(&path)?; - if repo.is_none() { - return Ok(()); + self.add_pkg(repo, &pkg)?; + + // After successfully adding the package, we move it to the packages directory + let dest_pkg_path = self + .pkg_dir + .join(repo) + .join(&pkg.info.arch) + .join(pkg.file_name()); + + fs::create_dir_all(dest_pkg_path.parent().unwrap())?; + fs::rename(&path, dest_pkg_path)?; + + Ok(pkg) + } + + /// Add a package to the given repo, returning to what architectures the package was added. + pub fn add_pkg(&mut self, repo: &str, pkg: &Package) -> io::Result<()> { + // TODO + // * if arch is "any", check if package doesn't already exist for other architecture + // * if arch isn't "any", check if package doesn't already exist for "any" architecture + + // We first remove any existing version of the package + self.remove_pkg(repo, &pkg.info.arch, &pkg.info.name, false)?; + + // Write the `desc` and `files` metadata files to disk + let metadata_dir = self + .repo_dir + .join(repo) + .join(&pkg.info.arch) + .join(format!("{}-{}", pkg.info.name, pkg.info.version)); + + fs::create_dir_all(&metadata_dir)?; + + let mut desc_file = fs::File::create(metadata_dir.join("desc"))?; + pkg.write_desc(&mut desc_file)?; + + let mut files_file = fs::File::create(metadata_dir.join("files"))?; + pkg.write_files(&mut files_file)?; + + // If a package of type "any" is added, we need to update every existing database + if pkg.info.arch == ANY_ARCH { + self.sync_all(repo)?; + } else { + self.sync(repo, &pkg.info.arch)?; } - let repo = repo.unwrap(); - - let parent_dir = self.repo_dir.join(&repo.name); - tokio::fs::create_dir_all(&parent_dir).await?; - - let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = - self.random_file_paths(); - let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; - let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?; - - // Query all packages in the repo that have the given architecture or the "any" - // architecture - let mut pkgs = repo - .find_related(crate::db::Package) - .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) - .stream(&self.conn) - .await?; - - while let Some(pkg) = pkgs.next().await.transpose()? { - let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; - let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; - - package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?; - package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?; - - let full_name = format!("{}-{}", pkg.name, pkg.version); - - ar_db - .add_entry(&full_name, &desc_tmp_file_path, true) - .await?; - ar_files - .add_entry(&full_name, &desc_tmp_file_path, true) - .await?; - ar_files - .add_entry(&full_name, &files_tmp_file_path, false) - .await?; - } - - // Cleanup - ar_db.close().await?; - ar_files.close().await?; - - // Move the db archives to their respective places - tokio::fs::rename( - tmp_ar_db_path, - parent_dir.join(format!("{}.db.tar.gz", arch)), - ) - .await?; - tokio::fs::rename( - tmp_ar_files_path, - parent_dir.join(format!("{}.files.tar.gz", arch)), - ) - .await?; - - // If this fails there's no point in failing the function + if there were no packages in - // the repo, this fails anyway because the temp file doesn't exist - let _ = tokio::fs::remove_file(desc_tmp_file_path).await; - let _ = tokio::fs::remove_file(files_tmp_file_path).await; - Ok(()) } - /// Remove the repo with the given name, if it existed - pub async fn remove_repo(&self, repo: &str) -> Result { - let res = db::query::repo::by_name(&self.conn, repo).await?; + pub fn remove_repo(&mut self, repo: &str) -> io::Result { + let repo_dir = self.repo_dir.join(repo); - if let Some(repo_entry) = res { - // Remove repository from database - repo_entry.delete(&self.conn).await?; - - // Remove files from file system - tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?; + if !repo_dir.exists() { + Ok(false) + } else { + fs::remove_dir_all(&repo_dir)?; + fs::remove_dir_all(self.pkg_dir.join(repo))?; Ok(true) - } else { - Ok(false) } } - /// Remove all packages from the repository with the given arch. - pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result { - let repo = db::query::repo::by_name(&self.conn, repo).await?; + pub fn remove_repo_arch(&mut self, repo: &str, arch: &str) -> io::Result { + let sub_path = PathBuf::from(repo).join(arch); + let repo_dir = self.repo_dir.join(&sub_path); - if let Some(repo) = repo { - let mut pkgs = repo - .find_related(db::Package) - .filter(db::package::Column::Arch.eq(arch)) - .stream(&self.conn) - .await?; - - while let Some(pkg) = pkgs.next().await.transpose()? { - let path = self - .repo_dir - .join(&repo.name) - .join(super::package::filename(&pkg)); - tokio::fs::remove_file(path).await?; - - pkg.delete(&self.conn).await?; - } - - tokio::fs::remove_file( - self.repo_dir - .join(&repo.name) - .join(format!("{}.db.tar.gz", arch)), - ) - .await?; - tokio::fs::remove_file( - self.repo_dir - .join(&repo.name) - .join(format!("{}.files.tar.gz", arch)), - ) - .await?; - - // If we removed all "any" packages, we need to resync all databases - if arch == ANY_ARCH { - self.generate_archives_all(&repo.name).await?; - } - - Ok(true) - } else { - Ok(false) + if !repo_dir.exists() { + return Ok(false); } + + fs::remove_dir_all(&repo_dir)?; + fs::remove_dir_all(self.pkg_dir.join(sub_path))?; + + // Removing the "any" architecture updates all other repositories + if arch == ANY_ARCH { + self.sync_all(repo)?; + } + + Ok(true) } - pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result { - let repo = db::query::repo::by_name(&self.conn, repo).await?; + pub fn remove_pkg( + &mut self, + repo: &str, + arch: &str, + pkg_name: &str, + sync: bool, + ) -> io::Result { + let repo_arch_dir = self.repo_dir.join(repo).join(arch); - if let Some(repo) = repo { - let pkg = - db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; + if !repo_arch_dir.exists() { + return Ok(false); + } - if let Some(pkg) = pkg { - // Remove package from database & file system - tokio::fs::remove_file( - self.repo_dir - .join(&repo.name) - .join(super::package::filename(&pkg)), - ) - .await?; - pkg.delete(&self.conn).await?; + for entry in repo_arch_dir.read_dir()? { + let entry = entry?; - if arch == ANY_ARCH { - self.generate_archives_all(&repo.name).await?; - } else { - self.generate_archives(&repo.name, arch).await?; + // Make sure we skip the archive files + if !entry.metadata()?.is_dir() { + continue; + } + + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + + // The directory name should only contain the name of the package. The last two parts + // when splitting on a dash are the pkgver and pkgrel, so we trim those + let name_parts = file_name.split('-').collect::>(); + let name = name_parts[..name_parts.len() - 2].join("-"); + + if name == pkg_name { + fs::remove_dir_all(entry.path())?; + + // Also remove the old package archive + let repo_arch_pkg_dir = self.pkg_dir.join(repo).join(arch); + + repo_arch_pkg_dir.read_dir()?.try_for_each(|res| { + res.and_then(|entry: fs::DirEntry| { + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + let (name, _, _, _) = parse_pkg_filename(&file_name); + + if name == pkg_name { + fs::remove_file(entry.path()) + } else { + Ok(()) + } + }) + })?; + + if sync { + if arch == ANY_ARCH { + self.sync_all(repo)?; + } else { + self.sync(repo, arch)?; + } } - Ok(true) + return Ok(true); + } + } + + Ok(false) + } + + /// Wrapper around `remove_pkg` that accepts a path relative to the package directory to a + /// package archive. + pub fn remove_pkg_from_path>( + &mut self, + path: P, + sync: bool, + ) -> io::Result> { + let path = path.as_ref(); + let components: Vec<_> = path.iter().collect(); + + if let [repo, _arch, file_name] = components[..] { + let full_path = self.pkg_dir.join(path); + + if full_path.try_exists()? { + let file_name = file_name.to_string_lossy(); + let (name, version, release, arch) = parse_pkg_filename(&file_name); + + let metadata_dir_name = format!("{}-{}-{}", name, version, release); + + // Remove package archive and entry in database + fs::remove_file(full_path)?; + fs::remove_dir_all(self.repo_dir.join(repo).join(arch).join(metadata_dir_name))?; + + if sync { + if arch == ANY_ARCH { + self.sync_all(&repo.to_string_lossy())?; + } else { + self.sync(&repo.to_string_lossy(), arch)?; + } + } + + Ok(Some(( + name, + version.to_string(), + release.to_string(), + arch.to_string(), + ))) } else { - Ok(false) + Ok(None) } } else { - Ok(false) + Ok(None) } } - - pub async fn add_pkg_from_reader( - &self, - reader: &mut R, - repo: &str, - ) -> crate::Result<(String, String, String)> { - // Copy file contents to temporary path so libarchive can work with it - let [path] = self.random_file_paths(); - let mut temp_file = tokio::fs::File::create(&path).await?; - - tokio::io::copy(reader, &mut temp_file).await?; - - // Parse the package - let path_clone = path.clone(); - let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) - .await - .unwrap()?; - - // Query the repo for its ID, or create it if it does not already exist - let res = db::query::repo::by_name(&self.conn, &repo).await?; - - let repo_id = if let Some(repo_entity) = res { - repo_entity.id - } else { - db::query::repo::insert(&self.conn, repo, None).await?.id - }; - - // If the package already exists in the database, we remove it first - let res = db::query::package::by_fields( - &self.conn, - repo_id, - &pkg.info.arch, - &pkg.info.name, - None, - None, - ) - .await?; - - if let Some(entry) = res { - entry.delete(&self.conn).await?; - } - - let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name()); - - // Insert new package into database - let name = pkg.info.name.clone(); - let version = pkg.info.version.clone(); - let arch = pkg.info.arch.clone(); - db::query::package::insert(&self.conn, repo_id, pkg).await?; - - // Move the package to its final resting place - tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?; - tokio::fs::rename(path, dest_pkg_path).await?; - - // Synchronize archive databases - if arch == ANY_ARCH { - self.generate_archives_all(repo).await?; - } else { - self.generate_archives(repo, &arch).await?; - } - - Ok((name, version, arch)) - } - - /// Generate a path to a unique file that can be used as a temporary file - pub fn random_file_paths(&self) -> [PathBuf; C] { - std::array::from_fn(|_| { - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - self.repo_dir.join(uuid.to_string()) - }) - } } diff --git a/server/src/repo/manager_new.rs b/server/src/repo/manager_new.rs new file mode 100644 index 0000000..adf37a9 --- /dev/null +++ b/server/src/repo/manager_new.rs @@ -0,0 +1,283 @@ +use super::{archive, package}; +use crate::{db, error::Result}; + +use std::path::{Path, PathBuf}; + +use futures::StreamExt; +use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; +use tokio::io::AsyncRead; +use uuid::Uuid; + +pub const ANY_ARCH: &'static str = "any"; + +pub struct MetaRepoMgr { + repo_dir: PathBuf, +} + +impl MetaRepoMgr { + pub fn new>(repo_dir: P) -> Self { + MetaRepoMgr { + repo_dir: repo_dir.as_ref().to_path_buf(), + } + } + + /// Generate archive databases for all known architectures in the repository, including the + /// "any" architecture. + pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(conn, repo).await?; + + if repo.is_none() { + return Ok(()); + } + + let repo = repo.unwrap(); + + let mut archs = repo + .find_related(crate::db::Package) + .select_only() + .column(crate::db::package::Column::Arch) + .distinct() + .into_tuple::() + .stream(conn) + .await?; + + while let Some(arch) = archs.next().await.transpose()? { + self.generate_archives(conn, &repo.name, &arch).await?; + } + + Ok(()) + } + + /// Generate the archive databases for the given repository and architecture. + pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> { + let repo = crate::db::query::repo::by_name(conn, repo).await?; + + if repo.is_none() { + return Ok(()); + } + + let repo = repo.unwrap(); + + let parent_dir = self.repo_dir.join(&repo.name); + tokio::fs::create_dir_all(&parent_dir).await?; + + let ar_db = + archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch))) + .await?; + let ar_files = + archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch))) + .await?; + + // Query all packages in the repo that have the given architecture or the "any" + // architecture + let mut pkgs = repo + .find_related(crate::db::Package) + .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH])) + .stream(conn) + .await?; + + // Create two temp file paths to write our entries to + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + let files_tmp_file_path = self.repo_dir.join(uuid.to_string()); + + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + let desc_tmp_file_path = self.repo_dir.join(uuid.to_string()); + + while let Some(pkg) = pkgs.next().await.transpose()? { + let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; + let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; + + package::write_files(conn, &mut files_tmp_file, &pkg).await?; + package::write_desc(conn, &mut desc_tmp_file, &pkg).await?; + + let full_name = format!("{}-{}", pkg.name, pkg.version); + + ar_db + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &files_tmp_file_path, false) + .await?; + } + + // Cleanup + ar_db.close().await?; + ar_files.close().await?; + + // If this fails there's no point in failing the function + if there were no packages in + // the repo, this fails anyway because the temp file doesn't exist + let _ = tokio::fs::remove_file(desc_tmp_file_path).await; + let _ = tokio::fs::remove_file(files_tmp_file_path).await; + + Ok(()) + } + + /// Remove the repo with the given name, if it existed + pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result { + let res = db::query::repo::by_name(conn, repo).await?; + + if let Some(repo_entry) = res { + // Remove repository from database + repo_entry.delete(conn).await?; + + // Remove files from file system + tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?; + + Ok(true) + } else { + Ok(false) + } + } + + /// Remove all packages from the repository with the given arch. + pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result { + let repo = db::query::repo::by_name(conn, repo).await?; + + if let Some(repo) = repo { + let mut pkgs = repo + .find_related(db::Package) + .filter(db::package::Column::Arch.eq(arch)) + .stream(conn) + .await?; + + while let Some(pkg) = pkgs.next().await.transpose()? { + let path = self + .repo_dir + .join(&repo.name) + .join(super::package::filename(&pkg)); + tokio::fs::remove_file(path).await?; + + pkg.delete(conn).await?; + } + + tokio::fs::remove_file( + self.repo_dir + .join(&repo.name) + .join(format!("{}.db.tar.gz", arch)), + ) + .await?; + tokio::fs::remove_file( + self.repo_dir + .join(&repo.name) + .join(format!("{}.files.tar.gz", arch)), + ) + .await?; + + // If we removed all "any" packages, we need to resync all databases + if arch == ANY_ARCH { + self.generate_archives_all(conn, &repo.name).await?; + } + + Ok(true) + } else { + Ok(false) + } + } + + pub async fn remove_pkg( + &self, + conn: &DbConn, + repo: &str, + arch: &str, + name: &str, + ) -> Result { + let repo = db::query::repo::by_name(conn, repo).await?; + + if let Some(repo) = repo { + let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?; + + if let Some(pkg) = pkg { + // Remove package from database & file system + tokio::fs::remove_file( + self.repo_dir + .join(&repo.name) + .join(super::package::filename(&pkg)), + ) + .await?; + pkg.delete(conn).await?; + + if arch == ANY_ARCH { + self.generate_archives_all(conn, &repo.name).await?; + } else { + self.generate_archives(conn, &repo.name, arch).await?; + } + + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + pub async fn add_pkg_from_reader( + &self, + conn: &DbConn, + reader: &mut R, + repo: &str, + ) -> crate::Result<(String, String, String)> { + // Copy file contents to temporary path so libarchive can work with it + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + let path = self.repo_dir.join(uuid.to_string()); + let mut temp_file = tokio::fs::File::create(&path).await?; + + tokio::io::copy(reader, &mut temp_file).await?; + + // Parse the package + let path_clone = path.clone(); + let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) + .await + .unwrap()?; + + // Query the repo for its ID, or create it if it does not already exist + let res = db::query::repo::by_name(conn, &repo).await?; + + let repo_id = if let Some(repo_entity) = res { + repo_entity.id + } else { + db::query::repo::insert(conn, repo, None) + .await? + .last_insert_id + }; + + // If the package already exists in the database, we remove it first + let res = db::query::package::by_fields( + conn, + repo_id, + &pkg.info.arch, + &pkg.info.name, + None, + None, + ) + .await?; + + if let Some(entry) = res { + entry.delete(conn).await?; + } + + let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name()); + + // Insert new package into database + let name = pkg.info.name.clone(); + let version = pkg.info.version.clone(); + let arch = pkg.info.arch.clone(); + db::query::package::insert(conn, repo_id, pkg).await?; + + // Move the package to its final resting place + tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?; + tokio::fs::rename(path, dest_pkg_path).await?; + + // Synchronize archive databases + if arch == ANY_ARCH { + self.generate_archives_all(conn, repo).await?; + } else { + self.generate_archives(conn, repo, &arch).await?; + } + + Ok((name, version, arch)) + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 2f2dacb..f5e48d4 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,8 +1,9 @@ mod archive; mod manager; +mod manager_new; pub mod package; -pub use manager::MetaRepoMgr; +pub use manager_new::MetaRepoMgr; use axum::{ body::Body, @@ -73,16 +74,10 @@ async fn post_package_archive( .repo_manager .write() .await - .add_pkg_from_reader(&mut body, &repo) + .add_pkg_from_reader(&global.db, &mut body, &repo) .await?; - tracing::info!( - "Added '{}-{}' to repository '{}' ({})", - name, - version, - repo, - arch - ); + tracing::info!("Added '{}-{}' to repository '{}' ({})", name, version, repo, arch); Ok(()) } @@ -91,7 +86,12 @@ async fn delete_repo( State(global): State, Path(repo): Path, ) -> crate::Result { - let repo_removed = global.repo_manager.write().await.remove_repo(&repo).await?; + let repo_removed = global + .repo_manager + .write() + .await + .remove_repo(&global.db, &repo) + .await?; if repo_removed { tracing::info!("Removed repository '{}'", repo); @@ -110,7 +110,7 @@ async fn delete_arch_repo( .repo_manager .write() .await - .remove_repo_arch(&repo, &arch) + .remove_repo_arch(&global.db, &repo, &arch) .await?; if repo_removed { @@ -130,7 +130,7 @@ async fn delete_package( .repo_manager .write() .await - .remove_pkg(&repo, &arch, &pkg_name) + .remove_pkg(&global.db, &repo, &arch, &pkg_name) .await?; if pkg_removed {