diff --git a/server/src/cli.rs b/server/src/cli.rs
index 550d7dc..68658ae 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -90,7 +90,8 @@ impl Cli {
             data_dir: self.data_dir.clone(),
         };

-        let mgr = Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
+        let mgr =
+            Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);

         for _ in 0..1 {
             let clone = Arc::clone(&mgr);
diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs
index abbfb9c..5e400ea 100644
--- a/server/src/db/query/package.rs
+++ b/server/src/db/query/package.rs
@@ -1,6 +1,8 @@
 use crate::db::{self, *};
+use futures::Stream;
 use sea_orm::{sea_query::IntoCondition, *};
+use sea_query::{Alias, Expr, Query};
 use serde::Deserialize;

 #[derive(Deserialize)]
 pub struct Filter {
@@ -213,3 +215,71 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
         Ok(None)
     }
 }
+
+#[derive(FromQueryResult)]
+pub struct PkgToRemove {
+    repo_id: i32,
+    id: i32,
+}
+
+pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
+    let mut max_id_query = Query::select();
+    max_id_query
+        .from(db::package::Entity)
+        .columns([
+            db::package::Column::RepoId,
+            db::package::Column::Arch,
+            db::package::Column::Name,
+        ])
+        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
+        .group_by_columns([
+            db::package::Column::RepoId,
+            db::package::Column::Arch,
+            db::package::Column::Name,
+        ])
+        .cond_where(
+            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
+        );
+
+    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
+    let mut query = Query::select();
+    query
+        .from_as(db::package::Entity, p1.clone())
+        .columns([
+            (p1.clone(), db::package::Column::RepoId),
+            (p1.clone(), db::package::Column::Id),
+        ])
+        .join_subquery(
+            JoinType::InnerJoin,
+            max_id_query,
+            p2.clone(),
+            Condition::all()
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::RepoId))
+                        .eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Arch))
+                        .eq(Expr::col((p2.clone(), db::package::Column::Arch))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Name))
+                        .eq(Expr::col((p2.clone(), db::package::Column::Name))),
+                ),
+        )
+        .cond_where(
+            Condition::any()
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Id))
+                        .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::State))
+                        .eq(db::PackageState::PendingDeletion),
+                ),
+        );
+    let builder = conn.get_database_backend();
+    let sql = builder.build(&query);

+    PkgToRemove::find_by_statement(sql)
+}
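Read back as SQL, the selector above should produce roughly the following (a sketch only; exact identifier quoting and the enum value placeholders depend on the database backend): every package row that has been superseded by a newer committed build of the same (repo, arch, name) triple, plus every row already marked for deletion.

    SELECT "p1"."repo_id", "p1"."id"
    FROM "package" AS "p1"
    INNER JOIN (
        SELECT "repo_id", "arch", "name", MAX("id") AS "max_id"
        FROM "package"
        WHERE "state" = <committed>
        GROUP BY "repo_id", "arch", "name"
    ) AS "p2"
        ON "p1"."repo_id" = "p2"."repo_id"
        AND "p1"."arch" = "p2"."arch"
        AND "p1"."name" = "p2"."name"
    WHERE "p1"."id" < "p2"."max_id"
        OR "p1"."state" = <pending_deletion>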
diff --git a/server/src/error.rs b/server/src/error.rs
index 26bfc60..e0626d4 100644
--- a/server/src/error.rs
+++ b/server/src/error.rs
@@ -45,9 +45,10 @@ impl IntoResponse for ServerError {
             ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
                 StatusCode::NOT_FOUND.into_response()
             }
-            ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) | ServerError::Unit => {
-                StatusCode::INTERNAL_SERVER_ERROR.into_response()
-            }
+            ServerError::Db(_)
+            | ServerError::Archive(_)
+            | ServerError::Figment(_)
+            | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
         }
     }
 }
diff --git a/server/src/repo/manager2.rs b/server/src/repo/manager2.rs
index 070b822..b0df209 100644
--- a/server/src/repo/manager2.rs
+++ b/server/src/repo/manager2.rs
@@ -1,14 +1,21 @@
 use super::{archive, package};
 use crate::db;
-use std::{path::{Path, PathBuf}, sync::{atomic::{Ordering, AtomicU32}, Arc}, collections::HashMap};
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+    sync::{
+        atomic::{AtomicU32, Ordering},
+        Arc,
+    },
+};

 use futures::StreamExt;
 use sea_orm::{
-    ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet,
-    QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
+    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
+    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 };
-use sea_query::{Expr, Query};
+use sea_query::{Alias, Expr, Query};
 use tokio::sync::{
     mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
     Mutex, RwLock,
 };
@@ -31,7 +38,7 @@ pub struct RepoMgr {
         UnboundedSender<PkgQueueMsg>,
         Mutex<UnboundedReceiver<PkgQueueMsg>>,
     ),
-    repos: RwLock<HashMap<i32, AtomicU32>>,
+    repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
 }

 impl RepoMgr {
@@ -43,25 +50,44 @@

         let (tx, rx) = unbounded_channel();
         let mut repos = HashMap::new();
-        let repo_ids: Vec<i32> = db::Repo::find().select_only().column(db::repo::Column::Id).into_tuple().all(&conn).await?;
+        let repo_ids: Vec<i32> = db::Repo::find()
+            .select_only()
+            .column(db::repo::Column::Id)
+            .into_tuple()
+            .all(&conn)
+            .await?;

         for id in repo_ids {
-            repos.insert(id, AtomicU32::new(0));
+            repos.insert(id, Default::default());
         }

         Ok(Self {
             repos_dir: repos_dir.as_ref().to_path_buf(),
             conn,
             pkg_queue: (tx, Mutex::new(rx)),
-            repos: RwLock::new(repos)
+            repos: RwLock::new(repos),
         })
     }

     /// Generate archive databases for all known architectures in the repository, including the
     /// "any" architecture.
-    pub async fn sync_repo(&self, repo_id: i32) -> crate::Result<()> {
+    pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
+        let lock = self
+            .repos
+            .read()
+            .await
+            .get(&repo)
+            .map(|(_, lock)| Arc::clone(lock));
+
+        if lock.is_none() {
+            return Ok(());
+        }
+
+        let lock = lock.unwrap();
+        let _guard = lock.lock().await;
+
         let mut archs = db::Package::find()
-            .filter(db::package::Column::RepoId.eq(repo_id))
+            .filter(db::package::Column::RepoId.eq(repo))
             .select_only()
             .column(db::package::Column::Arch)
             .distinct()
@@ -70,12 +96,27 @@
             .await?;

         while let Some(arch) = archs.next().await.transpose()? {
-            self.generate_archives(repo_id, &arch).await?;
+            self.generate_archives(repo, &arch).await?;
         }

         Ok(())
     }

+    /// Clean any remaining old package files from the database and file system
+    pub async fn clean(&self) -> crate::Result<()> {
+        let mut pkgs = db::query::package::to_be_removed_query(&self.conn)
+            .stream(&self.conn)
+            .await?;
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            // TODO remove package from file system and database
+        }
+
+        // TODO log indicating how many packages were cleaned
+
+        Ok(())
+    }
+
     /// Generate the archive databases for the given repository and architecture.
     async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
         let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
@@ -155,11 +196,7 @@ impl RepoMgr {
         let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
         let _ = tokio::fs::remove_file(files_tmp_file_path).await;

-        tracing::info!(
-            "Package archives generated for repo {} ('{}')",
-            repo,
-            arch
-        );
+        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);

         Ok(())
     }
@@ -182,12 +219,21 @@
                 .await
                 .inspect_err(|e| tracing::error!("{:?}", e));

-            let old = self.repos.read().await.get(&msg.repo).map(|n| n.fetch_sub(1, Ordering::SeqCst));
+            let old = self
+                .repos
+                .read()
+                .await
+                .get(&msg.repo)
+                .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));

             // Every time the queue for a repo becomes empty, we run a sync job
             if old == Some(1) {
                 // TODO error handling
                 let _ = self.sync_repo(msg.repo).await;
+
+                // TODO move this so that we only clean if entire queue is empty, not just
+                // queue for specific repo
+                let _ = self.clean().await;
             }
         }
     }
@@ -195,7 +241,9 @@
     pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
         let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
-        self.repos.read().await.get(&repo).inspect(|n| { n.fetch_add(1, Ordering::SeqCst); });
+        self.repos.read().await.get(&repo).inspect(|n| {
+            n.0.fetch_add(1, Ordering::SeqCst);
+        });
     }

     pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
         let mut repos = self.repos.write().await;
@@ -241,7 +289,7 @@
             let id = new_repo.insert(&self.conn).await?.id;

             tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
-            repos.insert(id, AtomicU32::new(0));
+            repos.insert(id, Default::default());

             id
         };
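A possible follow-up for the TODO in `clean` is sketched below. This is a hypothetical helper, not part of the patch: it assumes package files are stored as `<repos_dir>/<repo_id>/<id>` (the real on-disk layout isn't shown here), and `PkgToRemove`'s fields would need to be exposed outside the query module for it to compile.

    // Hypothetical removal step for one package picked up by to_be_removed_query.
    async fn remove_pkg(
        conn: &sea_orm::DbConn,
        repos_dir: &std::path::Path,
        repo_id: i32,
        id: i32,
    ) -> crate::Result<()> {
        use sea_orm::EntityTrait;

        // Delete the file first: if this fails halfway, only a stale database row
        // remains, and the to_be_removed query picks it up again on the next run.
        let path = repos_dir.join(repo_id.to_string()).join(id.to_string());
        tokio::fs::remove_file(path).await?;

        // Then drop the corresponding database row.
        crate::db::Package::delete_by_id(id).exec(conn).await?;

        Ok(())
    }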