2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-11-10 06:25:00 +00:00

Drop unneeded dependencies

This commit is contained in:
Aode (lion) 2021-09-04 14:20:31 -05:00
parent a75c62bb8b
commit e21fd29c09
11 changed files with 163 additions and 260 deletions

76
Cargo.lock generated
View file

@ -34,20 +34,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "actix-fs"
version = "0.1.0"
source = "git+https://git.asonix.dog/asonix/actix-fs?branch=asonix/actix-rt-2#aef0e3c557b1365c0c1039bedd321724cd09201f"
dependencies = [
"actix-rt",
"bytes",
"futures-util",
"log",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "actix-http" name = "actix-http"
version = "3.0.0-beta.9" version = "3.0.0-beta.9"
@ -292,27 +278,6 @@ version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf"
[[package]]
name = "async-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -998,16 +963,13 @@ name = "pict-rs"
version = "0.3.0-alpha.23" version = "0.3.0-alpha.23"
dependencies = [ dependencies = [
"actix-form-data", "actix-form-data",
"actix-fs",
"actix-rt", "actix-rt",
"actix-web", "actix-web",
"anyhow", "anyhow",
"async-stream",
"awc", "awc",
"base64", "base64",
"futures", "futures-core",
"mime", "mime",
"num_cpus",
"once_cell", "once_cell",
"rand", "rand",
"serde", "serde",
@ -1018,7 +980,6 @@ dependencies = [
"thiserror", "thiserror",
"time 0.3.2", "time 0.3.2",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-futures", "tracing-futures",
@ -1477,9 +1438,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.75" version = "1.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7" checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1602,37 +1563,14 @@ dependencies = [
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"tokio-macros",
"winapi", "winapi",
] ]
[[package]]
name = "tokio-macros"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.7" version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@ -1739,9 +1677,9 @@ dependencies = [
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.13.0" version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
[[package]] [[package]]
name = "ucd-trie" name = "ucd-trie"

View file

@ -12,16 +12,13 @@ edition = "2018"
[dependencies] [dependencies]
actix-form-data = "0.6.0-beta.1" actix-form-data = "0.6.0-beta.1"
actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs", branch = "asonix/actix-rt-2" }
actix-rt = "2.2.0" actix-rt = "2.2.0"
actix-web = { version = "4.0.0-beta.8", default-features = false } actix-web = { version = "4.0.0-beta.8", default-features = false }
anyhow = "1.0" anyhow = "1.0"
async-stream = "0.3.0"
awc = { version = "3.0.0-beta.7", default-features = false } awc = { version = "3.0.0-beta.7", default-features = false }
base64 = "0.13.0" base64 = "0.13.0"
futures = "0.3.4" futures-core = "0.3.17"
mime = "0.3.1" mime = "0.3.1"
num_cpus = "1"
once_cell = "1.4.0" once_cell = "1.4.0"
rand = "0.8.0" rand = "0.8.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
@ -31,8 +28,7 @@ sled = { version = "0.34.6" }
structopt = "0.3.14" structopt = "0.3.14"
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.0", features = ["serde"] } time = { version = "0.3.0", features = ["serde"] }
tokio = { version = "1", default-features = false, features = ["fs", "io-util", "macros", "process", "sync"] } tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] }
tokio-stream = { version = "0.1", default-features = false }
tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] }
tracing = "0.1.15" tracing = "0.1.15"
tracing-futures = "0.2.4" tracing-futures = "0.2.4"

View file

@ -18,9 +18,6 @@ pub(crate) enum UploadError {
#[error("Error interacting with filesystem, {0}")] #[error("Error interacting with filesystem, {0}")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("Error in filesyste, {0}")]
Fs(#[from] actix_fs::Error),
#[error("Panic in blocking operation")] #[error("Panic in blocking operation")]
Canceled, Canceled,

View file

@ -9,9 +9,6 @@ pub(crate) enum VideoError {
#[error("Failed to convert file")] #[error("Failed to convert file")]
Status, Status,
#[error("Transcode semaphore is closed")]
Closed,
} }
pub(crate) enum InputFormat { pub(crate) enum InputFormat {
@ -49,14 +46,6 @@ impl ThumbnailFormat {
} }
} }
static MAX_TRANSCODES: once_cell::sync::OnceCell<tokio::sync::Semaphore> =
once_cell::sync::OnceCell::new();
fn semaphore() -> &'static tokio::sync::Semaphore {
MAX_TRANSCODES
.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
}
pub(crate) fn to_mp4_bytes( pub(crate) fn to_mp4_bytes(
input: Bytes, input: Bytes,
input_format: InputFormat, input_format: InputFormat,
@ -92,8 +81,6 @@ where
P1: AsRef<std::path::Path>, P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>, P2: AsRef<std::path::Path>,
{ {
let permit = semaphore().acquire().await?;
let mut child = Command::new("ffmpeg") let mut child = Command::new("ffmpeg")
.arg(&"-i") .arg(&"-i")
.arg(&from.as_ref()) .arg(&from.as_ref())
@ -109,7 +96,6 @@ where
.spawn()?; .spawn()?;
let status = child.wait().await?; let status = child.wait().await?;
drop(permit);
if !status.success() { if !status.success() {
return Err(VideoError::Status); return Err(VideoError::Status);
@ -117,9 +103,3 @@ where
Ok(()) Ok(())
} }
impl From<tokio::sync::AcquireError> for VideoError {
fn from(_: tokio::sync::AcquireError) -> VideoError {
VideoError::Closed
}
}

View file

@ -11,9 +11,6 @@ pub(crate) enum MagickError {
#[error("{0}")] #[error("{0}")]
IO(#[from] std::io::Error), IO(#[from] std::io::Error),
#[error("Magick semaphore is closed")]
Closed,
#[error("Invalid format")] #[error("Invalid format")]
Format, Format,
} }
@ -32,14 +29,6 @@ pub(crate) struct Details {
pub(crate) height: usize, pub(crate) height: usize,
} }
static MAX_CONVERSIONS: once_cell::sync::OnceCell<tokio::sync::Semaphore> =
once_cell::sync::OnceCell::new();
fn semaphore() -> &'static tokio::sync::Semaphore {
MAX_CONVERSIONS
.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
}
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> { pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?;
@ -81,16 +70,12 @@ pub(crate) async fn details<P>(file: P) -> Result<Details, MagickError>
where where
P: AsRef<std::path::Path>, P: AsRef<std::path::Path>,
{ {
let permit = semaphore().acquire().await?;
let output = Command::new("magick") let output = Command::new("magick")
.args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"]) .args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"])
.arg(&file.as_ref()) .arg(&file.as_ref())
.output() .output()
.await?; .await?;
drop(permit);
let s = String::from_utf8_lossy(&output.stdout); let s = String::from_utf8_lossy(&output.stdout);
parse_details(s) parse_details(s)
@ -140,8 +125,6 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, MagickError> {
} }
pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result<ValidInputType, MagickError> { pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result<ValidInputType, MagickError> {
let permit = semaphore().acquire().await.map_err(MagickError::from)?;
let mut child = Command::new("magick") let mut child = Command::new("magick")
.args(["identify", "-ping", "-format", "%m\n", "-"]) .args(["identify", "-ping", "-format", "%m\n", "-"])
.stdin(Stdio::piped()) .stdin(Stdio::piped())
@ -152,13 +135,13 @@ pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result<ValidInputType,
let mut stdout = child.stdout.take().unwrap(); let mut stdout = child.stdout.take().unwrap();
stdin.write_all_buf(&mut input).await?; stdin.write_all_buf(&mut input).await?;
drop(stdin);
let mut vec = Vec::new(); let mut vec = Vec::new();
stdout.read_to_end(&mut vec).await?; stdout.read_to_end(&mut vec).await?;
drop(stdout);
drop(stdin);
child.wait().await?; child.wait().await?;
drop(permit);
let s = String::from_utf8_lossy(&vec); let s = String::from_utf8_lossy(&vec);
parse_input_type(s) parse_input_type(s)
@ -198,12 +181,6 @@ pub(crate) fn process_image_write_read(
Ok(process.write_read(input).unwrap()) Ok(process.write_read(input).unwrap())
} }
impl From<tokio::sync::AcquireError> for MagickError {
fn from(_: tokio::sync::AcquireError) -> MagickError {
MagickError::Closed
}
}
impl From<std::num::ParseIntError> for MagickError { impl From<std::num::ParseIntError> for MagickError {
fn from(_: std::num::ParseIntError) -> MagickError { fn from(_: std::num::ParseIntError) -> MagickError {
MagickError::Format MagickError::Format

View file

@ -6,11 +6,11 @@ use actix_web::{
web, App, HttpResponse, HttpResponseBuilder, HttpServer, web, App, HttpResponse, HttpResponseBuilder, HttpServer,
}; };
use awc::Client; use awc::Client;
use futures::stream::{Stream, TryStreamExt}; use futures_core::stream::Stream;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{collections::HashSet, path::PathBuf, pin::Pin, time::SystemTime}; use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::io::AsyncReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, info, instrument, Span}; use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@ -31,6 +31,7 @@ use self::{
config::{Config, Format}, config::{Config, Format},
error::UploadError, error::UploadError,
middleware::{Internal, Tracing}, middleware::{Internal, Tracing},
stream::{once, LocalBoxStream},
upload_manager::{Details, UploadManager}, upload_manager::{Details, UploadManager},
validate::{image_webp, video_mp4}, validate::{image_webp, video_mp4},
}; };
@ -64,12 +65,12 @@ static CONFIG: Lazy<Config> = Lazy::new(Config::from_args);
async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
if let Some(path) = to.parent() { if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; tokio::fs::create_dir_all(path).await?;
} }
debug!("Checking if {:?} already exists", to); debug!("Checking if {:?} already exists", to);
if let Err(e) = actix_fs::metadata(to.clone()).await { if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into()); return Err(e.into());
} }
} else { } else {
@ -77,8 +78,8 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
} }
debug!("Moving {:?} to {:?}", from, to); debug!("Moving {:?} to {:?}", from, to);
actix_fs::copy(from.clone(), to).await?; tokio::fs::copy(&from, to).await?;
actix_fs::remove_file(from).await?; tokio::fs::remove_file(from).await?;
Ok(()) Ok(())
} }
@ -88,7 +89,7 @@ where
{ {
if let Some(path) = path.as_ref().parent() { if let Some(path) = path.as_ref().parent() {
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; tokio::fs::create_dir_all(path).await?;
} }
Ok(()) Ok(())
@ -96,17 +97,17 @@ where
// Try writing to a file // Try writing to a file
#[instrument(skip(bytes))] #[instrument(skip(bytes))]
async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), UploadError> { async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), UploadError> {
if let Some(path) = path.parent() { if let Some(path) = path.parent() {
// create the directory for the file // create the directory for the file
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; tokio::fs::create_dir_all(path).await?;
} }
// Only write the file if it doesn't already exist // Only write the file if it doesn't already exist
debug!("Checking if {:?} already exists", path); debug!("Checking if {:?} already exists", path);
if let Err(e) = actix_fs::metadata(path.clone()).await { if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into()); return Err(e.into());
} }
} else { } else {
@ -115,14 +116,14 @@ async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), UploadEr
// Open the file for writing // Open the file for writing
debug!("Creating {:?}", path); debug!("Creating {:?}", path);
let file = actix_fs::file::create(path.clone()).await?; let mut file = tokio::fs::File::create(&path).await?;
// try writing // try writing
debug!("Writing to {:?}", path); debug!("Writing to {:?}", path);
if let Err(e) = actix_fs::file::write(file, bytes).await { if let Err(e) = file.write_all_buf(&mut bytes).await {
error!("Error writing {:?}, {}", path, e); error!("Error writing {:?}, {}", path, e);
// remove file if writing failed before completion // remove file if writing failed before completion
actix_fs::remove_file(path).await?; tokio::fs::remove_file(path).await?;
return Err(e.into()); return Err(e.into());
} }
debug!("{:?} written", path); debug!("{:?} written", path);
@ -240,7 +241,7 @@ async fn download(
let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES); let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES);
let stream = Box::pin(futures::stream::once(fut)); let stream = Box::pin(once(fut));
let alias = manager.upload(stream).await?; let alias = manager.upload(stream).await?;
let delete_token = manager.delete_token(alias.clone()).await?; let delete_token = manager.delete_token(alias.clone()).await?;
@ -362,8 +363,8 @@ async fn process(
prepare_process(query, ext.as_str(), &manager, &whitelist).await?; prepare_process(query, ext.as_str(), &manager, &whitelist).await?;
// If the thumbnail doesn't exist, we need to create it // If the thumbnail doesn't exist, we need to create it
let thumbnail_exists = if let Err(e) = actix_fs::metadata(thumbnail_path.clone()).await { let thumbnail_exists = if let Err(e) = tokio::fs::metadata(&thumbnail_path).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != std::io::ErrorKind::NotFound {
error!("Error looking up processed image, {}", e); error!("Error looking up processed image, {}", e);
return Err(e.into()); return Err(e.into());
} }
@ -442,7 +443,7 @@ async fn process(
return Ok(srv_response( return Ok(srv_response(
HttpResponse::Ok(), HttpResponse::Ok(),
futures::stream::once(futures::future::ready(Ok(bytes) as Result<_, UploadError>)), once(ready(Ok(bytes) as Result<_, UploadError>)),
details.content_type(), details.content_type(),
7 * DAYS, 7 * DAYS,
details.system_time(), details.system_time(),
@ -527,9 +528,9 @@ async fn ranged_file_resp(
if range_header.is_empty() { if range_header.is_empty() {
return Err(UploadError::Range); return Err(UploadError::Range);
} else if range_header.len() == 1 { } else if range_header.len() == 1 {
let file = actix_fs::file::open(path).await?; let file = tokio::fs::File::open(path).await?;
let (file, meta) = actix_fs::file::metadata(file).await?; let meta = file.metadata().await?;
let range = range_header.ranges().next().unwrap(); let range = range_header.ranges().next().unwrap();
@ -543,12 +544,8 @@ async fn ranged_file_resp(
} }
//No Range header in the request - return the entire document //No Range header in the request - return the entire document
None => { None => {
let stream = actix_fs::read_to_stream(path) let file = tokio::fs::File::open(path).await?;
.await? let stream = Box::pin(crate::stream::bytes_stream(file)) as LocalBoxStream<'_, _>;
.faster()
.map_err(UploadError::from);
let stream: Pin<Box<dyn Stream<Item = Result<web::Bytes, UploadError>>>> =
Box::pin(stream);
(HttpResponse::Ok(), stream) (HttpResponse::Ok(), stream)
} }
}; };
@ -774,8 +771,8 @@ async fn main() -> Result<(), anyhow::Error> {
.run() .run()
.await?; .await?;
if actix_fs::metadata(&*TMP_DIR).await.is_ok() { if tokio::fs::metadata(&*TMP_DIR).await.is_ok() {
actix_fs::remove_dir_all(&*TMP_DIR).await?; tokio::fs::remove_dir_all(&*TMP_DIR).await?;
} }
Ok(()) Ok(())

View file

@ -1,10 +1,13 @@
use crate::stream::LocalBoxFuture;
use actix_web::{ use actix_web::{
dev::{Service, ServiceRequest, Transform}, dev::{Service, ServiceRequest, Transform},
http::StatusCode, http::StatusCode,
HttpResponse, ResponseError, HttpResponse, ResponseError,
}; };
use futures::future::{ok, LocalBoxFuture, Ready}; use std::{
use std::task::{Context, Poll}; future::{ready, Ready},
task::{Context, Poll},
};
use tracing_futures::{Instrument, Instrumented}; use tracing_futures::{Instrument, Instrumented};
use uuid::Uuid; use uuid::Uuid;
@ -47,7 +50,7 @@ where
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
ok(TracingMiddleware { inner: service }) ready(Ok(TracingMiddleware { inner: service }))
} }
} }
@ -85,7 +88,7 @@ where
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
ok(InternalMiddleware(self.0.clone(), service)) ready(Ok(InternalMiddleware(self.0.clone(), service)))
} }
} }

View file

@ -302,7 +302,7 @@ pub(crate) async fn prepare_image(
let jpg_path = format!("{}.jpg", original_path_str); let jpg_path = format!("{}.jpg", original_path_str);
let jpg_path = PathBuf::from(jpg_path); let jpg_path = PathBuf::from(jpg_path);
if actix_fs::metadata(jpg_path.clone()).await.is_ok() { if tokio::fs::metadata(&jpg_path).await.is_ok() {
return Ok(Some((jpg_path, Exists::Exists))); return Ok(Some((jpg_path, Exists::Exists)));
} }
@ -316,7 +316,7 @@ pub(crate) async fn prepare_image(
if let Err(e) = res { if let Err(e) = res {
error!("transcode error: {:?}", e); error!("transcode error: {:?}", e);
actix_fs::remove_file(tmpfile.clone()).await?; tokio::fs::remove_file(&tmpfile).await?;
return Err(e.into()); return Err(e.into());
} }

View file

@ -1,4 +1,7 @@
use crate::UploadError; use crate::{
stream::{bytes_stream, LocalBoxStream},
UploadError,
};
use actix_web::{ use actix_web::{
dev::Payload, dev::Payload,
http::{ http::{
@ -8,9 +11,8 @@ use actix_web::{
web::Bytes, web::Bytes,
FromRequest, HttpRequest, FromRequest, HttpRequest,
}; };
use futures::stream::{Stream, StreamExt, TryStreamExt}; use std::{future::ready, io};
use std::{fs, io}; use tokio::io::{AsyncReadExt, AsyncSeekExt};
use std::{future::ready, pin::Pin};
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Range { pub(crate) enum Range {
@ -45,32 +47,25 @@ impl Range {
pub(crate) async fn chop_file( pub(crate) async fn chop_file(
&self, &self,
file: fs::File, mut file: tokio::fs::File,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, UploadError>>>>, UploadError> { ) -> Result<LocalBoxStream<'static, Result<Bytes, UploadError>>, UploadError> {
match self { match self {
Range::RangeStart(start) => { Range::RangeStart(start) => {
let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; file.seek(io::SeekFrom::Start(*start)).await?;
Ok(Box::pin( Ok(Box::pin(bytes_stream(file)))
actix_fs::file::read_to_stream(file)
.await?
.faster()
.map_err(UploadError::from),
))
} }
Range::SuffixLength(from_start) => { Range::SuffixLength(from_start) => {
let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(0)).await?; file.seek(io::SeekFrom::Start(0)).await?;
let reader = file.take(*from_start);
Ok(Box::pin( Ok(Box::pin(bytes_stream(reader)))
read_num_bytes_to_stream(file, *from_start as usize).await?,
))
} }
Range::Segment(start, end) => { Range::Segment(start, end) => {
let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; file.seek(io::SeekFrom::Start(*start)).await?;
let reader = file.take(end.saturating_sub(*start));
Ok(Box::pin( Ok(Box::pin(bytes_stream(reader)))
read_num_bytes_to_stream(file, end.saturating_sub(*start) as usize).await?,
))
} }
} }
} }
@ -175,32 +170,3 @@ fn parse_range(s: &str) -> Result<Range, UploadError> {
Ok(Range::Segment(range_start, range_end)) Ok(Range::Segment(range_start, range_end))
} }
} }
async fn read_num_bytes_to_stream(
file: fs::File,
mut num_bytes: usize,
) -> Result<impl Stream<Item = Result<Bytes, UploadError>>, UploadError> {
let mut stream = actix_fs::file::read_to_stream(file).await?;
let stream = async_stream::stream! {
while let Some(res) = stream.next().await {
let read_bytes = res.as_ref().map(|b| b.len()).unwrap_or(0);
if read_bytes == 0 {
break;
}
yield res.map_err(UploadError::from).map(|bytes| {
if bytes.len() > num_bytes {
bytes.slice(0..num_bytes)
} else {
bytes
}
});
num_bytes = num_bytes.saturating_sub(read_bytes);
}
};
Ok(stream)
}

View file

@ -1,5 +1,6 @@
use actix_web::web::Bytes; use crate::error::UploadError;
use futures::stream::{LocalBoxStream, Stream}; use actix_web::web::{Bytes, BytesMut};
use futures_core::stream::Stream;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -7,9 +8,8 @@ use std::{
}; };
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
pub(crate) struct ReadAdapter<S> { pub(crate) type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
inner: S, pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
}
pub(crate) struct Process { pub(crate) struct Process {
child: tokio::process::Child, child: tokio::process::Child,
@ -21,10 +21,14 @@ pub(crate) struct ProcessRead<I> {
err_closed: bool, err_closed: bool,
} }
pub(crate) struct ProcessSinkStream<E> { struct BytesFreezer<S>(S);
stream: LocalBoxStream<'static, Result<Bytes, E>>,
pub(crate) struct Once<T> {
inner: Option<T>,
} }
pub(crate) struct Next<'a, S>(&'a mut S);
impl Process { impl Process {
fn new(child: tokio::process::Child) -> Self { fn new(child: tokio::process::Child) -> Self {
Process { child } Process { child }
@ -79,28 +83,21 @@ impl Process {
} }
} }
impl<S, E> AsyncRead for ReadAdapter<S> pub(crate) fn bytes_stream(
where input: impl AsyncRead + Unpin,
S: Stream<Item = Result<Bytes, E>> + Unpin, ) -> impl Stream<Item = Result<Bytes, UploadError>> + Unpin {
E: Into<Box<dyn std::error::Error + Send + Sync>>, BytesFreezer(tokio_util::codec::FramedRead::new(
{ input,
fn poll_read( tokio_util::codec::BytesCodec::new(),
mut self: Pin<&mut Self>, ))
cx: &mut Context<'_>, }
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { pub(crate) fn once<T>(input: T) -> Once<T> {
match Pin::new(&mut self.inner).poll_next(cx) { Once { inner: Some(input) }
Poll::Ready(Some(Ok(bytes))) => { }
buf.put_slice(&bytes);
Poll::Ready(Ok(())) pub(crate) fn next<'a, S>(stream: &'a mut S) -> Next<'a, S> {
} Next(stream)
Poll::Ready(None) => Poll::Ready(Ok(())),
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
}
Poll::Pending => Poll::Pending,
}
}
} }
impl<I> AsyncRead for ProcessRead<I> impl<I> AsyncRead for ProcessRead<I>
@ -129,10 +126,48 @@ where
} }
} }
impl<E> Stream for ProcessSinkStream<E> { impl<S> Stream for BytesFreezer<S>
type Item = Result<Bytes, E>; where
S: Stream<Item = std::io::Result<BytesMut>> + Unpin,
{
type Item = Result<Bytes, UploadError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).poll_next(cx) Pin::new(&mut self.0)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze())))
.map_err(UploadError::from)
}
}
impl<T> Stream for Once<T>
where
T: Future + Unpin,
{
type Item = <T as Future>::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(mut fut) = self.inner.take() {
match Pin::new(&mut fut).poll(cx) {
Poll::Ready(item) => Poll::Ready(Some(item)),
Poll::Pending => {
self.inner = Some(fut);
Poll::Pending
}
}
} else {
Poll::Ready(None)
}
}
}
impl<'a, S> Future for Next<'a, S>
where
S: Stream + Unpin,
{
type Output = Option<<S as Stream>::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll_next(cx)
} }
} }

View file

@ -2,10 +2,10 @@ use crate::{
config::Format, config::Format,
error::UploadError, error::UploadError,
migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb},
stream::{next, LocalBoxStream},
to_ext, to_ext,
}; };
use actix_web::web; use actix_web::web;
use futures::stream::{Stream, StreamExt, TryStreamExt};
use sha2::Digest; use sha2::Digest;
use std::{ use std::{
path::PathBuf, path::PathBuf,
@ -13,7 +13,7 @@ use std::{
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::io::{AsyncRead, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tracing::{debug, error, info, instrument, warn, Span}; use tracing::{debug, error, info, instrument, warn, Span};
// TREE STRUCTURE // TREE STRUCTURE
@ -92,7 +92,7 @@ impl std::fmt::Debug for UploadManager {
} }
} }
type UploadStream<E> = Pin<Box<dyn Stream<Item = Result<web::Bytes, E>>>>; type UploadStream<E> = LocalBoxStream<'static, Result<web::Bytes, E>>;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Serde<T> { pub(crate) struct Serde<T> {
@ -247,7 +247,7 @@ impl UploadManager {
root_dir.push("files"); root_dir.push("files");
// Ensure file dir exists // Ensure file dir exists
actix_fs::create_dir_all(root_dir.clone()).await?; tokio::fs::create_dir_all(&root_dir).await?;
Ok(UploadManager { Ok(UploadManager {
inner: Arc::new(UploadManagerInner { inner: Arc::new(UploadManagerInner {
@ -547,7 +547,7 @@ impl UploadManager {
let mut bytes_mut = actix_web::web::BytesMut::new(); let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory"); debug!("Reading stream to memory");
while let Some(res) = stream.next().await { while let Some(res) = next(&mut stream).await {
let bytes = res?; let bytes = res?;
bytes_mut.extend_from_slice(&bytes); bytes_mut.extend_from_slice(&bytes);
} }
@ -582,7 +582,7 @@ impl UploadManager {
let mut bytes_mut = actix_web::web::BytesMut::new(); let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory"); debug!("Reading stream to memory");
while let Some(res) = stream.next().await { while let Some(res) = next(&mut stream).await {
let bytes = res?; let bytes = res?;
bytes_mut.extend_from_slice(&bytes); bytes_mut.extend_from_slice(&bytes);
} }
@ -638,7 +638,7 @@ impl UploadManager {
let mut errors = Vec::new(); let mut errors = Vec::new();
debug!("Deleting {:?}", path); debug!("Deleting {:?}", path);
if let Err(e) = actix_fs::remove_file(path).await { if let Err(e) = tokio::fs::remove_file(path).await {
errors.push(e.into()); errors.push(e.into());
} }
@ -767,8 +767,8 @@ impl UploadManager {
path.push(filename.clone()); path.push(filename.clone());
if let Err(e) = actix_fs::metadata(path).await { if let Err(e) = tokio::fs::metadata(path).await {
if e.kind() == Some(std::io::ErrorKind::NotFound) { if e.kind() == std::io::ErrorKind::NotFound {
debug!("Generated unused filename {}", filename); debug!("Generated unused filename {}", filename);
return Ok(filename); return Ok(filename);
} }
@ -904,12 +904,12 @@ pub(crate) async fn safe_save_reader(
) -> Result<(), UploadError> { ) -> Result<(), UploadError> {
if let Some(path) = to.parent() { if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; tokio::fs::create_dir_all(path.to_owned()).await?;
} }
debug!("Checking if {:?} already exists", to); debug!("Checking if {:?} already exists", to);
if let Err(e) = actix_fs::metadata(to.clone()).await { if let Err(e) = tokio::fs::metadata(to.clone()).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into()); return Err(e.into());
} }
} else { } else {
@ -928,7 +928,7 @@ pub(crate) async fn safe_save_reader(
#[instrument(skip(stream))] #[instrument(skip(stream))]
pub(crate) async fn safe_save_stream<E>( pub(crate) async fn safe_save_stream<E>(
to: PathBuf, to: PathBuf,
stream: UploadStream<E>, mut stream: UploadStream<E>,
) -> Result<(), UploadError> ) -> Result<(), UploadError>
where where
UploadError: From<E>, UploadError: From<E>,
@ -936,12 +936,12 @@ where
{ {
if let Some(path) = to.parent() { if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; tokio::fs::create_dir_all(path).await?;
} }
debug!("Checking if {:?} already exists", to); debug!("Checking if {:?} already exists", to);
if let Err(e) = actix_fs::metadata(to.clone()).await { if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into()); return Err(e.into());
} }
} else { } else {
@ -950,16 +950,30 @@ where
debug!("Writing stream to {:?}", to); debug!("Writing stream to {:?}", to);
let file = actix_fs::file::create(to).await?; let to1 = to.clone();
let fut = async move {
let mut file = tokio::fs::File::create(to1).await?;
actix_fs::file::write_stream_faster(file, stream.map_err(UploadError::from)).await?; while let Some(res) = next(&mut stream).await {
let mut bytes = res?;
file.write_all_buf(&mut bytes).await?;
}
Ok(())
};
if let Err(e) = fut.await {
error!("Failed to save file: {}", e);
let _ = tokio::fs::remove_file(to).await;
return Err(e);
}
Ok(()) Ok(())
} }
async fn remove_path(path: sled::IVec) -> Result<(), UploadError> { async fn remove_path(path: sled::IVec) -> Result<(), UploadError> {
let path_string = String::from_utf8(path.to_vec())?; let path_string = String::from_utf8(path.to_vec())?;
actix_fs::remove_file(path_string).await?; tokio::fs::remove_file(path_string).await?;
Ok(()) Ok(())
} }