implement ActivitySender actor #89

Merged
dessalines merged 28 commits from activity-sender into main 2020-08-31 13:48:03 +00:00
Owner

TODO:

  • fix federation tests
  • proper error handling without unwrap
  • retry if the destination server is unreachable
  • persistent storage of outgoing activities, so that lemmy can retry sending after a restart (probably leave this for later)
TODO: - [x] fix federation tests - [x] proper error handling without unwrap - [x] retry if the destination server is unreachable - [ ] persistent storage of outgoing activities, so that lemmy can retry sending after a restart (probably leave this for later)
dessalines reviewed 2020-08-18 20:11:43 +00:00
@ -0,0 +12,4 @@
// We cant use ActorType here, because it doesnt implement Sized
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
Owner

This can just be #[rtype(result = "()")], because we're not gonna wait for the result.

This can just be `#[rtype(result = "()")]`, because we're not gonna wait for the result.
nutomic marked this conversation as resolved
dessalines reviewed 2020-08-18 20:12:18 +00:00
@ -0,0 +20,4 @@
}
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
Owner

Same.

Same.
nutomic marked this conversation as resolved
dessalines reviewed 2020-08-18 20:12:55 +00:00
@ -0,0 +50,4 @@
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
Owner

type Result = ();

type Result = ();
nutomic marked this conversation as resolved
dessalines reviewed 2020-08-18 20:14:49 +00:00
@ -70,0 +27,4 @@
actor: creator.to_owned(),
to,
};
context.activity_sender().send(message).await??;
Owner

send is only when you want the result, do_send is when you don't want to wait, which is what we want for these.

https://actix.rs/book/actix/sec-3-address.html#message

`send` is only when you want the result, `do_send` is when you don't want to wait, which is what we want for these. https://actix.rs/book/actix/sec-3-address.html#message
Author
Owner

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
dessalines reviewed 2020-08-18 20:19:28 +00:00
@ -0,0 +81,4 @@
.header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone();
Box::pin(async move {
Owner

Hrm, not sure why this is required.

Hrm, not sure why this is required.
Author
Owner

Because I'm calling some async functions in there and need to await them.

Because I'm calling some async functions in there and need to await them.
nutomic reviewed 2020-08-20 16:57:33 +00:00
@ -0,0 +66,4 @@
impl Job for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
Author
Owner

This is failing to compile, even though it seems identical to the code in ap-relay.

Here's the error message:

This is failing to compile, even though it seems identical to the code in [ap-relay](https://git.asonix.dog/asonix/ap-relay/src/branch/main/src/jobs/deliver.rs). Here's the error message: ![](https://dev.lemmy.ml/pictrs/image/v4kDVr2U99.png)
Author
Owner

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before.

One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this.

Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time.

Also, websocket updates are still not working reliably even after merging main.

Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before. One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this. Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time. Also, websocket updates are still not working reliably even after merging main. Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.
Author
Owner

Yep its definitely a race condition between Comment::upsert() and fetcher::get_or_fetch_and_insert_comment(). It happens roughly 50% of the time. Here is an excerpt from the log:

lemmy-beta_1        | Comment::upsert() (entered into function)
lemmy-beta_1        | Comment::upsert() (checked if comment exists)
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12
lemmy-alpha_1       | [2020-08-24T13:45:04Z INFO  actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | Comment::upsert() (ran the match statement)

So upsert() needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post.

fetcher::get_or_fetch_and_insert_comment() (and similar functions) will need to be changed, as a comment could be inserted between the calls to Comment::read_from_apub_id() (line 384) and Comment::create() (line 400). Its probably enough to replace Comment::create() with Comment::upsert().

Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.

Yep its definitely a race condition between `Comment::upsert()` and `fetcher::get_or_fetch_and_insert_comment()`. It happens roughly 50% of the time. Here is an excerpt from the log: ``` lemmy-beta_1 | Comment::upsert() (entered into function) lemmy-beta_1 | Comment::upsert() (checked if comment exists) lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12 lemmy-alpha_1 | [2020-08-24T13:45:04Z INFO actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361 lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | Comment::upsert() (ran the match statement) ``` So `upsert()` needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post. `fetcher::get_or_fetch_and_insert_comment()` (and similar functions) will need to be changed, as a comment could be inserted between the calls to `Comment::read_from_apub_id()` (line 384) and `Comment::create()` (line 400). Its probably enough to replace `Comment::create()` with `Comment::upsert()`. Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.
nutomic changed title from WIP: implement ActivitySender actor to implement ActivitySender actor 2020-08-28 17:47:17 +00:00
nutomic reviewed 2020-08-31 12:32:47 +00:00
@ -61,6 +61,7 @@ impl Crud<CommentForm> for Comment {
}
fn create(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
println!("creating {}", &comment_form.ap_id);
Author
Owner

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.
dessalines merged commit d4dccd17ae into main 2020-08-31 13:48:03 +00:00
Sign in to join this conversation.
No reviewers
No Label
No Milestone
No Assignees
2 Participants
Notifications
Due Date
The due date is invalid or out of range. Please use the format 'yyyy-mm-dd'.

No due date set.

Dependencies

No dependencies set.

Reference: LemmyNet/lemmy#89
No description provided.