diff --git a/Cargo.lock b/Cargo.lock index c8c79d873..3a5e0f09a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1887,6 +1887,7 @@ dependencies = [ "lemmy_utils", "log", "serde 1.0.116", + "serde_json", "strum", "strum_macros", ] diff --git a/lemmy_structs/Cargo.toml b/lemmy_structs/Cargo.toml index 58658b924..9373b058c 100644 --- a/lemmy_structs/Cargo.toml +++ b/lemmy_structs/Cargo.toml @@ -19,3 +19,4 @@ actix-web = { version = "3.0" } strum = "0.19" strum_macros = "0.19" chrono = { version = "0.4", features = ["serde"] } +serde_json = { version = "1.0", features = ["preserve_order"]} diff --git a/lemmy_structs/src/lib.rs b/lemmy_structs/src/lib.rs index f7140205a..5b7688ef4 100644 --- a/lemmy_structs/src/lib.rs +++ b/lemmy_structs/src/lib.rs @@ -6,6 +6,7 @@ extern crate serde; #[macro_use] extern crate strum_macros; extern crate chrono; +extern crate serde_json; pub mod comment; pub mod community; diff --git a/lemmy_structs/src/websocket.rs b/lemmy_structs/src/websocket.rs index c5c6c5d60..bd09889c2 100644 --- a/lemmy_structs/src/websocket.rs +++ b/lemmy_structs/src/websocket.rs @@ -1,8 +1,28 @@ use crate::{comment::CommentResponse, post::PostResponse}; use actix::{prelude::*, Recipient}; -use lemmy_utils::{CommunityId, ConnectionId, IPAddr, PostId, UserId}; +use lemmy_utils::{CommunityId, ConnectionId, IPAddr, LemmyError, PostId, UserId}; use serde::{Deserialize, Serialize}; +pub fn serialize_websocket_message( + op: &UserOperation, + data: &Response, +) -> Result +where + Response: Serialize, +{ + let response = WebsocketResponse { + op: op.to_string(), + data, + }; + Ok(serde_json::to_string(&response)?) +} + +#[derive(Serialize)] +struct WebsocketResponse { + op: String, + data: T, +} + #[derive(EnumString, ToString, Debug, Clone)] pub enum UserOperation { Login, diff --git a/src/api/mod.rs b/src/api/mod.rs index d63f38eb8..11d9aa5d6 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,5 +1,5 @@ -use crate::{api::claims::Claims, DbPool, LemmyContext}; -use actix_web::web::Data; +use crate::{api::claims::Claims, websocket::handlers::Args, DbPool, LemmyContext}; +use actix_web::{web, web::Data}; use lemmy_db::{ community::Community, community_view::CommunityUserBanView, @@ -7,8 +7,17 @@ use lemmy_db::{ user::User_, Crud, }; -use lemmy_structs::blocking; +use lemmy_structs::{ + blocking, + comment::*, + community::*, + post::*, + site::*, + user::*, + websocket::{serialize_websocket_message, UserOperation}, +}; use lemmy_utils::{APIError, ConnectionId, LemmyError}; +use serde::Deserialize; pub mod claims; pub mod comment; @@ -96,3 +105,115 @@ pub(in crate::api) async fn check_community_ban( Ok(()) } } + +pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result +where + for<'de> Data: Deserialize<'de> + 'a, + Data: Perform, +{ + let Args { + context, + id, + op, + data, + } = args; + + do_websocket_operation::(context, data, id, op).await +} + +pub async fn do_websocket_operation<'a, 'b, Data>( + context: LemmyContext, + data: &'a str, + id: ConnectionId, + op: UserOperation, +) -> Result +where + for<'de> Data: Deserialize<'de> + 'a, + Data: Perform, +{ + let parsed_data: Data = serde_json::from_str(&data)?; + let res = parsed_data + .perform(&web::Data::new(context), Some(id)) + .await?; + serialize_websocket_message(&op, &res) +} + +pub async fn xxx(args: Args<'_>) -> Result { + match args.op { + // User ops + UserOperation::Login => do_user_operation::(args).await, + UserOperation::Register => do_user_operation::(args).await, + UserOperation::GetCaptcha => do_user_operation::(args).await, + UserOperation::GetUserDetails => do_user_operation::(args).await, + UserOperation::GetReplies => do_user_operation::(args).await, + UserOperation::AddAdmin => do_user_operation::(args).await, + UserOperation::BanUser => do_user_operation::(args).await, + UserOperation::GetUserMentions => do_user_operation::(args).await, + UserOperation::MarkUserMentionAsRead => do_user_operation::(args).await, + UserOperation::MarkAllAsRead => do_user_operation::(args).await, + UserOperation::DeleteAccount => do_user_operation::(args).await, + UserOperation::PasswordReset => do_user_operation::(args).await, + UserOperation::PasswordChange => do_user_operation::(args).await, + UserOperation::UserJoin => do_user_operation::(args).await, + UserOperation::PostJoin => do_user_operation::(args).await, + UserOperation::CommunityJoin => do_user_operation::(args).await, + UserOperation::SaveUserSettings => do_user_operation::(args).await, + + // Private Message ops + UserOperation::CreatePrivateMessage => do_user_operation::(args).await, + UserOperation::EditPrivateMessage => do_user_operation::(args).await, + UserOperation::DeletePrivateMessage => do_user_operation::(args).await, + UserOperation::MarkPrivateMessageAsRead => { + do_user_operation::(args).await + } + UserOperation::GetPrivateMessages => do_user_operation::(args).await, + + // Site ops + UserOperation::GetModlog => do_user_operation::(args).await, + UserOperation::CreateSite => do_user_operation::(args).await, + UserOperation::EditSite => do_user_operation::(args).await, + UserOperation::GetSite => do_user_operation::(args).await, + UserOperation::GetSiteConfig => do_user_operation::(args).await, + UserOperation::SaveSiteConfig => do_user_operation::(args).await, + UserOperation::Search => do_user_operation::(args).await, + UserOperation::TransferCommunity => do_user_operation::(args).await, + UserOperation::TransferSite => do_user_operation::(args).await, + UserOperation::ListCategories => do_user_operation::(args).await, + + // Community ops + UserOperation::GetCommunity => do_user_operation::(args).await, + UserOperation::ListCommunities => do_user_operation::(args).await, + UserOperation::CreateCommunity => do_user_operation::(args).await, + UserOperation::EditCommunity => do_user_operation::(args).await, + UserOperation::DeleteCommunity => do_user_operation::(args).await, + UserOperation::RemoveCommunity => do_user_operation::(args).await, + UserOperation::FollowCommunity => do_user_operation::(args).await, + UserOperation::GetFollowedCommunities => { + do_user_operation::(args).await + } + UserOperation::BanFromCommunity => do_user_operation::(args).await, + UserOperation::AddModToCommunity => do_user_operation::(args).await, + + // Post ops + UserOperation::CreatePost => do_user_operation::(args).await, + UserOperation::GetPost => do_user_operation::(args).await, + UserOperation::GetPosts => do_user_operation::(args).await, + UserOperation::EditPost => do_user_operation::(args).await, + UserOperation::DeletePost => do_user_operation::(args).await, + UserOperation::RemovePost => do_user_operation::(args).await, + UserOperation::LockPost => do_user_operation::(args).await, + UserOperation::StickyPost => do_user_operation::(args).await, + UserOperation::CreatePostLike => do_user_operation::(args).await, + UserOperation::SavePost => do_user_operation::(args).await, + + // Comment ops + UserOperation::CreateComment => do_user_operation::(args).await, + UserOperation::EditComment => do_user_operation::(args).await, + UserOperation::DeleteComment => do_user_operation::(args).await, + UserOperation::RemoveComment => do_user_operation::(args).await, + UserOperation::MarkCommentAsRead => do_user_operation::(args).await, + UserOperation::SaveComment => do_user_operation::(args).await, + UserOperation::GetComments => do_user_operation::(args).await, + UserOperation::CreateCommentLike => do_user_operation::(args).await, + } +} diff --git a/src/main.rs b/src/main.rs index 800196cfe..331a7490a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ use lazy_static::lazy_static; use lemmy_db::get_database_url_from_env; use lemmy_rate_limit::{rate_limiter::RateLimiter, RateLimit}; use lemmy_server::{ + api::xxx, apub::activity_queue::create_activity_queue, code_migrations::run_advanced_migrations, routes::*, @@ -77,6 +78,7 @@ async fn main() -> Result<(), LemmyError> { let chat_server = ChatServer::startup( pool.clone(), rate_limiter.clone(), + |args| Box::pin(xxx(args)), Client::default(), activity_queue.clone(), ) diff --git a/src/websocket/chat_server.rs b/src/websocket/chat_server.rs index 0c4dfb0ba..ccc8155ae 100644 --- a/src/websocket/chat_server.rs +++ b/src/websocket/chat_server.rs @@ -1,7 +1,4 @@ -use crate::{ - websocket::handlers::{do_user_operation, to_json_string, Args}, - LemmyContext, -}; +use crate::{websocket::handlers::Args, LemmyContext}; use actix::prelude::*; use anyhow::Context as acontext; use background_jobs::QueueHandle; @@ -10,7 +7,7 @@ use diesel::{ PgConnection, }; use lemmy_rate_limit::RateLimit; -use lemmy_structs::{comment::*, community::*, post::*, site::*, user::*, websocket::*}; +use lemmy_structs::{comment::*, post::*, websocket::*}; use lemmy_utils::{ location_info, APIError, @@ -29,6 +26,10 @@ use std::{ collections::{HashMap, HashSet}, str::FromStr, }; +use tokio::macros::support::Pin; + +type MessageHandlerType = + fn(args: Args) -> Pin> + '_>>; /// `ChatServer` manages chat rooms and responsible for coordinating chat /// session. @@ -57,6 +58,8 @@ pub struct ChatServer { /// A list of the current captchas pub(super) captchas: Vec, + message_handler: MessageHandlerType, + /// An HTTP Client client: Client, @@ -75,6 +78,7 @@ impl ChatServer { pub fn startup( pool: Pool>, rate_limiter: RateLimit, + message_handler: MessageHandlerType, client: Client, activity_queue: QueueHandle, ) -> ChatServer { @@ -87,6 +91,7 @@ impl ChatServer { pool, rate_limiter, captchas: Vec::new(), + message_handler, client, activity_queue, } @@ -180,7 +185,7 @@ impl ChatServer { where Response: Serialize, { - let res_str = &to_json_string(op, response)?; + let res_str = &serialize_websocket_message(op, response)?; if let Some(sessions) = self.post_rooms.get(&post_id) { for id in sessions { if let Some(my_id) = websocket_id { @@ -204,7 +209,7 @@ impl ChatServer { where Response: Serialize, { - let res_str = &to_json_string(op, response)?; + let res_str = &serialize_websocket_message(op, response)?; if let Some(sessions) = self.community_rooms.get(&community_id) { for id in sessions { if let Some(my_id) = websocket_id { @@ -227,7 +232,7 @@ impl ChatServer { where Response: Serialize, { - let res_str = &to_json_string(op, response)?; + let res_str = &serialize_websocket_message(op, response)?; for id in self.sessions.keys() { if let Some(my_id) = websocket_id { if *id == my_id { @@ -249,7 +254,7 @@ impl ChatServer { where Response: Serialize, { - let res_str = &to_json_string(op, response)?; + let res_str = &serialize_websocket_message(op, response)?; if let Some(sessions) = self.user_rooms.get(&recipient_id) { for id in sessions { if let Some(my_id) = websocket_id { @@ -351,6 +356,7 @@ impl ChatServer { let client = self.client.clone(); let activity_queue = self.activity_queue.clone(); + let message_handler = self.message_handler; async move { let msg = msg; let json: Value = serde_json::from_str(&msg.msg)?; @@ -369,95 +375,17 @@ impl ChatServer { }; let args = Args { context, - rate_limiter, id: msg.id, - ip, op: user_operation.clone(), data, }; + let fut = (message_handler)(args); match user_operation { - // User ops - UserOperation::Login => do_user_operation::(args).await, - UserOperation::Register => do_user_operation::(args).await, - UserOperation::GetCaptcha => do_user_operation::(args).await, - UserOperation::GetUserDetails => do_user_operation::(args).await, - UserOperation::GetReplies => do_user_operation::(args).await, - UserOperation::AddAdmin => do_user_operation::(args).await, - UserOperation::BanUser => do_user_operation::(args).await, - UserOperation::GetUserMentions => do_user_operation::(args).await, - UserOperation::MarkUserMentionAsRead => { - do_user_operation::(args).await - } - UserOperation::MarkAllAsRead => do_user_operation::(args).await, - UserOperation::DeleteAccount => do_user_operation::(args).await, - UserOperation::PasswordReset => do_user_operation::(args).await, - UserOperation::PasswordChange => do_user_operation::(args).await, - UserOperation::UserJoin => do_user_operation::(args).await, - UserOperation::PostJoin => do_user_operation::(args).await, - UserOperation::CommunityJoin => do_user_operation::(args).await, - UserOperation::SaveUserSettings => do_user_operation::(args).await, - - // Private Message ops - UserOperation::CreatePrivateMessage => { - do_user_operation::(args).await - } - UserOperation::EditPrivateMessage => do_user_operation::(args).await, - UserOperation::DeletePrivateMessage => { - do_user_operation::(args).await - } - UserOperation::MarkPrivateMessageAsRead => { - do_user_operation::(args).await - } - UserOperation::GetPrivateMessages => do_user_operation::(args).await, - - // Site ops - UserOperation::GetModlog => do_user_operation::(args).await, - UserOperation::CreateSite => do_user_operation::(args).await, - UserOperation::EditSite => do_user_operation::(args).await, - UserOperation::GetSite => do_user_operation::(args).await, - UserOperation::GetSiteConfig => do_user_operation::(args).await, - UserOperation::SaveSiteConfig => do_user_operation::(args).await, - UserOperation::Search => do_user_operation::(args).await, - UserOperation::TransferCommunity => do_user_operation::(args).await, - UserOperation::TransferSite => do_user_operation::(args).await, - UserOperation::ListCategories => do_user_operation::(args).await, - - // Community ops - UserOperation::GetCommunity => do_user_operation::(args).await, - UserOperation::ListCommunities => do_user_operation::(args).await, - UserOperation::CreateCommunity => do_user_operation::(args).await, - UserOperation::EditCommunity => do_user_operation::(args).await, - UserOperation::DeleteCommunity => do_user_operation::(args).await, - UserOperation::RemoveCommunity => do_user_operation::(args).await, - UserOperation::FollowCommunity => do_user_operation::(args).await, - UserOperation::GetFollowedCommunities => { - do_user_operation::(args).await - } - UserOperation::BanFromCommunity => do_user_operation::(args).await, - UserOperation::AddModToCommunity => do_user_operation::(args).await, - - // Post ops - UserOperation::CreatePost => do_user_operation::(args).await, - UserOperation::GetPost => do_user_operation::(args).await, - UserOperation::GetPosts => do_user_operation::(args).await, - UserOperation::EditPost => do_user_operation::(args).await, - UserOperation::DeletePost => do_user_operation::(args).await, - UserOperation::RemovePost => do_user_operation::(args).await, - UserOperation::LockPost => do_user_operation::(args).await, - UserOperation::StickyPost => do_user_operation::(args).await, - UserOperation::CreatePostLike => do_user_operation::(args).await, - UserOperation::SavePost => do_user_operation::(args).await, - - // Comment ops - UserOperation::CreateComment => do_user_operation::(args).await, - UserOperation::EditComment => do_user_operation::(args).await, - UserOperation::DeleteComment => do_user_operation::(args).await, - UserOperation::RemoveComment => do_user_operation::(args).await, - UserOperation::MarkCommentAsRead => do_user_operation::(args).await, - UserOperation::SaveComment => do_user_operation::(args).await, - UserOperation::GetComments => do_user_operation::(args).await, - UserOperation::CreateCommentLike => do_user_operation::(args).await, + UserOperation::Register => rate_limiter.register().wrap(ip, fut).await, + UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await, + UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await, + _ => rate_limiter.message().wrap(ip, fut).await, } } } diff --git a/src/websocket/handlers.rs b/src/websocket/handlers.rs index 75e638f94..601fd0896 100644 --- a/src/websocket/handlers.rs +++ b/src/websocket/handlers.rs @@ -1,58 +1,20 @@ use crate::{ - api::Perform, websocket::chat_server::{ChatServer, SessionInfo}, LemmyContext, }; use actix::{Actor, Context, Handler, ResponseFuture}; -use actix_web::web; use lemmy_db::naive_now; -use lemmy_rate_limit::RateLimit; use lemmy_structs::websocket::*; -use lemmy_utils::{ConnectionId, IPAddr, LemmyError}; +use lemmy_utils::ConnectionId; use log::{error, info}; use rand::Rng; -use serde::{Deserialize, Serialize}; +use serde::Serialize; -pub(super) struct Args<'a> { - pub(super) context: LemmyContext, - pub(super) rate_limiter: RateLimit, - pub(super) id: ConnectionId, - pub(super) ip: IPAddr, - pub(super) op: UserOperation, - pub(super) data: &'a str, -} - -pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result -where - for<'de> Data: Deserialize<'de> + 'a, - Data: Perform, -{ - let Args { - context, - rate_limiter, - id, - ip, - op, - data, - } = args; - - let data = data.to_string(); - let op2 = op.clone(); - - let fut = async move { - let parsed_data: Data = serde_json::from_str(&data)?; - let res = parsed_data - .perform(&web::Data::new(context), Some(id)) - .await?; - to_json_string(&op, &res) - }; - - match op2 { - UserOperation::Register => rate_limiter.register().wrap(ip, fut).await, - UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await, - UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await, - _ => rate_limiter.message().wrap(ip, fut).await, - } +pub struct Args<'a> { + pub context: LemmyContext, + pub id: ConnectionId, + pub op: UserOperation, + pub data: &'a str, } /// Make actor from `ChatServer` @@ -241,26 +203,6 @@ impl Handler for ChatServer { } } -#[derive(Serialize)] -struct WebsocketResponse { - op: String, - data: T, -} - -pub(super) fn to_json_string( - op: &UserOperation, - data: &Response, -) -> Result -where - Response: Serialize, -{ - let response = WebsocketResponse { - op: op.to_string(), - data, - }; - Ok(serde_json::to_string(&response)?) -} - impl Handler for ChatServer { type Result = ();