From e3462f6664a107585f693372a8fd30f44feddd58 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 30 Sep 2023 17:24:48 -0500 Subject: [PATCH] payload middleware: switch to Rc, always inject if Payload isn't None --- src/middleware/payload.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 4a9b4c0..37bcebb 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -1,11 +1,10 @@ use std::{ future::{ready, Ready}, - sync::Arc, + rc::Rc, }; use actix_web::{ dev::{Service, ServiceRequest, Transform}, - http::Method, HttpMessage, }; use streem::IntoStreamer; @@ -48,7 +47,7 @@ async fn drain(rx: flume::Receiver) { } #[derive(Clone)] -struct DrainHandle(Option>>); +struct DrainHandle(Option>>); pub(crate) struct Payload { sender: flume::Sender, @@ -67,7 +66,7 @@ pub(crate) struct PayloadStream { impl DrainHandle { fn new(handle: actix_web::rt::task::JoinHandle<()>) -> Self { - Self(Some(Arc::new(handle))) + Self(Some(Rc::new(handle))) } } @@ -75,7 +74,7 @@ impl Payload { pub(crate) fn new() -> Self { let (tx, rx) = crate::sync::channel(LIMIT); - let handle = DrainHandle::new(crate::sync::spawn(async move { drain(rx).await })); + let handle = DrainHandle::new(crate::sync::spawn(drain(rx))); Payload { sender: tx, handle } } @@ -83,7 +82,7 @@ impl Payload { impl Drop for DrainHandle { fn drop(&mut self) { - if let Some(handle) = self.0.take().and_then(Arc::into_inner) { + if let Some(handle) = self.0.take().and_then(Rc::into_inner) { handle.abort(); } } @@ -166,8 +165,9 @@ where } fn call(&self, mut req: ServiceRequest) -> Self::Future { - if matches!(*req.method(), Method::POST | Method::PATCH | Method::PUT) { - let payload = req.take_payload(); + let payload = req.take_payload(); + + if !matches!(payload, actix_web::dev::Payload::None) { let payload: LocalBoxStream<'static, _> = Box::pin(PayloadStream { inner: Some(payload), sender: self.sender.clone(),