From 8c1f60101b43a710afc7209558e0a6bc4a65e17a Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 18 Jan 2021 17:11:32 -0600 Subject: [PATCH] Add module for Range Header operations Switch from actix-fs to async-fs --- Cargo.lock | 151 +++++++++++++++++++++--- Cargo.toml | 4 +- src/error.rs | 3 - src/main.rs | 261 ++++++++++++++++-------------------------- src/processor.rs | 4 +- src/range.rs | 208 +++++++++++++++++++++++++++++++++ src/upload_manager.rs | 36 +++--- 7 files changed, 473 insertions(+), 194 deletions(-) create mode 100644 src/range.rs diff --git a/Cargo.lock b/Cargo.lock index e18e58f..c2b243d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,18 +55,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "actix-fs" -version = "0.1.0" -source = "git+https://git.asonix.dog/asonix/actix-fs?branch=main#dc37026b9f1a8016304b71af34802a1ec52a318b" -dependencies = [ - "actix-threadpool", - "bytes", - "futures", - "log", - "thiserror", -] - [[package]] name = "actix-http" version = "2.2.0" @@ -365,6 +353,64 @@ version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee67c11feeac938fae061b232e38e0b6d94f97a9df10e6271319325ac4c56a86" +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-fs" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2" +dependencies = [ + "async-lock", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-lock" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3670df70cbc01729f901f94c887814b3c68db038aad1329a418bae178bc5295c" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + [[package]] name = "async-trait" version = "0.1.42" @@ -376,6 +422,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "atty" version = "0.2.14" @@ -509,6 +561,20 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + [[package]] name = "brotli-sys" version = "0.3.2" @@ -556,6 +622,12 @@ dependencies = [ "bytes", ] +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cc" version = "1.0.66" @@ -622,6 +694,15 @@ dependencies = [ "vec_map", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "const_fn" version = "0.4.5" @@ -751,6 +832,21 @@ dependencies = [ "termcolor", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + +[[package]] +name = "fastrand" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] + [[package]] name = "ffmpeg-next" version = "4.3.8" @@ -878,6 +974,21 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" +[[package]] +name = "futures-lite" +version = "1.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite 0.2.1", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.8" @@ -1384,6 +1495,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.1" @@ -1426,15 +1543,17 @@ name = "pict-rs" version = "0.3.0-alpha.4" dependencies = [ "actix-form-data", - "actix-fs", "actix-rt", "actix-web", "anyhow", + "async-fs", + "async-stream", "base64 0.13.0", "bytes", "ffmpeg-next", "ffmpeg-sys-next", "futures", + "futures-lite", "magick_rust", "mime", "once_cell", @@ -2405,6 +2524,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 8ebfbca..4945c94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,15 @@ edition = "2018" [dependencies] actix-form-data = "0.5.0" -actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs", branch = "main" } actix-rt = "1.1.1" actix-web = { version = "3.0.1", default-features = false, features = ["rustls"] } anyhow = "1.0" +async-fs = "1.5.0" +async-stream = "0.3.0" base64 = "0.13.0" bytes = "0.5" futures = "0.3.4" +futures-lite = "1.11.3" magick_rust = { version = "0.14.0", git = "https://github.com/nlfiedler/magick-rust" } mime = "0.3.1" once_cell = "1.4.0" diff --git a/src/error.rs b/src/error.rs index 2900bfe..35a51f6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,9 +6,6 @@ pub(crate) enum UploadError { #[error("Couln't upload file, {0}")] Upload(String), - #[error("Couldn't save file, {0}")] - Save(#[from] actix_fs::Error), - #[error("Error in DB, {0}")] Db(#[from] sled::Error), diff --git a/src/main.rs b/src/main.rs index 09eff03..ffb05bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,18 @@ use actix_form_data::{Field, Form, Value}; -use actix_fs::file; use actix_web::{ client::Client, + dev::HttpResponseBuilder, guard, - http::{ - header::{ - CacheControl, CacheDirective, ContentRange, ContentRangeSpec, Header, LastModified, - ACCEPT_RANGES, CONTENT_LENGTH, - }, - HeaderValue, - }, + http::header::{CacheControl, CacheDirective, LastModified, ACCEPT_RANGES}, middleware::{Compress, Logger}, - web, App, HttpRequest, HttpResponse, HttpServer, + web, App, HttpResponse, HttpServer, }; use bytes::Bytes; -use futures::{ - stream::{Stream, TryStreamExt}, - StreamExt, -}; +use futures::stream::{once, Stream}; +use futures_lite::{AsyncReadExt, AsyncWriteExt}; use once_cell::sync::Lazy; use std::{ - collections::HashSet, convert::TryInto, io, path::PathBuf, sync::Once, time::SystemTime, + collections::HashSet, future::ready, io, path::PathBuf, pin::Pin, sync::Once, time::SystemTime, }; use structopt::StructOpt; use tracing::{debug, error, info, instrument, Span}; @@ -31,6 +23,7 @@ mod error; mod middleware; mod migrate; mod processor; +mod range; mod upload_manager; mod validate; @@ -43,6 +36,7 @@ use self::{ validate::{image_webp, video_mp4}, }; +const CHUNK_SIZE: usize = 65_356; const MEGABYTES: usize = 1024 * 1024; const MINUTES: u32 = 60; const HOURS: u32 = 60 * MINUTES; @@ -72,12 +66,12 @@ static MAGICK_INIT: Once = Once::new(); async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + async_fs::create_dir_all(path.to_owned()).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = actix_fs::metadata(to.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = async_fs::metadata(to.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -85,15 +79,15 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { } debug!("Moving {:?} to {:?}", from, to); - actix_fs::copy(from.clone(), to).await?; - actix_fs::remove_file(from).await?; + async_fs::copy(from.clone(), to).await?; + async_fs::remove_file(from).await?; Ok(()) } async fn safe_create_parent(path: PathBuf) -> Result<(), UploadError> { if let Some(path) = path.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + async_fs::create_dir_all(path.to_owned()).await?; } Ok(()) @@ -105,13 +99,13 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload if let Some(path) = path.parent() { // create the directory for the file debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + async_fs::create_dir_all(path.to_owned()).await?; } // Only write the file if it doesn't already exist debug!("Checking if {:?} already exists", path); - if let Err(e) = actix_fs::metadata(path.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = async_fs::metadata(path.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -120,16 +114,17 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload // Open the file for writing debug!("Creating {:?}", path); - let file = actix_fs::file::create(path.clone()).await?; + let mut file = async_fs::File::create(path.clone()).await?; // try writing debug!("Writing to {:?}", path); - if let Err(e) = actix_fs::file::write(file, bytes).await { + if let Err(e) = file.write_all(&bytes).await { error!("Error writing {:?}, {}", path, e); // remove file if writing failed before completion - actix_fs::remove_file(path).await?; + async_fs::remove_file(path).await?; return Err(e.into()); } + file.flush().await?; debug!("{:?} written", path); Ok(()) @@ -348,7 +343,7 @@ async fn process_details( /// Process files #[instrument(skip(manager, whitelist))] async fn process( - req: HttpRequest, + range: Option, query: web::Query, ext: web::Path, manager: web::Data, @@ -358,8 +353,8 @@ async fn process( prepare_process(query, ext.as_str(), &manager, &whitelist).await?; // If the thumbnail doesn't exist, we need to create it - let thumbnail_exists = if let Err(e) = actix_fs::metadata(thumbnail_path.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + let thumbnail_exists = if let Err(e) = async_fs::metadata(thumbnail_path.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { error!("Error looking up processed image, {}", e); return Err(e.into()); } @@ -443,36 +438,34 @@ async fn process( drop(entered); }); - match req.headers().get("Range") { - Some(range_head) => { - let range = parse_range_header(range_head)?; + let (builder, stream) = match range { + Some(range_header) => { + if !range_header.is_bytes() { + return Err(UploadError::Range); + } - let resp_bytes = img_bytes.slice(range[0] as usize..range[1] as usize); + if range_header.is_empty() { + return Err(UploadError::Range); + } else if range_header.len() == 1 { + let range = range_header.ranges().next().unwrap(); - let stream = Box::pin(futures::stream::once(async move { - Ok(resp_bytes) as Result<_, UploadError> - })); - - return Ok(srv_ranged_response( - stream, - details.content_type(), - 7 * DAYS, - details.system_time(), - Some((range[0], range[1])), - Some(img_bytes.len() as u64), - )); - } - None => { - return Ok(srv_response( - Box::pin(futures::stream::once(async { - Ok(img_bytes) as Result<_, UploadError> - })), - details.content_type(), - 7 * DAYS, - details.system_time(), - )); + let mut builder = HttpResponse::PartialContent(); + builder.set(range.to_content_range(img_bytes.len() as u64)); + (builder, range.chop_bytes(img_bytes)) + } else { + return Err(UploadError::Range); + } } + None => (HttpResponse::Ok(), once(ready(Ok(img_bytes)))), }; + + return Ok(srv_response( + builder, + stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + )); } let details = if let Some(details) = details { @@ -485,7 +478,7 @@ async fn process( details }; - ranged_file_resp(thumbnail_path, req, details).await + ranged_file_resp(thumbnail_path, range, details).await } /// Fetch file details @@ -515,7 +508,7 @@ async fn details( /// Serve files #[instrument(skip(manager))] async fn serve( - req: web::HttpRequest, + range: Option, alias: web::Path, manager: web::Data, ) -> Result { @@ -535,105 +528,78 @@ async fn serve( details }; - ranged_file_resp(path, req, details).await + ranged_file_resp(path, range, details).await } -fn parse_range_header(range_head: &HeaderValue) -> Result, UploadError> { - let range_head_str = range_head.to_str().map_err(|_| { - UploadError::ParseReq("Range header contains non-utf8 characters".to_string()) - })?; +fn read_to_stream(mut file: async_fs::File) -> impl Stream> { + async_stream::stream! { + let mut buf = Vec::with_capacity(CHUNK_SIZE); - let range_dashed = range_head_str - .split('=') - .skip(1) - .next() - .ok_or(UploadError::ParseReq("Malformed Range header".to_string()))?; + while { + buf.clear(); + let mut take = (&mut file).take(CHUNK_SIZE as u64); - let range: Vec = range_dashed - .split('-') - .map(|s| s.parse::()) - .collect::, _>>() - .map_err(|_| { - UploadError::ParseReq("Cannot parse byte locations in range header".to_string()) - })?; + let read_bytes_result = take.read_to_end(&mut buf).await; - if range[0] > range[1] { - return Err(UploadError::Range); + let read_bytes = read_bytes_result.as_ref().map(|num| *num).unwrap_or(0); + + yield read_bytes_result.map(|_| Bytes::copy_from_slice(&buf)); + + read_bytes > 0 + } {} } - - Ok(range) } async fn ranged_file_resp( path: PathBuf, - req: HttpRequest, + range: Option, details: Details, ) -> Result { - match req.headers().get("Range") { + let (builder, stream) = match range { //Range header exists - return as ranged - Some(range_head) => { - let range = parse_range_header(range_head)?; - - let (out_file, _) = file::seek( - file::open(path).await?, - io::SeekFrom::Current(range[0].try_into().map_err(|_| { - UploadError::ParseReq("Byte locations too high in range header".to_string()) - })?), - ) - .await?; - - let (out_file, meta) = file::metadata(out_file) - .await - .map_err(|_| UploadError::Upload("Error reading metadata".to_string()))?; - - if meta.len() < range[0] { + Some(range_header) => { + if !range_header.is_bytes() { return Err(UploadError::Range); } - // file::read_to_stream() creates a stream in 65,356 byte chunks. - let whole_to = ((range[1] - range[0]) as f64 / 65_356.0).floor() as usize; - let partial_len = ((range[1] - range[0]) % 65_356) as usize; + if range_header.is_empty() { + return Err(UploadError::Range); + } else if range_header.len() == 1 { + let file = async_fs::File::open(path).await?; - //debug!("Range of {}. Returning {} whole chunks, and {} bytes of the partial chunk", range[1]-range[0], whole_to, partial_len); + let meta = file.metadata().await?; - let stream = file::read_to_stream(out_file) - .await? - .take(whole_to + 1) - .enumerate() - .map(move |bytes_res| match bytes_res.1 { - Ok(mut bytes) => { - if bytes_res.0 == whole_to && partial_len <= bytes.len() { - return Ok(bytes.split_to(partial_len)); - } - return Ok(bytes); - } - Err(e) => Err(e), - }); + let range = range_header.ranges().next().unwrap(); - return Ok(srv_ranged_response( - stream, - details.content_type(), - 7 * DAYS, - details.system_time(), - Some((range[0], range[1])), - Some(meta.len()), - )); + let mut builder = HttpResponse::PartialContent(); + builder.set(range.to_content_range(meta.len())); + + (builder, range.chop_file(file).await?) + } else { + return Err(UploadError::Range); + } } //No Range header in the request - return the entire document None => { - let stream = actix_fs::read_to_stream(path).await?; - return Ok(srv_response( - stream, - details.content_type(), - 7 * DAYS, - details.system_time(), - )); + let file = async_fs::File::open(path).await?; + let stream: Pin>>> = + Box::pin(read_to_stream(file)); + (HttpResponse::Ok(), stream) } }; + + Ok(srv_response( + builder, + stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + )) } // A helper method to produce responses with proper cache headers fn srv_response( + mut builder: HttpResponseBuilder, stream: S, ext: mime::Mime, expires: u32, @@ -641,9 +607,10 @@ fn srv_response( ) -> HttpResponse where S: Stream> + Unpin + 'static, - E: Into, + E: 'static, + actix_web::Error: From, { - HttpResponse::Ok() + builder .set(LastModified(modified.into())) .set(CacheControl(vec![ CacheDirective::Public, @@ -652,35 +619,7 @@ where ])) .set_header(ACCEPT_RANGES, "bytes") .content_type(ext.to_string()) - .streaming(stream.err_into()) -} - -fn srv_ranged_response( - stream: S, - ext: mime::Mime, - expires: u32, - modified: SystemTime, - range: Option<(u64, u64)>, - instance_length: Option, -) -> HttpResponse -where - S: Stream> + Unpin + 'static, - E: Into, -{ - HttpResponse::PartialContent() - .set(LastModified(modified.into())) - .set(CacheControl(vec![ - CacheDirective::Public, - CacheDirective::MaxAge(expires), - CacheDirective::Extension("immutable".to_owned(), None), - ])) - .set(ContentRange(ContentRangeSpec::Bytes { - range, - instance_length, - })) - .set_header(ACCEPT_RANGES, "bytes") - .content_type(ext.to_string()) - .streaming(stream.err_into()) + .streaming(stream) } #[derive(Debug, serde::Deserialize)] @@ -875,8 +814,8 @@ async fn main() -> Result<(), anyhow::Error> { .run() .await?; - if actix_fs::metadata(&*TMP_DIR).await.is_ok() { - actix_fs::remove_dir_all(&*TMP_DIR).await?; + if async_fs::metadata(&*TMP_DIR).await.is_ok() { + async_fs::remove_dir_all(&*TMP_DIR).await?; } Ok(()) diff --git a/src/processor.rs b/src/processor.rs index 661c625..0f355c2 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -356,7 +356,7 @@ pub(crate) async fn prepare_image( let jpg_path = format!("{}.jpg", original_path_str); let jpg_path = PathBuf::from(jpg_path); - if actix_fs::metadata(jpg_path.clone()).await.is_ok() { + if async_fs::metadata(jpg_path.clone()).await.is_ok() { return Ok(Some((jpg_path, Exists::Exists))); } @@ -376,7 +376,7 @@ pub(crate) async fn prepare_image( if let Err(e) = res { error!("transcode error: {:?}", e); - actix_fs::remove_file(tmpfile2).await?; + async_fs::remove_file(tmpfile2).await?; return Err(e.into()); } diff --git a/src/range.rs b/src/range.rs new file mode 100644 index 0000000..c363153 --- /dev/null +++ b/src/range.rs @@ -0,0 +1,208 @@ +use crate::{UploadError, CHUNK_SIZE}; +use actix_web::{ + dev::Payload, + http::{ + header::{ContentRange, ContentRangeSpec}, + HeaderValue, + }, + web::Bytes, + FromRequest, HttpRequest, +}; +use futures::stream::{once, Once, Stream}; +use futures_lite::{AsyncReadExt, AsyncSeekExt}; +use std::io; +use std::{ + future::{ready, Ready}, + pin::Pin, +}; + +#[derive(Debug)] +pub(crate) enum Range { + RangeStart(u64), + SuffixLength(u64), + Segment(u64, u64), +} + +#[derive(Debug)] +pub(crate) struct RangeHeader { + unit: String, + ranges: Vec, +} + +impl Range { + pub(crate) fn to_content_range(&self, instance_length: u64) -> ContentRange { + match self { + Range::RangeStart(start) => ContentRange(ContentRangeSpec::Bytes { + range: Some((*start, instance_length)), + instance_length: Some(instance_length), + }), + Range::SuffixLength(from_start) => ContentRange(ContentRangeSpec::Bytes { + range: Some((0, *from_start)), + instance_length: Some(instance_length), + }), + Range::Segment(start, end) => ContentRange(ContentRangeSpec::Bytes { + range: Some((*start, *end)), + instance_length: Some(instance_length), + }), + } + } + + pub(crate) fn chop_bytes(&self, bytes: Bytes) -> Once>> { + match self { + Range::RangeStart(start) => once(ready(Ok(bytes.slice(*start as usize..)))), + Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))), + Range::Segment(start, end) => { + once(ready(Ok(bytes.slice(*start as usize..*end as usize)))) + } + } + } + + pub(crate) async fn chop_file( + &self, + mut file: async_fs::File, + ) -> Result>>>, io::Error> { + match self { + Range::RangeStart(start) => { + file.seek(io::SeekFrom::Start(*start)).await?; + + Ok(Box::pin(crate::read_to_stream(file))) + } + Range::SuffixLength(from_start) => { + file.seek(io::SeekFrom::Start(0)).await?; + + Ok(Box::pin(read_num_bytes_to_stream(file, *from_start))) + } + Range::Segment(start, end) => { + file.seek(io::SeekFrom::Start(*start)).await?; + + Ok(Box::pin(read_num_bytes_to_stream( + file, + end.saturating_sub(*start), + ))) + } + } + } +} + +impl RangeHeader { + pub(crate) fn is_bytes(&self) -> bool { + self.unit == "bytes" + } + + pub(crate) fn ranges<'a>(&'a self) -> impl Iterator + 'a { + self.ranges.iter() + } + + pub(crate) fn len(&self) -> usize { + self.ranges.len() + } + + pub(crate) fn is_empty(&self) -> bool { + self.ranges.is_empty() + } +} + +impl FromRequest for RangeHeader { + type Config = (); + type Error = actix_web::Error; + type Future = std::future::Ready>; + + fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { + if let Some(range_head) = req.headers().get("Range") { + ready(parse_range_header(range_head).map_err(|e| { + tracing::warn!("Failed to parse range header: {}", e); + e.into() + })) + } else { + ready(Err(UploadError::ParseReq( + "Range header missing".to_string(), + ) + .into())) + } + } +} + +fn parse_range_header(range_head: &HeaderValue) -> Result { + let range_head_str = range_head.to_str().map_err(|_| { + UploadError::ParseReq("Range header contains non-utf8 characters".to_string()) + })?; + + let eq_pos = range_head_str + .find('=') + .ok_or_else(|| UploadError::ParseReq("Malformed Range Header".to_string()))?; + + let (unit, ranges) = range_head_str.split_at(eq_pos); + let ranges = ranges.trim_start_matches('='); + + let ranges = ranges + .split(',') + .map(parse_range) + .collect::, UploadError>>()?; + + Ok(RangeHeader { + unit: unit.to_owned(), + ranges, + }) +} + +fn parse_range(s: &str) -> Result { + let dash_pos = s + .find('-') + .ok_or_else(|| UploadError::ParseReq("Mailformed Range Bound".to_string()))?; + + let (start, end) = s.split_at(dash_pos); + let start = start.trim(); + let end = end.trim_start_matches('-').trim(); + + if start.is_empty() && end.is_empty() { + Err(UploadError::ParseReq("Malformed content range".to_string())) + } else if start.is_empty() { + let suffix_length = end.parse().map_err(|_| { + UploadError::ParseReq("Cannot parse suffix length for range header".to_string()) + })?; + + Ok(Range::SuffixLength(suffix_length)) + } else if end.is_empty() { + let range_start = start.parse().map_err(|_| { + UploadError::ParseReq("Cannot parse range start for range header".to_string()) + })?; + + Ok(Range::RangeStart(range_start)) + } else { + let range_start = start.parse().map_err(|_| { + UploadError::ParseReq("Cannot parse range start for range header".to_string()) + })?; + let range_end = end.parse().map_err(|_| { + UploadError::ParseReq("Cannot parse range end for range header".to_string()) + })?; + + if range_start > range_end { + return Err(UploadError::Range); + } + + Ok(Range::Segment(range_start, range_end)) + } +} + +fn read_num_bytes_to_stream( + mut file: async_fs::File, + mut num_bytes: u64, +) -> impl Stream> { + async_stream::stream! { + let mut buf = Vec::with_capacity((CHUNK_SIZE as u64).min(num_bytes) as usize); + + while { + buf.clear(); + let mut take = (&mut file).take((CHUNK_SIZE as u64).min(num_bytes)); + + let read_bytes_result = take.read_to_end(&mut buf).await; + + let read_bytes = read_bytes_result.as_ref().map(|num| *num).unwrap_or(0); + + yield read_bytes_result.map(|_| Bytes::copy_from_slice(&buf)); + + num_bytes = num_bytes.saturating_sub(read_bytes as u64); + read_bytes > 0 && num_bytes > 0 + } {} + } +} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 03f965d..215ec82 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -6,7 +6,7 @@ use crate::{ validate::validate_image, }; use actix_web::web; -use futures::stream::{Stream, StreamExt, TryStreamExt}; +use futures::stream::{Stream, StreamExt}; use sha2::Digest; use std::{path::PathBuf, pin::Pin, sync::Arc}; use tracing::{debug, error, info, instrument, warn, Span}; @@ -218,7 +218,7 @@ impl UploadManager { root_dir.push("files"); // Ensure file dir exists - actix_fs::create_dir_all(root_dir.clone()).await?; + async_fs::create_dir_all(root_dir.clone()).await?; Ok(UploadManager { inner: Arc::new(UploadManagerInner { @@ -603,7 +603,7 @@ impl UploadManager { let mut errors = Vec::new(); debug!("Deleting {:?}", path); - if let Err(e) = actix_fs::remove_file(path).await { + if let Err(e) = async_fs::remove_file(path).await { errors.push(e.into()); } @@ -676,7 +676,8 @@ impl UploadManager { async fn hash(&self, tmpfile: PathBuf) -> Result { let mut hasher = self.inner.hasher.clone(); - let mut stream = actix_fs::read_to_stream(tmpfile).await?; + let file = async_fs::File::open(tmpfile).await?; + let mut stream = Box::pin(crate::read_to_stream(file)); while let Some(res) = stream.next().await { let bytes = res?; @@ -750,8 +751,8 @@ impl UploadManager { path.push(filename.clone()); - if let Err(e) = actix_fs::metadata(path).await { - if e.kind() == Some(std::io::ErrorKind::NotFound) { + if let Err(e) = async_fs::metadata(path).await { + if e.kind() == std::io::ErrorKind::NotFound { debug!("Generated unused filename {}", filename); return Ok(filename); } @@ -877,19 +878,19 @@ impl UploadManager { } #[instrument(skip(stream))] -async fn safe_save_stream(to: PathBuf, stream: UploadStream) -> Result<(), UploadError> +async fn safe_save_stream(to: PathBuf, mut stream: UploadStream) -> Result<(), UploadError> where UploadError: From, E: Unpin, { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + async_fs::create_dir_all(path.to_owned()).await?; } - debug!("Checking if {:?} alreayd exists", to); - if let Err(e) = actix_fs::metadata(to.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + debug!("Checking if {:?} already exists", to); + if let Err(e) = async_fs::metadata(to.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -897,15 +898,22 @@ where } debug!("Writing stream to {:?}", to); - let stream = stream.err_into::(); - actix_fs::write_stream(to, stream).await?; + + let mut file = async_fs::File::create(to).await?; + + use futures_lite::AsyncWriteExt; + while let Some(res) = stream.next().await { + let bytes = res?; + file.write_all(&bytes).await?; + } + file.flush().await?; Ok(()) } async fn remove_path(path: sled::IVec) -> Result<(), UploadError> { let path_string = String::from_utf8(path.to_vec())?; - actix_fs::remove_file(path_string).await?; + async_fs::remove_file(path_string).await?; Ok(()) }