use std::{ future::Future, sync::{Arc, OnceLock}, time::{Duration, Instant}, }; static NOOP_WAKER: OnceLock = OnceLock::new(); fn noop_waker() -> &'static std::task::Waker { NOOP_WAKER.get_or_init(|| std::task::Waker::from(Arc::new(NoopWaker))) } struct NoopWaker; impl std::task::Wake for NoopWaker { fn wake(self: std::sync::Arc) {} fn wake_by_ref(self: &std::sync::Arc) {} } pub(crate) type LocalBoxFuture<'a, T> = std::pin::Pin + 'a>>; pub(crate) trait NowOrNever: Future { fn now_or_never(self) -> Option where Self: Sized, { let fut = std::pin::pin!(self); let mut cx = std::task::Context::from_waker(noop_waker()); match fut.poll(&mut cx) { std::task::Poll::Pending => None, std::task::Poll::Ready(out) => Some(out), } } } pub(crate) trait WithTimeout: Future { fn with_timeout(self, duration: Duration) -> tokio::time::Timeout where Self: Sized, { tokio::time::timeout(duration, self) } } pub(crate) trait WithMetrics: Future { fn with_metrics(self, name: &'static str) -> MetricsFuture where Self: Sized, { MetricsFuture { future: self, metrics: Metrics { name, start: Instant::now(), complete: false, }, } } } pub(crate) trait WithPollTimer: Future { fn with_poll_timer(self, name: &'static str) -> PollTimer where Self: Sized, { PollTimer { name, inner: self } } } impl NowOrNever for F where F: Future {} impl WithMetrics for F where F: Future {} impl WithTimeout for F where F: Future {} impl WithPollTimer for F where F: Future {} pin_project_lite::pin_project! { pub(crate) struct MetricsFuture { #[pin] future: F, metrics: Metrics, } } struct Metrics { name: &'static str, start: Instant, complete: bool, } impl Future for MetricsFuture where F: Future, { type Output = F::Output; fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let this = self.project(); let out = std::task::ready!(this.future.poll(cx)); this.metrics.complete = true; std::task::Poll::Ready(out) } } impl Drop for Metrics { fn drop(&mut self) { metrics::histogram!(self.name, "complete" => self.complete.to_string()) .record(self.start.elapsed().as_secs_f64()); } } pin_project_lite::pin_project! { pub(crate) struct PollTimer { name: &'static str, #[pin] inner: F, } } impl Future for PollTimer where F: Future, { type Output = F::Output; fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let start = Instant::now(); let this = self.project(); let out = this.inner.poll(cx); let elapsed = start.elapsed(); // only record 1 in 10 polls if elapsed.as_micros() % 10 == 0 { if elapsed > Duration::from_micros(10) { metrics::counter!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED, "timer" => this.name.to_string()).increment(1); metrics::histogram!(crate::init_metrics::FUTURE_POLL_TIMER_EXCEEDED_SECONDS, "timer" => this.name.to_string()).record(elapsed.as_secs_f64()); } if elapsed > Duration::from_secs(1) { #[cfg(feature = "poll-timer-warnings")] tracing::warn!( "Future {} polled for {} seconds", this.name, elapsed.as_secs() ); #[cfg(not(feature = "poll-timer-warnings"))] tracing::debug!( "Future {} polled for {} seconds", this.name, elapsed.as_secs() ); } else if elapsed > Duration::from_millis(1) { #[cfg(feature = "poll-timer-warnings")] tracing::warn!("Future {} polled for {} ms", this.name, elapsed.as_millis()); #[cfg(not(feature = "poll-timer-warnings"))] tracing::debug!("Future {} polled for {} ms", this.name, elapsed.as_millis()); } else if elapsed > Duration::from_micros(200) { #[cfg(feature = "poll-timer-warnings")] tracing::debug!( "Future {} polled for {} microseconds", this.name, elapsed.as_micros(), ); #[cfg(not(feature = "poll-timer-warnings"))] tracing::trace!( "Future {} polled for {} microseconds", this.name, elapsed.as_micros(), ); } else if elapsed > Duration::from_micros(1) { tracing::trace!( "Future {} polled for {} microseconds", this.name, elapsed.as_micros() ); } } out } }