Working on sync 2

This commit is contained in:
Dessalines 2020-04-17 09:52:27 -04:00
parent be6a7876b4
commit fa5b1caa84
6 changed files with 94 additions and 38 deletions

10
server/Cargo.lock generated vendored
View file

@ -832,7 +832,7 @@ dependencies = [
[[package]] [[package]]
name = "diesel" name = "diesel"
version = "1.4.3" version = "1.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1416,13 +1416,14 @@ dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-files 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-files 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-threadpool 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bcrypt 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "bcrypt 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"comrak 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "comrak 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1436,6 +1437,7 @@ dependencies = [
"lettre_email 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", "lettre_email 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1589,7 +1591,7 @@ name = "migrations_internals"
version = "1.4.0" version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -3050,7 +3052,7 @@ dependencies = [
"checksum derive_builder 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" "checksum derive_builder 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0"
"checksum derive_builder_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" "checksum derive_builder_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef"
"checksum derive_more 0.99.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c" "checksum derive_more 0.99.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c"
"checksum diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9d7cc03b910de9935007861dce440881f69102aaaedfd4bc5a6f40340ca5840c" "checksum diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "33d7ca63eb2efea87a7f56a283acc49e2ce4b2bd54adf7465dc1d81fef13d8fc"
"checksum diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" "checksum diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3"
"checksum diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c" "checksum diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"

4
server/Cargo.toml vendored
View file

@ -5,7 +5,8 @@ authors = ["Dessalines <happydooby@gmail.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
diesel = { version = "1.4.2", features = ["postgres","chrono", "r2d2", "64-column-tables"] } diesel = { version = "1.4.4", features = ["postgres","chrono", "r2d2", "64-column-tables"] }
r2d2 = "0.8.8"
diesel_migrations = "1.4.0" diesel_migrations = "1.4.0"
dotenv = "0.15.0" dotenv = "0.15.0"
bcrypt = "0.6.2" bcrypt = "0.6.2"
@ -19,6 +20,7 @@ actix-web = "2.0.0"
actix-files = "0.2.1" actix-files = "0.2.1"
actix-web-actors = "2.0.0" actix-web-actors = "2.0.0"
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-threadpool = "0.3.1"
log = "0.4.0" log = "0.4.0"
env_logger = "0.7.1" env_logger = "0.7.1"
rand = "0.7.3" rand = "0.7.3"

View file

@ -47,6 +47,13 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use regex::{Regex, RegexBuilder}; use regex::{Regex, RegexBuilder};
use serde::Deserialize; use serde::Deserialize;
use actix_threadpool::{run, BlockingError};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use failure::Error;
use std::fmt::Debug;
pub fn to_datetime_utc(ndt: NaiveDateTime) -> DateTime<Utc> { pub fn to_datetime_utc(ndt: NaiveDateTime) -> DateTime<Utc> {
DateTime::<Utc>::from_utc(ndt, Utc) DateTime::<Utc>::from_utc(ndt, Utc)
@ -224,6 +231,39 @@ pub fn markdown_to_html(text: &str) -> String {
comrak::markdown_to_html(text, &comrak::ComrakOptions::default()) comrak::markdown_to_html(text, &comrak::ComrakOptions::default())
} }
#[derive(Clone)]
pub struct DbHandle(Pool<ConnectionManager<PgConnection>>);
impl DbHandle {
/// Start the DbActor with a SyncArbiter.
pub fn start(settings: &Settings) -> Result<Self, Error> {
// Set up the r2d2 connection pool
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
let pool = Pool::builder()
.max_size(settings.database.pool_size)
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
Ok(DbHandle(pool))
}
pub async fn run<F, T, E>(&self, f: F) -> Result<T, BlockingError<E>>
where
F: FnOnce(&PgConnection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Debug + From<r2d2::Error> + Send + 'static,
{
let pool = self.0.clone();
run(move || {
let conn = pool.get()?;
(f)(&conn)
})
.await
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{extract_usernames, is_email_regex, remove_slurs, slur_check, slurs_vec_to_str}; use crate::{extract_usernames, is_email_regex, remove_slurs, slur_check, slurs_vec_to_str};

View file

@ -4,33 +4,28 @@ extern crate diesel_migrations;
use actix::prelude::*; use actix::prelude::*;
use actix_web::*; use actix_web::*;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::PgConnection;
use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket}; use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
use lemmy_server::settings::Settings; use lemmy_server::settings::Settings;
use lemmy_server::websocket::server::*; use lemmy_server::websocket::server::*;
use lemmy_server::db::establish_unpooled_connection;
use lemmy_server::DbHandle;
use std::io; use std::io;
embed_migrations!(); embed_migrations!();
#[actix_rt::main] #[actix_rt::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
env_logger::init(); env_logger::init();
let settings = Settings::get(); let settings = Settings::get();
// Set up the r2d2 connection pool let db_handle = DbHandle::start(&settings).unwrap();
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
let pool = Pool::builder()
.max_size(settings.database.pool_size)
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
// Run the migrations from code // Run the migrations from code
let conn = pool.get().unwrap(); embedded_migrations::run(&establish_unpooled_connection()).unwrap();
embedded_migrations::run(&conn).unwrap();
// Set up websocket server // Set up websocket server
let server = ChatServer::startup(pool.clone()).start(); let server = ChatServer::startup(db_handle.clone()).start();
println!( println!(
"Starting http server at {}:{}", "Starting http server at {}:{}",
@ -42,7 +37,7 @@ async fn main() -> io::Result<()> {
let settings = Settings::get(); let settings = Settings::get();
App::new() App::new()
.wrap(middleware::Logger::default()) .wrap(middleware::Logger::default())
.data(pool.clone()) .data(db_handle.clone())
.data(server.clone()) .data(server.clone())
// The routes // The routes
.configure(api::config) .configure(api::config)

View file

@ -100,6 +100,7 @@ impl Handler<WSMessage> for WSSession {
type Result = (); type Result = ();
fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
info!("Message sent out: {}", msg.0);
ctx.text(msg.0); ctx.text(msg.0);
} }
} }
@ -142,6 +143,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
} }
actix::fut::ready(()) actix::fut::ready(())
}) })
// Testing out spawn instead of wait
// .spawn(ctx);
.wait(ctx); .wait(ctx);
} }
ws::Message::Binary(_bin) => info!("Unexpected binary"), ws::Message::Binary(_bin) => info!("Unexpected binary"),

View file

@ -22,7 +22,7 @@ use crate::api::site::*;
use crate::api::user::*; use crate::api::user::*;
use crate::api::*; use crate::api::*;
use crate::websocket::UserOperation; use crate::websocket::UserOperation;
use crate::Settings; use crate::{Settings, DbHandle};
type ConnectionId = usize; type ConnectionId = usize;
type PostId = i32; type PostId = i32;
@ -53,7 +53,7 @@ pub struct Disconnect {
pub ip: IPAddr, pub ip: IPAddr,
} }
#[derive(Serialize, Deserialize, Message)] #[derive(Serialize, Deserialize, Message, Clone)]
#[rtype(String)] #[rtype(String)]
pub struct StandardMessage { pub struct StandardMessage {
/// Id of the client session /// Id of the client session
@ -82,6 +82,7 @@ pub enum RateLimitType {
/// `ChatServer` manages chat rooms and responsible for coordinating chat /// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. /// session.
#[derive(Clone)]
pub struct ChatServer { pub struct ChatServer {
/// A map from generated random ID to session addr /// A map from generated random ID to session addr
sessions: HashMap<ConnectionId, SessionInfo>, sessions: HashMap<ConnectionId, SessionInfo>,
@ -99,20 +100,18 @@ pub struct ChatServer {
/// Rate limiting based on rate type and IP addr /// Rate limiting based on rate type and IP addr
rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>, rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
rng: ThreadRng, db_handle: DbHandle,
db: Pool<ConnectionManager<PgConnection>>,
} }
impl ChatServer { impl ChatServer {
pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer { pub fn startup(db_handle: DbHandle) -> ChatServer {
ChatServer { ChatServer {
sessions: HashMap::new(), sessions: HashMap::new(),
rate_limit_buckets: HashMap::new(), rate_limit_buckets: HashMap::new(),
post_rooms: HashMap::new(), post_rooms: HashMap::new(),
community_rooms: HashMap::new(), community_rooms: HashMap::new(),
user_rooms: HashMap::new(), user_rooms: HashMap::new(),
rng: rand::thread_rng(), db_handle,
db,
} }
} }
@ -387,7 +386,7 @@ impl Handler<Connect> for ChatServer {
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
// register session with random id // register session with random id
let id = self.rng.gen::<usize>(); let id = ThreadRng::default().gen::<usize>();
info!("{} joined", &msg.ip); info!("{} joined", &msg.ip);
self.sessions.insert( self.sessions.insert(
@ -448,17 +447,33 @@ impl Handler<Disconnect> for ChatServer {
impl Handler<StandardMessage> for ChatServer { impl Handler<StandardMessage> for ChatServer {
type Result = MessageResult<StandardMessage>; type Result = MessageResult<StandardMessage>;
fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) {
match parse_json_message(self, msg) {
Ok(m) => { let my_block = async move {
info!("Message Sent: {}", m); self.db_handle.run(|conn| {
MessageResult(m) parse_json_message(&mut self.clone(), msg.clone(), &conn)
} // match parse_json_message(self, msg, &conn) {
Err(e) => { // Ok(m) => {
error!("Error during message handling {}", e); // info!("Message Sent: {}", m);
MessageResult(e.to_string()) // MessageResult(m)
} // }
} // Err(e) => {
// error!("Error during message handling {}", e);
// MessageResult(e.to_string())
// }
// }
}).await
};
let actor_future = my_block
.into_actor(self)
.map(|res, actor, ctx| {
});
ctx.spawn(actor_future);
} }
} }
@ -494,14 +509,13 @@ where
to_json_string(&op, &res) to_json_string(&op, &res)
} }
fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> { fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage, conn: &PgConnection) -> Result<String, Error> {
let json: Value = serde_json::from_str(&msg.msg)?; let json: Value = serde_json::from_str(&msg.msg)?;
let data = &json["data"].to_string(); let data = &json["data"].to_string();
let op = &json["op"].as_str().ok_or(APIError { let op = &json["op"].as_str().ok_or(APIError {
message: "Unknown op type".to_string(), message: "Unknown op type".to_string(),
})?; })?;
let conn = chat.db.get()?;
let user_operation: UserOperation = UserOperation::from_str(&op)?; let user_operation: UserOperation = UserOperation::from_str(&op)?;