From 9797e2e3fcc15cd7f0d5dfe1ac673289fd908be4 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 18 Jan 2021 19:54:39 -0600 Subject: [PATCH] Go back to actix-fs --- Cargo.lock | 129 +++++------------------------------------- Cargo.toml | 3 +- src/error.rs | 3 + src/main.rs | 69 ++++++++-------------- src/processor.rs | 4 +- src/range.rs | 74 +++++++++++++----------- src/upload_manager.rs | 33 +++++------ 7 files changed, 99 insertions(+), 216 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0264075..8b2f246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "actix-fs" +version = "0.1.0" +source = "git+https://git.asonix.dog/asonix/actix-fs?branch=main#dc37026b9f1a8016304b71af34802a1ec52a318b" +dependencies = [ + "actix-threadpool", + "bytes", + "futures", + "log", + "thiserror", +] + [[package]] name = "actix-http" version = "2.2.0" @@ -353,37 +365,6 @@ version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee67c11feeac938fae061b232e38e0b6d94f97a9df10e6271319325ac4c56a86" -[[package]] -name = "async-channel" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-fs" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2" -dependencies = [ - "async-lock", - "blocking", - "futures-lite", -] - -[[package]] -name = "async-lock" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.0" @@ -405,12 +386,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "async-trait" version = "0.1.42" @@ -422,12 +397,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "atty" version = "0.2.14" @@ -561,20 +530,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "brotli-sys" version = "0.3.2" @@ -622,12 +577,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "cache-padded" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" - [[package]] name = "cc" version = "1.0.66" @@ -694,15 +643,6 @@ dependencies = [ "vec_map", ] -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - [[package]] name = "const_fn" version = "0.4.5" @@ -832,21 +772,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "event-listener" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" - -[[package]] -name = "fastrand" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" -dependencies = [ - "instant", -] - [[package]] name = "ffmpeg-next" version = "4.3.8" @@ -974,21 +899,6 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" -[[package]] -name = "futures-lite" -version = "1.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite 0.2.1", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.8" @@ -1495,12 +1405,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.11.1" @@ -1543,17 +1447,16 @@ name = "pict-rs" version = "0.3.0-alpha.5" dependencies = [ "actix-form-data", + "actix-fs", "actix-rt", "actix-web", "anyhow", - "async-fs", "async-stream", "base64 0.13.0", "bytes", "ffmpeg-next", "ffmpeg-sys-next", "futures", - "futures-lite", "magick_rust", "mime", "once_cell", @@ -2524,12 +2427,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 8579966..dcde47e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,15 +12,14 @@ edition = "2018" [dependencies] actix-form-data = "0.5.0" +actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs", branch = "main" } actix-rt = "1.1.1" actix-web = { version = "3.0.1", default-features = false, features = ["rustls"] } anyhow = "1.0" -async-fs = "1.5.0" async-stream = "0.3.0" base64 = "0.13.0" bytes = "0.5" futures = "0.3.4" -futures-lite = "1.11.3" magick_rust = { version = "0.14.0", git = "https://github.com/nlfiedler/magick-rust" } mime = "0.3.1" once_cell = "1.4.0" diff --git a/src/error.rs b/src/error.rs index 35a51f6..e29643c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,6 +18,9 @@ pub(crate) enum UploadError { #[error("Error interacting with filesystem, {0}")] Io(#[from] std::io::Error), + #[error("Error in filesyste, {0}")] + Fs(#[from] actix_fs::Error), + #[error("Panic in blocking operation")] Canceled, diff --git a/src/main.rs b/src/main.rs index ffb05bd..f772a7c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,11 +8,10 @@ use actix_web::{ web, App, HttpResponse, HttpServer, }; use bytes::Bytes; -use futures::stream::{once, Stream}; -use futures_lite::{AsyncReadExt, AsyncWriteExt}; +use futures::stream::{once, Stream, TryStreamExt}; use once_cell::sync::Lazy; use std::{ - collections::HashSet, future::ready, io, path::PathBuf, pin::Pin, sync::Once, time::SystemTime, + collections::HashSet, future::ready, path::PathBuf, pin::Pin, sync::Once, time::SystemTime, }; use structopt::StructOpt; use tracing::{debug, error, info, instrument, Span}; @@ -36,7 +35,6 @@ use self::{ validate::{image_webp, video_mp4}, }; -const CHUNK_SIZE: usize = 65_356; const MEGABYTES: usize = 1024 * 1024; const MINUTES: u32 = 60; const HOURS: u32 = 60 * MINUTES; @@ -66,12 +64,12 @@ static MAGICK_INIT: Once = Once::new(); async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - async_fs::create_dir_all(path.to_owned()).await?; + actix_fs::create_dir_all(path.to_owned()).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = async_fs::metadata(to.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { + if let Err(e) = actix_fs::metadata(to.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { return Err(e.into()); } } else { @@ -79,15 +77,15 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { } debug!("Moving {:?} to {:?}", from, to); - async_fs::copy(from.clone(), to).await?; - async_fs::remove_file(from).await?; + actix_fs::copy(from.clone(), to).await?; + actix_fs::remove_file(from).await?; Ok(()) } async fn safe_create_parent(path: PathBuf) -> Result<(), UploadError> { if let Some(path) = path.parent() { debug!("Creating directory {:?}", path); - async_fs::create_dir_all(path.to_owned()).await?; + actix_fs::create_dir_all(path.to_owned()).await?; } Ok(()) @@ -99,13 +97,13 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload if let Some(path) = path.parent() { // create the directory for the file debug!("Creating directory {:?}", path); - async_fs::create_dir_all(path.to_owned()).await?; + actix_fs::create_dir_all(path.to_owned()).await?; } // Only write the file if it doesn't already exist debug!("Checking if {:?} already exists", path); - if let Err(e) = async_fs::metadata(path.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { + if let Err(e) = actix_fs::metadata(path.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { return Err(e.into()); } } else { @@ -114,17 +112,16 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload // Open the file for writing debug!("Creating {:?}", path); - let mut file = async_fs::File::create(path.clone()).await?; + let file = actix_fs::file::create(path.clone()).await?; // try writing debug!("Writing to {:?}", path); - if let Err(e) = file.write_all(&bytes).await { + if let Err(e) = actix_fs::file::write(file, bytes).await { error!("Error writing {:?}, {}", path, e); // remove file if writing failed before completion - async_fs::remove_file(path).await?; + actix_fs::remove_file(path).await?; return Err(e.into()); } - file.flush().await?; debug!("{:?} written", path); Ok(()) @@ -353,8 +350,8 @@ async fn process( prepare_process(query, ext.as_str(), &manager, &whitelist).await?; // If the thumbnail doesn't exist, we need to create it - let thumbnail_exists = if let Err(e) = async_fs::metadata(thumbnail_path.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { + let thumbnail_exists = if let Err(e) = actix_fs::metadata(thumbnail_path.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { error!("Error looking up processed image, {}", e); return Err(e.into()); } @@ -531,25 +528,6 @@ async fn serve( ranged_file_resp(path, range, details).await } -fn read_to_stream(mut file: async_fs::File) -> impl Stream> { - async_stream::stream! { - let mut buf = Vec::with_capacity(CHUNK_SIZE); - - while { - buf.clear(); - let mut take = (&mut file).take(CHUNK_SIZE as u64); - - let read_bytes_result = take.read_to_end(&mut buf).await; - - let read_bytes = read_bytes_result.as_ref().map(|num| *num).unwrap_or(0); - - yield read_bytes_result.map(|_| Bytes::copy_from_slice(&buf)); - - read_bytes > 0 - } {} - } -} - async fn ranged_file_resp( path: PathBuf, range: Option, @@ -565,9 +543,9 @@ async fn ranged_file_resp( if range_header.is_empty() { return Err(UploadError::Range); } else if range_header.len() == 1 { - let file = async_fs::File::open(path).await?; + let file = actix_fs::file::open(path).await?; - let meta = file.metadata().await?; + let (file, meta) = actix_fs::file::metadata(file).await?; let range = range_header.ranges().next().unwrap(); @@ -581,9 +559,10 @@ async fn ranged_file_resp( } //No Range header in the request - return the entire document None => { - let file = async_fs::File::open(path).await?; - let stream: Pin>>> = - Box::pin(read_to_stream(file)); + let stream = actix_fs::read_to_stream(path) + .await? + .map_err(UploadError::from); + let stream: Pin>>> = Box::pin(stream); (HttpResponse::Ok(), stream) } }; @@ -814,8 +793,8 @@ async fn main() -> Result<(), anyhow::Error> { .run() .await?; - if async_fs::metadata(&*TMP_DIR).await.is_ok() { - async_fs::remove_dir_all(&*TMP_DIR).await?; + if actix_fs::metadata(&*TMP_DIR).await.is_ok() { + actix_fs::remove_dir_all(&*TMP_DIR).await?; } Ok(()) diff --git a/src/processor.rs b/src/processor.rs index 0f355c2..661c625 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -356,7 +356,7 @@ pub(crate) async fn prepare_image( let jpg_path = format!("{}.jpg", original_path_str); let jpg_path = PathBuf::from(jpg_path); - if async_fs::metadata(jpg_path.clone()).await.is_ok() { + if actix_fs::metadata(jpg_path.clone()).await.is_ok() { return Ok(Some((jpg_path, Exists::Exists))); } @@ -376,7 +376,7 @@ pub(crate) async fn prepare_image( if let Err(e) = res { error!("transcode error: {:?}", e); - async_fs::remove_file(tmpfile2).await?; + actix_fs::remove_file(tmpfile2).await?; return Err(e.into()); } diff --git a/src/range.rs b/src/range.rs index c363153..172219c 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,4 +1,4 @@ -use crate::{UploadError, CHUNK_SIZE}; +use crate::UploadError; use actix_web::{ dev::Payload, http::{ @@ -8,9 +8,8 @@ use actix_web::{ web::Bytes, FromRequest, HttpRequest, }; -use futures::stream::{once, Once, Stream}; -use futures_lite::{AsyncReadExt, AsyncSeekExt}; -use std::io; +use futures::stream::{once, Once, Stream, StreamExt, TryStreamExt}; +use std::{fs, io}; use std::{ future::{ready, Ready}, pin::Pin, @@ -47,7 +46,7 @@ impl Range { } } - pub(crate) fn chop_bytes(&self, bytes: Bytes) -> Once>> { + pub(crate) fn chop_bytes(&self, bytes: Bytes) -> Once>> { match self { Range::RangeStart(start) => once(ready(Ok(bytes.slice(*start as usize..)))), Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))), @@ -59,26 +58,31 @@ impl Range { pub(crate) async fn chop_file( &self, - mut file: async_fs::File, - ) -> Result>>>, io::Error> { + file: fs::File, + ) -> Result>>>, UploadError> { match self { Range::RangeStart(start) => { - file.seek(io::SeekFrom::Start(*start)).await?; + let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; - Ok(Box::pin(crate::read_to_stream(file))) + Ok(Box::pin( + actix_fs::file::read_to_stream(file) + .await? + .map_err(UploadError::from), + )) } Range::SuffixLength(from_start) => { - file.seek(io::SeekFrom::Start(0)).await?; + let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(0)).await?; - Ok(Box::pin(read_num_bytes_to_stream(file, *from_start))) + Ok(Box::pin( + read_num_bytes_to_stream(file, *from_start as usize).await?, + )) } Range::Segment(start, end) => { - file.seek(io::SeekFrom::Start(*start)).await?; + let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; - Ok(Box::pin(read_num_bytes_to_stream( - file, - end.saturating_sub(*start), - ))) + Ok(Box::pin( + read_num_bytes_to_stream(file, end.saturating_sub(*start) as usize).await?, + )) } } } @@ -184,25 +188,31 @@ fn parse_range(s: &str) -> Result { } } -fn read_num_bytes_to_stream( - mut file: async_fs::File, - mut num_bytes: u64, -) -> impl Stream> { - async_stream::stream! { - let mut buf = Vec::with_capacity((CHUNK_SIZE as u64).min(num_bytes) as usize); +async fn read_num_bytes_to_stream( + file: fs::File, + mut num_bytes: usize, +) -> Result>, UploadError> { + let mut stream = actix_fs::file::read_to_stream(file).await?; - while { - buf.clear(); - let mut take = (&mut file).take((CHUNK_SIZE as u64).min(num_bytes)); + let stream = async_stream::stream! { + while let Some(res) = stream.next().await { + let read_bytes = res.as_ref().map(|b| b.len()).unwrap_or(0); - let read_bytes_result = take.read_to_end(&mut buf).await; + if read_bytes == 0 { + break; + } - let read_bytes = read_bytes_result.as_ref().map(|num| *num).unwrap_or(0); + yield res.map_err(UploadError::from).map(|bytes| { + if bytes.len() > num_bytes { + bytes.slice(0..num_bytes) + } else { + bytes + } + }); - yield read_bytes_result.map(|_| Bytes::copy_from_slice(&buf)); + num_bytes = num_bytes.saturating_sub(read_bytes); + } + }; - num_bytes = num_bytes.saturating_sub(read_bytes as u64); - read_bytes > 0 && num_bytes > 0 - } {} - } + Ok(stream) } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 215ec82..a278da8 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -6,7 +6,7 @@ use crate::{ validate::validate_image, }; use actix_web::web; -use futures::stream::{Stream, StreamExt}; +use futures::stream::{Stream, StreamExt, TryStreamExt}; use sha2::Digest; use std::{path::PathBuf, pin::Pin, sync::Arc}; use tracing::{debug, error, info, instrument, warn, Span}; @@ -218,7 +218,7 @@ impl UploadManager { root_dir.push("files"); // Ensure file dir exists - async_fs::create_dir_all(root_dir.clone()).await?; + actix_fs::create_dir_all(root_dir.clone()).await?; Ok(UploadManager { inner: Arc::new(UploadManagerInner { @@ -603,7 +603,7 @@ impl UploadManager { let mut errors = Vec::new(); debug!("Deleting {:?}", path); - if let Err(e) = async_fs::remove_file(path).await { + if let Err(e) = actix_fs::remove_file(path).await { errors.push(e.into()); } @@ -676,8 +676,8 @@ impl UploadManager { async fn hash(&self, tmpfile: PathBuf) -> Result { let mut hasher = self.inner.hasher.clone(); - let file = async_fs::File::open(tmpfile).await?; - let mut stream = Box::pin(crate::read_to_stream(file)); + let file = actix_fs::file::open(tmpfile).await?; + let mut stream = Box::pin(actix_fs::file::read_to_stream(file).await?); while let Some(res) = stream.next().await { let bytes = res?; @@ -751,8 +751,8 @@ impl UploadManager { path.push(filename.clone()); - if let Err(e) = async_fs::metadata(path).await { - if e.kind() == std::io::ErrorKind::NotFound { + if let Err(e) = actix_fs::metadata(path).await { + if e.kind() == Some(std::io::ErrorKind::NotFound) { debug!("Generated unused filename {}", filename); return Ok(filename); } @@ -878,19 +878,19 @@ impl UploadManager { } #[instrument(skip(stream))] -async fn safe_save_stream(to: PathBuf, mut stream: UploadStream) -> Result<(), UploadError> +async fn safe_save_stream(to: PathBuf, stream: UploadStream) -> Result<(), UploadError> where UploadError: From, E: Unpin, { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - async_fs::create_dir_all(path.to_owned()).await?; + actix_fs::create_dir_all(path.to_owned()).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = async_fs::metadata(to.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { + if let Err(e) = actix_fs::metadata(to.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { return Err(e.into()); } } else { @@ -899,21 +899,16 @@ where debug!("Writing stream to {:?}", to); - let mut file = async_fs::File::create(to).await?; + let file = actix_fs::file::create(to).await?; - use futures_lite::AsyncWriteExt; - while let Some(res) = stream.next().await { - let bytes = res?; - file.write_all(&bytes).await?; - } - file.flush().await?; + actix_fs::file::write_stream(file, stream.map_err(UploadError::from)).await?; Ok(()) } async fn remove_path(path: sled::IVec) -> Result<(), UploadError> { let path_string = String::from_utf8(path.to_vec())?; - async_fs::remove_file(path_string).await?; + actix_fs::remove_file(path_string).await?; Ok(()) }