Improve api response times by doing send_activity asynchronously (#3493)

* do send_activity after http response

* move to util function

* format

* fix prometheus

* make synchronous federation configurable

* cargo fmt

* empty

* empty

---------

Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
This commit is contained in:
phiresky 2023-07-10 12:27:49 +02:00 committed by GitHub
parent 0c82f4e660
commit b35757b429
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 85 additions and 28 deletions

View file

@ -2,7 +2,7 @@
set -e set -e
export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
pushd .. pushd ..
cargo build cargo build
rm target/lemmy_server || true rm target/lemmy_server || true

View file

@ -17,7 +17,7 @@ mod site;
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub trait Perform { pub trait Perform {
type Response: serde::ser::Serialize + Send; type Response: serde::ser::Serialize + Send + Clone + Sync;
async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>; async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
} }

View file

@ -43,7 +43,7 @@ pub struct DeleteCustomEmoji {
pub auth: Sensitive<String>, pub auth: Sensitive<String>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]
/// The response for deleting a custom emoji. /// The response for deleting a custom emoji.

View file

@ -395,7 +395,7 @@ pub struct PurgeComment {
pub auth: Sensitive<String>, pub auth: Sensitive<String>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "full", derive(TS))] #[cfg_attr(feature = "full", derive(TS))]
#[cfg_attr(feature = "full", ts(export))] #[cfg_attr(feature = "full", ts(export))]
/// The response for purged items. /// The response for purged items.

View file

@ -12,7 +12,7 @@ mod user;
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub trait PerformCrud { pub trait PerformCrud {
type Response: serde::ser::Serialize + Send; type Response: serde::ser::Serialize + Send + Clone + Sync;
async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>; async fn perform(&self, context: &Data<LemmyContext>) -> Result<Self::Response, LemmyError>;
} }

View file

@ -29,12 +29,14 @@ use lemmy_db_schema::{
use lemmy_db_views_actor::structs::CommunityView; use lemmy_db_views_actor::structs::CommunityView;
use lemmy_utils::{ use lemmy_utils::{
error::LemmyError, error::LemmyError,
spawn_try_task,
utils::{ utils::{
slurs::{check_slurs, check_slurs_opt}, slurs::{check_slurs, check_slurs_opt},
validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
}, },
SYNCHRONOUS_FEDERATION,
}; };
use tracing::{warn, Instrument}; use tracing::Instrument;
use url::Url; use url::Url;
use webmention::{Webmention, WebmentionError}; use webmention::{Webmention, WebmentionError};
@ -143,20 +145,30 @@ impl PerformCrud for CreatePost {
// Mark the post as read // Mark the post as read
mark_post_as_read(person_id, post_id, context.pool()).await?; mark_post_as_read(person_id, post_id, context.pool()).await?;
if let Some(url) = &updated_post.url { if let Some(url) = updated_post.url.clone() {
let mut webmention = let task = async move {
Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?; let mut webmention =
webmention.set_checked(true); Webmention::new::<Url>(updated_post.ap_id.clone().into(), url.clone().into())?;
match webmention webmention.set_checked(true);
.send() match webmention
.instrument(tracing::info_span!("Sending webmention")) .send()
.await .instrument(tracing::info_span!("Sending webmention"))
{ .await
Ok(_) => {} {
Err(WebmentionError::NoEndpointDiscovered(_)) => {} Err(WebmentionError::NoEndpointDiscovered(_)) => Ok(()),
Err(e) => warn!("Failed to send webmention: {}", e), Ok(_) => Ok(()),
Err(e) => Err(LemmyError::from_error_message(
e,
"Couldn't send webmention",
)),
}
};
if *SYNCHRONOUS_FEDERATION {
task.await?;
} else {
spawn_try_task(task);
} }
} };
build_post_response(context, community_id, person_id, post_id).await build_post_response(context, community_id, person_id, post_id).await
} }

View file

@ -201,7 +201,7 @@ where
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait SendActivity: Sync { pub trait SendActivity: Sync {
type Response: Sync + Send; type Response: Sync + Send + Clone;
async fn send_activity( async fn send_activity(
_request: &Self, _request: &Self,

View file

@ -14,7 +14,11 @@ pub mod request;
pub mod utils; pub mod utils;
pub mod version; pub mod version;
use error::LemmyError;
use futures::Future;
use once_cell::sync::Lazy;
use std::time::Duration; use std::time::Duration;
use tracing::Instrument;
pub type ConnectionId = usize; pub type ConnectionId = usize;
@ -31,3 +35,27 @@ macro_rules! location_info {
) )
}; };
} }
/// if true, all federation should happen synchronously. useful for debugging and testing.
/// defaults to true on debug mode, false on releasemode
/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(cfg!(debug_assertions))
});
/// tokio::spawn, but accepts a future that may fail and also
/// * logs errors
/// * attaches the spawned task to the tracing span of the caller for better logging
pub fn spawn_try_task(task: impl Future<Output = Result<(), LemmyError>> + Send + 'static) {
tokio::spawn(
async {
if let Err(e) = task.await {
tracing::warn!("error in spawn: {e}");
}
}
.in_current_span(), // this makes sure the inner tracing gets the same context as where spawn was called
);
}

View file

@ -105,7 +105,7 @@ use lemmy_apub::{
}, },
SendActivity, SendActivity,
}; };
use lemmy_utils::rate_limit::RateLimitCell; use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION};
use serde::Deserialize; use serde::Deserialize;
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
@ -382,8 +382,14 @@ where
+ 'static, + 'static,
{ {
let res = data.perform(&context).await?; let res = data.perform(&context).await?;
SendActivity::send_activity(&data, &res, &apub_data).await?; let res_clone = res.clone();
Ok(HttpResponse::Ok().json(res)) let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
if *SYNCHRONOUS_FEDERATION {
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(HttpResponse::Ok().json(&res))
} }
async fn route_get<'a, Data>( async fn route_get<'a, Data>(
@ -432,8 +438,14 @@ where
+ 'static, + 'static,
{ {
let res = data.perform(&context).await?; let res = data.perform(&context).await?;
SendActivity::send_activity(&data, &res, &apub_data).await?; let res_clone = res.clone();
Ok(HttpResponse::Ok().json(res)) let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
if *SYNCHRONOUS_FEDERATION {
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(HttpResponse::Ok().json(&res))
} }
async fn route_get_crud<'a, Data>( async fn route_get_crud<'a, Data>(

View file

@ -26,7 +26,12 @@ use lemmy_db_schema::{
utils::{build_db_pool, get_database_url, run_migrations}, utils::{build_db_pool, get_database_url, run_migrations},
}; };
use lemmy_routes::{feeds, images, nodeinfo, webfinger}; use lemmy_routes::{feeds, images, nodeinfo, webfinger};
use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, settings::SETTINGS}; use lemmy_utils::{
error::LemmyError,
rate_limit::RateLimitCell,
settings::SETTINGS,
SYNCHRONOUS_FEDERATION,
};
use reqwest::Client; use reqwest::Client;
use reqwest_middleware::ClientBuilder; use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware; use reqwest_tracing::TracingMiddleware;
@ -139,7 +144,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.worker_count(settings.worker_count) .worker_count(settings.worker_count)
.retry_count(settings.retry_count) .retry_count(settings.retry_count)
.debug(cfg!(debug_assertions)) .debug(*SYNCHRONOUS_FEDERATION)
.http_signature_compat(true) .http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(context.pool().clone()))) .url_verifier(Box::new(VerifyUrlData(context.pool().clone())))
.build() .build()

View file

@ -103,7 +103,7 @@ fn create_db_pool_metrics() -> DbPoolMetrics {
.register(Box::new(metrics.available.clone())) .register(Box::new(metrics.available.clone()))
.unwrap(); .unwrap();
return metrics; metrics
} }
async fn collect_db_pool_metrics(context: &PromContext) { async fn collect_db_pool_metrics(context: &PromContext) {