From 5839d66213e1e5695a6014cb722171791f20b7d5 Mon Sep 17 00:00:00 2001
From: Chewing_Bever
Date: Tue, 11 Jun 2024 12:22:44 +0200
Subject: [PATCH] wip: concurrent repo sync

---
 server/src/repo/manager.rs | 119 ++++++++++++++++++++++++++++++-------
 server/src/repo/mod.rs     |   8 ++-
 2 files changed, 104 insertions(+), 23 deletions(-)

diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs
index d3c753c..3182817 100644
--- a/server/src/repo/manager.rs
+++ b/server/src/repo/manager.rs
@@ -3,7 +3,8 @@ use crate::{db, error::Result};
 
 use std::{
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}},
+    collections::HashMap,
 };
 
 use futures::StreamExt;
@@ -14,20 +15,27 @@ use sea_orm::{
 use sea_query::{Expr, Query};
 use tokio::{
     io::AsyncRead,
-    sync::{Mutex, Semaphore},
+    sync::{Mutex, Semaphore, RwLock, Notify},
 };
 use uuid::Uuid;
 
 pub const ANY_ARCH: &'static str = "any";
 pub const REPOS_DIR: &'static str = "repos";
 
+#[derive(Default)]
+pub struct RepoState {
+    queued_pkgs: AtomicU32,
+    sync_queued: AtomicBool,
+    sync_notify: Notify,
+}
+
 pub struct DistroMgr {
     distro_dir: PathBuf,
     distro_id: i32,
     conn: DbConn,
-    repo_lock: Arc<Mutex<()>>,
-    sync_lock: Arc<Mutex<()>>,
-    pkg_sema: Arc<Semaphore>,
+    repos: RwLock<HashMap<i32, Arc<RepoState>>>,
+    sync_lock: Mutex<()>,
+    pkg_sema: Semaphore,
 }
 
 impl DistroMgr {
@@ -46,18 +54,50 @@ impl DistroMgr {
             distro_dir: distro_dir.as_ref().to_path_buf(),
             distro_id,
             conn,
-            repo_lock: Arc::new(Mutex::new(())),
-            sync_lock: Arc::new(Mutex::new(())),
-            pkg_sema: Arc::new(Semaphore::new(1)),
+            sync_lock: Mutex::new(()),
+            pkg_sema: Semaphore::new(1),
+            repos: RwLock::new(HashMap::new()),
         })
     }
 
+    pub async fn schedule_sync(&self, repo_id: i32) -> Result<()> {
+        let state = {
+            let repos = self.repos.read().await;
+            repos.get(&repo_id).map(Arc::clone)
+        };
+
+
+        if state.is_none() {
+            tracing::debug!("no state found for repo {}", repo_id);
+            return Ok(());
+        }
+
+        let state = state.unwrap();
+
+        let res = state.sync_queued.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
+
+        // Already a sync job scheduled, so this one can simply quit
+        if res.is_err() {
+            tracing::debug!("sync already queued for repo {}", repo_id);
+            return Ok(());
+        }
+
+        // If the queue is not empty, we wait for a notification that it is before syncing
+        if state.queued_pkgs.load(Ordering::SeqCst) > 0 {
+            tracing::debug!("sync waiter waiting");
+            state.sync_notify.notified().await;
+            tracing::debug!("sync waiter notified");
+        }
+
+        self.sync_repo(repo_id).await
+    }
+
     /// Generate archive databases for all known architectures in the repository, including the
     /// "any" architecture.
-    pub async fn sync_repo(&self, repo: &str) -> Result<()> {
+    pub async fn sync_repo(&self, repo_id: i32) -> Result<()> {
         let _guard = self.sync_lock.lock().await;
 
-        let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
+        let repo = crate::db::query::repo::by_id(&self.conn, repo_id).await?;
         if repo.is_none() {
             return Ok(());
         }
@@ -170,14 +210,17 @@ impl DistroMgr {
     }
 
     async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> {
-        let _guard = self.repo_lock.lock().await;
+        let mut repos = self.repos.write().await;
 
         if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
             Ok(repo)
         } else {
             tokio::fs::create_dir(self.distro_dir.join(repo)).await?;
+            let repo = db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?;
 
-            Ok(db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?)
+            repos.insert(repo.id, Arc::new(RepoState::default()));
+
+            Ok(repo)
         }
     }
 
@@ -234,7 +277,7 @@ impl DistroMgr {
 
         // If we removed all "any" packages, we need to resync all databases
         if arch == ANY_ARCH {
-            self.sync_repo(&repo.name).await?;
+            //self.sync_repo(&repo.name).await?;
         }
 
         Ok(true)
@@ -275,20 +318,16 @@ impl DistroMgr {
         }
     }
 
-    pub async fn add_pkg_from_path<P: AsRef<Path>>(
+    async fn _add_pkg_from_path<P: AsRef<Path>>(
         &self,
         path: P,
-        repo: &str,
-    ) -> crate::Result<(String, String, String)> {
-        let _guard = self.pkg_sema.acquire().await.unwrap();
-
+        repo: &db::repo::Model,
+    ) -> crate::Result<db::package::Model> {
         let path_clone = path.as_ref().to_path_buf();
         let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
             .await
             .unwrap()?;
 
-        let repo = self.get_or_create_repo(repo).await?;
-
         // TODO prevent database from being updated but file failing to move to repo dir?
         let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?;
 
@@ -303,6 +342,44 @@ impl DistroMgr {
             pkg.arch
         );
 
+        Ok(pkg)
+    }
+
+    pub async fn add_pkg_from_path<P: AsRef<Path>>(
+        &self,
+        path: P,
+        repo: &str,
+    ) -> crate::Result<(i32, String, String, String)> {
+        let repo = self.get_or_create_repo(repo).await?;
+
+        {
+            let repos = self.repos.read().await;
+
+            if let Some(state) = repos.get(&repo.id) {
+                state.queued_pkgs.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        let _guard = self.pkg_sema.acquire().await.unwrap();
+        let res = self._add_pkg_from_path(path, &repo).await;
+
+        match res {
+            Ok(pkg) => {
+                let repos = self.repos.read().await;
+
+                if let Some(state) = repos.get(&repo.id) {
+                    let old = state.queued_pkgs.fetch_sub(1, Ordering::SeqCst);
+
+                    if old - 1 == 0 {
+                        state.sync_notify.notify_one();
+                    }
+                }
+
+                Ok((repo.id, pkg.name, pkg.version, pkg.arch))
+            },
+            Err(e) => Err(e),
+        }
+
         // If the package already exists in the database, we remove it first
         //let res = db::query::package::by_fields(
         //    &self.conn,
@@ -335,8 +412,6 @@ impl DistroMgr {
         //} else {
        //    self.generate_archives(&repo.name, &arch).await?;
        //}
-
-        Ok((pkg.name, pkg.version, pkg.arch))
     }
 
     /// Generate a path to a unique file that can be used as a temporary file
diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs
index cdc6c09..ebdc655 100644
--- a/server/src/repo/mod.rs
+++ b/server/src/repo/mod.rs
@@ -80,7 +80,13 @@ async fn post_package_archive(
     let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
     tokio::io::copy(&mut body, &mut tmp_file).await?;
 
-    tokio::spawn(async move { mgr.add_pkg_from_path(tmp_path, &repo).await });
+    tokio::spawn(async move {
+        if let Ok((repo, _, _, _)) = mgr.add_pkg_from_path(tmp_path, &repo).await {
+            tracing::debug!("starting schedule_sync");
+            let _ = mgr.schedule_sync(repo).await;
+            tracing::debug!("finished schedule_sync");
+        };
+    });
 
     //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
     //
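
For reference, the scheduling handshake introduced by this patch rests on three pieces of per-repository state: a counter of packages still being processed (queued_pkgs), a flag marking that a sync job is already queued (sync_queued), and a Notify used to wake that job once the queue drains. The stand-alone sketch below illustrates the same handshake outside the patch; RepoState mirrors the struct added in manager.rs, while add_pkg and schedule_sync are simplified, hypothetical stand-ins for DistroMgr::add_pkg_from_path and DistroMgr::schedule_sync.

use std::sync::{
    atomic::{AtomicBool, AtomicU32, Ordering},
    Arc,
};
use std::time::Duration;

use tokio::sync::Notify;

#[derive(Default)]
struct RepoState {
    queued_pkgs: AtomicU32,
    sync_queued: AtomicBool,
    sync_notify: Notify,
}

// Stand-in for add_pkg_from_path: bump the counter, do the (simulated) work,
// then decrement and wake a waiting sync job once the queue reaches zero.
async fn add_pkg(state: Arc<RepoState>) {
    state.queued_pkgs.fetch_add(1, Ordering::SeqCst);

    tokio::time::sleep(Duration::from_millis(10)).await; // "parse + insert package"

    // fetch_sub returns the previous value, so 1 means the queue is now empty.
    if state.queued_pkgs.fetch_sub(1, Ordering::SeqCst) == 1 {
        state.sync_notify.notify_one();
    }
}

// Stand-in for schedule_sync: at most one sync may be queued per repo, and it
// waits until all in-flight packages have been processed before running.
async fn schedule_sync(state: Arc<RepoState>) {
    if state
        .sync_queued
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return; // a sync is already queued for this repo
    }

    if state.queued_pkgs.load(Ordering::SeqCst) > 0 {
        state.sync_notify.notified().await;
    }

    // NOTE: the wip patch never resets `sync_queued`; a finished implementation
    // would clear it after syncing so later uploads can queue a new sync.
    println!("syncing repo");
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RepoState::default());

    // Mimic post_package_archive: every upload processes its package and then
    // tries to schedule a sync; only one sync actually ends up queued.
    let handlers: Vec<_> = (0..4)
        .map(|_| {
            let state = Arc::clone(&state);
            tokio::spawn(async move {
                add_pkg(Arc::clone(&state)).await;
                schedule_sync(state).await;
            })
        })
        .collect();

    for handler in handlers {
        handler.await.unwrap();
    }
}

The compare_exchange on sync_queued collapses any number of upload-triggered schedule_sync calls into a single pending sync, while the counter-plus-Notify pair delays that sync until all semaphore-guarded package inserts have drained.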