mirror of
https://github.com/LemmyNet/lemmy.git
synced 2025-01-11 12:35:54 +00:00
use boxfuture for readability
This commit is contained in:
parent
14479cefd2
commit
59f08d81c9
2 changed files with 9 additions and 15 deletions
|
@ -6,7 +6,6 @@ use activitypub_federation::config::FederationConfig;
|
||||||
use chrono::{Local, Timelike};
|
use chrono::{Local, Timelike};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use federation_queue_state::FederationQueueState;
|
use federation_queue_state::FederationQueueState;
|
||||||
use futures::Future;
|
|
||||||
use lemmy_db_schema::{
|
use lemmy_db_schema::{
|
||||||
source::instance::Instance,
|
source::instance::Instance,
|
||||||
utils::{ActualDbPool, DbPool},
|
utils::{ActualDbPool, DbPool},
|
||||||
|
@ -114,7 +113,7 @@ pub fn start_stop_federation_workers_cancellable(
|
||||||
opts: Opts,
|
opts: Opts,
|
||||||
pool: ActualDbPool,
|
pool: ActualDbPool,
|
||||||
config: FederationConfig<impl Clone + Send + Sync + 'static>,
|
config: FederationConfig<impl Clone + Send + Sync + 'static>,
|
||||||
) -> CancellableTask<(), impl Future<Output = anyhow::Result<()>>> {
|
) -> CancellableTask<()> {
|
||||||
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| {
|
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| {
|
||||||
start_stop_federation_workers(opts, pool, config, c)
|
start_stop_federation_workers(opts, pool, config, c)
|
||||||
})
|
})
|
||||||
|
|
|
@ -23,28 +23,23 @@ use serde_json::Value;
|
||||||
use std::{
|
use std::{
|
||||||
borrow::{Borrow, Cow},
|
borrow::{Borrow, Cow},
|
||||||
future::Future,
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::{task::JoinHandle, time::sleep};
|
use tokio::{task::JoinHandle, time::sleep};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
pub struct CancellableTask<R: Send + 'static, F>
|
pub struct CancellableTask<R: Send + 'static> {
|
||||||
where
|
f: Pin<Box<dyn Future<Output = Result<R, anyhow::Error>> + Send + 'static>>,
|
||||||
F: Future<Output = Result<R>>,
|
|
||||||
{
|
|
||||||
f: F,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Send + 'static, F> CancellableTask<R, F>
|
impl<R: Send + 'static> CancellableTask<R> {
|
||||||
where
|
|
||||||
F: Future<Output = Result<R>>,
|
|
||||||
{
|
|
||||||
/// spawn a task but with graceful shutdown
|
/// spawn a task but with graceful shutdown
|
||||||
pub fn spawn(
|
pub fn spawn<F>(
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
task: impl FnOnce(CancellationToken) -> F,
|
task: impl FnOnce(CancellationToken) -> F,
|
||||||
) -> CancellableTask<R, impl Future<Output = Result<R>>>
|
) -> CancellableTask<R>
|
||||||
where
|
where
|
||||||
F: Future<Output = Result<R>> + Send + 'static,
|
F: Future<Output = Result<R>> + Send + 'static,
|
||||||
{
|
{
|
||||||
|
@ -62,7 +57,7 @@ where
|
||||||
});
|
});
|
||||||
let abort = task.abort_handle();
|
let abort = task.abort_handle();
|
||||||
CancellableTask {
|
CancellableTask {
|
||||||
f: async move {
|
f: Box::pin(async move {
|
||||||
stop.cancel();
|
stop.cancel();
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
r = task => {
|
r = task => {
|
||||||
|
@ -74,7 +69,7 @@ where
|
||||||
Err(anyhow!("task aborted due to timeout"))
|
Err(anyhow!("task aborted due to timeout"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue