diff --git a/docker/federation/start-local-instances.bash b/docker/federation/start-local-instances.bash index ec2712f14..e963792ad 100755 --- a/docker/federation/start-local-instances.bash +++ b/docker/federation/start-local-instances.bash @@ -8,14 +8,4 @@ for Item in alpha beta gamma delta epsilon ; do sudo chown -R 991:991 volumes/pictrs_$Item done -sudo docker-compose up -d - -echo "Waiting for Lemmy to start..." -while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8541/api/v1/site')" != "200" ]]; do sleep 1; done -while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8551/api/v1/site')" != "200" ]]; do sleep 1; done -while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8561/api/v1/site')" != "200" ]]; do sleep 1; done -while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8571/api/v1/site')" != "200" ]]; do sleep 1; done -while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8581/api/v1/site')" != "200" ]]; do sleep 1; done -echo "All instances started." - -sudo docker-compose logs -f +sudo docker-compose up diff --git a/lemmy_api/src/site.rs b/lemmy_api/src/site.rs index 2f5f4a3a9..ff032562f 100644 --- a/lemmy_api/src/site.rs +++ b/lemmy_api/src/site.rs @@ -8,7 +8,7 @@ use crate::{ }; use actix_web::web::Data; use anyhow::Context; -use lemmy_apub::fetcher::search_by_apub_id; +use lemmy_apub::fetcher::search::search_by_apub_id; use lemmy_db_queries::{ diesel_option_overwrite, source::{category::Category_, site::Site_}, diff --git a/lemmy_apub/src/activities/receive/mod.rs b/lemmy_apub/src/activities/receive/mod.rs index f52bbea1a..a03e1ef52 100644 --- a/lemmy_apub/src/activities/receive/mod.rs +++ b/lemmy_apub/src/activities/receive/mod.rs @@ -1,4 +1,4 @@ -use crate::fetcher::get_or_fetch_and_upsert_user; +use crate::fetcher::user::get_or_fetch_and_upsert_user; use activitystreams::{ activity::{ActorAndObjectRef, ActorAndObjectRefExt}, base::{AsBase, BaseExt}, diff --git a/lemmy_apub/src/activities/receive/private_message.rs b/lemmy_apub/src/activities/receive/private_message.rs index bd21f4c7f..160b20ece 100644 --- a/lemmy_apub/src/activities/receive/private_message.rs +++ b/lemmy_apub/src/activities/receive/private_message.rs @@ -1,7 +1,7 @@ use crate::{ activities::receive::verify_activity_domains_valid, check_is_apub_id_valid, - fetcher::get_or_fetch_and_upsert_user, + fetcher::user::get_or_fetch_and_upsert_user, inbox::get_activity_to_and_cc, objects::FromApub, NoteExt, diff --git a/lemmy_apub/src/activities/send/comment.rs b/lemmy_apub/src/activities/send/comment.rs index 3f7a59de3..36917ecb2 100644 --- a/lemmy_apub/src/activities/send/comment.rs +++ b/lemmy_apub/src/activities/send/comment.rs @@ -2,7 +2,7 @@ use crate::{ activities::send::generate_activity_id, activity_queue::{send_comment_mentions, send_to_community}, extensions::context::lemmy_context, - fetcher::get_or_fetch_and_upsert_user, + fetcher::user::get_or_fetch_and_upsert_user, objects::ToApub, ActorType, ApubLikeableType, diff --git a/lemmy_apub/src/activities/send/community.rs b/lemmy_apub/src/activities/send/community.rs index e148b4e94..c2f22fa2d 100644 --- a/lemmy_apub/src/activities/send/community.rs +++ b/lemmy_apub/src/activities/send/community.rs @@ -3,7 +3,7 @@ use crate::{ activity_queue::{send_activity_single_dest, send_to_community_followers}, check_is_apub_id_valid, extensions::context::lemmy_context, - fetcher::get_or_fetch_and_upsert_user, + fetcher::user::get_or_fetch_and_upsert_user, ActorType, }; use activitystreams::{ diff --git a/lemmy_apub/src/fetcher.rs b/lemmy_apub/src/fetcher.rs deleted file mode 100644 index 4e1fa98a6..000000000 --- a/lemmy_apub/src/fetcher.rs +++ /dev/null @@ -1,475 +0,0 @@ -use crate::{ - check_is_apub_id_valid, - objects::FromApub, - ActorType, - GroupExt, - NoteExt, - PageExt, - PersonExt, - APUB_JSON_CONTENT_TYPE, -}; -use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*}; -use anyhow::{anyhow, Context}; -use chrono::NaiveDateTime; -use diesel::result::Error::NotFound; -use lemmy_db_queries::{source::user::User, ApubObject, Crud, Joinable, SearchType}; -use lemmy_db_schema::{ - naive_now, - source::{ - comment::Comment, - community::{Community, CommunityModerator, CommunityModeratorForm}, - post::Post, - user::User_, - }, -}; -use lemmy_db_views::{comment_view::CommentView, post_view::PostView}; -use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe}; -use lemmy_structs::{blocking, site::SearchResponse}; -use lemmy_utils::{ - location_info, - request::{retry, RecvError}, - settings::Settings, - LemmyError, -}; -use lemmy_websocket::LemmyContext; -use log::debug; -use reqwest::Client; -use serde::Deserialize; -use std::{fmt::Debug, time::Duration}; -use url::Url; - -static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; -static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; - -/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object -/// fetch through the search). -/// -/// Tests are passing with a value of 5, so 10 should be safe for production. -static MAX_REQUEST_NUMBER: i32 = 10; - -/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, -/// timeouts etc. -async fn fetch_remote_object( - client: &Client, - url: &Url, - recursion_counter: &mut i32, -) -> Result -where - Response: for<'de> Deserialize<'de>, -{ - *recursion_counter += 1; - if *recursion_counter > MAX_REQUEST_NUMBER { - return Err(anyhow!("Maximum recursion depth reached").into()); - } - check_is_apub_id_valid(&url)?; - - let timeout = Duration::from_secs(60); - - let json = retry(|| { - client - .get(url.as_str()) - .header("Accept", APUB_JSON_CONTENT_TYPE) - .timeout(timeout) - .send() - }) - .await? - .json() - .await - .map_err(|e| { - debug!("Receive error, {}", e); - RecvError(e.to_string()) - })?; - - Ok(json) -} - -/// The types of ActivityPub objects that can be fetched directly by searching for their ID. -#[serde(untagged)] -#[derive(serde::Deserialize, Debug)] -enum SearchAcceptedObjects { - Person(Box), - Group(Box), - Page(Box), - Comment(Box), -} - -/// Attempt to parse the query as URL, and fetch an ActivityPub object from it. -/// -/// Some working examples for use with the `docker/federation/` setup: -/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541 -/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551 -/// http://lemmy_gamma:8561/post/3 -/// http://lemmy_delta:8571/comment/2 -pub async fn search_by_apub_id( - query: &str, - context: &LemmyContext, -) -> Result { - // Parse the shorthand query url - let query_url = if query.contains('@') { - debug!("Search for {}", query); - let split = query.split('@').collect::>(); - - // User type will look like ['', username, instance] - // Community will look like [!community, instance] - let (name, instance) = if split.len() == 3 { - (format!("/u/{}", split[1]), split[2]) - } else if split.len() == 2 { - if split[0].contains('!') { - let split2 = split[0].split('!').collect::>(); - (format!("/c/{}", split2[1]), split[1]) - } else { - return Err(anyhow!("Invalid search query: {}", query).into()); - } - } else { - return Err(anyhow!("Invalid search query: {}", query).into()); - }; - - let url = format!( - "{}://{}{}", - Settings::get().get_protocol_string(), - instance, - name - ); - Url::parse(&url)? - } else { - Url::parse(&query)? - }; - - let mut response = SearchResponse { - type_: SearchType::All.to_string(), - comments: vec![], - posts: vec![], - communities: vec![], - users: vec![], - }; - - let domain = query_url.domain().context("url has no domain")?; - let recursion_counter = &mut 0; - let response = match fetch_remote_object::( - context.client(), - &query_url, - recursion_counter, - ) - .await? - { - SearchAcceptedObjects::Person(p) => { - let user_uri = p.inner.id(domain)?.context("person has no id")?; - - let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?; - - response.users = vec![ - blocking(context.pool(), move |conn| { - UserViewSafe::read(conn, user.id) - }) - .await??, - ]; - - response - } - SearchAcceptedObjects::Group(g) => { - let community_uri = g.inner.id(domain)?.context("group has no id")?; - - let community = - get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?; - - response.communities = vec![ - blocking(context.pool(), move |conn| { - CommunityView::read(conn, community.id, None) - }) - .await??, - ]; - - response - } - SearchAcceptedObjects::Page(p) => { - let p = Post::from_apub(&p, context, query_url, recursion_counter).await?; - - response.posts = - vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??]; - - response - } - SearchAcceptedObjects::Comment(c) => { - let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?; - - response.comments = vec![ - blocking(context.pool(), move |conn| { - CommentView::read(conn, c.id, None) - }) - .await??, - ]; - - response - } - }; - - Ok(response) -} - -/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around -/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`. -/// -/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. -/// Otherwise it is fetched from the remote instance, stored and returned. -pub(crate) async fn get_or_fetch_and_upsert_actor( - apub_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result, LemmyError> { - let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; - let actor: Box = match community { - Ok(c) => Box::new(c), - Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?), - }; - Ok(actor) -} - -/// Get a user from its apub ID. -/// -/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. -/// Otherwise it is fetched from the remote instance, stored and returned. -pub(crate) async fn get_or_fetch_and_upsert_user( - apub_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let apub_id_owned = apub_id.to_owned(); - let user = blocking(context.pool(), move |conn| { - User_::read_from_apub_id(conn, apub_id_owned.as_ref()) - }) - .await?; - - match user { - // If its older than a day, re-fetch it - Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => { - debug!("Fetching and updating from remote user: {}", apub_id); - let person = - fetch_remote_object::(context.client(), apub_id, recursion_counter).await; - // If fetching failed, return the existing data. - if person.is_err() { - return Ok(u); - } - - let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?; - - let user_id = user.id; - blocking(context.pool(), move |conn| { - User_::mark_as_updated(conn, user_id) - }) - .await??; - - Ok(user) - } - Ok(u) => Ok(u), - Err(NotFound {}) => { - debug!("Fetching and creating remote user: {}", apub_id); - let person = - fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; - - let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?; - - Ok(user) - } - Err(e) => Err(e.into()), - } -} - -/// Determines when a remote actor should be refetched from its instance. In release builds, this is -/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds -/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`. -/// -/// TODO it won't pick up new avatars, summaries etc until a day after. -/// Actors need an "update" activity pushed to other servers to fix this. -fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { - let update_interval = if cfg!(debug_assertions) { - // avoid infinite loop when fetching community outbox - chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) - } else { - chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS) - }; - last_refreshed.lt(&(naive_now() - update_interval)) -} - -/// Get a community from its apub ID. -/// -/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. -/// Otherwise it is fetched from the remote instance, stored and returned. -pub(crate) async fn get_or_fetch_and_upsert_community( - apub_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let apub_id_owned = apub_id.to_owned(); - let community = blocking(context.pool(), move |conn| { - Community::read_from_apub_id(conn, apub_id_owned.as_str()) - }) - .await?; - - match community { - Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { - debug!("Fetching and updating from remote community: {}", apub_id); - fetch_remote_community(apub_id, context, Some(c), recursion_counter).await - } - Ok(c) => Ok(c), - Err(NotFound {}) => { - debug!("Fetching and creating remote community: {}", apub_id); - fetch_remote_community(apub_id, context, None, recursion_counter).await - } - Err(e) => Err(e.into()), - } -} - -/// Request a community by apub ID from a remote instance, including moderators. If `old_community`, -/// is set, this is an update for a community which is already known locally. If not, we don't know -/// the community yet and also pull the outbox, to get some initial posts. -async fn fetch_remote_community( - apub_id: &Url, - context: &LemmyContext, - old_community: Option, - recursion_counter: &mut i32, -) -> Result { - let group = fetch_remote_object::(context.client(), apub_id, recursion_counter).await; - // If fetching failed, return the existing data. - if let Some(ref c) = old_community { - if group.is_err() { - return Ok(c.to_owned()); - } - } - - let group = group?; - let community = - Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?; - - // Also add the community moderators too - let attributed_to = group.inner.attributed_to().context(location_info!())?; - let creator_and_moderator_uris: Vec<&Url> = attributed_to - .as_many() - .context(location_info!())? - .iter() - .map(|a| a.as_xsd_any_uri().context("")) - .collect::, anyhow::Error>>()?; - - let mut creator_and_moderators = Vec::new(); - - for uri in creator_and_moderator_uris { - let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?; - - creator_and_moderators.push(c_or_m); - } - - // TODO: need to make this work to update mods of existing communities - if old_community.is_none() { - let community_id = community.id; - blocking(context.pool(), move |conn| { - for mod_ in creator_and_moderators { - let community_moderator_form = CommunityModeratorForm { - community_id, - user_id: mod_.id, - }; - - CommunityModerator::join(conn, &community_moderator_form)?; - } - Ok(()) as Result<(), LemmyError> - }) - .await??; - } - - // fetch outbox (maybe make this conditional) - let outbox = fetch_remote_object::( - context.client(), - &community.get_outbox_url()?, - recursion_counter, - ) - .await?; - let outbox_items = outbox.items().context(location_info!())?.clone(); - let mut outbox_items = outbox_items.many().context(location_info!())?; - if outbox_items.len() > 20 { - outbox_items = outbox_items[0..20].to_vec(); - } - for o in outbox_items { - let page = PageExt::from_any_base(o)?.context(location_info!())?; - let page_id = page.id_unchecked().context(location_info!())?; - - // The post creator may be from a blocked instance, if it errors, then skip it - if check_is_apub_id_valid(page_id).is_err() { - continue; - } - Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?; - // TODO: we need to send a websocket update here - } - - Ok(community) -} - -/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is -/// pulled from its apub ID, inserted and returned. -/// -/// The parent community is also pulled if necessary. Comments are not pulled. -pub(crate) async fn get_or_fetch_and_insert_post( - post_ap_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let post_ap_id_owned = post_ap_id.to_owned(); - let post = blocking(context.pool(), move |conn| { - Post::read_from_apub_id(conn, post_ap_id_owned.as_str()) - }) - .await?; - - match post { - Ok(p) => Ok(p), - Err(NotFound {}) => { - debug!("Fetching and creating remote post: {}", post_ap_id); - let page = - fetch_remote_object::(context.client(), post_ap_id, recursion_counter).await?; - let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?; - - Ok(post) - } - Err(e) => Err(e.into()), - } -} - -/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is -/// pulled from its apub ID, inserted and returned. -/// -/// The parent community, post and comment are also pulled if necessary. -pub(crate) async fn get_or_fetch_and_insert_comment( - comment_ap_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let comment_ap_id_owned = comment_ap_id.to_owned(); - let comment = blocking(context.pool(), move |conn| { - Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str()) - }) - .await?; - - match comment { - Ok(p) => Ok(p), - Err(NotFound {}) => { - debug!( - "Fetching and creating remote comment and its parents: {}", - comment_ap_id - ); - let comment = - fetch_remote_object::(context.client(), comment_ap_id, recursion_counter).await?; - let comment = Comment::from_apub( - &comment, - context, - comment_ap_id.to_owned(), - recursion_counter, - ) - .await?; - - let post_id = comment.post_id; - let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??; - if post.locked { - return Err(anyhow!("Post is locked").into()); - } - - Ok(comment) - } - Err(e) => Err(e.into()), - } -} diff --git a/lemmy_apub/src/fetcher/community.rs b/lemmy_apub/src/fetcher/community.rs new file mode 100644 index 000000000..2a2564122 --- /dev/null +++ b/lemmy_apub/src/fetcher/community.rs @@ -0,0 +1,147 @@ +use crate::{ + check_is_apub_id_valid, + fetcher::{ + fetch::fetch_remote_object, + get_or_fetch_and_upsert_user, + is_deleted, + should_refetch_actor, + }, + objects::FromApub, + ActorType, + GroupExt, + PageExt, +}; +use activitystreams::{ + base::{BaseExt, ExtendsExt}, + collection::{CollectionExt, OrderedCollection}, + object::ObjectExt, +}; +use anyhow::Context; +use diesel::result::Error::NotFound; +use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable}; +use lemmy_db_schema::source::{ + community::{Community, CommunityModerator, CommunityModeratorForm}, + post::Post, +}; +use lemmy_structs::blocking; +use lemmy_utils::{location_info, LemmyError}; +use lemmy_websocket::LemmyContext; +use log::debug; +use url::Url; + +/// Get a community from its apub ID. +/// +/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. +/// Otherwise it is fetched from the remote instance, stored and returned. +pub(crate) async fn get_or_fetch_and_upsert_community( + apub_id: &Url, + context: &LemmyContext, + recursion_counter: &mut i32, +) -> Result { + let apub_id_owned = apub_id.to_owned(); + let community = blocking(context.pool(), move |conn| { + Community::read_from_apub_id(conn, apub_id_owned.as_str()) + }) + .await?; + + match community { + Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { + debug!("Fetching and updating from remote community: {}", apub_id); + fetch_remote_community(apub_id, context, Some(c), recursion_counter).await + } + Ok(c) => Ok(c), + Err(NotFound {}) => { + debug!("Fetching and creating remote community: {}", apub_id); + fetch_remote_community(apub_id, context, None, recursion_counter).await + } + Err(e) => Err(e.into()), + } +} + +/// Request a community by apub ID from a remote instance, including moderators. If `old_community`, +/// is set, this is an update for a community which is already known locally. If not, we don't know +/// the community yet and also pull the outbox, to get some initial posts. +async fn fetch_remote_community( + apub_id: &Url, + context: &LemmyContext, + old_community: Option, + recursion_counter: &mut i32, +) -> Result { + let group = fetch_remote_object::(context.client(), apub_id, recursion_counter).await; + + if let Some(c) = old_community.to_owned() { + if is_deleted(&group) { + blocking(context.pool(), move |conn| { + Community::update_deleted(conn, c.id, true) + }) + .await??; + } else if group.is_err() { + // If fetching failed, return the existing data. + return Ok(c); + } + } + + let group = group?; + let community = + Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?; + + // Also add the community moderators too + let attributed_to = group.inner.attributed_to().context(location_info!())?; + let creator_and_moderator_uris: Vec<&Url> = attributed_to + .as_many() + .context(location_info!())? + .iter() + .map(|a| a.as_xsd_any_uri().context("")) + .collect::, anyhow::Error>>()?; + + let mut creator_and_moderators = Vec::new(); + + for uri in creator_and_moderator_uris { + let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?; + + creator_and_moderators.push(c_or_m); + } + + // TODO: need to make this work to update mods of existing communities + if old_community.is_none() { + let community_id = community.id; + blocking(context.pool(), move |conn| { + for mod_ in creator_and_moderators { + let community_moderator_form = CommunityModeratorForm { + community_id, + user_id: mod_.id, + }; + + CommunityModerator::join(conn, &community_moderator_form)?; + } + Ok(()) as Result<(), LemmyError> + }) + .await??; + } + + // fetch outbox (maybe make this conditional) + let outbox = fetch_remote_object::( + context.client(), + &community.get_outbox_url()?, + recursion_counter, + ) + .await?; + let outbox_items = outbox.items().context(location_info!())?.clone(); + let mut outbox_items = outbox_items.many().context(location_info!())?; + if outbox_items.len() > 20 { + outbox_items = outbox_items[0..20].to_vec(); + } + for o in outbox_items { + let page = PageExt::from_any_base(o)?.context(location_info!())?; + let page_id = page.id_unchecked().context(location_info!())?; + + // The post creator may be from a blocked instance, if it errors, then skip it + if check_is_apub_id_valid(page_id).is_err() { + continue; + } + Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?; + // TODO: we need to send a websocket update here + } + + Ok(community) +} diff --git a/lemmy_apub/src/fetcher/fetch.rs b/lemmy_apub/src/fetcher/fetch.rs new file mode 100644 index 000000000..e7e29075c --- /dev/null +++ b/lemmy_apub/src/fetcher/fetch.rs @@ -0,0 +1,82 @@ +use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE}; +use anyhow::anyhow; +use lemmy_utils::{request::retry, LemmyError}; +use reqwest::{Client, StatusCode}; +use serde::Deserialize; +use std::time::Duration; +use thiserror::Error; +use url::Url; + +/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object +/// fetch through the search). +/// +/// Tests are passing with a value of 5, so 10 should be safe for production. +static MAX_REQUEST_NUMBER: i32 = 10; + +#[derive(Debug, Error)] +pub(in crate::fetcher) struct FetchError { + pub inner: anyhow::Error, + pub status_code: Option, +} + +impl From for FetchError { + fn from(t: LemmyError) -> Self { + FetchError { + inner: t.inner, + status_code: None, + } + } +} + +impl From for FetchError { + fn from(t: reqwest::Error) -> Self { + let status = t.status(); + FetchError { + inner: t.into(), + status_code: status, + } + } +} + +impl std::fmt::Display for FetchError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + std::fmt::Display::fmt(&self, f) + } +} + +/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, +/// timeouts etc. +pub(in crate::fetcher) async fn fetch_remote_object( + client: &Client, + url: &Url, + recursion_counter: &mut i32, +) -> Result +where + Response: for<'de> Deserialize<'de> + std::fmt::Debug, +{ + *recursion_counter += 1; + if *recursion_counter > MAX_REQUEST_NUMBER { + return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into()); + } + check_is_apub_id_valid(&url)?; + + let timeout = Duration::from_secs(60); + + let res = retry(|| { + client + .get(url.as_str()) + .header("Accept", APUB_JSON_CONTENT_TYPE) + .timeout(timeout) + .send() + }) + .await?; + + if res.status() == StatusCode::GONE { + return Err(FetchError { + inner: anyhow!("Remote object {} was deleted", url), + status_code: Some(res.status()), + }); + } + + Ok(res.json().await?) +} diff --git a/lemmy_apub/src/fetcher/mod.rs b/lemmy_apub/src/fetcher/mod.rs new file mode 100644 index 000000000..593b163fa --- /dev/null +++ b/lemmy_apub/src/fetcher/mod.rs @@ -0,0 +1,72 @@ +pub(crate) mod community; +mod fetch; +pub(crate) mod objects; +pub mod search; +pub(crate) mod user; + +use crate::{ + fetcher::{ + community::get_or_fetch_and_upsert_community, + fetch::FetchError, + user::get_or_fetch_and_upsert_user, + }, + ActorType, +}; +use chrono::NaiveDateTime; +use http::StatusCode; +use lemmy_db_schema::naive_now; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; +use serde::Deserialize; +use url::Url; + +static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; +static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; + +fn is_deleted(fetch_response: &Result) -> bool +where + Response: for<'de> Deserialize<'de>, +{ + if let Err(e) = fetch_response { + if let Some(status) = e.status_code { + if status == StatusCode::GONE { + return true; + } + } + } + false +} + +/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around +/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`. +/// +/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. +/// Otherwise it is fetched from the remote instance, stored and returned. +pub(crate) async fn get_or_fetch_and_upsert_actor( + apub_id: &Url, + context: &LemmyContext, + recursion_counter: &mut i32, +) -> Result, LemmyError> { + let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; + let actor: Box = match community { + Ok(c) => Box::new(c), + Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?), + }; + Ok(actor) +} + +/// Determines when a remote actor should be refetched from its instance. In release builds, this is +/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds +/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`. +/// +/// TODO it won't pick up new avatars, summaries etc until a day after. +/// Actors need an "update" activity pushed to other servers to fix this. +fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { + let update_interval = if cfg!(debug_assertions) { + // avoid infinite loop when fetching community outbox + chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) + } else { + chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS) + }; + last_refreshed.lt(&(naive_now() - update_interval)) +} diff --git a/lemmy_apub/src/fetcher/objects.rs b/lemmy_apub/src/fetcher/objects.rs new file mode 100644 index 000000000..269b27ef8 --- /dev/null +++ b/lemmy_apub/src/fetcher/objects.rs @@ -0,0 +1,83 @@ +use crate::{fetcher::fetch::fetch_remote_object, objects::FromApub, NoteExt, PageExt}; +use anyhow::anyhow; +use diesel::result::Error::NotFound; +use lemmy_db_queries::{ApubObject, Crud}; +use lemmy_db_schema::source::{comment::Comment, post::Post}; +use lemmy_structs::blocking; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; +use log::debug; +use url::Url; + +/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is +/// pulled from its apub ID, inserted and returned. +/// +/// The parent community is also pulled if necessary. Comments are not pulled. +pub(crate) async fn get_or_fetch_and_insert_post( + post_ap_id: &Url, + context: &LemmyContext, + recursion_counter: &mut i32, +) -> Result { + let post_ap_id_owned = post_ap_id.to_owned(); + let post = blocking(context.pool(), move |conn| { + Post::read_from_apub_id(conn, post_ap_id_owned.as_str()) + }) + .await?; + + match post { + Ok(p) => Ok(p), + Err(NotFound {}) => { + debug!("Fetching and creating remote post: {}", post_ap_id); + let page = + fetch_remote_object::(context.client(), post_ap_id, recursion_counter).await?; + let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?; + + Ok(post) + } + Err(e) => Err(e.into()), + } +} + +/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is +/// pulled from its apub ID, inserted and returned. +/// +/// The parent community, post and comment are also pulled if necessary. +pub(crate) async fn get_or_fetch_and_insert_comment( + comment_ap_id: &Url, + context: &LemmyContext, + recursion_counter: &mut i32, +) -> Result { + let comment_ap_id_owned = comment_ap_id.to_owned(); + let comment = blocking(context.pool(), move |conn| { + Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str()) + }) + .await?; + + match comment { + Ok(p) => Ok(p), + Err(NotFound {}) => { + debug!( + "Fetching and creating remote comment and its parents: {}", + comment_ap_id + ); + let comment = + fetch_remote_object::(context.client(), comment_ap_id, recursion_counter).await?; + let comment = Comment::from_apub( + &comment, + context, + comment_ap_id.to_owned(), + recursion_counter, + ) + .await?; + + let post_id = comment.post_id; + let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??; + if post.locked { + return Err(anyhow!("Post is locked").into()); + } + + Ok(comment) + } + Err(e) => Err(e.into()), + } +} diff --git a/lemmy_apub/src/fetcher/search.rs b/lemmy_apub/src/fetcher/search.rs new file mode 100644 index 000000000..13187b0a5 --- /dev/null +++ b/lemmy_apub/src/fetcher/search.rs @@ -0,0 +1,204 @@ +use crate::{ + fetcher::{ + fetch::fetch_remote_object, + get_or_fetch_and_upsert_community, + get_or_fetch_and_upsert_user, + is_deleted, + }, + find_object_by_id, + objects::FromApub, + GroupExt, + NoteExt, + Object, + PageExt, + PersonExt, +}; +use activitystreams::base::BaseExt; +use anyhow::{anyhow, Context}; +use lemmy_db_queries::{ + source::{ + comment::Comment_, + community::Community_, + post::Post_, + private_message::PrivateMessage_, + user::User, + }, + SearchType, +}; +use lemmy_db_schema::source::{ + comment::Comment, + community::Community, + post::Post, + private_message::PrivateMessage, + user::User_, +}; +use lemmy_db_views::{comment_view::CommentView, post_view::PostView}; +use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe}; +use lemmy_structs::{blocking, site::SearchResponse}; +use lemmy_utils::{settings::Settings, LemmyError}; +use lemmy_websocket::LemmyContext; +use log::debug; +use url::Url; + +/// The types of ActivityPub objects that can be fetched directly by searching for their ID. +#[serde(untagged)] +#[derive(serde::Deserialize, Debug)] +enum SearchAcceptedObjects { + Person(Box), + Group(Box), + Page(Box), + Comment(Box), +} + +/// Attempt to parse the query as URL, and fetch an ActivityPub object from it. +/// +/// Some working examples for use with the `docker/federation/` setup: +/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541 +/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551 +/// http://lemmy_gamma:8561/post/3 +/// http://lemmy_delta:8571/comment/2 +pub async fn search_by_apub_id( + query: &str, + context: &LemmyContext, +) -> Result { + // Parse the shorthand query url + let query_url = if query.contains('@') { + debug!("Search for {}", query); + let split = query.split('@').collect::>(); + + // User type will look like ['', username, instance] + // Community will look like [!community, instance] + let (name, instance) = if split.len() == 3 { + (format!("/u/{}", split[1]), split[2]) + } else if split.len() == 2 { + if split[0].contains('!') { + let split2 = split[0].split('!').collect::>(); + (format!("/c/{}", split2[1]), split[1]) + } else { + return Err(anyhow!("Invalid search query: {}", query).into()); + } + } else { + return Err(anyhow!("Invalid search query: {}", query).into()); + }; + + let url = format!( + "{}://{}{}", + Settings::get().get_protocol_string(), + instance, + name + ); + Url::parse(&url)? + } else { + Url::parse(&query)? + }; + + let recursion_counter = &mut 0; + let fetch_response = + fetch_remote_object::(context.client(), &query_url, recursion_counter) + .await; + if is_deleted(&fetch_response) { + delete_object_locally(&query_url, context).await?; + } + + build_response(fetch_response?, query_url, recursion_counter, context).await +} + +async fn build_response( + fetch_response: SearchAcceptedObjects, + query_url: Url, + recursion_counter: &mut i32, + context: &LemmyContext, +) -> Result { + let domain = query_url.domain().context("url has no domain")?; + let mut response = SearchResponse { + type_: SearchType::All.to_string(), + comments: vec![], + posts: vec![], + communities: vec![], + users: vec![], + }; + + match fetch_response { + SearchAcceptedObjects::Person(p) => { + let user_uri = p.inner.id(domain)?.context("person has no id")?; + + let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?; + + response.users = vec![ + blocking(context.pool(), move |conn| { + UserViewSafe::read(conn, user.id) + }) + .await??, + ]; + } + SearchAcceptedObjects::Group(g) => { + let community_uri = g.inner.id(domain)?.context("group has no id")?; + + let community = + get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?; + + response.communities = vec![ + blocking(context.pool(), move |conn| { + CommunityView::read(conn, community.id, None) + }) + .await??, + ]; + } + SearchAcceptedObjects::Page(p) => { + let p = Post::from_apub(&p, context, query_url, recursion_counter).await?; + + response.posts = + vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??]; + } + SearchAcceptedObjects::Comment(c) => { + let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?; + + response.comments = vec![ + blocking(context.pool(), move |conn| { + CommentView::read(conn, c.id, None) + }) + .await??, + ]; + } + }; + + Ok(response) +} + +async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> { + let res = find_object_by_id(context, query_url.to_owned()).await?; + match res { + Object::Comment(c) => { + blocking(context.pool(), move |conn| { + Comment::update_deleted(conn, c.id, true) + }) + .await??; + } + Object::Post(p) => { + blocking(context.pool(), move |conn| { + Post::update_deleted(conn, p.id, true) + }) + .await??; + } + Object::User(u) => { + // TODO: implement update_deleted() for user, move it to ApubObject trait + blocking(context.pool(), move |conn| { + User_::delete_account(conn, u.id) + }) + .await??; + } + Object::Community(c) => { + blocking(context.pool(), move |conn| { + Community::update_deleted(conn, c.id, true) + }) + .await??; + } + Object::PrivateMessage(pm) => { + blocking(context.pool(), move |conn| { + PrivateMessage::update_deleted(conn, pm.id, true) + }) + .await??; + } + } + Err(anyhow!("Object was deleted").into()) +} diff --git a/lemmy_apub/src/fetcher/user.rs b/lemmy_apub/src/fetcher/user.rs new file mode 100644 index 000000000..8442519f7 --- /dev/null +++ b/lemmy_apub/src/fetcher/user.rs @@ -0,0 +1,71 @@ +use crate::{ + fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor}, + objects::FromApub, + PersonExt, +}; +use anyhow::anyhow; +use diesel::result::Error::NotFound; +use lemmy_db_queries::{source::user::User, ApubObject}; +use lemmy_db_schema::source::user::User_; +use lemmy_structs::blocking; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; +use log::debug; +use url::Url; + +/// Get a user from its apub ID. +/// +/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. +/// Otherwise it is fetched from the remote instance, stored and returned. +pub(crate) async fn get_or_fetch_and_upsert_user( + apub_id: &Url, + context: &LemmyContext, + recursion_counter: &mut i32, +) -> Result { + let apub_id_owned = apub_id.to_owned(); + let user = blocking(context.pool(), move |conn| { + User_::read_from_apub_id(conn, apub_id_owned.as_ref()) + }) + .await?; + + match user { + // If its older than a day, re-fetch it + Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => { + debug!("Fetching and updating from remote user: {}", apub_id); + let person = + fetch_remote_object::(context.client(), apub_id, recursion_counter).await; + + if is_deleted(&person) { + // TODO: use User_::update_deleted() once implemented + blocking(context.pool(), move |conn| { + User_::delete_account(conn, u.id) + }) + .await??; + return Err(anyhow!("User was deleted by remote instance").into()); + } else if person.is_err() { + return Ok(u); + } + + let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?; + + let user_id = user.id; + blocking(context.pool(), move |conn| { + User_::mark_as_updated(conn, user_id) + }) + .await??; + + Ok(user) + } + Ok(u) => Ok(u), + Err(NotFound {}) => { + debug!("Fetching and creating remote user: {}", apub_id); + let person = + fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; + + let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?; + + Ok(user) + } + Err(e) => Err(e.into()), + } +} diff --git a/lemmy_apub/src/http/user.rs b/lemmy_apub/src/http/user.rs index d0ce9251f..343f69cb8 100644 --- a/lemmy_apub/src/http/user.rs +++ b/lemmy_apub/src/http/user.rs @@ -1,6 +1,6 @@ use crate::{ extensions::context::lemmy_context, - http::create_apub_response, + http::{create_apub_response, create_apub_tombstone_response}, objects::ToApub, ActorType, }; @@ -28,12 +28,19 @@ pub async fn get_apub_user_http( context: web::Data, ) -> Result, LemmyError> { let user_name = info.into_inner().user_name; + // TODO: this needs to be able to read deleted users, so that it can send tombstones let user = blocking(context.pool(), move |conn| { User_::find_by_email_or_username(conn, &user_name) }) .await??; - let u = user.to_apub(context.pool()).await?; - Ok(create_apub_response(&u)) + dbg!(&user.deleted); + if !user.deleted { + let apub = user.to_apub(context.pool()).await?; + + Ok(create_apub_response(&apub)) + } else { + Ok(create_apub_tombstone_response(&user.to_tombstone()?)) + } } pub async fn get_apub_user_outbox( diff --git a/lemmy_apub/src/inbox/receive_for_community.rs b/lemmy_apub/src/inbox/receive_for_community.rs index 934404242..e0248cb6d 100644 --- a/lemmy_apub/src/inbox/receive_for_community.rs +++ b/lemmy_apub/src/inbox/receive_for_community.rs @@ -31,8 +31,10 @@ use crate::{ receive_unhandled_activity, verify_activity_domains_valid, }, - fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post}, + fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post}, + find_post_or_comment_by_id, inbox::is_addressed_to_public, + PostOrComment, }; use activitystreams::{ activity::{Create, Delete, Dislike, Like, Remove, Undo, Update}, @@ -41,8 +43,8 @@ use activitystreams::{ }; use anyhow::Context; use diesel::result::Error::NotFound; -use lemmy_db_queries::{ApubObject, Crud}; -use lemmy_db_schema::source::{comment::Comment, post::Post, site::Site}; +use lemmy_db_queries::Crud; +use lemmy_db_schema::source::site::Site; use lemmy_structs::blocking; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; @@ -317,39 +319,6 @@ pub(in crate::inbox) async fn receive_undo_dislike_for_community( } } -enum PostOrComment { - Comment(Comment), - Post(Post), -} - -/// Tries to find a post or comment in the local database, without any network requests. -/// This is used to handle deletions and removals, because in case we dont have the object, we can -/// simply ignore the activity. -async fn find_post_or_comment_by_id( - context: &LemmyContext, - apub_id: Url, -) -> Result { - let ap_id = apub_id.to_string(); - let post = blocking(context.pool(), move |conn| { - Post::read_from_apub_id(conn, &ap_id) - }) - .await?; - if let Ok(p) = post { - return Ok(PostOrComment::Post(p)); - } - - let ap_id = apub_id.to_string(); - let comment = blocking(context.pool(), move |conn| { - Comment::read_from_apub_id(conn, &ap_id) - }) - .await?; - if let Ok(c) = comment { - return Ok(PostOrComment::Comment(c)); - } - - Err(NotFound.into()) -} - async fn fetch_post_or_comment_by_id( apub_id: &Url, context: &LemmyContext, diff --git a/lemmy_apub/src/inbox/user_inbox.rs b/lemmy_apub/src/inbox/user_inbox.rs index 49c66dc7a..353a296e3 100644 --- a/lemmy_apub/src/inbox/user_inbox.rs +++ b/lemmy_apub/src/inbox/user_inbox.rs @@ -17,7 +17,7 @@ use crate::{ verify_activity_domains_valid, }, check_is_apub_id_valid, - fetcher::get_or_fetch_and_upsert_community, + fetcher::community::get_or_fetch_and_upsert_community, inbox::{ assert_activity_not_local, get_activity_id, diff --git a/lemmy_apub/src/lib.rs b/lemmy_apub/src/lib.rs index 6f0d41c8a..bbf1bbbc7 100644 --- a/lemmy_apub/src/lib.rs +++ b/lemmy_apub/src/lib.rs @@ -22,8 +22,16 @@ use activitystreams::{ }; use activitystreams_ext::{Ext1, Ext2}; use anyhow::{anyhow, Context}; -use lemmy_db_queries::{source::activity::Activity_, DbPool}; -use lemmy_db_schema::source::{activity::Activity, user::User_}; +use diesel::NotFound; +use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool}; +use lemmy_db_schema::source::{ + activity::Activity, + comment::Comment, + community::Community, + post::Post, + private_message::PrivateMessage, + user::User_, +}; use lemmy_structs::blocking; use lemmy_utils::{location_info, settings::Settings, LemmyError}; use lemmy_websocket::LemmyContext; @@ -239,3 +247,85 @@ where .await??; Ok(()) } + +pub(crate) enum PostOrComment { + Comment(Comment), + Post(Post), +} + +/// Tries to find a post or comment in the local database, without any network requests. +/// This is used to handle deletions and removals, because in case we dont have the object, we can +/// simply ignore the activity. +pub(crate) async fn find_post_or_comment_by_id( + context: &LemmyContext, + apub_id: Url, +) -> Result { + let ap_id = apub_id.to_string(); + let post = blocking(context.pool(), move |conn| { + Post::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(p) = post { + return Ok(PostOrComment::Post(p)); + } + + let ap_id = apub_id.to_string(); + let comment = blocking(context.pool(), move |conn| { + Comment::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(c) = comment { + return Ok(PostOrComment::Comment(c)); + } + + Err(NotFound.into()) +} + +pub(crate) enum Object { + Comment(Comment), + Post(Post), + Community(Community), + User(User_), + PrivateMessage(PrivateMessage), +} + +pub(crate) async fn find_object_by_id( + context: &LemmyContext, + apub_id: Url, +) -> Result { + if let Ok(pc) = find_post_or_comment_by_id(context, apub_id.to_owned()).await { + return Ok(match pc { + PostOrComment::Post(p) => Object::Post(p), + PostOrComment::Comment(c) => Object::Comment(c), + }); + } + + let ap_id = apub_id.to_string(); + let user = blocking(context.pool(), move |conn| { + User_::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(u) = user { + return Ok(Object::User(u)); + } + + let ap_id = apub_id.to_string(); + let community = blocking(context.pool(), move |conn| { + Community::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(c) = community { + return Ok(Object::Community(c)); + } + + let ap_id = apub_id.to_string(); + let private_message = blocking(context.pool(), move |conn| { + PrivateMessage::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(pm) = private_message { + return Ok(Object::PrivateMessage(pm)); + } + + Err(NotFound.into()) +} diff --git a/lemmy_apub/src/objects/comment.rs b/lemmy_apub/src/objects/comment.rs index c02055c45..5dba4149b 100644 --- a/lemmy_apub/src/objects/comment.rs +++ b/lemmy_apub/src/objects/comment.rs @@ -1,15 +1,12 @@ use crate::{ extensions::context::lemmy_context, - fetcher::{ - get_or_fetch_and_insert_comment, - get_or_fetch_and_insert_post, - get_or_fetch_and_upsert_user, - }, + fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post}, objects::{ check_object_domain, check_object_for_community_or_site_ban, create_tombstone, get_object_from_apub, + get_or_fetch_and_upsert_user, get_source_markdown_value, set_content_and_source, FromApub, diff --git a/lemmy_apub/src/objects/community.rs b/lemmy_apub/src/objects/community.rs index 39abcd1f3..4d7a235cc 100644 --- a/lemmy_apub/src/objects/community.rs +++ b/lemmy_apub/src/objects/community.rs @@ -1,6 +1,6 @@ use crate::{ extensions::{context::lemmy_context, group_extensions::GroupExtension}, - fetcher::get_or_fetch_and_upsert_user, + fetcher::user::get_or_fetch_and_upsert_user, objects::{ check_object_domain, create_tombstone, diff --git a/lemmy_apub/src/objects/mod.rs b/lemmy_apub/src/objects/mod.rs index 9e13782c9..d9eea762e 100644 --- a/lemmy_apub/src/objects/mod.rs +++ b/lemmy_apub/src/objects/mod.rs @@ -1,6 +1,6 @@ use crate::{ check_is_apub_id_valid, - fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user}, + fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user}, inbox::community_inbox::check_community_or_site_ban, }; use activitystreams::{ diff --git a/lemmy_apub/src/objects/post.rs b/lemmy_apub/src/objects/post.rs index fa1adfc84..6d5ed8e23 100644 --- a/lemmy_apub/src/objects/post.rs +++ b/lemmy_apub/src/objects/post.rs @@ -1,6 +1,6 @@ use crate::{ extensions::{context::lemmy_context, page_extension::PageExtension}, - fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user}, + fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user}, objects::{ check_object_domain, check_object_for_community_or_site_ban, diff --git a/lemmy_apub/src/objects/private_message.rs b/lemmy_apub/src/objects/private_message.rs index db5a06109..1a7b5e327 100644 --- a/lemmy_apub/src/objects/private_message.rs +++ b/lemmy_apub/src/objects/private_message.rs @@ -1,7 +1,7 @@ use crate::{ check_is_apub_id_valid, extensions::context::lemmy_context, - fetcher::get_or_fetch_and_upsert_user, + fetcher::user::get_or_fetch_and_upsert_user, objects::{ check_object_domain, create_tombstone, diff --git a/lemmy_utils/src/request.rs b/lemmy_utils/src/request.rs index 36baa4d42..411d43427 100644 --- a/lemmy_utils/src/request.rs +++ b/lemmy_utils/src/request.rs @@ -15,7 +15,7 @@ struct SendError(pub String); #[error("Error receiving response, {0}")] pub struct RecvError(pub String); -pub async fn retry(f: F) -> Result +pub async fn retry(f: F) -> Result where F: Fn() -> Fut, Fut: Future>, @@ -23,27 +23,27 @@ where retry_custom(|| async { Ok((f)().await) }).await } -async fn retry_custom(f: F) -> Result +async fn retry_custom(f: F) -> Result where F: Fn() -> Fut, - Fut: Future, LemmyError>>, + Fut: Future, reqwest::Error>>, { - let mut response = Err(anyhow!("connect timeout").into()); + let mut response: Option> = None; for _ in 0u8..3 { match (f)().await? { Ok(t) => return Ok(t), Err(e) => { if e.is_timeout() { - response = Err(SendError(e.to_string()).into()); + response = Some(Err(e)); continue; } - return Err(SendError(e.to_string()).into()); + return Err(e); } } } - response + response.unwrap() } #[derive(Deserialize, Debug)]