diff --git a/server/src/web/api/mod.rs b/server/src/api/mod.rs similarity index 100% rename from server/src/web/api/mod.rs rename to server/src/api/mod.rs diff --git a/server/src/web/api/pagination.rs b/server/src/api/pagination.rs similarity index 100% rename from server/src/web/api/pagination.rs rename to server/src/api/pagination.rs diff --git a/server/src/db/query/mod.rs b/server/src/db/query/mod.rs index 9eb7954..f0a809b 100644 --- a/server/src/db/query/mod.rs +++ b/server/src/db/query/mod.rs @@ -1,3 +1,5 @@ pub mod distro; pub mod package; pub mod repo; + +type Result = std::result::Result; diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs index ad9d74a..bfdad73 100644 --- a/server/src/db/query/package.rs +++ b/server/src/db/query/package.rs @@ -1,7 +1,8 @@ use crate::db::{self, *}; +use futures::Stream; use sea_orm::{sea_query::IntoCondition, *}; -use sea_query::{Alias, Expr, Query, SelectStatement}; +use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement}; use serde::Deserialize; #[derive(Deserialize)] diff --git a/server/src/main.rs b/server/src/main.rs index 5a91fdb..274d419 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,14 +1,17 @@ +mod api; mod cli; mod config; pub mod db; mod error; mod repo; -mod web; pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; -use std::{io, path::PathBuf}; +use std::{io, path::PathBuf, sync::Arc}; + +use axum::Router; +use tower_http::trace::TraceLayer; use clap::Parser; use sea_orm_migration::MigratorTrait; @@ -53,7 +56,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { let repo = match &config.fs { FsConfig::Local { data_dir } => { - crate::repo::start( + crate::repo::Handle::start( data_dir.join("repos"), db.clone(), rt.clone(), @@ -87,8 +90,12 @@ async fn run(global: Global) -> crate::Result<()> { .unwrap(); let listener = tokio::net::TcpListener::bind(domain).await?; - let app = web::router(global); - + // build our application with a single route + let app = Router::new() + .nest("/api", crate::api::router()) + .merge(crate::repo::router(&global.config.api_key)) + .with_state(global) + .layer(TraceLayer::new_for_http()); // run it with hyper on localhost:3000 Ok(axum::serve(listener, app.into_make_service()) .await diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs index 2c2fd74..d76efa3 100644 --- a/server/src/repo/actor.rs +++ b/server/src/repo/actor.rs @@ -1,26 +1,71 @@ -use super::{archive, package, Command, SharedState}; +use super::{archive, package, Handle}; use crate::db; use std::{ - path::PathBuf, - sync::{atomic::Ordering, Arc}, + collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, + }, }; use futures::StreamExt; -use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; -use sea_query::Expr; -use tokio::{runtime, sync::mpsc}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use sea_query::{Alias, Expr, Query}; +use tokio::{ + runtime, + sync::{ + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, + RwLock, + }, +}; use uuid::Uuid; +pub enum RepoCommand { + ParsePkg(i32, PathBuf), + SyncRepo(i32), + Clean, +} + +pub struct RepoSharedState { + pub repos_dir: PathBuf, + pub conn: DbConn, + pub rx: Mutex>, + pub tx: UnboundedSender, + pub repos: RwLock>)>>, +} + +impl RepoSharedState { + pub fn new( + repos_dir: impl AsRef, + conn: DbConn, + repos: HashMap>)>, + ) -> Self { + let (tx, rx) = unbounded_channel(); + + Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + rx: Mutex::new(rx), + tx, + repos: RwLock::new(repos), + } + } +} + /// The actor is responsible for mutating the repositories. They receive their commands their /// messages and process these commands in both a synchronous and asynchronous way. -pub struct Actor { +pub struct RepoActor { rt: runtime::Handle, - state: Arc, + state: Arc, } -impl Actor { - pub fn new(rt: runtime::Handle, state: Arc) -> Self { +impl RepoActor { + pub fn new(rt: runtime::Handle, state: Arc) -> Self { Self { rt, state: Arc::clone(&state), @@ -41,7 +86,7 @@ impl Actor { rx.blocking_recv() } { match msg { - Command::ParsePkg(repo, path) => { + RepoCommand::ParsePkg(repo, path) => { let _ = self.parse_pkg(repo, path); if self @@ -53,13 +98,12 @@ impl Actor { == Some(0) { let _ = self.sync_repo(repo); - let _ = self.clean(); } } - Command::SyncRepo(repo) => { + RepoCommand::SyncRepo(repo) => { let _ = self.sync_repo(repo); } - Command::Clean => { + RepoCommand::Clean => { let _ = self.clean(); } } @@ -135,19 +179,22 @@ impl Actor { // sea_orm needs its connections to be dropped inside an async context, so we spawn a task // that streams the responses to the synchronous context via message passing self.rt.spawn(async move { - match query.stream(&conn).await { - Ok(mut stream) => { - while let Some(res) = stream.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; + let stream = query.stream(&conn).await; - if is_err { - return; - } - } - } - Err(err) => { - let _ = tx.send(Err(err)).await; + if let Err(err) = stream { + let _ = tx.send(Err(err)).await; + + return; + } + + let mut stream = stream.unwrap(); + + while let Some(res) = stream.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; + + if is_err { + return; } } }); @@ -186,60 +233,6 @@ impl Actor { } fn clean(&self) -> crate::Result<()> { - let (tx, mut rx) = mpsc::channel(1); - let conn = self.state.conn.clone(); - let query = db::query::package::stale_pkgs(&self.state.conn); - - // sea_orm needs its connections to be dropped inside an async context, so we spawn a task - // that streams the responses to the synchronous context via message passing - self.rt.spawn(async move { - match query.stream(&conn).await { - Ok(mut stream) => { - while let Some(res) = stream.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; - - if is_err { - return; - } - } - } - Err(err) => { - let _ = tx.send(Err(err)).await; - } - } - }); - - // Ids are monotonically increasing, so the max id suffices to know which packages to - // remove later - let mut max_id = -1; - let mut removed_pkgs = 0; - - while let Some(pkg) = rx.blocking_recv().transpose()? { - // Failing to remove the package file isn't the biggest problem - let _ = std::fs::remove_file( - self.state - .repos_dir - .join(pkg.repo_id.to_string()) - .join(pkg.id.to_string()), - ); - - if pkg.id > max_id { - max_id = pkg.id; - } - - removed_pkgs += 1; - } - - if removed_pkgs > 0 { - self.rt.block_on(db::query::package::delete_stale_pkgs( - &self.state.conn, - max_id, - ))?; - } - - tracing::info!("Cleaned up {removed_pkgs} old package(s)"); - - Ok(()) + todo!() } } diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index ad08a67..973a395 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -1,6 +1,6 @@ use crate::db; use std::{ - io::Write, + io::{self, Write}, path::{Path, PathBuf}, }; @@ -94,21 +94,23 @@ impl RepoArchivesWriter { let conn = self.conn.clone(); let query = pkg.find_related(db::PackageFile); - self.rt.spawn(async move { - match query.stream(&conn).await { - Ok(mut stream) => { - while let Some(res) = stream.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; + let files = query.stream(&conn).await; - if is_err { - return; - } - } - } - Err(err) => { - let _ = tx.send(Err(err)).await; + if let Err(err) = files { + let _ = tx.send(Err(err)).await; + + return; + } + + let mut files = files.unwrap(); + + while let Some(res) = files.next().await { + let is_err = res.is_err(); + let _ = tx.send(res).await; + + if is_err { + return; } } }); diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs index bbcc153..9e63e81 100644 --- a/server/src/repo/handle.rs +++ b/server/src/repo/handle.rs @@ -1,27 +1,62 @@ -use super::{Command, SharedState}; +use super::{RepoCommand, RepoSharedState}; use crate::db; use std::{ - path::PathBuf, - sync::{atomic::Ordering, Arc}, + collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, RwLock, + }, }; use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, EntityTrait, NotSet, QueryFilter, QuerySelect, Set, + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use sea_query::{Alias, Expr, Query}; +use tokio::{ + runtime, + sync::mpsc::{unbounded_channel, UnboundedSender}, }; -use sea_query::Expr; use uuid::Uuid; #[derive(Clone)] pub struct Handle { - state: Arc, + state: Arc, } impl Handle { - pub fn new(state: &Arc) -> Self { - Self { - state: Arc::clone(state), + pub fn start( + repos_dir: impl AsRef, + conn: DbConn, + rt: runtime::Handle, + actors: u32, + ) -> crate::Result { + std::fs::create_dir_all(repos_dir.as_ref())?; + + let mut repos = HashMap::new(); + let repo_ids: Vec = rt.block_on( + db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn), + )?; + + for id in repo_ids { + repos.insert(id, Default::default()); } + + let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos)); + + for _ in 0..actors { + let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state)); + + std::thread::spawn(|| actor.run()); + } + + Ok(Self { state }) } pub fn random_file_paths(&self) -> [PathBuf; C] { @@ -128,17 +163,20 @@ impl Handle { } pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); + self.state + .tx + .send(RepoCommand::ParsePkg(repo, path)) + .unwrap(); self.state.repos.read().await.get(&repo).inspect(|n| { n.0.fetch_add(1, Ordering::SeqCst); }); } async fn queue_sync(&self, repo: i32) { - self.state.tx.send(Command::SyncRepo(repo)).unwrap(); + self.state.tx.send(RepoCommand::SyncRepo(repo)).unwrap(); } async fn queue_clean(&self) { - self.state.tx.send(Command::Clean).unwrap(); + self.state.tx.send(RepoCommand::Clean).unwrap(); } } diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs new file mode 100644 index 0000000..e4f0581 --- /dev/null +++ b/server/src/repo/manager.rs @@ -0,0 +1,385 @@ +use super::{archive, package}; +use crate::db::{self, query::package::delete_stale_pkgs}; + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; + +use futures::StreamExt; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, + ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, +}; +use sea_query::{Alias, Expr, Query}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Mutex, RwLock, +}; +use uuid::Uuid; + +struct PkgQueueMsg { + repo: i32, + path: PathBuf, +} + +/// A single instance of this struct orchestrates everything related to managing packages files on +/// disk for all repositories in the server +pub struct RepoMgr { + repos_dir: PathBuf, + conn: DbConn, + pkg_queue: ( + UnboundedSender, + Mutex>, + ), + repos: RwLock>)>>, +} + +impl RepoMgr { + pub async fn new>(repos_dir: P, conn: DbConn) -> crate::Result { + if !tokio::fs::try_exists(&repos_dir).await? { + tokio::fs::create_dir(&repos_dir).await?; + } + + let (tx, rx) = unbounded_channel(); + + let mut repos = HashMap::new(); + let repo_ids: Vec = db::Repo::find() + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .all(&conn) + .await?; + + for id in repo_ids { + repos.insert(id, Default::default()); + } + + Ok(Self { + repos_dir: repos_dir.as_ref().to_path_buf(), + conn, + pkg_queue: (tx, Mutex::new(rx)), + repos: RwLock::new(repos), + }) + } + + /// Generate archive databases for all known architectures in the repository, including the + /// "any" architecture. + pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> { + let lock = self + .repos + .read() + .await + .get(&repo) + .map(|(_, lock)| Arc::clone(lock)); + + if lock.is_none() { + return Ok(()); + } + + let lock = lock.unwrap(); + let _guard = lock.lock().await; + + let archs: Vec = db::Package::find() + .filter(db::package::Column::RepoId.eq(repo)) + .select_only() + .column(db::package::Column::Arch) + .distinct() + .into_tuple() + .all(&self.conn) + .await?; + + for arch in archs { + self.generate_archives(repo, &arch).await?; + } + + Ok(()) + } + + /// Generate the archive databases for the given repository and architecture. + async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { + let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = + self.random_file_paths(); + let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; + let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?; + + // Query all packages in the repo that have the given architecture or the "any" + // architecture + let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch) + .stream(&self.conn) + .await?; + + let mut commited_ids: Vec = Vec::new(); + + while let Some(pkg) = pkgs.next().await.transpose()? { + commited_ids.push(pkg.id); + + let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; + let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; + + package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?; + package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?; + + let full_name = format!("{}-{}", pkg.name, pkg.version); + + ar_db + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &desc_tmp_file_path, true) + .await?; + ar_files + .add_entry(&full_name, &files_tmp_file_path, false) + .await?; + } + + // Cleanup + ar_db.close().await?; + ar_files.close().await?; + + let repo_dir = self.repos_dir.join(repo.to_string()); + + // Move the db archives to their respective places + tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; + tokio::fs::rename( + tmp_ar_files_path, + repo_dir.join(format!("{}.files.tar.gz", arch)), + ) + .await?; + + // Only after we have successfully written everything to disk do we update the database. + // This order ensures any failure can be recovered, as the database is our single source of + // truth. + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::Committed), + ) + .filter(db::package::Column::Id.is_in(commited_ids)) + .exec(&self.conn) + .await?; + + // If this fails there's no point in failing the function + if there were no packages in + // the repo, this fails anyway because the temp file doesn't exist + let _ = tokio::fs::remove_file(desc_tmp_file_path).await; + let _ = tokio::fs::remove_file(files_tmp_file_path).await; + + tracing::info!("Package archives generated for repo {} ('{}')", repo, arch); + + Ok(()) + } + + /// Clean any remaining old package files from the database and file system + pub async fn remove_stale_pkgs(&self) -> crate::Result<()> { + let mut pkgs = db::query::package::stale_pkgs(&self.conn) + .stream(&self.conn) + .await?; + + // Ids are monotonically increasing, so the max id suffices to know which packages to + // remove later + let mut max_id = -1; + let mut removed_pkgs = 0; + + while let Some(pkg) = pkgs.next().await.transpose()? { + // Failing to remove the package file isn't the biggest problem + let _ = tokio::fs::remove_file( + self.repos_dir + .join(pkg.repo_id.to_string()) + .join(pkg.id.to_string()), + ) + .await; + + if pkg.id > max_id { + max_id = pkg.id; + } + + removed_pkgs += 1; + } + + if removed_pkgs > 0 { + db::query::package::delete_stale_pkgs(&self.conn, max_id).await?; + } + + tracing::info!("Removed {removed_pkgs} stale package(s)"); + + Ok(()) + } + + pub async fn pkg_parse_task(&self) { + loop { + // Receive the next message and immediately drop the mutex afterwards. As long as the + // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked + // as soon as a message is received, so another worker can pick up the mutex. + let msg = { + let mut recv = self.pkg_queue.1.lock().await; + recv.recv().await + }; + + if let Some(msg) = msg { + // TODO better handle this error (retry if failure wasn't because the package is + // faulty) + let _ = self + .add_pkg_from_path(msg.path, msg.repo) + .await + .inspect_err(|e| tracing::error!("{:?}", e)); + + let old = self + .repos + .read() + .await + .get(&msg.repo) + .map(|n| n.0.fetch_sub(1, Ordering::SeqCst)); + + // Every time the queue for a repo becomes empty, we run a sync job + if old == Some(1) { + // TODO error handling + let _ = self.sync_repo(msg.repo).await; + + // TODO move this so that we only clean if entire queue is empty, not just + // queue for specific repo + let _ = self.remove_stale_pkgs().await; + } + } + } + } + + pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { + self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap(); + self.repos.read().await.get(&repo).inspect(|n| { + n.0.fetch_add(1, Ordering::SeqCst); + }); + } + + pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result> { + Ok(db::Repo::find() + .find_also_related(db::Distro) + .filter( + Condition::all() + .add(db::repo::Column::Name.eq(repo)) + .add(db::distro::Column::Name.eq(distro)), + ) + .one(&self.conn) + .await + .map(|res| res.map(|(repo, _)| repo.id))?) + } + + pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { + let mut repos = self.repos.write().await; + + let distro_id: Option = db::Distro::find() + .filter(db::distro::Column::Name.eq(distro)) + .select_only() + .column(db::distro::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let distro_id = if let Some(id) = distro_id { + id + } else { + let new_distro = db::distro::ActiveModel { + id: NotSet, + name: Set(distro.to_string()), + description: NotSet, + }; + + new_distro.insert(&self.conn).await?.id + }; + + let repo_id: Option = db::Repo::find() + .filter(db::repo::Column::DistroId.eq(distro_id)) + .filter(db::repo::Column::Name.eq(repo)) + .select_only() + .column(db::repo::Column::Id) + .into_tuple() + .one(&self.conn) + .await?; + + let repo_id = if let Some(id) = repo_id { + id + } else { + let new_repo = db::repo::ActiveModel { + id: NotSet, + distro_id: Set(distro_id), + name: Set(repo.to_string()), + description: NotSet, + }; + let id = new_repo.insert(&self.conn).await?.id; + + tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?; + repos.insert(id, Default::default()); + + id + }; + + Ok(repo_id) + } + + async fn add_pkg_from_path>(&self, path: P, repo: i32) -> crate::Result<()> { + let path_clone = path.as_ref().to_path_buf(); + let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) + .await + .unwrap()?; + + // TODO prevent database from being updated but file failing to move to repo dir? + let pkg = db::query::package::insert(&self.conn, repo, pkg).await?; + + let dest_path = self + .repos_dir + .join(repo.to_string()) + .join(pkg.id.to_string()); + tokio::fs::rename(path.as_ref(), dest_path).await?; + + tracing::info!( + "Added '{}-{}-{}' to repository {}", + pkg.name, + pkg.version, + pkg.arch, + repo, + ); + + Ok(()) + } + + pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { + self.repos.write().await.remove(&repo); + db::Repo::delete_by_id(repo).exec(&self.conn).await?; + let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await; + + Ok(()) + } + + /// Remove all packages in the repository that have a given arch. This method marks all + /// packages with the given architecture as "pending deletion", before performing a manual sync + /// & removal of stale packages. + pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { + db::Package::update_many() + .col_expr( + db::package::Column::State, + Expr::value(db::PackageState::PendingDeletion), + ) + .filter( + Condition::all() + .add(db::package::Column::RepoId.eq(repo)) + .add(db::package::Column::Arch.eq(arch)), + ) + .exec(&self.conn) + .await?; + + self.sync_repo(repo).await?; + self.remove_stale_pkgs().await?; + + Ok(()) + } + + pub fn random_file_paths(&self) -> [PathBuf; C] { + std::array::from_fn(|_| { + let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); + self.repos_dir.join(uuid.to_string()) + }) + } +} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index 9920326..f48c0d7 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -3,86 +3,148 @@ mod archive; mod handle; pub mod package; -pub use actor::Actor; +pub use actor::{RepoActor, RepoCommand, RepoSharedState}; pub use handle::Handle; -use crate::db; +use crate::FsConfig; -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{atomic::AtomicU32, Arc, Mutex}, +use axum::{ + body::Body, + extract::{Path, State}, + http::{Request, StatusCode}, + response::IntoResponse, + routing::{delete, post}, + Router, }; +use futures::TryStreamExt; +use tokio_util::io::StreamReader; +use tower::util::ServiceExt; +use tower_http::{services::ServeFile, validate_request::ValidateRequestHeaderLayer}; -use sea_orm::{DbConn, EntityTrait, QuerySelect}; -use tokio::{ - runtime, - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - RwLock, - }, -}; - -pub enum Command { - ParsePkg(i32, PathBuf), - SyncRepo(i32), - Clean, +pub fn router(api_key: &str) -> Router { + Router::new() + .route( + "/:distro/:repo", + post(post_package_archive) + .delete(delete_repo) + .route_layer(ValidateRequestHeaderLayer::bearer(api_key)), + ) + .route( + "/:distro/:repo/:arch", + delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)), + ) + // Routes added after the layer do not get that layer applied, so the GET requests will not + // be authorized + .route( + "/:distro/:repo/:arch/:filename", + delete(delete_package) + .route_layer(ValidateRequestHeaderLayer::bearer(api_key)) + .get(get_file), + ) } -pub struct SharedState { - pub repos_dir: PathBuf, - pub conn: DbConn, - pub rx: Mutex>, - pub tx: UnboundedSender, - pub repos: RwLock>)>>, -} +/// Serve the package archive files and database archives. If files are requested for an +/// architecture that does not have any explicit packages, a repository containing only "any" files +/// is returned. +async fn get_file( + State(global): State, + Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, + req: Request, +) -> crate::Result { + if let Some(repo_id) = global.repo.get_repo(&distro, &repo).await? { + match global.config.fs { + FsConfig::Local { data_dir } => { + let repo_dir = data_dir.join("repos").join(repo_id.to_string()); -impl SharedState { - pub fn new( - repos_dir: impl AsRef, - conn: DbConn, - repos: HashMap>)>, - ) -> Self { - let (tx, rx) = unbounded_channel(); + let file_name = if file_name == format!("{}.db", repo) + || file_name == format!("{}.db.tar.gz", repo) + { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; - Self { - repos_dir: repos_dir.as_ref().to_path_buf(), - conn, - rx: Mutex::new(rx), - tx, - repos: RwLock::new(repos), + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) + } } + } else { + Err(StatusCode::NOT_FOUND.into()) } } -pub fn start( - repos_dir: impl AsRef, - conn: DbConn, - rt: runtime::Handle, - actors: u32, -) -> crate::Result { - std::fs::create_dir_all(repos_dir.as_ref())?; +async fn post_package_archive( + State(global): State, + Path((distro, repo)): Path<(String, String)>, + body: Body, +) -> crate::Result { + let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?; - let mut repos = HashMap::new(); - let repo_ids: Vec = rt.block_on( - db::Repo::find() - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .all(&conn), - )?; + let [tmp_path] = global.repo.random_file_paths(); + let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; + let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); + tokio::io::copy(&mut body, &mut tmp_file).await?; - for id in repo_ids { - repos.insert(id, Default::default()); - } + global.repo.queue_pkg(repo_id, tmp_path).await; - let state = Arc::new(SharedState::new(repos_dir, conn, repos)); - - for _ in 0..actors { - let actor = Actor::new(rt.clone(), Arc::clone(&state)); - - std::thread::spawn(|| actor.run()); - } - - Ok(Handle::new(&state)) + Ok(StatusCode::ACCEPTED) +} + +async fn delete_repo( + State(global): State, + Path((distro, repo)): Path<(String, String)>, +) -> crate::Result { + if let Some(repo) = global.repo.get_repo(&distro, &repo).await? { + global.repo.remove_repo(repo).await?; + + tracing::info!("Removed repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } +} + +async fn delete_arch_repo( + State(global): State, + Path((distro, repo, arch)): Path<(String, String, String)>, +) -> crate::Result { + if let Some(repo) = global.repo.get_repo(&distro, &repo).await? { + global.repo.remove_repo_arch(repo, &arch).await?; + + tracing::info!("Removed architecture '{arch}' from repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } +} + +async fn delete_package( + State(global): State, + Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, +) -> crate::Result { + Ok(StatusCode::NOT_FOUND) + //if let Some(mgr) = global.mgr.get_mgr(&distro).await { + // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; + // + // if pkg_removed { + // tracing::info!( + // "Removed package '{}' ({}) from repository '{}'", + // pkg_name, + // arch, + // repo + // ); + // + // Ok(StatusCode::OK) + // } else { + // Ok(StatusCode::NOT_FOUND) + // } + //} else { + // Ok(StatusCode::NOT_FOUND) + //} } diff --git a/server/src/repo/package.rs b/server/src/repo/package.rs index 70466ba..df98559 100644 --- a/server/src/repo/package.rs +++ b/server/src/repo/package.rs @@ -1,17 +1,19 @@ -use crate::db::entities::package; +use crate::db::{self, entities::package, PackageRelatedEnum}; use std::{ fmt, fs, - io::{self, BufRead, BufReader, Read}, + io::{self, BufRead, BufReader, BufWriter, Read, Write}, path::{Path, PathBuf}, }; use chrono::NaiveDateTime; +use futures::StreamExt; use libarchive::{ read::{Archive, Builder}, Entry, ReadFilter, }; -use sea_orm::ActiveValue::Set; +use sea_orm::{ActiveValue::Set, ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; const IGNORED_FILES: [&str; 5] = [".BUILDINFO", ".INSTALL", ".MTREE", ".PKGINFO", ".CHANGELOG"]; @@ -202,6 +204,74 @@ impl Package { self.compression.extension().unwrap() ) } + + /// Write the formatted desc file to the provided writer + pub fn write_desc(&self, w: &mut W) -> io::Result<()> { + // We write a lot of small strings to the writer, so wrapping it in a BufWriter is + // beneficial + let mut w = BufWriter::new(w); + + let info = &self.info; + + writeln!(w, "%FILENAME%\n{}", self.file_name())?; + + let mut write = |key: &str, value: &str| { + if !value.is_empty() { + writeln!(w, "\n%{}%\n{}", key, value) + } else { + Ok(()) + } + }; + + write("NAME", &info.name)?; + write("BASE", &info.base)?; + write("VERSION", &info.version)?; + + if let Some(ref description) = info.description { + write("DESC", description)?; + } + write("GROUPS", &info.groups.join("\n"))?; + write("CSIZE", &info.csize.to_string())?; + write("ISIZE", &info.size.to_string())?; + + write("SHA256SUM", &info.sha256sum)?; + + if let Some(ref url) = info.url { + write("URL", url)?; + } + + write("LICENSE", &info.licenses.join("\n"))?; + write("ARCH", &info.arch)?; + write("BUILDDATE", &info.build_date.timestamp().to_string())?; + + if let Some(ref packager) = info.packager { + write("PACKAGER", packager)?; + } + + write("REPLACES", &info.replaces.join("\n"))?; + write("CONFLICTS", &info.conflicts.join("\n"))?; + write("PROVIDES", &info.provides.join("\n"))?; + write("DEPENDS", &info.depends.join("\n"))?; + write("OPTDEPENDS", &info.optdepends.join("\n"))?; + write("MAKEDEPENDS", &info.makedepends.join("\n"))?; + write("CHECKDEPENDS", &info.checkdepends.join("\n"))?; + + Ok(()) + } + + pub fn write_files(&self, w: &mut W) -> io::Result<()> { + // We write a lot of small strings to the writer, so wrapping it in a BufWriter is + // beneficial + let mut w = BufWriter::new(w); + + writeln!(w, "%FILES%")?; + + for file in &self.files { + writeln!(w, "{}", file.to_string_lossy())?; + } + + Ok(()) + } } impl From for package::ActiveModel { @@ -233,3 +303,123 @@ pub fn filename(pkg: &package::Model) -> String { pkg.name, pkg.version, pkg.arch, pkg.compression ) } + +async fn write_attribute( + writer: &mut W, + key: &str, + value: &str, +) -> io::Result<()> { + if !value.is_empty() { + let s = format!("\n%{}%\n{}\n", key, value); + writer.write_all(s.as_bytes()).await?; + } + + Ok(()) +} + +pub async fn write_desc( + conn: &DbConn, + writer: &mut W, + pkg: &package::Model, +) -> crate::Result<()> { + writer + .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes()) + .await?; + + write_attribute(writer, "NAME", &pkg.name).await?; + write_attribute(writer, "BASE", &pkg.base).await?; + write_attribute(writer, "VERSION", &pkg.version).await?; + + if let Some(ref description) = pkg.description { + write_attribute(writer, "DESC", description).await?; + } + + let groups: Vec = pkg + .find_related(db::PackageGroup) + .select_only() + .column(db::package_group::Column::Name) + .into_tuple() + .all(conn) + .await?; + write_attribute(writer, "GROUPS", &groups.join("\n")).await?; + + write_attribute(writer, "CSIZE", &pkg.c_size.to_string()).await?; + write_attribute(writer, "ISIZE", &pkg.size.to_string()).await?; + write_attribute(writer, "SHA256SUM", &pkg.sha256_sum).await?; + + if let Some(ref url) = pkg.url { + write_attribute(writer, "URL", url).await?; + } + + let licenses: Vec = pkg + .find_related(db::PackageLicense) + .select_only() + .column(db::package_license::Column::Name) + .into_tuple() + .all(conn) + .await?; + write_attribute(writer, "LICENSE", &licenses.join("\n")).await?; + + write_attribute(writer, "ARCH", &pkg.arch).await?; + + // TODO build date + write_attribute( + writer, + "BUILDDATE", + &pkg.build_date.and_utc().timestamp().to_string(), + ) + .await?; + + if let Some(ref packager) = pkg.packager { + write_attribute(writer, "PACKAGER", packager).await?; + } + + let related = [ + ("REPLACES", PackageRelatedEnum::Replaces), + ("CONFLICTS", PackageRelatedEnum::Conflicts), + ("PROVIDES", PackageRelatedEnum::Provides), + ("DEPENDS", PackageRelatedEnum::Depend), + ("OPTDEPENDS", PackageRelatedEnum::Optdepend), + ("MAKEDEPENDS", PackageRelatedEnum::Makedepend), + ("CHECKDEPENDS", PackageRelatedEnum::Checkdepend), + ]; + + for (key, attr) in related.into_iter() { + let items: Vec = pkg + .find_related(db::PackageRelated) + .filter(db::package_related::Column::Type.eq(attr)) + .select_only() + .column(db::package_related::Column::Name) + .into_tuple() + .all(conn) + .await?; + + write_attribute(writer, key, &items.join("\n")).await?; + } + + writer.flush().await?; + + Ok(()) +} + +pub async fn write_files( + conn: &DbConn, + writer: &mut W, + pkg: &package::Model, +) -> crate::Result<()> { + let line = "%FILES%\n"; + writer.write_all(line.as_bytes()).await?; + + // Generate the files list for the package + let mut files = pkg.find_related(db::PackageFile).stream(conn).await?; + + while let Some(file) = files.next().await.transpose()? { + writer + .write_all(format!("{}\n", file.path).as_bytes()) + .await?; + } + + writer.flush().await?; + + Ok(()) +} diff --git a/server/src/web/mod.rs b/server/src/web/mod.rs deleted file mode 100644 index 48e9cbb..0000000 --- a/server/src/web/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod api; -mod repo; - -use axum::Router; -use tower_http::trace::TraceLayer; - -pub fn router(global: crate::Global) -> Router { - Router::new() - .nest("/api", api::router()) - .merge(repo::router(&global.config.api_key)) - .with_state(global) - .layer(TraceLayer::new_for_http()) -} diff --git a/server/src/web/repo.rs b/server/src/web/repo.rs deleted file mode 100644 index d690895..0000000 --- a/server/src/web/repo.rs +++ /dev/null @@ -1,142 +0,0 @@ -use crate::FsConfig; - -use axum::{ - body::Body, - extract::{Path, State}, - http::{Request, StatusCode}, - response::IntoResponse, - routing::{delete, post}, - Router, -}; -use futures::TryStreamExt; -use tokio_util::io::StreamReader; -use tower::util::ServiceExt; -use tower_http::{services::ServeFile, validate_request::ValidateRequestHeaderLayer}; - -pub fn router(api_key: &str) -> Router { - Router::new() - .route( - "/:distro/:repo", - post(post_package_archive) - .delete(delete_repo) - .route_layer(ValidateRequestHeaderLayer::bearer(api_key)), - ) - .route( - "/:distro/:repo/:arch", - delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)), - ) - // Routes added after the layer do not get that layer applied, so the GET requests will not - // be authorized - .route( - "/:distro/:repo/:arch/:filename", - delete(delete_package) - .route_layer(ValidateRequestHeaderLayer::bearer(api_key)) - .get(get_file), - ) -} - -/// Serve the package archive files and database archives. If files are requested for an -/// architecture that does not have any explicit packages, a repository containing only "any" files -/// is returned. -async fn get_file( - State(global): State, - Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, - req: Request, -) -> crate::Result { - if let Some(repo_id) = global.repo.get_repo(&distro, &repo).await? { - match global.config.fs { - FsConfig::Local { data_dir } => { - let repo_dir = data_dir.join("repos").join(repo_id.to_string()); - - let file_name = if file_name == format!("{}.db", repo) - || file_name == format!("{}.db.tar.gz", repo) - { - format!("{}.db.tar.gz", arch) - } else if file_name == format!("{}.files", repo) - || file_name == format!("{}.files.tar.gz", repo) - { - format!("{}.files.tar.gz", arch) - } else { - file_name - }; - - let path = repo_dir.join(file_name); - Ok(ServeFile::new(path).oneshot(req).await) - } - } - } else { - Err(StatusCode::NOT_FOUND.into()) - } -} - -async fn post_package_archive( - State(global): State, - Path((distro, repo)): Path<(String, String)>, - body: Body, -) -> crate::Result { - let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?; - - let [tmp_path] = global.repo.random_file_paths(); - let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; - let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); - tokio::io::copy(&mut body, &mut tmp_file).await?; - - global.repo.queue_pkg(repo_id, tmp_path).await; - - Ok(StatusCode::ACCEPTED) -} - -async fn delete_repo( - State(global): State, - Path((distro, repo)): Path<(String, String)>, -) -> crate::Result { - if let Some(repo) = global.repo.get_repo(&distro, &repo).await? { - global.repo.remove_repo(repo).await?; - - tracing::info!("Removed repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } -} - -async fn delete_arch_repo( - State(global): State, - Path((distro, repo, arch)): Path<(String, String, String)>, -) -> crate::Result { - if let Some(repo) = global.repo.get_repo(&distro, &repo).await? { - global.repo.remove_repo_arch(repo, &arch).await?; - - tracing::info!("Removed architecture '{arch}' from repository {repo}"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) - } -} - -async fn delete_package( - State(global): State, - Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, -) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(mgr) = global.mgr.get_mgr(&distro).await { - // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?; - // - // if pkg_removed { - // tracing::info!( - // "Removed package '{}' ({}) from repository '{}'", - // pkg_name, - // arch, - // repo - // ); - // - // Ok(StatusCode::OK) - // } else { - // Ok(StatusCode::NOT_FOUND) - // } - //} else { - // Ok(StatusCode::NOT_FOUND) - //} -}