Remove websocket code (#2878)
Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
This commit is contained in:
parent
a5707328cf
commit
4240af86ca
7 changed files with 4 additions and 758 deletions
21
Cargo.lock
generated
21
Cargo.lock
generated
|
@ -305,23 +305,6 @@ dependencies = [
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "actix-web-actors"
|
|
||||||
version = "4.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "31efe7896f3933ce03dd4710be560254272334bb321a18fd8ff62b1a557d9d19"
|
|
||||||
dependencies = [
|
|
||||||
"actix",
|
|
||||||
"actix-codec",
|
|
||||||
"actix-http",
|
|
||||||
"actix-web",
|
|
||||||
"bytes",
|
|
||||||
"bytestring",
|
|
||||||
"futures-core",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "actix-web-codegen"
|
name = "actix-web-codegen"
|
||||||
version = "4.1.0"
|
version = "4.1.0"
|
||||||
|
@ -2559,7 +2542,6 @@ version = "0.17.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix",
|
"actix",
|
||||||
"actix-rt",
|
"actix-rt",
|
||||||
"actix-web",
|
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"encoding",
|
"encoding",
|
||||||
|
@ -2750,15 +2732,12 @@ version = "0.17.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitypub_federation",
|
"activitypub_federation",
|
||||||
"actix",
|
"actix",
|
||||||
"actix-rt",
|
|
||||||
"actix-web",
|
"actix-web",
|
||||||
"actix-web-actors",
|
|
||||||
"clokwerk",
|
"clokwerk",
|
||||||
"console-subscriber",
|
"console-subscriber",
|
||||||
"diesel",
|
"diesel",
|
||||||
"diesel-async",
|
"diesel-async",
|
||||||
"doku",
|
"doku",
|
||||||
"futures",
|
|
||||||
"lemmy_api",
|
"lemmy_api",
|
||||||
"lemmy_api_common",
|
"lemmy_api_common",
|
||||||
"lemmy_api_crud",
|
"lemmy_api_crud",
|
||||||
|
|
|
@ -134,12 +134,9 @@ reqwest-tracing = { workspace = true }
|
||||||
clokwerk = { workspace = true }
|
clokwerk = { workspace = true }
|
||||||
doku = { workspace = true }
|
doku = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
futures = { workspace = true }
|
|
||||||
actix = { workspace = true }
|
actix = { workspace = true }
|
||||||
tracing-opentelemetry = { workspace = true, optional = true }
|
tracing-opentelemetry = { workspace = true, optional = true }
|
||||||
opentelemetry = { workspace = true, optional = true }
|
opentelemetry = { workspace = true, optional = true }
|
||||||
actix-web-actors = { version = "4.1.0", default-features = false }
|
|
||||||
actix-rt = "2.6"
|
|
||||||
console-subscriber = { version = "0.1.8", optional = true }
|
console-subscriber = { version = "0.1.8", optional = true }
|
||||||
opentelemetry-otlp = { version = "0.10.0", optional = true }
|
opentelemetry-otlp = { version = "0.10.0", optional = true }
|
||||||
pict-rs = { version = "0.4.0-beta.9", optional = true }
|
pict-rs = { version = "0.4.0-beta.9", optional = true }
|
||||||
|
|
|
@ -14,7 +14,7 @@ path = "src/lib.rs"
|
||||||
doctest = false
|
doctest = false
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
full = ["tracing", "rosetta-i18n", "chrono", "actix-web", "lemmy_utils",
|
full = ["tracing", "rosetta-i18n", "chrono", "lemmy_utils",
|
||||||
"lemmy_db_views/full", "lemmy_db_views_actor/full", "lemmy_db_views_moderator/full",
|
"lemmy_db_views/full", "lemmy_db_views_actor/full", "lemmy_db_views_moderator/full",
|
||||||
"percent-encoding", "encoding", "reqwest-middleware", "webpage", "ts-rs"]
|
"percent-encoding", "encoding", "reqwest-middleware", "webpage", "ts-rs"]
|
||||||
|
|
||||||
|
@ -22,12 +22,11 @@ full = ["tracing", "rosetta-i18n", "chrono", "actix-web", "lemmy_utils",
|
||||||
lemmy_db_views = { workspace = true }
|
lemmy_db_views = { workspace = true }
|
||||||
lemmy_db_views_moderator = { workspace = true }
|
lemmy_db_views_moderator = { workspace = true }
|
||||||
lemmy_db_views_actor = { workspace = true }
|
lemmy_db_views_actor = { workspace = true }
|
||||||
lemmy_db_schema = { workspace = true, default-features = false }
|
lemmy_db_schema = { workspace = true }
|
||||||
lemmy_utils = { workspace = true, optional = true }
|
lemmy_utils = { workspace = true, optional = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_with = { workspace = true }
|
serde_with = { workspace = true }
|
||||||
url = { workspace = true }
|
url = { workspace = true }
|
||||||
actix-web = { workspace = true, features = ["cookies"], optional = true }
|
|
||||||
chrono = { workspace = true, optional = true }
|
chrono = { workspace = true, optional = true }
|
||||||
tracing = { workspace = true, optional = true }
|
tracing = { workspace = true, optional = true }
|
||||||
reqwest-middleware = { workspace = true, optional = true }
|
reqwest-middleware = { workspace = true, optional = true }
|
||||||
|
@ -46,4 +45,4 @@ futures = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
actix-rt = { workspace = true }
|
actix-rt = { workspace = true }
|
||||||
reqwest = { workspace = true }
|
reqwest = { workspace = true }
|
||||||
ts-rs = { workspace = true, optional = true }
|
ts-rs = { workspace = true, optional = true }
|
||||||
|
|
|
@ -18,7 +18,7 @@ lemmy_db_views_actor = { workspace = true }
|
||||||
lemmy_db_schema = { workspace = true }
|
lemmy_db_schema = { workspace = true }
|
||||||
lemmy_api_common = { workspace = true, features = ["full"] }
|
lemmy_api_common = { workspace = true, features = ["full"] }
|
||||||
activitypub_federation = { workspace = true }
|
activitypub_federation = { workspace = true }
|
||||||
actix-web = { workspace = true }
|
actix-web = { workspace = true, features = ["cookies"] }
|
||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
futures = { workspace = true }
|
futures = { workspace = true }
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use crate::api_routes_websocket::websocket;
|
|
||||||
use actix_web::{guard, web, Error, HttpResponse, Result};
|
use actix_web::{guard, web, Error, HttpResponse, Result};
|
||||||
use lemmy_api::Perform;
|
use lemmy_api::Perform;
|
||||||
use lemmy_api_common::{
|
use lemmy_api_common::{
|
||||||
|
@ -109,8 +108,6 @@ use serde::Deserialize;
|
||||||
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
|
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
|
||||||
cfg.service(
|
cfg.service(
|
||||||
web::scope("/api/v3")
|
web::scope("/api/v3")
|
||||||
// Websocket
|
|
||||||
.service(web::resource("/ws").to(websocket))
|
|
||||||
// Site
|
// Site
|
||||||
.service(
|
.service(
|
||||||
web::scope("/site")
|
web::scope("/site")
|
||||||
|
|
|
@ -1,725 +0,0 @@
|
||||||
use activitypub_federation::config::Data as ContextData;
|
|
||||||
use actix::{
|
|
||||||
fut,
|
|
||||||
Actor,
|
|
||||||
ActorContext,
|
|
||||||
ActorFutureExt,
|
|
||||||
AsyncContext,
|
|
||||||
ContextFutureSpawner,
|
|
||||||
Handler,
|
|
||||||
Running,
|
|
||||||
StreamHandler,
|
|
||||||
WrapFuture,
|
|
||||||
};
|
|
||||||
use actix_web::{web, Error, HttpRequest, HttpResponse};
|
|
||||||
use actix_web_actors::ws;
|
|
||||||
use lemmy_api::Perform;
|
|
||||||
use lemmy_api_common::{
|
|
||||||
comment::{
|
|
||||||
CreateComment,
|
|
||||||
CreateCommentLike,
|
|
||||||
CreateCommentReport,
|
|
||||||
DeleteComment,
|
|
||||||
DistinguishComment,
|
|
||||||
EditComment,
|
|
||||||
GetComment,
|
|
||||||
GetComments,
|
|
||||||
ListCommentReports,
|
|
||||||
RemoveComment,
|
|
||||||
ResolveCommentReport,
|
|
||||||
SaveComment,
|
|
||||||
},
|
|
||||||
community::{
|
|
||||||
AddModToCommunity,
|
|
||||||
BanFromCommunity,
|
|
||||||
BlockCommunity,
|
|
||||||
CreateCommunity,
|
|
||||||
DeleteCommunity,
|
|
||||||
EditCommunity,
|
|
||||||
FollowCommunity,
|
|
||||||
GetCommunity,
|
|
||||||
ListCommunities,
|
|
||||||
RemoveCommunity,
|
|
||||||
TransferCommunity,
|
|
||||||
},
|
|
||||||
context::LemmyContext,
|
|
||||||
custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
|
|
||||||
person::{
|
|
||||||
AddAdmin,
|
|
||||||
BanPerson,
|
|
||||||
BlockPerson,
|
|
||||||
ChangePassword,
|
|
||||||
DeleteAccount,
|
|
||||||
GetBannedPersons,
|
|
||||||
GetCaptcha,
|
|
||||||
GetPersonDetails,
|
|
||||||
GetPersonMentions,
|
|
||||||
GetReplies,
|
|
||||||
GetReportCount,
|
|
||||||
GetUnreadCount,
|
|
||||||
Login,
|
|
||||||
MarkAllAsRead,
|
|
||||||
MarkCommentReplyAsRead,
|
|
||||||
MarkPersonMentionAsRead,
|
|
||||||
PasswordChangeAfterReset,
|
|
||||||
PasswordReset,
|
|
||||||
Register,
|
|
||||||
SaveUserSettings,
|
|
||||||
VerifyEmail,
|
|
||||||
},
|
|
||||||
post::{
|
|
||||||
CreatePost,
|
|
||||||
CreatePostLike,
|
|
||||||
CreatePostReport,
|
|
||||||
DeletePost,
|
|
||||||
EditPost,
|
|
||||||
FeaturePost,
|
|
||||||
GetPost,
|
|
||||||
GetPosts,
|
|
||||||
GetSiteMetadata,
|
|
||||||
ListPostReports,
|
|
||||||
LockPost,
|
|
||||||
MarkPostAsRead,
|
|
||||||
RemovePost,
|
|
||||||
ResolvePostReport,
|
|
||||||
SavePost,
|
|
||||||
},
|
|
||||||
private_message::{
|
|
||||||
CreatePrivateMessage,
|
|
||||||
CreatePrivateMessageReport,
|
|
||||||
DeletePrivateMessage,
|
|
||||||
EditPrivateMessage,
|
|
||||||
GetPrivateMessages,
|
|
||||||
ListPrivateMessageReports,
|
|
||||||
MarkPrivateMessageAsRead,
|
|
||||||
ResolvePrivateMessageReport,
|
|
||||||
},
|
|
||||||
site::{
|
|
||||||
ApproveRegistrationApplication,
|
|
||||||
CreateSite,
|
|
||||||
EditSite,
|
|
||||||
GetFederatedInstances,
|
|
||||||
GetModlog,
|
|
||||||
GetSite,
|
|
||||||
GetUnreadRegistrationApplicationCount,
|
|
||||||
LeaveAdmin,
|
|
||||||
ListRegistrationApplications,
|
|
||||||
PurgeComment,
|
|
||||||
PurgeCommunity,
|
|
||||||
PurgePerson,
|
|
||||||
PurgePost,
|
|
||||||
ResolveObject,
|
|
||||||
Search,
|
|
||||||
},
|
|
||||||
websocket::{
|
|
||||||
handlers::{
|
|
||||||
connect::{Connect, Disconnect},
|
|
||||||
WsMessage,
|
|
||||||
},
|
|
||||||
serialize_websocket_message,
|
|
||||||
structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
|
|
||||||
UserOperation,
|
|
||||||
UserOperationApub,
|
|
||||||
UserOperationCrud,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use lemmy_api_crud::PerformCrud;
|
|
||||||
use lemmy_apub::{api::PerformApub, SendActivity};
|
|
||||||
use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
|
|
||||||
use serde::Deserialize;
|
|
||||||
use serde_json::Value;
|
|
||||||
use std::{
|
|
||||||
ops::Deref,
|
|
||||||
result,
|
|
||||||
str::FromStr,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
|
||||||
use tracing::{debug, error};
|
|
||||||
|
|
||||||
/// How often heartbeat pings are sent
|
|
||||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
|
|
||||||
|
|
||||||
/// How long before lack of client response causes a timeout
|
|
||||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
|
||||||
|
|
||||||
pub struct WsChatSession {
|
|
||||||
/// unique session id
|
|
||||||
pub id: ConnectionId,
|
|
||||||
|
|
||||||
pub ip: IpAddr,
|
|
||||||
|
|
||||||
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
|
|
||||||
/// otherwise we drop connection.
|
|
||||||
pub hb: Instant,
|
|
||||||
|
|
||||||
/// The context data
|
|
||||||
apub_data: ContextData<LemmyContext>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn websocket(
|
|
||||||
req: HttpRequest,
|
|
||||||
body: web::Payload,
|
|
||||||
rate_limiter: web::Data<RateLimitCell>,
|
|
||||||
apub_data: ContextData<LemmyContext>,
|
|
||||||
) -> Result<HttpResponse, Error> {
|
|
||||||
let client_ip = IpAddr(
|
|
||||||
req
|
|
||||||
.connection_info()
|
|
||||||
.realip_remote_addr()
|
|
||||||
.unwrap_or("blank_ip")
|
|
||||||
.to_string(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let check = rate_limiter.message().check(client_ip.clone());
|
|
||||||
if !check {
|
|
||||||
debug!(
|
|
||||||
"Websocket join with IP: {} has been rate limited.",
|
|
||||||
&client_ip
|
|
||||||
);
|
|
||||||
return Ok(HttpResponse::TooManyRequests().finish());
|
|
||||||
}
|
|
||||||
|
|
||||||
ws::start(
|
|
||||||
WsChatSession {
|
|
||||||
id: 0,
|
|
||||||
ip: client_ip,
|
|
||||||
hb: Instant::now(),
|
|
||||||
apub_data,
|
|
||||||
},
|
|
||||||
&req,
|
|
||||||
body,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
|
|
||||||
///
|
|
||||||
/// also this method checks heartbeats from client
|
|
||||||
fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
|
|
||||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
|
||||||
// check client heartbeats
|
|
||||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
|
||||||
// heartbeat timed out
|
|
||||||
|
|
||||||
// notify chat server
|
|
||||||
act
|
|
||||||
.apub_data
|
|
||||||
.chat_server()
|
|
||||||
.do_send(Disconnect { id: act.id });
|
|
||||||
|
|
||||||
// stop actor
|
|
||||||
ctx.stop();
|
|
||||||
|
|
||||||
// don't try to send a ping
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.ping(b"");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Actor for WsChatSession {
|
|
||||||
type Context = ws::WebsocketContext<Self>;
|
|
||||||
|
|
||||||
/// Method is called on actor start.
|
|
||||||
/// We register ws session with ChatServer
|
|
||||||
fn started(&mut self, ctx: &mut Self::Context) {
|
|
||||||
// we'll start heartbeat process on session start.
|
|
||||||
hb(ctx);
|
|
||||||
|
|
||||||
// register self in chat server. `AsyncContext::wait` register
|
|
||||||
// future within context, but context waits until this future resolves
|
|
||||||
// before processing any other events.
|
|
||||||
// HttpContext::state() is instance of WsChatSessionState, state is shared
|
|
||||||
// across all routes within application
|
|
||||||
let addr = ctx.address();
|
|
||||||
self
|
|
||||||
.apub_data
|
|
||||||
.chat_server()
|
|
||||||
.send(Connect {
|
|
||||||
addr: addr.recipient(),
|
|
||||||
})
|
|
||||||
.into_actor(self)
|
|
||||||
.then(|res, act, ctx| {
|
|
||||||
match res {
|
|
||||||
Ok(res) => act.id = res,
|
|
||||||
// something is wrong with chat server
|
|
||||||
_ => ctx.stop(),
|
|
||||||
}
|
|
||||||
fut::ready(())
|
|
||||||
})
|
|
||||||
.wait(ctx);
|
|
||||||
}
|
|
||||||
fn stopping(&mut self, _: &mut Self::Context) -> Running {
|
|
||||||
// notify chat server
|
|
||||||
self
|
|
||||||
.apub_data
|
|
||||||
.chat_server()
|
|
||||||
.do_send(Disconnect { id: self.id });
|
|
||||||
Running::Stop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handle messages from chat server, we simply send it to peer websocket
|
|
||||||
impl Handler<WsMessage> for WsChatSession {
|
|
||||||
type Result = ();
|
|
||||||
|
|
||||||
fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
|
|
||||||
ctx.text(msg.0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// WebSocket message handler
|
|
||||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
|
|
||||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
|
||||||
let msg = match msg {
|
|
||||||
Err(_) => {
|
|
||||||
ctx.stop();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Ok(msg) => msg,
|
|
||||||
};
|
|
||||||
|
|
||||||
match msg {
|
|
||||||
ws::Message::Ping(msg) => {
|
|
||||||
self.hb = Instant::now();
|
|
||||||
ctx.pong(&msg);
|
|
||||||
}
|
|
||||||
ws::Message::Pong(_) => {
|
|
||||||
self.hb = Instant::now();
|
|
||||||
}
|
|
||||||
ws::Message::Text(text) => {
|
|
||||||
let ip_clone = self.ip.clone();
|
|
||||||
let id_clone = self.id.to_owned();
|
|
||||||
let context_clone = self.apub_data.reset_request_count();
|
|
||||||
|
|
||||||
let fut = Box::pin(async move {
|
|
||||||
let msg = text.trim().to_string();
|
|
||||||
parse_json_message(msg, ip_clone, id_clone, context_clone).await
|
|
||||||
});
|
|
||||||
fut
|
|
||||||
.into_actor(self)
|
|
||||||
.then(|res, _, ctx| {
|
|
||||||
match res {
|
|
||||||
Ok(res) => ctx.text(res),
|
|
||||||
Err(e) => error!("{}", &e),
|
|
||||||
}
|
|
||||||
actix::fut::ready(())
|
|
||||||
})
|
|
||||||
.spawn(ctx);
|
|
||||||
}
|
|
||||||
ws::Message::Binary(_) => println!("Unexpected binary"),
|
|
||||||
ws::Message::Close(reason) => {
|
|
||||||
ctx.close(reason);
|
|
||||||
ctx.stop();
|
|
||||||
}
|
|
||||||
ws::Message::Continuation(_) => {
|
|
||||||
ctx.stop();
|
|
||||||
}
|
|
||||||
ws::Message::Nop => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Entry point for our websocket route
|
|
||||||
async fn parse_json_message(
|
|
||||||
msg: String,
|
|
||||||
ip: IpAddr,
|
|
||||||
connection_id: ConnectionId,
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
) -> Result<String, LemmyError> {
|
|
||||||
let rate_limiter = context.settings_updated_channel();
|
|
||||||
let json: Value = serde_json::from_str(&msg)?;
|
|
||||||
let data = json
|
|
||||||
.get("data")
|
|
||||||
.cloned()
|
|
||||||
.ok_or_else(|| LemmyError::from_message("missing data"))?;
|
|
||||||
|
|
||||||
let missing_op_err = || LemmyError::from_message("missing op");
|
|
||||||
|
|
||||||
let op = json
|
|
||||||
.get("op")
|
|
||||||
.ok_or_else(missing_op_err)?
|
|
||||||
.as_str()
|
|
||||||
.ok_or_else(missing_op_err)?;
|
|
||||||
|
|
||||||
// check if api call passes the rate limit, and generate future for later execution
|
|
||||||
if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
|
|
||||||
let passed = match user_operation_crud {
|
|
||||||
UserOperationCrud::Register => rate_limiter.register().check(ip),
|
|
||||||
UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
|
|
||||||
UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
|
|
||||||
UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
|
|
||||||
_ => rate_limiter.message().check(ip),
|
|
||||||
};
|
|
||||||
check_rate_limit_passed(passed)?;
|
|
||||||
match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
|
|
||||||
} else if let Ok(user_operation) = UserOperation::from_str(op) {
|
|
||||||
let passed = match user_operation {
|
|
||||||
UserOperation::GetCaptcha => rate_limiter.post().check(ip),
|
|
||||||
_ => rate_limiter.message().check(ip),
|
|
||||||
};
|
|
||||||
check_rate_limit_passed(passed)?;
|
|
||||||
match_websocket_operation(context, connection_id, user_operation, data).await
|
|
||||||
} else {
|
|
||||||
let user_operation = UserOperationApub::from_str(op)?;
|
|
||||||
let passed = match user_operation {
|
|
||||||
UserOperationApub::Search => rate_limiter.search().check(ip),
|
|
||||||
_ => rate_limiter.message().check(ip),
|
|
||||||
};
|
|
||||||
check_rate_limit_passed(passed)?;
|
|
||||||
match_websocket_operation_apub(context, connection_id, user_operation, data).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
|
|
||||||
if passed {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
// if rate limit was hit, respond with message
|
|
||||||
Err(LemmyError::from_message("rate_limit_error"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn match_websocket_operation_crud(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperationCrud,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError> {
|
|
||||||
match op {
|
|
||||||
// User ops
|
|
||||||
UserOperationCrud::Register => {
|
|
||||||
do_websocket_operation_crud::<Register>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeleteAccount => {
|
|
||||||
do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Private Message ops
|
|
||||||
UserOperationCrud::CreatePrivateMessage => {
|
|
||||||
do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditPrivateMessage => {
|
|
||||||
do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeletePrivateMessage => {
|
|
||||||
do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::GetPrivateMessages => {
|
|
||||||
do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Site ops
|
|
||||||
UserOperationCrud::CreateSite => {
|
|
||||||
do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditSite => {
|
|
||||||
do_websocket_operation_crud::<EditSite>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::GetSite => {
|
|
||||||
do_websocket_operation_crud::<GetSite>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Community ops
|
|
||||||
UserOperationCrud::ListCommunities => {
|
|
||||||
do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::CreateCommunity => {
|
|
||||||
do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditCommunity => {
|
|
||||||
do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeleteCommunity => {
|
|
||||||
do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::RemoveCommunity => {
|
|
||||||
do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Post ops
|
|
||||||
UserOperationCrud::CreatePost => {
|
|
||||||
do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::GetPost => {
|
|
||||||
do_websocket_operation_crud::<GetPost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditPost => {
|
|
||||||
do_websocket_operation_crud::<EditPost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeletePost => {
|
|
||||||
do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::RemovePost => {
|
|
||||||
do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Comment ops
|
|
||||||
UserOperationCrud::CreateComment => {
|
|
||||||
do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditComment => {
|
|
||||||
do_websocket_operation_crud::<EditComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeleteComment => {
|
|
||||||
do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::RemoveComment => {
|
|
||||||
do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::GetComment => {
|
|
||||||
do_websocket_operation_crud::<GetComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
// Emojis
|
|
||||||
UserOperationCrud::CreateCustomEmoji => {
|
|
||||||
do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::EditCustomEmoji => {
|
|
||||||
do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationCrud::DeleteCustomEmoji => {
|
|
||||||
do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_websocket_operation_crud<'a, 'b, Data>(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperationCrud,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError>
|
|
||||||
where
|
|
||||||
Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
|
|
||||||
for<'de> Data: Deserialize<'de>,
|
|
||||||
{
|
|
||||||
let parsed_data: Data = serde_json::from_value(data)?;
|
|
||||||
let res = parsed_data
|
|
||||||
.perform(&web::Data::new(context.deref().clone()), Some(id))
|
|
||||||
.await?;
|
|
||||||
SendActivity::send_activity(&parsed_data, &res, &context).await?;
|
|
||||||
serialize_websocket_message(&op, &res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn match_websocket_operation_apub(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperationApub,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError> {
|
|
||||||
match op {
|
|
||||||
UserOperationApub::GetPersonDetails => {
|
|
||||||
do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationApub::GetCommunity => {
|
|
||||||
do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationApub::GetComments => {
|
|
||||||
do_websocket_operation_apub::<GetComments>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationApub::GetPosts => {
|
|
||||||
do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationApub::ResolveObject => {
|
|
||||||
do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_websocket_operation_apub<'a, 'b, Data>(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperationApub,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError>
|
|
||||||
where
|
|
||||||
Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
|
|
||||||
for<'de> Data: Deserialize<'de>,
|
|
||||||
{
|
|
||||||
let parsed_data: Data = serde_json::from_value(data)?;
|
|
||||||
let res = parsed_data.perform(&context, Some(id)).await?;
|
|
||||||
SendActivity::send_activity(&parsed_data, &res, &context).await?;
|
|
||||||
serialize_websocket_message(&op, &res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn match_websocket_operation(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperation,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError> {
|
|
||||||
match op {
|
|
||||||
// User ops
|
|
||||||
UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
|
|
||||||
UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
|
|
||||||
UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
|
|
||||||
UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
|
|
||||||
UserOperation::GetUnreadRegistrationApplicationCount => {
|
|
||||||
do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ListRegistrationApplications => {
|
|
||||||
do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ApproveRegistrationApplication => {
|
|
||||||
do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
|
|
||||||
UserOperation::GetBannedPersons => {
|
|
||||||
do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::BlockPerson => {
|
|
||||||
do_websocket_operation::<BlockPerson>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::GetPersonMentions => {
|
|
||||||
do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::MarkPersonMentionAsRead => {
|
|
||||||
do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::MarkCommentReplyAsRead => {
|
|
||||||
do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::MarkAllAsRead => {
|
|
||||||
do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::PasswordReset => {
|
|
||||||
do_websocket_operation::<PasswordReset>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::PasswordChange => {
|
|
||||||
do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
|
|
||||||
UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
|
|
||||||
UserOperation::CommunityJoin => {
|
|
||||||
do_websocket_operation::<CommunityJoin>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
|
|
||||||
UserOperation::SaveUserSettings => {
|
|
||||||
do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ChangePassword => {
|
|
||||||
do_websocket_operation::<ChangePassword>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::GetReportCount => {
|
|
||||||
do_websocket_operation::<GetReportCount>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::GetUnreadCount => {
|
|
||||||
do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::VerifyEmail => {
|
|
||||||
do_websocket_operation::<VerifyEmail>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Private Message ops
|
|
||||||
UserOperation::MarkPrivateMessageAsRead => {
|
|
||||||
do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::CreatePrivateMessageReport => {
|
|
||||||
do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ResolvePrivateMessageReport => {
|
|
||||||
do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ListPrivateMessageReports => {
|
|
||||||
do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Site ops
|
|
||||||
UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
|
|
||||||
UserOperation::PurgePerson => {
|
|
||||||
do_websocket_operation::<PurgePerson>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::PurgeCommunity => {
|
|
||||||
do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
|
|
||||||
UserOperation::PurgeComment => {
|
|
||||||
do_websocket_operation::<PurgeComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::TransferCommunity => {
|
|
||||||
do_websocket_operation::<TransferCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
|
|
||||||
UserOperation::GetFederatedInstances => {
|
|
||||||
do_websocket_operation::<GetFederatedInstances>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Community ops
|
|
||||||
UserOperation::FollowCommunity => {
|
|
||||||
do_websocket_operation::<FollowCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::BlockCommunity => {
|
|
||||||
do_websocket_operation::<BlockCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::BanFromCommunity => {
|
|
||||||
do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::AddModToCommunity => {
|
|
||||||
do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Post ops
|
|
||||||
UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
|
|
||||||
UserOperation::FeaturePost => {
|
|
||||||
do_websocket_operation::<FeaturePost>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::CreatePostLike => {
|
|
||||||
do_websocket_operation::<CreatePostLike>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::MarkPostAsRead => {
|
|
||||||
do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
|
|
||||||
UserOperation::CreatePostReport => {
|
|
||||||
do_websocket_operation::<CreatePostReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ListPostReports => {
|
|
||||||
do_websocket_operation::<ListPostReports>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ResolvePostReport => {
|
|
||||||
do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::GetSiteMetadata => {
|
|
||||||
do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
|
|
||||||
// Comment ops
|
|
||||||
UserOperation::SaveComment => {
|
|
||||||
do_websocket_operation::<SaveComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::CreateCommentLike => {
|
|
||||||
do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::DistinguishComment => {
|
|
||||||
do_websocket_operation::<DistinguishComment>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::CreateCommentReport => {
|
|
||||||
do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ListCommentReports => {
|
|
||||||
do_websocket_operation::<ListCommentReports>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
UserOperation::ResolveCommentReport => {
|
|
||||||
do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_websocket_operation<'a, 'b, Data>(
|
|
||||||
context: ContextData<LemmyContext>,
|
|
||||||
id: ConnectionId,
|
|
||||||
op: UserOperation,
|
|
||||||
data: Value,
|
|
||||||
) -> result::Result<String, LemmyError>
|
|
||||||
where
|
|
||||||
Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
|
|
||||||
for<'de> Data: Deserialize<'de>,
|
|
||||||
{
|
|
||||||
let parsed_data: Data = serde_json::from_value(data)?;
|
|
||||||
let res = parsed_data
|
|
||||||
.perform(&web::Data::new(context.deref().clone()), Some(id))
|
|
||||||
.await?;
|
|
||||||
SendActivity::send_activity(&parsed_data, &res, &context).await?;
|
|
||||||
serialize_websocket_message(&op, &res)
|
|
||||||
}
|
|
|
@ -1,5 +1,4 @@
|
||||||
pub mod api_routes_http;
|
pub mod api_routes_http;
|
||||||
pub mod api_routes_websocket;
|
|
||||||
pub mod code_migrations;
|
pub mod code_migrations;
|
||||||
pub mod root_span_builder;
|
pub mod root_span_builder;
|
||||||
pub mod scheduled_tasks;
|
pub mod scheduled_tasks;
|
||||||
|
|
Loading…
Reference in a new issue