feat: write stale packages query

concurrent-repos
Jef Roosens 2024-06-14 10:54:45 +02:00
parent be2ce7bf45
commit a408c14ab1
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 143 additions and 23 deletions

View File

@@ -90,7 +90,8 @@ impl Cli {
             data_dir: self.data_dir.clone(),
         };
 
-        let mgr = Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
+        let mgr =
+            Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
 
         for _ in 0..1 {
             let clone = Arc::clone(&mgr);

View File

@@ -1,6 +1,8 @@
 use crate::db::{self, *};
+use futures::Stream;
 use sea_orm::{sea_query::IntoCondition, *};
+use sea_query::{Alias, Expr, Query};
 use serde::Deserialize;
 
 #[derive(Deserialize)]
@@ -213,3 +215,71 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
         Ok(None)
     }
 }
+
+#[derive(FromQueryResult)]
+pub struct PkgToRemove {
+    repo_id: i32,
+    id: i32,
+}
+
+pub fn to_be_removed_query(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
+    // Per (repo, arch, name) group, the id of the newest committed package.
+    let mut max_id_query = Query::select();
+    max_id_query
+        .from(db::package::Entity)
+        .columns([
+            db::package::Column::RepoId,
+            db::package::Column::Arch,
+            db::package::Column::Name,
+        ])
+        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
+        .group_by_columns([
+            db::package::Column::RepoId,
+            db::package::Column::Arch,
+            db::package::Column::Name,
+        ])
+        .cond_where(
+            Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
+        );
+
+    // Select every package that is either older than that newest committed id or
+    // explicitly marked for deletion.
+    let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
+    let mut query = Query::select();
+    query
+        .from_as(db::package::Entity, p1.clone())
+        .columns([
+            (p1.clone(), db::package::Column::RepoId),
+            (p1.clone(), db::package::Column::Id),
+        ])
+        .join_subquery(
+            JoinType::InnerJoin,
+            max_id_query,
+            p2.clone(),
+            Condition::all()
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::RepoId))
+                        .eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Arch))
+                        .eq(Expr::col((p2.clone(), db::package::Column::Arch))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Name))
+                        .eq(Expr::col((p2.clone(), db::package::Column::Name))),
+                ),
+        )
+        .cond_where(
+            Condition::any()
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::Id))
+                        .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
+                )
+                .add(
+                    Expr::col((p1.clone(), db::package::Column::State))
+                        .eq(db::PackageState::PendingDeletion),
+                ),
+        );
+
+    let builder = conn.get_database_backend();
+    let sql = builder.build(&query);
+
+    PkgToRemove::find_by_statement(sql)
+}

View File

@@ -45,9 +45,10 @@ impl IntoResponse for ServerError {
             ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
                 StatusCode::NOT_FOUND.into_response()
             }
-            ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) | ServerError::Unit => {
-                StatusCode::INTERNAL_SERVER_ERROR.into_response()
-            }
+            ServerError::Db(_)
+            | ServerError::Archive(_)
+            | ServerError::Figment(_)
+            | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
         }
     }
 }

View File

@@ -1,14 +1,21 @@
 use super::{archive, package};
 use crate::db;
-use std::{path::{Path, PathBuf}, sync::{atomic::{Ordering, AtomicU32}, Arc}, collections::HashMap};
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+    sync::{
+        atomic::{AtomicU32, Ordering},
+        Arc,
+    },
+};
 use futures::StreamExt;
 use sea_orm::{
-    ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet,
-    QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
+    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
+    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 };
-use sea_query::{Expr, Query};
+use sea_query::{Alias, Expr, Query};
 use tokio::sync::{
     mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
     Mutex, RwLock,
@@ -31,7 +38,7 @@ pub struct RepoMgr {
         UnboundedSender<PkgQueueMsg>,
         Mutex<UnboundedReceiver<PkgQueueMsg>>,
     ),
-    repos: RwLock<HashMap<i32, AtomicU32>>,
+    repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
 }
 
 impl RepoMgr {
@@ -43,25 +50,44 @@ impl RepoMgr {
         let (tx, rx) = unbounded_channel();
 
         let mut repos = HashMap::new();
-        let repo_ids: Vec<i32> = db::Repo::find().select_only().column(db::repo::Column::Id).into_tuple().all(&conn).await?;
+        let repo_ids: Vec<i32> = db::Repo::find()
+            .select_only()
+            .column(db::repo::Column::Id)
+            .into_tuple()
+            .all(&conn)
+            .await?;
 
         for id in repo_ids {
-            repos.insert(id, AtomicU32::new(0));
+            repos.insert(id, Default::default());
         }
 
         Ok(Self {
             repos_dir: repos_dir.as_ref().to_path_buf(),
             conn,
             pkg_queue: (tx, Mutex::new(rx)),
-            repos: RwLock::new(repos)
+            repos: RwLock::new(repos),
         })
     }
 
     /// Generate archive databases for all known architectures in the repository, including the
     /// "any" architecture.
-    pub async fn sync_repo(&self, repo_id: i32) -> crate::Result<()> {
+    pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
+        let lock = self
+            .repos
+            .read()
+            .await
+            .get(&repo)
+            .map(|(_, lock)| Arc::clone(lock));
+
+        if lock.is_none() {
+            return Ok(());
+        }
+
+        let lock = lock.unwrap();
+        let _guard = lock.lock().await;
+
         let mut archs = db::Package::find()
-            .filter(db::package::Column::RepoId.eq(repo_id))
+            .filter(db::package::Column::RepoId.eq(repo))
             .select_only()
             .column(db::package::Column::Arch)
             .distinct()
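The lock handling added to sync_repo above is deliberate about guard lifetimes: the per-repo Arc<Mutex<()>> is cloned out of the repos map so the RwLock read guard is released before the potentially long-running sync starts, and the cloned mutex then ensures at most one sync per repository runs at a time. A condensed, standalone sketch of that pattern (illustrative names, not the commit's code):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};

async fn with_repo_lock(repos: &RwLock<HashMap<i32, Arc<Mutex<()>>>>, repo: i32) {
    // Hold the map's read guard only long enough to clone the Arc.
    let lock = repos.read().await.get(&repo).map(Arc::clone);

    if let Some(lock) = lock {
        // Serialize the actual per-repo work on the cloned mutex.
        let _guard = lock.lock().await;
        // ... generate archives, clean up stale files, etc. ...
    }
}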
@@ -70,12 +96,27 @@ impl RepoMgr {
             .await?;
 
         while let Some(arch) = archs.next().await.transpose()? {
-            self.generate_archives(repo_id, &arch).await?;
+            self.generate_archives(repo, &arch).await?;
         }
 
         Ok(())
     }
 
+    /// Clean any remaining old package files from the database and file system
+    pub async fn clean(&self) -> crate::Result<()> {
+        let mut pkgs = db::query::package::to_be_removed_query(&self.conn)
+            .stream(&self.conn)
+            .await?;
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            // TODO remove package from file system and database
+        }
+
+        // TODO log indicating how many packages were cleaned
+        Ok(())
+    }
+
     /// Generate the archive databases for the given repository and architecture.
     async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
         let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
@@ -155,11 +196,7 @@ impl RepoMgr {
         let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
         let _ = tokio::fs::remove_file(files_tmp_file_path).await;
 
-        tracing::info!(
-            "Package archives generated for repo {} ('{}')",
-            repo,
-            arch
-        );
+        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
 
         Ok(())
     }
@@ -182,12 +219,21 @@ impl RepoMgr {
             .await
             .inspect_err(|e| tracing::error!("{:?}", e));
 
-        let old = self.repos.read().await.get(&msg.repo).map(|n| n.fetch_sub(1, Ordering::SeqCst));
+        let old = self
+            .repos
+            .read()
+            .await
+            .get(&msg.repo)
+            .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
 
         // Every time the queue for a repo becomes empty, we run a sync job
         if old == Some(1) {
             // TODO error handling
             let _ = self.sync_repo(msg.repo).await;
+
+            // TODO move this so that we only clean if entire queue is empty, not just
+            // queue for specific repo
+            let _ = self.clean().await;
         }
     }
 }
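The sync (and, for now, clean) trigger above relies on the per-repo AtomicU32 in the repos map: queue_pkg increments it for every queued package, the worker decrements it after handling one, and a fetch_sub that returns 1 means that repo's queue has just drained. A standalone sketch of that counter handshake (illustrative names, not the commit's code):

use std::sync::atomic::{AtomicU32, Ordering};

/// Producer side: record a queued job before sending it on the channel.
fn enqueue(pending: &AtomicU32) {
    pending.fetch_add(1, Ordering::SeqCst);
}

/// Consumer side: returns true when the job just finished was the last one
/// pending, i.e. the moment the queue for this repo became empty.
fn finish_one(pending: &AtomicU32) -> bool {
    pending.fetch_sub(1, Ordering::SeqCst) == 1
}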
@@ -195,7 +241,9 @@ impl RepoMgr {
     pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
         let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
-        self.repos.read().await.get(&repo).inspect(|n| { n.fetch_add(1, Ordering::SeqCst); });
+        self.repos.read().await.get(&repo).inspect(|n| {
+            n.0.fetch_add(1, Ordering::SeqCst);
+        });
     }
 
     pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
@@ -241,7 +289,7 @@ impl RepoMgr {
             let id = new_repo.insert(&self.conn).await?.id;
 
             tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
-            repos.insert(id, AtomicU32::new(0));
+            repos.insert(id, Default::default());
 
             id
         };