Compare commits

...

2 commits

24 changed files with 788 additions and 550 deletions

View file

@ -23,7 +23,8 @@ steps:
- name: cargo clippy
image: ekidd/rust-musl-builder:1.47.0
commands:
- cargo clippy --workspace --tests --all-targets --all-features -- -D warnings
- cargo clippy --workspace --tests --all-targets --all-features -- \
-D warnings -D deprecated -D clippy::perf -D clippy::complexity -D clippy::dbg_macro
- name: check documentation build
image: ekidd/rust-musl-builder:1.47.0

View file

@ -8,14 +8,4 @@ for Item in alpha beta gamma delta epsilon ; do
sudo chown -R 991:991 volumes/pictrs_$Item
done
sudo docker-compose up -d
echo "Waiting for Lemmy to start..."
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8541/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8551/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8561/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8571/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8581/api/v1/site')" != "200" ]]; do sleep 1; done
echo "All instances started."
sudo docker-compose logs -f
sudo docker-compose up

View file

@ -8,7 +8,7 @@ use crate::{
};
use actix_web::web::Data;
use anyhow::Context;
use lemmy_apub::fetcher::search_by_apub_id;
use lemmy_apub::fetcher::search::search_by_apub_id;
use lemmy_db_queries::{
diesel_option_overwrite,
source::{category::Category_, site::Site_},

View file

@ -1,4 +1,4 @@
use crate::fetcher::get_or_fetch_and_upsert_user;
use crate::fetcher::user::get_or_fetch_and_upsert_user;
use activitystreams::{
activity::{ActorAndObjectRef, ActorAndObjectRefExt},
base::{AsBase, BaseExt},

View file

@ -1,7 +1,7 @@
use crate::{
activities::receive::verify_activity_domains_valid,
check_is_apub_id_valid,
fetcher::get_or_fetch_and_upsert_user,
fetcher::user::get_or_fetch_and_upsert_user,
inbox::get_activity_to_and_cc,
objects::FromApub,
NoteExt,

View file

@ -2,7 +2,7 @@ use crate::{
activities::send::generate_activity_id,
activity_queue::{send_comment_mentions, send_to_community},
extensions::context::lemmy_context,
fetcher::get_or_fetch_and_upsert_user,
fetcher::user::get_or_fetch_and_upsert_user,
objects::ToApub,
ActorType,
ApubLikeableType,

View file

@ -3,7 +3,7 @@ use crate::{
activity_queue::{send_activity_single_dest, send_to_community_followers},
check_is_apub_id_valid,
extensions::context::lemmy_context,
fetcher::get_or_fetch_and_upsert_user,
fetcher::user::get_or_fetch_and_upsert_user,
ActorType,
};
use activitystreams::{

View file

@ -1,475 +0,0 @@
use crate::{
check_is_apub_id_valid,
objects::FromApub,
ActorType,
GroupExt,
NoteExt,
PageExt,
PersonExt,
APUB_JSON_CONTENT_TYPE,
};
use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
use anyhow::{anyhow, Context};
use chrono::NaiveDateTime;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{source::user::User, ApubObject, Crud, Joinable, SearchType};
use lemmy_db_schema::{
naive_now,
source::{
comment::Comment,
community::{Community, CommunityModerator, CommunityModeratorForm},
post::Post,
user::User_,
},
};
use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
use lemmy_structs::{blocking, site::SearchResponse};
use lemmy_utils::{
location_info,
request::{retry, RecvError},
settings::Settings,
LemmyError,
};
use lemmy_websocket::LemmyContext;
use log::debug;
use reqwest::Client;
use serde::Deserialize;
use std::{fmt::Debug, time::Duration};
use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
/// fetch through the search).
///
/// Tests are passing with a value of 5, so 10 should be safe for production.
static MAX_REQUEST_NUMBER: i32 = 10;
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
async fn fetch_remote_object<Response>(
client: &Client,
url: &Url,
recursion_counter: &mut i32,
) -> Result<Response, LemmyError>
where
Response: for<'de> Deserialize<'de>,
{
*recursion_counter += 1;
if *recursion_counter > MAX_REQUEST_NUMBER {
return Err(anyhow!("Maximum recursion depth reached").into());
}
check_is_apub_id_valid(&url)?;
let timeout = Duration::from_secs(60);
let json = retry(|| {
client
.get(url.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(timeout)
.send()
})
.await?
.json()
.await
.map_err(|e| {
debug!("Receive error, {}", e);
RecvError(e.to_string())
})?;
Ok(json)
}
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
#[serde(untagged)]
#[derive(serde::Deserialize, Debug)]
enum SearchAcceptedObjects {
Person(Box<PersonExt>),
Group(Box<GroupExt>),
Page(Box<PageExt>),
Comment(Box<NoteExt>),
}
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the `docker/federation/` setup:
/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
/// http://lemmy_gamma:8561/post/3
/// http://lemmy_delta:8571/comment/2
pub async fn search_by_apub_id(
query: &str,
context: &LemmyContext,
) -> Result<SearchResponse, LemmyError> {
// Parse the shorthand query url
let query_url = if query.contains('@') {
debug!("Search for {}", query);
let split = query.split('@').collect::<Vec<&str>>();
// User type will look like ['', username, instance]
// Community will look like [!community, instance]
let (name, instance) = if split.len() == 3 {
(format!("/u/{}", split[1]), split[2])
} else if split.len() == 2 {
if split[0].contains('!') {
let split2 = split[0].split('!').collect::<Vec<&str>>();
(format!("/c/{}", split2[1]), split[1])
} else {
return Err(anyhow!("Invalid search query: {}", query).into());
}
} else {
return Err(anyhow!("Invalid search query: {}", query).into());
};
let url = format!(
"{}://{}{}",
Settings::get().get_protocol_string(),
instance,
name
);
Url::parse(&url)?
} else {
Url::parse(&query)?
};
let mut response = SearchResponse {
type_: SearchType::All.to_string(),
comments: vec![],
posts: vec![],
communities: vec![],
users: vec![],
};
let domain = query_url.domain().context("url has no domain")?;
let recursion_counter = &mut 0;
let response = match fetch_remote_object::<SearchAcceptedObjects>(
context.client(),
&query_url,
recursion_counter,
)
.await?
{
SearchAcceptedObjects::Person(p) => {
let user_uri = p.inner.id(domain)?.context("person has no id")?;
let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
response.users = vec![
blocking(context.pool(), move |conn| {
UserViewSafe::read(conn, user.id)
})
.await??,
];
response
}
SearchAcceptedObjects::Group(g) => {
let community_uri = g.inner.id(domain)?.context("group has no id")?;
let community =
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
response.communities = vec![
blocking(context.pool(), move |conn| {
CommunityView::read(conn, community.id, None)
})
.await??,
];
response
}
SearchAcceptedObjects::Page(p) => {
let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
response.posts =
vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
response
}
SearchAcceptedObjects::Comment(c) => {
let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
response.comments = vec![
blocking(context.pool(), move |conn| {
CommentView::read(conn, c.id, None)
})
.await??,
];
response
}
};
Ok(response)
}
/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_actor(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
};
Ok(actor)
}
/// Get a user from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_user(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<User_, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let user = blocking(context.pool(), move |conn| {
User_::read_from_apub_id(conn, apub_id_owned.as_ref())
})
.await?;
match user {
// If its older than a day, re-fetch it
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
debug!("Fetching and updating from remote user: {}", apub_id);
let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
// If fetching failed, return the existing data.
if person.is_err() {
return Ok(u);
}
let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
let user_id = user.id;
blocking(context.pool(), move |conn| {
User_::mark_as_updated(conn, user_id)
})
.await??;
Ok(user)
}
Ok(u) => Ok(u),
Err(NotFound {}) => {
debug!("Fetching and creating remote user: {}", apub_id);
let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
Ok(user)
}
Err(e) => Err(e.into()),
}
}
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
} else {
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
};
last_refreshed.lt(&(naive_now() - update_interval))
}
/// Get a community from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_community(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, apub_id_owned.as_str())
})
.await?;
match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
}
Ok(c) => Ok(c),
Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None, recursion_counter).await
}
Err(e) => Err(e.into()),
}
}
/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
/// is set, this is an update for a community which is already known locally. If not, we don't know
/// the community yet and also pull the outbox, to get some initial posts.
async fn fetch_remote_community(
apub_id: &Url,
context: &LemmyContext,
old_community: Option<Community>,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
// If fetching failed, return the existing data.
if let Some(ref c) = old_community {
if group.is_err() {
return Ok(c.to_owned());
}
}
let group = group?;
let community =
Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
// Also add the community moderators too
let attributed_to = group.inner.attributed_to().context(location_info!())?;
let creator_and_moderator_uris: Vec<&Url> = attributed_to
.as_many()
.context(location_info!())?
.iter()
.map(|a| a.as_xsd_any_uri().context(""))
.collect::<Result<Vec<&Url>, anyhow::Error>>()?;
let mut creator_and_moderators = Vec::new();
for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
creator_and_moderators.push(c_or_m);
}
// TODO: need to make this work to update mods of existing communities
if old_community.is_none() {
let community_id = community.id;
blocking(context.pool(), move |conn| {
for mod_ in creator_and_moderators {
let community_moderator_form = CommunityModeratorForm {
community_id,
user_id: mod_.id,
};
CommunityModerator::join(conn, &community_moderator_form)?;
}
Ok(()) as Result<(), LemmyError>
})
.await??;
}
// fetch outbox (maybe make this conditional)
let outbox = fetch_remote_object::<OrderedCollection>(
context.client(),
&community.get_outbox_url()?,
recursion_counter,
)
.await?;
let outbox_items = outbox.items().context(location_info!())?.clone();
let mut outbox_items = outbox_items.many().context(location_info!())?;
if outbox_items.len() > 20 {
outbox_items = outbox_items[0..20].to_vec();
}
for o in outbox_items {
let page = PageExt::from_any_base(o)?.context(location_info!())?;
let page_id = page.id_unchecked().context(location_info!())?;
// The post creator may be from a blocked instance, if it errors, then skip it
if check_is_apub_id_valid(page_id).is_err() {
continue;
}
Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
// TODO: we need to send a websocket update here
}
Ok(community)
}
/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community is also pulled if necessary. Comments are not pulled.
pub(crate) async fn get_or_fetch_and_insert_post(
post_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Post, LemmyError> {
let post_ap_id_owned = post_ap_id.to_owned();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
})
.await?;
match post {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!("Fetching and creating remote post: {}", post_ap_id);
let page =
fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
Ok(post)
}
Err(e) => Err(e.into()),
}
}
/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community, post and comment are also pulled if necessary.
pub(crate) async fn get_or_fetch_and_insert_comment(
comment_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Comment, LemmyError> {
let comment_ap_id_owned = comment_ap_id.to_owned();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
})
.await?;
match comment {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!(
"Fetching and creating remote comment and its parents: {}",
comment_ap_id
);
let comment =
fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
let comment = Comment::from_apub(
&comment,
context,
comment_ap_id.to_owned(),
recursion_counter,
)
.await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
Ok(comment)
}
Err(e) => Err(e.into()),
}
}

View file

@ -0,0 +1,147 @@
use crate::{
check_is_apub_id_valid,
fetcher::{
fetch::fetch_remote_object,
get_or_fetch_and_upsert_user,
is_deleted,
should_refetch_actor,
},
objects::FromApub,
ActorType,
GroupExt,
PageExt,
};
use activitystreams::{
base::{BaseExt, ExtendsExt},
collection::{CollectionExt, OrderedCollection},
object::ObjectExt,
};
use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
use lemmy_db_schema::source::{
community::{Community, CommunityModerator, CommunityModeratorForm},
post::Post,
};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Get a community from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_community(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, apub_id_owned.as_str())
})
.await?;
match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id);
fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
}
Ok(c) => Ok(c),
Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id);
fetch_remote_community(apub_id, context, None, recursion_counter).await
}
Err(e) => Err(e.into()),
}
}
/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
/// is set, this is an update for a community which is already known locally. If not, we don't know
/// the community yet and also pull the outbox, to get some initial posts.
async fn fetch_remote_community(
apub_id: &Url,
context: &LemmyContext,
old_community: Option<Community>,
recursion_counter: &mut i32,
) -> Result<Community, LemmyError> {
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
if let Some(c) = old_community.to_owned() {
if is_deleted(&group) {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
} else if group.is_err() {
// If fetching failed, return the existing data.
return Ok(c);
}
}
let group = group?;
let community =
Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
// Also add the community moderators too
let attributed_to = group.inner.attributed_to().context(location_info!())?;
let creator_and_moderator_uris: Vec<&Url> = attributed_to
.as_many()
.context(location_info!())?
.iter()
.map(|a| a.as_xsd_any_uri().context(""))
.collect::<Result<Vec<&Url>, anyhow::Error>>()?;
let mut creator_and_moderators = Vec::new();
for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
creator_and_moderators.push(c_or_m);
}
// TODO: need to make this work to update mods of existing communities
if old_community.is_none() {
let community_id = community.id;
blocking(context.pool(), move |conn| {
for mod_ in creator_and_moderators {
let community_moderator_form = CommunityModeratorForm {
community_id,
user_id: mod_.id,
};
CommunityModerator::join(conn, &community_moderator_form)?;
}
Ok(()) as Result<(), LemmyError>
})
.await??;
}
// fetch outbox (maybe make this conditional)
let outbox = fetch_remote_object::<OrderedCollection>(
context.client(),
&community.get_outbox_url()?,
recursion_counter,
)
.await?;
let outbox_items = outbox.items().context(location_info!())?.clone();
let mut outbox_items = outbox_items.many().context(location_info!())?;
if outbox_items.len() > 20 {
outbox_items = outbox_items[0..20].to_vec();
}
for o in outbox_items {
let page = PageExt::from_any_base(o)?.context(location_info!())?;
let page_id = page.id_unchecked().context(location_info!())?;
// The post creator may be from a blocked instance, if it errors, then skip it
if check_is_apub_id_valid(page_id).is_err() {
continue;
}
Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
// TODO: we need to send a websocket update here
}
Ok(community)
}

View file

@ -0,0 +1,82 @@
use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE};
use anyhow::anyhow;
use lemmy_utils::{request::retry, LemmyError};
use reqwest::{Client, StatusCode};
use serde::Deserialize;
use std::time::Duration;
use thiserror::Error;
use url::Url;
/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
/// fetch through the search).
///
/// Tests are passing with a value of 5, so 10 should be safe for production.
static MAX_REQUEST_NUMBER: i32 = 10;
#[derive(Debug, Error)]
pub(in crate::fetcher) struct FetchError {
pub inner: anyhow::Error,
pub status_code: Option<StatusCode>,
}
impl From<LemmyError> for FetchError {
fn from(t: LemmyError) -> Self {
FetchError {
inner: t.inner,
status_code: None,
}
}
}
impl From<reqwest::Error> for FetchError {
fn from(t: reqwest::Error) -> Self {
let status = t.status();
FetchError {
inner: t.into(),
status_code: status,
}
}
}
impl std::fmt::Display for FetchError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Display::fmt(&self, f)
}
}
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
pub(in crate::fetcher) async fn fetch_remote_object<Response>(
client: &Client,
url: &Url,
recursion_counter: &mut i32,
) -> Result<Response, FetchError>
where
Response: for<'de> Deserialize<'de> + std::fmt::Debug,
{
*recursion_counter += 1;
if *recursion_counter > MAX_REQUEST_NUMBER {
return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into());
}
check_is_apub_id_valid(&url)?;
let timeout = Duration::from_secs(60);
let res = retry(|| {
client
.get(url.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(timeout)
.send()
})
.await?;
if res.status() == StatusCode::GONE {
return Err(FetchError {
inner: anyhow!("Remote object {} was deleted", url),
status_code: Some(res.status()),
});
}
Ok(res.json().await?)
}

View file

@ -0,0 +1,72 @@
pub(crate) mod community;
mod fetch;
pub(crate) mod objects;
pub mod search;
pub(crate) mod user;
use crate::{
fetcher::{
community::get_or_fetch_and_upsert_community,
fetch::FetchError,
user::get_or_fetch_and_upsert_user,
},
ActorType,
};
use chrono::NaiveDateTime;
use http::StatusCode;
use lemmy_db_schema::naive_now;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::Deserialize;
use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
fn is_deleted<Response>(fetch_response: &Result<Response, FetchError>) -> bool
where
Response: for<'de> Deserialize<'de>,
{
if let Err(e) = fetch_response {
if let Some(status) = e.status_code {
if status == StatusCode::GONE {
return true;
}
}
}
false
}
/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_actor(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Box<dyn ActorType>, LemmyError> {
let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
let actor: Box<dyn ActorType> = match community {
Ok(c) => Box::new(c),
Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
};
Ok(actor)
}
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
} else {
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
};
last_refreshed.lt(&(naive_now() - update_interval))
}

View file

@ -0,0 +1,83 @@
use crate::{fetcher::fetch::fetch_remote_object, objects::FromApub, NoteExt, PageExt};
use anyhow::anyhow;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{ApubObject, Crud};
use lemmy_db_schema::source::{comment::Comment, post::Post};
use lemmy_structs::blocking;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community is also pulled if necessary. Comments are not pulled.
pub(crate) async fn get_or_fetch_and_insert_post(
post_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Post, LemmyError> {
let post_ap_id_owned = post_ap_id.to_owned();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
})
.await?;
match post {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!("Fetching and creating remote post: {}", post_ap_id);
let page =
fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
Ok(post)
}
Err(e) => Err(e.into()),
}
}
/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
/// pulled from its apub ID, inserted and returned.
///
/// The parent community, post and comment are also pulled if necessary.
pub(crate) async fn get_or_fetch_and_insert_comment(
comment_ap_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<Comment, LemmyError> {
let comment_ap_id_owned = comment_ap_id.to_owned();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
})
.await?;
match comment {
Ok(p) => Ok(p),
Err(NotFound {}) => {
debug!(
"Fetching and creating remote comment and its parents: {}",
comment_ap_id
);
let comment =
fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
let comment = Comment::from_apub(
&comment,
context,
comment_ap_id.to_owned(),
recursion_counter,
)
.await?;
let post_id = comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
if post.locked {
return Err(anyhow!("Post is locked").into());
}
Ok(comment)
}
Err(e) => Err(e.into()),
}
}

View file

@ -0,0 +1,204 @@
use crate::{
fetcher::{
fetch::fetch_remote_object,
get_or_fetch_and_upsert_community,
get_or_fetch_and_upsert_user,
is_deleted,
},
find_object_by_id,
objects::FromApub,
GroupExt,
NoteExt,
Object,
PageExt,
PersonExt,
};
use activitystreams::base::BaseExt;
use anyhow::{anyhow, Context};
use lemmy_db_queries::{
source::{
comment::Comment_,
community::Community_,
post::Post_,
private_message::PrivateMessage_,
user::User,
},
SearchType,
};
use lemmy_db_schema::source::{
comment::Comment,
community::Community,
post::Post,
private_message::PrivateMessage,
user::User_,
};
use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
use lemmy_structs::{blocking, site::SearchResponse};
use lemmy_utils::{settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
#[serde(untagged)]
#[derive(serde::Deserialize, Debug)]
enum SearchAcceptedObjects {
Person(Box<PersonExt>),
Group(Box<GroupExt>),
Page(Box<PageExt>),
Comment(Box<NoteExt>),
}
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the `docker/federation/` setup:
/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
/// http://lemmy_gamma:8561/post/3
/// http://lemmy_delta:8571/comment/2
pub async fn search_by_apub_id(
query: &str,
context: &LemmyContext,
) -> Result<SearchResponse, LemmyError> {
// Parse the shorthand query url
let query_url = if query.contains('@') {
debug!("Search for {}", query);
let split = query.split('@').collect::<Vec<&str>>();
// User type will look like ['', username, instance]
// Community will look like [!community, instance]
let (name, instance) = if split.len() == 3 {
(format!("/u/{}", split[1]), split[2])
} else if split.len() == 2 {
if split[0].contains('!') {
let split2 = split[0].split('!').collect::<Vec<&str>>();
(format!("/c/{}", split2[1]), split[1])
} else {
return Err(anyhow!("Invalid search query: {}", query).into());
}
} else {
return Err(anyhow!("Invalid search query: {}", query).into());
};
let url = format!(
"{}://{}{}",
Settings::get().get_protocol_string(),
instance,
name
);
Url::parse(&url)?
} else {
Url::parse(&query)?
};
let recursion_counter = &mut 0;
let fetch_response =
fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url, recursion_counter)
.await;
if is_deleted(&fetch_response) {
delete_object_locally(&query_url, context).await?;
}
build_response(fetch_response?, query_url, recursion_counter, context).await
}
async fn build_response(
fetch_response: SearchAcceptedObjects,
query_url: Url,
recursion_counter: &mut i32,
context: &LemmyContext,
) -> Result<SearchResponse, LemmyError> {
let domain = query_url.domain().context("url has no domain")?;
let mut response = SearchResponse {
type_: SearchType::All.to_string(),
comments: vec![],
posts: vec![],
communities: vec![],
users: vec![],
};
match fetch_response {
SearchAcceptedObjects::Person(p) => {
let user_uri = p.inner.id(domain)?.context("person has no id")?;
let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
response.users = vec![
blocking(context.pool(), move |conn| {
UserViewSafe::read(conn, user.id)
})
.await??,
];
}
SearchAcceptedObjects::Group(g) => {
let community_uri = g.inner.id(domain)?.context("group has no id")?;
let community =
get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
response.communities = vec![
blocking(context.pool(), move |conn| {
CommunityView::read(conn, community.id, None)
})
.await??,
];
}
SearchAcceptedObjects::Page(p) => {
let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
response.posts =
vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
}
SearchAcceptedObjects::Comment(c) => {
let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
response.comments = vec![
blocking(context.pool(), move |conn| {
CommentView::read(conn, c.id, None)
})
.await??,
];
}
};
Ok(response)
}
async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> {
let res = find_object_by_id(context, query_url.to_owned()).await?;
match res {
Object::Comment(c) => {
blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, c.id, true)
})
.await??;
}
Object::Post(p) => {
blocking(context.pool(), move |conn| {
Post::update_deleted(conn, p.id, true)
})
.await??;
}
Object::User(u) => {
// TODO: implement update_deleted() for user, move it to ApubObject trait
blocking(context.pool(), move |conn| {
User_::delete_account(conn, u.id)
})
.await??;
}
Object::Community(c) => {
blocking(context.pool(), move |conn| {
Community::update_deleted(conn, c.id, true)
})
.await??;
}
Object::PrivateMessage(pm) => {
blocking(context.pool(), move |conn| {
PrivateMessage::update_deleted(conn, pm.id, true)
})
.await??;
}
}
Err(anyhow!("Object was deleted").into())
}

View file

@ -0,0 +1,71 @@
use crate::{
fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor},
objects::FromApub,
PersonExt,
};
use anyhow::anyhow;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{source::user::User, ApubObject};
use lemmy_db_schema::source::user::User_;
use lemmy_structs::blocking;
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use log::debug;
use url::Url;
/// Get a user from its apub ID.
///
/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
/// Otherwise it is fetched from the remote instance, stored and returned.
pub(crate) async fn get_or_fetch_and_upsert_user(
apub_id: &Url,
context: &LemmyContext,
recursion_counter: &mut i32,
) -> Result<User_, LemmyError> {
let apub_id_owned = apub_id.to_owned();
let user = blocking(context.pool(), move |conn| {
User_::read_from_apub_id(conn, apub_id_owned.as_ref())
})
.await?;
match user {
// If its older than a day, re-fetch it
Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
debug!("Fetching and updating from remote user: {}", apub_id);
let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
if is_deleted(&person) {
// TODO: use User_::update_deleted() once implemented
blocking(context.pool(), move |conn| {
User_::delete_account(conn, u.id)
})
.await??;
return Err(anyhow!("User was deleted by remote instance").into());
} else if person.is_err() {
return Ok(u);
}
let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
let user_id = user.id;
blocking(context.pool(), move |conn| {
User_::mark_as_updated(conn, user_id)
})
.await??;
Ok(user)
}
Ok(u) => Ok(u),
Err(NotFound {}) => {
debug!("Fetching and creating remote user: {}", apub_id);
let person =
fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
Ok(user)
}
Err(e) => Err(e.into()),
}
}

View file

@ -1,6 +1,6 @@
use crate::{
extensions::context::lemmy_context,
http::create_apub_response,
http::{create_apub_response, create_apub_tombstone_response},
objects::ToApub,
ActorType,
};
@ -28,12 +28,19 @@ pub async fn get_apub_user_http(
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
let user_name = info.into_inner().user_name;
// TODO: this needs to be able to read deleted users, so that it can send tombstones
let user = blocking(context.pool(), move |conn| {
User_::find_by_email_or_username(conn, &user_name)
})
.await??;
let u = user.to_apub(context.pool()).await?;
Ok(create_apub_response(&u))
if !user.deleted {
let apub = user.to_apub(context.pool()).await?;
Ok(create_apub_response(&apub))
} else {
Ok(create_apub_tombstone_response(&user.to_tombstone()?))
}
}
pub async fn get_apub_user_outbox(

View file

@ -31,8 +31,10 @@ use crate::{
receive_unhandled_activity,
verify_activity_domains_valid,
},
fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
find_post_or_comment_by_id,
inbox::is_addressed_to_public,
PostOrComment,
};
use activitystreams::{
activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
@ -41,8 +43,8 @@ use activitystreams::{
};
use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{ApubObject, Crud};
use lemmy_db_schema::source::{comment::Comment, post::Post, site::Site};
use lemmy_db_queries::Crud;
use lemmy_db_schema::source::site::Site;
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
@ -317,39 +319,6 @@ pub(in crate::inbox) async fn receive_undo_dislike_for_community(
}
}
enum PostOrComment {
Comment(Comment),
Post(Post),
}
/// Tries to find a post or comment in the local database, without any network requests.
/// This is used to handle deletions and removals, because in case we dont have the object, we can
/// simply ignore the activity.
async fn find_post_or_comment_by_id(
context: &LemmyContext,
apub_id: Url,
) -> Result<PostOrComment, LemmyError> {
let ap_id = apub_id.to_string();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(p) = post {
return Ok(PostOrComment::Post(p));
}
let ap_id = apub_id.to_string();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(c) = comment {
return Ok(PostOrComment::Comment(c));
}
Err(NotFound.into())
}
async fn fetch_post_or_comment_by_id(
apub_id: &Url,
context: &LemmyContext,

View file

@ -17,7 +17,7 @@ use crate::{
verify_activity_domains_valid,
},
check_is_apub_id_valid,
fetcher::get_or_fetch_and_upsert_community,
fetcher::community::get_or_fetch_and_upsert_community,
inbox::{
assert_activity_not_local,
get_activity_id,

View file

@ -22,8 +22,16 @@ use activitystreams::{
};
use activitystreams_ext::{Ext1, Ext2};
use anyhow::{anyhow, Context};
use lemmy_db_queries::{source::activity::Activity_, DbPool};
use lemmy_db_schema::source::{activity::Activity, user::User_};
use diesel::NotFound;
use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool};
use lemmy_db_schema::source::{
activity::Activity,
comment::Comment,
community::Community,
post::Post,
private_message::PrivateMessage,
user::User_,
};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
@ -239,3 +247,85 @@ where
.await??;
Ok(())
}
pub(crate) enum PostOrComment {
Comment(Comment),
Post(Post),
}
/// Tries to find a post or comment in the local database, without any network requests.
/// This is used to handle deletions and removals, because in case we dont have the object, we can
/// simply ignore the activity.
pub(crate) async fn find_post_or_comment_by_id(
context: &LemmyContext,
apub_id: Url,
) -> Result<PostOrComment, LemmyError> {
let ap_id = apub_id.to_string();
let post = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(p) = post {
return Ok(PostOrComment::Post(p));
}
let ap_id = apub_id.to_string();
let comment = blocking(context.pool(), move |conn| {
Comment::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(c) = comment {
return Ok(PostOrComment::Comment(c));
}
Err(NotFound.into())
}
pub(crate) enum Object {
Comment(Comment),
Post(Post),
Community(Community),
User(User_),
PrivateMessage(PrivateMessage),
}
pub(crate) async fn find_object_by_id(
context: &LemmyContext,
apub_id: Url,
) -> Result<Object, LemmyError> {
if let Ok(pc) = find_post_or_comment_by_id(context, apub_id.to_owned()).await {
return Ok(match pc {
PostOrComment::Post(p) => Object::Post(p),
PostOrComment::Comment(c) => Object::Comment(c),
});
}
let ap_id = apub_id.to_string();
let user = blocking(context.pool(), move |conn| {
User_::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(u) = user {
return Ok(Object::User(u));
}
let ap_id = apub_id.to_string();
let community = blocking(context.pool(), move |conn| {
Community::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(c) = community {
return Ok(Object::Community(c));
}
let ap_id = apub_id.to_string();
let private_message = blocking(context.pool(), move |conn| {
PrivateMessage::read_from_apub_id(conn, &ap_id)
})
.await?;
if let Ok(pm) = private_message {
return Ok(Object::PrivateMessage(pm));
}
Err(NotFound.into())
}

View file

@ -1,15 +1,12 @@
use crate::{
extensions::context::lemmy_context,
fetcher::{
get_or_fetch_and_insert_comment,
get_or_fetch_and_insert_post,
get_or_fetch_and_upsert_user,
},
fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
objects::{
check_object_domain,
check_object_for_community_or_site_ban,
create_tombstone,
get_object_from_apub,
get_or_fetch_and_upsert_user,
get_source_markdown_value,
set_content_and_source,
FromApub,

View file

@ -1,6 +1,6 @@
use crate::{
extensions::{context::lemmy_context, group_extensions::GroupExtension},
fetcher::get_or_fetch_and_upsert_user,
fetcher::user::get_or_fetch_and_upsert_user,
objects::{
check_object_domain,
create_tombstone,

View file

@ -1,6 +1,6 @@
use crate::{
check_is_apub_id_valid,
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
inbox::community_inbox::check_community_or_site_ban,
};
use activitystreams::{

View file

@ -1,6 +1,6 @@
use crate::{
extensions::{context::lemmy_context, page_extension::PageExtension},
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
objects::{
check_object_domain,
check_object_for_community_or_site_ban,

View file

@ -1,7 +1,7 @@
use crate::{
check_is_apub_id_valid,
extensions::context::lemmy_context,
fetcher::get_or_fetch_and_upsert_user,
fetcher::user::get_or_fetch_and_upsert_user,
objects::{
check_object_domain,
create_tombstone,

View file

@ -15,7 +15,7 @@ struct SendError(pub String);
#[error("Error receiving response, {0}")]
pub struct RecvError(pub String);
pub async fn retry<F, Fut, T>(f: F) -> Result<T, LemmyError>
pub async fn retry<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, reqwest::Error>>,
@ -23,27 +23,27 @@ where
retry_custom(|| async { Ok((f)().await) }).await
}
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<Result<T, reqwest::Error>, LemmyError>>,
Fut: Future<Output = Result<Result<T, reqwest::Error>, reqwest::Error>>,
{
let mut response = Err(anyhow!("connect timeout").into());
let mut response: Option<Result<T, reqwest::Error>> = None;
for _ in 0u8..3 {
match (f)().await? {
Ok(t) => return Ok(t),
Err(e) => {
if e.is_timeout() {
response = Err(SendError(e.to_string()).into());
response = Some(Err(e));
continue;
}
return Err(SendError(e.to_string()).into());
return Err(e);
}
}
}
response
response.unwrap()
}
#[derive(Deserialize, Debug)]