wip: concurrent repo sync

concurrent-repos
Jef Roosens 2024-06-11 12:22:44 +02:00
parent 97612e1af6
commit 5839d66213
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
2 changed files with 104 additions and 23 deletions

View File

@ -3,7 +3,8 @@ use crate::{db, error::Result};
use std::{
path::{Path, PathBuf},
sync::Arc,
sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}},
collections::HashMap,
};
use futures::StreamExt;
@ -14,20 +15,27 @@ use sea_orm::{
use sea_query::{Expr, Query};
use tokio::{
io::AsyncRead,
sync::{Mutex, Semaphore},
sync::{Mutex, Semaphore, RwLock, Notify},
};
use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any";
pub const REPOS_DIR: &'static str = "repos";
/// Per-repository bookkeeping used to coordinate package additions with repository syncs.
///
/// All fields start out at their `Default` value (zero / `false` / no pending notification).
#[derive(Default)]
pub struct RepoState {
/// Number of packages that have been accepted for this repository but have not finished being
/// added yet. `add_pkg_from_path` increments it before waiting on the package semaphore and
/// decrements it once a package has been added successfully.
queued_pkgs: AtomicU32,
/// Flag claimed by `schedule_sync` through a `false -> true` compare-exchange; a caller that
/// finds it already set returns without syncing.
sync_queued: AtomicBool,
/// Signalled by `add_pkg_from_path` when `queued_pkgs` drops to zero; `schedule_sync` waits on
/// it when it observes a non-empty queue.
sync_notify: Notify,
}
pub struct DistroMgr {
distro_dir: PathBuf,
distro_id: i32,
conn: DbConn,
repo_lock: Arc<Mutex<()>>,
sync_lock: Arc<Mutex<()>>,
pkg_sema: Arc<Semaphore>,
repos: RwLock<HashMap<i32, Arc<RepoState>>>,
sync_lock: Mutex<()>,
pkg_sema: Semaphore,
}
impl DistroMgr {
@ -46,18 +54,50 @@ impl DistroMgr {
distro_dir: distro_dir.as_ref().to_path_buf(),
distro_id,
conn,
repo_lock: Arc::new(Mutex::new(())),
sync_lock: Arc::new(Mutex::new(())),
pkg_sema: Arc::new(Semaphore::new(1)),
sync_lock: Mutex::new(()),
pkg_sema: Semaphore::new(1),
repos: RwLock::new(HashMap::new()),
})
}
/// Schedule a sync of the repository with the given id, coalescing concurrent requests.
///
/// At most one caller per repository waits to sync at any time: the first caller claims the
/// repository's `sync_queued` flag, every other caller returns `Ok(())` immediately. The
/// claiming caller waits until no packages are queued for the repository any more and then
/// runs [`Self::sync_repo`].
///
/// Returns `Ok(())` without doing anything if no state is tracked for `repo_id`.
///
/// # Errors
///
/// Propagates any error returned by [`Self::sync_repo`].
pub async fn schedule_sync(&self, repo_id: i32) -> Result<()> {
    // Clone the `Arc` out of the map so the read lock is released before any further await.
    let state = {
        let repos = self.repos.read().await;
        repos.get(&repo_id).map(Arc::clone)
    };

    let state = match state {
        Some(state) => state,
        None => {
            tracing::debug!("is none");
            return Ok(());
        }
    };

    // Already a sync job scheduled, so this one can simply quit
    if state
        .sync_queued
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        tracing::debug!("shit");
        return Ok(());
    }

    // If the queue is not empty, wait for the notification that it has drained before syncing
    if state.queued_pkgs.load(Ordering::SeqCst) > 0 {
        tracing::debug!("sync waiter waiting");
        state.sync_notify.notified().await;
        tracing::debug!("sync waiter notified");
    }

    // Release the slot *before* syncing. Without this the flag stays set forever and every
    // later call for this repository would return early, so the repository would only ever be
    // synced once. Clearing it before (rather than after) the sync means a package that lands
    // while the sync is running can still schedule a follow-up sync, which then queues behind
    // this one inside `sync_repo`.
    state.sync_queued.store(false, Ordering::SeqCst);

    self.sync_repo(repo_id).await
}
/// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture.
pub async fn sync_repo(&self, repo: &str) -> Result<()> {
pub async fn sync_repo(&self, repo_id: i32) -> Result<()> {
let _guard = self.sync_lock.lock().await;
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
let repo = crate::db::query::repo::by_id(&self.conn, repo_id).await?;
if repo.is_none() {
return Ok(());
@ -170,14 +210,17 @@ impl DistroMgr {
}
async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> {
let _guard = self.repo_lock.lock().await;
let mut repos = self.repos.write().await;
if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
Ok(repo)
} else {
tokio::fs::create_dir(self.distro_dir.join(repo)).await?;
let repo = db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?;
Ok(db::query::repo::insert(&self.conn, self.distro_id, repo, None).await?)
repos.insert(repo.id, Arc::new(RepoState::default()));
Ok(repo)
}
}
@ -234,7 +277,7 @@ impl DistroMgr {
// If we removed all "any" packages, we need to resync all databases
if arch == ANY_ARCH {
self.sync_repo(&repo.name).await?;
//self.sync_repo(&repo.name).await?;
}
Ok(true)
@ -275,20 +318,16 @@ impl DistroMgr {
}
}
pub async fn add_pkg_from_path<P: AsRef<Path>>(
async fn _add_pkg_from_path<P: AsRef<Path>>(
&self,
path: P,
repo: &str,
) -> crate::Result<(String, String, String)> {
let _guard = self.pkg_sema.acquire().await.unwrap();
repo: &db::repo::Model,
) -> crate::Result<db::package::Model> {
let path_clone = path.as_ref().to_path_buf();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await
.unwrap()?;
let repo = self.get_or_create_repo(repo).await?;
// TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?;
@ -303,6 +342,44 @@ impl DistroMgr {
pkg.arch
);
Ok(pkg)
}
pub async fn add_pkg_from_path<P: AsRef<Path>>(
&self,
path: P,
repo: &str,
) -> crate::Result<(i32, String, String, String)> {
let repo = self.get_or_create_repo(repo).await?;
{
let repos = self.repos.read().await;
if let Some(state) = repos.get(&repo.id) {
state.queued_pkgs.fetch_add(1, Ordering::SeqCst);
}
}
let _guard = self.pkg_sema.acquire().await.unwrap();
let res = self._add_pkg_from_path(path, &repo).await;
match res {
Ok(pkg) => {
let repos = self.repos.read().await;
if let Some(state) = repos.get(&repo.id) {
let old = state.queued_pkgs.fetch_sub(1, Ordering::SeqCst);
if old - 1 == 0 {
state.sync_notify.notify_one();
}
}
Ok((repo.id, pkg.name, pkg.version, pkg.arch))
},
Err(e) => Err(e),
}
// If the package already exists in the database, we remove it first
//let res = db::query::package::by_fields(
// &self.conn,
@ -335,8 +412,6 @@ impl DistroMgr {
//} else {
// self.generate_archives(&repo.name, &arch).await?;
//}
Ok((pkg.name, pkg.version, pkg.arch))
}
/// Generate a path to a unique file that can be used as a temporary file

View File

@ -80,7 +80,13 @@ async fn post_package_archive(
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
tokio::io::copy(&mut body, &mut tmp_file).await?;
tokio::spawn(async move { mgr.add_pkg_from_path(tmp_path, &repo).await });
tokio::spawn(async move {
if let Ok((repo, _, _, _)) = mgr.add_pkg_from_path(tmp_path, &repo).await {
tracing::debug!("starting schedule_sync");
let _ = mgr.schedule_sync(repo).await;
tracing::debug!("finished schedule_sync");
};
});
//let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
//