feat: start of sync reimplementation

agents
Jef Roosens 2024-06-26 12:27:51 +02:00
parent 80d5291508
commit a7c0d3e062
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 140 additions and 20 deletions

View File

@ -4,7 +4,10 @@ use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
};
use sea_orm::{
@ -13,7 +16,10 @@ use sea_orm::{
};
use tokio::{
runtime,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
};
pub enum RepoCommand {
@ -21,11 +27,11 @@ pub enum RepoCommand {
}
pub struct RepoSharedState {
repos_dir: PathBuf,
conn: DbConn,
rx: Mutex<UnboundedReceiver<RepoCommand>>,
tx: UnboundedSender<RepoCommand>,
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
pub repos_dir: PathBuf,
pub conn: DbConn,
pub rx: Mutex<UnboundedReceiver<RepoCommand>>,
pub tx: UnboundedSender<RepoCommand>,
pub repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoSharedState {
@ -70,11 +76,23 @@ impl RepoActor {
match msg {
RepoCommand::ParsePkg(repo, path) => {
let _ = self.parse_pkg(repo, path);
if self
.state
.repos
.blocking_read()
.get(&repo)
.map(|n| n.0.load(Ordering::SeqCst))
== Some(0)
{
// TODO sync
}
}
}
}
}
/// Parse a queued package for the given repository.
fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> {
let pkg = package::Package::open(&path)?;
let pkg = self
@ -95,6 +113,36 @@ impl RepoActor {
repo,
);
self.state.repos.blocking_read().get(&repo).inspect(|n| {
n.0.fetch_sub(1, Ordering::SeqCst);
});
Ok(())
}
fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let repos = self.state.repos.blocking_read();
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
let archs: Vec<String> = self.rt.block_on(
db::Package::find()
.filter(db::package::Column::RepoId.eq(repo))
.select_only()
.column(db::package::Column::Arch)
.distinct()
.into_tuple()
.all(&self.state.conn),
)?;
for arch in archs {
self.generate_archives(repo, &arch)?;
}
}
Ok(())
}
fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
Ok(())
}
}

View File

@ -1,10 +1,13 @@
use super::RepoSharedState;
use super::{RepoCommand, RepoSharedState};
use crate::db;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex, RwLock,
},
};
use sea_orm::{
@ -15,6 +18,7 @@ use tokio::{
runtime,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use uuid::Uuid;
#[derive(Clone)]
pub struct Handle {
@ -53,4 +57,73 @@ impl Handle {
Ok(Self { state })
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.state.repos_dir.join(uuid.to_string())
})
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let mut repos = self.state.repos.write().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.state.conn)
.await?;
let distro_id = if let Some(id) = distro_id {
id
} else {
let new_distro = db::distro::ActiveModel {
id: NotSet,
name: Set(distro.to_string()),
description: NotSet,
};
new_distro.insert(&self.state.conn).await?.id
};
let repo_id: Option<i32> = db::Repo::find()
.filter(db::repo::Column::DistroId.eq(distro_id))
.filter(db::repo::Column::Name.eq(repo))
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.one(&self.state.conn)
.await?;
let repo_id = if let Some(id) = repo_id {
id
} else {
let new_repo = db::repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(repo.to_string()),
description: NotSet,
};
let id = new_repo.insert(&self.state.conn).await?.id;
tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?;
repos.insert(id, Default::default());
id
};
Ok(repo_id)
}
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.state
.tx
.send(RepoCommand::ParsePkg(repo, path))
.unwrap();
self.state.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst);
});
}
}

View File

@ -85,17 +85,16 @@ async fn post_package_archive(
Path((distro, repo)): Path<(String, String)>,
body: Body,
) -> crate::Result<StatusCode> {
Ok(StatusCode::NOT_FOUND)
//let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
//let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
//let [tmp_path] = global.mgr.random_file_paths();
//
//let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
//tokio::io::copy(&mut body, &mut tmp_file).await?;
//
//global.mgr.queue_pkg(repo, tmp_path).await;
//
//Ok(StatusCode::ACCEPTED)
let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?;
let [tmp_path] = global.repo.random_file_paths();
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
tokio::io::copy(&mut body, &mut tmp_file).await?;
global.repo.queue_pkg(repo_id, tmp_path).await;
Ok(StatusCode::ACCEPTED)
}
async fn delete_repo(