implement ActivitySender actor #89
Loading…
Reference in New Issue
No description provided.
Delete Branch "activity-sender"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
TODO:
@ -0,0 +12,4 @@
// We cant use ActorType here, because it doesnt implement Sized
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
This can just be
#[rtype(result = "()")]
, because we're not gonna wait for the result.@ -0,0 +20,4 @@
}
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
Same.
@ -0,0 +50,4 @@
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
type Result = ();
@ -70,0 +27,4 @@
actor: creator.to_owned(),
to,
};
context.activity_sender().send(message).await??;
send
is only when you want the result,do_send
is when you don't want to wait, which is what we want for these.https://actix.rs/book/actix/sec-3-address.html#message
Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
@ -0,0 +81,4 @@
.header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone();
Box::pin(async move {
Hrm, not sure why this is required.
Because I'm calling some async functions in there and need to await them.
@ -0,0 +66,4 @@
impl Job for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
This is failing to compile, even though it seems identical to the code in ap-relay.
Here's the error message:
![](https://dev.lemmy.ml/pictrs/image/v4kDVr2U99.png)
I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before.
One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this.
Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time.
Also, websocket updates are still not working reliably even after merging main.
Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.
Yep its definitely a race condition between
Comment::upsert()
andfetcher::get_or_fetch_and_insert_comment()
. It happens roughly 50% of the time. Here is an excerpt from the log:So
upsert()
needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post.fetcher::get_or_fetch_and_insert_comment()
(and similar functions) will need to be changed, as a comment could be inserted between the calls toComment::read_from_apub_id()
(line 384) andComment::create()
(line 400). Its probably enough to replaceComment::create()
withComment::upsert()
.Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.
WIP: implement ActivitySender actorto implement ActivitySender actor@ -61,6 +61,7 @@ impl Crud<CommentForm> for Comment {
}
fn create(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
println!("creating {}", &comment_form.ap_id);
I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.