From 95637fdfe5438a573db7a873401862461f5019f9 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 23 Aug 2023 19:10:10 -0500 Subject: [PATCH] Remove direct dep on futures-util --- Cargo.lock | 1 - Cargo.toml | 1 - src/backgrounded.rs | 5 +++-- src/file.rs | 9 ++++++--- src/lib.rs | 15 +++++++-------- src/queue/process.rs | 4 ++-- src/store/object_store.rs | 7 ++++--- src/stream.rs | 36 ++++++++++++++++++++++++++++++++++++ 8 files changed, 58 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d2439d..a1968f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1744,7 +1744,6 @@ dependencies = [ "dashmap", "flume", "futures-core", - "futures-util", "hex", "md-5", "metrics", diff --git a/Cargo.toml b/Cargo.toml index 635827c..e0f44ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ console-subscriber = "0.1" dashmap = "5.1.0" flume = "0.11.0" futures-core = "0.3" -futures-util = { version = "0.3.17", default-features = false } hex = "0.4.3" md-5 = "0.10.5" metrics = "0.21.1" diff --git a/src/backgrounded.rs b/src/backgrounded.rs index dd673d0..87ffdc6 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -2,10 +2,10 @@ use crate::{ error::Error, repo::{ArcRepo, UploadId}, store::Store, + stream::StreamMap, }; use actix_web::web::Bytes; use futures_core::Stream; -use futures_util::TryStreamExt; use mime::APPLICATION_OCTET_STREAM; use tracing::{Instrument, Span}; @@ -58,7 +58,8 @@ where .create_upload(self.upload_id.expect("Upload id exists")) .await?; - let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + let stream = + stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); // use octet-stream, we don't know the upload's real type yet let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?; diff --git a/src/file.rs b/src/file.rs index 50df3f2..6706ca7 100644 --- a/src/file.rs +++ b/src/file.rs @@ -6,10 +6,13 @@ pub(crate) use tokio_file::File; #[cfg(not(feature = "io-uring"))] mod tokio_file { - use crate::{store::file_store::FileError, stream::IntoStreamer, Either}; + use crate::{ + store::file_store::FileError, + stream::{IntoStreamer, StreamMap}, + Either, + }; use actix_web::web::{Bytes, BytesMut}; use futures_core::Stream; - use futures_util::TryStreamExt; use std::{io::SeekFrom, path::Path}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -97,7 +100,7 @@ mod tokio_file { (None, None) => Either::right(self.inner), }; - Ok(FramedRead::new(obj, BytesCodec::new()).map_ok(BytesMut::freeze)) + Ok(FramedRead::new(obj, BytesCodec::new()).map(|res| res.map(BytesMut::freeze))) } } } diff --git a/src/lib.rs b/src/lib.rs index d473542..ad02a69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ use actix_web::{ web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, }; use futures_core::Stream; -use futures_util::{StreamExt, TryStreamExt}; use metrics_exporter_prometheus::PrometheusBuilder; use middleware::Metrics; use once_cell::sync::Lazy; @@ -69,7 +68,7 @@ use self::{ repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, - stream::{empty, once, StreamLimit, StreamTimeout}, + stream::{empty, once, StreamLimit, StreamMap, StreamTimeout}, }; pub use self::config::{ConfigSource, PictRsConfiguration}; @@ -154,7 +153,7 @@ impl FormData for Upload { let span = tracing::info_span!("file-upload", ?filename); - let stream = stream.map_err(Error::from); + let stream = stream.map(|res| res.map_err(Error::from)); Box::pin( async move { @@ -213,7 +212,7 @@ impl FormData for Import { let span = tracing::info_span!("file-import", ?filename); - let stream = stream.map_err(Error::from); + let stream = stream.map(|res| res.map_err(Error::from)); Box::pin( async move { @@ -350,7 +349,7 @@ impl FormData for BackgroundedUpload { let span = tracing::info_span!("file-proxy", ?filename); - let stream = stream.map_err(Error::from); + let stream = stream.map(|res| res.map_err(Error::from)); Box::pin( async move { @@ -521,7 +520,7 @@ async fn download_stream( let stream = res .bytes_stream() - .map_err(Error::from) + .map(|res| res.map_err(Error::from)) .limit((config.media.max_file_size * MEGABYTES) as u64); Ok(stream) @@ -1231,7 +1230,7 @@ async fn ranged_file_resp( Either::left(Either::left( range::chop_store(range, store, &identifier, len) .await? - .map_err(Error::from), + .map(|res| res.map_err(Error::from)), )), ) } else { @@ -1248,7 +1247,7 @@ async fn ranged_file_resp( let stream = store .to_stream(&identifier, None, None) .await? - .map_err(Error::from); + .map(|res| res.map_err(Error::from)); if not_found { (HttpResponse::NotFound(), Either::right(stream)) diff --git a/src/queue/process.rs b/src/queue/process.rs index be01479..399d817 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -8,8 +8,8 @@ use crate::{ repo::{Alias, ArcRepo, UploadId, UploadResult}, serde_str::Serde, store::{Identifier, Store}, + stream::StreamMap, }; -use futures_util::TryStreamExt; use std::path::PathBuf; pub(super) fn perform<'a, S>( @@ -92,7 +92,7 @@ where let stream = store2 .to_stream(&ident, None, None) .await? - .map_err(Error::from); + .map(|res| res.map_err(Error::from)); let session = crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?; diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 1847eed..f3767ec 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -2,7 +2,7 @@ use crate::{ bytes_stream::BytesStream, repo::{Repo, SettingsRepo}, store::Store, - stream::IntoStreamer, + stream::{IntoStreamer, StreamMap}, }; use actix_rt::task::JoinError; use actix_web::{ @@ -15,7 +15,6 @@ use actix_web::{ }; use base64::{prelude::BASE64_STANDARD, Engine}; use futures_core::Stream; -use futures_util::TryStreamExt; use reqwest::{header::RANGE, Body, Response}; use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; @@ -382,7 +381,9 @@ impl Store for ObjectStore { } Ok(Box::pin( - response.bytes_stream().map_err(payload_to_io_error), + response + .bytes_stream() + .map(|res| res.map_err(payload_to_io_error)), )) } diff --git a/src/stream.rs b/src/stream.rs index 3ebe13d..60a0728 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -13,6 +13,42 @@ use std::{ time::Duration, }; +pin_project_lite::pin_project! { + pub(crate) struct Map { + #[pin] + stream: S, + func: F, + } +} + +pub(crate) trait StreamMap: Stream { + fn map(self, func: F) -> Map + where + F: FnMut(Self::Item) -> U, + Self: Sized, + { + Map { stream: self, func } + } +} + +impl StreamMap for T where T: Stream {} + +impl Stream for Map +where + S: Stream, + F: FnMut(S::Item) -> U, +{ + type Item = U; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + let value = std::task::ready!(this.stream.poll_next(cx)); + + Poll::Ready(value.map(this.func)) + } +} + pub(crate) struct Empty(PhantomData); impl Stream for Empty {