From bf1a16d7d31f76efddb2661f355b4c302ff8fe98 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 11 Sep 2021 16:35:38 -0500 Subject: [PATCH] Abort process writer task if reader is dropped --- src/middleware.rs | 2 +- src/range.rs | 4 +--- src/stream.rs | 13 +++++++++++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/middleware.rs b/src/middleware.rs index a69c3fc..c58fd85 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,3 +1,4 @@ +use actix_rt::time::Timeout; use actix_web::{ dev::{Service, ServiceRequest, Transform}, http::StatusCode, @@ -9,7 +10,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use actix_rt::time::Timeout; use tracing_futures::{Instrument, Instrumented}; use uuid::Uuid; diff --git a/src/range.rs b/src/range.rs index 81b9e11..da7eb0e 100644 --- a/src/range.rs +++ b/src/range.rs @@ -49,9 +49,7 @@ impl Range { ) -> impl Stream> + Unpin { match self { Range::RangeStart(start) => once(ready(Ok(bytes.slice(*start as usize..)))), - Range::SuffixLength(from_start) => { - once(ready(Ok(bytes.slice(..*from_start as usize)))) - } + Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))), Range::Segment(start, end) => { once(ready(Ok(bytes.slice(*start as usize..*end as usize)))) } diff --git a/src/stream.rs b/src/stream.rs index 5d88e60..f3316f8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -19,6 +19,7 @@ pub(crate) struct ProcessRead { inner: I, err_recv: tokio::sync::oneshot::Receiver, err_closed: bool, + handle: actix_rt::task::JoinHandle<()>, } struct BytesFreezer(S); @@ -43,7 +44,7 @@ impl Process { let mut child = self.child; - actix_rt::spawn(async move { + let handle = actix_rt::spawn(async move { if let Err(e) = stdin.write_all_buf(&mut input).await { let _ = tx.send(e); return; @@ -67,6 +68,7 @@ impl Process { inner: stdout, err_recv: rx, err_closed: false, + handle, })) } @@ -81,7 +83,7 @@ impl Process { let mut child = self.child; - actix_rt::spawn(async move { + let handle = actix_rt::spawn(async move { if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { let _ = tx.send(e); return; @@ -105,6 +107,7 @@ impl Process { inner: stdout, err_recv: rx, err_closed: false, + handle, })) } } @@ -144,6 +147,12 @@ where } } +impl Drop for ProcessRead { + fn drop(&mut self) { + self.handle.abort(); + } +} + impl Stream for BytesFreezer where S: Stream> + Unpin,