use activitypub_federation::config::Data as ContextData; use actix::{ fut, Actor, ActorContext, ActorFutureExt, AsyncContext, ContextFutureSpawner, Handler, Running, StreamHandler, WrapFuture, }; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; use lemmy_api::Perform; use lemmy_api_common::{ comment::{ CreateComment, CreateCommentLike, CreateCommentReport, DeleteComment, DistinguishComment, EditComment, GetComment, GetComments, ListCommentReports, RemoveComment, ResolveCommentReport, SaveComment, }, community::{ AddModToCommunity, BanFromCommunity, BlockCommunity, CreateCommunity, DeleteCommunity, EditCommunity, FollowCommunity, GetCommunity, ListCommunities, RemoveCommunity, TransferCommunity, }, context::LemmyContext, custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji}, person::{ AddAdmin, BanPerson, BlockPerson, ChangePassword, DeleteAccount, GetBannedPersons, GetCaptcha, GetPersonDetails, GetPersonMentions, GetReplies, GetReportCount, GetUnreadCount, Login, MarkAllAsRead, MarkCommentReplyAsRead, MarkPersonMentionAsRead, PasswordChangeAfterReset, PasswordReset, Register, SaveUserSettings, VerifyEmail, }, post::{ CreatePost, CreatePostLike, CreatePostReport, DeletePost, EditPost, FeaturePost, GetPost, GetPosts, GetSiteMetadata, ListPostReports, LockPost, MarkPostAsRead, RemovePost, ResolvePostReport, SavePost, }, private_message::{ CreatePrivateMessage, CreatePrivateMessageReport, DeletePrivateMessage, EditPrivateMessage, GetPrivateMessages, ListPrivateMessageReports, MarkPrivateMessageAsRead, ResolvePrivateMessageReport, }, site::{ ApproveRegistrationApplication, CreateSite, EditSite, GetFederatedInstances, GetModlog, GetSite, GetUnreadRegistrationApplicationCount, LeaveAdmin, ListRegistrationApplications, PurgeComment, PurgeCommunity, PurgePerson, PurgePost, ResolveObject, Search, }, websocket::{ handlers::{ connect::{Connect, Disconnect}, WsMessage, }, serialize_websocket_message, structs::{CommunityJoin, ModJoin, PostJoin, UserJoin}, UserOperation, UserOperationApub, UserOperationCrud, }, }; use lemmy_api_crud::PerformCrud; use lemmy_apub::{api::PerformApub, SendActivity}; use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr}; use serde::Deserialize; use serde_json::Value; use std::{ ops::Deref, result, str::FromStr, time::{Duration, Instant}, }; use tracing::{debug, error}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); pub struct WsChatSession { /// unique session id pub id: ConnectionId, pub ip: IpAddr, /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), /// otherwise we drop connection. pub hb: Instant, /// The context data apub_data: ContextData, } pub async fn websocket( req: HttpRequest, body: web::Payload, rate_limiter: web::Data, apub_data: ContextData, ) -> Result { let client_ip = IpAddr( req .connection_info() .realip_remote_addr() .unwrap_or("blank_ip") .to_string(), ); let check = rate_limiter.message().check(client_ip.clone()); if !check { debug!( "Websocket join with IP: {} has been rate limited.", &client_ip ); return Ok(HttpResponse::TooManyRequests().finish()); } ws::start( WsChatSession { id: 0, ip: client_ip, hb: Instant::now(), apub_data, }, &req, body, ) } /// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL). /// /// also this method checks heartbeats from client fn hb(ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out // notify chat server act .apub_data .chat_server() .do_send(Disconnect { id: act.id }); // stop actor ctx.stop(); // don't try to send a ping return; } ctx.ping(b""); }); } impl Actor for WsChatSession { type Context = ws::WebsocketContext; /// Method is called on actor start. /// We register ws session with ChatServer fn started(&mut self, ctx: &mut Self::Context) { // we'll start heartbeat process on session start. hb(ctx); // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared // across all routes within application let addr = ctx.address(); self .apub_data .chat_server() .send(Connect { addr: addr.recipient(), }) .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify chat server self .apub_data .chat_server() .do_send(Disconnect { id: self.id }); Running::Stop } } /// Handle messages from chat server, we simply send it to peer websocket impl Handler for WsChatSession { type Result = (); fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { ctx.text(msg.0); } } /// WebSocket message handler impl StreamHandler> for WsChatSession { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Err(_) => { ctx.stop(); return; } Ok(msg) => msg, }; match msg { ws::Message::Ping(msg) => { self.hb = Instant::now(); ctx.pong(&msg); } ws::Message::Pong(_) => { self.hb = Instant::now(); } ws::Message::Text(text) => { let ip_clone = self.ip.clone(); let id_clone = self.id.to_owned(); let context_clone = self.apub_data.reset_request_count(); let fut = Box::pin(async move { let msg = text.trim().to_string(); parse_json_message(msg, ip_clone, id_clone, context_clone).await }); fut .into_actor(self) .then(|res, _, ctx| { match res { Ok(res) => ctx.text(res), Err(e) => error!("{}", &e), } actix::fut::ready(()) }) .spawn(ctx); } ws::Message::Binary(_) => println!("Unexpected binary"), ws::Message::Close(reason) => { ctx.close(reason); ctx.stop(); } ws::Message::Continuation(_) => { ctx.stop(); } ws::Message::Nop => (), } } } /// Entry point for our websocket route async fn parse_json_message( msg: String, ip: IpAddr, connection_id: ConnectionId, context: ContextData, ) -> Result { let rate_limiter = context.settings_updated_channel(); let json: Value = serde_json::from_str(&msg)?; let data = json .get("data") .cloned() .ok_or_else(|| LemmyError::from_message("missing data"))?; let missing_op_err = || LemmyError::from_message("missing op"); let op = json .get("op") .ok_or_else(missing_op_err)? .as_str() .ok_or_else(missing_op_err)?; // check if api call passes the rate limit, and generate future for later execution if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) { let passed = match user_operation_crud { UserOperationCrud::Register => rate_limiter.register().check(ip), UserOperationCrud::CreatePost => rate_limiter.post().check(ip), UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip), UserOperationCrud::CreateComment => rate_limiter.comment().check(ip), _ => rate_limiter.message().check(ip), }; check_rate_limit_passed(passed)?; match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await } else if let Ok(user_operation) = UserOperation::from_str(op) { let passed = match user_operation { UserOperation::GetCaptcha => rate_limiter.post().check(ip), _ => rate_limiter.message().check(ip), }; check_rate_limit_passed(passed)?; match_websocket_operation(context, connection_id, user_operation, data).await } else { let user_operation = UserOperationApub::from_str(op)?; let passed = match user_operation { UserOperationApub::Search => rate_limiter.search().check(ip), _ => rate_limiter.message().check(ip), }; check_rate_limit_passed(passed)?; match_websocket_operation_apub(context, connection_id, user_operation, data).await } } fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> { if passed { Ok(()) } else { // if rate limit was hit, respond with message Err(LemmyError::from_message("rate_limit_error")) } } pub async fn match_websocket_operation_crud( context: ContextData, id: ConnectionId, op: UserOperationCrud, data: Value, ) -> result::Result { match op { // User ops UserOperationCrud::Register => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeleteAccount => { do_websocket_operation_crud::(context, id, op, data).await } // Private Message ops UserOperationCrud::CreatePrivateMessage => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditPrivateMessage => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeletePrivateMessage => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::GetPrivateMessages => { do_websocket_operation_crud::(context, id, op, data).await } // Site ops UserOperationCrud::CreateSite => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditSite => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::GetSite => { do_websocket_operation_crud::(context, id, op, data).await } // Community ops UserOperationCrud::ListCommunities => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::CreateCommunity => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditCommunity => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeleteCommunity => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::RemoveCommunity => { do_websocket_operation_crud::(context, id, op, data).await } // Post ops UserOperationCrud::CreatePost => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::GetPost => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditPost => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeletePost => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::RemovePost => { do_websocket_operation_crud::(context, id, op, data).await } // Comment ops UserOperationCrud::CreateComment => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditComment => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeleteComment => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::RemoveComment => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::GetComment => { do_websocket_operation_crud::(context, id, op, data).await } // Emojis UserOperationCrud::CreateCustomEmoji => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::EditCustomEmoji => { do_websocket_operation_crud::(context, id, op, data).await } UserOperationCrud::DeleteCustomEmoji => { do_websocket_operation_crud::(context, id, op, data).await } } } async fn do_websocket_operation_crud<'a, 'b, Data>( context: ContextData, id: ConnectionId, op: UserOperationCrud, data: Value, ) -> result::Result where Data: PerformCrud + SendActivity::Response> + Send, for<'de> Data: Deserialize<'de>, { let parsed_data: Data = serde_json::from_value(data)?; let res = parsed_data .perform(&web::Data::new(context.deref().clone()), Some(id)) .await?; SendActivity::send_activity(&parsed_data, &res, &context).await?; serialize_websocket_message(&op, &res) } pub async fn match_websocket_operation_apub( context: ContextData, id: ConnectionId, op: UserOperationApub, data: Value, ) -> result::Result { match op { UserOperationApub::GetPersonDetails => { do_websocket_operation_apub::(context, id, op, data).await } UserOperationApub::GetCommunity => { do_websocket_operation_apub::(context, id, op, data).await } UserOperationApub::GetComments => { do_websocket_operation_apub::(context, id, op, data).await } UserOperationApub::GetPosts => { do_websocket_operation_apub::(context, id, op, data).await } UserOperationApub::ResolveObject => { do_websocket_operation_apub::(context, id, op, data).await } UserOperationApub::Search => do_websocket_operation_apub::(context, id, op, data).await, } } async fn do_websocket_operation_apub<'a, 'b, Data>( context: ContextData, id: ConnectionId, op: UserOperationApub, data: Value, ) -> result::Result where Data: PerformApub + SendActivity::Response> + Send, for<'de> Data: Deserialize<'de>, { let parsed_data: Data = serde_json::from_value(data)?; let res = parsed_data.perform(&context, Some(id)).await?; SendActivity::send_activity(&parsed_data, &res, &context).await?; serialize_websocket_message(&op, &res) } pub async fn match_websocket_operation( context: ContextData, id: ConnectionId, op: UserOperation, data: Value, ) -> result::Result { match op { // User ops UserOperation::Login => do_websocket_operation::(context, id, op, data).await, UserOperation::GetCaptcha => do_websocket_operation::(context, id, op, data).await, UserOperation::GetReplies => do_websocket_operation::(context, id, op, data).await, UserOperation::AddAdmin => do_websocket_operation::(context, id, op, data).await, UserOperation::GetUnreadRegistrationApplicationCount => { do_websocket_operation::(context, id, op, data).await } UserOperation::ListRegistrationApplications => { do_websocket_operation::(context, id, op, data).await } UserOperation::ApproveRegistrationApplication => { do_websocket_operation::(context, id, op, data).await } UserOperation::BanPerson => do_websocket_operation::(context, id, op, data).await, UserOperation::GetBannedPersons => { do_websocket_operation::(context, id, op, data).await } UserOperation::BlockPerson => { do_websocket_operation::(context, id, op, data).await } UserOperation::GetPersonMentions => { do_websocket_operation::(context, id, op, data).await } UserOperation::MarkPersonMentionAsRead => { do_websocket_operation::(context, id, op, data).await } UserOperation::MarkCommentReplyAsRead => { do_websocket_operation::(context, id, op, data).await } UserOperation::MarkAllAsRead => { do_websocket_operation::(context, id, op, data).await } UserOperation::PasswordReset => { do_websocket_operation::(context, id, op, data).await } UserOperation::PasswordChange => { do_websocket_operation::(context, id, op, data).await } UserOperation::UserJoin => do_websocket_operation::(context, id, op, data).await, UserOperation::PostJoin => do_websocket_operation::(context, id, op, data).await, UserOperation::CommunityJoin => { do_websocket_operation::(context, id, op, data).await } UserOperation::ModJoin => do_websocket_operation::(context, id, op, data).await, UserOperation::SaveUserSettings => { do_websocket_operation::(context, id, op, data).await } UserOperation::ChangePassword => { do_websocket_operation::(context, id, op, data).await } UserOperation::GetReportCount => { do_websocket_operation::(context, id, op, data).await } UserOperation::GetUnreadCount => { do_websocket_operation::(context, id, op, data).await } UserOperation::VerifyEmail => { do_websocket_operation::(context, id, op, data).await } // Private Message ops UserOperation::MarkPrivateMessageAsRead => { do_websocket_operation::(context, id, op, data).await } UserOperation::CreatePrivateMessageReport => { do_websocket_operation::(context, id, op, data).await } UserOperation::ResolvePrivateMessageReport => { do_websocket_operation::(context, id, op, data).await } UserOperation::ListPrivateMessageReports => { do_websocket_operation::(context, id, op, data).await } // Site ops UserOperation::GetModlog => do_websocket_operation::(context, id, op, data).await, UserOperation::PurgePerson => { do_websocket_operation::(context, id, op, data).await } UserOperation::PurgeCommunity => { do_websocket_operation::(context, id, op, data).await } UserOperation::PurgePost => do_websocket_operation::(context, id, op, data).await, UserOperation::PurgeComment => { do_websocket_operation::(context, id, op, data).await } UserOperation::TransferCommunity => { do_websocket_operation::(context, id, op, data).await } UserOperation::LeaveAdmin => do_websocket_operation::(context, id, op, data).await, UserOperation::GetFederatedInstances => { do_websocket_operation::(context, id, op, data).await } // Community ops UserOperation::FollowCommunity => { do_websocket_operation::(context, id, op, data).await } UserOperation::BlockCommunity => { do_websocket_operation::(context, id, op, data).await } UserOperation::BanFromCommunity => { do_websocket_operation::(context, id, op, data).await } UserOperation::AddModToCommunity => { do_websocket_operation::(context, id, op, data).await } // Post ops UserOperation::LockPost => do_websocket_operation::(context, id, op, data).await, UserOperation::FeaturePost => { do_websocket_operation::(context, id, op, data).await } UserOperation::CreatePostLike => { do_websocket_operation::(context, id, op, data).await } UserOperation::MarkPostAsRead => { do_websocket_operation::(context, id, op, data).await } UserOperation::SavePost => do_websocket_operation::(context, id, op, data).await, UserOperation::CreatePostReport => { do_websocket_operation::(context, id, op, data).await } UserOperation::ListPostReports => { do_websocket_operation::(context, id, op, data).await } UserOperation::ResolvePostReport => { do_websocket_operation::(context, id, op, data).await } UserOperation::GetSiteMetadata => { do_websocket_operation::(context, id, op, data).await } // Comment ops UserOperation::SaveComment => { do_websocket_operation::(context, id, op, data).await } UserOperation::CreateCommentLike => { do_websocket_operation::(context, id, op, data).await } UserOperation::DistinguishComment => { do_websocket_operation::(context, id, op, data).await } UserOperation::CreateCommentReport => { do_websocket_operation::(context, id, op, data).await } UserOperation::ListCommentReports => { do_websocket_operation::(context, id, op, data).await } UserOperation::ResolveCommentReport => { do_websocket_operation::(context, id, op, data).await } } } async fn do_websocket_operation<'a, 'b, Data>( context: ContextData, id: ConnectionId, op: UserOperation, data: Value, ) -> result::Result where Data: Perform + SendActivity::Response> + Send, for<'de> Data: Deserialize<'de>, { let parsed_data: Data = serde_json::from_value(data)?; let res = parsed_data .perform(&web::Data::new(context.deref().clone()), Some(id)) .await?; SendActivity::send_activity(&parsed_data, &res, &context).await?; serialize_websocket_message(&op, &res) }