From d5e31040195964dfd1ee6e5ce5d53234a44a297b Mon Sep 17 00:00:00 2001
From: Jef Roosens
Date: Sat, 2 Apr 2022 12:44:41 +0200
Subject: [PATCH] First implementation of archive uploading

---
 Cargo.lock  | 15 +---------
 Cargo.toml  |  2 +-
 src/main.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 79 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 415bde5..1b85559 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,19 +17,6 @@ dependencies = [
  "winapi",
 ]
 
-[[package]]
-name = "async-compression"
-version = "0.3.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2bf394cfbbe876f0ac67b13b6ca819f9c9f2fb9ec67223cceb1555fbab1c31a"
-dependencies = [
- "flate2",
- "futures-core",
- "memchr",
- "pin-project-lite",
- "tokio",
-]
-
 [[package]]
 name = "async-trait"
 version = "0.1.53"
@@ -638,8 +625,8 @@ dependencies = [
 name = "site-backend"
 version = "0.0.0"
 dependencies = [
- "async-compression",
  "axum",
+ "flate2",
  "futures-util",
  "hyper",
  "tar",
diff --git a/Cargo.toml b/Cargo.toml
index fc1d055..57222ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,6 @@ tracing = "0.1.32"
 tracing-subscriber = {version = "0.3.9", features = ["env-filter"] }
 tower-http = { version = "0.2.5", features = ["fs", "trace", "auth"] }
 tar = "0.4.38"
-async-compression = { version = "0.3.12", features = ["tokio", "gzip"] }
+flate2 = "1.0.22"
 tokio-util = { version = "0.7.1", features = ["io"] }
 futures-util = "0.3.21"
diff --git a/src/main.rs b/src/main.rs
index 23f7405..d30cd4f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,14 +1,16 @@
-use async_compression::tokio::bufread::GzipDecoder;
 use axum::{
     extract::BodyStream,
     http::StatusCode,
+    response::IntoResponse,
     routing::{get_service, post},
     Router,
 };
+use flate2::read::GzDecoder;
 use futures_util::TryStreamExt;
-use hyper::{Body, Request};
+use std::collections::HashSet;
 use std::io::ErrorKind;
 use std::net::SocketAddr;
+use std::path::Path;
 use tar::Archive;
 use tokio_util::io::StreamReader;
 use tower_http::{auth::RequireAuthorizationLayer, services::ServeDir, trace::TraceLayer};
@@ -57,10 +59,79 @@ async fn main() {
         .unwrap();
 }
 
-async fn post_deploy(res: BodyStream) {
+async fn post_deploy(res: BodyStream) -> impl IntoResponse {
+    // This converts a stream into something that implements AsyncRead, which we can then use to
+    // asynchronously write the file to disk
     let mut read =
         StreamReader::new(res.map_err(|axum_err| std::io::Error::new(ErrorKind::Other, axum_err)));
-    // let tar = GzipDecoder::new(body);
-    // let mut archive = Archive::new(tar);
-    // archive.unpack("./static").unwrap();
+    let mut file = tokio::fs::File::create("test.archive.gz").await.unwrap();
+    tokio::io::copy(&mut read, &mut file).await.unwrap();
+
+    // Extract the contents of the tarball synchronously
+    match tokio::task::spawn_blocking(|| {
+        let file = match std::fs::File::open("test.archive.gz") {
+            Ok(v) => v,
+            Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        let tar = GzDecoder::new(file);
+        let mut archive = Archive::new(tar);
+
+        let mut paths = HashSet::new();
+
+        let entries = match archive.entries() {
+            Ok(e) => e,
+            Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
+        };
+
+        // Extract each entry into the output directory
+        for entry_res in entries {
+            if let Ok(mut entry) = entry_res {
+                if let Err(_) = entry.unpack_in("static") {
+                    return StatusCode::INTERNAL_SERVER_ERROR;
+                }
+
+                if let Ok(path) = entry.path() {
+                    paths.insert(path.into_owned());
+                }
+            } else {
+                return StatusCode::INTERNAL_SERVER_ERROR;
+            }
+        }
+
+        // Remove any old files that weren't present in new archive
+        let mut items = vec![];
+
+        // Start by populating the vec with the initial files
+        let base_path = Path::new("static");
+        let iter = match base_path.read_dir() {
+            Ok(v) => v,
+            Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        iter.filter_map(|r| r.ok())
+            .for_each(|e| items.push(e.path()));
+
+        // As long as there are still items in the vec, we keep going
+        while items.len() > 0 {
+            let item = items.pop().unwrap();
+            tracing::debug!("{:?}", item);
+
+            if !paths.contains(item.strip_prefix("static/").unwrap()) {
+                if item.is_dir() {
+                    std::fs::remove_dir_all(item);
+                } else {
+                    std::fs::remove_file(item);
+                }
+            } else if let Ok(iter) = item.read_dir() {
+                iter.filter_map(|r| r.ok())
+                    .for_each(|e| items.push(e.path()));
+            }
+        }
+
+        StatusCode::OK
+    })
+    .await
+    {
+        Ok(s) => s,
+        Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
+    }
 }