From 45d13145b542fce8676c6064245e2a921177bc95 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 8 Apr 2020 14:08:33 +0200 Subject: [PATCH] Some refactoring of puller.rs --- server/src/apub/puller.rs | 113 +++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 55 deletions(-) diff --git a/server/src/apub/puller.rs b/server/src/apub/puller.rs index 01a70d964..7e7a86ec0 100644 --- a/server/src/apub/puller.rs +++ b/server/src/apub/puller.rs @@ -12,6 +12,7 @@ use diesel::result::Error::NotFound; use diesel::PgConnection; use failure::Error; use isahc::prelude::*; +use log::warn; use serde::Deserialize; use std::time::Duration; @@ -26,30 +27,29 @@ fn fetch_node_info(domain: &str) -> Result { } fn fetch_communities_from_instance( - domain: &str, + community_list_url: &str, conn: &PgConnection, -) -> Result, Error> { - let node_info = fetch_node_info(domain)?; - - if let Some(community_list_url) = node_info.metadata.community_list_url { - let collection = fetch_remote_object::(&community_list_url)?; - let object_boxes = collection - .collection_props - .get_many_items_base_boxes() - .unwrap(); - let communities: Result, Error> = object_boxes - .map(|c| { - let group = c.to_owned().to_concrete::()?; - CommunityForm::from_group(&group, conn) - }) - .collect(); - Ok(communities?) - } else { - Err(format_err!( - "{} is not a Lemmy instance, federation is not supported", - domain - )) - } +) -> Result, Error> { + fetch_remote_object::(community_list_url)? + .collection_props + .get_many_items_base_boxes() + .unwrap() + .map(|b| -> Result { + let group = b.to_owned().to_concrete::()?; + Ok(CommunityForm::from_group(&group, conn)?) + }) + .map( + |cf: Result| -> Result { + let cf2 = cf?; + let existing = Community::read_from_actor_id(conn, &cf2.actor_id); + match existing { + Err(NotFound {}) => Ok(Community::create(conn, &cf2)?), + Ok(c) => Ok(Community::update(conn, c.id, &cf2)?), + Err(e) => Err(Error::from(e)), + } + }, + ) + .collect() } // TODO: add an optional param last_updated and only fetch if its too old @@ -60,7 +60,6 @@ where if Settings::get().federation.tls_enabled && !uri.starts_with("https://") { return Err(format_err!("Activitypub uri is insecure: {}", uri)); } - // TODO: should cache responses here when we are in production // TODO: this function should return a future let timeout = Duration::from_secs(60); let text = Request::get(uri) @@ -76,23 +75,35 @@ where fn fetch_remote_community_posts( instance: &str, - community: &str, + community: &Community, conn: &PgConnection, -) -> Result, Error> { - let endpoint = format!("http://{}/federation/c/{}", instance, community); - let community = fetch_remote_object::(&endpoint)?; - let outbox_uri = &community.extension.get_outbox().to_string(); +) -> Result, Error> { + let endpoint = format!("http://{}/federation/c/{}", instance, community.name); + let group = fetch_remote_object::(&endpoint)?; + let outbox_uri = &group.extension.get_outbox().to_string(); + // TODO: outbox url etc should be stored in local db let outbox = fetch_remote_object::(outbox_uri)?; let items = outbox.collection_props.get_many_items_base_boxes(); - let posts = items - .unwrap() - .map(|obox: &BaseBox| { - let page = obox.clone().to_concrete::().unwrap(); - PostForm::from_page(&page, conn) - }) - .collect::, Error>>()?; - Ok(posts) + Ok( + items + .unwrap() + .map(|obox: &BaseBox| -> Result { + let page = obox.clone().to_concrete::()?; + PostForm::from_page(&page, conn) + }) + .map(|pf: Result| -> Result { + let mut pf2 = pf?; + pf2.community_id = community.id; + let existing = Post::read_from_apub_id(conn, &pf2.ap_id); + match existing { + Err(NotFound {}) => Ok(Post::create(conn, &pf2)?), + Ok(p) => Ok(Post::update(conn, p.id, &pf2)?), + Err(e) => Err(Error::from(e)), + } + }) + .collect::, Error>>()?, + ) } pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result { @@ -110,25 +121,17 @@ pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result Result<(), Error> { for instance in &get_following_instances() { - let communities = fetch_communities_from_instance(instance, conn)?; - - for community in &communities { - let existing = Community::read_from_actor_id(conn, &community.actor_id); - let community_id = match existing { - Err(NotFound {}) => Community::create(conn, community)?.id, - Ok(c) => Community::update(conn, c.id, community)?.id, - Err(e) => return Err(Error::from(e)), - }; - let mut posts = fetch_remote_community_posts(instance, &community.name, conn)?; - for post_ in &mut posts { - post_.community_id = community_id; - let existing = Post::read_from_apub_id(conn, &post_.ap_id); - match existing { - Err(NotFound {}) => Post::create(conn, post_)?, - Ok(p) => Post::update(conn, p.id, post_)?, - Err(e) => return Err(Error::from(e)), - }; + let node_info = fetch_node_info(instance)?; + if let Some(community_list_url) = node_info.metadata.community_list_url { + let communities = fetch_communities_from_instance(&community_list_url, conn)?; + for c in communities { + fetch_remote_community_posts(instance, &c, conn)?; } + } else { + warn!( + "{} is not a Lemmy instance, federation is not supported", + instance + ); } } Ok(())