feat: reimplemented clean method in actor

agents
Jef Roosens 2024-06-26 21:25:23 +02:00
parent 042f1ecbd3
commit bde3b90711
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
2 changed files with 81 additions and 31 deletions

View File

@ -98,6 +98,7 @@ impl RepoActor {
== Some(0)
{
let _ = self.sync_repo(repo);
let _ = self.clean();
}
}
RepoCommand::SyncRepo(repo) => {
@ -179,16 +180,8 @@ impl RepoActor {
// sea_orm needs its connections to be dropped inside an async context, so we spawn a task
// that streams the responses to the synchronous context via message passing
self.rt.spawn(async move {
let stream = query.stream(&conn).await;
if let Err(err) = stream {
let _ = tx.send(Err(err)).await;
return;
}
let mut stream = stream.unwrap();
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
@ -197,6 +190,11 @@ impl RepoActor {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
let mut committed_ids: Vec<i32> = Vec::new();
@ -233,6 +231,60 @@ impl RepoActor {
}
fn clean(&self) -> crate::Result<()> {
todo!()
let (tx, mut rx) = mpsc::channel(1);
let conn = self.state.conn.clone();
let query = db::query::package::stale_pkgs(&self.state.conn);
// sea_orm needs its connections to be dropped inside an async context, so we spawn a task
// that streams the responses to the synchronous context via message passing
self.rt.spawn(async move {
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
if is_err {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = rx.blocking_recv().transpose()? {
// Failing to remove the package file isn't the biggest problem
let _ = std::fs::remove_file(
self.state
.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
);
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
}
if removed_pkgs > 0 {
self.rt.block_on(db::query::package::delete_stale_pkgs(
&self.state.conn,
max_id,
))?;
}
tracing::info!("Cleaned up {removed_pkgs} old package(s)");
Ok(())
}
}

View File

@ -94,18 +94,11 @@ impl RepoArchivesWriter {
let conn = self.conn.clone();
let query = pkg.find_related(db::PackageFile);
self.rt.spawn(async move {
let files = query.stream(&conn).await;
if let Err(err) = files {
let _ = tx.send(Err(err)).await;
return;
}
let mut files = files.unwrap();
while let Some(res) = files.next().await {
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
@ -113,6 +106,11 @@ impl RepoArchivesWriter {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
while let Some(file) = rx.blocking_recv().transpose()? {