feat: started using new meta repo manager

This commit is contained in:
Jef Roosens 2024-05-25 22:53:46 +02:00
parent c5ef7c3c28
commit f209c81759
Signed by: Jef Roosens
GPG key ID: B75D4F293C7052DB
6 changed files with 168 additions and 181 deletions

View file

@ -7,10 +7,11 @@ authors = ["Jef Roosens"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = { version = "0.7.5", features = ["http2"] }
axum = { version = "0.7.5", features = ["http2", "macros"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.12", features = ["env", "derive"] }
futures = "0.3.28"
http-body-util = "0.1.1"
libarchive = { path = "../libarchive" }
sea-orm-migration = "0.12.1"
serde = { version = "1.0.178", features = ["derive"] }

View file

@ -1,4 +1,4 @@
use crate::repo::RepoGroupManager;
use crate::repo::{MetaRepoMgr, RepoGroupManager};
use crate::{Config, Global};
use axum::extract::FromRef;
@ -7,7 +7,8 @@ use clap::Parser;
use sea_orm_migration::MigratorTrait;
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::sync::RwLock;
use tower_http::trace::TraceLayer;
use tracing::debug;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -48,7 +49,7 @@ pub struct Cli {
pub log: String,
}
impl FromRef<Global> for Arc<RwLock<RepoGroupManager>> {
impl FromRef<Global> for Arc<RwLock<MetaRepoMgr>> {
fn from_ref(global: &Global) -> Self {
Arc::clone(&global.repo_manager)
}
@ -85,7 +86,7 @@ impl Cli {
pkg_dir: self.pkg_dir.clone(),
api_key: self.api_key.clone(),
};
let repo_manager = RepoGroupManager::new(&config.repo_dir, &self.pkg_dir);
let repo_manager = MetaRepoMgr::new(&config.repo_dir, &self.pkg_dir);
let global = Global {
config,

View file

@ -6,9 +6,11 @@ mod repo;
use clap::Parser;
pub use error::{Result, ServerError};
use repo::MetaRepoMgr;
use repo::RepoGroupManager;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct Config {
@ -21,7 +23,7 @@ pub struct Config {
#[derive(Clone)]
pub struct Global {
config: Config,
repo_manager: Arc<RwLock<RepoGroupManager>>,
repo_manager: Arc<RwLock<MetaRepoMgr>>,
db: sea_orm::DbConn,
}

View file

@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use libarchive::write::{Builder, WriteEntry};
use libarchive::{Entry, WriteFilter, WriteFormat};
use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter};
use sea_orm::{ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter, QuerySelect};
use uuid::Uuid;
use futures::StreamExt;
@ -40,6 +40,7 @@ impl MetaRepoMgr {
let repo = repo.unwrap();
let parent_dir = self.repo_dir.join(&repo.name).join(arch);
tokio::fs::create_dir_all(&parent_dir).await?;
let ar_files =
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", repo.name)))
@ -112,7 +113,12 @@ impl MetaRepoMgr {
}
}
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(&self, conn: &DbConn, reader: &mut R, repo: &str) -> crate::Result<()> {
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
&self,
conn: &DbConn,
reader: &mut R,
repo: &str,
) -> crate::Result<()> {
// Copy file contents to temporary path so libarchive can work with it
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let path = self.pkg_dir.join(uuid.to_string());
@ -122,7 +128,9 @@ impl MetaRepoMgr {
// Parse the package
let path_clone = path.clone();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)).await.unwrap()?;
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await
.unwrap()?;
// Query the repo for its ID, or create it if it does not already exist
let res = db::query::repo::by_name(conn, &repo).await?;
@ -137,8 +145,7 @@ impl MetaRepoMgr {
// If the package already exists in the database, we remove it first
let res =
db::query::package::by_fields(conn, repo_id, &pkg.info.arch, &pkg.info.name)
.await?;
db::query::package::by_fields(conn, repo_id, &pkg.info.arch, &pkg.info.name).await?;
if let Some(entry) = res {
entry.delete(conn).await?;

View file

@ -4,9 +4,12 @@ mod manager_new;
pub mod package;
pub use manager::RepoGroupManager;
pub use manager_new::MetaRepoMgr;
use tokio_util::io::StreamReader;
use std::path::PathBuf;
use crate::db;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::Request;
@ -14,7 +17,8 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{delete, post};
use axum::Router;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::{Stream, StreamExt};
use sea_orm::ModelTrait;
use std::sync::Arc;
use tokio::{fs, io::AsyncWriteExt};
@ -23,8 +27,6 @@ use tower_http::services::{ServeDir, ServeFile};
use tower_http::validate_request::ValidateRequestHeaderLayer;
use uuid::Uuid;
use crate::db;
const DB_FILE_EXTS: [&str; 4] = [".db", ".files", ".db.tar.gz", ".files.tar.gz"];
pub fn router(api_key: &str) -> Router<crate::Global> {
@ -99,90 +101,40 @@ async fn get_file(
Ok(res)
}
#[axum::debug_handler]
async fn post_package_archive(
State(global): State<crate::Global>,
Path(repo): Path<String>,
body: Body,
) -> crate::Result<()> {
// We first stream the uploaded file to disk
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let path = global.config.pkg_dir.join(uuid.to_string());
let mut f = fs::File::create(&path).await?;
let body = body.into_data_stream();
let body = body.map_err(std::io::Error::other);
let mut body = StreamReader::new(body);
global
.repo_manager
.write()
.await
.add_pkg_from_reader(&global.db, &mut body, &repo)
.await?;
let mut body = body.into_data_stream();
Ok(())
while let Some(chunk) = body.next().await {
f.write_all(&chunk?).await?;
}
let clone = Arc::clone(&global.repo_manager);
let path_clone = path.clone();
let repo_clone = repo.clone();
let res = tokio::task::spawn_blocking(move || {
global
.repo_manager
.write()
.unwrap()
.add_pkg_from_path(&repo_clone, &path_clone)
})
.await?;
match res {
// Insert the newly added package into the database
Ok(pkg) => {
tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo);
// Query the repo for its ID, or create it if it does not already exist
let res = db::query::repo::by_name(&global.db, &repo).await?;
let repo_id = if let Some(repo_entity) = res {
repo_entity.id
} else {
db::query::repo::insert(&global.db, &repo, None)
.await?
.last_insert_id
};
// If the package already exists in the database, we remove it first
let res =
db::query::package::by_fields(&global.db, repo_id, &pkg.info.arch, &pkg.info.name)
.await?;
if let Some(entry) = res {
entry.delete(&global.db).await?;
}
db::query::package::insert(&global.db, repo_id, pkg).await?;
Ok(())
}
// Remove the uploaded file and return the error
Err(err) => {
tokio::fs::remove_file(path).await?;
Err(err.into())
}
}
//tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo);
}
#[axum::debug_handler]
async fn delete_repo(
State(global): State<crate::Global>,
Path(repo): Path<String>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let repo_clone = repo.clone();
let repo_removed =
tokio::task::spawn_blocking(move || clone.write().unwrap().remove_repo(&repo_clone))
.await??;
let repo_removed = global
.repo_manager
.write()
.await
.remove_repo(&global.db, &repo)
.await?;
if repo_removed {
let res = db::query::repo::by_name(&global.db, &repo).await?;
if let Some(repo_entry) = res {
repo_entry.delete(&global.db).await?;
}
tracing::info!("Removed repository '{}'", repo);
Ok(StatusCode::OK)
@ -195,60 +147,71 @@ async fn delete_arch_repo(
State(global): State<crate::Global>,
Path((repo, arch)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let arch_clone = arch.clone();
let repo_clone = repo.clone();
let repo_removed = tokio::task::spawn_blocking(move || {
clone
.write()
.unwrap()
.remove_repo_arch(&repo_clone, &arch_clone)
})
.await??;
if repo_removed {
let res = db::query::repo::by_name(&global.db, &repo).await?;
if let Some(repo_entry) = res {
db::query::package::delete_with_arch(&global.db, repo_entry.id, &arch).await?;
}
tracing::info!("Removed architecture '{}' from repository '{}'", arch, repo);
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
Ok(StatusCode::NOT_FOUND)
//let clone = Arc::clone(&global.repo_manager);
//
//let arch_clone = arch.clone();
//let repo_clone = repo.clone();
//let repo_removed = tokio::task::spawn_blocking(move || {
// clone
// .write()
// .unwrap()
// .remove_repo_arch(&repo_clone, &arch_clone)
//})
//.await??;
//
//if repo_removed {
// let res = db::query::repo::by_name(&global.db, &repo).await?;
//
// if let Some(repo_entry) = res {
// db::query::package::delete_with_arch(&global.db, repo_entry.id, &arch).await?;
// }
// tracing::info!("Removed architecture '{}' from repository '{}'", arch, repo);
//
// Ok(StatusCode::OK)
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
}
async fn delete_package(
State(global): State<crate::Global>,
Path((repo, arch, file_name)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let path = PathBuf::from(&repo).join(arch).join(&file_name);
let res = tokio::task::spawn_blocking(move || {
clone.write().unwrap().remove_pkg_from_path(path, true)
})
.await??;
if let Some((name, version, release, arch)) = res {
let res = db::query::repo::by_name(&global.db, &repo).await?;
if let Some(repo_entry) = res {
let res =
db::query::package::by_fields(&global.db, repo_entry.id, &arch, &name).await?;
if let Some(entry) = res {
entry.delete(&global.db).await?;
}
}
tracing::info!("Removed '{}' from repository '{}'", file_name, repo);
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
Ok(StatusCode::NOT_FOUND)
//global.repo_manager.write().unwrap().remove_pkg(&global.db, &repo, &arch, name)
//let clone = Arc::clone(&global.repo_manager);
//let path = PathBuf::from(&repo).join(arch).join(&file_name);
//
//let res = tokio::task::spawn_blocking(move || {
// clone.write().unwrap().remove_pkg_from_path(path, true)
//})
//.await??;
//
//if let Some((name, version, release, arch)) = res {
// let res = db::query::repo::by_name(&global.db, &repo).await?;
//
// if let Some(repo_entry) = res {
// let res =
// db::query::package::by_fields(&global.db, repo_entry.id, &arch, &name).await?;
//
// if let Some(entry) = res {
// entry.delete(&global.db).await?;
// }
// }
//
// tracing::info!("Removed '{}' from repository '{}'", file_name, repo);
//
// Ok(StatusCode::OK)
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
}
fn help_me_figure_this_out(_: impl Send) {}
#[allow(dead_code)]
fn assert_my_handler_is_ok() {
// This is the fun part: we need to call our handler, and *not* await it's Future.
help_me_figure_this_out(delete_repo)
}