Compare commits

..

No commits in common. "aa0aae41ab465d1d2de83ff872ee7b0416f014d5" and "d39205b653a33198706a1940134c79786bfb74a3" have entirely different histories.

13 changed files with 38 additions and 194 deletions

View File

@ -5,6 +5,6 @@ pub mod write;
pub use archive::{ pub use archive::{
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat, Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
WriteFilter, WriteFormat, FileType, WriteFilter, WriteFormat,
}; };
pub use error::Result; pub use error::Result;

View File

@ -6,10 +6,9 @@ pub use builder::Builder;
use crate::archive::Handle; use crate::archive::Handle;
use crate::ReadFilter; use crate::ReadFilter;
pub use entries::{Entries, ReadEntry}; use entries::Entries;
use libarchive3_sys::ffi; use libarchive3_sys::ffi;
use std::path::Path; use std::path::Path;
pub use file::FileReader;
// Represents a read view of an archive // Represents a read view of an archive
pub trait Archive: Handle + Sized { pub trait Archive: Handle + Sized {

View File

@ -1,10 +1,7 @@
api_key = "test" api_key = "test"
pkg_workers = 2
log_level = "rieterd=debug" log_level = "rieterd=debug"
[repo]
sync_workers = 2
async_workers = 1
[fs] [fs]
type = "local" type = "local"
data_dir = "./data" data_dir = "./data"

View File

@ -36,14 +36,6 @@ pub enum DbConfig {
}, },
} }
#[derive(Deserialize, Debug, Clone)]
pub struct RepoConfig {
#[serde(default = "default_repo_sync_workers")]
pub sync_workers: u32,
#[serde(default = "default_repo_async_workers")]
pub async_workers: u32,
}
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
pub api_key: String, pub api_key: String,
@ -55,7 +47,8 @@ pub struct Config {
pub log_level: String, pub log_level: String,
pub fs: FsConfig, pub fs: FsConfig,
pub db: DbConfig, pub db: DbConfig,
pub repo: RepoConfig, #[serde(default = "default_pkg_workers")]
pub pkg_workers: u32,
} }
impl Config { impl Config {
@ -90,10 +83,6 @@ fn default_db_postgres_max_connections() -> u32 {
16 16
} }
fn default_repo_sync_workers() -> u32 { fn default_pkg_workers() -> u32 {
1
}
fn default_repo_async_workers() -> u32 {
1 1
} }

View File

@ -60,8 +60,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
data_dir.join("repos"), data_dir.join("repos"),
db.clone(), db.clone(),
rt.clone(), rt.clone(),
config.repo.sync_workers, config.pkg_workers,
config.repo.async_workers,
)? )?
//rt.block_on(crate::repo::RepoMgr::new( //rt.block_on(crate::repo::RepoMgr::new(
// data_dir.join("repos"), // data_dir.join("repos"),

View File

@ -1,7 +1,5 @@
use crate::{ use super::{archive, package, Command, SharedState};
db, use crate::db;
repo::{archive, package, Command, SharedState},
};
use std::{ use std::{
path::PathBuf, path::PathBuf,
@ -22,10 +20,10 @@ pub struct Actor {
} }
impl Actor { impl Actor {
pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self { pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
Self { Self {
rt, rt,
state: Arc::clone(state), state: Arc::clone(&state),
} }
} }
@ -39,7 +37,7 @@ impl Actor {
/// Run the main actor loop /// Run the main actor loop
pub fn run(self) { pub fn run(self) {
while let Some(msg) = { while let Some(msg) = {
let mut rx = self.state.sync_queue.1.lock().unwrap(); let mut rx = self.state.rx.lock().unwrap();
rx.blocking_recv() rx.blocking_recv()
} { } {
match msg { match msg {

View File

@ -1,32 +0,0 @@
use crate::{
db,
repo::{archive, package, AsyncCommand, SharedState},
};
use std::{
path::PathBuf,
sync::{atomic::Ordering, Arc},
};
pub struct AsyncActor {
state: Arc<SharedState>,
}
impl AsyncActor {
pub fn new(state: &Arc<SharedState>) -> Self {
Self {
state: Arc::clone(state)
}
}
pub async fn run(self) {
while let Some(msg) = {
let mut rx = self.state.async_queue.1.lock().await;
rx.recv().await
} {
match msg {
}
}
}
}

View File

@ -1,5 +0,0 @@
mod sync;
mod r#async;
pub use sync::Actor;
pub use r#async::AsyncActor;

View File

@ -31,11 +31,7 @@ impl Handle {
} }
pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> { pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
let repo_dir = self.state.repos_dir.join(repo_id.to_string()); tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
if !tokio::fs::try_exists(repo_dir).await? {
tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
}
let mut repos = self.state.repos.write().await; let mut repos = self.state.repos.write().await;
repos.insert(repo_id, Default::default()); repos.insert(repo_id, Default::default());
@ -140,25 +136,17 @@ impl Handle {
} }
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.state self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
.sync_queue
.0
.send(Command::ParsePkg(repo, path))
.unwrap();
self.state.repos.read().await.get(&repo).inspect(|n| { self.state.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst); n.0.fetch_add(1, Ordering::SeqCst);
}); });
} }
async fn queue_sync(&self, repo: i32) { async fn queue_sync(&self, repo: i32) {
self.state self.state.tx.send(Command::SyncRepo(repo)).unwrap();
.sync_queue
.0
.send(Command::SyncRepo(repo))
.unwrap();
} }
async fn queue_clean(&self) { async fn queue_clean(&self) {
self.state.sync_queue.0.send(Command::Clean).unwrap(); self.state.tx.send(Command::Clean).unwrap();
} }
} }

View File

@ -1,3 +0,0 @@
mod parser;
pub use parser::{DbArchiveEntry, DbArchiveParser};

View File

@ -1,75 +0,0 @@
use std::{
io::{self, BufRead},
path::Path,
};
use libarchive::{
read::{Archive, Builder, Entries, FileReader, ReadEntry},
Entry,
};
pub struct DbArchiveParser<'a, T: 'a + Archive> {
entries: Entries<'a, T>,
}
pub struct DbArchiveEntry {
name: String,
version: String,
filename: String,
}
impl<'a, T: Archive> DbArchiveParser<'a, T> {
pub fn new(ar: &'a mut T) -> Self {
Self {
entries: Entries::new(ar),
}
}
// parse a given entry. If the entry's not a regular file, the function returns None.
fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result<DbArchiveEntry> {
let reader = io::BufReader::new(entry);
let mut lines = reader.lines();
let mut name: Option<String> = None;
let mut version: Option<String> = None;
let mut filename: Option<String> = None;
while let Some(line) = lines.next().transpose()? {
match line.as_str() {
"%NAME%" => name = lines.next().transpose()?,
"%VERSION%" => version = lines.next().transpose()?,
"%FILENAME%" => filename = lines.next().transpose()?,
_ => {}
}
}
if name.is_some() && version.is_some() && filename.is_some() {
Ok(DbArchiveEntry {
name: name.unwrap(),
version: version.unwrap(),
filename: filename.unwrap(),
})
} else {
Err(io::Error::other("Missing fields in entry file"))
}
}
}
impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> {
type Item = io::Result<DbArchiveEntry>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(entry) = self.entries.next() {
match entry {
Ok(entry) => {
if entry.filetype() == libarchive::FileType::RegularFile {
return Some(Self::parse_entry(entry));
}
}
Err(err) => return Some(Err(err.into())),
}
}
None
}
}

View File

@ -1,10 +1,9 @@
mod actor; mod actor;
mod archive; mod archive;
mod handle; mod handle;
mod mirror;
pub mod package; pub mod package;
pub use actor::{Actor, AsyncActor}; pub use actor::Actor;
pub use handle::Handle; pub use handle::Handle;
use std::{ use std::{
@ -28,33 +27,30 @@ pub enum Command {
Clean, Clean,
} }
pub enum AsyncCommand {
}
type RepoState = (AtomicU32, Arc<Mutex<()>>); type RepoState = (AtomicU32, Arc<Mutex<()>>);
pub struct SharedState { pub struct SharedState {
pub repos_dir: PathBuf, pub repos_dir: PathBuf,
pub conn: DbConn, pub conn: DbConn,
pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>), pub rx: Mutex<UnboundedReceiver<Command>>,
pub async_queue: ( pub tx: UnboundedSender<Command>,
UnboundedSender<AsyncCommand>,
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
),
pub repos: RwLock<HashMap<i32, RepoState>>, pub repos: RwLock<HashMap<i32, RepoState>>,
} }
impl SharedState { impl SharedState {
pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self { pub fn new(
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let (async_tx, async_rx) = unbounded_channel();
Self { Self {
repos_dir: repos_dir.as_ref().to_path_buf(), repos_dir: repos_dir.as_ref().to_path_buf(),
conn, conn,
sync_queue: (tx, Mutex::new(rx)), rx: Mutex::new(rx),
async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)), tx,
repos: RwLock::new(Default::default()), repos: RwLock::new(repos),
} }
} }
} }
@ -63,11 +59,11 @@ pub fn start(
repos_dir: impl AsRef<Path>, repos_dir: impl AsRef<Path>,
conn: DbConn, conn: DbConn,
rt: runtime::Handle, rt: runtime::Handle,
sync_actors: u32, actors: u32,
async_actors: u32,
) -> crate::Result<Handle> { ) -> crate::Result<Handle> {
std::fs::create_dir_all(repos_dir.as_ref())?; std::fs::create_dir_all(repos_dir.as_ref())?;
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on( let repo_ids: Vec<i32> = rt.block_on(
entity::prelude::Repo::find() entity::prelude::Repo::find()
.select_only() .select_only()
@ -76,25 +72,17 @@ pub fn start(
.all(&conn), .all(&conn),
)?; )?;
let state = Arc::new(SharedState::new(repos_dir, conn)); for id in repo_ids {
repos.insert(id, Default::default());
}
for _ in 0..sync_actors { let state = Arc::new(SharedState::new(repos_dir, conn, repos));
let actor = Actor::new(rt.clone(), &state);
for _ in 0..actors {
let actor = Actor::new(rt.clone(), Arc::clone(&state));
std::thread::spawn(|| actor.run()); std::thread::spawn(|| actor.run());
} }
for _ in 0..async_actors { Ok(Handle::new(&state))
let actor = AsyncActor::new(&state);
rt.spawn(actor.run());
}
let handle = Handle::new(&state);
for id in repo_ids {
rt.block_on(handle.register_repo(id))?;
}
Ok(handle)
} }

View File

@ -1,4 +1,5 @@
mod pagination; mod pagination;
mod repo;
use crate::db; use crate::db;
use pagination::PaginatedResponse; use pagination::PaginatedResponse;