diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index a24922f..c1a2c73 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -4,7 +4,10 @@ use crate::db; use std::{ collections::HashMap, path::{Path, PathBuf}, - sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, }; use sea_orm::{ @@ -13,7 +16,10 @@ use sea_orm::{ }; use tokio::{ runtime, - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + RwLock, + }, }; pub enum RepoCommand { @@ -21,11 +27,11 @@ pub enum RepoCommand { } pub struct RepoSharedState { - repos_dir: PathBuf, - conn: DbConn, - rx: Mutex>, - tx: UnboundedSender, - repos: RwLock>)>>, + pub repos_dir: PathBuf, + pub conn: DbConn, + pub rx: Mutex>, + pub tx: UnboundedSender, + pub repos: RwLock>)>>, } impl RepoSharedState { @@ -70,11 +76,23 @@ impl RepoActor { match msg { RepoCommand::ParsePkg(repo, path) => { let _ = self.parse_pkg(repo, path); + + if self + .state + .repos + .blocking_read() + .get(&repo) + .map(|n| n.0.load(Ordering::SeqCst)) + == Some(0) + { + // TODO sync + } } } } } + /// Parse a queued package for the given repository. fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> { let pkg = package::Package::open(&path)?; let pkg = self @@ -95,6 +113,36 @@ impl RepoActor { repo, ); + self.state.repos.blocking_read().get(&repo).inspect(|n| { + n.0.fetch_sub(1, Ordering::SeqCst); + }); + + Ok(()) + } + + fn sync_repo(&self, repo: i32) -> crate::Result<()> { + let repos = self.state.repos.blocking_read(); + + if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) { + let archs: Vec = self.rt.block_on( + db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) + .select_only() + .column(db::package::Column::Arch) + .distinct() + .into_tuple() + .all(&self.state.conn), + )?; + + for arch in archs { + self.generate_archives(repo, &arch)?; + } + } + + Ok(()) + } + + fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { Ok(()) } } diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index ff12f42..262f274 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,10 +1,13 @@ -use super::RepoSharedState; +use super::{RepoCommand, RepoSharedState}; use crate::db; use std::{ collections::HashMap, path::{Path, PathBuf}, - sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, RwLock, + }, }; use sea_orm::{ @@ -15,6 +18,7 @@ use tokio::{ runtime, sync::mpsc::{unbounded_channel, UnboundedSender}, }; +use uuid::Uuid; #[derive(Clone)] pub struct Handle { @@ -53,4 +57,73 @@ impl Handle { Ok(Self { state }) } + + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.state.repos_dir.join(uuid.to_string()) + }) + } + + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { + let mut repos = self.state.repos.write().await; + + let distro_id: Option = db::Distro::find() + .filter(db::distro::Column::Name.eq(distro)) + .select_only() + .column(db::distro::Column::Id) + .into_tuple() + .one(&self.state.conn) + .await?; + + let distro_id = if let Some(id) = distro_id { + id + } else { + let new_distro = db::distro::ActiveModel { + id: NotSet, + name: Set(distro.to_string()), + description: NotSet, + }; + + new_distro.insert(&self.state.conn).await?.id + }; + + let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::DistroId.eq(distro_id)) + .filter(db::repo::Column::Name.eq(repo)) + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .one(&self.state.conn) + .await?; + + let repo_id = if let Some(id) = repo_id { + id + } else { + let new_repo = db::repo::ActiveModel { + id: NotSet, + distro_id: Set(distro_id), + name: Set(repo.to_string()), + description: NotSet, + }; + let id = new_repo.insert(&self.state.conn).await?.id; + + tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?; + repos.insert(id, Default::default()); + + id + }; + + Ok(repo_id) + } + + pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { + self.state + .tx + .send(RepoCommand::ParsePkg(repo, path)) + .unwrap(); + self.state.repos.read().await.get(&repo).inspect(|n| { + n.0.fetch_add(1, Ordering::SeqCst); + }); + } } diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 3bd2a1c..6fe6650 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -85,17 +85,16 @@ async fn post_package_archive( Path((distro, repo)): Path<(String, String)>, body: Body, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - //let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; - //let [tmp_path] = global.mgr.random_file_paths(); - // - //let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; - //tokio::io::copy(&mut body, &mut tmp_file).await?; - // - //global.mgr.queue_pkg(repo, tmp_path).await; - // - //Ok(StatusCode::ACCEPTED) + let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?; + + let [tmp_path] = global.repo.random_file_paths(); + let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; + let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); + tokio::io::copy(&mut body, &mut tmp_file).await?; + + global.repo.queue_pkg(repo_id, tmp_path).await; + + Ok(StatusCode::ACCEPTED) } async fn delete_repo(