feat: streaming upload of archives

main
Jef Roosens 2023-07-12 11:04:31 +02:00
parent 3e5ac03f2d
commit 51cda94c1a
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
2 changed files with 45 additions and 4 deletions

View File

@ -7,6 +7,9 @@ edition = "2021"
[dependencies]
axum = "0.6.18"
futures = "0.3.28"
tokio = { version = "1.29.1", features = ["full"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tower = { version = "0.4.13", features = ["make"] }
tower-http = { version = "0.4.1", features = ["fs"] }
uuid = { version = "1.4.0", features = ["v4"] }

View File

@ -1,12 +1,50 @@
use axum::extract::{Path, State};
use axum::routing::get_service;
use axum::extract::{BodyStream, Path, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get_service, post};
use axum::Router;
use futures::StreamExt;
use futures::TryFutureExt;
use futures::TryStreamExt;
use tokio::{fs, io, io::AsyncWriteExt};
use tower_http::services::ServeDir;
use uuid::Uuid;
pub fn router(config: &crate::Config) -> Router<crate::Config> {
// Try to serve packages by default, and try the database files instead if not found
let service = ServeDir::new(&config.pkg_dir).fallback(ServeDir::new(&config.repo_dir));
let serve_repos =
get_service(ServeDir::new(&config.pkg_dir).fallback(ServeDir::new(&config.repo_dir)));
Router::new()
.route("/*path", get_service(service))
.route(
"/:repo",
post(post_package_archive).get(serve_repos.clone()),
)
.fallback(serve_repos)
.with_state(config.clone())
}
async fn post_package_archive(
State(config): State<crate::Config>,
Path(repo): Path<String>,
body: BodyStream,
) -> Result<(), StatusCode> {
// let mut body_reader = tokio_util::io::StreamReader::new(
// body.map_err(|err| io::Error::new(io::ErrorKind::Other, err)),
// );
let mut body = body.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR);
// We first stream the uploaded file to disk
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let path = config.pkg_dir.join(uuid.to_string());
let mut f = fs::File::create(&path)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
.await?;
while let Some(chunk) = body.next().await {
f.write_all(&chunk?)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
.await?;
}
Ok(())
}