diff --git a/src/store/object_store.rs b/src/store/object_store.rs index f9cb01a..c639462 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -405,9 +405,9 @@ impl Store for ObjectStore { return Err(status_error(response).await); } - Ok(Box::pin(crate::stream::map_err( - response.bytes_stream(), - payload_to_io_error, + Ok(Box::pin(crate::stream::metrics( + "pict-rs.object-storage.get-object-request.stream", + crate::stream::map_err(response.bytes_stream(), payload_to_io_error), ))) } @@ -434,7 +434,11 @@ impl Store for ObjectStore { )); } - let mut stream = response.bytes_stream().into_streamer(); + let stream = std::pin::pin!(crate::stream::metrics( + "pict-rs.object-storage.get-object-request.stream", + response.bytes_stream() + )); + let mut stream = stream.into_streamer(); while let Some(res) = stream.next().await { let mut bytes = res.map_err(payload_to_io_error)?; diff --git a/src/stream.rs b/src/stream.rs index c52dc1b..54c19a7 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,6 +3,26 @@ use futures_core::Stream; use std::{pin::Pin, time::Duration}; use streem::IntoStreamer; +use crate::future::WithMetrics; + +pub(crate) fn metrics(name: &'static str, stream: S) -> impl Stream +where + S: Stream, + S::Item: 'static, +{ + streem::from_fn(|yielder| { + async move { + let stream = std::pin::pin!(stream); + let mut streamer = stream.into_streamer(); + + while let Some(item) = streamer.next().await { + yielder.yield_(item).await; + } + } + .with_metrics(name) + }) +} + pub(crate) fn make_send(stream: S) -> impl Stream + Send where S: Stream + 'static,