error handling

This commit is contained in:
Felix Ableitner 2024-05-23 13:33:00 +02:00
parent b7cc908cb2
commit 00ceaa7992
2 changed files with 14 additions and 10 deletions

View file

@ -63,8 +63,9 @@ impl SendManager {
pub fn run(mut self) -> CancellableTask { pub fn run(mut self) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move {
self.do_loop(cancel).await.unwrap(); self.do_loop(cancel).await?;
self.cancel().await.unwrap(); self.cancel().await?;
Ok(())
}) })
} }
@ -109,7 +110,8 @@ impl SendManager {
self.workers.insert( self.workers.insert(
instance.id, instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move {
InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?;
Ok(())
}), }),
); );
} else if !should_federate { } else if !should_federate {

View file

@ -17,6 +17,7 @@ use lemmy_db_schema::{
traits::ApubActor, traits::ApubActor,
utils::{get_conn, DbPool}, utils::{get_conn, DbPool},
}; };
use lemmy_utils::error::LemmyResult;
use moka::future::Cache; use moka::future::Cache;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::Url; use reqwest::Url;
@ -24,6 +25,7 @@ use serde_json::Value;
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::error;
/// Decrease the delays of the federation queue. /// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue. /// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
@ -59,26 +61,26 @@ impl CancellableTask {
task: impl FnOnce(CancellationToken) -> F + Send + 'static, task: impl FnOnce(CancellationToken) -> F + Send + 'static,
) -> CancellableTask ) -> CancellableTask
where where
F: Future<Output = R> + Send + 'static, F: Future<Output = LemmyResult<R>> + Send + 'static,
R: Send + 'static, R: Send + 'static,
{ {
let stop = CancellationToken::new(); let stop = CancellationToken::new();
let stop2 = stop.clone(); let stop2 = stop.clone();
// TODO: need to print error let task: JoinHandle<LemmyResult<R>> = tokio::spawn(task(stop2));
let task: JoinHandle<R> = tokio::spawn(task(stop2.clone()));
let abort = task.abort_handle(); let abort = task.abort_handle();
CancellableTask { CancellableTask {
f: Box::pin(async move { f: Box::pin(async move {
stop.cancel(); stop.cancel();
tokio::select! { tokio::select! {
r = task => { r = task => {
r.context("could not join")?; if let Err(ref e) = r? {
error!("CancellableTask threw error: {e}");
}
Ok(()) Ok(())
}, },
_ = sleep(timeout) => { _ = sleep(timeout) => {
abort.abort(); abort.abort();
tracing::warn!("Graceful shutdown timed out, aborting task"); Err(anyhow!("CancellableTask aborted due to shutdown timeout"))
Err(anyhow!("task aborted due to timeout"))
} }
} }
}), }),