refactor: remove old manager; some refactoring
parent
fc844c685f
commit
0b1c8b640f
|
@ -1,4 +1,4 @@
|
|||
use crate::db::*;
|
||||
use crate::db::{self, *};
|
||||
|
||||
use sea_orm::{sea_query::IntoCondition, *};
|
||||
use serde::Deserialize;
|
||||
|
@ -50,20 +50,14 @@ pub async fn by_fields(
|
|||
version: Option<&str>,
|
||||
compression: Option<&str>,
|
||||
) -> Result<Option<package::Model>> {
|
||||
let mut query = Package::find()
|
||||
.filter(package::Column::RepoId.eq(repo_id))
|
||||
.filter(package::Column::Name.eq(name))
|
||||
.filter(package::Column::Arch.eq(arch));
|
||||
let cond = Condition::all()
|
||||
.add(package::Column::RepoId.eq(repo_id))
|
||||
.add(package::Column::Name.eq(name))
|
||||
.add(package::Column::Arch.eq(arch))
|
||||
.add_option(version.map(|version| package::Column::Version.eq(version)))
|
||||
.add_option(compression.map(|compression| package::Column::Compression.eq(compression)));
|
||||
|
||||
if let Some(version) = version {
|
||||
query = query.filter(package::Column::Version.eq(version));
|
||||
}
|
||||
|
||||
if let Some(compression) = compression {
|
||||
query = query.filter(package::Column::Compression.eq(compression));
|
||||
}
|
||||
|
||||
query.one(conn).await
|
||||
Package::find().filter(cond).one(conn).await
|
||||
}
|
||||
|
||||
pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result<DeleteResult> {
|
||||
|
@ -168,34 +162,34 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
|
|||
|
||||
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
|
||||
if let Some(entry) = by_id(conn, id).await? {
|
||||
let licenses = entry
|
||||
let licenses: Vec<String> = entry
|
||||
.find_related(PackageLicense)
|
||||
.select_only()
|
||||
.column(package_license::Column::Name)
|
||||
.into_tuple()
|
||||
.all(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|e| e.name)
|
||||
.collect();
|
||||
let groups = entry
|
||||
.await?;
|
||||
let groups: Vec<String> = entry
|
||||
.find_related(PackageGroup)
|
||||
.select_only()
|
||||
.column(package_group::Column::Name)
|
||||
.into_tuple()
|
||||
.all(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|e| e.name)
|
||||
.collect();
|
||||
let related = entry
|
||||
.await?;
|
||||
let related: Vec<(db::PackageRelatedEnum, String)> = entry
|
||||
.find_related(PackageRelated)
|
||||
.select_only()
|
||||
.columns([package_related::Column::Type, package_related::Column::Name])
|
||||
.into_tuple()
|
||||
.all(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|e| (e.r#type, e.name))
|
||||
.collect();
|
||||
let files = entry
|
||||
.await?;
|
||||
let files: Vec<String> = entry
|
||||
.find_related(PackageFile)
|
||||
.select_only()
|
||||
.column(package_file::Column::Path)
|
||||
.into_tuple()
|
||||
.all(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|e| e.path)
|
||||
.collect();
|
||||
.await?;
|
||||
|
||||
Ok(Some(FullPackage {
|
||||
entry,
|
||||
|
|
|
@ -43,16 +43,12 @@ pub async fn by_name(conn: &DbConn, name: &str) -> Result<Option<repo::Model>> {
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn insert(
|
||||
conn: &DbConn,
|
||||
name: &str,
|
||||
description: Option<&str>,
|
||||
) -> Result<InsertResult<repo::ActiveModel>> {
|
||||
pub async fn insert(conn: &DbConn, name: &str, description: Option<&str>) -> Result<repo::Model> {
|
||||
let model = repo::ActiveModel {
|
||||
id: NotSet,
|
||||
name: Set(String::from(name)),
|
||||
description: Set(description.map(String::from)),
|
||||
};
|
||||
|
||||
Repo::insert(model).exec(conn).await
|
||||
model.insert(conn).await
|
||||
}
|
||||
|
|
|
@ -1,311 +1,281 @@
|
|||
use super::package::Package;
|
||||
use libarchive::write::{Builder, WriteEntry};
|
||||
use libarchive::{Entry, WriteFilter, WriteFormat};
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use super::{archive, package};
|
||||
use crate::{db, error::Result};
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
pub const ANY_ARCH: &str = "any";
|
||||
use futures::StreamExt;
|
||||
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
|
||||
use tokio::io::AsyncRead;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Overarching abstraction that orchestrates updating the repositories stored on the server
|
||||
pub struct RepoGroupManager {
|
||||
pub const ANY_ARCH: &'static str = "any";
|
||||
|
||||
pub struct MetaRepoMgr {
|
||||
repo_dir: PathBuf,
|
||||
pkg_dir: PathBuf,
|
||||
}
|
||||
|
||||
fn parse_pkg_filename(file_name: &str) -> (String, &str, &str, &str) {
|
||||
let name_parts = file_name.split('-').collect::<Vec<_>>();
|
||||
let name = name_parts[..name_parts.len() - 3].join("-");
|
||||
let version = name_parts[name_parts.len() - 3];
|
||||
let release = name_parts[name_parts.len() - 2];
|
||||
let (arch, _) = name_parts[name_parts.len() - 1].split_once('.').unwrap();
|
||||
|
||||
(name, version, release, arch)
|
||||
}
|
||||
|
||||
impl RepoGroupManager {
|
||||
pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(repo_dir: P1, pkg_dir: P2) -> Self {
|
||||
RepoGroupManager {
|
||||
impl MetaRepoMgr {
|
||||
pub fn new<P: AsRef<Path>>(repo_dir: P) -> Self {
|
||||
MetaRepoMgr {
|
||||
repo_dir: repo_dir.as_ref().to_path_buf(),
|
||||
pkg_dir: pkg_dir.as_ref().to_path_buf(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync(&mut self, repo: &str, arch: &str) -> io::Result<()> {
|
||||
let subrepo_path = self.repo_dir.join(repo).join(arch);
|
||||
/// Generate archive databases for all known architectures in the repository, including the
|
||||
/// "any" architecture.
|
||||
pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
let mut ar_db = Builder::new();
|
||||
ar_db.add_filter(WriteFilter::Gzip)?;
|
||||
ar_db.set_format(WriteFormat::PaxRestricted)?;
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut ar_files = Builder::new();
|
||||
ar_files.add_filter(WriteFilter::Gzip)?;
|
||||
ar_files.set_format(WriteFormat::PaxRestricted)?;
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let mut ar_db = ar_db.open_file(subrepo_path.join(format!("{}.db.tar.gz", repo)))?;
|
||||
let mut ar_files =
|
||||
ar_files.open_file(subrepo_path.join(format!("{}.files.tar.gz", repo)))?;
|
||||
let mut archs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.select_only()
|
||||
.column(crate::db::package::Column::Arch)
|
||||
.distinct()
|
||||
.into_tuple::<String>()
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
// All architectures should also include the "any" architecture, except for the "any"
|
||||
// architecture itself.
|
||||
let repo_any_dir = self.repo_dir.join(repo).join(ANY_ARCH);
|
||||
while let Some(arch) = archs.next().await.transpose()? {
|
||||
self.generate_archives(conn, &repo.name, &arch).await?;
|
||||
}
|
||||
|
||||
let any_entries_iter = if arch != ANY_ARCH && repo_any_dir.try_exists()? {
|
||||
Some(repo_any_dir.read_dir()?)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate the archive databases for the given repository and architecture.
|
||||
pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let parent_dir = self.repo_dir.join(&repo.name);
|
||||
tokio::fs::create_dir_all(&parent_dir).await?;
|
||||
|
||||
let ar_db =
|
||||
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch)))
|
||||
.await?;
|
||||
let ar_files =
|
||||
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch)))
|
||||
.await?;
|
||||
|
||||
// Query all packages in the repo that have the given architecture or the "any"
|
||||
// architecture
|
||||
let mut pkgs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
// Create two temp file paths to write our entries to
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let files_tmp_file_path = self.repo_dir.join(uuid.to_string());
|
||||
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let desc_tmp_file_path = self.repo_dir.join(uuid.to_string());
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
|
||||
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
|
||||
|
||||
package::write_files(conn, &mut files_tmp_file, &pkg).await?;
|
||||
package::write_desc(conn, &mut desc_tmp_file, &pkg).await?;
|
||||
|
||||
let full_name = format!("{}-{}", pkg.name, pkg.version);
|
||||
|
||||
ar_db
|
||||
.add_entry(&full_name, &desc_tmp_file_path, true)
|
||||
.await?;
|
||||
ar_files
|
||||
.add_entry(&full_name, &desc_tmp_file_path, true)
|
||||
.await?;
|
||||
ar_files
|
||||
.add_entry(&full_name, &files_tmp_file_path, false)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
ar_db.close().await?;
|
||||
ar_files.close().await?;
|
||||
|
||||
// If this fails there's no point in failing the function + if there were no packages in
|
||||
// the repo, this fails anyway because the temp file doesn't exist
|
||||
let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
|
||||
let _ = tokio::fs::remove_file(files_tmp_file_path).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove the repo with the given name, if it existed
|
||||
pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> {
|
||||
let res = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if let Some(repo_entry) = res {
|
||||
// Remove repository from database
|
||||
repo_entry.delete(conn).await?;
|
||||
|
||||
// Remove files from file system
|
||||
tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.into_iter()
|
||||
.flatten();
|
||||
|
||||
for entry in subrepo_path.read_dir()?.chain(any_entries_iter) {
|
||||
let entry = entry?;
|
||||
|
||||
if entry.file_type()?.is_dir() {
|
||||
// The desc file needs to be added to both archives
|
||||
let path_in_tar = PathBuf::from(entry.file_name()).join("desc");
|
||||
let src_path = entry.path().join("desc");
|
||||
let metadata = src_path.metadata()?;
|
||||
|
||||
let mut ar_entry = WriteEntry::new();
|
||||
ar_entry.set_pathname(&path_in_tar);
|
||||
// These small text files will definitely fit inside an i64
|
||||
ar_entry.set_size(metadata.len().try_into().unwrap());
|
||||
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
|
||||
ar_entry.set_mode(0o100644);
|
||||
|
||||
ar_db.append_path(&mut ar_entry, &src_path)?;
|
||||
ar_files.append_path(&mut ar_entry, src_path)?;
|
||||
|
||||
// The files file is only required in the files database
|
||||
let path_in_tar = PathBuf::from(entry.file_name()).join("files");
|
||||
let src_path = entry.path().join("files");
|
||||
let metadata = src_path.metadata()?;
|
||||
|
||||
let mut ar_entry = WriteEntry::new();
|
||||
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
|
||||
ar_entry.set_pathname(&path_in_tar);
|
||||
ar_entry.set_mode(0o100644);
|
||||
// These small text files will definitely fit inside an i64
|
||||
ar_entry.set_size(metadata.len().try_into().unwrap());
|
||||
|
||||
ar_files.append_path(&mut ar_entry, src_path)?;
|
||||
}
|
||||
}
|
||||
|
||||
ar_db.close()?;
|
||||
ar_files.close()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Synchronize all present architectures' db archives in the given repository.
|
||||
pub fn sync_all(&mut self, repo: &str) -> io::Result<()> {
|
||||
for entry in self.repo_dir.join(repo).read_dir()? {
|
||||
let entry = entry?;
|
||||
|
||||
if entry.file_type()?.is_dir() {
|
||||
self.sync(repo, &entry.file_name().to_string_lossy())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn add_pkg_from_path<P: AsRef<Path>>(
|
||||
&mut self,
|
||||
repo: &str,
|
||||
path: P,
|
||||
) -> io::Result<Package> {
|
||||
let pkg = Package::open(&path)?;
|
||||
|
||||
self.add_pkg(repo, &pkg)?;
|
||||
|
||||
// After successfully adding the package, we move it to the packages directory
|
||||
let dest_pkg_path = self
|
||||
.pkg_dir
|
||||
.join(repo)
|
||||
.join(&pkg.info.arch)
|
||||
.join(pkg.file_name());
|
||||
|
||||
fs::create_dir_all(dest_pkg_path.parent().unwrap())?;
|
||||
fs::rename(&path, dest_pkg_path)?;
|
||||
|
||||
Ok(pkg)
|
||||
}
|
||||
|
||||
/// Add a package to the given repo, returning to what architectures the package was added.
|
||||
pub fn add_pkg(&mut self, repo: &str, pkg: &Package) -> io::Result<()> {
|
||||
// TODO
|
||||
// * if arch is "any", check if package doesn't already exist for other architecture
|
||||
// * if arch isn't "any", check if package doesn't already exist for "any" architecture
|
||||
|
||||
// We first remove any existing version of the package
|
||||
self.remove_pkg(repo, &pkg.info.arch, &pkg.info.name, false)?;
|
||||
|
||||
// Write the `desc` and `files` metadata files to disk
|
||||
let metadata_dir = self
|
||||
.repo_dir
|
||||
.join(repo)
|
||||
.join(&pkg.info.arch)
|
||||
.join(format!("{}-{}", pkg.info.name, pkg.info.version));
|
||||
|
||||
fs::create_dir_all(&metadata_dir)?;
|
||||
|
||||
let mut desc_file = fs::File::create(metadata_dir.join("desc"))?;
|
||||
pkg.write_desc(&mut desc_file)?;
|
||||
|
||||
let mut files_file = fs::File::create(metadata_dir.join("files"))?;
|
||||
pkg.write_files(&mut files_file)?;
|
||||
|
||||
// If a package of type "any" is added, we need to update every existing database
|
||||
if pkg.info.arch == ANY_ARCH {
|
||||
self.sync_all(repo)?;
|
||||
} else {
|
||||
self.sync(repo, &pkg.info.arch)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_repo(&mut self, repo: &str) -> io::Result<bool> {
|
||||
let repo_dir = self.repo_dir.join(repo);
|
||||
|
||||
if !repo_dir.exists() {
|
||||
Ok(false)
|
||||
} else {
|
||||
fs::remove_dir_all(&repo_dir)?;
|
||||
fs::remove_dir_all(self.pkg_dir.join(repo))?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_repo_arch(&mut self, repo: &str, arch: &str) -> io::Result<bool> {
|
||||
let sub_path = PathBuf::from(repo).join(arch);
|
||||
let repo_dir = self.repo_dir.join(&sub_path);
|
||||
/// Remove all packages from the repository with the given arch.
|
||||
pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if !repo_dir.exists() {
|
||||
return Ok(false);
|
||||
if let Some(repo) = repo {
|
||||
let mut pkgs = repo
|
||||
.find_related(db::Package)
|
||||
.filter(db::package::Column::Arch.eq(arch))
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
let path = self
|
||||
.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg));
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
pkg.delete(conn).await?;
|
||||
}
|
||||
|
||||
fs::remove_dir_all(&repo_dir)?;
|
||||
fs::remove_dir_all(self.pkg_dir.join(sub_path))?;
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.db.tar.gz", arch)),
|
||||
)
|
||||
.await?;
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.files.tar.gz", arch)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Removing the "any" architecture updates all other repositories
|
||||
// If we removed all "any" packages, we need to resync all databases
|
||||
if arch == ANY_ARCH {
|
||||
self.sync_all(repo)?;
|
||||
self.generate_archives_all(conn, &repo.name).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_pkg(
|
||||
&mut self,
|
||||
pub async fn remove_pkg(
|
||||
&self,
|
||||
conn: &DbConn,
|
||||
repo: &str,
|
||||
arch: &str,
|
||||
pkg_name: &str,
|
||||
sync: bool,
|
||||
) -> io::Result<bool> {
|
||||
let repo_arch_dir = self.repo_dir.join(repo).join(arch);
|
||||
name: &str,
|
||||
) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if !repo_arch_dir.exists() {
|
||||
return Ok(false);
|
||||
}
|
||||
if let Some(repo) = repo {
|
||||
let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?;
|
||||
|
||||
for entry in repo_arch_dir.read_dir()? {
|
||||
let entry = entry?;
|
||||
if let Some(pkg) = pkg {
|
||||
// Remove package from database & file system
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg)),
|
||||
)
|
||||
.await?;
|
||||
pkg.delete(conn).await?;
|
||||
|
||||
// Make sure we skip the archive files
|
||||
if !entry.metadata()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
|
||||
// The directory name should only contain the name of the package. The last two parts
|
||||
// when splitting on a dash are the pkgver and pkgrel, so we trim those
|
||||
let name_parts = file_name.split('-').collect::<Vec<_>>();
|
||||
let name = name_parts[..name_parts.len() - 2].join("-");
|
||||
|
||||
if name == pkg_name {
|
||||
fs::remove_dir_all(entry.path())?;
|
||||
|
||||
// Also remove the old package archive
|
||||
let repo_arch_pkg_dir = self.pkg_dir.join(repo).join(arch);
|
||||
|
||||
repo_arch_pkg_dir.read_dir()?.try_for_each(|res| {
|
||||
res.and_then(|entry: fs::DirEntry| {
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let (name, _, _, _) = parse_pkg_filename(&file_name);
|
||||
|
||||
if name == pkg_name {
|
||||
fs::remove_file(entry.path())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
})?;
|
||||
|
||||
if sync {
|
||||
if arch == ANY_ARCH {
|
||||
self.sync_all(repo)?;
|
||||
self.generate_archives_all(conn, &repo.name).await?;
|
||||
} else {
|
||||
self.sync(repo, arch)?;
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(true);
|
||||
}
|
||||
self.generate_archives(conn, &repo.name, arch).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper around `remove_pkg` that accepts a path relative to the package directory to a
|
||||
/// package archive.
|
||||
pub fn remove_pkg_from_path<P: AsRef<Path>>(
|
||||
&mut self,
|
||||
path: P,
|
||||
sync: bool,
|
||||
) -> io::Result<Option<(String, String, String, String)>> {
|
||||
let path = path.as_ref();
|
||||
let components: Vec<_> = path.iter().collect();
|
||||
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
|
||||
&self,
|
||||
conn: &DbConn,
|
||||
reader: &mut R,
|
||||
repo: &str,
|
||||
) -> crate::Result<(String, String, String)> {
|
||||
// Copy file contents to temporary path so libarchive can work with it
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let path = self.repo_dir.join(uuid.to_string());
|
||||
let mut temp_file = tokio::fs::File::create(&path).await?;
|
||||
|
||||
if let [repo, _arch, file_name] = components[..] {
|
||||
let full_path = self.pkg_dir.join(path);
|
||||
tokio::io::copy(reader, &mut temp_file).await?;
|
||||
|
||||
if full_path.try_exists()? {
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let (name, version, release, arch) = parse_pkg_filename(&file_name);
|
||||
// Parse the package
|
||||
let path_clone = path.clone();
|
||||
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
let metadata_dir_name = format!("{}-{}-{}", name, version, release);
|
||||
// Query the repo for its ID, or create it if it does not already exist
|
||||
let res = db::query::repo::by_name(conn, &repo).await?;
|
||||
|
||||
// Remove package archive and entry in database
|
||||
fs::remove_file(full_path)?;
|
||||
fs::remove_dir_all(self.repo_dir.join(repo).join(arch).join(metadata_dir_name))?;
|
||||
let repo_id = if let Some(repo_entity) = res {
|
||||
repo_entity.id
|
||||
} else {
|
||||
db::query::repo::insert(conn, repo, None).await?.id
|
||||
};
|
||||
|
||||
if sync {
|
||||
// If the package already exists in the database, we remove it first
|
||||
let res = db::query::package::by_fields(
|
||||
conn,
|
||||
repo_id,
|
||||
&pkg.info.arch,
|
||||
&pkg.info.name,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(entry) = res {
|
||||
entry.delete(conn).await?;
|
||||
}
|
||||
|
||||
let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
|
||||
|
||||
// Insert new package into database
|
||||
let name = pkg.info.name.clone();
|
||||
let version = pkg.info.version.clone();
|
||||
let arch = pkg.info.arch.clone();
|
||||
db::query::package::insert(conn, repo_id, pkg).await?;
|
||||
|
||||
// Move the package to its final resting place
|
||||
tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
|
||||
tokio::fs::rename(path, dest_pkg_path).await?;
|
||||
|
||||
// Synchronize archive databases
|
||||
if arch == ANY_ARCH {
|
||||
self.sync_all(&repo.to_string_lossy())?;
|
||||
self.generate_archives_all(conn, repo).await?;
|
||||
} else {
|
||||
self.sync(&repo.to_string_lossy(), arch)?;
|
||||
}
|
||||
self.generate_archives(conn, repo, &arch).await?;
|
||||
}
|
||||
|
||||
Ok(Some((
|
||||
name,
|
||||
version.to_string(),
|
||||
release.to_string(),
|
||||
arch.to_string(),
|
||||
)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
Ok((name, version, arch))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,283 +0,0 @@
|
|||
use super::{archive, package};
|
||||
use crate::{db, error::Result};
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use futures::StreamExt;
|
||||
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
|
||||
use tokio::io::AsyncRead;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub const ANY_ARCH: &'static str = "any";
|
||||
|
||||
pub struct MetaRepoMgr {
|
||||
repo_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl MetaRepoMgr {
|
||||
pub fn new<P: AsRef<Path>>(repo_dir: P) -> Self {
|
||||
MetaRepoMgr {
|
||||
repo_dir: repo_dir.as_ref().to_path_buf(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate archive databases for all known architectures in the repository, including the
|
||||
/// "any" architecture.
|
||||
pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let mut archs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.select_only()
|
||||
.column(crate::db::package::Column::Arch)
|
||||
.distinct()
|
||||
.into_tuple::<String>()
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
while let Some(arch) = archs.next().await.transpose()? {
|
||||
self.generate_archives(conn, &repo.name, &arch).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate the archive databases for the given repository and architecture.
|
||||
pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let parent_dir = self.repo_dir.join(&repo.name);
|
||||
tokio::fs::create_dir_all(&parent_dir).await?;
|
||||
|
||||
let ar_db =
|
||||
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch)))
|
||||
.await?;
|
||||
let ar_files =
|
||||
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch)))
|
||||
.await?;
|
||||
|
||||
// Query all packages in the repo that have the given architecture or the "any"
|
||||
// architecture
|
||||
let mut pkgs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
// Create two temp file paths to write our entries to
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let files_tmp_file_path = self.repo_dir.join(uuid.to_string());
|
||||
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let desc_tmp_file_path = self.repo_dir.join(uuid.to_string());
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
|
||||
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
|
||||
|
||||
package::write_files(conn, &mut files_tmp_file, &pkg).await?;
|
||||
package::write_desc(conn, &mut desc_tmp_file, &pkg).await?;
|
||||
|
||||
let full_name = format!("{}-{}", pkg.name, pkg.version);
|
||||
|
||||
ar_db
|
||||
.add_entry(&full_name, &desc_tmp_file_path, true)
|
||||
.await?;
|
||||
ar_files
|
||||
.add_entry(&full_name, &desc_tmp_file_path, true)
|
||||
.await?;
|
||||
ar_files
|
||||
.add_entry(&full_name, &files_tmp_file_path, false)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
ar_db.close().await?;
|
||||
ar_files.close().await?;
|
||||
|
||||
// If this fails there's no point in failing the function + if there were no packages in
|
||||
// the repo, this fails anyway because the temp file doesn't exist
|
||||
let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
|
||||
let _ = tokio::fs::remove_file(files_tmp_file_path).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove the repo with the given name, if it existed
|
||||
pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> {
|
||||
let res = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if let Some(repo_entry) = res {
|
||||
// Remove repository from database
|
||||
repo_entry.delete(conn).await?;
|
||||
|
||||
// Remove files from file system
|
||||
tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all packages from the repository with the given arch.
|
||||
pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if let Some(repo) = repo {
|
||||
let mut pkgs = repo
|
||||
.find_related(db::Package)
|
||||
.filter(db::package::Column::Arch.eq(arch))
|
||||
.stream(conn)
|
||||
.await?;
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
let path = self
|
||||
.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg));
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
pkg.delete(conn).await?;
|
||||
}
|
||||
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.db.tar.gz", arch)),
|
||||
)
|
||||
.await?;
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.files.tar.gz", arch)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// If we removed all "any" packages, we need to resync all databases
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(conn, &repo.name).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_pkg(
|
||||
&self,
|
||||
conn: &DbConn,
|
||||
repo: &str,
|
||||
arch: &str,
|
||||
name: &str,
|
||||
) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(conn, repo).await?;
|
||||
|
||||
if let Some(repo) = repo {
|
||||
let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?;
|
||||
|
||||
if let Some(pkg) = pkg {
|
||||
// Remove package from database & file system
|
||||
tokio::fs::remove_file(
|
||||
self.repo_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg)),
|
||||
)
|
||||
.await?;
|
||||
pkg.delete(conn).await?;
|
||||
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(conn, &repo.name).await?;
|
||||
} else {
|
||||
self.generate_archives(conn, &repo.name, arch).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
|
||||
&self,
|
||||
conn: &DbConn,
|
||||
reader: &mut R,
|
||||
repo: &str,
|
||||
) -> crate::Result<(String, String, String)> {
|
||||
// Copy file contents to temporary path so libarchive can work with it
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
let path = self.repo_dir.join(uuid.to_string());
|
||||
let mut temp_file = tokio::fs::File::create(&path).await?;
|
||||
|
||||
tokio::io::copy(reader, &mut temp_file).await?;
|
||||
|
||||
// Parse the package
|
||||
let path_clone = path.clone();
|
||||
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
// Query the repo for its ID, or create it if it does not already exist
|
||||
let res = db::query::repo::by_name(conn, &repo).await?;
|
||||
|
||||
let repo_id = if let Some(repo_entity) = res {
|
||||
repo_entity.id
|
||||
} else {
|
||||
db::query::repo::insert(conn, repo, None)
|
||||
.await?
|
||||
.last_insert_id
|
||||
};
|
||||
|
||||
// If the package already exists in the database, we remove it first
|
||||
let res = db::query::package::by_fields(
|
||||
conn,
|
||||
repo_id,
|
||||
&pkg.info.arch,
|
||||
&pkg.info.name,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(entry) = res {
|
||||
entry.delete(conn).await?;
|
||||
}
|
||||
|
||||
let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
|
||||
|
||||
// Insert new package into database
|
||||
let name = pkg.info.name.clone();
|
||||
let version = pkg.info.version.clone();
|
||||
let arch = pkg.info.arch.clone();
|
||||
db::query::package::insert(conn, repo_id, pkg).await?;
|
||||
|
||||
// Move the package to its final resting place
|
||||
tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
|
||||
tokio::fs::rename(path, dest_pkg_path).await?;
|
||||
|
||||
// Synchronize archive databases
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(conn, repo).await?;
|
||||
} else {
|
||||
self.generate_archives(conn, repo, &arch).await?;
|
||||
}
|
||||
|
||||
Ok((name, version, arch))
|
||||
}
|
||||
}
|
|
@ -1,9 +1,8 @@
|
|||
mod archive;
|
||||
mod manager;
|
||||
mod manager_new;
|
||||
pub mod package;
|
||||
|
||||
pub use manager_new::MetaRepoMgr;
|
||||
pub use manager::MetaRepoMgr;
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
|
@ -77,7 +76,13 @@ async fn post_package_archive(
|
|||
.add_pkg_from_reader(&global.db, &mut body, &repo)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Added '{}-{}' to repository '{}' ({})", name, version, repo, arch);
|
||||
tracing::info!(
|
||||
"Added '{}-{}' to repository '{}' ({})",
|
||||
name,
|
||||
version,
|
||||
repo,
|
||||
arch
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue