diff --git a/server/Cargo.lock b/server/Cargo.lock index 2a3bc0330..c61e77dfa 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -832,7 +832,7 @@ dependencies = [ [[package]] name = "diesel" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1416,13 +1416,14 @@ dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-files 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-threadpool 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "bcrypt 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "comrak 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1436,6 +1437,7 @@ dependencies = [ "lettre_email 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1589,7 +1591,7 @@ name = "migrations_internals" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3050,7 +3052,7 @@ dependencies = [ "checksum derive_builder 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" "checksum derive_builder_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" "checksum derive_more 0.99.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c" -"checksum diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9d7cc03b910de9935007861dce440881f69102aaaedfd4bc5a6f40340ca5840c" +"checksum diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "33d7ca63eb2efea87a7f56a283acc49e2ce4b2bd54adf7465dc1d81fef13d8fc" "checksum diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" "checksum diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5a4fdcece..c1849d183 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -5,7 +5,8 @@ authors = ["Dessalines "] edition = "2018" [dependencies] -diesel = { version = "1.4.2", features = ["postgres","chrono", "r2d2", "64-column-tables"] } +diesel = { version = "1.4.4", features = ["postgres","chrono", "r2d2", "64-column-tables"] } +r2d2 = "0.8.8" diesel_migrations = "1.4.0" dotenv = "0.15.0" bcrypt = "0.6.2" @@ -19,6 +20,7 @@ actix-web = "2.0.0" actix-files = "0.2.1" actix-web-actors = "2.0.0" actix-rt = "1.0.0" +actix-threadpool = "0.3.1" log = "0.4.0" env_logger = "0.7.1" rand = "0.7.3" diff --git a/server/src/lib.rs b/server/src/lib.rs index 9bbfe251a..e87972386 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -47,6 +47,13 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use regex::{Regex, RegexBuilder}; use serde::Deserialize; +use actix_threadpool::{run, BlockingError}; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use failure::Error; +use std::fmt::Debug; pub fn to_datetime_utc(ndt: NaiveDateTime) -> DateTime { DateTime::::from_utc(ndt, Utc) @@ -224,6 +231,39 @@ pub fn markdown_to_html(text: &str) -> String { comrak::markdown_to_html(text, &comrak::ComrakOptions::default()) } +#[derive(Clone)] +pub struct DbHandle(Pool>); + +impl DbHandle { + /// Start the DbActor with a SyncArbiter. + pub fn start(settings: &Settings) -> Result { + + // Set up the r2d2 connection pool + let manager = ConnectionManager::::new(&settings.get_database_url()); + let pool = Pool::builder() + .max_size(settings.database.pool_size) + .build(manager) + .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url())); + + Ok(DbHandle(pool)) + } + + pub async fn run(&self, f: F) -> Result> + where + F: FnOnce(&PgConnection) -> Result + Send + 'static, + T: Send + 'static, + E: Debug + From + Send + 'static, + { + let pool = self.0.clone(); + run(move || { + let conn = pool.get()?; + + (f)(&conn) + }) + .await + } +} + #[cfg(test)] mod tests { use crate::{extract_usernames, is_email_regex, remove_slurs, slur_check, slurs_vec_to_str}; diff --git a/server/src/main.rs b/server/src/main.rs index f38875275..c9961af85 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,33 +4,28 @@ extern crate diesel_migrations; use actix::prelude::*; use actix_web::*; -use diesel::r2d2::{ConnectionManager, Pool}; -use diesel::PgConnection; use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket}; use lemmy_server::settings::Settings; use lemmy_server::websocket::server::*; +use lemmy_server::db::establish_unpooled_connection; +use lemmy_server::DbHandle; use std::io; embed_migrations!(); + #[actix_rt::main] async fn main() -> io::Result<()> { env_logger::init(); let settings = Settings::get(); - // Set up the r2d2 connection pool - let manager = ConnectionManager::::new(&settings.get_database_url()); - let pool = Pool::builder() - .max_size(settings.database.pool_size) - .build(manager) - .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url())); + let db_handle = DbHandle::start(&settings).unwrap(); // Run the migrations from code - let conn = pool.get().unwrap(); - embedded_migrations::run(&conn).unwrap(); + embedded_migrations::run(&establish_unpooled_connection()).unwrap(); // Set up websocket server - let server = ChatServer::startup(pool.clone()).start(); + let server = ChatServer::startup(db_handle.clone()).start(); println!( "Starting http server at {}:{}", @@ -42,7 +37,7 @@ async fn main() -> io::Result<()> { let settings = Settings::get(); App::new() .wrap(middleware::Logger::default()) - .data(pool.clone()) + .data(db_handle.clone()) .data(server.clone()) // The routes .configure(api::config) diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs index 6c4326fd3..56d2d6cf7 100644 --- a/server/src/routes/websocket.rs +++ b/server/src/routes/websocket.rs @@ -100,6 +100,7 @@ impl Handler for WSSession { type Result = (); fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + info!("Message sent out: {}", msg.0); ctx.text(msg.0); } } @@ -142,6 +143,8 @@ impl StreamHandler> for WSSession { } actix::fut::ready(()) }) + // Testing out spawn instead of wait + // .spawn(ctx); .wait(ctx); } ws::Message::Binary(_bin) => info!("Unexpected binary"), diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index 0f2d2d26f..8e23ec8af 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -22,7 +22,7 @@ use crate::api::site::*; use crate::api::user::*; use crate::api::*; use crate::websocket::UserOperation; -use crate::Settings; +use crate::{Settings, DbHandle}; type ConnectionId = usize; type PostId = i32; @@ -53,7 +53,7 @@ pub struct Disconnect { pub ip: IPAddr, } -#[derive(Serialize, Deserialize, Message)] +#[derive(Serialize, Deserialize, Message, Clone)] #[rtype(String)] pub struct StandardMessage { /// Id of the client session @@ -82,6 +82,7 @@ pub enum RateLimitType { /// `ChatServer` manages chat rooms and responsible for coordinating chat /// session. +#[derive(Clone)] pub struct ChatServer { /// A map from generated random ID to session addr sessions: HashMap, @@ -99,20 +100,18 @@ pub struct ChatServer { /// Rate limiting based on rate type and IP addr rate_limit_buckets: HashMap>, - rng: ThreadRng, - db: Pool>, + db_handle: DbHandle, } impl ChatServer { - pub fn startup(db: Pool>) -> ChatServer { + pub fn startup(db_handle: DbHandle) -> ChatServer { ChatServer { sessions: HashMap::new(), rate_limit_buckets: HashMap::new(), post_rooms: HashMap::new(), community_rooms: HashMap::new(), user_rooms: HashMap::new(), - rng: rand::thread_rng(), - db, + db_handle, } } @@ -387,7 +386,7 @@ impl Handler for ChatServer { fn handle(&mut self, msg: Connect, _ctx: &mut Context) -> Self::Result { // register session with random id - let id = self.rng.gen::(); + let id = ThreadRng::default().gen::(); info!("{} joined", &msg.ip); self.sessions.insert( @@ -448,17 +447,33 @@ impl Handler for ChatServer { impl Handler for ChatServer { type Result = MessageResult; - fn handle(&mut self, msg: StandardMessage, _: &mut Context) -> Self::Result { - match parse_json_message(self, msg) { - Ok(m) => { - info!("Message Sent: {}", m); - MessageResult(m) - } - Err(e) => { - error!("Error during message handling {}", e); - MessageResult(e.to_string()) - } - } + fn handle(&mut self, msg: StandardMessage, ctx: &mut Context) { + + let my_block = async move { + self.db_handle.run(|conn| { + parse_json_message(&mut self.clone(), msg.clone(), &conn) + // match parse_json_message(self, msg, &conn) { + // Ok(m) => { + // info!("Message Sent: {}", m); + // MessageResult(m) + // } + // Err(e) => { + // error!("Error during message handling {}", e); + // MessageResult(e.to_string()) + // } + // } + }).await + }; + + let actor_future = my_block + .into_actor(self) + .map(|res, actor, ctx| { + + + }); + + ctx.spawn(actor_future); + } } @@ -494,14 +509,13 @@ where to_json_string(&op, &res) } -fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result { +fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage, conn: &PgConnection) -> Result { let json: Value = serde_json::from_str(&msg.msg)?; let data = &json["data"].to_string(); let op = &json["op"].as_str().ok_or(APIError { message: "Unknown op type".to_string(), })?; - let conn = chat.db.get()?; let user_operation: UserOperation = UserOperation::from_str(&op)?;