Compare commits


No commits in common. "ecc33f01534a4db31e4b3e233f8e32805d65934b" and "fc844c685f3d392981ef9864524585c97d305ebf" have entirely different histories.

7 changed files with 612 additions and 286 deletions

View File

@@ -81,7 +81,7 @@ impl Cli {
         let config = Config {
             data_dir: self.data_dir.clone(),
         };
-        let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"), db.clone());
+        let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"));

         let global = Global {
             config,

View File

@@ -1,4 +1,4 @@
-use crate::db::{self, *};
+use crate::db::*;
 use sea_orm::{sea_query::IntoCondition, *};
 use serde::Deserialize;

@@ -50,14 +50,20 @@ pub async fn by_fields(
     version: Option<&str>,
     compression: Option<&str>,
 ) -> Result<Option<package::Model>> {
-    let cond = Condition::all()
-        .add(package::Column::RepoId.eq(repo_id))
-        .add(package::Column::Name.eq(name))
-        .add(package::Column::Arch.eq(arch))
-        .add_option(version.map(|version| package::Column::Version.eq(version)))
-        .add_option(compression.map(|compression| package::Column::Compression.eq(compression)));
-
-    Package::find().filter(cond).one(conn).await
+    let mut query = Package::find()
+        .filter(package::Column::RepoId.eq(repo_id))
+        .filter(package::Column::Name.eq(name))
+        .filter(package::Column::Arch.eq(arch));
+
+    if let Some(version) = version {
+        query = query.filter(package::Column::Version.eq(version));
+    }
+
+    if let Some(compression) = compression {
+        query = query.filter(package::Column::Compression.eq(compression));
+    }
+
+    query.one(conn).await
 }

 pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result<DeleteResult> {
@@ -162,34 +168,34 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
 pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
     if let Some(entry) = by_id(conn, id).await? {
-        let licenses: Vec<String> = entry
+        let licenses = entry
             .find_related(PackageLicense)
-            .select_only()
-            .column(package_license::Column::Name)
-            .into_tuple()
             .all(conn)
-            .await?;
-        let groups: Vec<String> = entry
+            .await?
+            .into_iter()
+            .map(|e| e.name)
+            .collect();
+        let groups = entry
             .find_related(PackageGroup)
-            .select_only()
-            .column(package_group::Column::Name)
-            .into_tuple()
             .all(conn)
-            .await?;
-        let related: Vec<(db::PackageRelatedEnum, String)> = entry
+            .await?
+            .into_iter()
+            .map(|e| e.name)
+            .collect();
+        let related = entry
             .find_related(PackageRelated)
-            .select_only()
-            .columns([package_related::Column::Type, package_related::Column::Name])
-            .into_tuple()
             .all(conn)
-            .await?;
-        let files: Vec<String> = entry
+            .await?
+            .into_iter()
+            .map(|e| (e.r#type, e.name))
+            .collect();
+        let files = entry
             .find_related(PackageFile)
-            .select_only()
-            .column(package_file::Column::Path)
-            .into_tuple()
             .all(conn)
-            .await?;
+            .await?
+            .into_iter()
+            .map(|e| e.path)
+            .collect();
         Ok(Some(FullPackage {
             entry,
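A side note on the `by_fields` rewrite above: sea_orm's `Select` is an owned builder, so optional filters can be attached by plain reassignment instead of collecting them into a `Condition` with `add_option`. A minimal self-contained sketch of the same pattern, using a hypothetical `note` entity in place of the real `package` schema:

    use sea_orm::entity::prelude::*;

    // Hypothetical entity standing in for the real `package` schema.
    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "note")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: i32,
        pub owner_id: i32,
        pub title: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}

    // Same shape as the new `by_fields`: start from an owned `Select`
    // and attach filters only for parameters that are actually present.
    pub async fn by_owner_and_title(
        conn: &sea_orm::DatabaseConnection,
        owner_id: i32,
        title: Option<&str>,
    ) -> Result<Option<Model>, sea_orm::DbErr> {
        let mut query = Entity::find().filter(Column::OwnerId.eq(owner_id));

        if let Some(title) = title {
            query = query.filter(Column::Title.eq(title));
        }

        query.one(conn).await
    }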

View File

@@ -43,12 +43,16 @@ pub async fn by_name(conn: &DbConn, name: &str) -> Result<Option<repo::Model>> {
         .await
 }

-pub async fn insert(conn: &DbConn, name: &str, description: Option<&str>) -> Result<repo::Model> {
+pub async fn insert(
+    conn: &DbConn,
+    name: &str,
+    description: Option<&str>,
+) -> Result<InsertResult<repo::ActiveModel>> {
     let model = repo::ActiveModel {
         id: NotSet,
         name: Set(String::from(name)),
         description: Set(description.map(String::from)),
     };

-    model.insert(conn).await
+    Repo::insert(model).exec(conn).await
 }
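The return-type change above is not cosmetic: `ActiveModelTrait::insert` refetches and returns the inserted `Model`, while `Entity::insert(model).exec(conn)` yields only an `InsertResult`, so callers now read the id from `last_insert_id` (as `add_pkg_from_reader` does further down). A sketch of the two call shapes, reusing the hypothetical `note` entity from the previous sketch:

    use sea_orm::{
        ActiveModelTrait,
        ActiveValue::{NotSet, Set},
        DatabaseConnection, DbErr, EntityTrait,
    };

    // Reuses the hypothetical `note` entity from the sketch above.
    async fn insert_both_ways(conn: &DatabaseConnection) -> Result<(), DbErr> {
        let model = ActiveModel {
            id: NotSet,
            owner_id: Set(1),
            title: Set(String::from("hello")),
        };

        // Old shape: returns the full inserted `Model`.
        let inserted: Model = model.clone().insert(conn).await?;

        // New shape: returns an `InsertResult`; the id comes from `last_insert_id`.
        let res = Entity::insert(model).exec(conn).await?;
        let _id: i32 = res.last_insert_id;

        let _ = inserted;
        Ok(())
    }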

View File

@@ -30,13 +30,11 @@ impl RepoArchiveWriter {
             .unwrap()?;

         Ok(Self {
-            // In practice, mutex is only ever used by one thread at a time. It's simply here so we
-            // can use spawn_blocking without issues.
             ar: Arc::new(Mutex::new(ar)),
         })
     }

-    /// Add either a "desc" or "files" entry to the archive
+    /// Set the current entry to be a new "files" list
     pub async fn add_entry<P: AsRef<Path>>(
         &self,
         full_name: &str,
@@ -75,4 +73,15 @@ impl RepoArchiveWriter {
             .unwrap()?,
         )
     }
+
+    //
+    ///// Append the given line to the currently active entry
+    //pub async fn write_line(&self, line: &str) -> io::Result<()> {
+    //    let line = String::from(line);
+    //    let (tx, rx) = oneshot::channel();
+    //
+    //    self.tx.send(Message::AppendLine(tx, line)).await;
+    //
+    //    rx.await.unwrap()
+    //}
 }
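The commented-out `write_line` added above hints at a request-reply actor design: each caller would ship a oneshot sender along with its message and await the result. In isolation the pattern looks like the following self-contained sketch; the `Message` enum and the worker loop are assumptions extrapolated from the commented code:

    use tokio::sync::{mpsc, oneshot};

    enum Message {
        AppendLine(oneshot::Sender<std::io::Result<()>>, String),
    }

    // Worker task owning the (blocking) archive state; callers never touch it directly.
    async fn worker(mut rx: mpsc::Receiver<Message>) {
        while let Some(Message::AppendLine(reply, line)) = rx.recv().await {
            // Blocking archive I/O would happen here.
            println!("appending: {line}");
            let _ = reply.send(Ok(()));
        }
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let (tx, rx) = mpsc::channel(16);
        tokio::spawn(worker(rx));

        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(Message::AppendLine(reply_tx, "hello".into()))
            .await
            .expect("worker alive");

        reply_rx.await.expect("reply sent")
    }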

View File

@@ -1,287 +1,311 @@
-use super::{archive, package};
-use crate::{db, error::Result};
-
-use std::path::{Path, PathBuf};
-
-use futures::StreamExt;
-use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
-use tokio::io::AsyncRead;
-use uuid::Uuid;
-
-pub const ANY_ARCH: &'static str = "any";
-
-pub struct MetaRepoMgr {
-    repo_dir: PathBuf,
-    conn: DbConn,
-}
-
-impl MetaRepoMgr {
-    pub fn new<P: AsRef<Path>>(repo_dir: P, conn: DbConn) -> Self {
-        MetaRepoMgr {
-            repo_dir: repo_dir.as_ref().to_path_buf(),
-            conn,
-        }
-    }
-
-    /// Generate archive databases for all known architectures in the repository, including the
-    /// "any" architecture.
-    pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
-        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
-
-        if repo.is_none() {
-            return Ok(());
-        }
-
-        let repo = repo.unwrap();
-
-        let mut archs = repo
-            .find_related(crate::db::Package)
-            .select_only()
-            .column(crate::db::package::Column::Arch)
-            .distinct()
-            .into_tuple::<String>()
-            .stream(&self.conn)
-            .await?;
-
-        while let Some(arch) = archs.next().await.transpose()? {
-            self.generate_archives(&repo.name, &arch).await?;
-        }
-
-        Ok(())
-    }
-
-    /// Generate the archive databases for the given repository and architecture.
-    pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
-        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
-
-        if repo.is_none() {
-            return Ok(());
-        }
-
-        let repo = repo.unwrap();
-
-        let parent_dir = self.repo_dir.join(&repo.name);
-        tokio::fs::create_dir_all(&parent_dir).await?;
-
-        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
-            self.random_file_paths();
-        let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
-        let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
-
-        // Query all packages in the repo that have the given architecture or the "any"
-        // architecture
-        let mut pkgs = repo
-            .find_related(crate::db::Package)
-            .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
-            .stream(&self.conn)
-            .await?;
-
-        while let Some(pkg) = pkgs.next().await.transpose()? {
-            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
-            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
-
-            package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
-            package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
-
-            let full_name = format!("{}-{}", pkg.name, pkg.version);
-
-            ar_db
-                .add_entry(&full_name, &desc_tmp_file_path, true)
-                .await?;
-            ar_files
-                .add_entry(&full_name, &desc_tmp_file_path, true)
-                .await?;
-            ar_files
-                .add_entry(&full_name, &files_tmp_file_path, false)
-                .await?;
-        }
-
-        // Cleanup
-        ar_db.close().await?;
-        ar_files.close().await?;
-
-        // Move the db archives to their respective places
-        tokio::fs::rename(
-            tmp_ar_db_path,
-            parent_dir.join(format!("{}.db.tar.gz", arch)),
-        )
-        .await?;
-        tokio::fs::rename(
-            tmp_ar_files_path,
-            parent_dir.join(format!("{}.files.tar.gz", arch)),
-        )
-        .await?;
-
-        // If this fails there's no point in failing the function + if there were no packages in
-        // the repo, this fails anyway because the temp file doesn't exist
-        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
-        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
-
-        Ok(())
-    }
-
-    /// Remove the repo with the given name, if it existed
-    pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
-        let res = db::query::repo::by_name(&self.conn, repo).await?;
-
-        if let Some(repo_entry) = res {
-            // Remove repository from database
-            repo_entry.delete(&self.conn).await?;
-
-            // Remove files from file system
-            tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
-
-            Ok(true)
-        } else {
-            Ok(false)
-        }
-    }
-
-    /// Remove all packages from the repository with the given arch.
-    pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
-        let repo = db::query::repo::by_name(&self.conn, repo).await?;
-
-        if let Some(repo) = repo {
-            let mut pkgs = repo
-                .find_related(db::Package)
-                .filter(db::package::Column::Arch.eq(arch))
-                .stream(&self.conn)
-                .await?;
-
-            while let Some(pkg) = pkgs.next().await.transpose()? {
-                let path = self
-                    .repo_dir
-                    .join(&repo.name)
-                    .join(super::package::filename(&pkg));
-                tokio::fs::remove_file(path).await?;
-
-                pkg.delete(&self.conn).await?;
-            }
-
-            tokio::fs::remove_file(
-                self.repo_dir
-                    .join(&repo.name)
-                    .join(format!("{}.db.tar.gz", arch)),
-            )
-            .await?;
-            tokio::fs::remove_file(
-                self.repo_dir
-                    .join(&repo.name)
-                    .join(format!("{}.files.tar.gz", arch)),
-            )
-            .await?;
-
-            // If we removed all "any" packages, we need to resync all databases
-            if arch == ANY_ARCH {
-                self.generate_archives_all(&repo.name).await?;
-            }
-
-            Ok(true)
-        } else {
-            Ok(false)
-        }
-    }
-
-    pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> {
-        let repo = db::query::repo::by_name(&self.conn, repo).await?;
-
-        if let Some(repo) = repo {
-            let pkg =
-                db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?;
-
-            if let Some(pkg) = pkg {
-                // Remove package from database & file system
-                tokio::fs::remove_file(
-                    self.repo_dir
-                        .join(&repo.name)
-                        .join(super::package::filename(&pkg)),
-                )
-                .await?;
-                pkg.delete(&self.conn).await?;
-
-                if arch == ANY_ARCH {
-                    self.generate_archives_all(&repo.name).await?;
-                } else {
-                    self.generate_archives(&repo.name, arch).await?;
-                }
-
-                Ok(true)
-            } else {
-                Ok(false)
-            }
-        } else {
-            Ok(false)
-        }
-    }
-
-    pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
-        &self,
-        reader: &mut R,
-        repo: &str,
-    ) -> crate::Result<(String, String, String)> {
-        // Copy file contents to temporary path so libarchive can work with it
-        let [path] = self.random_file_paths();
-        let mut temp_file = tokio::fs::File::create(&path).await?;
-
-        tokio::io::copy(reader, &mut temp_file).await?;
-
-        // Parse the package
-        let path_clone = path.clone();
-        let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
-            .await
-            .unwrap()?;
-
-        // Query the repo for its ID, or create it if it does not already exist
-        let res = db::query::repo::by_name(&self.conn, &repo).await?;
-
-        let repo_id = if let Some(repo_entity) = res {
-            repo_entity.id
-        } else {
-            db::query::repo::insert(&self.conn, repo, None).await?.id
-        };
-
-        // If the package already exists in the database, we remove it first
-        let res = db::query::package::by_fields(
-            &self.conn,
-            repo_id,
-            &pkg.info.arch,
-            &pkg.info.name,
-            None,
-            None,
-        )
-        .await?;
-
-        if let Some(entry) = res {
-            entry.delete(&self.conn).await?;
-        }
-
-        let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
-
-        // Insert new package into database
-        let name = pkg.info.name.clone();
-        let version = pkg.info.version.clone();
-        let arch = pkg.info.arch.clone();
-        db::query::package::insert(&self.conn, repo_id, pkg).await?;
-
-        // Move the package to its final resting place
-        tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
-        tokio::fs::rename(path, dest_pkg_path).await?;
-
-        // Synchronize archive databases
-        if arch == ANY_ARCH {
-            self.generate_archives_all(repo).await?;
-        } else {
-            self.generate_archives(repo, &arch).await?;
-        }
-
-        Ok((name, version, arch))
-    }
-
-    /// Generate a path to a unique file that can be used as a temporary file
-    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
-        std::array::from_fn(|_| {
-            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
-            self.repo_dir.join(uuid.to_string())
-        })
-    }
-}
+use super::package::Package;
+use libarchive::write::{Builder, WriteEntry};
+use libarchive::{Entry, WriteFilter, WriteFormat};
+
+use std::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+pub const ANY_ARCH: &str = "any";
+
+/// Overarching abstraction that orchestrates updating the repositories stored on the server
+pub struct RepoGroupManager {
+    repo_dir: PathBuf,
+    pkg_dir: PathBuf,
+}
+
+fn parse_pkg_filename(file_name: &str) -> (String, &str, &str, &str) {
+    let name_parts = file_name.split('-').collect::<Vec<_>>();
+    let name = name_parts[..name_parts.len() - 3].join("-");
+    let version = name_parts[name_parts.len() - 3];
+    let release = name_parts[name_parts.len() - 2];
+    let (arch, _) = name_parts[name_parts.len() - 1].split_once('.').unwrap();
+
+    (name, version, release, arch)
+}
+
+impl RepoGroupManager {
+    pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(repo_dir: P1, pkg_dir: P2) -> Self {
+        RepoGroupManager {
+            repo_dir: repo_dir.as_ref().to_path_buf(),
+            pkg_dir: pkg_dir.as_ref().to_path_buf(),
+        }
+    }
+
+    pub fn sync(&mut self, repo: &str, arch: &str) -> io::Result<()> {
+        let subrepo_path = self.repo_dir.join(repo).join(arch);
+
+        let mut ar_db = Builder::new();
+        ar_db.add_filter(WriteFilter::Gzip)?;
+        ar_db.set_format(WriteFormat::PaxRestricted)?;
+
+        let mut ar_files = Builder::new();
+        ar_files.add_filter(WriteFilter::Gzip)?;
+        ar_files.set_format(WriteFormat::PaxRestricted)?;
+
+        let mut ar_db = ar_db.open_file(subrepo_path.join(format!("{}.db.tar.gz", repo)))?;
+        let mut ar_files =
+            ar_files.open_file(subrepo_path.join(format!("{}.files.tar.gz", repo)))?;
+
+        // All architectures should also include the "any" architecture, except for the "any"
+        // architecture itself.
+        let repo_any_dir = self.repo_dir.join(repo).join(ANY_ARCH);
+        let any_entries_iter = if arch != ANY_ARCH && repo_any_dir.try_exists()? {
+            Some(repo_any_dir.read_dir()?)
+        } else {
+            None
+        }
+        .into_iter()
+        .flatten();
+
+        for entry in subrepo_path.read_dir()?.chain(any_entries_iter) {
+            let entry = entry?;
+
+            if entry.file_type()?.is_dir() {
+                // The desc file needs to be added to both archives
+                let path_in_tar = PathBuf::from(entry.file_name()).join("desc");
+                let src_path = entry.path().join("desc");
+                let metadata = src_path.metadata()?;
+
+                let mut ar_entry = WriteEntry::new();
+                ar_entry.set_pathname(&path_in_tar);
+                // These small text files will definitely fit inside an i64
+                ar_entry.set_size(metadata.len().try_into().unwrap());
+                ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
+                ar_entry.set_mode(0o100644);
+
+                ar_db.append_path(&mut ar_entry, &src_path)?;
+                ar_files.append_path(&mut ar_entry, src_path)?;
+
+                // The files file is only required in the files database
+                let path_in_tar = PathBuf::from(entry.file_name()).join("files");
+                let src_path = entry.path().join("files");
+                let metadata = src_path.metadata()?;
+
+                let mut ar_entry = WriteEntry::new();
+                ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
+                ar_entry.set_pathname(&path_in_tar);
+                ar_entry.set_mode(0o100644);
+                // These small text files will definitely fit inside an i64
+                ar_entry.set_size(metadata.len().try_into().unwrap());
+
+                ar_files.append_path(&mut ar_entry, src_path)?;
+            }
+        }
+
+        ar_db.close()?;
+        ar_files.close()?;
+
+        Ok(())
+    }
+
+    /// Synchronize all present architectures' db archives in the given repository.
+    pub fn sync_all(&mut self, repo: &str) -> io::Result<()> {
+        for entry in self.repo_dir.join(repo).read_dir()? {
+            let entry = entry?;
+
+            if entry.file_type()?.is_dir() {
+                self.sync(repo, &entry.file_name().to_string_lossy())?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn add_pkg_from_path<P: AsRef<Path>>(
+        &mut self,
+        repo: &str,
+        path: P,
+    ) -> io::Result<Package> {
+        let pkg = Package::open(&path)?;
+
+        self.add_pkg(repo, &pkg)?;
+
+        // After successfully adding the package, we move it to the packages directory
+        let dest_pkg_path = self
+            .pkg_dir
+            .join(repo)
+            .join(&pkg.info.arch)
+            .join(pkg.file_name());
+
+        fs::create_dir_all(dest_pkg_path.parent().unwrap())?;
+        fs::rename(&path, dest_pkg_path)?;
+
+        Ok(pkg)
+    }
+
+    /// Add a package to the given repo, returning to what architectures the package was added.
+    pub fn add_pkg(&mut self, repo: &str, pkg: &Package) -> io::Result<()> {
+        // TODO
+        // * if arch is "any", check if package doesn't already exist for other architecture
+        // * if arch isn't "any", check if package doesn't already exist for "any" architecture
+
+        // We first remove any existing version of the package
+        self.remove_pkg(repo, &pkg.info.arch, &pkg.info.name, false)?;
+
+        // Write the `desc` and `files` metadata files to disk
+        let metadata_dir = self
+            .repo_dir
+            .join(repo)
+            .join(&pkg.info.arch)
+            .join(format!("{}-{}", pkg.info.name, pkg.info.version));
+
+        fs::create_dir_all(&metadata_dir)?;
+
+        let mut desc_file = fs::File::create(metadata_dir.join("desc"))?;
+        pkg.write_desc(&mut desc_file)?;
+
+        let mut files_file = fs::File::create(metadata_dir.join("files"))?;
+        pkg.write_files(&mut files_file)?;
+
+        // If a package of type "any" is added, we need to update every existing database
+        if pkg.info.arch == ANY_ARCH {
+            self.sync_all(repo)?;
+        } else {
+            self.sync(repo, &pkg.info.arch)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn remove_repo(&mut self, repo: &str) -> io::Result<bool> {
+        let repo_dir = self.repo_dir.join(repo);
+
+        if !repo_dir.exists() {
+            Ok(false)
+        } else {
+            fs::remove_dir_all(&repo_dir)?;
+            fs::remove_dir_all(self.pkg_dir.join(repo))?;
+
+            Ok(true)
+        }
+    }
+
+    pub fn remove_repo_arch(&mut self, repo: &str, arch: &str) -> io::Result<bool> {
+        let sub_path = PathBuf::from(repo).join(arch);
+        let repo_dir = self.repo_dir.join(&sub_path);
+
+        if !repo_dir.exists() {
+            return Ok(false);
+        }
+
+        fs::remove_dir_all(&repo_dir)?;
+        fs::remove_dir_all(self.pkg_dir.join(sub_path))?;
+
+        // Removing the "any" architecture updates all other repositories
+        if arch == ANY_ARCH {
+            self.sync_all(repo)?;
+        }
+
+        Ok(true)
+    }
+
+    pub fn remove_pkg(
+        &mut self,
+        repo: &str,
+        arch: &str,
+        pkg_name: &str,
+        sync: bool,
+    ) -> io::Result<bool> {
+        let repo_arch_dir = self.repo_dir.join(repo).join(arch);
+
+        if !repo_arch_dir.exists() {
+            return Ok(false);
+        }
+
+        for entry in repo_arch_dir.read_dir()? {
+            let entry = entry?;
+
+            // Make sure we skip the archive files
+            if !entry.metadata()?.is_dir() {
+                continue;
+            }
+
+            let file_name = entry.file_name();
+            let file_name = file_name.to_string_lossy();
+
+            // The directory name should only contain the name of the package. The last two parts
+            // when splitting on a dash are the pkgver and pkgrel, so we trim those
+            let name_parts = file_name.split('-').collect::<Vec<_>>();
+            let name = name_parts[..name_parts.len() - 2].join("-");
+
+            if name == pkg_name {
+                fs::remove_dir_all(entry.path())?;
+
+                // Also remove the old package archive
+                let repo_arch_pkg_dir = self.pkg_dir.join(repo).join(arch);
+
+                repo_arch_pkg_dir.read_dir()?.try_for_each(|res| {
+                    res.and_then(|entry: fs::DirEntry| {
+                        let file_name = entry.file_name();
+                        let file_name = file_name.to_string_lossy();
+                        let (name, _, _, _) = parse_pkg_filename(&file_name);
+
+                        if name == pkg_name {
+                            fs::remove_file(entry.path())
+                        } else {
+                            Ok(())
+                        }
+                    })
+                })?;
+
+                if sync {
+                    if arch == ANY_ARCH {
+                        self.sync_all(repo)?;
+                    } else {
+                        self.sync(repo, arch)?;
+                    }
+                }
+
+                return Ok(true);
+            }
+        }
+
+        Ok(false)
+    }
+
+    /// Wrapper around `remove_pkg` that accepts a path relative to the package directory to a
+    /// package archive.
+    pub fn remove_pkg_from_path<P: AsRef<Path>>(
+        &mut self,
+        path: P,
+        sync: bool,
+    ) -> io::Result<Option<(String, String, String, String)>> {
+        let path = path.as_ref();
+        let components: Vec<_> = path.iter().collect();
+
+        if let [repo, _arch, file_name] = components[..] {
+            let full_path = self.pkg_dir.join(path);
+
+            if full_path.try_exists()? {
+                let file_name = file_name.to_string_lossy();
+                let (name, version, release, arch) = parse_pkg_filename(&file_name);
+
+                let metadata_dir_name = format!("{}-{}-{}", name, version, release);
+
+                // Remove package archive and entry in database
+                fs::remove_file(full_path)?;
+                fs::remove_dir_all(self.repo_dir.join(repo).join(arch).join(metadata_dir_name))?;
+
+                if sync {
+                    if arch == ANY_ARCH {
+                        self.sync_all(&repo.to_string_lossy())?;
+                    } else {
+                        self.sync(&repo.to_string_lossy(), arch)?;
+                    }
+                }
+
+                Ok(Some((
+                    name,
+                    version.to_string(),
+                    release.to_string(),
+                    arch.to_string(),
+                )))
+            } else {
+                Ok(None)
+            }
+        } else {
+            Ok(None)
+        }
+    }
+}
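For reference, `parse_pkg_filename` on the right-hand side treats the last three dash-separated segments of a package file name as pkgver, pkgrel, and arch-plus-extension, and everything before them as the (possibly dashed) package name. A standalone sketch with a worked, made-up file name:

    fn parse_pkg_filename(file_name: &str) -> (String, &str, &str, &str) {
        let name_parts = file_name.split('-').collect::<Vec<_>>();
        let name = name_parts[..name_parts.len() - 3].join("-");
        let version = name_parts[name_parts.len() - 3];
        let release = name_parts[name_parts.len() - 2];
        let (arch, _) = name_parts[name_parts.len() - 1].split_once('.').unwrap();

        (name, version, release, arch)
    }

    fn main() {
        // Made-up file name in the usual `name-pkgver-pkgrel-arch.pkg.tar.zst` shape.
        let (name, version, release, arch) =
            parse_pkg_filename("rust-analyzer-2024.01.01-2-x86_64.pkg.tar.zst");

        assert_eq!(name, "rust-analyzer");
        assert_eq!(version, "2024.01.01");
        assert_eq!(release, "2");
        assert_eq!(arch, "x86_64");
        println!("{name} {version}-{release} ({arch})");
    }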

View File

@@ -0,0 +1,283 @@
+use super::{archive, package};
+use crate::{db, error::Result};
+
+use std::path::{Path, PathBuf};
+
+use futures::StreamExt;
+use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
+use tokio::io::AsyncRead;
+use uuid::Uuid;
+
+pub const ANY_ARCH: &'static str = "any";
+
+pub struct MetaRepoMgr {
+    repo_dir: PathBuf,
+}
+
+impl MetaRepoMgr {
+    pub fn new<P: AsRef<Path>>(repo_dir: P) -> Self {
+        MetaRepoMgr {
+            repo_dir: repo_dir.as_ref().to_path_buf(),
+        }
+    }
+
+    /// Generate archive databases for all known architectures in the repository, including the
+    /// "any" architecture.
+    pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> {
+        let repo = crate::db::query::repo::by_name(conn, repo).await?;
+
+        if repo.is_none() {
+            return Ok(());
+        }
+
+        let repo = repo.unwrap();
+
+        let mut archs = repo
+            .find_related(crate::db::Package)
+            .select_only()
+            .column(crate::db::package::Column::Arch)
+            .distinct()
+            .into_tuple::<String>()
+            .stream(conn)
+            .await?;
+
+        while let Some(arch) = archs.next().await.transpose()? {
+            self.generate_archives(conn, &repo.name, &arch).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Generate the archive databases for the given repository and architecture.
+    pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> {
+        let repo = crate::db::query::repo::by_name(conn, repo).await?;
+
+        if repo.is_none() {
+            return Ok(());
+        }
+
+        let repo = repo.unwrap();
+
+        let parent_dir = self.repo_dir.join(&repo.name);
+        tokio::fs::create_dir_all(&parent_dir).await?;
+
+        let ar_db =
+            archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch)))
+                .await?;
+        let ar_files =
+            archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch)))
+                .await?;
+
+        // Query all packages in the repo that have the given architecture or the "any"
+        // architecture
+        let mut pkgs = repo
+            .find_related(crate::db::Package)
+            .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
+            .stream(conn)
+            .await?;
+
+        // Create two temp file paths to write our entries to
+        let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+        let files_tmp_file_path = self.repo_dir.join(uuid.to_string());
+        let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+        let desc_tmp_file_path = self.repo_dir.join(uuid.to_string());
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
+            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
+
+            package::write_files(conn, &mut files_tmp_file, &pkg).await?;
+            package::write_desc(conn, &mut desc_tmp_file, &pkg).await?;
+
+            let full_name = format!("{}-{}", pkg.name, pkg.version);
+
+            ar_db
+                .add_entry(&full_name, &desc_tmp_file_path, true)
+                .await?;
+            ar_files
+                .add_entry(&full_name, &desc_tmp_file_path, true)
+                .await?;
+            ar_files
+                .add_entry(&full_name, &files_tmp_file_path, false)
+                .await?;
+        }
+
+        // Cleanup
+        ar_db.close().await?;
+        ar_files.close().await?;
+
+        // If this fails there's no point in failing the function + if there were no packages in
+        // the repo, this fails anyway because the temp file doesn't exist
+        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
+        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
+
+        Ok(())
+    }
+
+    /// Remove the repo with the given name, if it existed
+    pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> {
+        let res = db::query::repo::by_name(conn, repo).await?;
+
+        if let Some(repo_entry) = res {
+            // Remove repository from database
+            repo_entry.delete(conn).await?;
+
+            // Remove files from file system
+            tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
+
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// Remove all packages from the repository with the given arch.
+    pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<bool> {
+        let repo = db::query::repo::by_name(conn, repo).await?;
+
+        if let Some(repo) = repo {
+            let mut pkgs = repo
+                .find_related(db::Package)
+                .filter(db::package::Column::Arch.eq(arch))
+                .stream(conn)
+                .await?;
+
+            while let Some(pkg) = pkgs.next().await.transpose()? {
+                let path = self
+                    .repo_dir
+                    .join(&repo.name)
+                    .join(super::package::filename(&pkg));
+                tokio::fs::remove_file(path).await?;
+
+                pkg.delete(conn).await?;
+            }
+
+            tokio::fs::remove_file(
+                self.repo_dir
+                    .join(&repo.name)
+                    .join(format!("{}.db.tar.gz", arch)),
+            )
+            .await?;
+            tokio::fs::remove_file(
+                self.repo_dir
+                    .join(&repo.name)
+                    .join(format!("{}.files.tar.gz", arch)),
+            )
+            .await?;
+
+            // If we removed all "any" packages, we need to resync all databases
+            if arch == ANY_ARCH {
+                self.generate_archives_all(conn, &repo.name).await?;
+            }
+
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
+    pub async fn remove_pkg(
+        &self,
+        conn: &DbConn,
+        repo: &str,
+        arch: &str,
+        name: &str,
+    ) -> Result<bool> {
+        let repo = db::query::repo::by_name(conn, repo).await?;
+
+        if let Some(repo) = repo {
+            let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?;
+
+            if let Some(pkg) = pkg {
+                // Remove package from database & file system
+                tokio::fs::remove_file(
+                    self.repo_dir
+                        .join(&repo.name)
+                        .join(super::package::filename(&pkg)),
+                )
+                .await?;
+                pkg.delete(conn).await?;
+
+                if arch == ANY_ARCH {
+                    self.generate_archives_all(conn, &repo.name).await?;
+                } else {
+                    self.generate_archives(conn, &repo.name, arch).await?;
+                }
+
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        } else {
+            Ok(false)
+        }
+    }
+
+    pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
+        &self,
+        conn: &DbConn,
+        reader: &mut R,
+        repo: &str,
+    ) -> crate::Result<(String, String, String)> {
+        // Copy file contents to temporary path so libarchive can work with it
+        let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+
+        let path = self.repo_dir.join(uuid.to_string());
+        let mut temp_file = tokio::fs::File::create(&path).await?;
+
+        tokio::io::copy(reader, &mut temp_file).await?;
+
+        // Parse the package
+        let path_clone = path.clone();
+        let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
+            .await
+            .unwrap()?;
+
+        // Query the repo for its ID, or create it if it does not already exist
+        let res = db::query::repo::by_name(conn, &repo).await?;
+
+        let repo_id = if let Some(repo_entity) = res {
+            repo_entity.id
+        } else {
+            db::query::repo::insert(conn, repo, None)
+                .await?
+                .last_insert_id
+        };
+
+        // If the package already exists in the database, we remove it first
+        let res = db::query::package::by_fields(
+            conn,
+            repo_id,
+            &pkg.info.arch,
+            &pkg.info.name,
+            None,
+            None,
+        )
+        .await?;
+
+        if let Some(entry) = res {
+            entry.delete(conn).await?;
+        }
+
+        let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
+
+        // Insert new package into database
+        let name = pkg.info.name.clone();
+        let version = pkg.info.version.clone();
+        let arch = pkg.info.arch.clone();
+        db::query::package::insert(conn, repo_id, pkg).await?;
+
+        // Move the package to its final resting place
+        tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
+        tokio::fs::rename(path, dest_pkg_path).await?;
+
+        // Synchronize archive databases
+        if arch == ANY_ARCH {
+            self.generate_archives_all(conn, repo).await?;
+        } else {
+            self.generate_archives(conn, repo, &arch).await?;
+        }
+
+        Ok((name, version, arch))
+    }
+}
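Compared with the left-hand manager, the structural change in this new file is that the `DbConn` moved from a struct field to a per-call parameter, so one shared connection can serve every handler while the manager itself stays behind a lock. A minimal sketch of the calling shape used in the route handlers below; the `Global` fields here are assumptions based on this diff:

    use sea_orm::{DatabaseConnection, DbErr};
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-in manager: only the signature shape matters for this sketch.
    struct MetaRepoMgr;

    impl MetaRepoMgr {
        async fn remove_repo(
            &self,
            _conn: &DatabaseConnection,
            _repo: &str,
        ) -> Result<bool, DbErr> {
            Ok(true) // the real implementation deletes DB rows and files
        }
    }

    #[derive(Clone)]
    struct Global {
        db: DatabaseConnection,
        repo_manager: Arc<RwLock<MetaRepoMgr>>,
    }

    async fn delete_repo(global: &Global, repo: &str) -> Result<bool, DbErr> {
        // The write lock serializes repository mutations; the connection is
        // borrowed for the duration of the call instead of being owned.
        global
            .repo_manager
            .write()
            .await
            .remove_repo(&global.db, repo)
            .await
    }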

View File

@@ -1,8 +1,9 @@
 mod archive;
 mod manager;
+mod manager_new;
 pub mod package;

-pub use manager::MetaRepoMgr;
+pub use manager_new::MetaRepoMgr;

 use axum::{
     body::Body,
@@ -73,16 +74,10 @@ async fn post_package_archive(
         .repo_manager
         .write()
         .await
-        .add_pkg_from_reader(&mut body, &repo)
+        .add_pkg_from_reader(&global.db, &mut body, &repo)
         .await?;

-    tracing::info!(
-        "Added '{}-{}' to repository '{}' ({})",
-        name,
-        version,
-        repo,
-        arch
-    );
+    tracing::info!("Added '{}-{}' to repository '{}' ({})", name, version, repo, arch);

     Ok(())
 }
@@ -91,7 +86,12 @@ async fn delete_repo(
     State(global): State<crate::Global>,
     Path(repo): Path<String>,
 ) -> crate::Result<StatusCode> {
-    let repo_removed = global.repo_manager.write().await.remove_repo(&repo).await?;
+    let repo_removed = global
+        .repo_manager
+        .write()
+        .await
+        .remove_repo(&global.db, &repo)
+        .await?;

     if repo_removed {
         tracing::info!("Removed repository '{}'", repo);
@@ -110,7 +110,7 @@ async fn delete_arch_repo(
         .repo_manager
         .write()
         .await
-        .remove_repo_arch(&repo, &arch)
+        .remove_repo_arch(&global.db, &repo, &arch)
         .await?;

     if repo_removed {
@@ -130,7 +130,7 @@ async fn delete_package(
         .repo_manager
         .write()
         .await
-        .remove_pkg(&repo, &arch, &pkg_name)
+        .remove_pkg(&global.db, &repo, &arch, &pkg_name)
        .await?;

     if pkg_removed {