Check for error when fetching link metadata (fixes #5127) (#5129)

* Check for error when fetching link metadata (fixes #5127)

* use error_for_status everywhere

* dont ignore errors

* enable lint

* fixes

* review

* more review
This commit is contained in:
Nutomic 2024-11-15 15:13:43 +01:00 committed by GitHub
parent fa4825b524
commit 797aac7281
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 85 additions and 111 deletions

View file

@ -79,6 +79,7 @@ unused_self = "deny"
unwrap_used = "deny" unwrap_used = "deny"
unimplemented = "deny" unimplemented = "deny"
unused_async = "deny" unused_async = "deny"
map_err_ignore = "deny"
expect_used = "deny" expect_used = "deny"
[workspace.dependencies] [workspace.dependencies]

View file

@ -145,7 +145,7 @@ fn build_totp_2fa(hostname: &str, username: &str, secret: &str) -> LemmyResult<T
let sec = Secret::Raw(secret.as_bytes().to_vec()); let sec = Secret::Raw(secret.as_bytes().to_vec());
let sec_bytes = sec let sec_bytes = sec
.to_bytes() .to_bytes()
.map_err(|_| LemmyErrorType::CouldntParseTotpSecret)?; .with_lemmy_type(LemmyErrorType::CouldntParseTotpSecret)?;
TOTP::new( TOTP::new(
totp_rs::Algorithm::SHA1, totp_rs::Algorithm::SHA1,

View file

@ -37,7 +37,7 @@ pub async fn add_admin(
// Make sure that the person_id added is local // Make sure that the person_id added is local
let added_local_user = LocalUserView::read_person(&mut context.pool(), data.person_id) let added_local_user = LocalUserView::read_person(&mut context.pool(), data.person_id)
.await .await
.map_err(|_| LemmyErrorType::ObjectNotLocal)?; .with_lemmy_type(LemmyErrorType::ObjectNotLocal)?;
LocalUser::update( LocalUser::update(
&mut context.pool(), &mut context.pool(),

View file

@ -6,7 +6,7 @@ use lemmy_api_common::{
SuccessResponse, SuccessResponse,
}; };
use lemmy_db_views::structs::{LocalUserView, SiteView}; use lemmy_db_views::structs::{LocalUserView, SiteView};
use lemmy_utils::error::{LemmyErrorType, LemmyResult}; use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
#[tracing::instrument(skip(context))] #[tracing::instrument(skip(context))]
pub async fn reset_password( pub async fn reset_password(
@ -17,7 +17,7 @@ pub async fn reset_password(
let email = data.email.to_lowercase(); let email = data.email.to_lowercase();
let local_user_view = LocalUserView::find_by_email(&mut context.pool(), &email) let local_user_view = LocalUserView::find_by_email(&mut context.pool(), &email)
.await .await
.map_err(|_| LemmyErrorType::IncorrectLogin)?; .with_lemmy_type(LemmyErrorType::IncorrectLogin)?;
let site_view = SiteView::read_local(&mut context.pool()).await?; let site_view = SiteView::read_local(&mut context.pool()).await?;
check_email_verified(&local_user_view, &site_view)?; check_email_verified(&local_user_view, &site_view)?;

View file

@ -18,7 +18,7 @@ use lemmy_db_schema::{
}, },
}; };
use lemmy_utils::{ use lemmy_utils::{
error::{LemmyError, LemmyErrorType, LemmyResult}, error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult},
settings::structs::{PictrsImageMode, Settings}, settings::structs::{PictrsImageMode, Settings},
REQWEST_TIMEOUT, REQWEST_TIMEOUT,
VERSION, VERSION,
@ -61,7 +61,8 @@ pub async fn fetch_link_metadata(url: &Url, context: &LemmyContext) -> LemmyResu
// server may ignore this and still respond with the full response // server may ignore this and still respond with the full response
.header(RANGE, format!("bytes=0-{}", bytes_to_fetch - 1)) /* -1 because inclusive */ .header(RANGE, format!("bytes=0-{}", bytes_to_fetch - 1)) /* -1 because inclusive */
.send() .send()
.await?; .await?
.error_for_status()?;
let content_type: Option<Mime> = response let content_type: Option<Mime> = response
.headers() .headers()
@ -308,7 +309,8 @@ pub async fn purge_image_from_pictrs(image_url: &Url, context: &LemmyContext) ->
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.header("x-api-token", pictrs_api_key) .header("x-api-token", pictrs_api_key)
.send() .send()
.await?; .await?
.error_for_status()?;
let response: PictrsPurgeResponse = response.json().await.map_err(LemmyError::from)?; let response: PictrsPurgeResponse = response.json().await.map_err(LemmyError::from)?;
@ -333,8 +335,8 @@ pub async fn delete_image_from_pictrs(
.delete(&url) .delete(&url)
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.send() .send()
.await .await?
.map_err(LemmyError::from)?; .error_for_status()?;
Ok(()) Ok(())
} }
@ -366,6 +368,7 @@ async fn generate_pictrs_thumbnail(image_url: &Url, context: &LemmyContext) -> L
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.send() .send()
.await? .await?
.error_for_status()?
.json::<PictrsResponse>() .json::<PictrsResponse>()
.await?; .await?;
@ -406,16 +409,14 @@ pub async fn fetch_pictrs_proxied_image_details(
// Pictrs needs you to fetch the proxied image before you can fetch the details // Pictrs needs you to fetch the proxied image before you can fetch the details
let proxy_url = format!("{pictrs_url}image/original?proxy={encoded_image_url}"); let proxy_url = format!("{pictrs_url}image/original?proxy={encoded_image_url}");
let res = context context
.client() .client()
.get(&proxy_url) .get(&proxy_url)
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.send() .send()
.await? .await?
.status(); .error_for_status()
if !res.is_success() { .with_lemmy_type(LemmyErrorType::NotAnImageType)?;
Err(LemmyErrorType::NotAnImageType)?
}
let details_url = format!("{pictrs_url}image/details/original?proxy={encoded_image_url}"); let details_url = format!("{pictrs_url}image/details/original?proxy={encoded_image_url}");
@ -425,6 +426,7 @@ pub async fn fetch_pictrs_proxied_image_details(
.timeout(REQWEST_TIMEOUT) .timeout(REQWEST_TIMEOUT)
.send() .send()
.await? .await?
.error_for_status()?
.json() .json()
.await?; .await?;

View file

@ -305,7 +305,7 @@ pub async fn authenticate_with_oauth(
OAuthAccount::create(&mut context.pool(), &oauth_account_form) OAuthAccount::create(&mut context.pool(), &oauth_account_form)
.await .await
.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
local_user = user_view.local_user.clone(); local_user = user_view.local_user.clone();
} else { } else {
@ -366,7 +366,7 @@ pub async fn authenticate_with_oauth(
OAuthAccount::create(&mut context.pool(), &oauth_account_form) OAuthAccount::create(&mut context.pool(), &oauth_account_form)
.await .await
.map_err(|_| LemmyErrorType::IncorrectLogin)?; .with_lemmy_type(LemmyErrorType::IncorrectLogin)?;
// prevent sign in until application is accepted // prevent sign in until application is accepted
if local_site.site_setup if local_site.site_setup
@ -527,18 +527,16 @@ async fn oauth_request_access_token(
("client_secret", &oauth_provider.client_secret), ("client_secret", &oauth_provider.client_secret),
]) ])
.send() .send()
.await; .await
.with_lemmy_type(LemmyErrorType::OauthLoginFailed)?
let response = response.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .error_for_status()
if !response.status().is_success() { .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
Err(LemmyErrorType::OauthLoginFailed)?;
}
// Extract the access token // Extract the access token
let token_response = response let token_response = response
.json::<TokenResponse>() .json::<TokenResponse>()
.await .await
.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
Ok(token_response) Ok(token_response)
} }
@ -555,18 +553,16 @@ async fn oidc_get_user_info(
.header("Accept", "application/json") .header("Accept", "application/json")
.bearer_auth(access_token) .bearer_auth(access_token)
.send() .send()
.await; .await
.with_lemmy_type(LemmyErrorType::OauthLoginFailed)?
let response = response.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .error_for_status()
if !response.status().is_success() { .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
Err(LemmyErrorType::OauthLoginFailed)?;
}
// Extract the OAUTH user_id claim from the returned user_info // Extract the OAUTH user_id claim from the returned user_info
let user_info = response let user_info = response
.json::<serde_json::Value>() .json::<serde_json::Value>()
.await .await
.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
Ok(user_info) Ok(user_info)
} }
@ -574,7 +570,7 @@ async fn oidc_get_user_info(
fn read_user_info(user_info: &serde_json::Value, key: &str) -> LemmyResult<String> { fn read_user_info(user_info: &serde_json::Value, key: &str) -> LemmyResult<String> {
if let Some(value) = user_info.get(key) { if let Some(value) = user_info.get(key) {
let result = serde_json::from_value::<String>(value.clone()) let result = serde_json::from_value::<String>(value.clone())
.map_err(|_| LemmyErrorType::OauthLoginFailed)?; .with_lemmy_type(LemmyErrorType::OauthLoginFailed)?;
return Ok(result); return Ok(result);
} }
Err(LemmyErrorType::OauthLoginFailed)? Err(LemmyErrorType::OauthLoginFailed)?

View file

@ -70,7 +70,8 @@ impl Report {
let object_creator = Person::read(&mut context.pool(), object_creator_id).await?; let object_creator = Person::read(&mut context.pool(), object_creator_id).await?;
let object_creator_site: Option<ApubSite> = let object_creator_site: Option<ApubSite> =
Site::read_from_instance_id(&mut context.pool(), object_creator.instance_id) Site::read_from_instance_id(&mut context.pool(), object_creator.instance_id)
.await? .await
.ok()
.map(Into::into); .map(Into::into);
if let Some(inbox) = object_creator_site.map(|s| s.shared_inbox_or_inbox()) { if let Some(inbox) = object_creator_site.map(|s| s.shared_inbox_or_inbox()) {
inboxes.add_inbox(inbox); inboxes.add_inbox(inbox);

View file

@ -18,7 +18,7 @@ use lemmy_db_schema::{
CommunityVisibility, CommunityVisibility,
}; };
use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_db_views_actor::structs::CommunityFollowerView;
use lemmy_utils::error::{FederationError, LemmyErrorType, LemmyResult}; use lemmy_utils::error::{FederationError, LemmyErrorExt, LemmyErrorType, LemmyResult};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ops::Deref, time::Duration}; use std::{ops::Deref, time::Duration};
use tokio::time::timeout; use tokio::time::timeout;
@ -46,7 +46,7 @@ pub async fn shared_inbox(
// consider the activity broken and move on. // consider the activity broken and move on.
timeout(INCOMING_ACTIVITY_TIMEOUT, receive_fut) timeout(INCOMING_ACTIVITY_TIMEOUT, receive_fut)
.await .await
.map_err(|_| FederationError::InboxTimeout)? .with_lemmy_type(FederationError::InboxTimeout.into())?
} }
/// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub
@ -109,7 +109,7 @@ pub(crate) async fn get_activity(
.into(); .into();
let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id) let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id)
.await .await
.map_err(|_| FederationError::CouldntFindActivity)?; .with_lemmy_type(FederationError::CouldntFindActivity.into())?;
let sensitive = activity.sensitive; let sensitive = activity.sensitive;
if sensitive { if sensitive {

View file

@ -10,7 +10,7 @@ use crate::{
}; };
use diesel::{dsl::insert_into, result::Error, ExpressionMethods, OptionalExtension, QueryDsl}; use diesel::{dsl::insert_into, result::Error, ExpressionMethods, OptionalExtension, QueryDsl};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use lemmy_utils::error::{LemmyErrorType, LemmyResult}; use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
use url::Url; use url::Url;
#[async_trait] #[async_trait]
@ -65,13 +65,13 @@ impl Site {
pub async fn read_from_instance_id( pub async fn read_from_instance_id(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
_instance_id: InstanceId, _instance_id: InstanceId,
) -> Result<Option<Self>, Error> { ) -> LemmyResult<Self> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
site::table site::table
.filter(site::instance_id.eq(_instance_id)) .filter(site::instance_id.eq(_instance_id))
.first(conn) .first(conn)
.await .await
.optional() .with_lemmy_type(LemmyErrorType::NotFound)
} }
pub async fn read_from_apub_id( pub async fn read_from_apub_id(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,

View file

@ -21,7 +21,7 @@ use lemmy_db_schema::{
CommunityVisibility, CommunityVisibility,
SubscribedType, SubscribedType,
}; };
use lemmy_utils::error::{LemmyErrorType, LemmyResult}; use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
impl CommunityFollowerView { impl CommunityFollowerView {
/// return a list of local community ids and remote inboxes that at least one user of the given /// return a list of local community ids and remote inboxes that at least one user of the given
@ -30,7 +30,7 @@ impl CommunityFollowerView {
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
instance_id: InstanceId, instance_id: InstanceId,
published_since: chrono::DateTime<Utc>, published_since: chrono::DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, Error> { ) -> LemmyResult<Vec<(CommunityId, DbUrl)>> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
// In most cases this will fetch the same url many times (the shared inbox url) // In most cases this will fetch the same url many times (the shared inbox url)
// PG will only send a single copy to rust, but it has to scan through all follower rows (same // PG will only send a single copy to rust, but it has to scan through all follower rows (same
@ -51,6 +51,7 @@ impl CommunityFollowerView {
.distinct() // only need each community_id, inbox combination once .distinct() // only need each community_id, inbox combination once
.load::<(CommunityId, DbUrl)>(conn) .load::<(CommunityId, DbUrl)>(conn)
.await .await
.with_lemmy_type(LemmyErrorType::NotFound)
} }
pub async fn get_community_follower_inboxes( pub async fn get_community_follower_inboxes(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,

View file

@ -1,5 +1,4 @@
use crate::util::LEMMY_TEST_FAST_FEDERATION; use crate::util::LEMMY_TEST_FAST_FEDERATION;
use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use lemmy_db_schema::{ use lemmy_db_schema::{
@ -8,6 +7,7 @@ use lemmy_db_schema::{
utils::{ActualDbPool, DbPool}, utils::{ActualDbPool, DbPool},
}; };
use lemmy_db_views_actor::structs::CommunityFollowerView; use lemmy_db_views_actor::structs::CommunityFollowerView;
use lemmy_utils::error::LemmyResult;
use reqwest::Url; use reqwest::Url;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
@ -40,15 +40,12 @@ static FOLLOW_REMOVALS_RECHECK_DELAY: LazyLock<chrono::TimeDelta> =
#[async_trait] #[async_trait]
pub trait DataSource: Send + Sync { pub trait DataSource: Send + Sync {
async fn read_site_from_instance_id( async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> LemmyResult<Site>;
&self,
instance_id: InstanceId,
) -> Result<Option<Site>, diesel::result::Error>;
async fn get_instance_followed_community_inboxes( async fn get_instance_followed_community_inboxes(
&self, &self,
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>; ) -> LemmyResult<Vec<(CommunityId, DbUrl)>>;
} }
pub struct DbDataSource { pub struct DbDataSource {
pool: ActualDbPool, pool: ActualDbPool,
@ -62,10 +59,7 @@ impl DbDataSource {
#[async_trait] #[async_trait]
impl DataSource for DbDataSource { impl DataSource for DbDataSource {
async fn read_site_from_instance_id( async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> LemmyResult<Site> {
&self,
instance_id: InstanceId,
) -> Result<Option<Site>, diesel::result::Error> {
Site::read_from_instance_id(&mut DbPool::Pool(&self.pool), instance_id).await Site::read_from_instance_id(&mut DbPool::Pool(&self.pool), instance_id).await
} }
@ -73,7 +67,7 @@ impl DataSource for DbDataSource {
&self, &self,
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error> { ) -> LemmyResult<Vec<(CommunityId, DbUrl)>> {
CommunityFollowerView::get_instance_followed_community_inboxes( CommunityFollowerView::get_instance_followed_community_inboxes(
&mut DbPool::Pool(&self.pool), &mut DbPool::Pool(&self.pool),
instance_id, instance_id,
@ -128,7 +122,7 @@ impl<T: DataSource> CommunityInboxCollector<T> {
/// most often this will return 0 values (if instance doesn't care about the activity) /// most often this will return 0 values (if instance doesn't care about the activity)
/// or 1 value (the shared inbox) /// or 1 value (the shared inbox)
/// > 1 values only happens for non-lemmy software /// > 1 values only happens for non-lemmy software
pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> { pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> LemmyResult<Vec<Url>> {
let mut inbox_urls: HashSet<Url> = HashSet::new(); let mut inbox_urls: HashSet<Url> = HashSet::new();
if activity.send_all_instances { if activity.send_all_instances {
@ -136,7 +130,8 @@ impl<T: DataSource> CommunityInboxCollector<T> {
self.site = self self.site = self
.data_source .data_source
.read_site_from_instance_id(self.instance_id) .read_site_from_instance_id(self.instance_id)
.await?; .await
.ok();
self.site_loaded = true; self.site_loaded = true;
} }
if let Some(site) = &self.site { if let Some(site) = &self.site {
@ -170,7 +165,7 @@ impl<T: DataSource> CommunityInboxCollector<T> {
Ok(inbox_urls.into_iter().collect()) Ok(inbox_urls.into_iter().collect())
} }
pub async fn update_communities(&mut self) -> Result<()> { pub async fn update_communities(&mut self) -> LemmyResult<()> {
if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
tracing::debug!("{}: fetching full list of communities", self.domain); tracing::debug!("{}: fetching full list of communities", self.domain);
// process removals every hour // process removals every hour
@ -203,7 +198,7 @@ impl<T: DataSource> CommunityInboxCollector<T> {
&mut self, &mut self,
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> { ) -> LemmyResult<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
// update to time before fetch to ensure overlap. subtract some time to ensure overlap even if // update to time before fetch to ensure overlap. subtract some time to ensure overlap even if
// published date is not exact // published date is not exact
let new_last_fetch = Utc::now() - *FOLLOW_ADDITIONS_RECHECK_DELAY / 2; let new_last_fetch = Utc::now() - *FOLLOW_ADDITIONS_RECHECK_DELAY / 2;
@ -238,12 +233,12 @@ mod tests {
DataSource {} DataSource {}
#[async_trait] #[async_trait]
impl DataSource for DataSource { impl DataSource for DataSource {
async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> Result<Option<Site>, diesel::result::Error>; async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> LemmyResult<Site>;
async fn get_instance_followed_community_inboxes( async fn get_instance_followed_community_inboxes(
&self, &self,
instance_id: InstanceId, instance_id: InstanceId,
last_fetch: DateTime<Utc>, last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>; ) -> LemmyResult<Vec<(CommunityId, DbUrl)>>;
} }
} }
@ -301,7 +296,7 @@ mod tests {
collector collector
.data_source .data_source
.expect_read_site_from_instance_id() .expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site))); .return_once(move |_| Ok(site));
let activity = SentActivity { let activity = SentActivity {
id: ActivityId(1), id: ActivityId(1),
@ -335,14 +330,8 @@ mod tests {
.expect_get_instance_followed_community_inboxes() .expect_get_instance_followed_community_inboxes()
.return_once(move |_, _| { .return_once(move |_, _| {
Ok(vec![ Ok(vec![
( (community_id, Url::parse(url1)?.into()),
community_id, (community_id, Url::parse(url2)?.into()),
Url::parse(url1).map_err(|_| diesel::NotFound)?.into(),
),
(
community_id,
Url::parse(url2).map_err(|_| diesel::NotFound)?.into(),
),
]) ])
}); });
@ -430,20 +419,13 @@ mod tests {
collector collector
.data_source .data_source
.expect_read_site_from_instance_id() .expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site))); .return_once(move |_| Ok(site));
let subdomain_inbox = "https://follower.example.com/inbox"; let subdomain_inbox = "https://follower.example.com/inbox";
collector collector
.data_source .data_source
.expect_get_instance_followed_community_inboxes() .expect_get_instance_followed_community_inboxes()
.return_once(move |_, _| { .return_once(move |_, _| Ok(vec![(community_id, Url::parse(subdomain_inbox)?.into())]));
Ok(vec![(
community_id,
Url::parse(subdomain_inbox)
.map_err(|_| diesel::NotFound)?
.into(),
)])
});
collector.update_communities().await?; collector.update_communities().await?;
let user1_inbox = Url::parse("https://example.com/user1/inbox")?; let user1_inbox = Url::parse("https://example.com/user1/inbox")?;
@ -496,26 +478,11 @@ mod tests {
.returning(move |_, last_fetch| { .returning(move |_, last_fetch| {
if last_fetch == Utc.timestamp_nanos(0) { if last_fetch == Utc.timestamp_nanos(0) {
Ok(vec![ Ok(vec![
( (community_id1, Url::parse(user1_inbox_str)?.into()),
community_id1, (community_id2, Url::parse(user2_inbox_str)?.into()),
Url::parse(user1_inbox_str)
.map_err(|_| diesel::NotFound)?
.into(),
),
(
community_id2,
Url::parse(user2_inbox_str)
.map_err(|_| diesel::NotFound)?
.into(),
),
]) ])
} else { } else {
Ok(vec![( Ok(vec![(community_id3, Url::parse(user3_inbox_str)?.into())])
community_id3,
Url::parse(user3_inbox_str)
.map_err(|_| diesel::NotFound)?
.into(),
)])
} }
}); });
@ -569,7 +536,7 @@ mod tests {
collector collector
.data_source .data_source
.expect_read_site_from_instance_id() .expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site))); .return_once(move |_| Ok(site));
collector collector
.data_source .data_source

View file

@ -24,6 +24,7 @@ use lemmy_db_schema::{
}, },
utils::{ActualDbPool, DbPool}, utils::{ActualDbPool, DbPool},
}; };
use lemmy_utils::error::LemmyResult;
use std::{collections::BinaryHeap, ops::Add, time::Duration}; use std::{collections::BinaryHeap, ops::Add, time::Duration};
use tokio::{ use tokio::{
sync::mpsc::{self, UnboundedSender}, sync::mpsc::{self, UnboundedSender},
@ -86,7 +87,7 @@ impl InstanceWorker {
federation_worker_config: FederationWorkerConfig, federation_worker_config: FederationWorkerConfig,
stop: CancellationToken, stop: CancellationToken,
stats_sender: UnboundedSender<FederationQueueStateWithDomain>, stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
) -> Result<(), anyhow::Error> { ) -> LemmyResult<()> {
let pool = config.to_request_data().inner_pool().clone(); let pool = config.to_request_data().inner_pool().clone();
let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?; let state = FederationQueueState::load(&mut DbPool::Pool(&pool), instance.id).await?;
let (report_send_result, receive_send_result) = let (report_send_result, receive_send_result) =
@ -116,7 +117,7 @@ impl InstanceWorker {
/// loop fetch new activities from db and send them to the inboxes of the given instances /// loop fetch new activities from db and send them to the inboxes of the given instances
/// this worker only returns if (a) there is an internal error or (b) the cancellation token is /// this worker only returns if (a) there is an internal error or (b) the cancellation token is
/// cancelled (graceful exit) /// cancelled (graceful exit)
async fn loop_until_stopped(&mut self) -> Result<()> { async fn loop_until_stopped(&mut self) -> LemmyResult<()> {
self.initial_fail_sleep().await?; self.initial_fail_sleep().await?;
let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?; let (mut last_sent_id, mut newest_id) = self.get_latest_ids().await?;
@ -149,12 +150,15 @@ impl InstanceWorker {
}); });
// compare to next id based on incrementing // compare to next id based on incrementing
if expected_next_id != Some(next_id_to_send.0) { if expected_next_id != Some(next_id_to_send.0) {
anyhow::bail!( return Err(
anyhow::anyhow!(
"{}: next id to send is not as expected: {:?} != {:?}", "{}: next id to send is not as expected: {:?} != {:?}",
self.instance.domain, self.instance.domain,
expected_next_id, expected_next_id,
next_id_to_send next_id_to_send
) )
.into(),
);
} }
} }
@ -341,7 +345,7 @@ impl InstanceWorker {
/// we collect the relevant inboxes in the main instance worker task, and only spawn the send task /// we collect the relevant inboxes in the main instance worker task, and only spawn the send task
/// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many) /// if we have inboxes to send to this limits CPU usage and reduces overhead for the (many)
/// cases where we don't have any inboxes /// cases where we don't have any inboxes
async fn spawn_send_if_needed(&mut self, activity_id: ActivityId) -> Result<()> { async fn spawn_send_if_needed(&mut self, activity_id: ActivityId) -> LemmyResult<()> {
let Some(ele) = get_activity_cached(&mut self.pool(), activity_id) let Some(ele) = get_activity_cached(&mut self.pool(), activity_id)
.await .await
.context("failed reading activity from db")? .context("failed reading activity from db")?
@ -357,11 +361,7 @@ impl InstanceWorker {
return Ok(()); return Ok(());
}; };
let activity = &ele.0; let activity = &ele.0;
let inbox_urls = self let inbox_urls = self.inbox_collector.get_inbox_urls(activity).await?;
.inbox_collector
.get_inbox_urls(activity)
.await
.context("failed figuring out inbox urls")?;
if inbox_urls.is_empty() { if inbox_urls.is_empty() {
// this is the case when the activity is not relevant to this receiving instance (e.g. no user // this is the case when the activity is not relevant to this receiving instance (e.g. no user
// subscribed to the relevant community) // subscribed to the relevant community)

View file

@ -278,6 +278,12 @@ cfg_if! {
} }
} }
impl From<FederationError> for LemmyErrorType {
fn from(error: FederationError) -> Self {
LemmyErrorType::FederationError { error: Some(error) }
}
}
pub trait LemmyErrorExt<T, E: Into<anyhow::Error>> { pub trait LemmyErrorExt<T, E: Into<anyhow::Error>> {
fn with_lemmy_type(self, error_type: LemmyErrorType) -> LemmyResult<T>; fn with_lemmy_type(self, error_type: LemmyErrorType) -> LemmyResult<T>;
} }

View file

@ -329,7 +329,7 @@ pub fn build_url_str_without_scheme(url_str: &str) -> LemmyResult<String> {
// Set the scheme to http, then remove the http:// part // Set the scheme to http, then remove the http:// part
url url
.set_scheme("http") .set_scheme("http")
.map_err(|_| LemmyErrorType::InvalidUrl)?; .map_err(|_e| LemmyErrorType::InvalidUrl)?;
let mut out = url let mut out = url
.to_string() .to_string()

View file

@ -178,7 +178,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
.set(Box::new(move |d, c| { .set(Box::new(move |d, c| {
Box::pin(match_outgoing_activities(d, c)) Box::pin(match_outgoing_activities(d, c))
})) }))
.map_err(|_| LemmyErrorType::Unknown("couldnt set function pointer".into()))?; .map_err(|_e| LemmyErrorType::Unknown("couldnt set function pointer".into()))?;
let request_data = federation_config.to_request_data(); let request_data = federation_config.to_request_data();
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities( let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(

View file

@ -17,7 +17,7 @@ pub async fn main() -> LemmyResult<()> {
rustls::crypto::ring::default_provider() rustls::crypto::ring::default_provider()
.install_default() .install_default()
.map_err(|_| LemmyErrorType::Unknown("Failed to install rustls crypto provider".into()))?; .map_err(|_e| LemmyErrorType::Unknown("Failed to install rustls crypto provider".into()))?;
start_lemmy_server(args).await?; start_lemmy_server(args).await?;
Ok(()) Ok(())