mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Fewer streams
This commit is contained in:
parent
1119ed740e
commit
08c3169d3f
7 changed files with 84 additions and 566 deletions
|
@ -7,23 +7,3 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl As
|
||||||
|
|
||||||
Ok(process.bytes_read(input).unwrap())
|
Ok(process.bytes_read(input).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn clear_metadata_write_read(
|
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
) -> std::io::Result<impl AsyncRead + Unpin> {
|
|
||||||
let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?;
|
|
||||||
|
|
||||||
Ok(process.write_read(input).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn clear_metadata_stream<S, E>(
|
|
||||||
input: S,
|
|
||||||
) -> std::io::Result<futures::stream::LocalBoxStream<'static, Result<actix_web::web::Bytes, E>>>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<actix_web::web::Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?;
|
|
||||||
|
|
||||||
Ok(Box::pin(process.sink_stream(input).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
|
@ -83,62 +83,6 @@ pub(crate) fn to_mp4_bytes(
|
||||||
Ok(process.bytes_read(input).unwrap())
|
Ok(process.bytes_read(input).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn to_mp4_write_read(
|
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
input_format: InputFormat,
|
|
||||||
) -> std::io::Result<impl AsyncRead + Unpin> {
|
|
||||||
let process = Process::spawn(Command::new("ffmpeg").args([
|
|
||||||
"-f",
|
|
||||||
input_format.as_format(),
|
|
||||||
"-i",
|
|
||||||
"pipe:",
|
|
||||||
"-movflags",
|
|
||||||
"faststart+frag_keyframe+empty_moov",
|
|
||||||
"-pix_fmt",
|
|
||||||
"yuv420p",
|
|
||||||
"-vf",
|
|
||||||
"scale=trunc(iw/2)*2:trunc(ih/2)*2",
|
|
||||||
"-an",
|
|
||||||
"-codec",
|
|
||||||
"h264",
|
|
||||||
"-f",
|
|
||||||
"mp4",
|
|
||||||
"pipe:",
|
|
||||||
]))?;
|
|
||||||
|
|
||||||
Ok(process.write_read(input).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn to_mp4_stream<S, E>(
|
|
||||||
input: S,
|
|
||||||
input_format: InputFormat,
|
|
||||||
) -> std::io::Result<futures::stream::LocalBoxStream<'static, Result<Bytes, E>>>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let process = Process::spawn(Command::new("ffmpeg").args([
|
|
||||||
"-f",
|
|
||||||
input_format.as_format(),
|
|
||||||
"-i",
|
|
||||||
"pipe:",
|
|
||||||
"-movflags",
|
|
||||||
"faststart+frag_keyframe+empty_moov",
|
|
||||||
"-pix_fmt",
|
|
||||||
"yuv420p",
|
|
||||||
"-vf",
|
|
||||||
"scale=trunc(iw/2)*2:trunc(ih/2)*2",
|
|
||||||
"-an",
|
|
||||||
"-codec",
|
|
||||||
"h264",
|
|
||||||
"-f",
|
|
||||||
"mp4",
|
|
||||||
"pipe:",
|
|
||||||
]))?;
|
|
||||||
|
|
||||||
Ok(Box::pin(process.sink_stream(input).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn thumbnail<P1, P2>(
|
pub(crate) async fn thumbnail<P1, P2>(
|
||||||
from: P1,
|
from: P1,
|
||||||
to: P2,
|
to: P2,
|
||||||
|
|
133
src/magick.rs
133
src/magick.rs
|
@ -46,17 +46,7 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl As
|
||||||
Ok(process.bytes_read(input).unwrap())
|
Ok(process.bytes_read(input).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn clear_metadata_write_read(
|
pub(crate) async fn details_bytes(input: Bytes) -> Result<Details, MagickError> {
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
) -> std::io::Result<impl AsyncRead + Unpin> {
|
|
||||||
let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?;
|
|
||||||
|
|
||||||
Ok(process.write_read(input).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn details_write_read(
|
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
) -> Result<Details, MagickError> {
|
|
||||||
let process = Process::spawn(Command::new("magick").args([
|
let process = Process::spawn(Command::new("magick").args([
|
||||||
"identify",
|
"identify",
|
||||||
"-ping",
|
"-ping",
|
||||||
|
@ -65,41 +55,15 @@ pub(crate) async fn details_write_read(
|
||||||
"-",
|
"-",
|
||||||
]))?;
|
]))?;
|
||||||
|
|
||||||
let mut reader = process.write_read(input).unwrap();
|
let mut reader = process.bytes_read(input).unwrap();
|
||||||
|
|
||||||
let mut bytes = Vec::new();
|
let mut bytes = Vec::new();
|
||||||
|
|
||||||
reader.read_to_end(&mut bytes).await?;
|
reader.read_to_end(&mut bytes).await?;
|
||||||
|
|
||||||
let s = String::from_utf8_lossy(&bytes);
|
let s = String::from_utf8_lossy(&bytes);
|
||||||
|
|
||||||
parse_details(s)
|
parse_details(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn convert_write_read(
|
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
format: Format,
|
|
||||||
) -> std::io::Result<impl AsyncRead + Unpin> {
|
|
||||||
let process = Process::spawn(Command::new("magick").args([
|
|
||||||
"convert",
|
|
||||||
"-",
|
|
||||||
format!("{}:-", format.to_magick_format()).as_str(),
|
|
||||||
]))?;
|
|
||||||
|
|
||||||
Ok(process.write_read(input).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn clear_metadata_stream<S, E>(
|
|
||||||
input: S,
|
|
||||||
) -> std::io::Result<futures::stream::LocalBoxStream<'static, Result<Bytes, E>>>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?;
|
|
||||||
|
|
||||||
Ok(Box::pin(process.sink_stream(input).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn convert_bytes_read(
|
pub(crate) fn convert_bytes_read(
|
||||||
input: Bytes,
|
input: Bytes,
|
||||||
format: Format,
|
format: Format,
|
||||||
|
@ -113,56 +77,6 @@ pub(crate) fn convert_bytes_read(
|
||||||
Ok(process.bytes_read(input).unwrap())
|
Ok(process.bytes_read(input).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn convert_stream<S, E>(
|
|
||||||
input: S,
|
|
||||||
format: Format,
|
|
||||||
) -> std::io::Result<futures::stream::LocalBoxStream<'static, Result<Bytes, E>>>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let process = Process::spawn(Command::new("magick").args([
|
|
||||||
"convert",
|
|
||||||
"-",
|
|
||||||
format!("{}:-", format.to_magick_format()).as_str(),
|
|
||||||
]))?;
|
|
||||||
|
|
||||||
Ok(Box::pin(process.sink_stream(input).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn details_stream<S, E1, E2>(input: S) -> Result<Details, E2>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E1>> + Unpin,
|
|
||||||
E1: From<std::io::Error>,
|
|
||||||
E2: From<E1> + From<std::io::Error> + From<MagickError>,
|
|
||||||
{
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
|
|
||||||
let permit = semaphore().acquire().await.map_err(MagickError::from)?;
|
|
||||||
|
|
||||||
let mut process = Process::spawn(Command::new("magick").args([
|
|
||||||
"identify",
|
|
||||||
"-ping",
|
|
||||||
"-format",
|
|
||||||
"%w %h | %m\n",
|
|
||||||
"-",
|
|
||||||
]))?;
|
|
||||||
|
|
||||||
process.take_sink().unwrap().send(input).await?;
|
|
||||||
let mut stream = process.take_stream().unwrap();
|
|
||||||
|
|
||||||
let mut buf = actix_web::web::BytesMut::new();
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
let bytes = res?;
|
|
||||||
buf.extend_from_slice(&bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(permit);
|
|
||||||
|
|
||||||
let s = String::from_utf8_lossy(&buf);
|
|
||||||
Ok(parse_details(s)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn details<P>(file: P) -> Result<Details, MagickError>
|
pub(crate) async fn details<P>(file: P) -> Result<Details, MagickError>
|
||||||
where
|
where
|
||||||
P: AsRef<std::path::Path>,
|
P: AsRef<std::path::Path>,
|
||||||
|
@ -250,35 +164,6 @@ pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result<ValidInputType,
|
||||||
parse_input_type(s)
|
parse_input_type(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn input_type_stream<S, E1, E2>(input: S) -> Result<ValidInputType, E2>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E1>> + Unpin,
|
|
||||||
E1: From<std::io::Error>,
|
|
||||||
E2: From<E1> + From<std::io::Error> + From<MagickError>,
|
|
||||||
{
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
|
|
||||||
let permit = semaphore().acquire().await.map_err(MagickError::from)?;
|
|
||||||
|
|
||||||
let mut process =
|
|
||||||
Process::spawn(Command::new("magick").args(["identify", "-ping", "-format", "%m\n", "-"]))?;
|
|
||||||
|
|
||||||
process.take_sink().unwrap().send(input).await?;
|
|
||||||
let mut stream = process.take_stream().unwrap();
|
|
||||||
|
|
||||||
let mut buf = actix_web::web::BytesMut::new();
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
let bytes = res?;
|
|
||||||
buf.extend_from_slice(&bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(permit);
|
|
||||||
|
|
||||||
let s = String::from_utf8_lossy(&buf);
|
|
||||||
|
|
||||||
Ok(parse_input_type(s)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result<ValidInputType, MagickError> {
|
fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result<ValidInputType, MagickError> {
|
||||||
let mut lines = s.lines();
|
let mut lines = s.lines();
|
||||||
let first = lines.next();
|
let first = lines.next();
|
||||||
|
@ -298,15 +183,11 @@ fn parse_input_type(s: std::borrow::Cow<'_, str>) -> Result<ValidInputType, Magi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn process_image_stream<S, E>(
|
pub(crate) fn process_image_write_read(
|
||||||
input: S,
|
input: impl AsyncRead + Unpin + 'static,
|
||||||
args: Vec<String>,
|
args: Vec<String>,
|
||||||
format: Format,
|
format: Format,
|
||||||
) -> std::io::Result<futures::stream::LocalBoxStream<'static, Result<Bytes, E>>>
|
) -> std::io::Result<impl AsyncRead + Unpin> {
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let process = Process::spawn(
|
let process = Process::spawn(
|
||||||
Command::new("magick")
|
Command::new("magick")
|
||||||
.args([&"convert", &"-"])
|
.args([&"convert", &"-"])
|
||||||
|
@ -314,7 +195,7 @@ where
|
||||||
.arg(format!("{}:-", format.to_magick_format())),
|
.arg(format!("{}:-", format.to_magick_format())),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(Box::pin(process.sink_stream(input).unwrap()))
|
Ok(process.write_read(input).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<tokio::sync::AcquireError> for MagickError {
|
impl From<tokio::sync::AcquireError> for MagickError {
|
||||||
|
|
56
src/main.rs
56
src/main.rs
|
@ -10,6 +10,7 @@ use futures::stream::{Stream, TryStreamExt};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::{collections::HashSet, path::PathBuf, pin::Pin, time::SystemTime};
|
use std::{collections::HashSet, path::PathBuf, pin::Pin, time::SystemTime};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
use tracing::{debug, error, info, instrument, Span};
|
use tracing::{debug, error, info, instrument, Span};
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
|
@ -402,58 +403,27 @@ async fn process(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let stream = Box::pin(async_stream::stream! {
|
let file = tokio::fs::File::open(original_path.clone()).await?;
|
||||||
use futures::stream::StreamExt;
|
|
||||||
|
|
||||||
let mut s = actix_fs::read_to_stream(original_path.clone())
|
let mut processed_reader =
|
||||||
.await?
|
crate::magick::process_image_write_read(file, thumbnail_args, format)?;
|
||||||
.faster();
|
|
||||||
|
|
||||||
while let Some(res) = s.next().await {
|
let mut vec = Vec::new();
|
||||||
yield res.map_err(UploadError::from);
|
processed_reader.read_to_end(&mut vec).await?;
|
||||||
}
|
let bytes = web::Bytes::from(vec);
|
||||||
});
|
|
||||||
|
|
||||||
let processed_stream = crate::magick::process_image_stream(stream, thumbnail_args, format)?;
|
let details = if let Some(details) = details {
|
||||||
|
details
|
||||||
let (base_stream, copied_stream) = crate::stream::try_duplicate(processed_stream, 1024);
|
|
||||||
|
|
||||||
let (details, base_stream) = if let Some(details) = details {
|
|
||||||
(
|
|
||||||
details,
|
|
||||||
Box::pin(base_stream)
|
|
||||||
as Pin<
|
|
||||||
Box<
|
|
||||||
dyn futures::stream::Stream<
|
|
||||||
Item = Result<actix_web::web::Bytes, UploadError>,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
let (base_stream, copied2) = crate::stream::try_duplicate(Box::pin(base_stream), 1024);
|
Details::from_bytes(bytes.clone()).await?
|
||||||
let details = Details::from_stream(Box::pin(base_stream)).await?;
|
|
||||||
(
|
|
||||||
details,
|
|
||||||
Box::pin(copied2)
|
|
||||||
as Pin<
|
|
||||||
Box<
|
|
||||||
dyn futures::stream::Stream<
|
|
||||||
Item = Result<actix_web::web::Bytes, UploadError>,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let span = tracing::Span::current();
|
let span = tracing::Span::current();
|
||||||
let details2 = details.clone();
|
let details2 = details.clone();
|
||||||
|
let bytes2 = bytes.clone();
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
let entered = span.enter();
|
let entered = span.enter();
|
||||||
if let Err(e) =
|
if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await {
|
||||||
upload_manager::safe_save_stream(thumbnail_path.clone(), Box::pin(copied_stream))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::warn!("Error saving thumbnail: {}", e);
|
tracing::warn!("Error saving thumbnail: {}", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -472,7 +442,7 @@ async fn process(
|
||||||
|
|
||||||
return Ok(srv_response(
|
return Ok(srv_response(
|
||||||
HttpResponse::Ok(),
|
HttpResponse::Ok(),
|
||||||
base_stream,
|
futures::stream::once(futures::future::ready(Ok(bytes) as Result<_, UploadError>)),
|
||||||
details.content_type(),
|
details.content_type(),
|
||||||
7 * DAYS,
|
7 * DAYS,
|
||||||
details.system_time(),
|
details.system_time(),
|
||||||
|
|
211
src/stream.rs
211
src/stream.rs
|
@ -1,12 +1,11 @@
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use futures::stream::{LocalBoxStream, Stream, StreamExt};
|
use futures::stream::{LocalBoxStream, Stream};
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf};
|
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
|
|
||||||
pub(crate) struct ReadAdapter<S> {
|
pub(crate) struct ReadAdapter<S> {
|
||||||
inner: S,
|
inner: S,
|
||||||
|
@ -22,59 +21,10 @@ pub(crate) struct ProcessRead<I> {
|
||||||
err_closed: bool,
|
err_closed: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct ProcessSink {
|
|
||||||
stdin: tokio::process::ChildStdin,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct ProcessStream {
|
|
||||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct ProcessSinkStream<E> {
|
pub(crate) struct ProcessSinkStream<E> {
|
||||||
stream: LocalBoxStream<'static, Result<Bytes, E>>,
|
stream: LocalBoxStream<'static, Result<Bytes, E>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct TryDuplicateStream<T, E> {
|
|
||||||
inner: ReceiverStream<Result<T, E>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) struct StringError(String);
|
|
||||||
|
|
||||||
impl<S> ReadAdapter<S> {
|
|
||||||
pub(crate) fn new_unsync<E>(
|
|
||||||
mut stream: S,
|
|
||||||
) -> ReadAdapter<ReceiverStream<Result<Bytes, StringError>>>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: std::fmt::Display,
|
|
||||||
{
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
|
||||||
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
if tx
|
|
||||||
.send(res.map_err(|e| StringError(e.to_string())))
|
|
||||||
.await
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ReadAdapter::new_sync(ReceiverStream::new(rx))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_sync<E>(stream: S) -> Self
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
|
||||||
E: Into<Box<dyn std::error::Error + Send + Sync>>,
|
|
||||||
{
|
|
||||||
ReadAdapter { inner: stream }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Process {
|
impl Process {
|
||||||
fn new(child: tokio::process::Child) -> Self {
|
fn new(child: tokio::process::Child) -> Self {
|
||||||
Process { child }
|
Process { child }
|
||||||
|
@ -87,14 +37,6 @@ impl Process {
|
||||||
.map(Process::new)
|
.map(Process::new)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn take_sink(&mut self) -> Option<ProcessSink> {
|
|
||||||
self.child.stdin.take().map(ProcessSink::new)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn take_stream(&mut self) -> Option<ProcessStream> {
|
|
||||||
self.child.stdout.take().map(ProcessStream::new)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option<impl AsyncRead + Unpin> {
|
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option<impl AsyncRead + Unpin> {
|
||||||
let mut stdin = self.child.stdin.take()?;
|
let mut stdin = self.child.stdin.take()?;
|
||||||
let stdout = self.child.stdout.take()?;
|
let stdout = self.child.stdout.take()?;
|
||||||
|
@ -135,131 +77,6 @@ impl Process {
|
||||||
err_closed: false,
|
err_closed: false,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn sink_stream<S, E>(mut self, input_stream: S) -> Option<ProcessSinkStream<E>>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
{
|
|
||||||
let mut stdin = self.take_sink()?;
|
|
||||||
let mut stdout = self.take_stream()?;
|
|
||||||
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
|
||||||
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
if let Err(e) = stdin.send(input_stream).await {
|
|
||||||
let _ = tx.send(e).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Some(ProcessSinkStream {
|
|
||||||
stream: Box::pin(async_stream::stream! {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
opt = rx.recv() => {
|
|
||||||
if let Some(e) = opt {
|
|
||||||
yield Err(e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
res = stdout.next() => {
|
|
||||||
match res {
|
|
||||||
Some(Ok(bytes)) => yield Ok(bytes),
|
|
||||||
Some(Err(e)) => {
|
|
||||||
yield Err(e.into());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(stdout);
|
|
||||||
match self.child.wait().await {
|
|
||||||
Ok(status) if status.success() => return,
|
|
||||||
Ok(_) => yield Err(std::io::Error::from(std::io::ErrorKind::Other).into()),
|
|
||||||
Err(e) => yield Err(e.into()),
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ProcessSink {
|
|
||||||
fn new(stdin: tokio::process::ChildStdin) -> Self {
|
|
||||||
ProcessSink { stdin }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn send<S, E>(&mut self, mut stream: S) -> Result<(), E>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
|
||||||
E: From<std::io::Error>,
|
|
||||||
{
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
let mut bytes = res?;
|
|
||||||
|
|
||||||
self.stdin.write_all_buf(&mut bytes).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ProcessStream {
|
|
||||||
fn new(mut stdout: tokio::process::ChildStdout) -> ProcessStream {
|
|
||||||
let s = async_stream::stream! {
|
|
||||||
loop {
|
|
||||||
let mut buf = actix_web::web::BytesMut::with_capacity(65_536);
|
|
||||||
|
|
||||||
match stdout.read_buf(&mut buf).await {
|
|
||||||
Ok(len) if len == 0 => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
yield Ok(buf.freeze());
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
yield Err(e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ProcessStream {
|
|
||||||
stream: Box::pin(s),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn try_duplicate<S, T, E>(
|
|
||||||
mut stream: S,
|
|
||||||
buffer: usize,
|
|
||||||
) -> (impl Stream<Item = Result<T, E>>, TryDuplicateStream<T, E>)
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<T, E>> + Unpin,
|
|
||||||
T: Clone,
|
|
||||||
{
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(buffer);
|
|
||||||
let s = async_stream::stream! {
|
|
||||||
while let Some(value) = stream.next().await {
|
|
||||||
match value {
|
|
||||||
Ok(t) => {
|
|
||||||
let _ = tx.send(Ok(t.clone())).await;
|
|
||||||
yield Ok(t);
|
|
||||||
}
|
|
||||||
Err(e) => yield Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
s,
|
|
||||||
TryDuplicateStream {
|
|
||||||
inner: ReceiverStream::new(rx),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, E> AsyncRead for ReadAdapter<S>
|
impl<S, E> AsyncRead for ReadAdapter<S>
|
||||||
|
@ -312,14 +129,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for ProcessStream {
|
|
||||||
type Item = std::io::Result<Bytes>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
Pin::new(&mut self.stream).poll_next(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E> Stream for ProcessSinkStream<E> {
|
impl<E> Stream for ProcessSinkStream<E> {
|
||||||
type Item = Result<Bytes, E>;
|
type Item = Result<Bytes, E>;
|
||||||
|
|
||||||
|
@ -327,19 +136,3 @@ impl<E> Stream for ProcessSinkStream<E> {
|
||||||
Pin::new(&mut self.stream).poll_next(cx)
|
Pin::new(&mut self.stream).poll_next(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, E> Stream for TryDuplicateStream<T, E> {
|
|
||||||
type Item = Result<T, E>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
Pin::new(&mut self.inner).poll_next(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for StringError {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
||||||
write!(f, "{}", self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for StringError {}
|
|
||||||
|
|
|
@ -33,12 +33,22 @@ pub struct UploadManager {
|
||||||
inner: Arc<UploadManagerInner>,
|
inner: Arc<UploadManagerInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Hasher<I> {
|
pub struct Hasher<I, D> {
|
||||||
inner: I,
|
inner: I,
|
||||||
hasher: sha2::Sha256,
|
hasher: D,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I> Hasher<I> {
|
impl<I, D> Hasher<I, D>
|
||||||
|
where
|
||||||
|
D: Digest + Send + 'static,
|
||||||
|
{
|
||||||
|
fn new(reader: I, digest: D) -> Self {
|
||||||
|
Hasher {
|
||||||
|
inner: reader,
|
||||||
|
hasher: digest,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn finalize_reset(self) -> Result<Hash, UploadError> {
|
async fn finalize_reset(self) -> Result<Hash, UploadError> {
|
||||||
let mut hasher = self.hasher;
|
let mut hasher = self.hasher;
|
||||||
let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
|
let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
|
||||||
|
@ -46,9 +56,10 @@ impl<I> Hasher<I> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I> AsyncRead for Hasher<I>
|
impl<I, D> AsyncRead for Hasher<I, D>
|
||||||
where
|
where
|
||||||
I: AsyncRead + Unpin,
|
I: AsyncRead + Unpin,
|
||||||
|
D: Digest + Unpin,
|
||||||
{
|
{
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
|
@ -134,25 +145,8 @@ pub(crate) struct Details {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Details {
|
impl Details {
|
||||||
pub(crate) async fn from_stream<S, E>(stream: S) -> Result<Self, UploadError>
|
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, UploadError> {
|
||||||
where
|
let details = crate::magick::details_bytes(input).await?;
|
||||||
S: futures::stream::Stream<Item = Result<actix_web::web::Bytes, E>> + Unpin + 'static,
|
|
||||||
E: From<std::io::Error> + 'static,
|
|
||||||
UploadError: From<E>,
|
|
||||||
{
|
|
||||||
let details = crate::magick::details_stream::<S, E, UploadError>(stream).await?;
|
|
||||||
|
|
||||||
Ok(Details::now(
|
|
||||||
details.width,
|
|
||||||
details.height,
|
|
||||||
details.mime_type,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn from_async_read(
|
|
||||||
input: impl AsyncRead + Unpin + 'static,
|
|
||||||
) -> Result<Self, UploadError> {
|
|
||||||
let details = crate::magick::details_write_read(input).await?;
|
|
||||||
|
|
||||||
Ok(Details::now(
|
Ok(Details::now(
|
||||||
details.width,
|
details.width,
|
||||||
|
@ -550,25 +544,24 @@ impl UploadManager {
|
||||||
UploadError: From<E>,
|
UploadError: From<E>,
|
||||||
E: Unpin + 'static,
|
E: Unpin + 'static,
|
||||||
{
|
{
|
||||||
let mapped_err_stream = Box::pin(async_stream::stream! {
|
let mut bytes_mut = actix_web::web::BytesMut::new();
|
||||||
use futures::stream::StreamExt;
|
|
||||||
|
|
||||||
while let Some(res) = stream.next().await {
|
debug!("Reading stream to memory");
|
||||||
yield res.map_err(UploadError::from);
|
while let Some(res) = stream.next().await {
|
||||||
}
|
let bytes = res?;
|
||||||
});
|
bytes_mut.extend_from_slice(&bytes);
|
||||||
|
}
|
||||||
|
|
||||||
let (content_type, validated_stream) =
|
debug!("Validating bytes");
|
||||||
crate::validate::validate_image_stream(mapped_err_stream, self.inner.format.clone())
|
let (content_type, validated_reader) =
|
||||||
|
crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let (s1, s2) = crate::stream::try_duplicate(validated_stream, 1024);
|
let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone());
|
||||||
|
|
||||||
let tmpfile = crate::tmp_file();
|
let tmpfile = crate::tmp_file();
|
||||||
let (hash, _) = tokio::try_join!(
|
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
|
||||||
self.hash_stream::<_, UploadError>(Box::pin(s1)),
|
let hash = hasher_reader.finalize_reset().await?;
|
||||||
safe_save_stream::<UploadError>(tmpfile.clone(), Box::pin(s2))
|
|
||||||
)?;
|
|
||||||
|
|
||||||
debug!("Storing alias");
|
debug!("Storing alias");
|
||||||
self.add_existing_alias(&hash, &alias).await?;
|
self.add_existing_alias(&hash, &alias).await?;
|
||||||
|
@ -599,10 +592,7 @@ impl UploadManager {
|
||||||
crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone())
|
crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut hasher_reader = Hasher {
|
let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone());
|
||||||
inner: validated_reader,
|
|
||||||
hasher: self.inner.hasher.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let tmpfile = crate::tmp_file();
|
let tmpfile = crate::tmp_file();
|
||||||
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
|
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
|
||||||
|
@ -717,30 +707,6 @@ impl UploadManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// produce a sh256sum of the uploaded file
|
|
||||||
async fn hash_stream<S, E>(&self, mut stream: S) -> Result<Hash, UploadError>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<actix_web::web::Bytes, E>> + Unpin,
|
|
||||||
UploadError: From<E>,
|
|
||||||
{
|
|
||||||
let mut hasher = self.inner.hasher.clone();
|
|
||||||
|
|
||||||
while let Some(res) = stream.next().await {
|
|
||||||
let bytes = res?;
|
|
||||||
hasher = web::block(move || {
|
|
||||||
hasher.update(&bytes);
|
|
||||||
Ok(hasher) as Result<_, UploadError>
|
|
||||||
})
|
|
||||||
.await??;
|
|
||||||
}
|
|
||||||
|
|
||||||
let hash =
|
|
||||||
web::block(move || Ok(hasher.finalize_reset().to_vec()) as Result<_, UploadError>)
|
|
||||||
.await??;
|
|
||||||
|
|
||||||
Ok(Hash::new(hash))
|
|
||||||
}
|
|
||||||
|
|
||||||
// check for an already-uploaded image with this hash, returning the path to the target file
|
// check for an already-uploaded image with this hash, returning the path to the target file
|
||||||
#[instrument(skip(self, hash, content_type))]
|
#[instrument(skip(self, hash, content_type))]
|
||||||
async fn check_duplicate(
|
async fn check_duplicate(
|
||||||
|
@ -1023,3 +989,34 @@ fn variant_details_key(hash: &[u8], path: &str) -> Vec<u8> {
|
||||||
key.extend(b"details");
|
key.extend(b"details");
|
||||||
key
|
key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::Hasher;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hasher_works() {
|
||||||
|
let hash = actix_rt::System::new()
|
||||||
|
.block_on(async move {
|
||||||
|
let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
|
||||||
|
|
||||||
|
let mut hasher = Hasher::new(file1, Sha256::new());
|
||||||
|
|
||||||
|
tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?;
|
||||||
|
|
||||||
|
hasher.finalize_reset().await
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap();
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
file.read_to_end(&mut vec).unwrap();
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
hasher.update(vec);
|
||||||
|
let correct_hash = hasher.finalize_reset().to_vec();
|
||||||
|
|
||||||
|
assert_eq!(hash.inner, correct_hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ pub(crate) async fn validate_image_bytes(
|
||||||
)),
|
)),
|
||||||
(_, ValidInputType::Mp4) => Ok((
|
(_, ValidInputType::Mp4) => Ok((
|
||||||
video_mp4(),
|
video_mp4(),
|
||||||
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?),
|
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?),
|
||||||
)),
|
)),
|
||||||
(Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok((
|
(Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok((
|
||||||
mime::IMAGE_JPEG,
|
mime::IMAGE_JPEG,
|
||||||
|
@ -43,50 +43,3 @@ pub(crate) async fn validate_image_bytes(
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn validate_image_stream<S>(
|
|
||||||
stream: S,
|
|
||||||
prescribed_format: Option<Format>,
|
|
||||||
) -> Result<
|
|
||||||
(
|
|
||||||
mime::Mime,
|
|
||||||
futures::stream::LocalBoxStream<'static, Result<actix_web::web::Bytes, UploadError>>,
|
|
||||||
),
|
|
||||||
UploadError,
|
|
||||||
>
|
|
||||||
where
|
|
||||||
S: futures::stream::Stream<Item = Result<actix_web::web::Bytes, UploadError>> + Unpin + 'static,
|
|
||||||
{
|
|
||||||
let (base_stream, copied_stream) = crate::stream::try_duplicate(stream, 1024);
|
|
||||||
|
|
||||||
let input_type =
|
|
||||||
crate::magick::input_type_stream::<_, UploadError, UploadError>(Box::pin(base_stream))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
match (prescribed_format, input_type) {
|
|
||||||
(_, ValidInputType::Gif) => Ok((
|
|
||||||
video_mp4(),
|
|
||||||
crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Gif)?,
|
|
||||||
)),
|
|
||||||
(_, ValidInputType::Mp4) => Ok((
|
|
||||||
video_mp4(),
|
|
||||||
crate::ffmpeg::to_mp4_stream(copied_stream, InputFormat::Mp4)?,
|
|
||||||
)),
|
|
||||||
(Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok((
|
|
||||||
mime::IMAGE_JPEG,
|
|
||||||
crate::exiftool::clear_metadata_stream(copied_stream)?,
|
|
||||||
)),
|
|
||||||
(Some(Format::Png) | None, ValidInputType::Png) => Ok((
|
|
||||||
mime::IMAGE_PNG,
|
|
||||||
crate::exiftool::clear_metadata_stream(copied_stream)?,
|
|
||||||
)),
|
|
||||||
(Some(Format::Webp) | None, ValidInputType::Webp) => Ok((
|
|
||||||
image_webp(),
|
|
||||||
crate::magick::clear_metadata_stream(copied_stream)?,
|
|
||||||
)),
|
|
||||||
(Some(format), _) => Ok((
|
|
||||||
format.to_mime(),
|
|
||||||
crate::magick::convert_stream(copied_stream, format)?,
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue