implement ActivitySender actor #89

Merged
dessalines merged 28 commits from activity-sender into main 2020-08-31 13:48:03 +00:00
46 changed files with 869 additions and 352 deletions

198
server/Cargo.lock generated vendored
View File

@ -490,6 +490,56 @@ dependencies = [
"serde_urlencoded",
]
[[package]]
name = "background-jobs"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7"
dependencies = [
"background-jobs-actix",
"background-jobs-core",
]
[[package]]
name = "background-jobs-actix"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"chrono",
"log",
"num_cpus",
"rand 0.7.3",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "background-jobs-core"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"chrono",
"futures",
"log",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "backtrace"
version = "0.3.50"
@ -1501,6 +1551,16 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-signature-normalization"
version = "0.5.2"
@ -1544,6 +1604,43 @@ dependencies = [
"quick-error",
]
[[package]]
name = "hyper"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"itoa",
"pin-project",
"socket2",
"time 0.1.43",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-tls"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@ -1618,9 +1715,15 @@ dependencies = [
"socket2",
"widestring",
"winapi 0.3.9",
"winreg",
"winreg 0.6.2",
]
[[package]]
name = "ipnet"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]]
name = "itertools"
version = "0.9.0"
@ -1723,6 +1826,7 @@ dependencies = [
"anyhow",
"async-trait",
"awc",
"background-jobs",
"base64 0.12.3",
"bcrypt",
"captcha",
@ -1743,6 +1847,7 @@ dependencies = [
"openssl",
"percent-encoding",
"rand 0.7.3",
"reqwest",
"rss",
"serde 1.0.114",
"serde_json",
@ -2669,6 +2774,42 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "reqwest"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e"
dependencies = [
"base64 0.12.3",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
"serde 1.0.114",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tls",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg 0.7.0",
]
[[package]]
name = "resolv-conf"
version = "0.6.3"
@ -3261,6 +3402,16 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.2.0"
@ -3290,6 +3441,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
[[package]]
name = "tracing"
version = "0.1.18"
@ -3350,6 +3507,12 @@ dependencies = [
"trust-dns-proto",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "twoway"
version = "0.2.1"
@ -3528,6 +3691,16 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -3541,6 +3714,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0563a9a4b071746dd5aedbc3a28c6fe9be4586fb3fbadb67c400d4f53c6b16c"
dependencies = [
"cfg-if",
"serde 1.0.114",
"serde_json",
"wasm-bindgen-macro",
]
@ -3559,6 +3734,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95f8d235a77f880bcef268d379810ea6c0af2eacfa90b1ad5af731776e0c4699"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.67"
@ -3675,6 +3862,15 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "winreg"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "winutil"
version = "0.1.1"

2
server/Cargo.toml vendored
View File

@ -53,3 +53,5 @@ async-trait = "0.1.36"
captcha = "0.0.7"
anyhow = "1.0.32"
thiserror = "1.0.20"
background-jobs = " 0.8.0-alpha.2"
reqwest = { version = "0.10", features = ["json"] }

View File

@ -113,7 +113,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_862362".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

View File

@ -39,13 +39,13 @@ pub struct CommentForm {
pub published: Option<chrono::NaiveDateTime>,
pub updated: Option<chrono::NaiveDateTime>,
pub deleted: Option<bool>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
impl CommentForm {
pub fn get_ap_id(&self) -> Result<Url, ParseError> {
Url::parse(&self.ap_id)
Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
}
}
@ -163,12 +163,13 @@ impl Comment {
}
pub fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
let existing = Self::read_from_apub_id(conn, &comment_form.ap_id);
match existing {
Err(NotFound {}) => Ok(Self::create(conn, &comment_form)?),
Ok(p) => Ok(Self::update(conn, p.id, &comment_form)?),
Err(e) => Err(e),
}
use crate::schema::comment::dsl::*;
insert_into(comment)
.values(comment_form)
.on_conflict(ap_id)
.do_update()
.set(comment_form)
.get_result::<Self>(conn)
}
}
@ -272,7 +273,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_283687".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -292,7 +293,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_928738972".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -320,7 +321,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -337,7 +338,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -354,7 +355,7 @@ mod tests {
parent_id: None,
published: inserted_comment.published,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
};
@ -368,7 +369,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

View File

@ -517,7 +517,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_92873982".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -537,7 +537,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_7625376".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -565,7 +565,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -582,7 +582,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -627,7 +627,7 @@ mod tests {
my_vote: None,
subscribed: None,
saved: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
community_actor_id: inserted_community.actor_id.to_owned(),
community_local: true,
@ -665,7 +665,7 @@ mod tests {
my_vote: Some(1),
subscribed: Some(false),
saved: Some(false),
ap_id: "http://fake.com".to_string(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
community_actor_id: inserted_community.actor_id.to_owned(),
community_local: true,

View File

@ -45,7 +45,7 @@ pub struct CommunityForm {
pub updated: Option<chrono::NaiveDateTime>,
pub deleted: Option<bool>,
pub nsfw: bool,
pub actor_id: String,
pub actor_id: Option<String>,
pub local: bool,
pub private_key: Option<String>,
pub public_key: Option<String>,
@ -160,6 +160,16 @@ impl Community {
.unwrap_or_default()
.contains(&user_id)
}
pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
use crate::schema::community::dsl::*;
insert_into(community)
.values(community_form)
.on_conflict(actor_id)
.do_update()
.set(community_form)
.get_result::<Self>(conn)
}
}
#[derive(Identifiable, Queryable, Associations, PartialEq, Debug)]
@ -320,7 +330,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8266238".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -340,7 +350,7 @@ mod tests {
removed: None,
deleted: None,
updated: None,
actor_id: "changeme_7625376".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,

View File

@ -426,7 +426,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_829398".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -454,7 +454,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_82982738".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -474,7 +474,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_283687".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -502,7 +502,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -519,7 +519,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

View File

@ -103,7 +103,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8292378".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

View File

@ -53,13 +53,13 @@ pub struct PostForm {
pub embed_description: Option<String>,
pub embed_html: Option<String>,
pub thumbnail_url: Option<String>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
impl PostForm {
pub fn get_ap_id(&self) -> Result<Url, ParseError> {
Url::parse(&self.ap_id)
Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
}
}
@ -180,12 +180,13 @@ impl Post {
}
pub fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error> {
let existing = Self::read_from_apub_id(conn, &post_form.ap_id);
match existing {
Err(NotFound {}) => Ok(Self::create(conn, &post_form)?),
Ok(p) => Ok(Self::update(conn, p.id, &post_form)?),
Err(e) => Err(e),
}
use crate::schema::post::dsl::*;
insert_into(post)
.values(post_form)
.on_conflict(ap_id)
.do_update()
.set(post_form)
.get_result::<Self>(conn)
}
}
@ -358,7 +359,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8292683678".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -378,7 +379,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_8223262378".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -406,7 +407,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -431,7 +432,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
};

View File

@ -423,7 +423,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8282738268".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -443,7 +443,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_2763".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -471,7 +471,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -555,7 +555,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
creator_actor_id: inserted_user.actor_id.to_owned(),
creator_local: true,
@ -604,7 +604,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
creator_actor_id: inserted_user.actor_id.to_owned(),
creator_local: true,

View File

@ -27,7 +27,7 @@ pub struct PrivateMessageForm {
pub read: Option<bool>,
pub published: Option<chrono::NaiveDateTime>,
pub updated: Option<chrono::NaiveDateTime>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
@ -119,6 +119,17 @@ impl PrivateMessage {
.set(read.eq(true))
.get_results::<Self>(conn)
}
// TODO use this
pub fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result<Self, Error> {
use crate::schema::private_message::dsl::*;
insert_into(private_message)
.values(private_message_form)
.on_conflict(ap_id)
.do_update()
.set(private_message_form)
.get_result::<Self>(conn)
}
}
#[cfg(test)]
@ -153,7 +164,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_6723878".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -181,7 +192,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_287263876".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -199,7 +210,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -214,7 +225,7 @@ mod tests {
read: false,
updated: None,
published: inserted_private_message.published,
ap_id: "http://fake.com".into(),
ap_id: inserted_private_message.ap_id.to_owned(),
local: true,
};

View File

@ -57,7 +57,7 @@ pub struct UserForm {
pub show_avatars: bool,
pub send_notifications_to_email: bool,
pub matrix_user_id: Option<String>,
pub actor_id: String,
pub actor_id: Option<String>,
pub bio: Option<String>,
pub local: bool,
pub private_key: Option<String>,
@ -152,6 +152,15 @@ impl User_ {
pub fn get_profile_url(&self, hostname: &str) -> String {
format!("https://{}/u/{}", hostname, self.name)
}
pub fn upsert(conn: &PgConnection, user_form: &UserForm) -> Result<User_, Error> {
insert_into(user_)
.values(user_form)
.on_conflict(actor_id)
.do_update()
.set(user_form)
.get_result::<Self>(conn)
}
}
#[cfg(test)]
@ -180,7 +189,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_9826382637".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

View File

@ -106,7 +106,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_628763".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -134,7 +134,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_927389278".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -154,7 +154,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_876238".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -182,7 +182,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -199,7 +199,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

View File

@ -0,0 +1,27 @@
-- Drop the uniques
alter table private_message drop constraint idx_private_message_ap_id;
alter table post drop constraint idx_post_ap_id;
alter table comment drop constraint idx_comment_ap_id;
alter table user_ drop constraint idx_user_actor_id;
alter table community drop constraint idx_community_actor_id;
alter table private_message alter column ap_id set not null;
alter table private_message alter column ap_id set default 'http://fake.com';
alter table post alter column ap_id set not null;
alter table post alter column ap_id set default 'http://fake.com';
alter table comment alter column ap_id set not null;
alter table comment alter column ap_id set default 'http://fake.com';
update private_message
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';
update post
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';
update comment
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';

View File

@ -0,0 +1,56 @@
-- Add unique ap_id for private_message, comment, and post
-- Need to delete the possible dupes for ones that don't start with the fake one
delete from private_message a using (
select min(id) as id, ap_id
from private_message
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
delete from post a using (
select min(id) as id, ap_id
from post
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
delete from comment a using (
select min(id) as id, ap_id
from comment
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
-- Replacing the current default on the columns, to the unique one
update private_message
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
update post
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
update comment
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
-- Add the unique indexes
alter table private_message alter column ap_id set not null;
alter table private_message alter column ap_id set default generate_unique_changeme();
alter table post alter column ap_id set not null;
alter table post alter column ap_id set default generate_unique_changeme();
alter table comment alter column ap_id set not null;
alter table comment alter column ap_id set default generate_unique_changeme();
-- Add the uniques, for user_ and community too
alter table private_message add constraint idx_private_message_ap_id unique (ap_id);
alter table post add constraint idx_post_ap_id unique (ap_id);
alter table comment add constraint idx_comment_ap_id unique (ap_id);
alter table user_ add constraint idx_user_actor_id unique (actor_id);
alter table community add constraint idx_community_actor_id unique (actor_id);

View File

@ -146,7 +146,7 @@ impl Perform for CreateComment {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

View File

@ -274,7 +274,7 @@ impl Perform for CreateCommunity {
deleted: None,
nsfw: data.nsfw,
updated: None,
actor_id,
actor_id: Some(actor_id),
local: true,
private_key: Some(keypair.private_key),
public_key: Some(keypair.public_key),
@ -368,7 +368,7 @@ impl Perform for EditCommunity {
deleted: Some(read_community.deleted),
nsfw: data.nsfw,
updated: Some(naive_now()),
actor_id: read_community.actor_id,
actor_id: Some(read_community.actor_id),
local: read_community.local,
private_key: read_community.private_key,
public_key: read_community.public_key,

View File

@ -187,7 +187,7 @@ impl Perform for CreatePost {
embed_description: iframely_description,
embed_html: iframely_html,
thumbnail_url: pictrs_thumbnail,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -518,7 +518,7 @@ impl Perform for EditPost {
embed_description: iframely_description,
embed_html: iframely_html,
thumbnail_url: pictrs_thumbnail,
ap_id: orig_post.ap_id,
ap_id: Some(orig_post.ap_id),
local: orig_post.local,
published: None,
};

View File

@ -410,7 +410,7 @@ impl Perform for Register {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: make_apub_endpoint(EndpointType::User, &data.username).to_string(),
actor_id: Some(make_apub_endpoint(EndpointType::User, &data.username).to_string()),
bio: None,
local: true,
private_key: Some(user_keypair.private_key),
@ -441,9 +441,8 @@ impl Perform for Register {
let main_community_keypair = generate_actor_keypair()?;
// Create the main community if it doesn't exist
let main_community = match blocking(context.pool(), move |conn| Community::read(conn, 2))
.await?
{
let main_community =
match blocking(context.pool(), move |conn| Community::read(conn, 2)).await? {
Ok(c) => c,
Err(_e) => {
let default_community_name = "main";
@ -457,7 +456,9 @@ impl Perform for Register {
removed: None,
deleted: None,
updated: None,
actor_id: make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
actor_id: Some(
make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
),
local: true,
private_key: Some(main_community_keypair.private_key),
public_key: Some(main_community_keypair.public_key),
@ -643,7 +644,7 @@ impl Perform for SaveUserSettings {
lang: data.lang.to_owned(),
show_avatars: data.show_avatars,
send_notifications_to_email: data.send_notifications_to_email,
actor_id: read_user.actor_id,
actor_id: Some(read_user.actor_id),
bio,
local: read_user.local,
private_key: read_user.private_key,
@ -1218,7 +1219,7 @@ impl Perform for CreatePrivateMessage {
deleted: None,
read: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};

View File

@ -1,72 +1,38 @@
use crate::{
apub::{
check_is_apub_id_valid,
community::do_announce,
extensions::signatures::sign,
insert_activity,
ActorType,
},
request::retry_custom,
apub::{activity_queue::send_activity, community::do_announce, insert_activity},
LemmyContext,
LemmyError,
};
use activitystreams::base::AnyBase;
use actix_web::client::Client;
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use lemmy_db::{community::Community, user::User_};
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
use log::debug;
use serde::{export::fmt::Debug, Serialize};
use url::{ParseError, Url};
use uuid::Uuid;
pub async fn send_activity_to_community(
pub async fn send_activity_to_community<T, Kind>(
creator: &User_,
community: &Community,
to: Vec<Url>,
activity: AnyBase,
activity: T,
context: &LemmyContext,
) -> Result<(), LemmyError> {
) -> Result<(), LemmyError>
where
T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
// TODO: looks like call this sometimes with activity, and sometimes with any_base
insert_activity(creator.id, activity.clone(), true, context.pool()).await?;

send is only when you want the result, do_send is when you don't want to wait, which is what we want for these.

https://actix.rs/book/actix/sec-3-address.html#message

`send` is only when you want the result, `do_send` is when you don't want to wait, which is what we want for these. https://actix.rs/book/actix/sec-3-address.html#message

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
// if this is a local community, we need to do an announce from the community instead
if community.local {
do_announce(activity, &community, creator, context).await?;
do_announce(activity.into_any_base()?, &community, creator, context).await?;
} else {
send_activity(context.client(), &activity, creator, to).await?;
}
Ok(())
}
/// Send an activity to a list of recipients, using the correct headers etc.
pub async fn send_activity(
client: &Client,
activity: &AnyBase,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError> {
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = serde_json::to_string(&activity)?;
debug!("Sending activitypub activity {} to {:?}", activity, to);
for to_url in to {
check_is_apub_id_valid(&to_url)?;
let res = retry_custom(|| async {
let request = client
.post(to_url.as_str())
.header("Content-Type", "application/json");
match sign(request, actor, activity.clone()).await {
Ok(signed) => Ok(signed.send().await),
Err(e) => Err(e),
}
})
.await?;
debug!("Result for activity send: {:?}", res);
send_activity(context.activity_queue(), activity, creator, to)?;
}
Ok(())

View File

@ -0,0 +1,133 @@
use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError,
};
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use anyhow::{anyhow, Context, Error};
use awc::Client;
use background_jobs::{
create_server,
memory_storage::Storage,
ActixJob,
Backoff,
MaxRetries,
QueueHandle,
WorkerConfig,
};
use lemmy_utils::{location_info, settings::Settings};
use log::warn;
use serde::{Deserialize, Serialize};
use std::{future::Future, pin::Pin};
use url::Url;
pub fn send_activity<T, Kind>(
activity_sender: &QueueHandle,
activity: T,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind>,
T: Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = activity.into_any_base()?;
let serialised_activity = serde_json::to_string(&activity)?;
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
}
// TODO: it would make sense to create a separate task for each destination server
let message = SendActivityTask {
activity: serialised_activity,
to,
actor_id: actor.actor_id()?,
private_key: actor.private_key().context(location_info!())?,
};
activity_sender.queue::<SendActivityTask>(message)?;
Ok(())
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SendActivityTask {
activity: String,
to: Vec<Url>,
actor_id: Url,
private_key: String,
}
impl ActixJob for SendActivityTask {

This is failing to compile, even though it seems identical to the code in ap-relay.

Here's the error message:

This is failing to compile, even though it seems identical to the code in [ap-relay](https://git.asonix.dog/asonix/ap-relay/src/branch/main/src/jobs/deliver.rs). Here's the error message: ![](https://dev.lemmy.ml/pictrs/image/v4kDVr2U99.png)
type State = MyState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "SendActivityTask";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(2);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move {
for to_url in &self.to {
let request = state
.client
.post(to_url.as_str())
.header("Content-Type", "application/json");
// TODO: i believe we have to do the signing in here because it is only valid for a few seconds
let signed = sign(
request,
self.activity.clone(),
&self.actor_id,
self.private_key.to_owned(),
)
.await;
let signed = match signed {
Ok(s) => s,
Err(e) => {
warn!("{}", e);
// dont return an error because retrying would probably not fix the signing
return Ok(());
}
};
if let Err(e) = signed.send().await {
warn!("{}", e);
return Err(anyhow!(
"Failed to send activity {} to {}",
&self.activity,
to_url
));
}
}
Ok(())
})
}
}
pub fn create_activity_queue() -> QueueHandle {
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(Storage::new());
// Configure and start our workers
WorkerConfig::new(|| MyState {
client: Client::default(),
})
.register::<SendActivityTask>()
.start(queue_handle.clone());
queue_handle
}
#[derive(Clone)]
struct MyState {
pub client: Client,
}

View File

@ -192,7 +192,7 @@ impl FromApub for CommentForm {
published: note.published().map(|u| u.to_owned().naive_local()),
updated: note.updated().map(|u| u.to_owned().naive_local()),
deleted: None,
ap_id: check_actor_domain(note, expected_domain)?,
ap_id: Some(check_actor_domain(note, expected_domain)?),
local: false,
})
}
@ -224,14 +224,7 @@ impl ApubObjectType for Comment {
// Set the mention tags
.set_many_tags(maa.get_tags()?);
send_activity_to_community(
&creator,
&community,
maa.inboxes,
create.into_any_base()?,
context,
)
.await?;
send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?;
Ok(())
}
@ -259,14 +252,7 @@ impl ApubObjectType for Comment {
// Set the mention tags
.set_many_tags(maa.get_tags()?);
send_activity_to_community(
&creator,
&community,
maa.inboxes,
update.into_any_base()?,
context,
)
.await?;
send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?;
Ok(())
}
@ -293,7 +279,7 @@ impl ApubObjectType for Comment {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
delete.into_any_base()?,
delete,
context,
)
.await?;
@ -336,7 +322,7 @@ impl ApubObjectType for Comment {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;
@ -366,7 +352,7 @@ impl ApubObjectType for Comment {
&mod_,
&community,
vec![community.get_shared_inbox_url()?],
remove.into_any_base()?,
remove,
context,
)
.await?;
@ -405,7 +391,7 @@ impl ApubObjectType for Comment {
&mod_,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;
@ -438,7 +424,7 @@ impl ApubLikeableType for Comment {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
like.into_any_base()?,
like,
context,
)
.await?;
@ -468,7 +454,7 @@ impl ApubLikeableType for Comment {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
dislike.into_any_base()?,
dislike,
context,
)
.await?;
@ -510,7 +496,7 @@ impl ApubLikeableType for Comment {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;

View File

@ -1,7 +1,8 @@
use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_queue::send_activity,
check_actor_domain,
create_apub_response,
create_apub_tombstone_response,
@ -155,7 +156,7 @@ impl ActorType for Community {
insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
send_activity(context.client(), &accept.into_any_base()?, self, vec![to]).await?;
send_activity(context.activity_queue(), accept, self, vec![to])?;
Ok(())
}
@ -176,7 +177,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &delete.into_any_base()?, creator, inboxes).await?;
send_activity(context.activity_queue(), delete, creator, inboxes)?;
Ok(())
}
@ -208,7 +209,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &undo.into_any_base()?, creator, inboxes).await?;
send_activity(context.activity_queue(), undo, creator, inboxes)?;
Ok(())
}
@ -229,7 +230,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for delete, the creator is the actor, and does the signing
send_activity(context.client(), &remove.into_any_base()?, mod_, inboxes).await?;
send_activity(context.activity_queue(), remove, mod_, inboxes)?;
Ok(())
}
@ -258,7 +259,7 @@ impl ActorType for Community {
// Note: For an accept, since it was automatic, no one pushed a button,
// the community was the actor.
// But for remove , the creator is the actor, and does the signing
send_activity(context.client(), &undo.into_any_base()?, mod_, inboxes).await?;
send_activity(context.activity_queue(), undo, mod_, inboxes)?;
Ok(())
}
@ -402,7 +403,7 @@ impl FromApub for CommunityForm {
updated: group.inner.updated().map(|u| u.to_owned().naive_local()),
deleted: None,
nsfw: group.ext_one.sensitive,
actor_id: check_actor_domain(group, expected_domain)?,
actor_id: Some(check_actor_domain(group, expected_domain)?),
local: false,
private_key: None,
public_key: Some(group.ext_two.to_owned().public_key.public_key_pem),
@ -511,7 +512,7 @@ pub async fn do_announce(
let community_shared_inbox = community.get_shared_inbox_url()?;
to.retain(|x| x != &community_shared_inbox);
send_activity(context.client(), &announce.into_any_base()?, community, to).await?;
send_activity(context.activity_queue(), announce, community, to)?;
Ok(())
}

View File

@ -16,6 +16,7 @@ use openssl::{
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use url::Url;
lazy_static! {
static ref HTTP_SIG_CONFIG: Config = Config::new();
@ -24,11 +25,11 @@ lazy_static! {
/// Signs request headers with the given keypair.
pub async fn sign(
request: ClientRequest,
actor: &dyn ActorType,
activity: String,
actor_id: &Url,
private_key: String,
) -> Result<DigestClient<String>, LemmyError> {
let signing_key_id = format!("{}#main-key", actor.actor_id()?);
let private_key = actor.private_key().context(location_info!())?;
let signing_key_id = format!("{}#main-key", actor_id);
let digest_client = request
.signature_with_digest(

View File

@ -15,7 +15,6 @@ use crate::{
LemmyError,
};
use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
use actix_web::client::Client;
use anyhow::{anyhow, Context};
use chrono::NaiveDateTime;
use diesel::result::Error::NotFound;
@ -35,6 +34,7 @@ use lemmy_db::{
};
use lemmy_utils::{get_apub_protocol_string, location_info};
use log::debug;
use reqwest::Client;
use serde::Deserialize;
use std::{fmt::Debug, time::Duration};
use url::Url;
@ -55,6 +55,9 @@ where
let timeout = Duration::from_secs(60);
// speed up tests
// before: 305s
// after: 240s
let json = retry(|| {
client
.get(url.as_str())
@ -230,7 +233,7 @@ pub async fn get_or_fetch_and_upsert_user(
let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
let user = blocking(context.pool(), move |conn| User_::create(conn, &uf)).await??;
let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
Ok(user)
}
@ -286,14 +289,7 @@ async fn fetch_remote_community(
let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
let community = blocking(context.pool(), move |conn| {
if let Some(cid) = community_id {
Community::update(conn, cid, &cf)
} else {
Community::create(conn, &cf)
}
})
.await??;
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
// Also add the community moderators too
let attributed_to = group.inner.attributed_to().context(location_info!())?;
@ -341,7 +337,7 @@ async fn fetch_remote_community(
for o in outbox_items {
let page = PageExt::from_any_base(o)?.context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?;
let post_ap_id = post.ap_id.clone();
let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
// Check whether the post already exists in the local db
let existing = blocking(context.pool(), move |conn| {
Post::read_from_apub_id(conn, &post_ap_id)
@ -349,7 +345,7 @@ async fn fetch_remote_community(
.await?;
match existing {
Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
Err(_) => blocking(context.pool(), move |conn| Post::create(conn, &post)).await??,
Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
};
// TODO: we need to send a websocket update here
}
@ -374,7 +370,7 @@ pub async fn get_or_fetch_and_insert_post(
let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
let post = blocking(context.pool(), move |conn| Post::create(conn, &post_form)).await??;
let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
Ok(post)
}
@ -404,7 +400,7 @@ pub async fn get_or_fetch_and_insert_comment(
CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
let comment = blocking(context.pool(), move |conn| {
Comment::create(conn, &comment_form)
Comment::upsert(conn, &comment_form)
})
.await??;

View File

@ -78,7 +78,7 @@ async fn receive_delete_post(
embed_description: post.embed_description,
embed_html: post.embed_html,
thumbnail_url: post.thumbnail_url,
ap_id: post.ap_id,
ap_id: Some(post.ap_id),
local: post.local,
published: None,
};
@ -131,7 +131,7 @@ async fn receive_delete_comment(
read: None,
published: None,
updated: Some(naive_now()),
ap_id: comment.ap_id,
ap_id: Some(comment.ap_id),
local: comment.local,
};
let comment_id = comment.id;
@ -175,7 +175,8 @@ async fn receive_delete_community(
let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?))
.await?
.actor_id;
.actor_id
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_actor_id(conn, &community_actor_id)
@ -193,7 +194,7 @@ async fn receive_delete_community(
updated: Some(naive_now()),
deleted: Some(true),
nsfw: community.nsfw,
actor_id: community.actor_id,
actor_id: Some(community.actor_id),
local: community.local,
private_key: community.private_key,
public_key: community.public_key,

View File

@ -85,7 +85,7 @@ async fn receive_remove_post(
embed_description: post.embed_description,
embed_html: post.embed_html,
thumbnail_url: post.thumbnail_url,
ap_id: post.ap_id,
ap_id: Some(post.ap_id),
local: post.local,
published: None,
};
@ -138,7 +138,7 @@ async fn receive_remove_comment(
read: None,
published: None,
updated: Some(naive_now()),
ap_id: comment.ap_id,
ap_id: Some(comment.ap_id),
local: comment.local,
};
let comment_id = comment.id;
@ -182,7 +182,8 @@ async fn receive_remove_community(
let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?))
.await?
.actor_id;
.actor_id
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_actor_id(conn, &community_actor_id)
@ -200,7 +201,7 @@ async fn receive_remove_community(
updated: Some(naive_now()),
deleted: None,
nsfw: community.nsfw,
actor_id: community.actor_id,
actor_id: Some(community.actor_id),
local: community.local,
private_key: community.private_key,
public_key: community.public_key,

View File

@ -175,7 +175,7 @@ async fn receive_undo_delete_comment(
read: None,
published: None,
updated: Some(naive_now()),
ap_id: comment.ap_id,
ap_id: Some(comment.ap_id),
local: comment.local,
};
let comment_id = comment.id;
@ -234,7 +234,7 @@ async fn receive_undo_remove_comment(
read: None,
published: None,
updated: Some(naive_now()),
ap_id: comment.ap_id,
ap_id: Some(comment.ap_id),
local: comment.local,
};
let comment_id = comment.id;
@ -299,7 +299,7 @@ async fn receive_undo_delete_post(
embed_description: post.embed_description,
embed_html: post.embed_html,
thumbnail_url: post.thumbnail_url,
ap_id: post.ap_id,
ap_id: Some(post.ap_id),
local: post.local,
published: None,
};
@ -359,7 +359,7 @@ async fn receive_undo_remove_post(
embed_description: post.embed_description,
embed_html: post.embed_html,
thumbnail_url: post.thumbnail_url,
ap_id: post.ap_id,
ap_id: Some(post.ap_id),
local: post.local,
published: None,
};
@ -399,7 +399,8 @@ async fn receive_undo_delete_community(
let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?))
.await?
.actor_id;
.actor_id
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_actor_id(conn, &community_actor_id)
@ -417,7 +418,7 @@ async fn receive_undo_delete_community(
updated: Some(naive_now()),
deleted: Some(false),
nsfw: community.nsfw,
actor_id: community.actor_id,
actor_id: Some(community.actor_id),
local: community.local,
private_key: community.private_key,
public_key: community.public_key,
@ -464,7 +465,8 @@ async fn receive_undo_remove_community(
let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?))
.await?
.actor_id;
.actor_id
.context(location_info!())?;
let community = blocking(context.pool(), move |conn| {
Community::read_from_actor_id(conn, &community_actor_id)
@ -482,7 +484,7 @@ async fn receive_undo_remove_community(
updated: Some(naive_now()),
deleted: None,
nsfw: community.nsfw,
actor_id: community.actor_id,
actor_id: Some(community.actor_id),
local: community.local,
private_key: community.private_key,
public_key: community.public_key,

View File

@ -66,6 +66,8 @@ pub async fn shared_inbox(
let json = serde_json::to_string(&activity)?;
debug!("Shared inbox received activity: {}", json);
// TODO: if we already received an activity with identical ID, then ignore this (same in other inboxes)
let sender = &activity
.actor()?
.to_owned()

View File

@ -175,7 +175,11 @@ async fn receive_update_private_message(
let domain = Some(update.id_unchecked().context(location_info!())?.to_owned());
let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
let private_message_ap_id = private_message_form.ap_id.clone();
let private_message_ap_id = private_message_form
.ap_id
.as_ref()
.context(location_info!())?
.clone();
let private_message = blocking(&context.pool(), move |conn| {
PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
})
@ -224,7 +228,7 @@ async fn receive_delete_private_message(
let domain = Some(delete.id_unchecked().context(location_info!())?.to_owned());
let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
let private_message_ap_id = private_message_form.ap_id;
let private_message_ap_id = private_message_form.ap_id.context(location_info!())?;
let private_message = blocking(&context.pool(), move |conn| {
PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
})
@ -236,7 +240,7 @@ async fn receive_delete_private_message(
creator_id: private_message.creator_id,
deleted: Some(true),
read: None,
ap_id: private_message.ap_id,
ap_id: Some(private_message.ap_id),
local: private_message.local,
published: None,
updated: Some(naive_now()),
@ -287,7 +291,11 @@ async fn receive_undo_delete_private_message(
let domain = Some(undo.id_unchecked().context(location_info!())?.to_owned());
let private_message = PrivateMessageForm::from_apub(&note, context, domain).await?;
let private_message_ap_id = private_message.ap_id.clone();
let private_message_ap_id = private_message
.ap_id
.as_ref()
.context(location_info!())?
.clone();
let private_message_id = blocking(&context.pool(), move |conn| {
PrivateMessage::read_from_apub_id(conn, &private_message_ap_id).map(|pm| pm.id)
})

View File

@ -1,4 +1,5 @@
pub mod activities;
pub mod activity_queue;
pub mod comment;
pub mod community;
pub mod extensions;
@ -30,7 +31,7 @@ use activitystreams::{
prelude::*,
};
use activitystreams_ext::{Ext1, Ext2};
use actix_web::{body::Body, client::Client, HttpResponse};
use actix_web::{body::Body, HttpResponse};
use anyhow::{anyhow, Context};
use chrono::NaiveDateTime;
use lemmy_db::{activity::do_insert_activity, user::User_};
@ -42,6 +43,7 @@ use lemmy_utils::{
MentionData,
};
use log::debug;
use reqwest::Client;
use serde::Serialize;
use url::{ParseError, Url};
@ -326,7 +328,7 @@ pub async fn fetch_webfinger_url(
);
debug!("Fetching webfinger url: {}", &fetch_url);
let mut response = retry(|| client.get(&fetch_url).send()).await?;
let response = retry(|| client.get(&fetch_url).send()).await?;
let res: WebFingerResponse = response
.json()

View File

@ -289,7 +289,7 @@ impl FromApub for PostForm {
embed_description: embed.description,
embed_html: embed.html,
thumbnail_url,
ap_id: check_actor_domain(page, expected_domain)?,
ap_id: Some(check_actor_domain(page, expected_domain)?),
local: false,
})
}
@ -318,7 +318,7 @@ impl ApubObjectType for Post {
creator,
&community,
vec![community.get_shared_inbox_url()?],
create.into_any_base()?,
create,
context,
)
.await?;
@ -346,7 +346,7 @@ impl ApubObjectType for Post {
creator,
&community,
vec![community.get_shared_inbox_url()?],
update.into_any_base()?,
update,
context,
)
.await?;
@ -373,7 +373,7 @@ impl ApubObjectType for Post {
creator,
&community,
vec![community.get_shared_inbox_url()?],
delete.into_any_base()?,
delete,
context,
)
.await?;
@ -412,7 +412,7 @@ impl ApubObjectType for Post {
creator,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;
@ -439,7 +439,7 @@ impl ApubObjectType for Post {
mod_,
&community,
vec![community.get_shared_inbox_url()?],
remove.into_any_base()?,
remove,
context,
)
.await?;
@ -474,7 +474,7 @@ impl ApubObjectType for Post {
mod_,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;
@ -504,7 +504,7 @@ impl ApubLikeableType for Post {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
like.into_any_base()?,
like,
context,
)
.await?;
@ -531,7 +531,7 @@ impl ApubLikeableType for Post {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
dislike.into_any_base()?,
dislike,
context,
)
.await?;
@ -570,7 +570,7 @@ impl ApubLikeableType for Post {
&creator,
&community,
vec![community.get_shared_inbox_url()?],
undo.into_any_base()?,
undo,
context,
)
.await?;

View File

@ -1,6 +1,7 @@
use crate::{
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_queue::send_activity,
check_actor_domain,
check_is_apub_id_valid,
create_tombstone,
@ -110,7 +111,7 @@ impl FromApub for PrivateMessageForm {
updated: note.updated().map(|u| u.to_owned().naive_local()),
deleted: None,
read: None,
ap_id: check_actor_domain(note, expected_domain)?,
ap_id: Some(check_actor_domain(note, expected_domain)?),
local: false,
})
}
@ -134,13 +135,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, create.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&create.into_any_base()?,
creator,
vec![to],
)
.await?;
send_activity(context.activity_queue(), create, creator, vec![to])?;
Ok(())
}
@ -160,13 +155,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, update.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&update.into_any_base()?,
creator,
vec![to],
)
.await?;
send_activity(context.activity_queue(), update, creator, vec![to])?;
Ok(())
}
@ -185,13 +174,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
send_activity(
context.client(),
&delete.into_any_base()?,
creator,
vec![to],
)
.await?;
send_activity(context.activity_queue(), delete, creator, vec![to])?;
Ok(())
}
@ -221,7 +204,7 @@ impl ApubObjectType for PrivateMessage {
insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
send_activity(context.client(), &undo.into_any_base()?, creator, vec![to]).await?;
send_activity(context.activity_queue(), undo, creator, vec![to])?;
Ok(())
}

View File

@ -1,7 +1,8 @@
use crate::{
api::{check_slurs, check_slurs_opt},
apub::{
activities::{generate_activity_id, send_activity},
activities::generate_activity_id,
activity_queue::send_activity,
check_actor_domain,
create_apub_response,
fetcher::get_or_fetch_and_upsert_actor,
@ -127,7 +128,7 @@ impl ActorType for User_ {
insert_activity(self.id, follow.clone(), true, context.pool()).await?;
send_activity(context.client(), &follow.into_any_base()?, self, vec![to]).await?;
send_activity(context.activity_queue(), follow, self, vec![to])?;
Ok(())
}
@ -152,7 +153,7 @@ impl ActorType for User_ {
insert_activity(self.id, undo.clone(), true, context.pool()).await?;
send_activity(context.client(), &undo.into_any_base()?, self, vec![to]).await?;
send_activity(context.activity_queue(), undo, self, vec![to])?;
Ok(())
}
@ -268,7 +269,7 @@ impl FromApub for UserForm {
show_avatars: false,
send_notifications_to_email: false,
matrix_user_id: None,
actor_id: check_actor_domain(person, expected_domain)?,
actor_id: Some(check_actor_domain(person, expected_domain)?),
bio,
local: false,
private_key: None,

View File

@ -67,7 +67,7 @@ fn user_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> {
lang: cuser.lang.to_owned(),
show_avatars: cuser.show_avatars,
send_notifications_to_email: cuser.send_notifications_to_email,
actor_id: make_apub_endpoint(EndpointType::User, &cuser.name).to_string(),
actor_id: Some(make_apub_endpoint(EndpointType::User, &cuser.name).to_string()),
bio: cuser.bio.to_owned(),
local: cuser.local,
private_key: Some(keypair.private_key),
@ -111,7 +111,7 @@ fn community_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> {
deleted: None,
nsfw: ccommunity.nsfw,
updated: None,
actor_id: make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string(),
actor_id: Some(make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string()),
local: ccommunity.local,
private_key: Some(keypair.private_key),
public_key: Some(keypair.public_key),
@ -138,7 +138,7 @@ fn post_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> {
// Update the ap_id
let incorrect_posts = post
.filter(ap_id.eq("http://fake.com"))
.filter(ap_id.eq("changeme_%"))
.filter(local.eq(true))
.load::<Post>(conn)?;
@ -163,7 +163,7 @@ fn comment_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> {
// Update the ap_id
let incorrect_comments = comment
.filter(ap_id.eq("http://fake.com"))
.filter(ap_id.eq("changeme_%"))
.filter(local.eq(true))
.load::<Comment>(conn)?;
@ -188,7 +188,7 @@ fn private_message_updates_2020_05_05(conn: &PgConnection) -> Result<(), LemmyEr
// Update the ap_id
let incorrect_pms = private_message
.filter(ap_id.eq("http://fake.com"))
.filter(ap_id.eq("changeme_%"))
.filter(local.eq(true))
.load::<PrivateMessage>(conn)?;

View File

@ -14,6 +14,7 @@ pub extern crate dotenv;
pub extern crate jsonwebtoken;
extern crate log;
pub extern crate openssl;
pub extern crate reqwest;
pub extern crate rss;
pub extern crate serde;
pub extern crate serde_json;
@ -33,12 +34,15 @@ use crate::{
request::{retry, RecvError},
websocket::server::ChatServer,
};
use actix::Addr;
use actix_web::{client::Client, dev::ConnectionInfo};
use actix_web::dev::ConnectionInfo;
use anyhow::anyhow;
use background_jobs::QueueHandle;
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
use log::error;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::Client;
use serde::Deserialize;
use std::process::Command;
@ -75,14 +79,21 @@ pub struct LemmyContext {
pub pool: DbPool,
pub chat_server: Addr<ChatServer>,
pub client: Client,
pub activity_queue: QueueHandle,
}
impl LemmyContext {
pub fn create(pool: DbPool, chat_server: Addr<ChatServer>, client: Client) -> LemmyContext {
pub fn create(
pool: DbPool,
chat_server: Addr<ChatServer>,
client: Client,
activity_queue: QueueHandle,
) -> LemmyContext {
LemmyContext {
pool,
chat_server,
client,
activity_queue,
}
}
pub fn pool(&self) -> &DbPool {
@ -94,6 +105,9 @@ impl LemmyContext {
pub fn client(&self) -> &Client {
&self.client
}
pub fn activity_queue(&self) -> &QueueHandle {
&self.activity_queue
}
}
impl Clone for LemmyContext {
@ -102,6 +116,7 @@ impl Clone for LemmyContext {
pool: self.pool.clone(),
chat_server: self.chat_server.clone(),
client: self.client.clone(),
activity_queue: self.activity_queue.clone(),
}
}
}
@ -117,7 +132,7 @@ pub struct IframelyResponse {
pub async fn fetch_iframely(client: &Client, url: &str) -> Result<IframelyResponse, LemmyError> {
let fetch_url = format!("http://iframely/oembed?url={}", url);
let mut response = retry(|| client.get(&fetch_url).send()).await?;
let response = retry(|| client.get(&fetch_url).send()).await?;
let res: IframelyResponse = response
.json()
@ -146,7 +161,7 @@ pub async fn fetch_pictrs(client: &Client, image_url: &str) -> Result<PictrsResp
utf8_percent_encode(image_url, NON_ALPHANUMERIC) // TODO this might not be needed
);
let mut response = retry(|| client.get(&fetch_url).send()).await?;
let response = retry(|| client.get(&fetch_url).send()).await?;
let response: PictrsResponse = response
.json()
@ -319,7 +334,7 @@ mod tests {
#[test]
fn test_image() {
actix_rt::System::new("tset_image").block_on(async move {
let client = actix_web::client::Client::default();
let client = reqwest::Client::default();
assert!(is_image_content_type(&client, "https://1734811051.rsc.cdn77.org/data/images/full/365645/as-virus-kills-navajos-in-their-homes-tribal-women-provide-lifeline.jpg?w=600?w=650").await.is_ok());
assert!(is_image_content_type(&client,
"https://twitter.com/BenjaminNorton/status/1259922424272957440?s=20"

View File

@ -6,7 +6,6 @@ pub extern crate lazy_static;
use actix::prelude::*;
use actix_web::{
body::Body,
client::Client,
dev::{Service, ServiceRequest, ServiceResponse},
http::{
header::{CACHE_CONTROL, CONTENT_TYPE},
@ -20,6 +19,7 @@ use diesel::{
};
use lemmy_db::get_database_url_from_env;
use lemmy_server::{
apub::activity_queue::create_activity_queue,
blocking,
code_migrations::run_advanced_migrations,
rate_limit::{rate_limiter::RateLimiter, RateLimit},
@ -29,6 +29,7 @@ use lemmy_server::{
LemmyError,
};
use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX};
use reqwest::Client;
use std::sync::Arc;
use tokio::sync::Mutex;
@ -74,12 +75,23 @@ async fn main() -> Result<(), LemmyError> {
settings.bind, settings.port
);
let chat_server =
ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start();
let activity_queue = create_activity_queue();
let chat_server = ChatServer::startup(
pool.clone(),
rate_limiter.clone(),
Client::default(),
activity_queue.clone(),
)
.start();
// Create Http server with websocket support
HttpServer::new(move || {
let context = LemmyContext::create(pool.clone(), chat_server.to_owned(), Client::default());
let context = LemmyContext::create(
pool.clone(),
chat_server.to_owned(),
Client::default(),
activity_queue.to_owned(),
);
let settings = Settings::get();
let rate_limiter = rate_limiter.clone();
App::new()

View File

@ -14,15 +14,15 @@ pub struct RecvError(pub String);
pub async fn retry<F, Fut, T>(f: F) -> Result<T, LemmyError>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, actix_web::client::SendRequestError>>,
Fut: Future<Output = Result<T, reqwest::Error>>,
{
retry_custom(|| async { Ok((f)().await) }).await
}
pub async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<Result<T, actix_web::client::SendRequestError>, LemmyError>>,
Fut: Future<Output = Result<Result<T, reqwest::Error>, LemmyError>>,
{
let mut response = Err(anyhow!("connect timeout").into());
@ -30,7 +30,7 @@ where
match (f)().await? {
Ok(t) => return Ok(t),
Err(e) => {
if is_connect_timeout(&e) {
if e.is_timeout() {
response = Err(SendError(e.to_string()).into());
continue;
}
@ -41,13 +41,3 @@ where
response
}
fn is_connect_timeout(e: &actix_web::client::SendRequestError) -> bool {
if let actix_web::client::SendRequestError::Connect(e) = e {
if let actix_web::client::ConnectError::Timeout = e {
return true;
}
}
false
}

View File

@ -15,10 +15,12 @@ use crate::{
PostId,
UserId,
};
use actix_web::{client::Client, web};
use actix_web::web;
use anyhow::Context as acontext;
use background_jobs::QueueHandle;
use lemmy_db::naive_now;
use lemmy_utils::location_info;
use reqwest::Client;
/// Chat server sends this messages to session
#[derive(Message)]
@ -181,6 +183,8 @@ pub struct ChatServer {
/// An HTTP Client
client: Client,
activity_queue: QueueHandle,
}
impl ChatServer {
@ -188,6 +192,7 @@ impl ChatServer {
pool: Pool<ConnectionManager<PgConnection>>,
rate_limiter: RateLimit,
client: Client,
activity_queue: QueueHandle,
) -> ChatServer {
ChatServer {
sessions: HashMap::new(),
@ -199,6 +204,7 @@ impl ChatServer {
rate_limiter,
captchas: Vec::new(),
client,
activity_queue,
}
}
@ -460,6 +466,7 @@ impl ChatServer {
};
let client = self.client.clone();
let activity_queue = self.activity_queue.clone();
async move {
let msg = msg;
let json: Value = serde_json::from_str(&msg.msg)?;
@ -474,6 +481,7 @@ impl ChatServer {
pool,
chat_server: addr,
client,
activity_queue,
};
let args = Args {
context,

View File

@ -1,3 +1,4 @@
jest.setTimeout(120000);
import {
alpha,
beta,
@ -19,6 +20,7 @@ import {
createCommunity,
registerUser,
API,
delay,
} from './shared';
import { PostResponse } from 'lemmy-js-client';
@ -30,6 +32,7 @@ beforeAll(async () => {
await followBeta(alpha);
await followBeta(gamma);
let search = await searchForBetaCommunity(alpha);
await delay(10000);
postRes = await createPost(
alpha,
search.communities.filter(c => c.local == false)[0].id
@ -47,6 +50,7 @@ test('Create a comment', async () => {
expect(commentRes.comment.community_local).toBe(false);
expect(commentRes.comment.creator_local).toBe(true);
expect(commentRes.comment.score).toBe(1);
await delay();
// Make sure that comment is liked on beta
let searchBeta = await searchComment(beta, commentRes.comment);
@ -64,12 +68,14 @@ test('Create a comment in a non-existent post', async () => {
test('Update a comment', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let updateCommentRes = await updateComment(alpha, commentRes.comment.id);
expect(updateCommentRes.comment.content).toBe(
'A jest test federated comment update'
);
expect(updateCommentRes.comment.community_local).toBe(false);
expect(updateCommentRes.comment.creator_local).toBe(true);
await delay();
// Make sure that post is updated on beta
let searchBeta = await searchComment(beta, commentRes.comment);
@ -79,23 +85,21 @@ test('Update a comment', async () => {
test('Delete a comment', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let deleteCommentRes = await deleteComment(
alpha,
true,
commentRes.comment.id
);
expect(deleteCommentRes.comment.deleted).toBe(true);
await delay();
// Make sure that comment is deleted on beta
// The search doesnt work below, because it returns a tombstone / http::gone
// let searchBeta = await searchComment(beta, commentRes.comment);
// console.log(searchBeta);
// let betaComment = searchBeta.comments[0];
// Create a fake post, just to get the previous new post id
let createdBetaPostJustToGetId = await createPost(beta, 2);
let betaPost = await getPost(beta, createdBetaPostJustToGetId.post.id - 1);
let betaComment = betaPost.comments[0];
expect(betaComment.deleted).toBe(true);
// Make sure that comment is undefined on beta
let searchBeta = await searchComment(beta, commentRes.comment);
let betaComment = searchBeta.comments[0];
expect(betaComment).toBeUndefined();
await delay();
let undeleteCommentRes = await deleteComment(
alpha,
@ -103,6 +107,7 @@ test('Delete a comment', async () => {
commentRes.comment.id
);
expect(undeleteCommentRes.comment.deleted).toBe(false);
await delay();
// Make sure that comment is undeleted on beta
let searchBeta2 = await searchComment(beta, commentRes.comment);
@ -112,6 +117,7 @@ test('Delete a comment', async () => {
test('Remove a comment from admin and community on the same instance', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
// Get the id for beta
let betaCommentId = (await searchComment(beta, commentRes.comment))
@ -120,6 +126,7 @@ test('Remove a comment from admin and community on the same instance', async ()
// The beta admin removes it (the community lives on beta)
let removeCommentRes = await removeComment(beta, true, betaCommentId);
expect(removeCommentRes.comment.removed).toBe(true);
await delay();
// Make sure that comment is removed on alpha (it gets pushed since an admin from beta removed it)
let refetchedPost = await getPost(alpha, postRes.post.id);
@ -127,6 +134,7 @@ test('Remove a comment from admin and community on the same instance', async ()
let unremoveCommentRes = await removeComment(beta, false, betaCommentId);
expect(unremoveCommentRes.comment.removed).toBe(false);
await delay();
// Make sure that comment is unremoved on beta
let refetchedPost2 = await getPost(alpha, postRes.post.id);
@ -142,15 +150,19 @@ test('Remove a comment from admin and community on different instance', async ()
// New alpha user creates a community, post, and comment.
let newCommunity = await createCommunity(newAlphaApi);
await delay();
let newPost = await createPost(newAlphaApi, newCommunity.community.id);
await delay();
let commentRes = await createComment(newAlphaApi, newPost.post.id);
expect(commentRes.comment.content).toBeDefined();
await delay();
// Beta searches that to cache it, then removes it
let searchBeta = await searchComment(beta, commentRes.comment);
let betaComment = searchBeta.comments[0];
let removeCommentRes = await removeComment(beta, true, betaComment.id);
expect(removeCommentRes.comment.removed).toBe(true);
await delay();
// Make sure its not removed on alpha
let refetchedPost = await getPost(newAlphaApi, newPost.post.id);
@ -159,8 +171,10 @@ test('Remove a comment from admin and community on different instance', async ()
test('Unlike a comment', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let unlike = await likeComment(alpha, 0, commentRes.comment);
expect(unlike.comment.score).toBe(0);
await delay();
// Make sure that post is unliked on beta
let searchBeta = await searchComment(beta, commentRes.comment);
@ -173,6 +187,7 @@ test('Unlike a comment', async () => {
test('Federated comment like', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
// Find the comment on beta
let searchBeta = await searchComment(beta, commentRes.comment);
@ -180,6 +195,7 @@ test('Federated comment like', async () => {
let like = await likeComment(beta, 1, betaComment);
expect(like.comment.score).toBe(2);
await delay();
// Get the post from alpha, check the likes
let post = await getPost(alpha, postRes.post.id);
@ -189,6 +205,7 @@ test('Federated comment like', async () => {
test('Reply to a comment', async () => {
// Create a comment on alpha, find it on beta
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let searchBeta = await searchComment(beta, commentRes.comment);
let betaComment = searchBeta.comments[0];
@ -201,6 +218,7 @@ test('Reply to a comment', async () => {
expect(replyRes.comment.creator_local).toBe(true);
expect(replyRes.comment.parent_id).toBe(betaComment.id);
expect(replyRes.comment.score).toBe(1);
await delay();
// Make sure that comment is seen on alpha
// TODO not sure why, but a searchComment back to alpha, for the ap_id of betas
@ -219,6 +237,7 @@ test('Mention beta', async () => {
// Create a mention on alpha
let mentionContent = 'A test mention of @lemmy_beta@lemmy-beta:8550';
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let mentionRes = await createComment(
alpha,
postRes.post.id,
@ -229,6 +248,7 @@ test('Mention beta', async () => {
expect(mentionRes.comment.community_local).toBe(false);
expect(mentionRes.comment.creator_local).toBe(true);
expect(mentionRes.comment.score).toBe(1);
await delay();
let mentionsRes = await getMentions(beta);
expect(mentionsRes.mentions[0].content).toBeDefined();
@ -239,6 +259,7 @@ test('Mention beta', async () => {
test('Comment Search', async () => {
let commentRes = await createComment(alpha, postRes.post.id);
await delay();
let searchBeta = await searchComment(beta, commentRes.comment);
expect(searchBeta.comments[0].ap_id).toBe(commentRes.comment.ap_id);
});
@ -247,6 +268,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t
// Create a local post
let alphaPost = await createPost(alpha, 2);
expect(alphaPost.post.community_local).toBe(true);
await delay();
// Make sure gamma sees it
let search = await searchPost(gamma, alphaPost.post);
@ -264,6 +286,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t
expect(commentRes.comment.community_local).toBe(false);
expect(commentRes.comment.creator_local).toBe(true);
expect(commentRes.comment.score).toBe(1);
await delay();
// Make sure alpha sees it
let alphaPost2 = await getPost(alpha, alphaPost.post.id);
@ -291,6 +314,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
// B creates a post, and two comments, should be invisible to A
let postRes = await createPost(beta, 2);
expect(postRes.post.name).toBeDefined();
await delay();
let parentCommentContent = 'An invisible top level comment from beta';
let parentCommentRes = await createComment(
@ -300,6 +324,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
parentCommentContent
);
expect(parentCommentRes.comment.content).toBe(parentCommentContent);
await delay();
// B creates a comment, then a child one of that.
let childCommentContent = 'An invisible child comment from beta';
@ -310,11 +335,13 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
childCommentContent
);
expect(childCommentRes.comment.content).toBe(childCommentContent);
await delay();
// Follow beta again
let follow = await followBeta(alpha);
expect(follow.community.local).toBe(false);
expect(follow.community.name).toBe('main');
await delay();
// An update to the child comment on beta, should push the post, parent, and child to alpha now
let updatedCommentContent = 'An update child comment from beta';
@ -324,10 +351,14 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
updatedCommentContent
);
expect(updateRes.comment.content).toBe(updatedCommentContent);
await delay();
// Get the post from alpha
let createFakeAlphaPostToGetId = await createPost(alpha, 2);
let alphaPost = await getPost(alpha, createFakeAlphaPostToGetId.post.id - 1);
let search = await searchPost(alpha, postRes.post);
let alphaPostB = search.posts[0];
await delay();
let alphaPost = await getPost(alpha, alphaPostB.id);
expect(alphaPost.post.name).toBeDefined();
expect(alphaPost.comments[1].content).toBe(parentCommentContent);
expect(alphaPost.comments[0].content).toBe(updatedCommentContent);

View File

@ -6,6 +6,7 @@ import {
createCommunity,
deleteCommunity,
removeCommunity,
delay,
} from './shared';
beforeAll(async () => {
@ -24,12 +25,14 @@ test('Create community', async () => {
test('Delete community', async () => {
let communityRes = await createCommunity(beta);
await delay();
let deleteCommunityRes = await deleteCommunity(
beta,
true,
communityRes.community.id
);
expect(deleteCommunityRes.community.deleted).toBe(true);
await delay();
// Make sure it got deleted on A
let search = await searchForBetaCommunity(alpha);
@ -44,6 +47,7 @@ test('Delete community', async () => {
communityRes.community.id
);
expect(undeleteCommunityRes.community.deleted).toBe(false);
await delay();
// Make sure it got undeleted on A
let search2 = await searchForBetaCommunity(alpha);
@ -54,6 +58,7 @@ test('Delete community', async () => {
test('Remove community', async () => {
let communityRes = await createCommunity(beta);
await delay();
let removeCommunityRes = await removeCommunity(
beta,
true,
@ -66,6 +71,7 @@ test('Remove community', async () => {
let communityA = search.communities[0];
// TODO this fails currently, because no updates are pushed
// expect(communityA.removed).toBe(true);
await delay();
// unremove
let unremoveCommunityRes = await removeCommunity(
@ -74,6 +80,7 @@ test('Remove community', async () => {
communityRes.community.id
);
expect(unremoveCommunityRes.community.removed).toBe(false);
await delay();
// Make sure it got unremoved on A
let search2 = await searchForBetaCommunity(alpha);

View File

@ -5,6 +5,7 @@ import {
followCommunity,
checkFollowedCommunities,
unfollowRemotes,
delay,
} from './shared';
beforeAll(async () => {
@ -22,6 +23,7 @@ test('Follow federated community', async () => {
// Make sure the follow response went through
expect(follow.community.local).toBe(false);
expect(follow.community.name).toBe('main');
await delay();
// Check it from local
let followCheck = await checkFollowedCommunities(alpha);
@ -33,6 +35,7 @@ test('Follow federated community', async () => {
// Test an unfollow
let unfollow = await followCommunity(alpha, false, remoteCommunityId);
expect(unfollow.community.local).toBe(false);
await delay();
// Make sure you are unsubbed locally
let unfollowCheck = await checkFollowedCommunities(alpha);

View File

@ -1,3 +1,4 @@
jest.setTimeout(120000);
import {
alpha,
beta,
@ -18,6 +19,7 @@ import {
removePost,
getPost,
unfollowRemotes,
delay,
} from './shared';
beforeAll(async () => {
@ -26,6 +28,7 @@ beforeAll(async () => {
await followBeta(gamma);
await followBeta(delta);
await followBeta(epsilon);
await delay(10000);
});
afterAll(async () => {
@ -37,11 +40,13 @@ afterAll(async () => {
test('Create a post', async () => {
let search = await searchForBetaCommunity(alpha);
await delay();
let postRes = await createPost(alpha, search.communities[0].id);
expect(postRes.post).toBeDefined();
expect(postRes.post.community_local).toBe(false);
expect(postRes.post.creator_local).toBe(true);
expect(postRes.post.score).toBe(1);
await delay();
// Make sure that post is liked on beta
let searchBeta = await searchPost(beta, postRes.post);
@ -69,12 +74,15 @@ test('Create a post in a non-existent community', async () => {
test('Unlike a post', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let unlike = await likePost(alpha, 0, postRes.post);
expect(unlike.post.score).toBe(0);
await delay();
// Try to unlike it again, make sure it stays at 0
let unlike2 = await likePost(alpha, 0, postRes.post);
expect(unlike2.post.score).toBe(0);
await delay();
// Make sure that post is unliked on beta
let searchBeta = await searchPost(beta, postRes.post);
@ -89,12 +97,14 @@ test('Unlike a post', async () => {
test('Update a post', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let updatedName = 'A jest test federated post, updated';
let updatedPost = await updatePost(alpha, postRes.post);
expect(updatedPost.post.name).toBe(updatedName);
expect(updatedPost.post.community_local).toBe(false);
expect(updatedPost.post.creator_local).toBe(true);
await delay();
// Make sure that post is updated on beta
let searchBeta = await searchPost(beta, postRes.post);
@ -102,6 +112,7 @@ test('Update a post', async () => {
expect(betaPost.community_local).toBe(true);
expect(betaPost.creator_local).toBe(false);
expect(betaPost.name).toBe(updatedName);
await delay();
// Make sure lemmy beta cannot update the post
let updatedPostBeta = await updatePost(beta, betaPost);
@ -111,9 +122,11 @@ test('Update a post', async () => {
test('Sticky a post', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let stickiedPostRes = await stickyPost(alpha, true, postRes.post);
expect(stickiedPostRes.post.stickied).toBe(true);
await delay();
// Make sure that post is stickied on beta
let searchBeta = await searchPost(beta, postRes.post);
@ -125,6 +138,7 @@ test('Sticky a post', async () => {
// Unsticky a post
let unstickiedPost = await stickyPost(alpha, false, postRes.post);
expect(unstickiedPost.post.stickied).toBe(false);
await delay();
// Make sure that post is unstickied on beta
let searchBeta2 = await searchPost(beta, postRes.post);
@ -137,6 +151,7 @@ test('Sticky a post', async () => {
let searchGamma = await searchPost(gamma, postRes.post);
let gammaPost = searchGamma.posts[0];
let gammaTrySticky = await stickyPost(gamma, true, gammaPost);
await delay();
let searchBeta3 = await searchPost(beta, postRes.post);
let betaPost3 = searchBeta3.posts[0];
expect(gammaTrySticky.post.stickied).toBe(true);
@ -145,10 +160,13 @@ test('Sticky a post', async () => {
test('Lock a post', async () => {
let search = await searchForBetaCommunity(alpha);
await delay();
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let lockedPostRes = await lockPost(alpha, true, postRes.post);
expect(lockedPostRes.post.locked).toBe(true);
await delay();
// Make sure that post is locked on beta
let searchBeta = await searchPost(beta, postRes.post);
@ -160,14 +178,17 @@ test('Lock a post', async () => {
// Try to make a new comment there, on alpha
let comment = await createComment(alpha, postRes.post.id);
expect(comment['error']).toBe('locked');
await delay();
// Try to create a new comment, on beta
let commentBeta = await createComment(beta, betaPost.id);
expect(commentBeta['error']).toBe('locked');
await delay();
// Unlock a post
let unlockedPost = await lockPost(alpha, false, postRes.post);
expect(unlockedPost.post.locked).toBe(false);
await delay();
// Make sure that post is unlocked on beta
let searchBeta2 = await searchPost(beta, postRes.post);
@ -180,68 +201,84 @@ test('Lock a post', async () => {
test('Delete a post', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let deletedPost = await deletePost(alpha, true, postRes.post);
expect(deletedPost.post.deleted).toBe(true);
await delay();
// Make sure lemmy beta sees post is deleted
let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
let betaPost = await getPost(beta, createFakeBetaPostToGetId);
expect(betaPost.post.deleted).toBe(true);
let searchBeta = await searchPost(beta, postRes.post);
let betaPost = searchBeta.posts[0];
// This will be undefined because of the tombstone
expect(betaPost).toBeUndefined();
await delay();
// Undelete
let undeletedPost = await deletePost(alpha, false, postRes.post);
expect(undeletedPost.post.deleted).toBe(false);
await delay();
// Make sure lemmy beta sees post is undeleted
let betaPost2 = await getPost(beta, createFakeBetaPostToGetId);
expect(betaPost2.post.deleted).toBe(false);
let searchBeta2 = await searchPost(beta, postRes.post);
let betaPost2 = searchBeta2.posts[0];
expect(betaPost2.deleted).toBe(false);
// Make sure lemmy beta cannot delete the post
let deletedPostBeta = await deletePost(beta, true, betaPost2.post);
let deletedPostBeta = await deletePost(beta, true, betaPost2);
expect(deletedPostBeta).toStrictEqual({ error: 'no_post_edit_allowed' });
});
test('Remove a post from admin and community on different instance', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let removedPost = await removePost(alpha, true, postRes.post);
expect(removedPost.post.removed).toBe(true);
await delay();
// Make sure lemmy beta sees post is NOT removed
let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
let betaPost = await getPost(beta, createFakeBetaPostToGetId);
expect(betaPost.post.removed).toBe(false);
let searchBeta = await searchPost(beta, postRes.post);
let betaPost = searchBeta.posts[0];
expect(betaPost.removed).toBe(false);
await delay();
// Undelete
let undeletedPost = await removePost(alpha, false, postRes.post);
expect(undeletedPost.post.removed).toBe(false);
await delay();
// Make sure lemmy beta sees post is undeleted
let betaPost2 = await getPost(beta, createFakeBetaPostToGetId);
expect(betaPost2.post.removed).toBe(false);
let searchBeta2 = await searchPost(beta, postRes.post);
let betaPost2 = searchBeta2.posts[0];
expect(betaPost2.removed).toBe(false);
});
test('Remove a post from admin and community on same instance', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
// Get the id for beta
let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
let betaPost = await getPost(beta, createFakeBetaPostToGetId);
let searchBeta = await searchPost(beta, postRes.post);
let betaPost = searchBeta.posts[0];
await delay();
// The beta admin removes it (the community lives on beta)
let removePostRes = await removePost(beta, true, betaPost.post);
let removePostRes = await removePost(beta, true, betaPost);
expect(removePostRes.post.removed).toBe(true);
await delay();
// Make sure lemmy alpha sees post is removed
let alphaPost = await getPost(alpha, postRes.post.id);
expect(alphaPost.post.removed).toBe(true);
await delay();
// Undelete
let undeletedPost = await removePost(beta, false, betaPost.post);
let undeletedPost = await removePost(beta, false, betaPost);
expect(undeletedPost.post.removed).toBe(false);
await delay();
// Make sure lemmy alpha sees post is undeleted
let alphaPost2 = await getPost(alpha, postRes.post.id);
@ -250,7 +287,9 @@ test('Remove a post from admin and community on same instance', async () => {
test('Search for a post', async () => {
let search = await searchForBetaCommunity(alpha);
await delay();
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let searchBeta = await searchPost(beta, postRes.post);
expect(searchBeta.posts[0].name).toBeDefined();
@ -259,6 +298,7 @@ test('Search for a post', async () => {
test('A and G subscribe to B (center) A posts, it gets announced to G', async () => {
let search = await searchForBetaCommunity(alpha);
let postRes = await createPost(alpha, search.communities[0].id);
await delay();
let search2 = await searchPost(gamma, postRes.post);
expect(search2.posts[0].name).toBeDefined();

View File

@ -8,13 +8,16 @@ import {
listPrivateMessages,
deletePrivateMessage,
unfollowRemotes,
delay,
} from './shared';
let recipient_id: number;
beforeAll(async () => {
await setupLogins();
recipient_id = (await followBeta(alpha)).community.creator_id;
let follow = await followBeta(alpha);
await delay(10000);
recipient_id = follow.community.creator_id;
});
afterAll(async () => {
@ -27,6 +30,7 @@ test('Create a private message', async () => {
expect(pmRes.message.local).toBe(true);
expect(pmRes.message.creator_local).toBe(true);
expect(pmRes.message.recipient_local).toBe(false);
await delay();
let betaPms = await listPrivateMessages(beta);
expect(betaPms.messages[0].content).toBeDefined();
@ -41,6 +45,7 @@ test('Update a private message', async () => {
let pmRes = await createPrivateMessage(alpha, recipient_id);
let pmUpdated = await updatePrivateMessage(alpha, pmRes.message.id);
expect(pmUpdated.message.content).toBe(updatedContent);
await delay();
let betaPms = await listPrivateMessages(beta);
expect(betaPms.messages[0].content).toBe(updatedContent);
@ -48,15 +53,18 @@ test('Update a private message', async () => {
test('Delete a private message', async () => {
let pmRes = await createPrivateMessage(alpha, recipient_id);
await delay();
let betaPms1 = await listPrivateMessages(beta);
let deletedPmRes = await deletePrivateMessage(alpha, true, pmRes.message.id);
expect(deletedPmRes.message.deleted).toBe(true);
await delay();
// The GetPrivateMessages filters out deleted,
// even though they are in the actual database.
// no reason to show them
let betaPms2 = await listPrivateMessages(beta);
expect(betaPms2.messages.length).toBe(betaPms1.messages.length - 1);
await delay();
// Undelete
let undeletedPmRes = await deletePrivateMessage(
@ -65,6 +73,7 @@ test('Delete a private message', async () => {
pmRes.message.id
);
expect(undeletedPmRes.message.deleted).toBe(false);
await delay();
let betaPms3 = await listPrivateMessages(beta);
expect(betaPms3.messages.length).toBe(betaPms1.messages.length);

View File

@ -198,7 +198,7 @@ export async function searchPost(
): Promise<SearchResponse> {
let form: SearchForm = {
q: post.ap_id,
type_: SearchType.All,
type_: SearchType.Posts,
sort: SortType.TopAll,
};
return api.client.search(form);
@ -220,7 +220,7 @@ export async function searchComment(
): Promise<SearchResponse> {
let form: SearchForm = {
q: comment.ap_id,
type_: SearchType.All,
type_: SearchType.Comments,
sort: SortType.TopAll,
};
return api.client.search(form);
@ -233,7 +233,7 @@ export async function searchForBetaCommunity(
// Use short-hand search url
let form: SearchForm = {
q: '!main@lemmy-beta:8550',
type_: SearchType.All,
type_: SearchType.Communities,
sort: SortType.TopAll,
};
return api.client.search(form);
@ -247,7 +247,7 @@ export async function searchForUser(
// Use short-hand search url
let form: SearchForm = {
q: apShortname,
type_: SearchType.All,
type_: SearchType.Users,
sort: SortType.TopAll,
};
return api.client.search(form);
@ -524,6 +524,11 @@ export async function followBeta(api: API): Promise<CommunityResponse> {
}
}
export const delay = (millis: number = 1500) =>
new Promise((resolve, _reject) => {
setTimeout(_ => resolve(), millis);
});
export function wrapper(form: any): string {
return JSON.stringify(form);
}