2020-05-16 14:04:08 +00:00
use crate ::{
api ::site ::SearchResponse ,
2020-07-10 18:15:41 +00:00
apub ::{ is_apub_id_valid , FromApub , GroupExt , PageExt , PersonExt , APUB_JSON_CONTENT_TYPE } ,
2020-07-01 12:54:29 +00:00
blocking ,
request ::{ retry , RecvError } ,
2020-05-16 14:04:08 +00:00
routes ::nodeinfo ::{ NodeInfo , NodeInfoWellKnown } ,
2020-07-13 13:56:58 +00:00
DbPool , LemmyError ,
2020-05-16 14:04:08 +00:00
} ;
2020-07-03 12:20:28 +00:00
use activitystreams ::object ::Note ;
use activitystreams_new ::{ base ::BaseExt , prelude ::* , primitives ::XsdAnyUri } ;
use actix_web ::client ::Client ;
2020-06-16 11:35:26 +00:00
use chrono ::NaiveDateTime ;
2020-07-03 12:20:28 +00:00
use diesel ::{ result ::Error ::NotFound , PgConnection } ;
2020-07-10 18:15:41 +00:00
use lemmy_db ::{
comment ::{ Comment , CommentForm } ,
comment_view ::CommentView ,
community ::{ Community , CommunityForm , CommunityModerator , CommunityModeratorForm } ,
community_view ::CommunityView ,
naive_now ,
post ::{ Post , PostForm } ,
post_view ::PostView ,
user ::{ UserForm , User_ } ,
user_view ::UserView ,
2020-07-13 13:56:58 +00:00
Crud , Joinable , SearchType ,
2020-07-10 18:15:41 +00:00
} ;
use lemmy_utils ::get_apub_protocol_string ;
2020-07-03 12:20:28 +00:00
use log ::debug ;
use serde ::Deserialize ;
use std ::{ fmt ::Debug , time ::Duration } ;
use url ::Url ;
2020-06-16 11:35:26 +00:00
/// Age, in seconds, after which a stored remote actor (user or community) is considered stale
/// and is fetched again from its home instance: 24 hours.
const ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
2019-12-27 17:25:07 +00:00
2020-04-17 15:33:55 +00:00
// Fetch nodeinfo metadata from a remote instance.
2020-07-01 12:54:29 +00:00
async fn _fetch_node_info ( client : & Client , domain : & str ) -> Result < NodeInfo , LemmyError > {
2020-04-08 12:37:05 +00:00
let well_known_uri = Url ::parse ( & format! (
2020-03-18 21:09:00 +00:00
" {}://{}/.well-known/nodeinfo " ,
get_apub_protocol_string ( ) ,
2020-04-17 14:39:03 +00:00
domain
2020-04-08 12:37:05 +00:00
) ) ? ;
2020-07-01 12:54:29 +00:00
let well_known = fetch_remote_object ::< NodeInfoWellKnown > ( client , & well_known_uri ) . await ? ;
let nodeinfo = fetch_remote_object ::< NodeInfo > ( client , & well_known . links . href ) . await ? ;
Ok ( nodeinfo )
2020-03-18 15:08:08 +00:00
}
2020-03-18 21:09:00 +00:00
2020-04-17 15:33:55 +00:00
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
2020-07-01 12:54:29 +00:00
pub async fn fetch_remote_object < Response > (
client : & Client ,
url : & Url ,
) -> Result < Response , LemmyError >
2020-03-14 21:03:05 +00:00
where
Response : for < ' de > Deserialize < ' de > ,
{
2020-04-18 18:54:20 +00:00
if ! is_apub_id_valid ( & url ) {
2020-07-01 12:54:29 +00:00
return Err ( format_err! ( " Activitypub uri invalid or blocked: {} " , url ) . into ( ) ) ;
2020-03-18 21:09:00 +00:00
}
2020-07-01 12:54:29 +00:00
2020-04-07 15:29:23 +00:00
let timeout = Duration ::from_secs ( 60 ) ;
2020-07-01 12:54:29 +00:00
let json = retry ( | | {
client
. get ( url . as_str ( ) )
. header ( " Accept " , APUB_JSON_CONTENT_TYPE )
. timeout ( timeout )
. send ( )
} )
. await ?
. json ( )
. await
. map_err ( | e | {
debug! ( " Receive error, {} " , e ) ;
RecvError ( e . to_string ( ) )
} ) ? ;
Ok ( json )
2020-03-14 21:03:05 +00:00
}
2020-04-17 15:33:55 +00:00
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
2020-04-17 13:46:08 +00:00
#[ serde(untagged) ]
2020-04-17 17:34:18 +00:00
#[ derive(serde::Deserialize, Debug) ]
2020-04-17 13:46:08 +00:00
pub enum SearchAcceptedObjects {
Person ( Box < PersonExt > ) ,
Group ( Box < GroupExt > ) ,
2020-05-05 00:04:48 +00:00
Page ( Box < PageExt > ) ,
2020-05-13 17:21:32 +00:00
Comment ( Box < Note > ) ,
2020-04-17 13:46:08 +00:00
}
2020-04-17 15:33:55 +00:00
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the docker/federation/ setup:
2020-04-30 15:45:12 +00:00
/// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
/// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
2020-05-13 17:21:32 +00:00
/// http://lemmy_alpha:8540/post/3
/// http://lemmy_alpha:8540/comment/2
2020-07-01 12:54:29 +00:00
pub async fn search_by_apub_id (
query : & str ,
client : & Client ,
pool : & DbPool ,
) -> Result < SearchResponse , LemmyError > {
2020-04-29 16:55:54 +00:00
// Parse the shorthand query url
let query_url = if query . contains ( '@' ) {
2020-04-30 15:45:12 +00:00
debug! ( " {} " , query ) ;
2020-04-29 16:55:54 +00:00
let split = query . split ( '@' ) . collect ::< Vec < & str > > ( ) ;
2020-04-30 15:45:12 +00:00
// User type will look like ['', username, instance]
// Community will look like [!community, instance]
let ( name , instance ) = if split . len ( ) = = 3 {
( format! ( " /u/ {} " , split [ 1 ] ) , split [ 2 ] )
} else if split . len ( ) = = 2 {
if split [ 0 ] . contains ( '!' ) {
let split2 = split [ 0 ] . split ( '!' ) . collect ::< Vec < & str > > ( ) ;
( format! ( " /c/ {} " , split2 [ 1 ] ) , split [ 1 ] )
} else {
2020-07-01 12:54:29 +00:00
return Err ( format_err! ( " Invalid search query: {} " , query ) . into ( ) ) ;
2020-04-30 15:45:12 +00:00
}
} else {
2020-07-01 12:54:29 +00:00
return Err ( format_err! ( " Invalid search query: {} " , query ) . into ( ) ) ;
2020-04-30 15:45:12 +00:00
} ;
let url = format! ( " {} :// {} {} " , get_apub_protocol_string ( ) , instance , name ) ;
2020-04-29 16:55:54 +00:00
Url ::parse ( & url ) ?
} else {
Url ::parse ( & query ) ?
} ;
2020-04-17 13:46:08 +00:00
let mut response = SearchResponse {
type_ : SearchType ::All . to_string ( ) ,
comments : vec ! [ ] ,
posts : vec ! [ ] ,
communities : vec ! [ ] ,
users : vec ! [ ] ,
} ;
2020-07-01 12:54:29 +00:00
let response = match fetch_remote_object ::< SearchAcceptedObjects > ( client , & query_url ) . await ? {
2020-04-17 13:46:08 +00:00
SearchAcceptedObjects ::Person ( p ) = > {
2020-07-03 12:20:28 +00:00
let user_uri = p . inner . id ( ) . unwrap ( ) . to_string ( ) ;
2020-07-01 12:54:29 +00:00
let user = get_or_fetch_and_upsert_remote_user ( & user_uri , client , pool ) . await ? ;
response . users = vec! [ blocking ( pool , move | conn | UserView ::read ( conn , user . id ) ) . await ? ? ] ;
response
2020-04-17 13:46:08 +00:00
}
SearchAcceptedObjects ::Group ( g ) = > {
2020-07-03 12:20:28 +00:00
let community_uri = g . inner . id ( ) . unwrap ( ) . to_string ( ) ;
2020-07-01 12:54:29 +00:00
let community =
get_or_fetch_and_upsert_remote_community ( & community_uri , client , pool ) . await ? ;
2020-04-24 19:55:54 +00:00
// TODO Maybe at some point in the future, fetch all the history of a community
2020-04-24 14:04:36 +00:00
// fetch_community_outbox(&c, conn)?;
2020-07-01 12:54:29 +00:00
response . communities = vec! [
blocking ( pool , move | conn | {
CommunityView ::read ( conn , community . id , None )
} )
. await ? ? ,
] ;
response
2020-04-17 13:46:08 +00:00
}
2020-07-13 13:56:58 +00:00
SearchAcceptedObjects ::Page ( p ) = > {
let post_form = PostForm ::from_apub ( & p , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
let p = blocking ( pool , move | conn | upsert_post ( & post_form , conn ) ) . await ? ? ;
response . posts = vec! [ blocking ( pool , move | conn | PostView ::read ( conn , p . id , None ) ) . await ? ? ] ;
response
2020-04-25 15:49:15 +00:00
}
2020-07-13 13:56:58 +00:00
SearchAcceptedObjects ::Comment ( c ) = > {
2020-05-13 17:21:32 +00:00
let post_url = c
. object_props
. get_many_in_reply_to_xsd_any_uris ( )
. unwrap ( )
. next ( )
. unwrap ( )
. to_string ( ) ;
2020-07-01 12:54:29 +00:00
2020-05-13 17:21:32 +00:00
// TODO: also fetch parent comments if any
2020-07-13 13:56:58 +00:00
let post = fetch_remote_object ( client , & Url ::parse ( & post_url ) ? ) . await ? ;
let post_form = PostForm ::from_apub ( & post , client , pool ) . await ? ;
let comment_form = CommentForm ::from_apub ( & c , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
blocking ( pool , move | conn | upsert_post ( & post_form , conn ) ) . await ? ? ;
let c = blocking ( pool , move | conn | upsert_comment ( & comment_form , conn ) ) . await ? ? ;
response . comments =
vec! [ blocking ( pool , move | conn | CommentView ::read ( conn , c . id , None ) ) . await ? ? ] ;
response
2020-05-13 17:21:32 +00:00
}
2020-07-01 12:54:29 +00:00
} ;
2020-04-17 13:46:08 +00:00
Ok ( response )
}
2020-04-24 14:04:36 +00:00
/// Check if a remote user exists, create if not found, if its too old update it.Fetch a user, insert/update it in the database and return the user.
2020-07-01 12:54:29 +00:00
pub async fn get_or_fetch_and_upsert_remote_user (
2020-04-24 19:55:54 +00:00
apub_id : & str ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < User_ , LemmyError > {
let apub_id_owned = apub_id . to_owned ( ) ;
let user = blocking ( pool , move | conn | {
User_ ::read_from_actor_id ( conn , & apub_id_owned )
} )
. await ? ;
match user {
// If its older than a day, re-fetch it
Ok ( u ) if ! u . local & & should_refetch_actor ( u . last_refreshed_at ) = > {
debug! ( " Fetching and updating from remote user: {} " , apub_id ) ;
2020-07-13 13:56:58 +00:00
let person = fetch_remote_object ::< PersonExt > ( client , & Url ::parse ( apub_id ) ? ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-07-13 13:56:58 +00:00
let mut uf = UserForm ::from_apub ( & person , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
uf . last_refreshed_at = Some ( naive_now ( ) ) ;
let user = blocking ( pool , move | conn | User_ ::update ( conn , u . id , & uf ) ) . await ? ? ;
Ok ( user )
2020-04-24 19:55:54 +00:00
}
2020-07-01 12:54:29 +00:00
Ok ( u ) = > Ok ( u ) ,
2020-04-24 14:04:36 +00:00
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote user: {} " , apub_id ) ;
2020-07-13 13:56:58 +00:00
let person = fetch_remote_object ::< PersonExt > ( client , & Url ::parse ( apub_id ) ? ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-07-13 13:56:58 +00:00
let uf = UserForm ::from_apub ( & person , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
let user = blocking ( pool , move | conn | User_ ::create ( conn , & uf ) ) . await ? ? ;
Ok ( user )
2020-04-24 14:04:36 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-04-24 14:04:36 +00:00
}
2020-04-07 21:02:32 +00:00
}
2020-04-17 13:46:08 +00:00
2020-06-16 11:35:26 +00:00
/// Decide whether a stored remote actor is stale and should be fetched again from its instance.
///
/// Builds with `debug_assertions` enabled always answer `true`. Other builds answer `true` only
/// once more than `ACTOR_REFETCH_INTERVAL_SECONDS` have passed since `last_refreshed`.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
  if cfg!(debug_assertions) {
    return true;
  }
  let refetch_after = naive_now() - chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS);
  last_refreshed < refetch_after
}
2020-04-24 14:04:36 +00:00
/// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
2020-07-01 12:54:29 +00:00
pub async fn get_or_fetch_and_upsert_remote_community (
2020-04-24 19:55:54 +00:00
apub_id : & str ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Community , LemmyError > {
let apub_id_owned = apub_id . to_owned ( ) ;
let community = blocking ( pool , move | conn | {
Community ::read_from_actor_id ( conn , & apub_id_owned )
} )
. await ? ;
match community {
Ok ( c ) if ! c . local & & should_refetch_actor ( c . last_refreshed_at ) = > {
debug! ( " Fetching and updating from remote community: {} " , apub_id ) ;
2020-07-13 13:56:58 +00:00
let group = fetch_remote_object ::< GroupExt > ( client , & Url ::parse ( apub_id ) ? ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-07-13 13:56:58 +00:00
let mut cf = CommunityForm ::from_apub ( & group , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
cf . last_refreshed_at = Some ( naive_now ( ) ) ;
let community = blocking ( pool , move | conn | Community ::update ( conn , c . id , & cf ) ) . await ? ? ;
Ok ( community )
2020-04-24 19:55:54 +00:00
}
2020-07-01 12:54:29 +00:00
Ok ( c ) = > Ok ( c ) ,
2020-04-24 14:04:36 +00:00
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote community: {} " , apub_id ) ;
2020-07-13 13:56:58 +00:00
let group = fetch_remote_object ::< GroupExt > ( client , & Url ::parse ( apub_id ) ? ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-07-13 13:56:58 +00:00
let cf = CommunityForm ::from_apub ( & group , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
let community = blocking ( pool , move | conn | Community ::create ( conn , & cf ) ) . await ? ? ;
2020-05-03 14:00:59 +00:00
// Also add the community moderators too
2020-07-13 13:56:58 +00:00
let attributed_to = group . inner . attributed_to ( ) . unwrap ( ) ;
2020-07-03 12:20:28 +00:00
let creator_and_moderator_uris : Vec < & XsdAnyUri > = attributed_to
. as_many ( )
. unwrap ( )
. iter ( )
. map ( | a | a . as_xsd_any_uri ( ) . unwrap ( ) )
. collect ( ) ;
2020-07-01 12:54:29 +00:00
let mut creator_and_moderators = Vec ::new ( ) ;
for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_remote_user ( uri . as_str ( ) , client , pool ) . await ? ;
creator_and_moderators . push ( c_or_m ) ;
2020-05-03 14:00:59 +00:00
}
2020-07-01 12:54:29 +00:00
let community_id = community . id ;
blocking ( pool , move | conn | {
for mod_ in creator_and_moderators {
let community_moderator_form = CommunityModeratorForm {
community_id ,
user_id : mod_ . id ,
} ;
CommunityModerator ::join ( conn , & community_moderator_form ) ? ;
}
Ok ( ( ) ) as Result < ( ) , LemmyError >
} )
. await ? ? ;
2020-05-03 14:00:59 +00:00
Ok ( community )
2020-04-24 14:04:36 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-04-24 14:04:36 +00:00
}
2020-04-09 19:04:31 +00:00
}
2020-04-24 19:55:54 +00:00
2020-07-01 12:54:29 +00:00
fn upsert_post ( post_form : & PostForm , conn : & PgConnection ) -> Result < Post , LemmyError > {
2020-04-25 15:49:15 +00:00
let existing = Post ::read_from_apub_id ( conn , & post_form . ap_id ) ;
match existing {
Err ( NotFound { } ) = > Ok ( Post ::create ( conn , & post_form ) ? ) ,
Ok ( p ) = > Ok ( Post ::update ( conn , p . id , & post_form ) ? ) ,
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-04-25 15:49:15 +00:00
}
}
2020-05-13 17:21:32 +00:00
2020-07-01 12:54:29 +00:00
pub async fn get_or_fetch_and_insert_remote_post (
2020-06-11 15:16:33 +00:00
post_ap_id : & str ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Post , LemmyError > {
let post_ap_id_owned = post_ap_id . to_owned ( ) ;
let post = blocking ( pool , move | conn | {
Post ::read_from_apub_id ( conn , & post_ap_id_owned )
} )
. await ? ;
match post {
2020-06-11 15:16:33 +00:00
Ok ( p ) = > Ok ( p ) ,
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote post: {} " , post_ap_id ) ;
2020-07-13 13:56:58 +00:00
let post = fetch_remote_object ::< PageExt > ( client , & Url ::parse ( post_ap_id ) ? ) . await ? ;
let post_form = PostForm ::from_apub ( & post , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
let post = blocking ( pool , move | conn | Post ::create ( conn , & post_form ) ) . await ? ? ;
Ok ( post )
2020-06-11 15:16:33 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-06-11 15:16:33 +00:00
}
}
2020-07-01 12:54:29 +00:00
fn upsert_comment ( comment_form : & CommentForm , conn : & PgConnection ) -> Result < Comment , LemmyError > {
2020-05-13 17:21:32 +00:00
let existing = Comment ::read_from_apub_id ( conn , & comment_form . ap_id ) ;
match existing {
Err ( NotFound { } ) = > Ok ( Comment ::create ( conn , & comment_form ) ? ) ,
Ok ( p ) = > Ok ( Comment ::update ( conn , p . id , & comment_form ) ? ) ,
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-05-13 17:21:32 +00:00
}
}
2020-06-11 15:16:33 +00:00
2020-07-01 12:54:29 +00:00
pub async fn get_or_fetch_and_insert_remote_comment (
2020-06-11 15:16:33 +00:00
comment_ap_id : & str ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Comment , LemmyError > {
let comment_ap_id_owned = comment_ap_id . to_owned ( ) ;
let comment = blocking ( pool , move | conn | {
Comment ::read_from_apub_id ( conn , & comment_ap_id_owned )
} )
. await ? ;
match comment {
2020-06-11 15:16:33 +00:00
Ok ( p ) = > Ok ( p ) ,
Err ( NotFound { } ) = > {
debug! (
" Fetching and creating remote comment and its parents: {} " ,
comment_ap_id
) ;
2020-07-13 13:56:58 +00:00
let comment = fetch_remote_object ::< Note > ( client , & Url ::parse ( comment_ap_id ) ? ) . await ? ;
let comment_form = CommentForm ::from_apub ( & comment , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
let comment = blocking ( pool , move | conn | Comment ::create ( conn , & comment_form ) ) . await ? ? ;
Ok ( comment )
2020-06-11 15:16:33 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-06-11 15:16:33 +00:00
}
}
2020-04-24 19:55:54 +00:00
// TODO It should not be fetching data from a community outbox.
// All posts, comments, comment likes, etc. should be posted to our community_inbox.
// The only data we should be periodically fetching (if it hasn't been fetched in the last day,
// maybe) are community and user actors.
// Fetch all posts in the outbox of the given community, and insert them into the database.
2020-07-01 12:54:29 +00:00
// fn fetch_community_outbox(community: &Community, conn: &PgConnection) -> Result<Vec<Post>, LemmyError> {
2020-04-24 19:55:54 +00:00
// let outbox_url = Url::parse(&community.get_outbox_url())?;
// let outbox = fetch_remote_object::<OrderedCollection>(&outbox_url)?;
// let items = outbox.collection_props.get_many_items_base_boxes();
// Ok(
// items
// .unwrap()
2020-07-01 12:54:29 +00:00
// .map(|obox: &BaseBox| -> Result<PostForm, LemmyError> {
2020-04-24 19:55:54 +00:00
// let page = obox.clone().to_concrete::<Page>()?;
// PostForm::from_page(&page, conn)
// })
// .map(|pf| upsert_post(&pf?, conn))
2020-07-01 12:54:29 +00:00
// .collect::<Result<Vec<Post>, LemmyError>>()?,
2020-04-24 19:55:54 +00:00
// )
// }