rieter/server/src/repo/actor.rs

239 lines
6.7 KiB
Rust

use super::{archive, package, Handle};
use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
};
use futures::StreamExt;
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use sea_query::{Alias, Expr, Query};
use tokio::{
runtime,
sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
};
use uuid::Uuid;
pub enum RepoCommand {
ParsePkg(i32, PathBuf),
SyncRepo(i32),
Clean,
}
pub struct RepoSharedState {
pub repos_dir: PathBuf,
pub conn: DbConn,
pub rx: Mutex<UnboundedReceiver<RepoCommand>>,
pub tx: UnboundedSender<RepoCommand>,
pub repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoSharedState {
pub fn new(
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel();
Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
rx: Mutex::new(rx),
tx,
repos: RwLock::new(repos),
}
}
}
/// The actor is responsible for mutating the repositories. They receive their commands their
/// messages and process these commands in both a synchronous and asynchronous way.
pub struct RepoActor {
rt: runtime::Handle,
state: Arc<RepoSharedState>,
}
impl RepoActor {
pub fn new(rt: runtime::Handle, state: Arc<RepoSharedState>) -> Self {
Self {
rt,
state: Arc::clone(&state),
}
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.state.repos_dir.join(uuid.to_string())
})
}
/// Run the main actor loop
pub fn run(self) {
while let Some(msg) = {
let mut rx = self.state.rx.lock().unwrap();
rx.blocking_recv()
} {
match msg {
RepoCommand::ParsePkg(repo, path) => {
let _ = self.parse_pkg(repo, path);
if self
.state
.repos
.blocking_read()
.get(&repo)
.map(|n| n.0.load(Ordering::SeqCst))
== Some(0)
{
let _ = self.sync_repo(repo);
}
}
RepoCommand::SyncRepo(repo) => {
let _ = self.sync_repo(repo);
}
RepoCommand::Clean => {
let _ = self.clean();
}
}
}
}
/// Parse a queued package for the given repository.
fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> {
let pkg = package::Package::open(&path)?;
let pkg = self
.rt
.block_on(db::query::package::insert(&self.state.conn, repo, pkg))?;
let dest_path = self
.state
.repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
std::fs::rename(path, dest_path)?;
tracing::info!(
"Added '{}-{}-{}' to repository {}",
pkg.name,
pkg.version,
pkg.arch,
repo,
);
self.state.repos.blocking_read().get(&repo).inspect(|n| {
n.0.fetch_sub(1, Ordering::SeqCst);
});
Ok(())
}
fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let repos = self.state.repos.blocking_read();
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
let archs: Vec<String> = self.rt.block_on(
db::Package::find()
.filter(db::package::Column::RepoId.eq(repo))
.select_only()
.column(db::package::Column::Arch)
.distinct()
.into_tuple()
.all(&self.state.conn),
)?;
for arch in archs {
self.generate_archives(repo, &arch)?;
}
}
Ok(())
}
fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths();
let mut ars = archive::RepoArchivesWriter::new(
&tmp_ar_db_path,
&tmp_ar_files_path,
self.random_file_paths(),
&self.rt,
&self.state.conn,
)?;
let (tx, mut rx) = mpsc::channel(1);
let conn = self.state.conn.clone();
let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch);
// sea_orm needs its connections to be dropped inside an async context, so we spawn a task
// that streams the responses to the synchronous context via message passing
self.rt.spawn(async move {
let stream = query.stream(&conn).await;
if let Err(err) = stream {
let _ = tx.send(Err(err)).await;
return;
}
let mut stream = stream.unwrap();
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
if is_err {
return;
}
}
});
let mut committed_ids: Vec<i32> = Vec::new();
while let Some(pkg) = rx.blocking_recv().transpose()? {
committed_ids.push(pkg.id);
ars.append_pkg(&pkg)?;
}
ars.close()?;
// Move newly generated package archives to their correct place
let repo_dir = self.state.repos_dir.join(repo.to_string());
std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?;
std::fs::rename(
tmp_ar_files_path,
repo_dir.join(format!("{}.files.tar.gz", arch)),
)?;
// Update the state for the newly committed packages
self.rt.block_on(
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::Committed),
)
.filter(db::package::Column::Id.is_in(committed_ids))
.exec(&self.state.conn),
)?;
tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
Ok(())
}
fn clean(&self) -> crate::Result<()> {
todo!()
}
}