diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 6678d6a..7bd1b88 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -158,7 +158,9 @@ async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), let now = time::OffsetDateTime::now_utc(); let since = now.saturating_sub(config.media.retention.variants.to_duration()); - let mut variant_stream = repo.older_variants(since).await?.into_streamer(); + let variant_stream = repo.older_variants(since).await?; + let variant_stream = std::pin::pin!(crate::stream::take(variant_stream, 2048)); + let mut variant_stream = variant_stream.into_streamer(); while let Some(res) = variant_stream.next().await { metrics::counter!("pict-rs.cleanup.outdated-variant").increment(1); @@ -176,7 +178,9 @@ async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), let now = time::OffsetDateTime::now_utc(); let since = now.saturating_sub(config.media.retention.proxy.to_duration()); - let mut alias_stream = repo.older_aliases(since).await?.into_streamer(); + let alias_stream = repo.older_aliases(since).await?; + let alias_stream = std::pin::pin!(crate::stream::take(alias_stream, 2048)); + let mut alias_stream = alias_stream.into_streamer(); while let Some(res) = alias_stream.next().await { metrics::counter!("pict-rs.cleanup.outdated-proxy").increment(1); diff --git a/src/stream.rs b/src/stream.rs index 65b6547..cae35da 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,6 +5,35 @@ use streem::IntoStreamer; use crate::future::WithMetrics; +pub(crate) fn take(stream: S, amount: usize) -> impl Stream +where + S: Stream, + S::Item: 'static, +{ + streem::from_fn(|yielder| async move { + let stream = std::pin::pin!(stream); + let mut streamer = stream.into_streamer(); + + let mut count = 0; + + if count == amount { + return; + } + + while let Some(item) = streamer.next().await { + tracing::trace!("take: looping"); + + yielder.yield_(item).await; + + count += 1; + + if count == amount { + break; + } + } + }) +} + pub(crate) fn metrics(name: &'static str, stream: S) -> impl Stream where S: Stream,