diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 0e3a707e2..2151c0eed 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -63,8 +63,9 @@ impl SendManager { pub fn run(mut self) -> CancellableTask { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { - self.do_loop(cancel).await.unwrap(); - self.cancel().await.unwrap(); + self.do_loop(cancel).await?; + self.cancel().await?; + Ok(()) }) } @@ -109,7 +110,8 @@ impl SendManager { self.workers.insert( instance.id, CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { - InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await + InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?; + Ok(()) }), ); } else if !should_federate { diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index c7f020e1d..f88ea5c9f 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -17,6 +17,7 @@ use lemmy_db_schema::{ traits::ApubActor, utils::{get_conn, DbPool}, }; +use lemmy_utils::error::LemmyResult; use moka::future::Cache; use once_cell::sync::Lazy; use reqwest::Url; @@ -24,6 +25,7 @@ use serde_json::Value; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{task::JoinHandle, time::sleep}; use tokio_util::sync::CancellationToken; +use tracing::error; /// Decrease the delays of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. @@ -59,26 +61,26 @@ impl CancellableTask { task: impl FnOnce(CancellationToken) -> F + Send + 'static, ) -> CancellableTask where - F: Future + Send + 'static, + F: Future> + Send + 'static, R: Send + 'static, { let stop = CancellationToken::new(); let stop2 = stop.clone(); - // TODO: need to print error - let task: JoinHandle = tokio::spawn(task(stop2.clone())); + let task: JoinHandle> = tokio::spawn(task(stop2)); let abort = task.abort_handle(); CancellableTask { f: Box::pin(async move { stop.cancel(); tokio::select! { r = task => { - r.context("could not join")?; - Ok(()) + if let Err(ref e) = r? { + error!("CancellableTask threw error: {e}"); + } + Ok(()) }, _ = sleep(timeout) => { abort.abort(); - tracing::warn!("Graceful shutdown timed out, aborting task"); - Err(anyhow!("task aborted due to timeout")) + Err(anyhow!("CancellableTask aborted due to shutdown timeout")) } } }),