2020-05-16 14:04:08 +00:00
use crate ::{
api ::site ::SearchResponse ,
2020-07-23 14:36:45 +00:00
apub ::{
2020-08-04 14:39:55 +00:00
check_is_apub_id_valid ,
2020-07-29 11:46:11 +00:00
ActorType ,
FromApub ,
GroupExt ,
PageExt ,
PersonExt ,
APUB_JSON_CONTENT_TYPE ,
2020-07-23 14:36:45 +00:00
} ,
2020-07-01 12:54:29 +00:00
blocking ,
request ::{ retry , RecvError } ,
2020-07-29 11:46:11 +00:00
DbPool ,
LemmyError ,
2020-05-16 14:04:08 +00:00
} ;
2020-08-01 13:25:17 +00:00
use activitystreams ::{ base ::BaseExt , collection ::OrderedCollection , object ::Note , prelude ::* } ;
2020-07-03 12:20:28 +00:00
use actix_web ::client ::Client ;
2020-08-11 14:31:05 +00:00
use anyhow ::{ anyhow , Context } ;
2020-06-16 11:35:26 +00:00
use chrono ::NaiveDateTime ;
2020-08-13 15:48:10 +00:00
use diesel ::result ::Error ::NotFound ;
2020-07-10 18:15:41 +00:00
use lemmy_db ::{
comment ::{ Comment , CommentForm } ,
comment_view ::CommentView ,
community ::{ Community , CommunityForm , CommunityModerator , CommunityModeratorForm } ,
community_view ::CommunityView ,
naive_now ,
post ::{ Post , PostForm } ,
post_view ::PostView ,
user ::{ UserForm , User_ } ,
user_view ::UserView ,
2020-07-29 11:46:11 +00:00
Crud ,
Joinable ,
SearchType ,
2020-07-10 18:15:41 +00:00
} ;
2020-08-11 14:31:05 +00:00
use lemmy_utils ::{ get_apub_protocol_string , location_info } ;
2020-07-03 12:20:28 +00:00
use log ::debug ;
use serde ::Deserialize ;
use std ::{ fmt ::Debug , time ::Duration } ;
use url ::Url ;
2020-06-16 11:35:26 +00:00
/// Number of seconds a stored remote actor is considered fresh when `debug_assertions` is
/// disabled: one day.
const ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
/// Number of seconds a stored remote actor is considered fresh when `debug_assertions` is
/// enabled.
const ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
2019-12-27 17:25:07 +00:00
2020-04-17 15:33:55 +00:00
/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
/// timeouts etc.
2020-07-01 12:54:29 +00:00
pub async fn fetch_remote_object < Response > (
client : & Client ,
url : & Url ,
) -> Result < Response , LemmyError >
2020-03-14 21:03:05 +00:00
where
Response : for < ' de > Deserialize < ' de > ,
{
2020-08-04 14:39:55 +00:00
check_is_apub_id_valid ( & url ) ? ;
2020-07-01 12:54:29 +00:00
2020-04-07 15:29:23 +00:00
let timeout = Duration ::from_secs ( 60 ) ;
2020-07-01 12:54:29 +00:00
let json = retry ( | | {
client
. get ( url . as_str ( ) )
. header ( " Accept " , APUB_JSON_CONTENT_TYPE )
. timeout ( timeout )
. send ( )
} )
. await ?
. json ( )
. await
. map_err ( | e | {
debug! ( " Receive error, {} " , e ) ;
RecvError ( e . to_string ( ) )
} ) ? ;
Ok ( json )
2020-03-14 21:03:05 +00:00
}
2020-04-17 15:33:55 +00:00
/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
2020-04-17 13:46:08 +00:00
#[ serde(untagged) ]
2020-04-17 17:34:18 +00:00
#[ derive(serde::Deserialize, Debug) ]
2020-04-17 13:46:08 +00:00
pub enum SearchAcceptedObjects {
Person ( Box < PersonExt > ) ,
Group ( Box < GroupExt > ) ,
2020-05-05 00:04:48 +00:00
Page ( Box < PageExt > ) ,
2020-05-13 17:21:32 +00:00
Comment ( Box < Note > ) ,
2020-04-17 13:46:08 +00:00
}
2020-04-17 15:33:55 +00:00
/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// Some working examples for use with the docker/federation/ setup:
2020-04-30 15:45:12 +00:00
/// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
/// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
2020-05-13 17:21:32 +00:00
/// http://lemmy_alpha:8540/post/3
/// http://lemmy_alpha:8540/comment/2
2020-07-01 12:54:29 +00:00
pub async fn search_by_apub_id (
query : & str ,
client : & Client ,
pool : & DbPool ,
) -> Result < SearchResponse , LemmyError > {
2020-04-29 16:55:54 +00:00
// Parse the shorthand query url
let query_url = if query . contains ( '@' ) {
2020-04-30 15:45:12 +00:00
debug! ( " {} " , query ) ;
2020-04-29 16:55:54 +00:00
let split = query . split ( '@' ) . collect ::< Vec < & str > > ( ) ;
2020-04-30 15:45:12 +00:00
// User type will look like ['', username, instance]
// Community will look like [!community, instance]
let ( name , instance ) = if split . len ( ) = = 3 {
( format! ( " /u/ {} " , split [ 1 ] ) , split [ 2 ] )
} else if split . len ( ) = = 2 {
if split [ 0 ] . contains ( '!' ) {
let split2 = split [ 0 ] . split ( '!' ) . collect ::< Vec < & str > > ( ) ;
( format! ( " /c/ {} " , split2 [ 1 ] ) , split [ 1 ] )
} else {
2020-08-01 14:04:42 +00:00
return Err ( anyhow! ( " Invalid search query: {} " , query ) . into ( ) ) ;
2020-04-30 15:45:12 +00:00
}
} else {
2020-08-01 14:04:42 +00:00
return Err ( anyhow! ( " Invalid search query: {} " , query ) . into ( ) ) ;
2020-04-30 15:45:12 +00:00
} ;
let url = format! ( " {} :// {} {} " , get_apub_protocol_string ( ) , instance , name ) ;
2020-04-29 16:55:54 +00:00
Url ::parse ( & url ) ?
} else {
Url ::parse ( & query ) ?
} ;
2020-04-17 13:46:08 +00:00
let mut response = SearchResponse {
type_ : SearchType ::All . to_string ( ) ,
comments : vec ! [ ] ,
posts : vec ! [ ] ,
communities : vec ! [ ] ,
users : vec ! [ ] ,
} ;
2020-07-01 12:54:29 +00:00
2020-08-11 14:31:05 +00:00
let domain = query_url . domain ( ) . context ( " url has no domain " ) ? ;
2020-07-01 12:54:29 +00:00
let response = match fetch_remote_object ::< SearchAcceptedObjects > ( client , & query_url ) . await ? {
2020-04-17 13:46:08 +00:00
SearchAcceptedObjects ::Person ( p ) = > {
2020-08-11 14:31:05 +00:00
let user_uri = p . inner . id ( domain ) ? . context ( " person has no id " ) ? ;
2020-07-01 12:54:29 +00:00
2020-07-29 11:46:11 +00:00
let user = get_or_fetch_and_upsert_user ( & user_uri , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-08-09 02:36:29 +00:00
response . users =
vec! [ blocking ( pool , move | conn | UserView ::get_user_secure ( conn , user . id ) ) . await ? ? ] ;
2020-07-01 12:54:29 +00:00
response
2020-04-17 13:46:08 +00:00
}
SearchAcceptedObjects ::Group ( g ) = > {
2020-08-11 14:31:05 +00:00
let community_uri = g . inner . id ( domain ) ? . context ( " group has no id " ) ? ;
2020-07-01 12:54:29 +00:00
2020-07-29 11:46:11 +00:00
let community = get_or_fetch_and_upsert_community ( community_uri , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
response . communities = vec! [
blocking ( pool , move | conn | {
CommunityView ::read ( conn , community . id , None )
} )
. await ? ? ,
] ;
response
2020-04-17 13:46:08 +00:00
}
2020-07-13 13:56:58 +00:00
SearchAcceptedObjects ::Page ( p ) = > {
2020-08-06 12:53:58 +00:00
let post_form = PostForm ::from_apub ( & p , client , pool , Some ( query_url ) ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-08-13 15:48:10 +00:00
let p = blocking ( pool , move | conn | Post ::upsert ( conn , & post_form ) ) . await ? ? ;
2020-07-01 12:54:29 +00:00
response . posts = vec! [ blocking ( pool , move | conn | PostView ::read ( conn , p . id , None ) ) . await ? ? ] ;
response
2020-04-25 15:49:15 +00:00
}
2020-07-13 13:56:58 +00:00
SearchAcceptedObjects ::Comment ( c ) = > {
2020-08-06 12:53:58 +00:00
let comment_form = CommentForm ::from_apub ( & c , client , pool , Some ( query_url ) ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-08-13 15:48:10 +00:00
let c = blocking ( pool , move | conn | Comment ::upsert ( conn , & comment_form ) ) . await ? ? ;
2020-07-01 12:54:29 +00:00
response . comments =
vec! [ blocking ( pool , move | conn | CommentView ::read ( conn , c . id , None ) ) . await ? ? ] ;
response
2020-05-13 17:21:32 +00:00
}
2020-07-01 12:54:29 +00:00
} ;
2020-04-17 13:46:08 +00:00
Ok ( response )
}
2020-07-29 11:46:11 +00:00
pub async fn get_or_fetch_and_upsert_actor (
2020-07-23 14:36:45 +00:00
apub_id : & Url ,
client : & Client ,
pool : & DbPool ,
) -> Result < Box < dyn ActorType > , LemmyError > {
2020-07-29 11:46:11 +00:00
let user = get_or_fetch_and_upsert_user ( apub_id , client , pool ) . await ;
2020-07-23 14:36:45 +00:00
let actor : Box < dyn ActorType > = match user {
Ok ( u ) = > Box ::new ( u ) ,
2020-07-29 11:46:11 +00:00
Err ( _ ) = > Box ::new ( get_or_fetch_and_upsert_community ( apub_id , client , pool ) . await ? ) ,
2020-07-23 14:36:45 +00:00
} ;
Ok ( actor )
}
2020-04-24 14:04:36 +00:00
/// Check if a remote user exists, create if not found, if its too old update it.Fetch a user, insert/update it in the database and return the user.
2020-07-29 11:46:11 +00:00
pub async fn get_or_fetch_and_upsert_user (
2020-07-17 21:11:07 +00:00
apub_id : & Url ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < User_ , LemmyError > {
let apub_id_owned = apub_id . to_owned ( ) ;
let user = blocking ( pool , move | conn | {
2020-08-06 12:53:58 +00:00
User_ ::read_from_actor_id ( conn , apub_id_owned . as_ref ( ) )
2020-07-01 12:54:29 +00:00
} )
. await ? ;
match user {
// If its older than a day, re-fetch it
Ok ( u ) if ! u . local & & should_refetch_actor ( u . last_refreshed_at ) = > {
debug! ( " Fetching and updating from remote user: {} " , apub_id ) ;
2020-07-17 21:11:07 +00:00
let person = fetch_remote_object ::< PersonExt > ( client , apub_id ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-08-06 12:53:58 +00:00
let mut uf = UserForm ::from_apub ( & person , client , pool , Some ( apub_id . to_owned ( ) ) ) . await ? ;
2020-07-01 12:54:29 +00:00
uf . last_refreshed_at = Some ( naive_now ( ) ) ;
let user = blocking ( pool , move | conn | User_ ::update ( conn , u . id , & uf ) ) . await ? ? ;
Ok ( user )
2020-04-24 19:55:54 +00:00
}
2020-07-01 12:54:29 +00:00
Ok ( u ) = > Ok ( u ) ,
2020-04-24 14:04:36 +00:00
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote user: {} " , apub_id ) ;
2020-07-17 21:11:07 +00:00
let person = fetch_remote_object ::< PersonExt > ( client , apub_id ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-08-06 12:53:58 +00:00
let uf = UserForm ::from_apub ( & person , client , pool , Some ( apub_id . to_owned ( ) ) ) . await ? ;
2020-07-01 12:54:29 +00:00
let user = blocking ( pool , move | conn | User_ ::create ( conn , & uf ) ) . await ? ? ;
Ok ( user )
2020-04-24 14:04:36 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-04-24 14:04:36 +00:00
}
2020-04-07 21:02:32 +00:00
}
2020-04-17 13:46:08 +00:00
2020-06-16 11:35:26 +00:00
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// ACTOR_REFETCH_INTERVAL_SECONDS after the last refetch, in debug builds always.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor ( last_refreshed : NaiveDateTime ) -> bool {
2020-07-24 15:07:33 +00:00
let update_interval = if cfg! ( debug_assertions ) {
// avoid infinite loop when fetching community outbox
chrono ::Duration ::seconds ( ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG )
2020-06-16 11:35:26 +00:00
} else {
2020-07-24 15:07:33 +00:00
chrono ::Duration ::seconds ( ACTOR_REFETCH_INTERVAL_SECONDS )
} ;
last_refreshed . lt ( & ( naive_now ( ) - update_interval ) )
2020-06-16 11:35:26 +00:00
}
2020-04-24 14:04:36 +00:00
/// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
2020-07-29 11:46:11 +00:00
pub async fn get_or_fetch_and_upsert_community (
2020-07-17 21:11:07 +00:00
apub_id : & Url ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Community , LemmyError > {
let apub_id_owned = apub_id . to_owned ( ) ;
let community = blocking ( pool , move | conn | {
2020-07-17 21:11:07 +00:00
Community ::read_from_actor_id ( conn , apub_id_owned . as_str ( ) )
2020-07-01 12:54:29 +00:00
} )
. await ? ;
match community {
Ok ( c ) if ! c . local & & should_refetch_actor ( c . last_refreshed_at ) = > {
debug! ( " Fetching and updating from remote community: {} " , apub_id ) ;
2020-07-24 15:07:33 +00:00
fetch_remote_community ( apub_id , client , pool , Some ( c . id ) ) . await
2020-04-24 19:55:54 +00:00
}
2020-07-01 12:54:29 +00:00
Ok ( c ) = > Ok ( c ) ,
2020-04-24 14:04:36 +00:00
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote community: {} " , apub_id ) ;
2020-07-24 15:07:33 +00:00
fetch_remote_community ( apub_id , client , pool , None ) . await
}
Err ( e ) = > Err ( e . into ( ) ) ,
}
}
async fn fetch_remote_community (
apub_id : & Url ,
client : & Client ,
pool : & DbPool ,
community_id : Option < i32 > ,
) -> Result < Community , LemmyError > {
let group = fetch_remote_object ::< GroupExt > ( client , apub_id ) . await ? ;
2020-08-06 12:53:58 +00:00
let cf = CommunityForm ::from_apub ( & group , client , pool , Some ( apub_id . to_owned ( ) ) ) . await ? ;
2020-07-24 15:07:33 +00:00
let community = blocking ( pool , move | conn | {
if let Some ( cid ) = community_id {
Community ::update ( conn , cid , & cf )
} else {
Community ::create ( conn , & cf )
}
} )
. await ? ? ;
2020-07-01 12:54:29 +00:00
2020-07-24 15:07:33 +00:00
// Also add the community moderators too
2020-08-11 14:31:05 +00:00
let attributed_to = group . inner . attributed_to ( ) . context ( location_info! ( ) ) ? ;
2020-07-24 15:07:33 +00:00
let creator_and_moderator_uris : Vec < & Url > = attributed_to
. as_many ( )
2020-08-11 14:31:05 +00:00
. context ( location_info! ( ) ) ?
2020-07-24 15:07:33 +00:00
. iter ( )
2020-08-11 14:31:05 +00:00
. map ( | a | a . as_xsd_any_uri ( ) . context ( " " ) )
. collect ::< Result < Vec < & Url > , anyhow ::Error > > ( ) ? ;
2020-05-03 14:00:59 +00:00
2020-07-24 15:07:33 +00:00
let mut creator_and_moderators = Vec ::new ( ) ;
2020-07-01 12:54:29 +00:00
2020-07-24 15:07:33 +00:00
for uri in creator_and_moderator_uris {
let c_or_m = get_or_fetch_and_upsert_user ( uri , client , pool ) . await ? ;
2020-07-01 12:54:29 +00:00
2020-07-24 15:07:33 +00:00
creator_and_moderators . push ( c_or_m ) ;
}
2020-07-01 12:54:29 +00:00
2020-07-24 15:07:33 +00:00
// TODO: need to make this work to update mods of existing communities
if community_id . is_none ( ) {
let community_id = community . id ;
blocking ( pool , move | conn | {
for mod_ in creator_and_moderators {
let community_moderator_form = CommunityModeratorForm {
community_id ,
user_id : mod_ . id ,
} ;
CommunityModerator ::join ( conn , & community_moderator_form ) ? ;
2020-05-03 14:00:59 +00:00
}
2020-07-24 15:07:33 +00:00
Ok ( ( ) ) as Result < ( ) , LemmyError >
} )
. await ? ? ;
}
2020-05-03 14:00:59 +00:00
2020-07-24 15:07:33 +00:00
// fetch outbox (maybe make this conditional)
let outbox =
fetch_remote_object ::< OrderedCollection > ( client , & community . get_outbox_url ( ) ? ) . await ? ;
2020-08-11 14:31:05 +00:00
let outbox_items = outbox . items ( ) . context ( location_info! ( ) ) ? . clone ( ) ;
2020-08-13 13:20:35 +00:00
let outbox_items = outbox_items . many ( ) . context ( location_info! ( ) ) ? ;
let outbox_items = outbox_items [ 0 .. 20 ] . to_vec ( ) ;
for o in outbox_items {
2020-08-11 14:31:05 +00:00
let page = PageExt ::from_any_base ( o ) ? . context ( location_info! ( ) ) ? ;
2020-08-07 13:15:44 +00:00
let post = PostForm ::from_apub ( & page , client , pool , None ) . await ? ;
2020-07-24 15:07:33 +00:00
let post_ap_id = post . ap_id . clone ( ) ;
// Check whether the post already exists in the local db
let existing = blocking ( pool , move | conn | Post ::read_from_apub_id ( conn , & post_ap_id ) ) . await ? ;
match existing {
Ok ( e ) = > blocking ( pool , move | conn | Post ::update ( conn , e . id , & post ) ) . await ? ? ,
Err ( _ ) = > blocking ( pool , move | conn | Post ::create ( conn , & post ) ) . await ? ? ,
} ;
2020-08-07 13:15:44 +00:00
// TODO: we need to send a websocket update here
2020-04-24 14:04:36 +00:00
}
2020-07-24 15:07:33 +00:00
Ok ( community )
2020-04-09 19:04:31 +00:00
}
2020-04-24 19:55:54 +00:00
2020-07-29 11:46:11 +00:00
pub async fn get_or_fetch_and_insert_post (
2020-07-17 21:11:07 +00:00
post_ap_id : & Url ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Post , LemmyError > {
let post_ap_id_owned = post_ap_id . to_owned ( ) ;
let post = blocking ( pool , move | conn | {
2020-07-17 21:11:07 +00:00
Post ::read_from_apub_id ( conn , post_ap_id_owned . as_str ( ) )
2020-07-01 12:54:29 +00:00
} )
. await ? ;
match post {
2020-06-11 15:16:33 +00:00
Ok ( p ) = > Ok ( p ) ,
Err ( NotFound { } ) = > {
debug! ( " Fetching and creating remote post: {} " , post_ap_id ) ;
2020-07-17 21:11:07 +00:00
let post = fetch_remote_object ::< PageExt > ( client , post_ap_id ) . await ? ;
2020-08-06 12:53:58 +00:00
let post_form = PostForm ::from_apub ( & post , client , pool , Some ( post_ap_id . to_owned ( ) ) ) . await ? ;
2020-07-01 12:54:29 +00:00
let post = blocking ( pool , move | conn | Post ::create ( conn , & post_form ) ) . await ? ? ;
Ok ( post )
2020-06-11 15:16:33 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-06-11 15:16:33 +00:00
}
}
2020-07-29 11:46:11 +00:00
pub async fn get_or_fetch_and_insert_comment (
2020-07-17 21:11:07 +00:00
comment_ap_id : & Url ,
2020-07-01 12:54:29 +00:00
client : & Client ,
pool : & DbPool ,
) -> Result < Comment , LemmyError > {
let comment_ap_id_owned = comment_ap_id . to_owned ( ) ;
let comment = blocking ( pool , move | conn | {
2020-07-17 21:11:07 +00:00
Comment ::read_from_apub_id ( conn , comment_ap_id_owned . as_str ( ) )
2020-07-01 12:54:29 +00:00
} )
. await ? ;
match comment {
2020-06-11 15:16:33 +00:00
Ok ( p ) = > Ok ( p ) ,
Err ( NotFound { } ) = > {
debug! (
" Fetching and creating remote comment and its parents: {} " ,
comment_ap_id
) ;
2020-07-17 21:11:07 +00:00
let comment = fetch_remote_object ::< Note > ( client , comment_ap_id ) . await ? ;
2020-08-06 12:53:58 +00:00
let comment_form =
CommentForm ::from_apub ( & comment , client , pool , Some ( comment_ap_id . to_owned ( ) ) ) . await ? ;
2020-07-01 12:54:29 +00:00
let comment = blocking ( pool , move | conn | Comment ::create ( conn , & comment_form ) ) . await ? ? ;
Ok ( comment )
2020-06-11 15:16:33 +00:00
}
2020-07-01 12:54:29 +00:00
Err ( e ) = > Err ( e . into ( ) ) ,
2020-06-11 15:16:33 +00:00
}
}