diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs index 3182817..4bf2378 100644 --- a/server/src/repo/manager.rs +++ b/server/src/repo/manager.rs @@ -2,9 +2,12 @@ use super::{archive, package}; use crate::{db, error::Result}; use std::{ - path::{Path, PathBuf}, - sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}}, collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, }; use futures::StreamExt; @@ -15,7 +18,7 @@ use sea_orm::{ use sea_query::{Expr, Query}; use tokio::{ io::AsyncRead, - sync::{Mutex, Semaphore, RwLock, Notify}, + sync::{Mutex, Notify, RwLock, Semaphore}, }; use uuid::Uuid; @@ -66,15 +69,17 @@ impl DistroMgr { repos.get(&repo_id).map(Arc::clone) }; - if state.is_none() { - tracing::debug!("is none") + tracing::debug!("is none"); return Ok(()); } let state = state.unwrap(); - let res = state.sync_queued.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst); + let res = + state + .sync_queued + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst); // Already a sync job scheduled, so this one can simply quit if res.is_err() { @@ -376,7 +381,7 @@ impl DistroMgr { } Ok((repo.id, pkg.name, pkg.version, pkg.arch)) - }, + } Err(e) => Err(e), } diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs new file mode 100644 index 0000000..d1e7d23 --- /dev/null +++ b/server/src/repo/manager2.rs @@ -0,0 +1,148 @@ +use super::{archive, package}; +use crate::db; + +use std::path::{Path, PathBuf}; + +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet, + QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Mutex, +}; + +struct PkgQueueMsg { + repo: i32, + path: PathBuf, +} + +/// A single instance of this struct orchestrates everything related to managing packages files on +/// disk for all repositories in the server +pub struct RepoMgr { + repos_dir: PathBuf, + conn: DbConn, + pkg_queue: ( + UnboundedSender, + Mutex>, + ), + repos_lock: Mutex<()>, +} + +impl RepoMgr { + pub async fn new>(repos_dir: P, conn: DbConn) -> crate::Result { + if !tokio::fs::try_exists(&repos_dir).await? { + tokio::fs::create_dir(&repos_dir).await?; + } + + let (tx, rx) = unbounded_channel(); + + Ok(Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + pkg_queue: (tx, Mutex::new(rx)), + repos_lock: Mutex::new(()), + }) + } + + pub async fn pkg_parse_task(&self) { + loop { + // Receive the next message and immediately drop the mutex afterwards. As long as the + // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked + // as soon as a message is received, so another worker can pick up the mutex. + let mut recv = self.pkg_queue.1.lock().await; + let msg = recv.recv().await; + drop(recv); + + if let Some(msg) = msg { + // TODO better handle this error (retry if failure wasn't because the package is + // faulty) + let _ = self + .add_pkg_from_path(msg.path, msg.repo) + .await + .inspect_err(|e| tracing::error!("{:?}", e)); + } + } + } + + pub fn queue_pkg(&self, repo: i32, path: PathBuf) { + let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo }); + } + + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { + let _guard = self.repos_lock.lock().await; + + let distro_id: Option = db::Distro::find() + .filter(db::distro::Column::Name.eq(distro)) + .select_only() + .column(db::distro::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let distro_id = if let Some(id) = distro_id { + id + } else { + let new_distro = db::distro::ActiveModel { + id: NotSet, + name: Set(distro.to_string()), + description: NotSet, + }; + + new_distro.insert(&self.conn).await?.id + }; + + let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::Name.eq(repo)) + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let repo_id = if let Some(id) = repo_id { + id + } else { + let new_repo = db::repo::ActiveModel { + id: NotSet, + distro_id: Set(distro_id), + name: Set(repo.to_string()), + description: NotSet, + }; + + new_repo.insert(&self.conn).await?.id + }; + + Ok(repo_id) + } + + async fn add_pkg_from_path>( + &self, + path: P, + repo: i32, + ) -> crate::Result<()> { + let path_clone = path.as_ref().to_path_buf(); + let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) + .await + .unwrap()?; + + // TODO prevent database from being updated but file failing to move to repo dir? + let pkg = db::query::package::insert(&self.conn, repo, pkg).await?; + + let dest_path = self + .repos_dir + .join(repo.to_string()) + .join(pkg.id.to_string()); + tokio::fs::rename(path.as_ref(), dest_path).await?; + + tracing::info!( + "Added '{}-{}-{}' to repository {}", + pkg.name, + pkg.version, + pkg.arch, + repo, + ); + + Ok(()) + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index ebdc655..a0d4c15 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,5 +1,6 @@ mod archive; mod manager; +mod manager2; pub mod package; pub use manager::DistroMgr;