diff --git a/server/src/cli.rs b/server/src/cli.rs index 5e8469e..c6998eb 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,6 +1,12 @@ -use std::path::PathBuf; +use crate::{Config, FsConfig, Global}; +use std::{io, path::PathBuf, sync::Arc}; + +use axum::Router; use clap::Parser; +use sea_orm_migration::MigratorTrait; +use tower_http::trace::TraceLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -13,3 +19,57 @@ pub struct Cli { )] pub config_file: PathBuf, } + +impl Cli { + pub async fn run(&self) -> crate::Result<()> { + let config: Config = Config::figment(&self.config_file) + .extract() + .inspect_err(|e| tracing::error!("{}", e))?; + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) + .with(tracing_subscriber::fmt::layer()) + .init(); + + tracing::info!("Connecting to database"); + let db = crate::db::connect(&config.db).await?; + + crate::db::Migrator::up(&db, None).await?; + + let mgr = match &config.fs { + FsConfig::Local { data_dir } => { + crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await? + } + }; + + let mgr = Arc::new(mgr); + + for _ in 0..config.pkg_workers { + let clone = Arc::clone(&mgr); + + tokio::spawn(async move { clone.pkg_parse_task().await }); + } + + let global = Global { + config: config.clone(), + mgr, + db, + }; + + // build our application with a single route + let app = Router::new() + .nest("/api", crate::api::router()) + .merge(crate::repo::router(&config.api_key)) + .with_state(global) + .layer(TraceLayer::new_for_http()); + + let domain: String = format!("{}:{}", config.domain, config.port) + .parse() + .unwrap(); + let listener = tokio::net::TcpListener::bind(domain).await?; + // run it with hyper on localhost:3000 + Ok(axum::serve(listener, app.into_make_service()) + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 274d419..eb1c3d0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,96 +8,21 @@ mod repo; pub use config::{Config, DbConfig, FsConfig}; pub use error::{Result, ServerError}; -use std::{io, path::PathBuf, sync::Arc}; - -use axum::Router; -use tower_http::trace::TraceLayer; +use std::sync::Arc; use clap::Parser; -use sea_orm_migration::MigratorTrait; -use tokio::runtime; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub const ANY_ARCH: &'static str = "any"; #[derive(Clone)] pub struct Global { config: crate::config::Config, - repo: repo::Handle, + mgr: Arc, db: sea_orm::DbConn, } -fn main() -> crate::Result<()> { - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - let handle = rt.handle(); - +#[tokio::main] +async fn main() -> crate::Result<()> { let cli = cli::Cli::parse(); - let global = setup(handle, cli.config_file)?; - - handle.block_on(run(global)) -} - -fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result { - let config: Config = Config::figment(config_file) - .extract() - .inspect_err(|e| tracing::error!("{}", e))?; - - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(config.log_level.clone())) - .with(tracing_subscriber::fmt::layer()) - .init(); - - tracing::info!("Connecting to database"); - let db = rt.block_on(crate::db::connect(&config.db))?; - rt.block_on(crate::db::Migrator::up(&db, None))?; - - let repo = match &config.fs { - FsConfig::Local { data_dir } => { - crate::repo::Handle::start( - data_dir.join("repos"), - db.clone(), - rt.clone(), - config.pkg_workers, - )? - //rt.block_on(crate::repo::RepoMgr::new( - // data_dir.join("repos"), - // db.clone(), - //))? - //RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())? - } - }; - //let mgr = Arc::new(mgr); - // - //for _ in 0..config.pkg_workers { - // let clone = Arc::clone(&mgr); - // - // rt.spawn(async move { clone.pkg_parse_task().await }); - //} - - Ok(Global { - config: config.clone(), - repo, - db, - }) -} - -async fn run(global: Global) -> crate::Result<()> { - let domain: String = format!("{}:{}", &global.config.domain, global.config.port) - .parse() - .unwrap(); - let listener = tokio::net::TcpListener::bind(domain).await?; - - // build our application with a single route - let app = Router::new() - .nest("/api", crate::api::router()) - .merge(crate::repo::router(&global.config.api_key)) - .with_state(global) - .layer(TraceLayer::new_for_http()); - // run it with hyper on localhost:3000 - Ok(axum::serve(listener, app.into_make_service()) - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?) + cli.run().await } diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs deleted file mode 100644 index b90fcee..0000000 --- a/server/src/repo/actor.rs +++ /dev/null @@ -1,226 +0,0 @@ -use super::{archive, package, Handle}; -use crate::db; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, - }, -}; - -use futures::StreamExt; -use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, - ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, -}; -use sea_query::{Alias, Expr, Query}; -use tokio::{ - runtime, - sync::{ - mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, - RwLock, - }, -}; -use uuid::Uuid; - -pub enum RepoCommand { - ParsePkg(i32, PathBuf), -} - -pub struct RepoSharedState { - pub repos_dir: PathBuf, - pub conn: DbConn, - pub rx: Mutex>, - pub tx: UnboundedSender, - pub repos: RwLock>)>>, -} - -impl RepoSharedState { - pub fn new( - repos_dir: impl AsRef, - conn: DbConn, - repos: HashMap>)>, - ) -> Self { - let (tx, rx) = unbounded_channel(); - - Self { - repos_dir: repos_dir.as_ref().to_path_buf(), - conn, - rx: Mutex::new(rx), - tx, - repos: RwLock::new(repos), - } - } -} - -/// The actor is responsible for mutating the repositories. They receive their commands their -/// messages and process these commands in both a synchronous and asynchronous way. -pub struct RepoActor { - rt: runtime::Handle, - state: Arc, -} - -impl RepoActor { - pub fn new(rt: runtime::Handle, state: Arc) -> Self { - Self { - rt, - state: Arc::clone(&state), - } - } - - pub fn random_file_paths(&self) -> [PathBuf; C] { - std::array::from_fn(|_| { - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - self.state.repos_dir.join(uuid.to_string()) - }) - } - - /// Run the main actor loop - pub fn run(self) { - while let Some(msg) = { - let mut rx = self.state.rx.lock().unwrap(); - rx.blocking_recv() - } { - match msg { - RepoCommand::ParsePkg(repo, path) => { - let _ = self.parse_pkg(repo, path); - - if self - .state - .repos - .blocking_read() - .get(&repo) - .map(|n| n.0.load(Ordering::SeqCst)) - == Some(0) - { - let _ = self.sync_repo(repo); - } - } - } - } - } - - /// Parse a queued package for the given repository. - fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> { - let pkg = package::Package::open(&path)?; - let pkg = self - .rt - .block_on(db::query::package::insert(&self.state.conn, repo, pkg))?; - let dest_path = self - .state - .repos_dir - .join(repo.to_string()) - .join(pkg.id.to_string()); - std::fs::rename(path, dest_path)?; - - tracing::info!( - "Added '{}-{}-{}' to repository {}", - pkg.name, - pkg.version, - pkg.arch, - repo, - ); - - self.state.repos.blocking_read().get(&repo).inspect(|n| { - n.0.fetch_sub(1, Ordering::SeqCst); - }); - - Ok(()) - } - - fn sync_repo(&self, repo: i32) -> crate::Result<()> { - let repos = self.state.repos.blocking_read(); - - if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) { - let archs: Vec = self.rt.block_on( - db::Package::find() - .filter(db::package::Column::RepoId.eq(repo)) - .select_only() - .column(db::package::Column::Arch) - .distinct() - .into_tuple() - .all(&self.state.conn), - )?; - - for arch in archs { - self.generate_archives(repo, &arch)?; - } - } - - Ok(()) - } - - fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> { - let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths(); - - let mut ars = archive::RepoArchivesWriter::new( - &tmp_ar_db_path, - &tmp_ar_files_path, - self.random_file_paths(), - &self.rt, - &self.state.conn, - )?; - - let (tx, mut rx) = mpsc::channel(1); - - let conn = self.state.conn.clone(); - let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch); - - // sea_orm needs its connections to be dropped inside an async context, so we spawn a task - // that streams the responses to the synchronous context via message passing - self.rt.spawn(async move { - let stream = query.stream(&conn).await; - - if let Err(err) = stream { - let _ = tx.send(Err(err)).await; - - return; - } - - let mut stream = stream.unwrap(); - - while let Some(res) = stream.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; - - if is_err { - return; - } - } - }); - - let mut committed_ids: Vec = Vec::new(); - - while let Some(pkg) = rx.blocking_recv().transpose()? { - committed_ids.push(pkg.id); - ars.append_pkg(&pkg)?; - } - - ars.close()?; - - // Move newly generated package archives to their correct place - let repo_dir = self.state.repos_dir.join(repo.to_string()); - std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?; - std::fs::rename( - tmp_ar_files_path, - repo_dir.join(format!("{}.files.tar.gz", arch)), - )?; - - // Update the state for the newly committed packages - self.rt.block_on( - db::Package::update_many() - .col_expr( - db::package::Column::State, - Expr::value(db::PackageState::Committed), - ) - .filter(db::package::Column::Id.is_in(committed_ids)) - .exec(&self.state.conn), - )?; - - tracing::info!("Package archives generated for repo {} ('{}')", repo, arch); - - Ok(()) - } -} diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs index 973a395..a979c09 100644 --- a/server/src/repo/archive.rs +++ b/server/src/repo/archive.rs @@ -1,222 +1,78 @@ -use crate::db; use std::{ - io::{self, Write}, + io, path::{Path, PathBuf}, + sync::{Arc, Mutex}, }; -use futures::StreamExt; use libarchive::{ write::{Builder, FileWriter, WriteEntry}, Entry, WriteFilter, WriteFormat, }; -use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; -use tokio::{runtime, sync::mpsc}; -pub struct RepoArchivesWriter { - ar_db: FileWriter, - ar_files: FileWriter, - rt: runtime::Handle, - conn: DbConn, - tmp_paths: [PathBuf; 2], +/// Struct to abstract away the intrinsics of writing entries to an archive file +pub struct RepoArchiveWriter { + ar: Arc>, } -impl RepoArchivesWriter { - pub fn new( - ar_db_path: impl AsRef, - ar_files_path: impl AsRef, - tmp_paths: [impl AsRef; 2], - rt: &runtime::Handle, - conn: &sea_orm::DbConn, - ) -> crate::Result { - let ar_db = Self::open_ar(ar_db_path)?; - let ar_files = Self::open_ar(ar_files_path)?; +impl RepoArchiveWriter { + pub async fn open>(path: P) -> io::Result { + let path = PathBuf::from(path.as_ref()); + + // Open the archive file + let ar = tokio::task::spawn_blocking(move || { + let mut builder = Builder::new(); + builder.add_filter(WriteFilter::Gzip)?; + builder.set_format(WriteFormat::PaxRestricted)?; + + builder.open_file(path) + }) + .await + .unwrap()?; Ok(Self { - ar_db, - ar_files, - rt: rt.clone(), - conn: conn.clone(), - tmp_paths: [ - tmp_paths[0].as_ref().to_path_buf(), - tmp_paths[1].as_ref().to_path_buf(), - ], + // In practice, mutex is only ever used by one thread at a time. It's simply here so we + // can use spawn_blocking without issues. + ar: Arc::new(Mutex::new(ar)), }) } - fn open_ar(path: impl AsRef) -> crate::Result { - let mut builder = Builder::new(); - builder.add_filter(WriteFilter::Gzip)?; - builder.set_format(WriteFormat::PaxRestricted)?; - - Ok(builder.open_file(path)?) - } - - fn append_entry( - ar: &mut FileWriter, - src_path: impl AsRef, - dest_path: impl AsRef, - ) -> crate::Result<()> { - let metadata = std::fs::metadata(&src_path)?; + /// Add either a "desc" or "files" entry to the archive + pub async fn add_entry>( + &self, + full_name: &str, + path: P, + desc: bool, + ) -> io::Result<()> { + let metadata = tokio::fs::metadata(&path).await?; let file_size = metadata.len(); - let mut ar_entry = WriteEntry::new(); - ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); + let ar = Arc::clone(&self.ar); + let full_name = String::from(full_name); + let path = PathBuf::from(path.as_ref()); - ar_entry.set_pathname(dest_path); - ar_entry.set_mode(0o100644); - ar_entry.set_size(file_size.try_into().unwrap()); + Ok(tokio::task::spawn_blocking(move || { + let mut ar_entry = WriteEntry::new(); + ar_entry.set_filetype(libarchive::archive::FileType::RegularFile); - Ok(ar.append_path(&mut ar_entry, src_path)?) + let name = if desc { "desc" } else { "files" }; + + ar_entry.set_pathname(PathBuf::from(full_name).join(name)); + ar_entry.set_mode(0o100644); + ar_entry.set_size(file_size.try_into().unwrap()); + + ar.lock().unwrap().append_path(&mut ar_entry, path) + }) + .await + .unwrap()?) } - pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> { - self.write_desc(&self.tmp_paths[0], pkg)?; - self.write_files(&self.tmp_paths[1], pkg)?; + pub async fn close(&self) -> io::Result<()> { + let ar = Arc::clone(&self.ar); - let full_name = format!("{}-{}", pkg.name, pkg.version); - let dest_desc_path = format!("{}/desc", full_name); - let dest_files_path = format!("{}/files", full_name); - - Self::append_entry(&mut self.ar_db, &self.tmp_paths[0], &dest_desc_path)?; - Self::append_entry(&mut self.ar_files, &self.tmp_paths[0], &dest_desc_path)?; - Self::append_entry(&mut self.ar_files, &self.tmp_paths[1], &dest_files_path)?; - - Ok(()) - } - - /// Generate a "files" archive entry for the package in the given path - fn write_files(&self, path: impl AsRef, pkg: &db::package::Model) -> crate::Result<()> { - let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); - - writeln!(f, "%FILES%")?; - - let (tx, mut rx) = mpsc::channel(1); - - let conn = self.conn.clone(); - let query = pkg.find_related(db::PackageFile); - self.rt.spawn(async move { - let files = query.stream(&conn).await; - - if let Err(err) = files { - let _ = tx.send(Err(err)).await; - - return; - } - - let mut files = files.unwrap(); - - while let Some(res) = files.next().await { - let is_err = res.is_err(); - let _ = tx.send(res).await; - - if is_err { - return; - } - } - }); - - while let Some(file) = rx.blocking_recv().transpose()? { - writeln!(f, "{}", file.path)?; - } - - f.flush()?; - Ok(()) - } - - fn write_desc(&self, path: impl AsRef, pkg: &db::package::Model) -> crate::Result<()> { - let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); - - writeln!(f, "%FILENAME%\n{}", pkg.id)?; - - let mut write_attr = |k: &str, v: &str| { - if !v.is_empty() { - writeln!(f, "\n%{}%\n{}", k, v) - } else { - Ok(()) - } - }; - - write_attr("NAME", &pkg.name)?; - write_attr("BASE", &pkg.base)?; - write_attr("VERSION", &pkg.version)?; - - if let Some(ref desc) = pkg.description { - write_attr("DESC", desc)?; - } - - let groups: Vec = self.rt.block_on( - pkg.find_related(db::PackageGroup) - .select_only() - .column(db::package_group::Column::Name) - .into_tuple() - .all(&self.conn), - )?; - - write_attr("GROUPS", &groups.join("\n"))?; - - write_attr("CSIZE", &pkg.c_size.to_string())?; - write_attr("ISIZE", &pkg.size.to_string())?; - write_attr("SHA256SUM", &pkg.sha256_sum)?; - - if let Some(ref url) = pkg.url { - write_attr("URL", url)?; - } - - let licenses: Vec = self.rt.block_on( - pkg.find_related(db::PackageLicense) - .select_only() - .column(db::package_license::Column::Name) - .into_tuple() - .all(&self.conn), - )?; - write_attr("LICENSE", &licenses.join("\n"))?; - - write_attr("ARCH", &pkg.arch)?; - - // TODO build date - write_attr( - "BUILDDATE", - &pkg.build_date.and_utc().timestamp().to_string(), - )?; - - if let Some(ref packager) = pkg.packager { - write_attr("PACKAGER", packager)?; - } - - let related = [ - ("REPLACES", db::PackageRelatedEnum::Replaces), - ("CONFLICTS", db::PackageRelatedEnum::Conflicts), - ("PROVIDES", db::PackageRelatedEnum::Provides), - ("DEPENDS", db::PackageRelatedEnum::Depend), - ("OPTDEPENDS", db::PackageRelatedEnum::Optdepend), - ("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend), - ("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend), - ]; - - for (key, attr) in related.into_iter() { - let items: Vec = self.rt.block_on( - pkg.find_related(db::PackageRelated) - .filter(db::package_related::Column::Type.eq(attr)) - .select_only() - .column(db::package_related::Column::Name) - .into_tuple() - .all(&self.conn), - )?; - - write_attr(key, &items.join("\n"))?; - } - - f.flush()?; - Ok(()) - } - - pub fn close(&mut self) -> crate::Result<()> { - self.ar_db.close()?; - self.ar_files.close()?; - - let _ = std::fs::remove_file(&self.tmp_paths[0])?; - let _ = std::fs::remove_file(&self.tmp_paths[1])?; - - Ok(()) + Ok( + tokio::task::spawn_blocking(move || ar.lock().unwrap().close()) + .await + .unwrap()?, + ) } } diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs deleted file mode 100644 index 262f274..0000000 --- a/server/src/repo/handle.rs +++ /dev/null @@ -1,129 +0,0 @@ -use super::{RepoCommand, RepoSharedState}; -use crate::db; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, RwLock, - }, -}; - -use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType, - ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait, -}; -use tokio::{ - runtime, - sync::mpsc::{unbounded_channel, UnboundedSender}, -}; -use uuid::Uuid; - -#[derive(Clone)] -pub struct Handle { - state: Arc, -} - -impl Handle { - pub fn start( - repos_dir: impl AsRef, - conn: DbConn, - rt: runtime::Handle, - actors: u32, - ) -> crate::Result { - std::fs::create_dir_all(repos_dir.as_ref())?; - - let mut repos = HashMap::new(); - let repo_ids: Vec = rt.block_on( - db::Repo::find() - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .all(&conn), - )?; - - for id in repo_ids { - repos.insert(id, Default::default()); - } - - let state = Arc::new(RepoSharedState::new(repos_dir, conn, repos)); - - for _ in 0..actors { - let actor = super::RepoActor::new(rt.clone(), Arc::clone(&state)); - - std::thread::spawn(|| actor.run()); - } - - Ok(Self { state }) - } - - pub fn random_file_paths(&self) -> [PathBuf; C] { - std::array::from_fn(|_| { - let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); - self.state.repos_dir.join(uuid.to_string()) - }) - } - - pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result { - let mut repos = self.state.repos.write().await; - - let distro_id: Option = db::Distro::find() - .filter(db::distro::Column::Name.eq(distro)) - .select_only() - .column(db::distro::Column::Id) - .into_tuple() - .one(&self.state.conn) - .await?; - - let distro_id = if let Some(id) = distro_id { - id - } else { - let new_distro = db::distro::ActiveModel { - id: NotSet, - name: Set(distro.to_string()), - description: NotSet, - }; - - new_distro.insert(&self.state.conn).await?.id - }; - - let repo_id: Option = db::Repo::find() - .filter(db::repo::Column::DistroId.eq(distro_id)) - .filter(db::repo::Column::Name.eq(repo)) - .select_only() - .column(db::repo::Column::Id) - .into_tuple() - .one(&self.state.conn) - .await?; - - let repo_id = if let Some(id) = repo_id { - id - } else { - let new_repo = db::repo::ActiveModel { - id: NotSet, - distro_id: Set(distro_id), - name: Set(repo.to_string()), - description: NotSet, - }; - let id = new_repo.insert(&self.state.conn).await?.id; - - tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?; - repos.insert(id, Default::default()); - - id - }; - - Ok(repo_id) - } - - pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { - self.state - .tx - .send(RepoCommand::ParsePkg(repo, path)) - .unwrap(); - self.state.repos.read().await.get(&repo).inspect(|n| { - n.0.fetch_add(1, Ordering::SeqCst); - }); - } -} diff --git a/server/src/repo/mod.rs b/server/src/repo/mod.rs index e8b65e3..953b631 100644 --- a/server/src/repo/mod.rs +++ b/server/src/repo/mod.rs @@ -1,10 +1,8 @@ -mod actor; mod archive; -mod handle; +mod manager; pub mod package; -pub use actor::{RepoActor, RepoCommand, RepoSharedState}; -pub use handle::Handle; +pub use manager::RepoMgr; use crate::FsConfig; @@ -51,31 +49,30 @@ async fn get_file( Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, req: Request, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { - // match global.config.fs { - // FsConfig::Local { data_dir } => { - // let repo_dir = data_dir.join("repos").join(repo_id.to_string()); - // - // let file_name = if file_name == format!("{}.db", repo) - // || file_name == format!("{}.db.tar.gz", repo) - // { - // format!("{}.db.tar.gz", arch) - // } else if file_name == format!("{}.files", repo) - // || file_name == format!("{}.files.tar.gz", repo) - // { - // format!("{}.files.tar.gz", arch) - // } else { - // file_name - // }; - // - // let path = repo_dir.join(file_name); - // Ok(ServeFile::new(path).oneshot(req).await) - // } - // } - //} else { - // Err(StatusCode::NOT_FOUND.into()) - //} + if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? { + match global.config.fs { + FsConfig::Local { data_dir } => { + let repo_dir = data_dir.join("repos").join(repo_id.to_string()); + + let file_name = if file_name == format!("{}.db", repo) + || file_name == format!("{}.db.tar.gz", repo) + { + format!("{}.db.tar.gz", arch) + } else if file_name == format!("{}.files", repo) + || file_name == format!("{}.files.tar.gz", repo) + { + format!("{}.files.tar.gz", arch) + } else { + file_name + }; + + let path = repo_dir.join(file_name); + Ok(ServeFile::new(path).oneshot(req).await) + } + } + } else { + Err(StatusCode::NOT_FOUND.into()) + } } async fn post_package_archive( @@ -83,14 +80,14 @@ async fn post_package_archive( Path((distro, repo)): Path<(String, String)>, body: Body, ) -> crate::Result { - let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?; - - let [tmp_path] = global.repo.random_file_paths(); - let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); + let repo = global.mgr.get_or_create_repo(&distro, &repo).await?; + let [tmp_path] = global.mgr.random_file_paths(); + + let mut tmp_file = tokio::fs::File::create(&tmp_path).await?; tokio::io::copy(&mut body, &mut tmp_file).await?; - global.repo.queue_pkg(repo_id, tmp_path).await; + global.mgr.queue_pkg(repo, tmp_path).await; Ok(StatusCode::ACCEPTED) } @@ -99,32 +96,30 @@ async fn delete_repo( State(global): State, Path((distro, repo)): Path<(String, String)>, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - // global.mgr.remove_repo(repo).await?; - // - // tracing::info!("Removed repository {repo}"); - // - // Ok(StatusCode::OK) - //} else { - // Ok(StatusCode::NOT_FOUND) - //} + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo(repo).await?; + + tracing::info!("Removed repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } } async fn delete_arch_repo( State(global): State, Path((distro, repo, arch)): Path<(String, String, String)>, ) -> crate::Result { - Ok(StatusCode::NOT_FOUND) - //if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { - // global.mgr.remove_repo_arch(repo, &arch).await?; - // - // tracing::info!("Removed architecture '{arch}' from repository {repo}"); - // - // Ok(StatusCode::OK) - //} else { - // Ok(StatusCode::NOT_FOUND) - //} + if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? { + global.mgr.remove_repo_arch(repo, &arch).await?; + + tracing::info!("Removed architecture '{arch}' from repository {repo}"); + + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } //if let Some(mgr) = global.mgr.get_mgr(&distro).await { // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; //