diff --git a/server/src/lib.rs b/server/src/lib.rs index cddd5b86..e23ec4ba 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -25,12 +25,10 @@ pub extern crate strum; pub mod api; pub mod apub; pub mod db; -pub mod feeds; -pub mod nodeinfo; +pub mod routes; pub mod schema; pub mod settings; pub mod version; -pub mod webfinger; pub mod websocket; use crate::settings::Settings; diff --git a/server/src/main.rs b/server/src/main.rs index 52c395d3..bf105323 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,186 +2,18 @@ extern crate lemmy_server; #[macro_use] extern crate diesel_migrations; -use actix::prelude::*; -use actix_files::NamedFile; use actix_web::*; -use actix_web_actors::ws; -use lemmy_server::apub; use lemmy_server::db::establish_connection; -use lemmy_server::feeds; -use lemmy_server::nodeinfo; +use lemmy_server::routes::federation; +use lemmy_server::routes::feeds; +use lemmy_server::routes::index; +use lemmy_server::routes::nodeinfo; +use lemmy_server::routes::webfinger; +use lemmy_server::routes::websocket; use lemmy_server::settings::Settings; -use lemmy_server::webfinger; -use lemmy_server::websocket::server::*; -use std::time::{Duration, Instant}; embed_migrations!(); -/// How often heartbeat pings are sent -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); -/// How long before lack of client response causes a timeout -const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); - -/// Entry point for our route -fn chat_route( - req: HttpRequest, - stream: web::Payload, - chat_server: web::Data>, -) -> Result { - ws::start( - WSSession { - cs_addr: chat_server.get_ref().to_owned(), - id: 0, - hb: Instant::now(), - ip: req - .connection_info() - .remote() - .unwrap_or("127.0.0.1:12345") - .split(":") - .next() - .unwrap_or("127.0.0.1") - .to_string(), - }, - &req, - stream, - ) -} - -struct WSSession { - cs_addr: Addr, - /// unique session id - id: usize, - ip: String, - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), - /// otherwise we drop connection. - hb: Instant, -} - -impl Actor for WSSession { - type Context = ws::WebsocketContext; - - /// Method is called on actor start. - /// We register ws session with ChatServer - fn started(&mut self, ctx: &mut Self::Context) { - // we'll start heartbeat process on session start. - self.hb(ctx); - - // register self in chat server. `AsyncContext::wait` register - // future within context, but context waits until this future resolves - // before processing any other events. - // across all routes within application - let addr = ctx.address(); - self - .cs_addr - .send(Connect { - addr: addr.recipient(), - ip: self.ip.to_owned(), - }) - .into_actor(self) - .then(|res, act, ctx| { - match res { - Ok(res) => act.id = res, - // something is wrong with chat server - _ => ctx.stop(), - } - fut::ok(()) - }) - .wait(ctx); - } - - fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { - // notify chat server - self.cs_addr.do_send(Disconnect { - id: self.id, - ip: self.ip.to_owned(), - }); - Running::Stop - } -} - -/// Handle messages from chat server, we simply send it to peer websocket -/// These are room messages, IE sent to others in the room -impl Handler for WSSession { - type Result = (); - - fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { - // println!("id: {} msg: {}", self.id, msg.0); - ctx.text(msg.0); - } -} - -/// WebSocket message handler -impl StreamHandler for WSSession { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); - match msg { - ws::Message::Ping(msg) => { - self.hb = Instant::now(); - ctx.pong(&msg); - } - ws::Message::Pong(_) => { - self.hb = Instant::now(); - } - ws::Message::Text(text) => { - let m = text.trim().to_owned(); - println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); - - self - .cs_addr - .send(StandardMessage { - id: self.id, - msg: m, - }) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(res) => ctx.text(res), - Err(e) => { - eprintln!("{}", &e); - } - } - fut::ok(()) - }) - .wait(ctx); - } - ws::Message::Binary(_bin) => println!("Unexpected binary"), - ws::Message::Close(_) => { - ctx.stop(); - } - _ => {} - } - } -} - -impl WSSession { - /// helper method that sends ping to client every second. - /// - /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // heartbeat timed out - println!("Websocket Client heartbeat failed, disconnecting!"); - - // notify chat server - act.cs_addr.do_send(Disconnect { - id: act.id, - ip: act.ip.to_owned(), - }); - - // stop actor - ctx.stop(); - - // don't try to send a ping - return; - } - - ctx.ping(""); - }); - } -} - fn main() { let _ = env_logger::init(); let sys = actix::System::new("lemmy"); @@ -190,87 +22,21 @@ fn main() { let conn = establish_connection(); embedded_migrations::run(&conn).unwrap(); - // Start chat server actor in separate thread - let server = ChatServer::default().start(); - let settings = Settings::get(); // Create Http server with websocket support HttpServer::new(move || { - let app = App::new() - .data(server.clone()) - // Front end routes + App::new() + .configure(federation::config) + .configure(feeds::config) + .configure(index::config) + .configure(nodeinfo::config) + .configure(webfinger::config) + .configure(websocket::config) .service(actix_files::Files::new( "/static", settings.front_end_dir.to_owned(), )) - .route("/", web::get().to(index)) - .route( - "/home/type/{type}/sort/{sort}/page/{page}", - web::get().to(index), - ) - .route("/login", web::get().to(index)) - .route("/create_post", web::get().to(index)) - .route("/create_community", web::get().to(index)) - .route("/communities/page/{page}", web::get().to(index)) - .route("/communities", web::get().to(index)) - .route("/post/{id}/comment/{id2}", web::get().to(index)) - .route("/post/{id}", web::get().to(index)) - .route("/c/{name}/sort/{sort}/page/{page}", web::get().to(index)) - .route("/c/{name}", web::get().to(index)) - .route("/community/{id}", web::get().to(index)) - .route( - "/u/{username}/view/{view}/sort/{sort}/page/{page}", - web::get().to(index), - ) - .route("/u/{username}", web::get().to(index)) - .route("/user/{id}", web::get().to(index)) - .route("/inbox", web::get().to(index)) - .route("/modlog/community/{community_id}", web::get().to(index)) - .route("/modlog", web::get().to(index)) - .route("/setup", web::get().to(index)) - .route( - "/search/q/{q}/type/{type}/sort/{sort}/page/{page}", - web::get().to(index), - ) - .route("/search", web::get().to(index)) - .route("/sponsors", web::get().to(index)) - .route("/password_change/{token}", web::get().to(index)) - // Websocket - .service(web::resource("/api/v1/ws").to(chat_route)) - // NodeInfo - .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info)) - .route( - "/.well-known/nodeinfo", - web::get().to(nodeinfo::node_info_well_known), - ) - // RSS - .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed)) - .route("/feeds/all.xml", web::get().to(feeds::get_all_feed)) - // Federation - .route( - "/federation/c/{community_name}", - web::get().to(apub::community::get_apub_community), - ) - .route( - "/federation/c/{community_name}/followers", - web::get().to(apub::community::get_apub_community_followers), - ) - .route( - "/federation/u/{user_name}", - web::get().to(apub::user::get_apub_user), - ) - .route("/feeds/all.xml", web::get().to(feeds::get_all_feed)); - - // Federation - if settings.federation_enabled { - app.route( - ".well-known/webfinger", - web::get().to(webfinger::get_webfinger_response), - ) - } else { - app - } }) .bind((settings.bind, settings.port)) .unwrap() @@ -279,9 +45,3 @@ fn main() { println!("Started http server at {}:{}", settings.bind, settings.port); let _ = sys.run(); } - -fn index() -> Result { - Ok(NamedFile::open( - Settings::get().front_end_dir.to_owned() + "/index.html", - )?) -} diff --git a/server/src/routes/federation.rs b/server/src/routes/federation.rs new file mode 100644 index 00000000..ea6039d6 --- /dev/null +++ b/server/src/routes/federation.rs @@ -0,0 +1,18 @@ +use crate::apub; +use actix_web::web; + +pub fn config(cfg: &mut web::ServiceConfig) { + cfg + .route( + "/federation/c/{community_name}", + web::get().to(apub::community::get_apub_community), + ) + .route( + "/federation/c/{community_name}/followers", + web::get().to(apub::community::get_apub_community_followers), + ) + .route( + "/federation/u/{user_name}", + web::get().to(apub::user::get_apub_user), + ); +} diff --git a/server/src/feeds.rs b/server/src/routes/feeds.rs similarity index 94% rename from server/src/feeds.rs rename to server/src/routes/feeds.rs index c624bcc5..55b457c7 100644 --- a/server/src/feeds.rs +++ b/server/src/routes/feeds.rs @@ -5,12 +5,13 @@ use crate::db::comment_view::{ReplyQueryBuilder, ReplyView}; use crate::db::community::Community; use crate::db::post_view::{PostQueryBuilder, PostView}; use crate::db::site_view::SiteView; -use crate::db::user::User_; +use crate::db::user::{Claims, User_}; use crate::db::user_mention_view::{UserMentionQueryBuilder, UserMentionView}; use crate::db::{establish_connection, ListingType, SortType}; use crate::Settings; use actix_web::body::Body; use actix_web::{web, HttpResponse, Result}; +use chrono::{DateTime, Utc}; use failure::Error; use rss::{CategoryBuilder, ChannelBuilder, GuidBuilder, Item, ItemBuilder}; use serde::Deserialize; @@ -29,7 +30,14 @@ enum RequestType { Inbox, } -pub fn get_all_feed(info: web::Query) -> HttpResponse { +pub fn config(cfg: &mut web::ServiceConfig) { + cfg + .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed)) + .route("/feeds/all.xml", web::get().to(feeds::get_all_feed)) + .route("/feeds/all.xml", web::get().to(feeds::get_all_feed)); +} + +fn get_all_feed(info: web::Query) -> HttpResponse { let sort_type = match get_sort_type(info) { Ok(sort_type) => sort_type, Err(_) => return HttpResponse::BadRequest().finish(), @@ -45,7 +53,7 @@ pub fn get_all_feed(info: web::Query) -> HttpResponse { } } -pub fn get_feed(path: web::Path<(String, String)>, info: web::Query) -> HttpResponse { +fn get_feed(path: web::Path<(String, String)>, info: web::Query) -> HttpResponse { let sort_type = match get_sort_type(info) { Ok(sort_type) => sort_type, Err(_) => return HttpResponse::BadRequest().finish(), @@ -162,7 +170,7 @@ fn get_feed_front(sort_type: &SortType, jwt: String) -> Result { let conn = establish_connection(); let site_view = SiteView::read(&conn)?; - let user_id = db::user::Claims::decode(&jwt)?.claims.id; + let user_id = Claims::decode(&jwt)?.claims.id; let posts = PostQueryBuilder::create(&conn) .listing_type(ListingType::Subscribed) @@ -189,7 +197,7 @@ fn get_feed_inbox(jwt: String) -> Result { let conn = establish_connection(); let site_view = SiteView::read(&conn)?; - let user_id = db::user::Claims::decode(&jwt)?.claims.id; + let user_id = Claims::decode(&jwt)?.claims.id; let sort = SortType::New; diff --git a/server/src/routes/index.rs b/server/src/routes/index.rs new file mode 100644 index 00000000..cd10a2b7 --- /dev/null +++ b/server/src/routes/index.rs @@ -0,0 +1,45 @@ +use crate::settings::Settings; +use actix_files::NamedFile; +use actix_web::web; + +pub fn config(cfg: &mut web::ServiceConfig) { + cfg + .route("/", web::get().to(index)) + .route( + "/home/type/{type}/sort/{sort}/page/{page}", + web::get().to(index), + ) + .route("/login", web::get().to(index)) + .route("/create_post", web::get().to(index)) + .route("/create_community", web::get().to(index)) + .route("/communities/page/{page}", web::get().to(index)) + .route("/communities", web::get().to(index)) + .route("/post/{id}/comment/{id2}", web::get().to(index)) + .route("/post/{id}", web::get().to(index)) + .route("/c/{name}/sort/{sort}/page/{page}", web::get().to(index)) + .route("/c/{name}", web::get().to(index)) + .route("/community/{id}", web::get().to(index)) + .route( + "/u/{username}/view/{view}/sort/{sort}/page/{page}", + web::get().to(index), + ) + .route("/u/{username}", web::get().to(index)) + .route("/user/{id}", web::get().to(index)) + .route("/inbox", web::get().to(index)) + .route("/modlog/community/{community_id}", web::get().to(index)) + .route("/modlog", web::get().to(index)) + .route("/setup", web::get().to(index)) + .route( + "/search/q/{q}/type/{type}/sort/{sort}/page/{page}", + web::get().to(index), + ) + .route("/search", web::get().to(index)) + .route("/sponsors", web::get().to(index)) + .route("/password_change/{token}", web::get().to(index)); +} + +fn index() -> Result { + Ok(NamedFile::open( + Settings::get().front_end_dir.to_owned() + "/index.html", + )?) +} diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs new file mode 100644 index 00000000..6556c8d5 --- /dev/null +++ b/server/src/routes/mod.rs @@ -0,0 +1,6 @@ +pub mod federation; +pub mod feeds; +pub mod index; +pub mod nodeinfo; +pub mod webfinger; +pub mod websocket; diff --git a/server/src/nodeinfo.rs b/server/src/routes/nodeinfo.rs similarity index 84% rename from server/src/nodeinfo.rs rename to server/src/routes/nodeinfo.rs index 65bd9370..3165aea1 100644 --- a/server/src/nodeinfo.rs +++ b/server/src/routes/nodeinfo.rs @@ -3,9 +3,16 @@ use crate::db::site_view::SiteView; use crate::version; use crate::Settings; use actix_web::body::Body; +use actix_web::web; use actix_web::HttpResponse; use serde_json::json; +pub fn config(cfg: &mut web::ServiceConfig) { + cfg + .route("/nodeinfo/2.0.json", web::get().to(node_info)) + .route("/.well-known/nodeinfo", web::get().to(node_info_well_known)); +} + pub fn node_info_well_known() -> HttpResponse { let json = json!({ "links": { @@ -19,7 +26,7 @@ pub fn node_info_well_known() -> HttpResponse { .body(json.to_string()); } -pub fn node_info() -> HttpResponse { +fn node_info() -> HttpResponse { let conn = establish_connection(); let site_view = match SiteView::read(&conn) { Ok(site_view) => site_view, diff --git a/server/src/webfinger.rs b/server/src/routes/webfinger.rs similarity index 89% rename from server/src/webfinger.rs rename to server/src/routes/webfinger.rs index 55894745..f013f3ef 100644 --- a/server/src/webfinger.rs +++ b/server/src/routes/webfinger.rs @@ -2,6 +2,7 @@ use crate::db::community::Community; use crate::db::establish_connection; use crate::Settings; use actix_web::body::Body; +use actix_web::web; use actix_web::web::Query; use actix_web::HttpResponse; use regex::Regex; @@ -13,6 +14,15 @@ pub struct Params { resource: String, } +pub fn config(cfg: &mut web::ServiceConfig) { + if Settings::get().federation_enabled { + cfg.route( + ".well-known/webfinger", + web::get().to(get_webfinger_response), + ); + } +} + lazy_static! { static ref WEBFINGER_COMMUNITY_REGEX: Regex = Regex::new(&format!( "^group:([a-z0-9_]{{3, 20}})@{}$", @@ -27,7 +37,7 @@ lazy_static! { /// /// You can also view the webfinger response that Mastodon sends: /// https://radical.town/.well-known/webfinger?resource=acct:felix@radical.town -pub fn get_webfinger_response(info: Query) -> HttpResponse { +fn get_webfinger_response(info: Query) -> HttpResponse { let regex_parsed = WEBFINGER_COMMUNITY_REGEX .captures(&info.resource) .map(|c| c.get(1)); diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs new file mode 100644 index 00000000..3af229bf --- /dev/null +++ b/server/src/routes/websocket.rs @@ -0,0 +1,179 @@ +use crate::websocket::server::*; +use actix::prelude::*; +use actix_web::web; +use actix_web::*; +use actix_web_actors::ws; +use std::time::{Duration, Instant}; + +pub fn config(cfg: &mut web::ServiceConfig) { + // Start chat server actor in separate thread + let server = ChatServer::default().start(); + cfg + .data(server.clone()) + .service(web::resource("/api/v1/ws").to(chat_route)); +} + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Entry point for our route +fn chat_route( + req: HttpRequest, + stream: web::Payload, + chat_server: web::Data>, +) -> Result { + ws::start( + WSSession { + cs_addr: chat_server.get_ref().to_owned(), + id: 0, + hb: Instant::now(), + ip: req + .connection_info() + .remote() + .unwrap_or("127.0.0.1:12345") + .split(":") + .next() + .unwrap_or("127.0.0.1") + .to_string(), + }, + &req, + stream, + ) +} + +struct WSSession { + cs_addr: Addr, + /// unique session id + id: usize, + ip: String, + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant, +} + +impl Actor for WSSession { + type Context = ws::WebsocketContext; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // across all routes within application + let addr = ctx.address(); + self + .cs_addr + .send(Connect { + addr: addr.recipient(), + ip: self.ip.to_owned(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + // notify chat server + self.cs_addr.do_send(Disconnect { + id: self.id, + ip: self.ip.to_owned(), + }); + Running::Stop + } +} + +/// Handle messages from chat server, we simply send it to peer websocket +/// These are room messages, IE sent to others in the room +impl Handler for WSSession { + type Result = (); + + fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + // println!("id: {} msg: {}", self.id, msg.0); + ctx.text(msg.0); + } +} + +/// WebSocket message handler +impl StreamHandler for WSSession { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.hb = Instant::now(); + } + ws::Message::Text(text) => { + let m = text.trim().to_owned(); + println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id); + + self + .cs_addr + .send(StandardMessage { + id: self.id, + msg: m, + }) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(res) => ctx.text(res), + Err(e) => { + eprintln!("{}", &e); + } + } + fut::ok(()) + }) + .wait(ctx); + } + ws::Message::Binary(_bin) => println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + } + _ => {} + } + } +} + +impl WSSession { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + act.cs_addr.do_send(Disconnect { + id: act.id, + ip: act.ip.to_owned(), + }); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(""); + }); + } +}