Remove websocket dependency on API

This commit is contained in:
Felix Ableitner 2020-09-22 20:58:18 +02:00
parent fc525c8144
commit 4e51f6da1c
8 changed files with 177 additions and 161 deletions

1
Cargo.lock generated
View file

@ -1887,6 +1887,7 @@ dependencies = [
"lemmy_utils", "lemmy_utils",
"log", "log",
"serde 1.0.116", "serde 1.0.116",
"serde_json",
"strum", "strum",
"strum_macros", "strum_macros",
] ]

View file

@ -19,3 +19,4 @@ actix-web = { version = "3.0" }
strum = "0.19" strum = "0.19"
strum_macros = "0.19" strum_macros = "0.19"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
serde_json = { version = "1.0", features = ["preserve_order"]}

View file

@ -6,6 +6,7 @@ extern crate serde;
#[macro_use] #[macro_use]
extern crate strum_macros; extern crate strum_macros;
extern crate chrono; extern crate chrono;
extern crate serde_json;
pub mod comment; pub mod comment;
pub mod community; pub mod community;

View file

@ -1,8 +1,28 @@
use crate::{comment::CommentResponse, post::PostResponse}; use crate::{comment::CommentResponse, post::PostResponse};
use actix::{prelude::*, Recipient}; use actix::{prelude::*, Recipient};
use lemmy_utils::{CommunityId, ConnectionId, IPAddr, PostId, UserId}; use lemmy_utils::{CommunityId, ConnectionId, IPAddr, LemmyError, PostId, UserId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub fn serialize_websocket_message<Response>(
op: &UserOperation,
data: &Response,
) -> Result<String, LemmyError>
where
Response: Serialize,
{
let response = WebsocketResponse {
op: op.to_string(),
data,
};
Ok(serde_json::to_string(&response)?)
}
#[derive(Serialize)]
struct WebsocketResponse<T> {
op: String,
data: T,
}
#[derive(EnumString, ToString, Debug, Clone)] #[derive(EnumString, ToString, Debug, Clone)]
pub enum UserOperation { pub enum UserOperation {
Login, Login,

View file

@ -1,5 +1,5 @@
use crate::{api::claims::Claims, DbPool, LemmyContext}; use crate::{api::claims::Claims, websocket::handlers::Args, DbPool, LemmyContext};
use actix_web::web::Data; use actix_web::{web, web::Data};
use lemmy_db::{ use lemmy_db::{
community::Community, community::Community,
community_view::CommunityUserBanView, community_view::CommunityUserBanView,
@ -7,8 +7,17 @@ use lemmy_db::{
user::User_, user::User_,
Crud, Crud,
}; };
use lemmy_structs::blocking; use lemmy_structs::{
blocking,
comment::*,
community::*,
post::*,
site::*,
user::*,
websocket::{serialize_websocket_message, UserOperation},
};
use lemmy_utils::{APIError, ConnectionId, LemmyError}; use lemmy_utils::{APIError, ConnectionId, LemmyError};
use serde::Deserialize;
pub mod claims; pub mod claims;
pub mod comment; pub mod comment;
@ -96,3 +105,115 @@ pub(in crate::api) async fn check_community_ban(
Ok(()) Ok(())
} }
} }
pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
where
for<'de> Data: Deserialize<'de> + 'a,
Data: Perform,
{
let Args {
context,
id,
op,
data,
} = args;
do_websocket_operation::<Data>(context, data, id, op).await
}
pub async fn do_websocket_operation<'a, 'b, Data>(
context: LemmyContext,
data: &'a str,
id: ConnectionId,
op: UserOperation,
) -> Result<String, LemmyError>
where
for<'de> Data: Deserialize<'de> + 'a,
Data: Perform,
{
let parsed_data: Data = serde_json::from_str(&data)?;
let res = parsed_data
.perform(&web::Data::new(context), Some(id))
.await?;
serialize_websocket_message(&op, &res)
}
pub async fn xxx(args: Args<'_>) -> Result<String, LemmyError> {
match args.op {
// User ops
UserOperation::Login => do_user_operation::<Login>(args).await,
UserOperation::Register => do_user_operation::<Register>(args).await,
UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
UserOperation::MarkUserMentionAsRead => do_user_operation::<MarkUserMentionAsRead>(args).await,
UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
// Private Message ops
UserOperation::CreatePrivateMessage => do_user_operation::<CreatePrivateMessage>(args).await,
UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
UserOperation::DeletePrivateMessage => do_user_operation::<DeletePrivateMessage>(args).await,
UserOperation::MarkPrivateMessageAsRead => {
do_user_operation::<MarkPrivateMessageAsRead>(args).await
}
UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
// Site ops
UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
UserOperation::Search => do_user_operation::<Search>(args).await,
UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
// Community ops
UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
UserOperation::GetFollowedCommunities => {
do_user_operation::<GetFollowedCommunities>(args).await
}
UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
// Post ops
UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
// Comment ops
UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,
}
}

View file

@ -19,6 +19,7 @@ use lazy_static::lazy_static;
use lemmy_db::get_database_url_from_env; use lemmy_db::get_database_url_from_env;
use lemmy_rate_limit::{rate_limiter::RateLimiter, RateLimit}; use lemmy_rate_limit::{rate_limiter::RateLimiter, RateLimit};
use lemmy_server::{ use lemmy_server::{
api::xxx,
apub::activity_queue::create_activity_queue, apub::activity_queue::create_activity_queue,
code_migrations::run_advanced_migrations, code_migrations::run_advanced_migrations,
routes::*, routes::*,
@ -77,6 +78,7 @@ async fn main() -> Result<(), LemmyError> {
let chat_server = ChatServer::startup( let chat_server = ChatServer::startup(
pool.clone(), pool.clone(),
rate_limiter.clone(), rate_limiter.clone(),
|args| Box::pin(xxx(args)),
Client::default(), Client::default(),
activity_queue.clone(), activity_queue.clone(),
) )

View file

@ -1,7 +1,4 @@
use crate::{ use crate::{websocket::handlers::Args, LemmyContext};
websocket::handlers::{do_user_operation, to_json_string, Args},
LemmyContext,
};
use actix::prelude::*; use actix::prelude::*;
use anyhow::Context as acontext; use anyhow::Context as acontext;
use background_jobs::QueueHandle; use background_jobs::QueueHandle;
@ -10,7 +7,7 @@ use diesel::{
PgConnection, PgConnection,
}; };
use lemmy_rate_limit::RateLimit; use lemmy_rate_limit::RateLimit;
use lemmy_structs::{comment::*, community::*, post::*, site::*, user::*, websocket::*}; use lemmy_structs::{comment::*, post::*, websocket::*};
use lemmy_utils::{ use lemmy_utils::{
location_info, location_info,
APIError, APIError,
@ -29,6 +26,10 @@ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
str::FromStr, str::FromStr,
}; };
use tokio::macros::support::Pin;
type MessageHandlerType =
fn(args: Args) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
/// `ChatServer` manages chat rooms and responsible for coordinating chat /// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. /// session.
@ -57,6 +58,8 @@ pub struct ChatServer {
/// A list of the current captchas /// A list of the current captchas
pub(super) captchas: Vec<CaptchaItem>, pub(super) captchas: Vec<CaptchaItem>,
message_handler: MessageHandlerType,
/// An HTTP Client /// An HTTP Client
client: Client, client: Client,
@ -75,6 +78,7 @@ impl ChatServer {
pub fn startup( pub fn startup(
pool: Pool<ConnectionManager<PgConnection>>, pool: Pool<ConnectionManager<PgConnection>>,
rate_limiter: RateLimit, rate_limiter: RateLimit,
message_handler: MessageHandlerType,
client: Client, client: Client,
activity_queue: QueueHandle, activity_queue: QueueHandle,
) -> ChatServer { ) -> ChatServer {
@ -87,6 +91,7 @@ impl ChatServer {
pool, pool,
rate_limiter, rate_limiter,
captchas: Vec::new(), captchas: Vec::new(),
message_handler,
client, client,
activity_queue, activity_queue,
} }
@ -180,7 +185,7 @@ impl ChatServer {
where where
Response: Serialize, Response: Serialize,
{ {
let res_str = &to_json_string(op, response)?; let res_str = &serialize_websocket_message(op, response)?;
if let Some(sessions) = self.post_rooms.get(&post_id) { if let Some(sessions) = self.post_rooms.get(&post_id) {
for id in sessions { for id in sessions {
if let Some(my_id) = websocket_id { if let Some(my_id) = websocket_id {
@ -204,7 +209,7 @@ impl ChatServer {
where where
Response: Serialize, Response: Serialize,
{ {
let res_str = &to_json_string(op, response)?; let res_str = &serialize_websocket_message(op, response)?;
if let Some(sessions) = self.community_rooms.get(&community_id) { if let Some(sessions) = self.community_rooms.get(&community_id) {
for id in sessions { for id in sessions {
if let Some(my_id) = websocket_id { if let Some(my_id) = websocket_id {
@ -227,7 +232,7 @@ impl ChatServer {
where where
Response: Serialize, Response: Serialize,
{ {
let res_str = &to_json_string(op, response)?; let res_str = &serialize_websocket_message(op, response)?;
for id in self.sessions.keys() { for id in self.sessions.keys() {
if let Some(my_id) = websocket_id { if let Some(my_id) = websocket_id {
if *id == my_id { if *id == my_id {
@ -249,7 +254,7 @@ impl ChatServer {
where where
Response: Serialize, Response: Serialize,
{ {
let res_str = &to_json_string(op, response)?; let res_str = &serialize_websocket_message(op, response)?;
if let Some(sessions) = self.user_rooms.get(&recipient_id) { if let Some(sessions) = self.user_rooms.get(&recipient_id) {
for id in sessions { for id in sessions {
if let Some(my_id) = websocket_id { if let Some(my_id) = websocket_id {
@ -351,6 +356,7 @@ impl ChatServer {
let client = self.client.clone(); let client = self.client.clone();
let activity_queue = self.activity_queue.clone(); let activity_queue = self.activity_queue.clone();
let message_handler = self.message_handler;
async move { async move {
let msg = msg; let msg = msg;
let json: Value = serde_json::from_str(&msg.msg)?; let json: Value = serde_json::from_str(&msg.msg)?;
@ -369,95 +375,17 @@ impl ChatServer {
}; };
let args = Args { let args = Args {
context, context,
rate_limiter,
id: msg.id, id: msg.id,
ip,
op: user_operation.clone(), op: user_operation.clone(),
data, data,
}; };
let fut = (message_handler)(args);
match user_operation { match user_operation {
// User ops UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
UserOperation::Login => do_user_operation::<Login>(args).await, UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
UserOperation::Register => do_user_operation::<Register>(args).await, UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await, _ => rate_limiter.message().wrap(ip, fut).await,
UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
UserOperation::MarkUserMentionAsRead => {
do_user_operation::<MarkUserMentionAsRead>(args).await
}
UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
// Private Message ops
UserOperation::CreatePrivateMessage => {
do_user_operation::<CreatePrivateMessage>(args).await
}
UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
UserOperation::DeletePrivateMessage => {
do_user_operation::<DeletePrivateMessage>(args).await
}
UserOperation::MarkPrivateMessageAsRead => {
do_user_operation::<MarkPrivateMessageAsRead>(args).await
}
UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
// Site ops
UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
UserOperation::Search => do_user_operation::<Search>(args).await,
UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
// Community ops
UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
UserOperation::GetFollowedCommunities => {
do_user_operation::<GetFollowedCommunities>(args).await
}
UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
// Post ops
UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
// Comment ops
UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,
} }
} }
} }

View file

@ -1,58 +1,20 @@
use crate::{ use crate::{
api::Perform,
websocket::chat_server::{ChatServer, SessionInfo}, websocket::chat_server::{ChatServer, SessionInfo},
LemmyContext, LemmyContext,
}; };
use actix::{Actor, Context, Handler, ResponseFuture}; use actix::{Actor, Context, Handler, ResponseFuture};
use actix_web::web;
use lemmy_db::naive_now; use lemmy_db::naive_now;
use lemmy_rate_limit::RateLimit;
use lemmy_structs::websocket::*; use lemmy_structs::websocket::*;
use lemmy_utils::{ConnectionId, IPAddr, LemmyError}; use lemmy_utils::ConnectionId;
use log::{error, info}; use log::{error, info};
use rand::Rng; use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::Serialize;
pub(super) struct Args<'a> { pub struct Args<'a> {
pub(super) context: LemmyContext, pub context: LemmyContext,
pub(super) rate_limiter: RateLimit, pub id: ConnectionId,
pub(super) id: ConnectionId, pub op: UserOperation,
pub(super) ip: IPAddr, pub data: &'a str,
pub(super) op: UserOperation,
pub(super) data: &'a str,
}
pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
where
for<'de> Data: Deserialize<'de> + 'a,
Data: Perform,
{
let Args {
context,
rate_limiter,
id,
ip,
op,
data,
} = args;
let data = data.to_string();
let op2 = op.clone();
let fut = async move {
let parsed_data: Data = serde_json::from_str(&data)?;
let res = parsed_data
.perform(&web::Data::new(context), Some(id))
.await?;
to_json_string(&op, &res)
};
match op2 {
UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
_ => rate_limiter.message().wrap(ip, fut).await,
}
} }
/// Make actor from `ChatServer` /// Make actor from `ChatServer`
@ -241,26 +203,6 @@ impl Handler<GetCommunityUsersOnline> for ChatServer {
} }
} }
#[derive(Serialize)]
struct WebsocketResponse<T> {
op: String,
data: T,
}
pub(super) fn to_json_string<Response>(
op: &UserOperation,
data: &Response,
) -> Result<String, LemmyError>
where
Response: Serialize,
{
let response = WebsocketResponse {
op: op.to_string(),
data,
};
Ok(serde_json::to_string(&response)?)
}
impl Handler<CaptchaItem> for ChatServer { impl Handler<CaptchaItem> for ChatServer {
type Result = (); type Result = ();