wip: mpsc-based pkg queue

concurrent-repos
Jef Roosens 2024-06-13 18:40:24 +02:00
parent 6dff65f30d
commit be2ce7bf45
Signed by: Jef Roosens
GPG Key ID: 02D4C0997E74717B
7 changed files with 231 additions and 75 deletions


@@ -1,3 +1,11 @@
 api_key = "test"
 port = 8000
 log_level = "tower_http=debug,rieterd=debug"
+
+[fs]
+type = "local"
+data_dir = "./data"
+
+[db]
+type = "sqlite"
+db_dir = "./data"


@@ -1,6 +1,6 @@
 use crate::{distro::MetaDistroMgr, Config, Global};
-use std::{io, path::PathBuf};
+use std::{io, path::PathBuf, sync::Arc};

 use axum::Router;
 use clap::Parser;
@@ -61,9 +61,11 @@ impl Cli {
     pub async fn run(&self) -> crate::Result<()> {
         self.init_tracing();

-        tracing::debug!("{:?}", &self.config_file);
-        let new_config = crate::config::Config::figment(&self.config_file).extract()?;
-        tracing::debug!("{:?}", new_config);
+        //tracing::debug!("{:?}", &self.config_file);
+        //let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err(
+        //    |e| tracing::error!("{}", e)
+        //)?;
+        //tracing::debug!("{:?}", new_config);

         let db_url = if let Some(url) = &self.database_url {
             url.clone()
@@ -88,7 +90,13 @@ impl Cli {
             data_dir: self.data_dir.clone(),
         };

-        let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?;
+        let mgr = Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
+
+        for _ in 0..1 {
+            let clone = Arc::clone(&mgr);
+            tokio::spawn(async move { clone.pkg_parse_task().await });
+        }

         let global = Global { config, mgr, db };
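Note: the single-iteration `for` loop reads oddly on its own, but it is the seed of a worker pool: bump the upper bound and more `pkg_parse_task` workers share the same queue. A standalone sketch of the pattern, with a hypothetical `Worker` type standing in for `RepoMgr` (assuming tokio with the `rt-multi-thread` and `macros` features):

use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, Mutex};

// Hypothetical stand-in for RepoMgr: a shared queue consumed by N workers.
struct Worker {
    queue: Mutex<tokio::sync::mpsc::UnboundedReceiver<String>>,
}

impl Worker {
    async fn run(&self) {
        loop {
            // Hold the lock only long enough to receive one message, so
            // another worker can pick up the next one.
            let msg = {
                let mut recv = self.queue.lock().await;
                recv.recv().await
            };
            match msg {
                Some(msg) => println!("processing {msg}"),
                None => break, // channel closed and drained
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel();
    let worker = Arc::new(Worker { queue: Mutex::new(rx) });

    // Same shape as the cli.rs loop, but with four workers.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let w = Arc::clone(&worker);
            tokio::spawn(async move { w.run().await })
        })
        .collect();

    for i in 0..10 {
        let _ = tx.send(format!("pkg-{i}"));
    }
    drop(tx); // close the channel so the workers exit

    for h in handles {
        let _ = h.await;
    }
}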


@@ -6,19 +6,19 @@ use figment::{
 };

 use serde::Deserialize;

-#[derive(Deserialize)]
+#[derive(Deserialize, Debug)]
 #[serde(rename_all = "lowercase")]
 #[serde(tag = "type")]
 pub enum FsConfig {
     Local { data_dir: PathBuf },
 }

-#[derive(Deserialize)]
+#[derive(Deserialize, Debug)]
 #[serde(rename_all = "lowercase")]
 #[serde(tag = "type")]
 pub enum DbConfig {
     Sqlite {
-        path: PathBuf,
+        db_dir: PathBuf,
     },
     Postgres {
         host: String,
@@ -27,7 +27,7 @@ pub enum DbConfig {
     },
 }

-#[derive(Deserialize)]
+#[derive(Deserialize, Debug)]
 pub struct Config {
     api_key: String,
     port: u16,
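Note: the `#[serde(tag = "type")]` attribute is what lets the `[fs]` and `[db]` tables in the config pick a variant via their `type` key, with `rename_all = "lowercase"` mapping `Local` to `"local"`. A minimal sketch of that deserialization in isolation, using the `toml` crate directly (the project itself goes through figment, per `Config::figment` in cli.rs):

use serde::Deserialize;
use std::path::PathBuf;

#[derive(Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "type")]
enum FsConfig {
    Local { data_dir: PathBuf },
}

fn main() {
    // `type = "local"` selects the Local variant; the remaining keys
    // fill in that variant's fields.
    let cfg: FsConfig = toml::from_str(
        r#"
        type = "local"
        data_dir = "./data"
        "#,
    )
    .unwrap();
    println!("{cfg:?}"); // Local { data_dir: "./data" }
}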


@@ -15,6 +15,7 @@ pub enum ServerError {
     Status(StatusCode),
     Archive(libarchive::error::ArchiveError),
     Figment(figment::Error),
+    Unit,
 }

 impl fmt::Display for ServerError {
@@ -26,6 +27,7 @@ impl fmt::Display for ServerError {
             ServerError::Db(err) => write!(fmt, "{}", err),
             ServerError::Archive(err) => write!(fmt, "{}", err),
             ServerError::Figment(err) => write!(fmt, "{}", err),
+            ServerError::Unit => Ok(()),
         }
     }
 }
@@ -43,7 +45,7 @@ impl IntoResponse for ServerError {
             ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
                 StatusCode::NOT_FOUND.into_response()
             }
-            ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) => {
+            ServerError::Db(_) | ServerError::Archive(_) | ServerError::Figment(_) | ServerError::Unit => {
                 StatusCode::INTERNAL_SERVER_ERROR.into_response()
            }
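Note: the new `Unit` variant carries no payload, displays as an empty message, and maps to a 500; it reads like a placeholder for failures without a meaningful error value. One plausible wiring, sketched purely as an assumption (this commit shows no such `From` impl), would let `?` promote unit errors:

// Hypothetical, not part of this commit: convert a bare `()` error
// from a fire-and-forget task into ServerError::Unit.
impl From<()> for ServerError {
    fn from(_: ()) -> Self {
        ServerError::Unit
    }
}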


@@ -10,7 +10,7 @@ pub use error::{Result, ServerError};
 use repo::DistroMgr;

 use clap::Parser;
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};

 #[derive(Clone)]
 pub struct Config {
@@ -20,7 +20,7 @@ pub struct Config {
 #[derive(Clone)]
 pub struct Global {
     config: Config,
-    mgr: distro::MetaDistroMgr,
+    mgr: Arc<repo::RepoMgr>,
     db: sea_orm::DbConn,
 }
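Note: wrapping the manager in `Arc` is what keeps `Global` cheaply `Clone` for axum's `State` extractor: each request gets a clone of the state, and with `Arc` that is a refcount bump rather than a copy of the manager. A tiny sketch of that property, with `String` standing in for `RepoMgr`:

use std::sync::Arc;

#[derive(Clone)]
struct Global {
    mgr: Arc<String>, // stand-in for RepoMgr, which is not Clone itself
}

fn main() {
    let g1 = Global { mgr: Arc::new("mgr".to_string()) };
    let g2 = g1.clone(); // cheap: only the Arc refcount increments

    assert_eq!(Arc::strong_count(&g2.mgr), 2);
    assert!(Arc::ptr_eq(&g1.mgr, &g2.mgr)); // both clones share one manager
}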


@@ -1,16 +1,21 @@
 use super::{archive, package};
 use crate::db;

-use std::path::{Path, PathBuf};
+use std::{path::{Path, PathBuf}, sync::{atomic::{Ordering, AtomicU32}, Arc}, collections::HashMap};

+use futures::StreamExt;
 use sea_orm::{
     ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, JoinType, ModelTrait, NotSet,
     QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
 };
+use sea_query::{Expr, Query};
 use tokio::sync::{
     mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-    Mutex,
+    Mutex, RwLock,
 };
+use uuid::Uuid;
+
+pub const ANY_ARCH: &'static str = "any";

 struct PkgQueueMsg {
     repo: i32,
@@ -26,7 +31,7 @@ pub struct RepoMgr {
         UnboundedSender<PkgQueueMsg>,
         Mutex<UnboundedReceiver<PkgQueueMsg>>,
     ),
-    repos_lock: Mutex<()>,
+    repos: RwLock<HashMap<i32, AtomicU32>>,
 }

 impl RepoMgr {
@@ -37,22 +42,137 @@ impl RepoMgr {
         let (tx, rx) = unbounded_channel();

+        let mut repos = HashMap::new();
+        let repo_ids: Vec<i32> = db::Repo::find().select_only().column(db::repo::Column::Id).into_tuple().all(&conn).await?;
+
+        for id in repo_ids {
+            repos.insert(id, AtomicU32::new(0));
+        }
+
         Ok(Self {
             repos_dir: repos_dir.as_ref().to_path_buf(),
             conn,
             pkg_queue: (tx, Mutex::new(rx)),
-            repos_lock: Mutex::new(()),
+            repos: RwLock::new(repos)
         })
     }
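Note: the old `repos_lock: Mutex<()>` guard gives way to a map of per-repo pending-package counters. `queue_pkg` increments a repo's counter, each worker decrements it after parsing, and when `fetch_sub` returns 1 the queue for that repo has just drained, which is the trigger for a sync. A standalone sketch of that handshake, stripped of the database and the channel:

use std::{
    collections::HashMap,
    sync::atomic::{AtomicU32, Ordering},
};
use tokio::sync::RwLock;

struct Counters {
    repos: RwLock<HashMap<i32, AtomicU32>>,
}

impl Counters {
    // Producer side: bump the pending count for a repo.
    async fn enqueue(&self, repo: i32) {
        // A read lock suffices: AtomicU32 mutates through &self.
        if let Some(n) = self.repos.read().await.get(&repo) {
            n.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Consumer side: returns true when this was the last pending package,
    // i.e. the moment to kick off a repo sync.
    async fn dequeue(&self, repo: i32) -> bool {
        let old = self
            .repos
            .read()
            .await
            .get(&repo)
            .map(|n| n.fetch_sub(1, Ordering::SeqCst));
        old == Some(1)
    }
}

#[tokio::main]
async fn main() {
    let c = Counters {
        repos: RwLock::new(HashMap::from([(1, AtomicU32::new(0))])),
    };
    c.enqueue(1).await;
    c.enqueue(1).await;
    assert!(!c.dequeue(1).await); // one package still pending
    assert!(c.dequeue(1).await);  // queue drained: sync now
}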
+    /// Generate archive databases for all known architectures in the repository, including the
+    /// "any" architecture.
+    pub async fn sync_repo(&self, repo_id: i32) -> crate::Result<()> {
+        let mut archs = db::Package::find()
+            .filter(db::package::Column::RepoId.eq(repo_id))
+            .select_only()
+            .column(db::package::Column::Arch)
+            .distinct()
+            .into_tuple::<String>()
+            .stream(&self.conn)
+            .await?;
+
+        while let Some(arch) = archs.next().await.transpose()? {
+            self.generate_archives(repo_id, &arch).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Generate the archive databases for the given repository and architecture.
+    async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
+        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
+            self.random_file_paths();
+        let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
+        let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
+
+        // Query all packages in the repo that have the given architecture or the "any"
+        // architecture
+        let mut pkgs = db::Package::find()
+            .filter(db::package::Column::RepoId.eq(repo))
+            .filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
+            .filter(
+                db::package::Column::Id.in_subquery(
+                    Query::select()
+                        .expr(db::package::Column::Id.max())
+                        .from(db::package::Entity)
+                        .group_by_columns([db::package::Column::Arch, db::package::Column::Name])
+                        .to_owned(),
+                ),
+            )
+            .stream(&self.conn)
+            .await?;
+
+        let mut committed_ids: Vec<i32> = Vec::new();
+
+        while let Some(pkg) = pkgs.next().await.transpose()? {
+            committed_ids.push(pkg.id);
+
+            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
+            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
+
+            package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
+            package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
+
+            let full_name = format!("{}-{}", pkg.name, pkg.version);
+
+            ar_db
+                .add_entry(&full_name, &desc_tmp_file_path, true)
+                .await?;
+            ar_files
+                .add_entry(&full_name, &desc_tmp_file_path, true)
+                .await?;
+            ar_files
+                .add_entry(&full_name, &files_tmp_file_path, false)
+                .await?;
+        }
+
+        // Cleanup
+        ar_db.close().await?;
+        ar_files.close().await?;
+
+        let repo_dir = self.repos_dir.join(repo.to_string());
+
+        // Move the db archives to their respective places
+        tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
+        tokio::fs::rename(
+            tmp_ar_files_path,
+            repo_dir.join(format!("{}.files.tar.gz", arch)),
+        )
+        .await?;
+
+        // Only after we have successfully written everything to disk do we update the database.
+        // This order ensures any failure can be recovered, as the database is our single source of
+        // truth.
+        db::Package::update_many()
+            .col_expr(
+                db::package::Column::State,
+                Expr::value(db::PackageState::Committed),
+            )
+            .filter(db::package::Column::Id.is_in(committed_ids))
+            .exec(&self.conn)
+            .await?;
+
+        // If this fails there's no point in failing the function + if there were no packages in
+        // the repo, this fails anyway because the temp file doesn't exist
+        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
+        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
+
+        tracing::info!(
+            "Package archives generated for repo {} ('{}')",
+            repo,
+            arch
+        );
+
+        Ok(())
+    }
     pub async fn pkg_parse_task(&self) {
         loop {
             // Receive the next message and immediately drop the mutex afterwards. As long as the
             // queue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
             // as soon as a message is received, so another worker can pick up the mutex.
-            let mut recv = self.pkg_queue.1.lock().await;
-            let msg = recv.recv().await;
-            drop(recv);
+            let msg = {
+                let mut recv = self.pkg_queue.1.lock().await;
+                recv.recv().await
+            };

             if let Some(msg) = msg {
                 // TODO better handle this error (retry if failure wasn't because the package is
@@ -61,16 +181,25 @@ impl RepoMgr {
                     .add_pkg_from_path(msg.path, msg.repo)
                     .await
                     .inspect_err(|e| tracing::error!("{:?}", e));
+
+                let old = self.repos.read().await.get(&msg.repo).map(|n| n.fetch_sub(1, Ordering::SeqCst));
+
+                // Every time the queue for a repo becomes empty, we run a sync job
+                if old == Some(1) {
+                    // TODO error handling
+                    let _ = self.sync_repo(msg.repo).await;
+                }
             }
         }
     }

-    pub fn queue_pkg(&self, repo: i32, path: PathBuf) {
+    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
         let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
+        self.repos.read().await.get(&repo).inspect(|n| { n.fetch_add(1, Ordering::SeqCst); });
     }

     pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
-        let _guard = self.repos_lock.lock().await;
+        let mut repos = self.repos.write().await;

         let distro_id: Option<i32> = db::Distro::find()
             .filter(db::distro::Column::Name.eq(distro))
@@ -109,8 +238,12 @@ impl RepoMgr {
                 name: Set(repo.to_string()),
                 description: NotSet,
             };
+            let id = new_repo.insert(&self.conn).await?.id;

-            new_repo.insert(&self.conn).await?.id
+            tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
+            repos.insert(id, AtomicU32::new(0));
+
+            id
         };

         Ok(repo_id)
@@ -141,4 +274,11 @@ impl RepoMgr {

         Ok(())
     }
+
+    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
+        std::array::from_fn(|_| {
+            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
+            self.repos_dir.join(uuid.to_string())
+        })
+    }
 }
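Note: `random_file_paths` leans on const generics: the number of paths is inferred from the destructuring pattern at the call site, so one method serves both the single-temp-file upload handler and the four-path `generate_archives`. A small usage sketch with a `RepoMgr`-like stand-in:

use std::path::PathBuf;
use uuid::Uuid;

struct Mgr {
    dir: PathBuf,
}

impl Mgr {
    // Same shape as RepoMgr::random_file_paths: C is chosen by the caller.
    fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
        std::array::from_fn(|_| self.dir.join(Uuid::new_v4().simple().to_string()))
    }
}

fn main() {
    let mgr = Mgr { dir: PathBuf::from("/tmp") };

    // C = 1 and C = 4 are both inferred from the pattern on the left.
    let [tmp] = mgr.random_file_paths();
    let [db, files, desc, extra] = mgr.random_file_paths();

    println!("{tmp:?} {db:?} {files:?} {desc:?} {extra:?}");
}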


@@ -4,6 +4,7 @@ mod manager2;
 pub mod package;

 pub use manager::DistroMgr;
+pub use manager2::RepoMgr;

 use axum::{
     body::Body,
@@ -75,19 +76,13 @@ async fn post_package_archive(
     body: Body,
 ) -> crate::Result<()> {
     let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
-    let mgr = global.mgr.get_or_create_mgr(&distro).await?;
-    let [tmp_path] = mgr.random_file_paths();
+    let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
+    let [tmp_path] = global.mgr.random_file_paths();

     let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
     tokio::io::copy(&mut body, &mut tmp_file).await?;

-    tokio::spawn(async move {
-        if let Ok((repo, _, _, _)) = mgr.add_pkg_from_path(tmp_path, &repo).await {
-            tracing::debug!("starting schedule_sync");
-            let _ = mgr.schedule_sync(repo).await;
-            tracing::debug!("finished schedule_sync");
-        };
-    });
+    global.mgr.queue_pkg(repo, tmp_path).await;

     //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
     //
@@ -108,60 +103,63 @@ async fn delete_repo(
     State(global): State<crate::Global>,
     Path((distro, repo)): Path<(String, String)>,
 ) -> crate::Result<StatusCode> {
-    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
-        let repo_removed = mgr.remove_repo(&repo).await?;
-
-        if repo_removed {
-            tracing::info!("Removed repository '{}'", repo);
-
-            Ok(StatusCode::OK)
-        } else {
-            Ok(StatusCode::NOT_FOUND)
-        }
-    } else {
-        Ok(StatusCode::NOT_FOUND)
-    }
+    Ok(StatusCode::NOT_FOUND)
+    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
+    //    let repo_removed = mgr.remove_repo(&repo).await?;
+    //
+    //    if repo_removed {
+    //        tracing::info!("Removed repository '{}'", repo);
+    //
+    //        Ok(StatusCode::OK)
+    //    } else {
+    //        Ok(StatusCode::NOT_FOUND)
+    //    }
+    //} else {
+    //    Ok(StatusCode::NOT_FOUND)
+    //}
 }

 async fn delete_arch_repo(
     State(global): State<crate::Global>,
     Path((distro, repo, arch)): Path<(String, String, String)>,
 ) -> crate::Result<StatusCode> {
-    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
-        let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
-
-        if repo_removed {
-            tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
-
-            Ok(StatusCode::OK)
-        } else {
-            Ok(StatusCode::NOT_FOUND)
-        }
-    } else {
-        Ok(StatusCode::NOT_FOUND)
-    }
+    Ok(StatusCode::NOT_FOUND)
+    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
+    //    let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
+    //
+    //    if repo_removed {
+    //        tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
+    //
+    //        Ok(StatusCode::OK)
+    //    } else {
+    //        Ok(StatusCode::NOT_FOUND)
+    //    }
+    //} else {
+    //    Ok(StatusCode::NOT_FOUND)
+    //}
 }

 async fn delete_package(
     State(global): State<crate::Global>,
     Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
 ) -> crate::Result<StatusCode> {
-    if let Some(mgr) = global.mgr.get_mgr(&distro).await {
-        let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
-
-        if pkg_removed {
-            tracing::info!(
-                "Removed package '{}' ({}) from repository '{}'",
-                pkg_name,
-                arch,
-                repo
-            );
-
-            Ok(StatusCode::OK)
-        } else {
-            Ok(StatusCode::NOT_FOUND)
-        }
-    } else {
-        Ok(StatusCode::NOT_FOUND)
-    }
+    Ok(StatusCode::NOT_FOUND)
+    //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
+    //    let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
+    //
+    //    if pkg_removed {
+    //        tracing::info!(
+    //            "Removed package '{}' ({}) from repository '{}'",
+    //            pkg_name,
+    //            arch,
+    //            repo
+    //        );
+    //
+    //        Ok(StatusCode::OK)
+    //    } else {
+    //        Ok(StatusCode::NOT_FOUND)
+    //    }
+    //} else {
+    //    Ok(StatusCode::NOT_FOUND)
+    //}
 }