diff --git a/server/src/cli.rs b/server/src/cli.rs index c6998eb..5e8469e 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,12 +1,6 @@ -use crate::{Config, FsConfig, Global}; +use std::path::PathBuf; -use std::{io, path::PathBuf, sync::Arc}; - -use axum::Router; use clap::Parser; -use sea_orm_migration::MigratorTrait; -use tower_http::trace::TraceLayer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -19,57 +13,3 @@ pub struct Cli { )] pub config_file: PathBuf, } - -impl Cli { - pub async fn run(&self) -> crate::Result<()> { - let config: Config = Config::figment(&self.config_file) - .extract() - .inspect_err(|e| tracing::error!("{}", e))?; - - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) - .with(tracing_subscriber::fmt::layer()) - .init(); - - tracing::info!("Connecting to database"); - let db = crate::db::connect(&config.db).await?; - - crate::db::Migrator::up(&db, None).await?; - - let mgr = match &config.fs { - FsConfig::Local { data_dir } => { - crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? - } - }; - - let mgr = Arc::new(mgr); - - for _ in 0..config.pkg_workers { - let clone = Arc::clone(&mgr); - - tokio::spawn(async move { clone.pkg_parse_task().await }); - } - - let global = Global { - config: config.clone(), - mgr, - db, - }; - - // build our application with a single route - let app = Router::new() - .nest("/api", crate::api::router()) - .merge(crate::repo::router(&config.api_key)) - .with_state(global) - .layer(TraceLayer::new_for_http()); - - let domain: String = format!("{}:{}", config.domain, config.port) - .parse() - .unwrap(); - let listener = tokio::net::TcpListener::bind(domain).await?; - // run it with hyper on localhost:3000 - Ok(axum::serve(listener, app.into_make_service()) - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) - } -} diff --git a/server/src/main.rs b/server/src/main.rs index eb1c3d0..5c0ecac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,9 +8,15 @@ mod repo; pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; -use std::sync::Arc; +use std::{io, path::PathBuf, sync::Arc}; + +use axum::Router; +use tower_http::trace::TraceLayer; use clap::Parser; +use sea_orm_migration::MigratorTrait; +use tokio::runtime; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub const ANY_ARCH: &'static str = "any"; @@ -21,8 +27,71 @@ pub struct Global { db: sea_orm::DbConn, } -#[tokio::main] -async fn main() -> crate::Result<()> { +fn main() -> crate::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let handle = rt.handle(); + let cli = cli::Cli::parse(); - cli.run().await + let global = setup(handle, cli.config_file)?; + + handle.block_on(run(global)) +} + +fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { + let config: Config = Config::figment(config_file) + .extract() + .inspect_err(|e| tracing::error!("{}", e))?; + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) + .with(tracing_subscriber::fmt::layer()) + .init(); + + tracing::info!("Connecting to database"); + let db = rt.block_on(crate::db::connect(&config.db))?; + rt.block_on(crate::db::Migrator::up(&db, None))?; + + let mgr = match &config.fs { + FsConfig::Local { data_dir } => { + rt.block_on(crate::repo::RepoMgr::new( + data_dir.join("repos"), + db.clone(), + ))? + //RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())? + } + }; + let mgr = Arc::new(mgr); + + for _ in 0..config.pkg_workers { + let clone = Arc::clone(&mgr); + + rt.spawn(async move { clone.pkg_parse_task().await }); + } + + Ok(Global { + config: config.clone(), + mgr, + db, + }) +} + +async fn run(global: Global) -> crate::Result<()> { + let domain: String = format!("{}:{}", &global.config.domain, global.config.port) + .parse() + .unwrap(); + let listener = tokio::net::TcpListener::bind(domain).await?; + + // build our application with a single route + let app = Router::new() + .nest("/api", crate::api::router()) + .merge(crate::repo::router(&global.config.api_key)) + .with_state(global) + .layer(TraceLayer::new_for_http()); + // run it with hyper on localhost:3000 + Ok(axum::serve(listener, app.into_make_service()) + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) } diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs new file mode 100644 index 0000000..1291656 --- /dev/null +++ b/server/src/repo/actor.rs @@ -0,0 +1,82 @@ +use super::{archive, package, RepoHandle}; +use crate::db; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, +}; + +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use tokio::{runtime, sync::mpsc::UnboundedReceiver}; + +pub enum RepoCommand { + ParsePkg(i32, PathBuf), +} + +/// The actor is responsible for mutating the repositories. They receive their commands their +/// messages and process these commands in both a synchronous and asynchronous way. +pub struct RepoActor { + repos_dir: PathBuf, + conn: DbConn, + rt: runtime::Handle, + rx: Arc>>, + repos: Arc>)>>>, +} + +impl RepoActor { + pub fn new( + repos_dir: PathBuf, + conn: DbConn, + rt: runtime::Handle, + rx: Arc>>, + repos: Arc>)>>>, + ) -> Self { + Self { + repos_dir, + conn, + rt, + rx, + repos, + } + } + + /// Run the main actor loop + pub fn run(self) { + while let Some(msg) = { + let mut rx = self.rx.lock().unwrap(); + rx.blocking_recv() + } { + match msg { + RepoCommand::ParsePkg(repo, path) => { + let _ = self.parse_pkg(repo, path); + } + } + } + } + + fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> { + let pkg = package::Package::open(&path)?; + let pkg = self + .rt + .block_on(db::query::package::insert(&self.conn, repo, pkg))?; + let dest_path = self + .repos_dir + .join(repo.to_string()) + .join(pkg.id.to_string()); + std::fs::rename(path, dest_path)?; + + tracing::info!( + "Added '{}-{}-{}' to repository {}", + pkg.name, + pkg.version, + pkg.arch, + repo, + ); + + Ok(()) + } +} diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs new file mode 100644 index 0000000..b720390 --- /dev/null +++ b/server/src/repo/handle.rs @@ -0,0 +1,72 @@ +use crate::db; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{atomic::AtomicU32, Arc, Mutex, RwLock}, +}; + +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use tokio::{ + runtime, + sync::mpsc::{unbounded_channel, UnboundedSender}, +}; + +#[derive(Clone)] +pub struct RepoHandle { + repos_dir: PathBuf, + conn: DbConn, + tx: UnboundedSender, + repos: Arc>)>>>, +} + +impl RepoHandle { + pub fn start( + repos_dir: impl AsRef, + conn: DbConn, + actors: u32, + rt: runtime::Handle, + ) -> crate::Result { + std::fs::create_dir_all(repos_dir.as_ref())?; + + let (tx, rx) = unbounded_channel(); + + let mut repos = HashMap::new(); + let repo_ids: Vec = rt.block_on( + db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn), + )?; + + for id in repo_ids { + repos.insert(id, Default::default()); + } + + let rx = Arc::new(Mutex::new(rx)); + let repos = Arc::new(RwLock::new(repos)); + + for _ in 0..actors { + let actor = super::RepoActor::new( + repos_dir.as_ref().to_path_buf(), + conn.clone(), + rt.clone(), + Arc::clone(&rx), + Arc::clone(&repos), + ); + + std::thread::spawn(|| actor.run()); + } + + Ok(Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + tx, + repos, + }) + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 953b631..6ea74ab 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,7 +1,11 @@ +mod actor; mod archive; +mod handle; mod manager; pub mod package; +pub use actor::{RepoActor, RepoCommand}; +pub use handle::RepoHandle; pub use manager::RepoMgr; use crate::FsConfig;