From bde3b907114d7c526cd5b8fab381cfc543c31d1b Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 26 Jun 2024 21:25:23 +0200 Subject: [PATCH] feat: reimplemented clean method in actor --- server/src/repo/actor.rs | 84 ++++++++++++++++++++++++++++++-------- server/src/repo/archive.rs | 28 ++++++------- 2 files changed, 81 insertions(+), 31 deletions(-) diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index d76efa3..04289d1 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -98,6 +98,7 @@ impl RepoActor { == Some(0) { let _ = self.sync_repo(repo); + let _ = self.clean(); } } RepoCommand::SyncRepo(repo) => { @@ -179,22 +180,19 @@ impl RepoActor { // sea_orm needs its connections to be dropped inside an async context, so we spawn a task // that streams the responses to the synchronous context via message passing self.rt.spawn(async move { - let stream = query.stream(&conn).await; + match query.stream(&conn).await { + Ok(mut stream) => { + while let Some(res) = stream.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; - if let Err(err) = stream { - let _ = tx.send(Err(err)).await; - - return; - } - - let mut stream = stream.unwrap(); - - while let Some(res) = stream.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; - - if is_err { - return; + if is_err { + return; + } + } + } + Err(err) => { + let _ = tx.send(Err(err)).await; } } }); @@ -233,6 +231,60 @@ impl RepoActor { } fn clean(&self) -> crate::Result<()> { - todo!() + let (tx, mut rx) = mpsc::channel(1); + let conn = self.state.conn.clone(); + let query = db::query::package::stale_pkgs(&self.state.conn); + + // sea_orm needs its connections to be dropped inside an async context, so we spawn a task + // that streams the responses to the synchronous context via message passing + self.rt.spawn(async move { + match query.stream(&conn).await { + Ok(mut stream) => { + while let Some(res) = stream.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; + + if is_err { + return; + } + } + } + Err(err) => { + let _ = tx.send(Err(err)).await; + } + } + }); + + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later + let mut max_id = -1; + let mut removed_pkgs = 0; + + while let Some(pkg) = rx.blocking_recv().transpose()? { + // Failing to remove the package file isn't the biggest problem + let _ = std::fs::remove_file( + self.state + .repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ); + + if pkg.id > max_id { + max_id = pkg.id; + } + + removed_pkgs += 1; + } + + if removed_pkgs > 0 { + self.rt.block_on(db::query::package::delete_stale_pkgs( + &self.state.conn, + max_id, + ))?; + } + + tracing::info!("Cleaned up {removed_pkgs} old package(s)"); + + Ok(()) } } diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index 973a395..39b3b82 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -94,23 +94,21 @@ impl RepoArchivesWriter { let conn = self.conn.clone(); let query = pkg.find_related(db::PackageFile); + self.rt.spawn(async move { - let files = query.stream(&conn).await; + match query.stream(&conn).await { + Ok(mut stream) => { + while let Some(res) = stream.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; - if let Err(err) = files { - let _ = tx.send(Err(err)).await; - - return; - } - - let mut files = files.unwrap(); - - while let Some(res) = files.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; - - if is_err { - return; + if is_err { + return; + } + } + } + Err(err) => { + let _ = tx.send(Err(err)).await; } } });