Check for dead federated instances (fixes #2221) (#3427)

* Check for dead federated instances (fixes #2221)

* move to apub crate, use timestamp

* make it compile

* clippy

* use moka to cache blocklists, dead instances, restore orig scheduled tasks

* remove leftover last_alive var

* error handling

* wip

* fix alive check for instances without nodeinfo, add coalesce

* clippy

* move federation blocklist cache to #3486

* unused deps
This commit is contained in:
Nutomic 2023-07-13 16:12:01 +02:00 committed by GitHub
parent c0b7865896
commit 7d8cb93b53
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 59 deletions

View file

@ -13,11 +13,16 @@ use activitypub_federation::{
}; };
use anyhow::anyhow; use anyhow::anyhow;
use lemmy_api_common::context::LemmyContext; use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{newtypes::CommunityId, source::community::Community}; use lemmy_db_schema::{
newtypes::CommunityId,
source::{community::Community, instance::Instance},
};
use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType}; use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType};
use moka::future::Cache;
use once_cell::sync::Lazy;
use serde::Serialize; use serde::Serialize;
use std::ops::Deref; use std::{ops::Deref, sync::Arc, time::Duration};
use tracing::info; use tracing::info;
use url::{ParseError, Url}; use url::{ParseError, Url};
use uuid::Uuid; use uuid::Uuid;
@ -30,6 +35,10 @@ pub mod following;
pub mod unfederated; pub mod unfederated;
pub mod voting; pub mod voting;
/// Amount of time that the list of dead instances is cached. This is only updated once a day,
/// so there is no harm in caching it for a longer time.
pub static DEAD_INSTANCE_LIST_CACHE_DURATION: Duration = Duration::from_secs(30 * 60);
/// Checks that the specified Url actually identifies a Person (by fetching it), and that the person /// Checks that the specified Url actually identifies a Person (by fetching it), and that the person
/// doesn't have a site ban. /// doesn't have a site ban.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
@ -148,7 +157,7 @@ async fn send_lemmy_activity<Activity, ActorT>(
data: &Data<LemmyContext>, data: &Data<LemmyContext>,
activity: Activity, activity: Activity,
actor: &ActorT, actor: &ActorT,
inbox: Vec<Url>, mut inbox: Vec<Url>,
sensitive: bool, sensitive: bool,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
@ -156,6 +165,22 @@ where
ActorT: Actor, ActorT: Actor,
Activity: ActivityHandler<Error = LemmyError>, Activity: ActivityHandler<Error = LemmyError>,
{ {
static CACHE: Lazy<Cache<(), Arc<Vec<String>>>> = Lazy::new(|| {
Cache::builder()
.max_capacity(1)
.time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION)
.build()
});
let dead_instances = CACHE
.try_get_with((), async {
Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(&mut data.pool()).await?))
})
.await?;
inbox.retain(|i| {
let domain = i.domain().expect("has domain").to_string();
!dead_instances.contains(&domain)
});
info!("Sending activity {}", activity.id().to_string()); info!("Sending activity {}", activity.id().to_string());
let activity = WithContext::new(activity, CONTEXT.deref().clone()); let activity = WithContext::new(activity, CONTEXT.deref().clone());

View file

@ -1,10 +1,17 @@
use crate::{ use crate::{
diesel::dsl::IntervalDsl,
newtypes::InstanceId, newtypes::InstanceId,
schema::{federation_allowlist, federation_blocklist, instance}, schema::{federation_allowlist, federation_blocklist, instance},
source::instance::{Instance, InstanceForm}, source::instance::{Instance, InstanceForm},
utils::{get_conn, naive_now, DbPool}, utils::{get_conn, naive_now, DbPool},
}; };
use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; use diesel::{
dsl::{insert_into, now},
result::Error,
sql_types::{Nullable, Timestamp},
ExpressionMethods,
QueryDsl,
};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
impl Instance { impl Instance {
@ -46,6 +53,24 @@ impl Instance {
.execute(conn) .execute(conn)
.await .await
} }
pub async fn read_all(pool: &mut DbPool<'_>) -> Result<Vec<Instance>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
.select(instance::all_columns)
.get_results(conn)
.await
}
pub async fn dead_instances(pool: &mut DbPool<'_>) -> Result<Vec<String>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
.select(instance::domain)
.filter(coalesce(instance::updated, instance::published).lt(now - 3.days()))
.get_results(conn)
.await
}
#[cfg(test)] #[cfg(test)]
pub async fn delete_all(pool: &mut DbPool<'_>) -> Result<usize, Error> { pub async fn delete_all(pool: &mut DbPool<'_>) -> Result<usize, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
@ -79,3 +104,5 @@ impl Instance {
.await .await
} }
} }
sql_function! { fn coalesce(x: Nullable<Timestamp>, y: Timestamp) -> Timestamp; }

View file

@ -81,7 +81,6 @@ impl Site {
) )
} }
// TODO this needs fixed
pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> { pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result<Vec<Self>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
site.order_by(id).offset(1).get_results::<Self>(conn).await site.order_by(id).offset(1).get_results::<Self>(conn).await

View file

@ -1,6 +1,7 @@
use crate::newtypes::{DbUrl, InstanceId, SiteId}; use crate::newtypes::{DbUrl, InstanceId, SiteId};
#[cfg(feature = "full")] #[cfg(feature = "full")]
use crate::schema::site; use crate::schema::site;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none; use serde_with::skip_serializing_none;
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -18,8 +19,8 @@ pub struct Site {
pub name: String, pub name: String,
/// A sidebar for the site in markdown. /// A sidebar for the site in markdown.
pub sidebar: Option<String>, pub sidebar: Option<String>,
pub published: chrono::NaiveDateTime, pub published: NaiveDateTime,
pub updated: Option<chrono::NaiveDateTime>, pub updated: Option<NaiveDateTime>,
/// An icon URL. /// An icon URL.
pub icon: Option<DbUrl>, pub icon: Option<DbUrl>,
/// A banner url. /// A banner url.
@ -29,7 +30,7 @@ pub struct Site {
/// The federated actor_id. /// The federated actor_id.
pub actor_id: DbUrl, pub actor_id: DbUrl,
/// The time the site was last refreshed. /// The time the site was last refreshed.
pub last_refreshed_at: chrono::NaiveDateTime, pub last_refreshed_at: NaiveDateTime,
/// The site inbox /// The site inbox
pub inbox_url: DbUrl, pub inbox_url: DbUrl,
pub private_key: Option<String>, pub private_key: Option<String>,
@ -45,12 +46,12 @@ pub struct SiteInsertForm {
#[builder(!default)] #[builder(!default)]
pub name: String, pub name: String,
pub sidebar: Option<String>, pub sidebar: Option<String>,
pub updated: Option<chrono::NaiveDateTime>, pub updated: Option<NaiveDateTime>,
pub icon: Option<DbUrl>, pub icon: Option<DbUrl>,
pub banner: Option<DbUrl>, pub banner: Option<DbUrl>,
pub description: Option<String>, pub description: Option<String>,
pub actor_id: Option<DbUrl>, pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>, pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>, pub inbox_url: Option<DbUrl>,
pub private_key: Option<String>, pub private_key: Option<String>,
pub public_key: Option<String>, pub public_key: Option<String>,
@ -65,13 +66,13 @@ pub struct SiteInsertForm {
pub struct SiteUpdateForm { pub struct SiteUpdateForm {
pub name: Option<String>, pub name: Option<String>,
pub sidebar: Option<Option<String>>, pub sidebar: Option<Option<String>>,
pub updated: Option<Option<chrono::NaiveDateTime>>, pub updated: Option<Option<NaiveDateTime>>,
// when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column. // when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column.
pub icon: Option<Option<DbUrl>>, pub icon: Option<Option<DbUrl>>,
pub banner: Option<Option<DbUrl>>, pub banner: Option<Option<DbUrl>>,
pub description: Option<Option<String>>, pub description: Option<Option<String>>,
pub actor_id: Option<DbUrl>, pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>, pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>, pub inbox_url: Option<DbUrl>,
pub private_key: Option<Option<String>>, pub private_key: Option<Option<String>>,
pub public_key: Option<String>, pub public_key: Option<String>,

View file

@ -18,10 +18,13 @@ use lemmy_db_schema::{
utils::{naive_now, DELETED_REPLACEMENT_TEXT}, utils::{naive_now, DELETED_REPLACEMENT_TEXT},
}; };
use lemmy_routes::nodeinfo::NodeInfo; use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT}; use lemmy_utils::{
error::{LemmyError, LemmyResult},
REQWEST_TIMEOUT,
};
use reqwest::blocking::Client; use reqwest::blocking::Client;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use tracing::{error, info}; use tracing::{error, info, warn};
/// Schedules various cleanup tasks for lemmy in a background thread /// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup( pub fn setup(
@ -79,7 +82,9 @@ pub fn setup(
// Update the Instance Software // Update the Instance Software
scheduler.every(CTimeUnits::days(1)).run(move || { scheduler.every(CTimeUnits::days(1)).run(move || {
let mut conn = PgConnection::establish(&db_url).expect("could not establish connection"); let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
update_instance_software(&mut conn, &user_agent); update_instance_software(&mut conn, &user_agent)
.map_err(|e| warn!("Failed to update instance software: {e}"))
.ok();
}); });
// Manually run the scheduler in an event loop // Manually run the scheduler in an event loop
@ -323,62 +328,65 @@ fn update_banned_when_expired(conn: &mut PgConnection) {
} }
/// Updates the instance software and version /// Updates the instance software and version
fn update_instance_software(conn: &mut PgConnection, user_agent: &str) { ///
/// TODO: this should be async
/// TODO: if instance has been dead for a long time, it should be checked less frequently
fn update_instance_software(conn: &mut PgConnection, user_agent: &str) -> LemmyResult<()> {
info!("Updating instances software and versions..."); info!("Updating instances software and versions...");
let client = match Client::builder() let client = Client::builder()
.user_agent(user_agent) .user_agent(user_agent)
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.build() .build()?;
{
Ok(client) => client,
Err(e) => {
error!("Failed to build reqwest client: {}", e);
return;
}
};
let instances = match instance::table.get_results::<Instance>(conn) { let instances = instance::table.get_results::<Instance>(conn)?;
Ok(instances) => instances,
Err(e) => {
error!("Failed to get instances: {}", e);
return;
}
};
for instance in instances { for instance in instances {
let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain); let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
// Skip it if it can't connect // The `updated` column is used to check if instances are alive. If it is more than three days
let res = client // in the past, no outgoing activities will be sent to that instance. However not every
.get(&node_info_url) // Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
.send() // why we always need to mark instances as updated if they are alive.
.ok() let default_form = InstanceForm::builder()
.and_then(|t| t.json::<NodeInfo>().ok()); .domain(instance.domain.clone())
if let Some(node_info) = res {
let software = node_info.software.as_ref();
let form = InstanceForm::builder()
.domain(instance.domain)
.software(software.and_then(|s| s.name.clone()))
.version(software.and_then(|s| s.version.clone()))
.updated(Some(naive_now())) .updated(Some(naive_now()))
.build(); .build();
let form = match client.get(&node_info_url).send() {
match diesel::update(instance::table.find(instance.id)) Ok(res) if res.status().is_client_error() => {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some(default_form)
}
Ok(res) => match res.json::<NodeInfo>() {
Ok(node_info) => {
// Instance sent valid nodeinfo, write it to db
Some(
InstanceForm::builder()
.domain(instance.domain)
.updated(Some(naive_now()))
.software(node_info.software.and_then(|s| s.name))
.version(node_info.version.clone())
.build(),
)
}
Err(_) => {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some(default_form)
}
},
Err(_) => {
// dead instance, do nothing
None
}
};
if let Some(form) = form {
diesel::update(instance::table.find(instance.id))
.set(form) .set(form)
.execute(conn) .execute(conn)?;
{
Ok(_) => {
info!("Done.");
}
Err(e) => {
error!("Failed to update site instance software: {}", e);
return;
}
}
} }
} }
info!("Finished updating instances software and versions...");
Ok(())
} }
#[cfg(test)] #[cfg(test)]