feat: set up prober repo actors; refactor code; this commit is too large

agents
Jef Roosens 2024-06-24 13:02:26 +02:00
parent 76395afb10
commit 8864925e58
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 232 additions and 65 deletions

View File

@ -1,12 +1,6 @@
use crate::{Config, FsConfig, Global}; use std::path::PathBuf;
use std::{io, path::PathBuf, sync::Arc};
use axum::Router;
use clap::Parser; use clap::Parser;
use sea_orm_migration::MigratorTrait;
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
@ -19,57 +13,3 @@ pub struct Cli {
)] )]
pub config_file: PathBuf, pub config_file: PathBuf,
} }
impl Cli {
pub async fn run(&self) -> crate::Result<()> {
let config: Config = Config::figment(&self.config_file)
.extract()
.inspect_err(|e| tracing::error!("{}", e))?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Connecting to database");
let db = crate::db::connect(&config.db).await?;
crate::db::Migrator::up(&db, None).await?;
let mgr = match &config.fs {
FsConfig::Local { data_dir } => {
crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
}
};
let mgr = Arc::new(mgr);
for _ in 0..config.pkg_workers {
let clone = Arc::clone(&mgr);
tokio::spawn(async move { clone.pkg_parse_task().await });
}
let global = Global {
config: config.clone(),
mgr,
db,
};
// build our application with a single route
let app = Router::new()
.nest("/api", crate::api::router())
.merge(crate::repo::router(&config.api_key))
.with_state(global)
.layer(TraceLayer::new_for_http());
let domain: String = format!("{}:{}", config.domain, config.port)
.parse()
.unwrap();
let listener = tokio::net::TcpListener::bind(domain).await?;
// run it with hyper on localhost:3000
Ok(axum::serve(listener, app.into_make_service())
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?)
}
}

View File

@ -8,9 +8,15 @@ mod repo;
pub use config::{Config, DbConfig, FsConfig}; pub use config::{Config, DbConfig, FsConfig};
pub use error::{Result, ServerError}; pub use error::{Result, ServerError};
use std::sync::Arc; use std::{io, path::PathBuf, sync::Arc};
use axum::Router;
use tower_http::trace::TraceLayer;
use clap::Parser; use clap::Parser;
use sea_orm_migration::MigratorTrait;
use tokio::runtime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub const ANY_ARCH: &'static str = "any"; pub const ANY_ARCH: &'static str = "any";
@ -21,8 +27,71 @@ pub struct Global {
db: sea_orm::DbConn, db: sea_orm::DbConn,
} }
#[tokio::main] fn main() -> crate::Result<()> {
async fn main() -> crate::Result<()> { let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
let cli = cli::Cli::parse(); let cli = cli::Cli::parse();
cli.run().await let global = setup(handle, cli.config_file)?;
handle.block_on(run(global))
}
fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
let config: Config = Config::figment(config_file)
.extract()
.inspect_err(|e| tracing::error!("{}", e))?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Connecting to database");
let db = rt.block_on(crate::db::connect(&config.db))?;
rt.block_on(crate::db::Migrator::up(&db, None))?;
let mgr = match &config.fs {
FsConfig::Local { data_dir } => {
rt.block_on(crate::repo::RepoMgr::new(
data_dir.join("repos"),
db.clone(),
))?
//RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())?
}
};
let mgr = Arc::new(mgr);
for _ in 0..config.pkg_workers {
let clone = Arc::clone(&mgr);
rt.spawn(async move { clone.pkg_parse_task().await });
}
Ok(Global {
config: config.clone(),
mgr,
db,
})
}
async fn run(global: Global) -> crate::Result<()> {
let domain: String = format!("{}:{}", &global.config.domain, global.config.port)
.parse()
.unwrap();
let listener = tokio::net::TcpListener::bind(domain).await?;
// build our application with a single route
let app = Router::new()
.nest("/api", crate::api::router())
.merge(crate::repo::router(&global.config.api_key))
.with_state(global)
.layer(TraceLayer::new_for_http());
// run it with hyper on localhost:3000
Ok(axum::serve(listener, app.into_make_service())
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?)
} }

View File

@ -0,0 +1,82 @@
use super::{archive, package, RepoHandle};
use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use tokio::{runtime, sync::mpsc::UnboundedReceiver};
pub enum RepoCommand {
ParsePkg(i32, PathBuf),
}
/// The actor is responsible for mutating the repositories. They receive their commands their
/// messages and process these commands in both a synchronous and asynchronous way.
pub struct RepoActor {
repos_dir: PathBuf,
conn: DbConn,
rt: runtime::Handle,
rx: Arc<Mutex<UnboundedReceiver<RepoCommand>>>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
}
impl RepoActor {
pub fn new(
repos_dir: PathBuf,
conn: DbConn,
rt: runtime::Handle,
rx: Arc<Mutex<UnboundedReceiver<RepoCommand>>>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
) -> Self {
Self {
repos_dir,
conn,
rt,
rx,
repos,
}
}
/// Run the main actor loop
pub fn run(self) {
while let Some(msg) = {
let mut rx = self.rx.lock().unwrap();
rx.blocking_recv()
} {
match msg {
RepoCommand::ParsePkg(repo, path) => {
let _ = self.parse_pkg(repo, path);
}
}
}
}
fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> {
let pkg = package::Package::open(&path)?;
let pkg = self
.rt
.block_on(db::query::package::insert(&self.conn, repo, pkg))?;
let dest_path = self
.repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
std::fs::rename(path, dest_path)?;
tracing::info!(
"Added '{}-{}-{}' to repository {}",
pkg.name,
pkg.version,
pkg.arch,
repo,
);
Ok(())
}
}

View File

@ -0,0 +1,72 @@
use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use tokio::{
runtime,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
#[derive(Clone)]
pub struct RepoHandle {
repos_dir: PathBuf,
conn: DbConn,
tx: UnboundedSender<super::RepoCommand>,
repos: Arc<RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>>,
}
impl RepoHandle {
pub fn start(
repos_dir: impl AsRef<Path>,
conn: DbConn,
actors: u32,
rt: runtime::Handle,
) -> crate::Result<Self> {
std::fs::create_dir_all(repos_dir.as_ref())?;
let (tx, rx) = unbounded_channel();
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on(
db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn),
)?;
for id in repo_ids {
repos.insert(id, Default::default());
}
let rx = Arc::new(Mutex::new(rx));
let repos = Arc::new(RwLock::new(repos));
for _ in 0..actors {
let actor = super::RepoActor::new(
repos_dir.as_ref().to_path_buf(),
conn.clone(),
rt.clone(),
Arc::clone(&rx),
Arc::clone(&repos),
);
std::thread::spawn(|| actor.run());
}
Ok(Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
tx,
repos,
})
}
}

View File

@ -1,7 +1,11 @@
mod actor;
mod archive; mod archive;
mod handle;
mod manager; mod manager;
pub mod package; pub mod package;
pub use actor::{RepoActor, RepoCommand};
pub use handle::RepoHandle;
pub use manager::RepoMgr; pub use manager::RepoMgr;
use crate::FsConfig; use crate::FsConfig;