wip: possible second reimagining of manager

concurrent-repos
Jef Roosens 2024-06-12 12:32:49 +02:00
parent 5839d66213
commit 5073855696
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
3 changed files with 161 additions and 7 deletions

View File

@ -2,9 +2,12 @@ use super::{archive, package};
use crate::{db, error::Result};
use std::{
path::{Path, PathBuf},
sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}},
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
};
use futures::StreamExt;
@ -15,7 +18,7 @@ use sea_orm::{
use sea_query::{Expr, Query};
use tokio::{
io::AsyncRead,
sync::{Mutex, Semaphore, RwLock, Notify},
sync::{Mutex, Notify, RwLock, Semaphore},
};
use uuid::Uuid;
@ -66,15 +69,17 @@ impl DistroMgr {
repos.get(&repo_id).map(Arc::clone)
};
if state.is_none() {
tracing::debug!("is none")
tracing::debug!("is none");
return Ok(());
}
let state = state.unwrap();
let res = state.sync_queued.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
let res =
state
.sync_queued
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
// Already a sync job scheduled, so this one can simply quit
if res.is_err() {
@ -376,7 +381,7 @@ impl DistroMgr {
}
Ok((repo.id, pkg.name, pkg.version, pkg.arch))
},
}
Err(e) => Err(e),
}

View File

@ -0,0 +1,148 @@
use super::{archive, package};
use crate::db;
use std::path::{Path, PathBuf};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet,
QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
};
struct PkgQueueMsg {
repo: i32,
path: PathBuf,
}
/// A single instance of this struct orchestrates everything related to managing packages files on
/// disk for all repositories in the server
pub struct RepoMgr {
repos_dir: PathBuf,
conn: DbConn,
pkg_queue: (
UnboundedSender<PkgQueueMsg>,
Mutex<UnboundedReceiver<PkgQueueMsg>>,
),
repos_lock: Mutex<()>,
}
impl RepoMgr {
pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
if !tokio::fs::try_exists(&repos_dir).await? {
tokio::fs::create_dir(&repos_dir).await?;
}
let (tx, rx) = unbounded_channel();
Ok(Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
pkg_queue: (tx, Mutex::new(rx)),
repos_lock: Mutex::new(()),
})
}
pub async fn pkg_parse_task(&self) {
loop {
// Receive the next message and immediately drop the mutex afterwards. As long as the
// quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
// as soon as a message is received, so another worker can pick up the mutex.
let mut recv = self.pkg_queue.1.lock().await;
let msg = recv.recv().await;
drop(recv);
if let Some(msg) = msg {
// TODO better handle this error (retry if failure wasn't because the package is
// faulty)
let _ = self
.add_pkg_from_path(msg.path, msg.repo)
.await
.inspect_err(|e| tracing::error!("{:?}", e));
}
}
}
pub fn queue_pkg(&self, repo: i32, path: PathBuf) {
let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let _guard = self.repos_lock.lock().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.conn)
.await?;
let distro_id = if let Some(id) = distro_id {
id
} else {
let new_distro = db::distro::ActiveModel {
id: NotSet,
name: Set(distro.to_string()),
description: NotSet,
};
new_distro.insert(&self.conn).await?.id
};
let repo_id: Option<i32> = db::Repo::find()
.filter(db::repo::Column::Name.eq(repo))
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.one(&self.conn)
.await?;
let repo_id = if let Some(id) = repo_id {
id
} else {
let new_repo = db::repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(repo.to_string()),
description: NotSet,
};
new_repo.insert(&self.conn).await?.id
};
Ok(repo_id)
}
async fn add_pkg_from_path<P: AsRef<Path>>(
&self,
path: P,
repo: i32,
) -> crate::Result<()> {
let path_clone = path.as_ref().to_path_buf();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await
.unwrap()?;
// TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
let dest_path = self
.repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
tokio::fs::rename(path.as_ref(), dest_path).await?;
tracing::info!(
"Added '{}-{}-{}' to repository {}",
pkg.name,
pkg.version,
pkg.arch,
repo,
);
Ok(())
}
}

View File

@ -1,5 +1,6 @@
mod archive;
mod manager;
mod manager2;
pub mod package;
pub use manager::DistroMgr;