Some refactoring of puller.rs
This commit is contained in:
parent
2c7d1caee6
commit
45d13145b5
1 changed files with 58 additions and 55 deletions
|
@ -12,6 +12,7 @@ use diesel::result::Error::NotFound;
|
||||||
use diesel::PgConnection;
|
use diesel::PgConnection;
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use isahc::prelude::*;
|
use isahc::prelude::*;
|
||||||
|
use log::warn;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -26,30 +27,29 @@ fn fetch_node_info(domain: &str) -> Result<NodeInfo, Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fetch_communities_from_instance(
|
fn fetch_communities_from_instance(
|
||||||
domain: &str,
|
community_list_url: &str,
|
||||||
conn: &PgConnection,
|
conn: &PgConnection,
|
||||||
) -> Result<Vec<CommunityForm>, Error> {
|
) -> Result<Vec<Community>, Error> {
|
||||||
let node_info = fetch_node_info(domain)?;
|
fetch_remote_object::<UnorderedCollection>(community_list_url)?
|
||||||
|
.collection_props
|
||||||
if let Some(community_list_url) = node_info.metadata.community_list_url {
|
.get_many_items_base_boxes()
|
||||||
let collection = fetch_remote_object::<UnorderedCollection>(&community_list_url)?;
|
.unwrap()
|
||||||
let object_boxes = collection
|
.map(|b| -> Result<CommunityForm, Error> {
|
||||||
.collection_props
|
let group = b.to_owned().to_concrete::<GroupExt>()?;
|
||||||
.get_many_items_base_boxes()
|
Ok(CommunityForm::from_group(&group, conn)?)
|
||||||
.unwrap();
|
})
|
||||||
let communities: Result<Vec<CommunityForm>, Error> = object_boxes
|
.map(
|
||||||
.map(|c| {
|
|cf: Result<CommunityForm, Error>| -> Result<Community, Error> {
|
||||||
let group = c.to_owned().to_concrete::<GroupExt>()?;
|
let cf2 = cf?;
|
||||||
CommunityForm::from_group(&group, conn)
|
let existing = Community::read_from_actor_id(conn, &cf2.actor_id);
|
||||||
})
|
match existing {
|
||||||
.collect();
|
Err(NotFound {}) => Ok(Community::create(conn, &cf2)?),
|
||||||
Ok(communities?)
|
Ok(c) => Ok(Community::update(conn, c.id, &cf2)?),
|
||||||
} else {
|
Err(e) => Err(Error::from(e)),
|
||||||
Err(format_err!(
|
}
|
||||||
"{} is not a Lemmy instance, federation is not supported",
|
},
|
||||||
domain
|
)
|
||||||
))
|
.collect()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add an optional param last_updated and only fetch if its too old
|
// TODO: add an optional param last_updated and only fetch if its too old
|
||||||
|
@ -60,7 +60,6 @@ where
|
||||||
if Settings::get().federation.tls_enabled && !uri.starts_with("https://") {
|
if Settings::get().federation.tls_enabled && !uri.starts_with("https://") {
|
||||||
return Err(format_err!("Activitypub uri is insecure: {}", uri));
|
return Err(format_err!("Activitypub uri is insecure: {}", uri));
|
||||||
}
|
}
|
||||||
// TODO: should cache responses here when we are in production
|
|
||||||
// TODO: this function should return a future
|
// TODO: this function should return a future
|
||||||
let timeout = Duration::from_secs(60);
|
let timeout = Duration::from_secs(60);
|
||||||
let text = Request::get(uri)
|
let text = Request::get(uri)
|
||||||
|
@ -76,23 +75,35 @@ where
|
||||||
|
|
||||||
fn fetch_remote_community_posts(
|
fn fetch_remote_community_posts(
|
||||||
instance: &str,
|
instance: &str,
|
||||||
community: &str,
|
community: &Community,
|
||||||
conn: &PgConnection,
|
conn: &PgConnection,
|
||||||
) -> Result<Vec<PostForm>, Error> {
|
) -> Result<Vec<Post>, Error> {
|
||||||
let endpoint = format!("http://{}/federation/c/{}", instance, community);
|
let endpoint = format!("http://{}/federation/c/{}", instance, community.name);
|
||||||
let community = fetch_remote_object::<GroupExt>(&endpoint)?;
|
let group = fetch_remote_object::<GroupExt>(&endpoint)?;
|
||||||
let outbox_uri = &community.extension.get_outbox().to_string();
|
let outbox_uri = &group.extension.get_outbox().to_string();
|
||||||
|
// TODO: outbox url etc should be stored in local db
|
||||||
let outbox = fetch_remote_object::<OrderedCollection>(outbox_uri)?;
|
let outbox = fetch_remote_object::<OrderedCollection>(outbox_uri)?;
|
||||||
let items = outbox.collection_props.get_many_items_base_boxes();
|
let items = outbox.collection_props.get_many_items_base_boxes();
|
||||||
|
|
||||||
let posts = items
|
Ok(
|
||||||
.unwrap()
|
items
|
||||||
.map(|obox: &BaseBox| {
|
.unwrap()
|
||||||
let page = obox.clone().to_concrete::<Page>().unwrap();
|
.map(|obox: &BaseBox| -> Result<PostForm, Error> {
|
||||||
PostForm::from_page(&page, conn)
|
let page = obox.clone().to_concrete::<Page>()?;
|
||||||
})
|
PostForm::from_page(&page, conn)
|
||||||
.collect::<Result<Vec<PostForm>, Error>>()?;
|
})
|
||||||
Ok(posts)
|
.map(|pf: Result<PostForm, Error>| -> Result<Post, Error> {
|
||||||
|
let mut pf2 = pf?;
|
||||||
|
pf2.community_id = community.id;
|
||||||
|
let existing = Post::read_from_apub_id(conn, &pf2.ap_id);
|
||||||
|
match existing {
|
||||||
|
Err(NotFound {}) => Ok(Post::create(conn, &pf2)?),
|
||||||
|
Ok(p) => Ok(Post::update(conn, p.id, &pf2)?),
|
||||||
|
Err(e) => Err(Error::from(e)),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<Post>, Error>>()?,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Error> {
|
pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Error> {
|
||||||
|
@ -110,25 +121,17 @@ pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Er
|
||||||
// after that, we should rely in the inbox, and fetch on demand when needed
|
// after that, we should rely in the inbox, and fetch on demand when needed
|
||||||
pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> {
|
pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> {
|
||||||
for instance in &get_following_instances() {
|
for instance in &get_following_instances() {
|
||||||
let communities = fetch_communities_from_instance(instance, conn)?;
|
let node_info = fetch_node_info(instance)?;
|
||||||
|
if let Some(community_list_url) = node_info.metadata.community_list_url {
|
||||||
for community in &communities {
|
let communities = fetch_communities_from_instance(&community_list_url, conn)?;
|
||||||
let existing = Community::read_from_actor_id(conn, &community.actor_id);
|
for c in communities {
|
||||||
let community_id = match existing {
|
fetch_remote_community_posts(instance, &c, conn)?;
|
||||||
Err(NotFound {}) => Community::create(conn, community)?.id,
|
|
||||||
Ok(c) => Community::update(conn, c.id, community)?.id,
|
|
||||||
Err(e) => return Err(Error::from(e)),
|
|
||||||
};
|
|
||||||
let mut posts = fetch_remote_community_posts(instance, &community.name, conn)?;
|
|
||||||
for post_ in &mut posts {
|
|
||||||
post_.community_id = community_id;
|
|
||||||
let existing = Post::read_from_apub_id(conn, &post_.ap_id);
|
|
||||||
match existing {
|
|
||||||
Err(NotFound {}) => Post::create(conn, post_)?,
|
|
||||||
Ok(p) => Post::update(conn, p.id, post_)?,
|
|
||||||
Err(e) => return Err(Error::from(e)),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
"{} is not a Lemmy instance, federation is not supported",
|
||||||
|
instance
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Reference in a new issue