Compare commits
3 Commits
d39205b653
...
aa0aae41ab
Author | SHA1 | Date |
---|---|---|
Jef Roosens | aa0aae41ab | |
Jef Roosens | d38fd5ca74 | |
Jef Roosens | 986162e926 |
|
@ -5,6 +5,6 @@ pub mod write;
|
||||||
|
|
||||||
pub use archive::{
|
pub use archive::{
|
||||||
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
|
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
|
||||||
WriteFilter, WriteFormat,
|
WriteFilter, WriteFormat, FileType,
|
||||||
};
|
};
|
||||||
pub use error::Result;
|
pub use error::Result;
|
||||||
|
|
|
@ -6,9 +6,10 @@ pub use builder::Builder;
|
||||||
|
|
||||||
use crate::archive::Handle;
|
use crate::archive::Handle;
|
||||||
use crate::ReadFilter;
|
use crate::ReadFilter;
|
||||||
use entries::Entries;
|
pub use entries::{Entries, ReadEntry};
|
||||||
use libarchive3_sys::ffi;
|
use libarchive3_sys::ffi;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
pub use file::FileReader;
|
||||||
|
|
||||||
// Represents a read view of an archive
|
// Represents a read view of an archive
|
||||||
pub trait Archive: Handle + Sized {
|
pub trait Archive: Handle + Sized {
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
api_key = "test"
|
api_key = "test"
|
||||||
pkg_workers = 2
|
|
||||||
log_level = "rieterd=debug"
|
log_level = "rieterd=debug"
|
||||||
|
|
||||||
|
[repo]
|
||||||
|
sync_workers = 2
|
||||||
|
async_workers = 1
|
||||||
|
|
||||||
[fs]
|
[fs]
|
||||||
type = "local"
|
type = "local"
|
||||||
data_dir = "./data"
|
data_dir = "./data"
|
||||||
|
|
|
@ -36,6 +36,14 @@ pub enum DbConfig {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
pub struct RepoConfig {
|
||||||
|
#[serde(default = "default_repo_sync_workers")]
|
||||||
|
pub sync_workers: u32,
|
||||||
|
#[serde(default = "default_repo_async_workers")]
|
||||||
|
pub async_workers: u32,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub api_key: String,
|
pub api_key: String,
|
||||||
|
@ -47,8 +55,7 @@ pub struct Config {
|
||||||
pub log_level: String,
|
pub log_level: String,
|
||||||
pub fs: FsConfig,
|
pub fs: FsConfig,
|
||||||
pub db: DbConfig,
|
pub db: DbConfig,
|
||||||
#[serde(default = "default_pkg_workers")]
|
pub repo: RepoConfig,
|
||||||
pub pkg_workers: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 {
|
||||||
16
|
16
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_pkg_workers() -> u32 {
|
fn default_repo_sync_workers() -> u32 {
|
||||||
|
1
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_repo_async_workers() -> u32 {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
|
||||||
data_dir.join("repos"),
|
data_dir.join("repos"),
|
||||||
db.clone(),
|
db.clone(),
|
||||||
rt.clone(),
|
rt.clone(),
|
||||||
config.pkg_workers,
|
config.repo.sync_workers,
|
||||||
|
config.repo.async_workers,
|
||||||
)?
|
)?
|
||||||
//rt.block_on(crate::repo::RepoMgr::new(
|
//rt.block_on(crate::repo::RepoMgr::new(
|
||||||
// data_dir.join("repos"),
|
// data_dir.join("repos"),
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
use crate::{
|
||||||
|
db,
|
||||||
|
repo::{archive, package, AsyncCommand, SharedState},
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
path::PathBuf,
|
||||||
|
sync::{atomic::Ordering, Arc},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct AsyncActor {
|
||||||
|
state: Arc<SharedState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncActor {
|
||||||
|
pub fn new(state: &Arc<SharedState>) -> Self {
|
||||||
|
Self {
|
||||||
|
state: Arc::clone(state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(self) {
|
||||||
|
while let Some(msg) = {
|
||||||
|
let mut rx = self.state.async_queue.1.lock().await;
|
||||||
|
rx.recv().await
|
||||||
|
} {
|
||||||
|
match msg {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
mod sync;
|
||||||
|
mod r#async;
|
||||||
|
|
||||||
|
pub use sync::Actor;
|
||||||
|
pub use r#async::AsyncActor;
|
|
@ -1,5 +1,7 @@
|
||||||
use super::{archive, package, Command, SharedState};
|
use crate::{
|
||||||
use crate::db;
|
db,
|
||||||
|
repo::{archive, package, Command, SharedState},
|
||||||
|
};
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
@ -20,10 +22,10 @@ pub struct Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor {
|
impl Actor {
|
||||||
pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
|
pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
rt,
|
rt,
|
||||||
state: Arc::clone(&state),
|
state: Arc::clone(state),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +39,7 @@ impl Actor {
|
||||||
/// Run the main actor loop
|
/// Run the main actor loop
|
||||||
pub fn run(self) {
|
pub fn run(self) {
|
||||||
while let Some(msg) = {
|
while let Some(msg) = {
|
||||||
let mut rx = self.state.rx.lock().unwrap();
|
let mut rx = self.state.sync_queue.1.lock().unwrap();
|
||||||
rx.blocking_recv()
|
rx.blocking_recv()
|
||||||
} {
|
} {
|
||||||
match msg {
|
match msg {
|
|
@ -31,7 +31,11 @@ impl Handle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
|
pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
|
||||||
|
let repo_dir = self.state.repos_dir.join(repo_id.to_string());
|
||||||
|
|
||||||
|
if !tokio::fs::try_exists(repo_dir).await? {
|
||||||
tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
|
tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut repos = self.state.repos.write().await;
|
let mut repos = self.state.repos.write().await;
|
||||||
repos.insert(repo_id, Default::default());
|
repos.insert(repo_id, Default::default());
|
||||||
|
@ -136,17 +140,25 @@ impl Handle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
||||||
self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
|
self.state
|
||||||
|
.sync_queue
|
||||||
|
.0
|
||||||
|
.send(Command::ParsePkg(repo, path))
|
||||||
|
.unwrap();
|
||||||
self.state.repos.read().await.get(&repo).inspect(|n| {
|
self.state.repos.read().await.get(&repo).inspect(|n| {
|
||||||
n.0.fetch_add(1, Ordering::SeqCst);
|
n.0.fetch_add(1, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn queue_sync(&self, repo: i32) {
|
async fn queue_sync(&self, repo: i32) {
|
||||||
self.state.tx.send(Command::SyncRepo(repo)).unwrap();
|
self.state
|
||||||
|
.sync_queue
|
||||||
|
.0
|
||||||
|
.send(Command::SyncRepo(repo))
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn queue_clean(&self) {
|
async fn queue_clean(&self) {
|
||||||
self.state.tx.send(Command::Clean).unwrap();
|
self.state.sync_queue.0.send(Command::Clean).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
mod parser;
|
||||||
|
|
||||||
|
pub use parser::{DbArchiveEntry, DbArchiveParser};
|
|
@ -0,0 +1,75 @@
|
||||||
|
use std::{
|
||||||
|
io::{self, BufRead},
|
||||||
|
path::Path,
|
||||||
|
};
|
||||||
|
|
||||||
|
use libarchive::{
|
||||||
|
read::{Archive, Builder, Entries, FileReader, ReadEntry},
|
||||||
|
Entry,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct DbArchiveParser<'a, T: 'a + Archive> {
|
||||||
|
entries: Entries<'a, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DbArchiveEntry {
|
||||||
|
name: String,
|
||||||
|
version: String,
|
||||||
|
filename: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Archive> DbArchiveParser<'a, T> {
|
||||||
|
pub fn new(ar: &'a mut T) -> Self {
|
||||||
|
Self {
|
||||||
|
entries: Entries::new(ar),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse a given entry. If the entry's not a regular file, the function returns None.
|
||||||
|
fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result<DbArchiveEntry> {
|
||||||
|
let reader = io::BufReader::new(entry);
|
||||||
|
let mut lines = reader.lines();
|
||||||
|
|
||||||
|
let mut name: Option<String> = None;
|
||||||
|
let mut version: Option<String> = None;
|
||||||
|
let mut filename: Option<String> = None;
|
||||||
|
|
||||||
|
while let Some(line) = lines.next().transpose()? {
|
||||||
|
match line.as_str() {
|
||||||
|
"%NAME%" => name = lines.next().transpose()?,
|
||||||
|
"%VERSION%" => version = lines.next().transpose()?,
|
||||||
|
"%FILENAME%" => filename = lines.next().transpose()?,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if name.is_some() && version.is_some() && filename.is_some() {
|
||||||
|
Ok(DbArchiveEntry {
|
||||||
|
name: name.unwrap(),
|
||||||
|
version: version.unwrap(),
|
||||||
|
filename: filename.unwrap(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(io::Error::other("Missing fields in entry file"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> {
|
||||||
|
type Item = io::Result<DbArchiveEntry>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
while let Some(entry) = self.entries.next() {
|
||||||
|
match entry {
|
||||||
|
Ok(entry) => {
|
||||||
|
if entry.filetype() == libarchive::FileType::RegularFile {
|
||||||
|
return Some(Self::parse_entry(entry));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => return Some(Err(err.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,10 @@
|
||||||
mod actor;
|
mod actor;
|
||||||
mod archive;
|
mod archive;
|
||||||
mod handle;
|
mod handle;
|
||||||
|
mod mirror;
|
||||||
pub mod package;
|
pub mod package;
|
||||||
|
|
||||||
pub use actor::Actor;
|
pub use actor::{Actor, AsyncActor};
|
||||||
pub use handle::Handle;
|
pub use handle::Handle;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -27,30 +28,33 @@ pub enum Command {
|
||||||
Clean,
|
Clean,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum AsyncCommand {
|
||||||
|
}
|
||||||
|
|
||||||
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
||||||
|
|
||||||
pub struct SharedState {
|
pub struct SharedState {
|
||||||
pub repos_dir: PathBuf,
|
pub repos_dir: PathBuf,
|
||||||
pub conn: DbConn,
|
pub conn: DbConn,
|
||||||
pub rx: Mutex<UnboundedReceiver<Command>>,
|
pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>),
|
||||||
pub tx: UnboundedSender<Command>,
|
pub async_queue: (
|
||||||
|
UnboundedSender<AsyncCommand>,
|
||||||
|
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
|
||||||
|
),
|
||||||
pub repos: RwLock<HashMap<i32, RepoState>>,
|
pub repos: RwLock<HashMap<i32, RepoState>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SharedState {
|
impl SharedState {
|
||||||
pub fn new(
|
pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self {
|
||||||
repos_dir: impl AsRef<Path>,
|
|
||||||
conn: DbConn,
|
|
||||||
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
|
|
||||||
) -> Self {
|
|
||||||
let (tx, rx) = unbounded_channel();
|
let (tx, rx) = unbounded_channel();
|
||||||
|
let (async_tx, async_rx) = unbounded_channel();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
repos_dir: repos_dir.as_ref().to_path_buf(),
|
repos_dir: repos_dir.as_ref().to_path_buf(),
|
||||||
conn,
|
conn,
|
||||||
rx: Mutex::new(rx),
|
sync_queue: (tx, Mutex::new(rx)),
|
||||||
tx,
|
async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
|
||||||
repos: RwLock::new(repos),
|
repos: RwLock::new(Default::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,11 +63,11 @@ pub fn start(
|
||||||
repos_dir: impl AsRef<Path>,
|
repos_dir: impl AsRef<Path>,
|
||||||
conn: DbConn,
|
conn: DbConn,
|
||||||
rt: runtime::Handle,
|
rt: runtime::Handle,
|
||||||
actors: u32,
|
sync_actors: u32,
|
||||||
|
async_actors: u32,
|
||||||
) -> crate::Result<Handle> {
|
) -> crate::Result<Handle> {
|
||||||
std::fs::create_dir_all(repos_dir.as_ref())?;
|
std::fs::create_dir_all(repos_dir.as_ref())?;
|
||||||
|
|
||||||
let mut repos = HashMap::new();
|
|
||||||
let repo_ids: Vec<i32> = rt.block_on(
|
let repo_ids: Vec<i32> = rt.block_on(
|
||||||
entity::prelude::Repo::find()
|
entity::prelude::Repo::find()
|
||||||
.select_only()
|
.select_only()
|
||||||
|
@ -72,17 +76,25 @@ pub fn start(
|
||||||
.all(&conn),
|
.all(&conn),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
for id in repo_ids {
|
let state = Arc::new(SharedState::new(repos_dir, conn));
|
||||||
repos.insert(id, Default::default());
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = Arc::new(SharedState::new(repos_dir, conn, repos));
|
for _ in 0..sync_actors {
|
||||||
|
let actor = Actor::new(rt.clone(), &state);
|
||||||
for _ in 0..actors {
|
|
||||||
let actor = Actor::new(rt.clone(), Arc::clone(&state));
|
|
||||||
|
|
||||||
std::thread::spawn(|| actor.run());
|
std::thread::spawn(|| actor.run());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Handle::new(&state))
|
for _ in 0..async_actors {
|
||||||
|
let actor = AsyncActor::new(&state);
|
||||||
|
|
||||||
|
rt.spawn(actor.run());
|
||||||
|
}
|
||||||
|
|
||||||
|
let handle = Handle::new(&state);
|
||||||
|
|
||||||
|
for id in repo_ids {
|
||||||
|
rt.block_on(handle.register_repo(id))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(handle)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
mod pagination;
|
mod pagination;
|
||||||
mod repo;
|
|
||||||
|
|
||||||
use crate::db;
|
use crate::db;
|
||||||
use pagination::PaginatedResponse;
|
use pagination::PaginatedResponse;
|
||||||
|
|
Loading…
Reference in New Issue