Divorce reqwest from main application's spans

This commit is contained in:
Aode (lion) 2022-03-28 18:42:22 -05:00
parent 3f385c106d
commit 1291bf8beb
1 changed files with 33 additions and 14 deletions

View File

@ -18,9 +18,11 @@ use std::{
Arc, Arc,
}, },
task::{Context, Poll, Wake, Waker}, task::{Context, Poll, Wake, Waker},
time::{Duration, Instant},
}; };
use storage_path_generator::{Generator, Path}; use storage_path_generator::{Generator, Path};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tracing::Instrument;
mod object_id; mod object_id;
pub(crate) use object_id::ObjectId; pub(crate) use object_id::ObjectId;
@ -35,6 +37,9 @@ pub(crate) enum ObjectError {
#[error("Failed to generate path")] #[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError), PathGenerator(#[from] storage_path_generator::PathError),
#[error("Timeout")]
Elapsed,
#[error("Failed to parse string")] #[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error), Utf8(#[from] FromUtf8Error),
@ -113,21 +118,35 @@ impl Store for ObjectStore {
let path = identifier.as_str(); let path = identifier.as_str();
let start = from_start.unwrap_or(0); let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len); let end = len.map(|len| start + len - 1);
let request = Client::request( let request_span = tracing::info_span!(parent: None, "Get Object");
&self.client,
&self.bucket,
path,
Command::GetObjectRange { start, end },
);
let response = request.response().await.map_err(ObjectError::from)?; let request = request_span.in_scope(|| {
Client::request(
&self.client,
&self.bucket,
path,
Command::GetObjectRange { start, end },
)
});
Ok(Box::pin(timeout( let now = Instant::now();
io_error(response.bytes_stream()), let allotted = Duration::from_secs(5);
std::time::Duration::from_secs(5),
))) let response = request_span
.in_scope(|| tokio::time::timeout(allotted, request.response()))
.instrument(request_span.clone())
.await
.map_err(|_| ObjectError::Elapsed)?
.map_err(ObjectError::from)?;
let allotted = allotted.saturating_sub(now.elapsed());
Ok(
request_span
.in_scope(|| Box::pin(timeout(allotted, io_error(response.bytes_stream())))),
)
} }
#[tracing::instrument(skip(writer))] #[tracing::instrument(skip(writer))]
@ -258,7 +277,7 @@ where
IoError { inner: stream } IoError { inner: stream }
} }
fn timeout<S, T>(stream: S, duration: std::time::Duration) -> impl Stream<Item = std::io::Result<T>> fn timeout<S, T>(duration: Duration, stream: S) -> impl Stream<Item = std::io::Result<T>>
where where
S: Stream<Item = std::io::Result<T>>, S: Stream<Item = std::io::Result<T>>,
{ {
@ -321,7 +340,7 @@ where
if let Poll::Ready(()) = sleep.as_mut().poll(&mut timeout_cx) { if let Poll::Ready(()) = sleep.as_mut().poll(&mut timeout_cx) {
return Poll::Ready(Some(Err(std::io::Error::new( return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"Stream timeout".to_string(), Error::from(ObjectError::Elapsed),
)))); ))));
} else { } else {
*this.sleep = Some(sleep); *this.sleep = Some(sleep);