feat(repo): implement file downloads
This commit is contained in:
parent
aa0aae41ab
commit
64d9df2e18
5 changed files with 399 additions and 10 deletions
|
|
@ -27,6 +27,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
|||
uuid = { version = "1.4.0", features = ["v4"] }
|
||||
migration = { path = "../migration" }
|
||||
entity = { path = "../entity" }
|
||||
reqwest = { version = "0.12.5", features = ["stream"] }
|
||||
|
||||
[dependencies.sea-orm]
|
||||
version = "0.12.1"
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ pub enum ServerError {
|
|||
Status(StatusCode),
|
||||
Archive(libarchive::error::ArchiveError),
|
||||
Figment(figment::Error),
|
||||
Reqwest(reqwest::Error),
|
||||
Unit,
|
||||
}
|
||||
|
||||
|
|
@ -27,6 +28,7 @@ impl fmt::Display for ServerError {
|
|||
ServerError::Db(err) => write!(fmt, "{}", err),
|
||||
ServerError::Archive(err) => write!(fmt, "{}", err),
|
||||
ServerError::Figment(err) => write!(fmt, "{}", err),
|
||||
ServerError::Reqwest(err) => write!(fmt, "{}", err),
|
||||
ServerError::Unit => Ok(()),
|
||||
}
|
||||
}
|
||||
|
|
@ -48,6 +50,7 @@ impl IntoResponse for ServerError {
|
|||
ServerError::Db(_)
|
||||
| ServerError::Archive(_)
|
||||
| ServerError::Figment(_)
|
||||
| ServerError::Reqwest(_)
|
||||
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||
}
|
||||
}
|
||||
|
|
@ -94,3 +97,9 @@ impl From<figment::Error> for ServerError {
|
|||
ServerError::Figment(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for ServerError {
|
||||
fn from(err: reqwest::Error) -> Self {
|
||||
ServerError::Reqwest(err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,10 +4,15 @@ use crate::{
|
|||
};
|
||||
|
||||
use std::{
|
||||
io,
|
||||
path::PathBuf,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
};
|
||||
|
||||
use uuid::Uuid;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
pub struct AsyncActor {
|
||||
state: Arc<SharedState>,
|
||||
}
|
||||
|
|
@ -19,14 +24,41 @@ impl AsyncActor {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
|
||||
std::array::from_fn(|_| {
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
self.state.repos_dir.join(uuid.to_string())
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(self) {
|
||||
while let Some(msg) = {
|
||||
let mut rx = self.state.async_queue.1.lock().await;
|
||||
rx.recv().await
|
||||
} {
|
||||
match msg {
|
||||
|
||||
AsyncCommand::DownloadDbArchive(repo_id, url) => {
|
||||
match self.download_file(url).await {
|
||||
Ok(path) => todo!("schedule parse db archive"),
|
||||
Err(err) => {
|
||||
todo!("reschedule unless max retries reached")
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_file(&self, url: reqwest::Url) -> crate::Result<PathBuf> {
|
||||
let [path] = self.random_file_paths();
|
||||
|
||||
let mut stream = self.state.client.get(url).send().await?.bytes_stream();
|
||||
let mut f = tokio::fs::File::create(&path).await?;
|
||||
|
||||
while let Some(chunk) = stream.next().await.transpose()? {
|
||||
f.write_all(&chunk).await?;
|
||||
}
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ pub enum Command {
|
|||
}
|
||||
|
||||
pub enum AsyncCommand {
|
||||
DownloadDbArchive(i32, reqwest::Url)
|
||||
}
|
||||
|
||||
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
||||
|
|
@ -42,6 +43,7 @@ pub struct SharedState {
|
|||
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
|
||||
),
|
||||
pub repos: RwLock<HashMap<i32, RepoState>>,
|
||||
pub client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
|
|
@ -55,6 +57,7 @@ impl SharedState {
|
|||
sync_queue: (tx, Mutex::new(rx)),
|
||||
async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
|
||||
repos: RwLock::new(Default::default()),
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue