use crate::{traits::ApubObject, APUB_JSON_CONTENT_TYPE}; use activitystreams::chrono::{Duration as ChronoDuration, NaiveDateTime, Utc}; use anyhow::anyhow; use diesel::NotFound; use lemmy_utils::{ request::{build_user_agent, retry}, settings::structs::Settings, LemmyError, }; use log::info; use reqwest::{Client, StatusCode}; use serde::{Deserialize, Serialize}; use std::{ fmt::{Debug, Display, Formatter}, marker::PhantomData, time::Duration, }; use url::Url; /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object /// fetch through the search). This should be configurable. static REQUEST_LIMIT: i32 = 25; lazy_static! { static ref CLIENT: Client = Client::builder() .user_agent(build_user_agent(&Settings::get())) .build() .unwrap(); } /// We store Url on the heap because it is quite large (88 bytes). #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] #[serde(transparent)] pub struct ObjectId(Box, #[serde(skip)] PhantomData) where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>; impl ObjectId where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { pub fn new(url: T) -> Self where T: Into, { ObjectId(Box::new(url.into()), PhantomData::) } pub fn inner(&self) -> &Url { &self.0 } /// Fetches an activitypub object, either from local database (if possible), or over http. pub async fn dereference( &self, data: &::DataType, request_counter: &mut i32, ) -> Result { let db_object = self.dereference_from_db(data).await?; // if its a local object, only fetch it from the database and not over http if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) { return match db_object { None => Err(NotFound {}.into()), Some(o) => Ok(o), }; } // object found in database if let Some(object) = db_object { // object is old and should be refetched if let Some(last_refreshed_at) = object.last_refreshed_at() { if should_refetch_object(last_refreshed_at) { return self .dereference_from_http(data, request_counter, Some(object)) .await; } } Ok(object) } // object not found, need to fetch over http else { self .dereference_from_http(data, request_counter, None) .await } } /// Fetch an object from the local db. Instead of falling back to http, this throws an error if /// the object is not found in the database. pub async fn dereference_local( &self, data: &::DataType, ) -> Result { let object = self.dereference_from_db(data).await?; object.ok_or_else(|| anyhow!("object not found in database {}", self).into()) } /// returning none means the object was not found in local db async fn dereference_from_db( &self, data: &::DataType, ) -> Result, LemmyError> { let id = self.0.clone(); ApubObject::read_from_apub_id(*id, data).await } async fn dereference_from_http( &self, data: &::DataType, request_counter: &mut i32, db_object: Option, ) -> Result { // dont fetch local objects this way debug_assert!(self.0.domain() != Some(&Settings::get().hostname)); info!("Fetching remote object {}", self.to_string()); *request_counter += 1; if *request_counter > REQUEST_LIMIT { return Err(LemmyError::from(anyhow!("Request limit reached"))); } let res = retry(|| { CLIENT .get(self.0.as_str()) .header("Accept", APUB_JSON_CONTENT_TYPE) .timeout(Duration::from_secs(60)) .send() }) .await?; if res.status() == StatusCode::GONE { if let Some(db_object) = db_object { db_object.delete(data).await?; } return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); } let res2: Kind::ApubType = res.json().await?; Kind::verify(&res2, self.inner(), data, request_counter).await?; Ok(Kind::from_apub(res2, data, request_counter).await?) } } static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; /// Determines when a remote actor should be refetched from its instance. In release builds, this is /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`. /// /// TODO it won't pick up new avatars, summaries etc until a day after. /// Actors need an "update" activity pushed to other servers to fix this. fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { let update_interval = if cfg!(debug_assertions) { // avoid infinite loop when fetching community outbox ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) } else { ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS) }; let refresh_limit = Utc::now().naive_utc() - update_interval; last_refreshed.lt(&refresh_limit) } impl Display for ObjectId where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { #[allow(clippy::to_string_in_display)] fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { // Use to_string here because Url.display is not useful for us write!(f, "{}", self.0.to_string()) } } impl From> for Url where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { *id.0 } } #[cfg(test)] mod tests { use super::*; use crate::object_id::should_refetch_object; #[test] fn test_should_refetch_object() { let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1); assert!(!should_refetch_object(one_second_ago)); let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2); assert!(should_refetch_object(two_days_ago)); } }