chore: remove old manager code once again

concurrent-repos
Jef Roosens 2024-06-18 10:47:35 +02:00
parent cc8848d3ae
commit 730ae009b0
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
6 changed files with 241 additions and 744 deletions

View File

@ -1,4 +1,4 @@
use crate::{distro::MetaDistroMgr, Config, FsConfig, Global}; use crate::{Config, FsConfig, Global};
use std::{io, path::PathBuf, sync::Arc}; use std::{io, path::PathBuf, sync::Arc};
@ -6,7 +6,6 @@ use axum::Router;
use clap::Parser; use clap::Parser;
use sea_orm_migration::MigratorTrait; use sea_orm_migration::MigratorTrait;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing::debug;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Parser)] #[derive(Parser)]

View File

@ -1,70 +0,0 @@
use crate::{db, DistroMgr};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use sea_orm::{DbConn, EntityTrait};
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct MetaDistroMgr {
distro_dir: PathBuf,
conn: DbConn,
distros: Arc<Mutex<HashMap<String, Arc<DistroMgr>>>>,
}
impl MetaDistroMgr {
pub async fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> crate::Result<Self> {
if !tokio::fs::try_exists(&distro_dir).await? {
tokio::fs::create_dir(&distro_dir).await?;
}
let distro_dir = distro_dir.as_ref().to_path_buf();
let mut map: HashMap<String, Arc<DistroMgr>> = HashMap::new();
let distros = db::Distro::find().all(&conn).await?;
for distro in distros {
let mgr =
DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?;
map.insert(distro.name, Arc::new(mgr));
}
Ok(Self {
distro_dir,
conn,
distros: Arc::new(Mutex::new(map)),
})
}
pub async fn get_mgr(&self, distro: &str) -> Option<Arc<DistroMgr>> {
let map = self.distros.lock().await;
map.get(distro).map(|mgr| Arc::clone(mgr))
}
pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<DistroMgr>> {
let mut map = self.distros.lock().await;
if let Some(mgr) = map.get(distro) {
Ok(Arc::clone(mgr))
} else {
let distro = db::query::distro::insert(&self.conn, distro, None).await?;
let mgr = Arc::new(
DistroMgr::new(
self.distro_dir.join(&distro.name),
distro.id,
self.conn.clone(),
)
.await?,
);
map.insert(distro.name, Arc::clone(&mgr));
Ok(mgr)
}
}
}

View File

@ -2,16 +2,15 @@ mod api;
mod cli; mod cli;
mod config; mod config;
pub mod db; pub mod db;
mod distro;
mod error; mod error;
mod repo; mod repo;
pub use config::{Config, DbConfig, FsConfig}; pub use config::{Config, DbConfig, FsConfig};
pub use error::{Result, ServerError}; pub use error::{Result, ServerError};
use repo::DistroMgr;
use std::sync::Arc;
use clap::Parser; use clap::Parser;
use std::{path::PathBuf, sync::Arc};
pub const ANY_ARCH: &'static str = "any"; pub const ANY_ARCH: &'static str = "any";

View File

@ -1,133 +1,107 @@
use super::{archive, package}; use super::{archive, package};
use crate::{db, error::Result}; use crate::db::{self, query::package::delete_stale_pkgs};
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{ sync::{
atomic::{AtomicBool, AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
Arc, Arc,
}, },
}; };
use futures::StreamExt; use futures::StreamExt;
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter, QuerySelect, ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
Related, RelationTrait, Set, TransactionTrait, ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
}; };
use sea_query::{Expr, Query}; use sea_query::{Alias, Expr, Query};
use tokio::{ use tokio::sync::{
io::AsyncRead, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
sync::{Mutex, Notify, RwLock, Semaphore}, Mutex, RwLock,
}; };
use uuid::Uuid; use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any"; struct PkgQueueMsg {
pub const REPOS_DIR: &'static str = "repos"; repo: i32,
path: PathBuf,
#[derive(Default)]
pub struct RepoState {
queued_pkgs: AtomicU32,
sync_queued: AtomicBool,
sync_notify: Notify,
} }
pub struct DistroMgr { /// A single instance of this struct orchestrates everything related to managing packages files on
distro_dir: PathBuf, /// disk for all repositories in the server
distro_id: i32, pub struct RepoMgr {
repos_dir: PathBuf,
conn: DbConn, conn: DbConn,
repos: RwLock<HashMap<i32, Arc<RepoState>>>, pkg_queue: (
sync_lock: Mutex<()>, UnboundedSender<PkgQueueMsg>,
pkg_sema: Semaphore, Mutex<UnboundedReceiver<PkgQueueMsg>>,
),
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
} }
impl DistroMgr { impl RepoMgr {
pub async fn new<P: AsRef<Path>>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result<Self> { pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
if !tokio::fs::try_exists(&distro_dir).await? {
tokio::fs::create_dir(&distro_dir).await?;
}
let repos_dir = distro_dir.as_ref().join(REPOS_DIR);
if !tokio::fs::try_exists(&repos_dir).await? { if !tokio::fs::try_exists(&repos_dir).await? {
tokio::fs::create_dir(repos_dir).await?; tokio::fs::create_dir(&repos_dir).await?;
}
let (tx, rx) = unbounded_channel();
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn)
.await?;
for id in repo_ids {
repos.insert(id, Default::default());
} }
Ok(Self { Ok(Self {
distro_dir: distro_dir.as_ref().to_path_buf(), repos_dir: repos_dir.as_ref().to_path_buf(),
distro_id,
conn, conn,
sync_lock: Mutex::new(()), pkg_queue: (tx, Mutex::new(rx)),
pkg_sema: Semaphore::new(1), repos: RwLock::new(repos),
repos: RwLock::new(HashMap::new()),
}) })
} }
pub async fn schedule_sync(&self, repo_id: i32) -> Result<()> {
let state = {
let repos = self.repos.read().await;
repos.get(&repo_id).map(Arc::clone)
};
if state.is_none() {
tracing::debug!("is none");
return Ok(());
}
let state = state.unwrap();
let res =
state
.sync_queued
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
// Already a sync job scheduled, so this one can simply quit
if res.is_err() {
tracing::debug!("shit");
return Ok(());
}
// If the queue is not empty, we wait for a notification that it is before syncing
if state.queued_pkgs.load(Ordering::SeqCst) > 0 {
tracing::debug!("sync waiter waiting");
state.sync_notify.notified().await;
tracing::debug!("sync waiter notified");
}
self.sync_repo(repo_id).await
}
/// Generate archive databases for all known architectures in the repository, including the /// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture. /// "any" architecture.
pub async fn sync_repo(&self, repo_id: i32) -> Result<()> { pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let _guard = self.sync_lock.lock().await; let lock = self
.repos
.read()
.await
.get(&repo)
.map(|(_, lock)| Arc::clone(lock));
let repo = crate::db::query::repo::by_id(&self.conn, repo_id).await?; if lock.is_none() {
if repo.is_none() {
return Ok(()); return Ok(());
} }
let repo = repo.unwrap(); let lock = lock.unwrap();
let _guard = lock.lock().await;
let mut archs = repo let archs: Vec<String> = db::Package::find()
.find_related(db::Package) .filter(db::package::Column::RepoId.eq(repo))
.select_only() .select_only()
.column(db::package::Column::Arch) .column(db::package::Column::Arch)
.distinct() .distinct()
.into_tuple::<String>() .into_tuple()
.stream(&self.conn) .all(&self.conn)
.await?; .await?;
while let Some(arch) = archs.next().await.transpose()? { for arch in archs {
self.generate_archives(&repo, &arch).await?; self.generate_archives(repo, &arch).await?;
} }
Ok(()) Ok(())
} }
/// Generate the archive databases for the given repository and architecture. /// Generate the archive databases for the given repository and architecture.
async fn generate_archives(&self, repo: &db::repo::Model, arch: &str) -> Result<()> { async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
self.random_file_paths(); self.random_file_paths();
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
@ -135,18 +109,7 @@ impl DistroMgr {
// Query all packages in the repo that have the given architecture or the "any" // Query all packages in the repo that have the given architecture or the "any"
// architecture // architecture
let mut pkgs = repo let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
.find_related(crate::db::Package)
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
.filter(
db::package::Column::Id.in_subquery(
Query::select()
.expr(db::package::Column::Id.max())
.from(db::package::Entity)
.group_by_columns([db::package::Column::Arch, db::package::Column::Name])
.to_owned(),
),
)
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
@ -178,7 +141,7 @@ impl DistroMgr {
ar_db.close().await?; ar_db.close().await?;
ar_files.close().await?; ar_files.close().await?;
let repo_dir = self.distro_dir.join(&repo.name); let repo_dir = self.repos_dir.join(repo.to_string());
// Move the db archives to their respective places // Move the db archives to their respective places
tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
@ -205,225 +168,218 @@ impl DistroMgr {
let _ = tokio::fs::remove_file(desc_tmp_file_path).await; let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
let _ = tokio::fs::remove_file(files_tmp_file_path).await; let _ = tokio::fs::remove_file(files_tmp_file_path).await;
tracing::info!( tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
"Package archives generated for '{}' ('{}')",
&repo.name,
arch
);
Ok(()) Ok(())
} }
async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> { /// Clean any remaining old package files from the database and file system
let mut repos = self.repos.write().await; pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
Ok(repo)
} else {
tokio::fs::create_dir(self.distro_dir.join(repo)).await?;
let repo = db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?;
repos.insert(repo.id, Arc::new(RepoState::default()));
Ok(repo)
}
}
/// Remove the repo with the given name, if it existed
pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
let res = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo_entry) = res {
// Remove repository from database
repo_entry.delete(&self.conn).await?;
// Remove files from file system
tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?;
Ok(true)
} else {
Ok(false)
}
}
/// Remove all packages from the repository with the given arch.
pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
let repo = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo) = repo {
let mut pkgs = repo
.find_related(db::Package)
.filter(db::package::Column::Arch.eq(arch))
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = pkgs.next().await.transpose()? { while let Some(pkg) = pkgs.next().await.transpose()? {
let path = self // Failing to remove the package file isn't the biggest problem
.distro_dir let _ = tokio::fs::remove_file(
.join(&repo.name) self.repos_dir
.join(super::package::filename(&pkg)); .join(pkg.repo_id.to_string())
tokio::fs::remove_file(path).await?; .join(pkg.id.to_string()),
)
.await;
pkg.delete(&self.conn).await?; if pkg.id > max_id {
max_id = pkg.id;
} }
tokio::fs::remove_file( removed_pkgs += 1;
self.distro_dir }
.join(&repo.name)
.join(format!("{}.db.tar.gz", arch)), if removed_pkgs > 0 {
) db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
.await?; }
tokio::fs::remove_file(
self.distro_dir tracing::info!("Removed {removed_pkgs} stale package(s)");
.join(&repo.name)
.join(format!("{}.files.tar.gz", arch)), Ok(())
}
pub async fn pkg_parse_task(&self) {
loop {
// Receive the next message and immediately drop the mutex afterwards. As long as the
// quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
// as soon as a message is received, so another worker can pick up the mutex.
let msg = {
let mut recv = self.pkg_queue.1.lock().await;
recv.recv().await
};
if let Some(msg) = msg {
// TODO better handle this error (retry if failure wasn't because the package is
// faulty)
let _ = self
.add_pkg_from_path(msg.path, msg.repo)
.await
.inspect_err(|e| tracing::error!("{:?}", e));
let old = self
.repos
.read()
.await
.get(&msg.repo)
.map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
// Every time the queue for a repo becomes empty, we run a sync job
if old == Some(1) {
// TODO error handling
let _ = self.sync_repo(msg.repo).await;
// TODO move this so that we only clean if entire queue is empty, not just
// queue for specific repo
let _ = self.remove_stale_pkgs().await;
}
}
}
}
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
self.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst);
});
}
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
Ok(db::Repo::find()
.find_also_related(db::Distro)
.filter(
Condition::all()
.add(db::repo::Column::Name.eq(repo))
.add(db::distro::Column::Name.eq(distro)),
) )
.one(&self.conn)
.await
.map(|res| res.map(|(repo, _)| repo.id))?)
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let mut repos = self.repos.write().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.conn)
.await?; .await?;
// If we removed all "any" packages, we need to resync all databases let distro_id = if let Some(id) = distro_id {
if arch == ANY_ARCH { id
//self.sync_repo(&repo.name).await?;
}
Ok(true)
} else { } else {
Ok(false) let new_distro = db::distro::ActiveModel {
} id: NotSet,
} name: Set(distro.to_string()),
description: NotSet,
};
pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> { new_distro.insert(&self.conn).await?.id
let repo = db::query::repo::by_name(&self.conn, repo).await?; };
if let Some(repo) = repo { let repo_id: Option<i32> = db::Repo::find()
let pkg = .filter(db::repo::Column::DistroId.eq(distro_id))
db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; .filter(db::repo::Column::Name.eq(repo))
.select_only()
if let Some(pkg) = pkg { .column(db::repo::Column::Id)
// Remove package from database & file system .into_tuple()
tokio::fs::remove_file( .one(&self.conn)
self.distro_dir
.join(&repo.name)
.join(super::package::filename(&pkg)),
)
.await?; .await?;
pkg.delete(&self.conn).await?;
//if arch == ANY_ARCH { let repo_id = if let Some(id) = repo_id {
// self.sync_repo(&repo.name).await?; id
//} else {
// self.generate_archives(&repo.name, arch).await?;
//}
Ok(true)
} else { } else {
Ok(false) let new_repo = db::repo::ActiveModel {
} id: NotSet,
} else { distro_id: Set(distro_id),
Ok(false) name: Set(repo.to_string()),
} description: NotSet,
};
let id = new_repo.insert(&self.conn).await?.id;
tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
repos.insert(id, Default::default());
id
};
Ok(repo_id)
} }
async fn _add_pkg_from_path<P: AsRef<Path>>( async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
&self,
path: P,
repo: &db::repo::Model,
) -> crate::Result<db::package::Model> {
let path_clone = path.as_ref().to_path_buf(); let path_clone = path.as_ref().to_path_buf();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await .await
.unwrap()?; .unwrap()?;
// TODO prevent database from being updated but file failing to move to repo dir? // TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?; let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
let queue_path = self.distro_dir.join(&repo.name).join(pkg.id.to_string()); let dest_path = self
tokio::fs::rename(path.as_ref(), queue_path).await?; .repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
tokio::fs::rename(path.as_ref(), dest_path).await?;
tracing::info!( tracing::info!(
"Added '{}-{}' to repository '{}' ({})", "Added '{}-{}-{}' to repository {}",
pkg.name, pkg.name,
pkg.version, pkg.version,
repo.name, pkg.arch,
pkg.arch repo,
); );
Ok(pkg) Ok(())
} }
pub async fn add_pkg_from_path<P: AsRef<Path>>( pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
&self, self.repos.write().await.remove(&repo);
path: P, db::Repo::delete_by_id(repo).exec(&self.conn).await?;
repo: &str, let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
) -> crate::Result<(i32, String, String, String)> {
let repo = self.get_or_create_repo(repo).await?;
{ Ok(())
let repos = self.repos.read().await;
if let Some(state) = repos.get(&repo.id) {
state.queued_pkgs.fetch_add(1, Ordering::SeqCst);
}
} }
let _guard = self.pkg_sema.acquire().await.unwrap(); /// Remove all packages in the repository that have a given arch. This method marks all
let res = self._add_pkg_from_path(path, &repo).await; /// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::PendingDeletion),
)
.filter(
Condition::all()
.add(db::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)),
)
.exec(&self.conn)
.await?;
match res { self.sync_repo(repo).await?;
Ok(pkg) => { self.remove_stale_pkgs().await?;
let repos = self.repos.read().await;
if let Some(state) = repos.get(&repo.id) { Ok(())
let old = state.queued_pkgs.fetch_sub(1, Ordering::SeqCst);
if old - 1 == 0 {
state.sync_notify.notify_one();
}
} }
Ok((repo.id, pkg.name, pkg.version, pkg.arch))
}
Err(e) => Err(e),
}
// If the package already exists in the database, we remove it first
//let res = db::query::package::by_fields(
// &self.conn,
// repo.id,
// &pkg.info.arch,
// &pkg.info.name,
// None,
// None,
//)
//.await?;
//
//if let Some(entry) = res {
// entry.delete(&self.conn).await?;
//}
//let dest_pkg_path = repo_dir.join(pkg.file_name());
//
//// Insert new package into database
//let name = pkg.info.name.clone();
//let version = pkg.info.version.clone();
//let arch = pkg.info.arch.clone();
//db::query::package::insert(&self.conn, repo.id, pkg).await?;
//
//// Move the package to its final resting place
//tokio::fs::rename(tmp_file_path, dest_pkg_path).await?;
//
// Synchronize archive databases
//if arch == ANY_ARCH {
// self.generate_archives_all(&repo.name).await?;
//} else {
// self.generate_archives(&repo.name, &arch).await?;
//}
}
/// Generate a path to a unique file that can be used as a temporary file
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] { pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| { std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.distro_dir.join(uuid.to_string()) self.repos_dir.join(uuid.to_string())
}) })
} }
} }

View File

@ -1,385 +0,0 @@
use super::{archive, package};
use crate::db::{self, query::package::delete_stale_pkgs};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use futures::StreamExt;
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use sea_query::{Alias, Expr, Query};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex, RwLock,
};
use uuid::Uuid;
struct PkgQueueMsg {
repo: i32,
path: PathBuf,
}
/// A single instance of this struct orchestrates everything related to managing packages files on
/// disk for all repositories in the server
pub struct RepoMgr {
repos_dir: PathBuf,
conn: DbConn,
pkg_queue: (
UnboundedSender<PkgQueueMsg>,
Mutex<UnboundedReceiver<PkgQueueMsg>>,
),
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoMgr {
pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
if !tokio::fs::try_exists(&repos_dir).await? {
tokio::fs::create_dir(&repos_dir).await?;
}
let (tx, rx) = unbounded_channel();
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn)
.await?;
for id in repo_ids {
repos.insert(id, Default::default());
}
Ok(Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
pkg_queue: (tx, Mutex::new(rx)),
repos: RwLock::new(repos),
})
}
/// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture.
pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let lock = self
.repos
.read()
.await
.get(&repo)
.map(|(_, lock)| Arc::clone(lock));
if lock.is_none() {
return Ok(());
}
let lock = lock.unwrap();
let _guard = lock.lock().await;
let archs: Vec<String> = db::Package::find()
.filter(db::package::Column::RepoId.eq(repo))
.select_only()
.column(db::package::Column::Arch)
.distinct()
.into_tuple()
.all(&self.conn)
.await?;
for arch in archs {
self.generate_archives(repo, &arch).await?;
}
Ok(())
}
/// Generate the archive databases for the given repository and architecture.
async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
self.random_file_paths();
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
// Query all packages in the repo that have the given architecture or the "any"
// architecture
let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
.stream(&self.conn)
.await?;
let mut commited_ids: Vec<i32> = Vec::new();
while let Some(pkg) = pkgs.next().await.transpose()? {
commited_ids.push(pkg.id);
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
let full_name = format!("{}-{}", pkg.name, pkg.version);
ar_db
.add_entry(&full_name, &desc_tmp_file_path, true)
.await?;
ar_files
.add_entry(&full_name, &desc_tmp_file_path, true)
.await?;
ar_files
.add_entry(&full_name, &files_tmp_file_path, false)
.await?;
}
// Cleanup
ar_db.close().await?;
ar_files.close().await?;
let repo_dir = self.repos_dir.join(repo.to_string());
// Move the db archives to their respective places
tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
tokio::fs::rename(
tmp_ar_files_path,
repo_dir.join(format!("{}.files.tar.gz", arch)),
)
.await?;
// Only after we have successfully written everything to disk do we update the database.
// This order ensures any failure can be recovered, as the database is our single source of
// truth.
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::Committed),
)
.filter(db::package::Column::Id.is_in(commited_ids))
.exec(&self.conn)
.await?;
// If this fails there's no point in failing the function + if there were no packages in
// the repo, this fails anyway because the temp file doesn't exist
let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
let _ = tokio::fs::remove_file(files_tmp_file_path).await;
tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
Ok(())
}
/// Clean any remaining old package files from the database and file system
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
.stream(&self.conn)
.await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = pkgs.next().await.transpose()? {
// Failing to remove the package file isn't the biggest problem
let _ = tokio::fs::remove_file(
self.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
)
.await;
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
}
if removed_pkgs > 0 {
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
}
tracing::info!("Removed {removed_pkgs} stale package(s)");
Ok(())
}
pub async fn pkg_parse_task(&self) {
loop {
// Receive the next message and immediately drop the mutex afterwards. As long as the
// quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
// as soon as a message is received, so another worker can pick up the mutex.
let msg = {
let mut recv = self.pkg_queue.1.lock().await;
recv.recv().await
};
if let Some(msg) = msg {
// TODO better handle this error (retry if failure wasn't because the package is
// faulty)
let _ = self
.add_pkg_from_path(msg.path, msg.repo)
.await
.inspect_err(|e| tracing::error!("{:?}", e));
let old = self
.repos
.read()
.await
.get(&msg.repo)
.map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
// Every time the queue for a repo becomes empty, we run a sync job
if old == Some(1) {
// TODO error handling
let _ = self.sync_repo(msg.repo).await;
// TODO move this so that we only clean if entire queue is empty, not just
// queue for specific repo
let _ = self.remove_stale_pkgs().await;
}
}
}
}
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
self.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst);
});
}
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
Ok(db::Repo::find()
.find_also_related(db::Distro)
.filter(
Condition::all()
.add(db::repo::Column::Name.eq(repo))
.add(db::distro::Column::Name.eq(distro)),
)
.one(&self.conn)
.await
.map(|res| res.map(|(repo, _)| repo.id))?)
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let mut repos = self.repos.write().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.conn)
.await?;
let distro_id = if let Some(id) = distro_id {
id
} else {
let new_distro = db::distro::ActiveModel {
id: NotSet,
name: Set(distro.to_string()),
description: NotSet,
};
new_distro.insert(&self.conn).await?.id
};
let repo_id: Option<i32> = db::Repo::find()
.filter(db::repo::Column::DistroId.eq(distro_id))
.filter(db::repo::Column::Name.eq(repo))
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.one(&self.conn)
.await?;
let repo_id = if let Some(id) = repo_id {
id
} else {
let new_repo = db::repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(repo.to_string()),
description: NotSet,
};
let id = new_repo.insert(&self.conn).await?.id;
tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
repos.insert(id, Default::default());
id
};
Ok(repo_id)
}
async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
let path_clone = path.as_ref().to_path_buf();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await
.unwrap()?;
// TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
let dest_path = self
.repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
tokio::fs::rename(path.as_ref(), dest_path).await?;
tracing::info!(
"Added '{}-{}-{}' to repository {}",
pkg.name,
pkg.version,
pkg.arch,
repo,
);
Ok(())
}
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
self.repos.write().await.remove(&repo);
db::Repo::delete_by_id(repo).exec(&self.conn).await?;
let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
Ok(())
}
/// Remove all packages in the repository that have a given arch. This method marks all
/// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::PendingDeletion),
)
.filter(
Condition::all()
.add(db::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)),
)
.exec(&self.conn)
.await?;
self.sync_repo(repo).await?;
self.remove_stale_pkgs().await?;
Ok(())
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.repos_dir.join(uuid.to_string())
})
}
}

View File

@ -1,10 +1,8 @@
mod archive; mod archive;
mod manager; mod manager;
mod manager2;
pub mod package; pub mod package;
pub use manager::DistroMgr; pub use manager::RepoMgr;
pub use manager2::RepoMgr;
use crate::FsConfig; use crate::FsConfig;