parent
cb44b14717
commit
b406342a14
4 changed files with 42 additions and 19 deletions
|
@ -14,6 +14,7 @@ use lemmy_apub_lib::{
|
||||||
};
|
};
|
||||||
use lemmy_utils::LemmyError;
|
use lemmy_utils::LemmyError;
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
#[async_trait::async_trait(?Send)]
|
||||||
pub(crate) trait GetCommunity {
|
pub(crate) trait GetCommunity {
|
||||||
|
@ -113,7 +114,15 @@ impl ActivityHandler for AnnounceActivity {
|
||||||
let object_value = serde_json::to_value(&self.object)?;
|
let object_value = serde_json::to_value(&self.object)?;
|
||||||
let object_data: ActivityCommonFields = serde_json::from_value(object_value.to_owned())?;
|
let object_data: ActivityCommonFields = serde_json::from_value(object_value.to_owned())?;
|
||||||
|
|
||||||
insert_activity(&object_data.id, object_value, false, true, context.pool()).await?;
|
let insert =
|
||||||
|
insert_activity(&object_data.id, object_value, false, true, context.pool()).await?;
|
||||||
|
if !insert {
|
||||||
|
debug!(
|
||||||
|
"Received duplicate activity in announce {}",
|
||||||
|
object_data.id.to_string()
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.object.receive(context, request_counter).await
|
self.object.receive(context, request_counter).await
|
||||||
|
|
|
@ -28,7 +28,7 @@ use lemmy_utils::{location_info, LemmyError};
|
||||||
use lemmy_websocket::LemmyContext;
|
use lemmy_websocket::LemmyContext;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{fmt::Debug, io::Read};
|
use std::{fmt::Debug, io::Read};
|
||||||
use tracing::info;
|
use tracing::{debug, info};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
mod comment;
|
mod comment;
|
||||||
|
@ -108,7 +108,15 @@ where
|
||||||
// Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
|
// Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
|
||||||
// if we receive the same activity twice in very quick succession.
|
// if we receive the same activity twice in very quick succession.
|
||||||
let object_value = serde_json::to_value(&activity)?;
|
let object_value = serde_json::to_value(&activity)?;
|
||||||
insert_activity(&activity_data.id, object_value, false, true, context.pool()).await?;
|
let insert =
|
||||||
|
insert_activity(&activity_data.id, object_value, false, true, context.pool()).await?;
|
||||||
|
if !insert {
|
||||||
|
debug!(
|
||||||
|
"Received duplicate activity {}",
|
||||||
|
activity_data.id.to_string()
|
||||||
|
);
|
||||||
|
return Ok(HttpResponse::BadRequest().finish());
|
||||||
|
}
|
||||||
|
|
||||||
info!("Receiving activity {}", activity_data.id.to_string());
|
info!("Receiving activity {}", activity_data.id.to_string());
|
||||||
activity
|
activity
|
||||||
|
|
|
@ -201,11 +201,12 @@ async fn insert_activity(
|
||||||
local: bool,
|
local: bool,
|
||||||
sensitive: bool,
|
sensitive: bool,
|
||||||
pool: &DbPool,
|
pool: &DbPool,
|
||||||
) -> Result<(), LemmyError> {
|
) -> Result<bool, LemmyError> {
|
||||||
let ap_id = ap_id.to_owned().into();
|
let ap_id = ap_id.to_owned().into();
|
||||||
blocking(pool, move |conn| {
|
Ok(
|
||||||
Activity::insert(conn, ap_id, activity, local, sensitive)
|
blocking(pool, move |conn| {
|
||||||
})
|
Activity::insert(conn, ap_id, activity, local, sensitive)
|
||||||
.await??;
|
})
|
||||||
Ok(())
|
.await??,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
use crate::{newtypes::DbUrl, source::activity::*, traits::Crud};
|
use crate::{newtypes::DbUrl, source::activity::*, traits::Crud};
|
||||||
use diesel::{dsl::*, result::Error, *};
|
use diesel::{
|
||||||
|
dsl::*,
|
||||||
|
result::{DatabaseErrorKind, Error},
|
||||||
|
*,
|
||||||
|
};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::{Error as IoError, ErrorKind};
|
|
||||||
|
|
||||||
impl Crud for Activity {
|
impl Crud for Activity {
|
||||||
type Form = ActivityForm;
|
type Form = ActivityForm;
|
||||||
|
@ -35,13 +38,14 @@ impl Crud for Activity {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Activity {
|
impl Activity {
|
||||||
|
/// Returns true if the insert was successful
|
||||||
pub fn insert(
|
pub fn insert(
|
||||||
conn: &PgConnection,
|
conn: &PgConnection,
|
||||||
ap_id: DbUrl,
|
ap_id: DbUrl,
|
||||||
data: Value,
|
data: Value,
|
||||||
local: bool,
|
local: bool,
|
||||||
sensitive: bool,
|
sensitive: bool,
|
||||||
) -> Result<Activity, IoError> {
|
) -> Result<bool, Error> {
|
||||||
let activity_form = ActivityForm {
|
let activity_form = ActivityForm {
|
||||||
ap_id,
|
ap_id,
|
||||||
data,
|
data,
|
||||||
|
@ -49,13 +53,14 @@ impl Activity {
|
||||||
sensitive,
|
sensitive,
|
||||||
updated: None,
|
updated: None,
|
||||||
};
|
};
|
||||||
let result = Activity::create(conn, &activity_form);
|
match Activity::create(conn, &activity_form) {
|
||||||
match result {
|
Ok(_) => Ok(true),
|
||||||
Ok(s) => Ok(s),
|
Err(e) => {
|
||||||
Err(e) => Err(IoError::new(
|
if let Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) = e {
|
||||||
ErrorKind::Other,
|
return Ok(false);
|
||||||
format!("Failed to insert activity into database: {}", e),
|
}
|
||||||
)),
|
Err(e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue