mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Use with_timeout for all timeouts
This commit is contained in:
parent
fa11c4853e
commit
3bd0f78e75
4 changed files with 27 additions and 13 deletions
|
@ -38,6 +38,7 @@ use actix_web::{
|
||||||
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
||||||
};
|
};
|
||||||
use details::{ApiDetails, HumanDate};
|
use details::{ApiDetails, HumanDate};
|
||||||
|
use future::WithTimeout;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use middleware::Metrics;
|
use middleware::Metrics;
|
||||||
|
@ -432,7 +433,11 @@ async fn claim_upload<S: Store + 'static>(
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let upload_id = Serde::into_inner(query.into_inner().upload_id);
|
let upload_id = Serde::into_inner(query.into_inner().upload_id);
|
||||||
|
|
||||||
match actix_rt::time::timeout(Duration::from_secs(10), repo.wait(upload_id)).await {
|
match repo
|
||||||
|
.wait(upload_id)
|
||||||
|
.with_timeout(Duration::from_secs(10))
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(wait_res) => {
|
Ok(wait_res) => {
|
||||||
let upload_result = wait_res?;
|
let upload_result = wait_res?;
|
||||||
repo.claim(upload_id).await?;
|
repo.claim(upload_id).await?;
|
||||||
|
|
|
@ -12,6 +12,8 @@ use std::{
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::future::WithTimeout;
|
||||||
|
|
||||||
pub(crate) use self::metrics::Metrics;
|
pub(crate) use self::metrics::Metrics;
|
||||||
|
|
||||||
pub(crate) struct Deadline;
|
pub(crate) struct Deadline;
|
||||||
|
@ -149,8 +151,12 @@ impl actix_web::error::ResponseError for DeadlineExceeded {
|
||||||
HttpResponse::build(self.status_code())
|
HttpResponse::build(self.status_code())
|
||||||
.content_type("application/json")
|
.content_type("application/json")
|
||||||
.body(
|
.body(
|
||||||
serde_json::to_string(&serde_json::json!({ "msg": self.to_string() }))
|
serde_json::to_string(
|
||||||
.unwrap_or_else(|_| r#"{"msg":"request timeout"}"#.to_string()),
|
&serde_json::json!({ "msg": self.to_string(), "code": "request-timeout" }),
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
r#"{"msg":"request timeout","code":"request-timeout"}"#.to_string()
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -163,7 +169,7 @@ where
|
||||||
DeadlineFuture {
|
DeadlineFuture {
|
||||||
inner: match timeout {
|
inner: match timeout {
|
||||||
Some(duration) => DeadlineFutureInner::Timed {
|
Some(duration) => DeadlineFutureInner::Timed {
|
||||||
timeout: actix_rt::time::timeout(duration, future),
|
timeout: future.with_timeout(duration),
|
||||||
},
|
},
|
||||||
None => DeadlineFutureInner::Untimed { future },
|
None => DeadlineFutureInner::Untimed { future },
|
||||||
},
|
},
|
||||||
|
|
|
@ -14,7 +14,7 @@ use tokio::{
|
||||||
};
|
};
|
||||||
use tracing::{Instrument, Span};
|
use tracing::{Instrument, Span};
|
||||||
|
|
||||||
use crate::error_code::ErrorCode;
|
use crate::{error_code::ErrorCode, future::WithTimeout};
|
||||||
|
|
||||||
struct MetricsGuard {
|
struct MetricsGuard {
|
||||||
start: Instant,
|
start: Instant,
|
||||||
|
@ -159,7 +159,7 @@ impl Process {
|
||||||
timeout,
|
timeout,
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
let res = actix_rt::time::timeout(timeout, child.wait()).await;
|
let res = child.wait().with_timeout(timeout).await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Ok(status)) if status.success() => {
|
Ok(Ok(status)) if status.success() => {
|
||||||
|
@ -220,7 +220,7 @@ impl Process {
|
||||||
child.wait().await
|
child.wait().await
|
||||||
};
|
};
|
||||||
|
|
||||||
let error = match actix_rt::time::timeout(timeout, child_fut).await {
|
let error = match child_fut.with_timeout(timeout).await {
|
||||||
Ok(Ok(status)) if status.success() => {
|
Ok(Ok(status)) if status.success() => {
|
||||||
guard.disarm();
|
guard.disarm();
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -266,11 +266,12 @@ impl Inner {
|
||||||
|
|
||||||
impl UploadInterest {
|
impl UploadInterest {
|
||||||
async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> {
|
async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> {
|
||||||
actix_rt::time::timeout(
|
self.interest
|
||||||
timeout,
|
.as_ref()
|
||||||
self.interest.as_ref().expect("interest exists").notified(),
|
.expect("interest exists")
|
||||||
)
|
.notified()
|
||||||
.await
|
.with_timeout(timeout)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1214,7 +1215,9 @@ impl QueueRepo for PostgresRepo {
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(conn);
|
drop(conn);
|
||||||
if actix_rt::time::timeout(Duration::from_secs(5), notifier.notified())
|
if notifier
|
||||||
|
.notified()
|
||||||
|
.with_timeout(Duration::from_secs(5))
|
||||||
.await
|
.await
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in a new issue