refactor: switch to new repo actors

agents
Jef Roosens 2024-06-25 17:05:14 +02:00
parent 656df06b4e
commit 80d5291508
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 79 additions and 69 deletions

View File

@ -23,7 +23,7 @@ pub const ANY_ARCH: &'static str = "any";
#[derive(Clone)]
pub struct Global {
config: crate::config::Config,
mgr: Arc<repo::RepoMgr>,
repo: repo::Handle,
db: sea_orm::DbConn,
}
@ -54,26 +54,32 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
let db = rt.block_on(crate::db::connect(&config.db))?;
rt.block_on(crate::db::Migrator::up(&db, None))?;
let mgr = match &config.fs {
let repo = match &config.fs {
FsConfig::Local { data_dir } => {
rt.block_on(crate::repo::RepoMgr::new(
crate::repo::Handle::start(
data_dir.join("repos"),
db.clone(),
))?
rt.clone(),
config.pkg_workers,
)?
//rt.block_on(crate::repo::RepoMgr::new(
// data_dir.join("repos"),
// db.clone(),
//))?
//RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())?
}
};
let mgr = Arc::new(mgr);
for _ in 0..config.pkg_workers {
let clone = Arc::clone(&mgr);
rt.spawn(async move { clone.pkg_parse_task().await });
}
//let mgr = Arc::new(mgr);
//
//for _ in 0..config.pkg_workers {
// let clone = Arc::clone(&mgr);
//
// rt.spawn(async move { clone.pkg_parse_task().await });
//}
Ok(Global {
config: config.clone(),
mgr,
repo,
db,
})
}

View File

@ -1,4 +1,4 @@
use super::{archive, package, RepoHandle};
use super::{archive, package, Handle};
use crate::db;
use std::{

View File

@ -17,16 +17,16 @@ use tokio::{
};
#[derive(Clone)]
pub struct RepoHandle {
pub struct Handle {
state: Arc<RepoSharedState>,
}
impl RepoHandle {
impl Handle {
pub fn start(
repos_dir: impl AsRef<Path>,
conn: DbConn,
actors: u32,
rt: runtime::Handle,
actors: u32,
) -> crate::Result<Self> {
std::fs::create_dir_all(repos_dir.as_ref())?;

View File

@ -5,7 +5,7 @@ mod manager;
pub mod package;
pub use actor::{RepoActor, RepoCommand, RepoSharedState};
pub use handle::RepoHandle;
pub use handle::Handle;
pub use manager::RepoMgr;
use crate::FsConfig;
@ -53,30 +53,31 @@ async fn get_file(
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
req: Request<Body>,
) -> crate::Result<impl IntoResponse> {
if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
match global.config.fs {
FsConfig::Local { data_dir } => {
let repo_dir = data_dir.join("repos").join(repo_id.to_string());
let file_name = if file_name == format!("{}.db", repo)
|| file_name == format!("{}.db.tar.gz", repo)
{
format!("{}.db.tar.gz", arch)
} else if file_name == format!("{}.files", repo)
|| file_name == format!("{}.files.tar.gz", repo)
{
format!("{}.files.tar.gz", arch)
} else {
file_name
};
let path = repo_dir.join(file_name);
Ok(ServeFile::new(path).oneshot(req).await)
}
}
} else {
Err(StatusCode::NOT_FOUND.into())
}
Ok(StatusCode::NOT_FOUND)
//if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
// match global.config.fs {
// FsConfig::Local { data_dir } => {
// let repo_dir = data_dir.join("repos").join(repo_id.to_string());
//
// let file_name = if file_name == format!("{}.db", repo)
// || file_name == format!("{}.db.tar.gz", repo)
// {
// format!("{}.db.tar.gz", arch)
// } else if file_name == format!("{}.files", repo)
// || file_name == format!("{}.files.tar.gz", repo)
// {
// format!("{}.files.tar.gz", arch)
// } else {
// file_name
// };
//
// let path = repo_dir.join(file_name);
// Ok(ServeFile::new(path).oneshot(req).await)
// }
// }
//} else {
// Err(StatusCode::NOT_FOUND.into())
//}
}
async fn post_package_archive(
@ -84,46 +85,49 @@ async fn post_package_archive(
Path((distro, repo)): Path<(String, String)>,
body: Body,
) -> crate::Result<StatusCode> {
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
let [tmp_path] = global.mgr.random_file_paths();
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
tokio::io::copy(&mut body, &mut tmp_file).await?;
global.mgr.queue_pkg(repo, tmp_path).await;
Ok(StatusCode::ACCEPTED)
Ok(StatusCode::NOT_FOUND)
//let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
//let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
//let [tmp_path] = global.mgr.random_file_paths();
//
//let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
//tokio::io::copy(&mut body, &mut tmp_file).await?;
//
//global.mgr.queue_pkg(repo, tmp_path).await;
//
//Ok(StatusCode::ACCEPTED)
}
async fn delete_repo(
State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
global.mgr.remove_repo(repo).await?;
tracing::info!("Removed repository {repo}");
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
//if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
// global.mgr.remove_repo(repo).await?;
//
// tracing::info!("Removed repository {repo}");
//
// Ok(StatusCode::OK)
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
}
async fn delete_arch_repo(
State(global): State<crate::Global>,
Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
global.mgr.remove_repo_arch(repo, &arch).await?;
tracing::info!("Removed architecture '{arch}' from repository {repo}");
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
//if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
// global.mgr.remove_repo_arch(repo, &arch).await?;
//
// tracing::info!("Removed architecture '{arch}' from repository {repo}");
//
// Ok(StatusCode::OK)
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
// let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
//