diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 442c67d..9734b3a 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -18,9 +18,11 @@ use std::{ Arc, }, task::{Context, Poll, Wake, Waker}, + time::{Duration, Instant}, }; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::Instrument; mod object_id; pub(crate) use object_id::ObjectId; @@ -35,6 +37,9 @@ pub(crate) enum ObjectError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), + #[error("Timeout")] + Elapsed, + #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), @@ -113,21 +118,35 @@ impl Store for ObjectStore { let path = identifier.as_str(); let start = from_start.unwrap_or(0); - let end = len.map(|len| start + len); + let end = len.map(|len| start + len - 1); - let request = Client::request( - &self.client, - &self.bucket, - path, - Command::GetObjectRange { start, end }, - ); + let request_span = tracing::info_span!(parent: None, "Get Object"); - let response = request.response().await.map_err(ObjectError::from)?; + let request = request_span.in_scope(|| { + Client::request( + &self.client, + &self.bucket, + path, + Command::GetObjectRange { start, end }, + ) + }); - Ok(Box::pin(timeout( - io_error(response.bytes_stream()), - std::time::Duration::from_secs(5), - ))) + let now = Instant::now(); + let allotted = Duration::from_secs(5); + + let response = request_span + .in_scope(|| tokio::time::timeout(allotted, request.response())) + .instrument(request_span.clone()) + .await + .map_err(|_| ObjectError::Elapsed)? + .map_err(ObjectError::from)?; + + let allotted = allotted.saturating_sub(now.elapsed()); + + Ok( + request_span + .in_scope(|| Box::pin(timeout(allotted, io_error(response.bytes_stream())))), + ) } #[tracing::instrument(skip(writer))] @@ -258,7 +277,7 @@ where IoError { inner: stream } } -fn timeout(stream: S, duration: std::time::Duration) -> impl Stream> +fn timeout(duration: Duration, stream: S) -> impl Stream> where S: Stream>, { @@ -321,7 +340,7 @@ where if let Poll::Ready(()) = sleep.as_mut().poll(&mut timeout_cx) { return Poll::Ready(Some(Err(std::io::Error::new( std::io::ErrorKind::Other, - "Stream timeout".to_string(), + Error::from(ObjectError::Elapsed), )))); } else { *this.sleep = Some(sleep);