Compare commits
	
		
			No commits in common. "aa0aae41ab465d1d2de83ff872ee7b0416f014d5" and "d39205b653a33198706a1940134c79786bfb74a3" have entirely different histories. 
		
	
	
		
			aa0aae41ab
			...
			d39205b653
		
	
		| 
						 | 
				
			
			@ -5,6 +5,6 @@ pub mod write;
 | 
			
		|||
 | 
			
		||||
pub use archive::{
 | 
			
		||||
    Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
 | 
			
		||||
    WriteFilter, WriteFormat, FileType,
 | 
			
		||||
    WriteFilter, WriteFormat,
 | 
			
		||||
};
 | 
			
		||||
pub use error::Result;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,10 +6,9 @@ pub use builder::Builder;
 | 
			
		|||
 | 
			
		||||
use crate::archive::Handle;
 | 
			
		||||
use crate::ReadFilter;
 | 
			
		||||
pub use entries::{Entries, ReadEntry};
 | 
			
		||||
use entries::Entries;
 | 
			
		||||
use libarchive3_sys::ffi;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
pub use file::FileReader;
 | 
			
		||||
 | 
			
		||||
// Represents a read view of an archive
 | 
			
		||||
pub trait Archive: Handle + Sized {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,10 +1,7 @@
 | 
			
		|||
api_key = "test"
 | 
			
		||||
pkg_workers = 2
 | 
			
		||||
log_level = "rieterd=debug"
 | 
			
		||||
 | 
			
		||||
[repo]
 | 
			
		||||
sync_workers = 2
 | 
			
		||||
async_workers = 1
 | 
			
		||||
 | 
			
		||||
[fs]
 | 
			
		||||
type = "local"
 | 
			
		||||
data_dir = "./data"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,14 +36,6 @@ pub enum DbConfig {
 | 
			
		|||
    },
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize, Debug, Clone)]
 | 
			
		||||
pub struct RepoConfig {
 | 
			
		||||
    #[serde(default = "default_repo_sync_workers")]
 | 
			
		||||
    pub sync_workers: u32,
 | 
			
		||||
    #[serde(default = "default_repo_async_workers")]
 | 
			
		||||
    pub async_workers: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Deserialize, Debug, Clone)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    pub api_key: String,
 | 
			
		||||
| 
						 | 
				
			
			@ -55,7 +47,8 @@ pub struct Config {
 | 
			
		|||
    pub log_level: String,
 | 
			
		||||
    pub fs: FsConfig,
 | 
			
		||||
    pub db: DbConfig,
 | 
			
		||||
    pub repo: RepoConfig,
 | 
			
		||||
    #[serde(default = "default_pkg_workers")]
 | 
			
		||||
    pub pkg_workers: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Config {
 | 
			
		||||
| 
						 | 
				
			
			@ -90,10 +83,6 @@ fn default_db_postgres_max_connections() -> u32 {
 | 
			
		|||
    16
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_repo_sync_workers() -> u32 {
 | 
			
		||||
    1
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn default_repo_async_workers() -> u32 {
 | 
			
		||||
fn default_pkg_workers() -> u32 {
 | 
			
		||||
    1
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -60,8 +60,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
 | 
			
		|||
                data_dir.join("repos"),
 | 
			
		||||
                db.clone(),
 | 
			
		||||
                rt.clone(),
 | 
			
		||||
                config.repo.sync_workers,
 | 
			
		||||
                config.repo.async_workers,
 | 
			
		||||
                config.pkg_workers,
 | 
			
		||||
            )?
 | 
			
		||||
            //rt.block_on(crate::repo::RepoMgr::new(
 | 
			
		||||
            //    data_dir.join("repos"),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,5 @@
 | 
			
		|||
use crate::{
 | 
			
		||||
    db,
 | 
			
		||||
    repo::{archive, package, Command, SharedState},
 | 
			
		||||
};
 | 
			
		||||
use super::{archive, package, Command, SharedState};
 | 
			
		||||
use crate::db;
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
    path::PathBuf,
 | 
			
		||||
| 
						 | 
				
			
			@ -22,10 +20,10 @@ pub struct Actor {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl Actor {
 | 
			
		||||
    pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self {
 | 
			
		||||
    pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            rt,
 | 
			
		||||
            state: Arc::clone(state),
 | 
			
		||||
            state: Arc::clone(&state),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -39,7 +37,7 @@ impl Actor {
 | 
			
		|||
    /// Run the main actor loop
 | 
			
		||||
    pub fn run(self) {
 | 
			
		||||
        while let Some(msg) = {
 | 
			
		||||
            let mut rx = self.state.sync_queue.1.lock().unwrap();
 | 
			
		||||
            let mut rx = self.state.rx.lock().unwrap();
 | 
			
		||||
            rx.blocking_recv()
 | 
			
		||||
        } {
 | 
			
		||||
            match msg {
 | 
			
		||||
| 
						 | 
				
			
			@ -1,32 +0,0 @@
 | 
			
		|||
use crate::{
 | 
			
		||||
    db,
 | 
			
		||||
    repo::{archive, package, AsyncCommand, SharedState},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
    path::PathBuf,
 | 
			
		||||
    sync::{atomic::Ordering, Arc},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
pub struct AsyncActor {
 | 
			
		||||
    state: Arc<SharedState>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl AsyncActor {
 | 
			
		||||
    pub fn new(state: &Arc<SharedState>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            state: Arc::clone(state)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn run(self) {
 | 
			
		||||
        while let Some(msg) = {
 | 
			
		||||
            let mut rx = self.state.async_queue.1.lock().await;
 | 
			
		||||
            rx.recv().await
 | 
			
		||||
        } {
 | 
			
		||||
            match msg {
 | 
			
		||||
                
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,5 +0,0 @@
 | 
			
		|||
mod sync;
 | 
			
		||||
mod r#async;
 | 
			
		||||
 | 
			
		||||
pub use sync::Actor;
 | 
			
		||||
pub use r#async::AsyncActor;
 | 
			
		||||
| 
						 | 
				
			
			@ -31,11 +31,7 @@ impl Handle {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
 | 
			
		||||
        let repo_dir = self.state.repos_dir.join(repo_id.to_string());
 | 
			
		||||
 | 
			
		||||
        if !tokio::fs::try_exists(repo_dir).await? {
 | 
			
		||||
        tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let mut repos = self.state.repos.write().await;
 | 
			
		||||
        repos.insert(repo_id, Default::default());
 | 
			
		||||
| 
						 | 
				
			
			@ -140,25 +136,17 @@ impl Handle {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
 | 
			
		||||
        self.state
 | 
			
		||||
            .sync_queue
 | 
			
		||||
            .0
 | 
			
		||||
            .send(Command::ParsePkg(repo, path))
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
 | 
			
		||||
        self.state.repos.read().await.get(&repo).inspect(|n| {
 | 
			
		||||
            n.0.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn queue_sync(&self, repo: i32) {
 | 
			
		||||
        self.state
 | 
			
		||||
            .sync_queue
 | 
			
		||||
            .0
 | 
			
		||||
            .send(Command::SyncRepo(repo))
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        self.state.tx.send(Command::SyncRepo(repo)).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn queue_clean(&self) {
 | 
			
		||||
        self.state.sync_queue.0.send(Command::Clean).unwrap();
 | 
			
		||||
        self.state.tx.send(Command::Clean).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +0,0 @@
 | 
			
		|||
mod parser;
 | 
			
		||||
 | 
			
		||||
pub use parser::{DbArchiveEntry, DbArchiveParser};
 | 
			
		||||
| 
						 | 
				
			
			@ -1,75 +0,0 @@
 | 
			
		|||
use std::{
 | 
			
		||||
    io::{self, BufRead},
 | 
			
		||||
    path::Path,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use libarchive::{
 | 
			
		||||
    read::{Archive, Builder, Entries, FileReader, ReadEntry},
 | 
			
		||||
    Entry,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
pub struct DbArchiveParser<'a, T: 'a + Archive> {
 | 
			
		||||
    entries: Entries<'a, T>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct DbArchiveEntry {
 | 
			
		||||
    name: String,
 | 
			
		||||
    version: String,
 | 
			
		||||
    filename: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a, T: Archive> DbArchiveParser<'a, T> {
 | 
			
		||||
    pub fn new(ar: &'a mut T) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            entries: Entries::new(ar),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // parse a given entry. If the entry's not a regular file, the function returns None.
 | 
			
		||||
    fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result<DbArchiveEntry> {
 | 
			
		||||
        let reader = io::BufReader::new(entry);
 | 
			
		||||
        let mut lines = reader.lines();
 | 
			
		||||
 | 
			
		||||
        let mut name: Option<String> = None;
 | 
			
		||||
        let mut version: Option<String> = None;
 | 
			
		||||
        let mut filename: Option<String> = None;
 | 
			
		||||
 | 
			
		||||
        while let Some(line) = lines.next().transpose()? {
 | 
			
		||||
            match line.as_str() {
 | 
			
		||||
                "%NAME%" => name = lines.next().transpose()?,
 | 
			
		||||
                "%VERSION%" => version = lines.next().transpose()?,
 | 
			
		||||
                "%FILENAME%" => filename = lines.next().transpose()?,
 | 
			
		||||
                _ => {}
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if name.is_some() && version.is_some() && filename.is_some() {
 | 
			
		||||
            Ok(DbArchiveEntry {
 | 
			
		||||
                name: name.unwrap(),
 | 
			
		||||
                version: version.unwrap(),
 | 
			
		||||
                filename: filename.unwrap(),
 | 
			
		||||
            })
 | 
			
		||||
        } else {
 | 
			
		||||
            Err(io::Error::other("Missing fields in entry file"))
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> {
 | 
			
		||||
    type Item = io::Result<DbArchiveEntry>;
 | 
			
		||||
 | 
			
		||||
    fn next(&mut self) -> Option<Self::Item> {
 | 
			
		||||
        while let Some(entry) = self.entries.next() {
 | 
			
		||||
            match entry {
 | 
			
		||||
                Ok(entry) => {
 | 
			
		||||
                    if entry.filetype() == libarchive::FileType::RegularFile {
 | 
			
		||||
                        return Some(Self::parse_entry(entry));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                Err(err) => return Some(Err(err.into())),
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        None
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,10 +1,9 @@
 | 
			
		|||
mod actor;
 | 
			
		||||
mod archive;
 | 
			
		||||
mod handle;
 | 
			
		||||
mod mirror;
 | 
			
		||||
pub mod package;
 | 
			
		||||
 | 
			
		||||
pub use actor::{Actor, AsyncActor};
 | 
			
		||||
pub use actor::Actor;
 | 
			
		||||
pub use handle::Handle;
 | 
			
		||||
 | 
			
		||||
use std::{
 | 
			
		||||
| 
						 | 
				
			
			@ -28,33 +27,30 @@ pub enum Command {
 | 
			
		|||
    Clean,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub enum AsyncCommand {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type RepoState = (AtomicU32, Arc<Mutex<()>>);
 | 
			
		||||
 | 
			
		||||
pub struct SharedState {
 | 
			
		||||
    pub repos_dir: PathBuf,
 | 
			
		||||
    pub conn: DbConn,
 | 
			
		||||
    pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>),
 | 
			
		||||
    pub async_queue: (
 | 
			
		||||
        UnboundedSender<AsyncCommand>,
 | 
			
		||||
        tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
 | 
			
		||||
    ),
 | 
			
		||||
    pub rx: Mutex<UnboundedReceiver<Command>>,
 | 
			
		||||
    pub tx: UnboundedSender<Command>,
 | 
			
		||||
    pub repos: RwLock<HashMap<i32, RepoState>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl SharedState {
 | 
			
		||||
    pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self {
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        repos_dir: impl AsRef<Path>,
 | 
			
		||||
        conn: DbConn,
 | 
			
		||||
        repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        let (tx, rx) = unbounded_channel();
 | 
			
		||||
        let (async_tx, async_rx) = unbounded_channel();
 | 
			
		||||
 | 
			
		||||
        Self {
 | 
			
		||||
            repos_dir: repos_dir.as_ref().to_path_buf(),
 | 
			
		||||
            conn,
 | 
			
		||||
            sync_queue: (tx, Mutex::new(rx)),
 | 
			
		||||
            async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
 | 
			
		||||
            repos: RwLock::new(Default::default()),
 | 
			
		||||
            rx: Mutex::new(rx),
 | 
			
		||||
            tx,
 | 
			
		||||
            repos: RwLock::new(repos),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -63,11 +59,11 @@ pub fn start(
 | 
			
		|||
    repos_dir: impl AsRef<Path>,
 | 
			
		||||
    conn: DbConn,
 | 
			
		||||
    rt: runtime::Handle,
 | 
			
		||||
    sync_actors: u32,
 | 
			
		||||
    async_actors: u32,
 | 
			
		||||
    actors: u32,
 | 
			
		||||
) -> crate::Result<Handle> {
 | 
			
		||||
    std::fs::create_dir_all(repos_dir.as_ref())?;
 | 
			
		||||
 | 
			
		||||
    let mut repos = HashMap::new();
 | 
			
		||||
    let repo_ids: Vec<i32> = rt.block_on(
 | 
			
		||||
        entity::prelude::Repo::find()
 | 
			
		||||
            .select_only()
 | 
			
		||||
| 
						 | 
				
			
			@ -76,25 +72,17 @@ pub fn start(
 | 
			
		|||
            .all(&conn),
 | 
			
		||||
    )?;
 | 
			
		||||
 | 
			
		||||
    let state = Arc::new(SharedState::new(repos_dir, conn));
 | 
			
		||||
    for id in repo_ids {
 | 
			
		||||
        repos.insert(id, Default::default());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for _ in 0..sync_actors {
 | 
			
		||||
        let actor = Actor::new(rt.clone(), &state);
 | 
			
		||||
    let state = Arc::new(SharedState::new(repos_dir, conn, repos));
 | 
			
		||||
 | 
			
		||||
    for _ in 0..actors {
 | 
			
		||||
        let actor = Actor::new(rt.clone(), Arc::clone(&state));
 | 
			
		||||
 | 
			
		||||
        std::thread::spawn(|| actor.run());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for _ in 0..async_actors {
 | 
			
		||||
        let actor = AsyncActor::new(&state);
 | 
			
		||||
 | 
			
		||||
        rt.spawn(actor.run());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let handle = Handle::new(&state);
 | 
			
		||||
 | 
			
		||||
    for id in repo_ids {
 | 
			
		||||
        rt.block_on(handle.register_repo(id))?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(handle)
 | 
			
		||||
    Ok(Handle::new(&state))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,5 @@
 | 
			
		|||
mod pagination;
 | 
			
		||||
mod repo;
 | 
			
		||||
 | 
			
		||||
use crate::db;
 | 
			
		||||
use pagination::PaginatedResponse;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue