From ac79a9d5cd1ef51445684686dc30505fb8c85406 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sun, 5 Sep 2021 20:00:31 -0500 Subject: [PATCH] Use futures-util instead of hand-implemented types --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 3 +-- src/middleware.rs | 2 +- src/range.rs | 6 ++--- src/stream.rs | 51 +------------------------------------------ src/upload_manager.rs | 8 +++---- 7 files changed, 11 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9ac1fe..aee9b9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -934,7 +934,7 @@ dependencies = [ "awc", "base64", "dashmap", - "futures-core", + "futures-util", "mime", "num_cpus", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 81d633a..4099f01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ anyhow = "1.0" awc = { version = "3.0.0-beta.7", default-features = false } base64 = "0.13.0" dashmap = "4.0.2" -futures-core = "0.3.17" +futures-util = "0.3.17" mime = "0.3.1" num_cpus = "1.13" once_cell = "1.4.0" diff --git a/src/main.rs b/src/main.rs index 1fe5c2e..2978211 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use actix_web::{ }; use awc::Client; use dashmap::{mapref::entry::Entry, DashMap}; -use futures_core::stream::Stream; +use futures_util::{stream::{LocalBoxStream, once}, Stream}; use once_cell::sync::{Lazy, OnceCell}; use std::{ collections::HashSet, @@ -42,7 +42,6 @@ use self::{ config::{Config, Format}, error::UploadError, middleware::{Internal, Tracing}, - stream::{once, LocalBoxStream}, upload_manager::{Details, UploadManager}, validate::{image_webp, video_mp4}, }; diff --git a/src/middleware.rs b/src/middleware.rs index 2e98890..1184a9d 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,9 +1,9 @@ -use crate::stream::LocalBoxFuture; use actix_web::{ dev::{Service, ServiceRequest, Transform}, http::StatusCode, HttpResponse, ResponseError, }; +use futures_util::future::LocalBoxFuture; use std::{ future::{ready, Ready}, task::{Context, Poll}, diff --git a/src/range.rs b/src/range.rs index 49ea67b..03695ee 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,7 +1,4 @@ -use crate::{ - stream::{bytes_stream, LocalBoxStream}, - UploadError, -}; +use crate::{stream::bytes_stream, UploadError}; use actix_web::{ dev::Payload, http::{ @@ -11,6 +8,7 @@ use actix_web::{ web::Bytes, FromRequest, HttpRequest, }; +use futures_util::stream::LocalBoxStream; use std::{future::ready, io}; use tokio::io::{AsyncReadExt, AsyncSeekExt}; diff --git a/src/stream.rs b/src/stream.rs index a145e47..de43ae4 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ use crate::error::UploadError; use actix_web::web::{Bytes, BytesMut}; -use futures_core::stream::Stream; +use futures_util::Stream; use std::{ future::Future, pin::Pin, @@ -8,9 +8,6 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; -pub(crate) type LocalBoxStream<'a, T> = Pin + 'a>>; -pub(crate) type LocalBoxFuture<'a, T> = Pin + 'a>>; - pub(crate) struct Process { child: tokio::process::Child, } @@ -23,12 +20,6 @@ pub(crate) struct ProcessRead { struct BytesFreezer(S); -pub(crate) struct Once { - inner: Option, -} - -pub(crate) struct Next<'a, S>(&'a mut S); - impl Process { fn new(child: tokio::process::Child) -> Self { Process { child } @@ -92,14 +83,6 @@ pub(crate) fn bytes_stream( )) } -pub(crate) fn once(input: T) -> Once { - Once { inner: Some(input) } -} - -pub(crate) fn next<'a, S>(stream: &'a mut S) -> Next<'a, S> { - Next(stream) -} - impl AsyncRead for ProcessRead where I: AsyncRead + Unpin, @@ -139,35 +122,3 @@ where .map_err(UploadError::from) } } - -impl Stream for Once -where - T: Future + Unpin, -{ - type Item = ::Output; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(mut fut) = self.inner.take() { - match Pin::new(&mut fut).poll(cx) { - Poll::Ready(item) => Poll::Ready(Some(item)), - Poll::Pending => { - self.inner = Some(fut); - Poll::Pending - } - } - } else { - Poll::Ready(None) - } - } -} - -impl<'a, S> Future for Next<'a, S> -where - S: Stream + Unpin, -{ - type Output = Option<::Item>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll_next(cx) - } -} diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 7168283..95ad1a5 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -2,10 +2,10 @@ use crate::{ config::Format, error::UploadError, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, - stream::{next, LocalBoxStream}, to_ext, }; use actix_web::web; +use futures_util::stream::{LocalBoxStream, StreamExt}; use sha2::Digest; use std::{ path::PathBuf, @@ -547,7 +547,7 @@ impl UploadManager { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); - while let Some(res) = next(&mut stream).await { + while let Some(res) = stream.next().await { let bytes = res?; bytes_mut.extend_from_slice(&bytes); } @@ -582,7 +582,7 @@ impl UploadManager { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); - while let Some(res) = next(&mut stream).await { + while let Some(res) = stream.next().await { let bytes = res?; bytes_mut.extend_from_slice(&bytes); } @@ -954,7 +954,7 @@ where let fut = async move { let mut file = tokio::fs::File::create(to1).await?; - while let Some(res) = next(&mut stream).await { + while let Some(res) = stream.next().await { let mut bytes = res?; file.write_all_buf(&mut bytes).await?; }