Compare commits

...

2 commits

Author SHA1 Message Date
6a6ed19cc6 Working on sync 2 more 2020-04-17 12:57:16 -04:00
fa5b1caa84 Working on sync 2 2020-04-17 09:52:27 -04:00
6 changed files with 92 additions and 38 deletions

10
server/Cargo.lock generated vendored
View file

@ -832,7 +832,7 @@ dependencies = [
[[package]]
name = "diesel"
version = "1.4.3"
version = "1.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1416,13 +1416,14 @@ dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-files 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-threadpool 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bcrypt 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"comrak 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1436,6 +1437,7 @@ dependencies = [
"lettre_email 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1589,7 +1591,7 @@ name = "migrations_internals"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -3050,7 +3052,7 @@ dependencies = [
"checksum derive_builder 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0"
"checksum derive_builder_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef"
"checksum derive_more 0.99.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c"
"checksum diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9d7cc03b910de9935007861dce440881f69102aaaedfd4bc5a6f40340ca5840c"
"checksum diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "33d7ca63eb2efea87a7f56a283acc49e2ce4b2bd54adf7465dc1d81fef13d8fc"
"checksum diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3"
"checksum diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"

4
server/Cargo.toml vendored
View file

@ -5,7 +5,8 @@ authors = ["Dessalines <happydooby@gmail.com>"]
edition = "2018"
[dependencies]
diesel = { version = "1.4.2", features = ["postgres","chrono", "r2d2", "64-column-tables"] }
diesel = { version = "1.4.4", features = ["postgres","chrono", "r2d2", "64-column-tables"] }
r2d2 = "0.8.8"
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
bcrypt = "0.6.2"
@ -19,6 +20,7 @@ actix-web = "2.0.0"
actix-files = "0.2.1"
actix-web-actors = "2.0.0"
actix-rt = "1.0.0"
actix-threadpool = "0.3.1"
log = "0.4.0"
env_logger = "0.7.1"
rand = "0.7.3"

View file

@ -47,6 +47,13 @@ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use regex::{Regex, RegexBuilder};
use serde::Deserialize;
use actix_threadpool::{run, BlockingError};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use failure::Error;
use std::fmt::Debug;
pub fn to_datetime_utc(ndt: NaiveDateTime) -> DateTime<Utc> {
DateTime::<Utc>::from_utc(ndt, Utc)
@ -224,6 +231,39 @@ pub fn markdown_to_html(text: &str) -> String {
comrak::markdown_to_html(text, &comrak::ComrakOptions::default())
}
#[derive(Clone)]
pub struct DbHandle(Pool<ConnectionManager<PgConnection>>);
impl DbHandle {
/// Start the DbActor with a SyncArbiter.
pub fn start(settings: &Settings) -> Result<Self, Error> {
// Set up the r2d2 connection pool
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
let pool = Pool::builder()
.max_size(settings.database.pool_size)
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
Ok(DbHandle(pool))
}
pub async fn run<F, T, E>(&self, f: F) -> Result<T, BlockingError<E>>
where
F: FnOnce(&PgConnection) -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Debug + From<r2d2::Error> + Send + 'static,
{
let pool = self.0.clone();
run(move || {
let conn = pool.get()?;
(f)(&conn)
})
.await
}
}
#[cfg(test)]
mod tests {
use crate::{extract_usernames, is_email_regex, remove_slurs, slur_check, slurs_vec_to_str};

View file

@ -4,33 +4,28 @@ extern crate diesel_migrations;
use actix::prelude::*;
use actix_web::*;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::PgConnection;
use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
use lemmy_server::settings::Settings;
use lemmy_server::websocket::server::*;
use lemmy_server::db::establish_unpooled_connection;
use lemmy_server::DbHandle;
use std::io;
embed_migrations!();
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init();
let settings = Settings::get();
// Set up the r2d2 connection pool
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
let pool = Pool::builder()
.max_size(settings.database.pool_size)
.build(manager)
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
let db_handle = DbHandle::start(&settings).unwrap();
// Run the migrations from code
let conn = pool.get().unwrap();
embedded_migrations::run(&conn).unwrap();
embedded_migrations::run(&establish_unpooled_connection()).unwrap();
// Set up websocket server
let server = ChatServer::startup(pool.clone()).start();
let server = ChatServer::startup(db_handle.clone()).start();
println!(
"Starting http server at {}:{}",
@ -42,7 +37,7 @@ async fn main() -> io::Result<()> {
let settings = Settings::get();
App::new()
.wrap(middleware::Logger::default())
.data(pool.clone())
.data(db_handle.clone())
.data(server.clone())
// The routes
.configure(api::config)

View file

@ -100,6 +100,7 @@ impl Handler<WSMessage> for WSSession {
type Result = ();
fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
info!("Message sent out: {}", msg.0);
ctx.text(msg.0);
}
}
@ -142,6 +143,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
}
actix::fut::ready(())
})
// Testing out spawn instead of wait
// .spawn(ctx);
.wait(ctx);
}
ws::Message::Binary(_bin) => info!("Unexpected binary"),

View file

@ -22,7 +22,7 @@ use crate::api::site::*;
use crate::api::user::*;
use crate::api::*;
use crate::websocket::UserOperation;
use crate::Settings;
use crate::{Settings, DbHandle};
type ConnectionId = usize;
type PostId = i32;
@ -53,7 +53,7 @@ pub struct Disconnect {
pub ip: IPAddr,
}
#[derive(Serialize, Deserialize, Message)]
#[derive(Serialize, Deserialize, Message, Clone)]
#[rtype(String)]
pub struct StandardMessage {
/// Id of the client session
@ -82,6 +82,7 @@ pub enum RateLimitType {
/// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session.
#[derive(Clone)]
pub struct ChatServer {
/// A map from generated random ID to session addr
sessions: HashMap<ConnectionId, SessionInfo>,
@ -99,20 +100,18 @@ pub struct ChatServer {
/// Rate limiting based on rate type and IP addr
rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
rng: ThreadRng,
db: Pool<ConnectionManager<PgConnection>>,
db_handle: DbHandle,
}
impl ChatServer {
pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
pub fn startup(db_handle: DbHandle) -> ChatServer {
ChatServer {
sessions: HashMap::new(),
rate_limit_buckets: HashMap::new(),
post_rooms: HashMap::new(),
community_rooms: HashMap::new(),
user_rooms: HashMap::new(),
rng: rand::thread_rng(),
db,
db_handle,
}
}
@ -387,7 +386,7 @@ impl Handler<Connect> for ChatServer {
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
// register session with random id
let id = self.rng.gen::<usize>();
let id = ThreadRng::default().gen::<usize>();
info!("{} joined", &msg.ip);
self.sessions.insert(
@ -444,21 +443,35 @@ impl Handler<Disconnect> for ChatServer {
}
}
// type DeferredStandardMessage = MessageResult<StandardMessage>;
/// Handler for Message message.
impl Handler<StandardMessage> for ChatServer {
type Result = MessageResult<StandardMessage>;
// type Result = MessageResult<StandardMessage>;
type Result = ResponseActFuture<Self, MessageResult<StandardMessage>>;
fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
match parse_json_message(self, msg) {
Ok(m) => {
info!("Message Sent: {}", m);
MessageResult(m)
}
Err(e) => {
error!("Error during message handling {}", e);
MessageResult(e.to_string())
}
fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
Box::new(async move {
self.db_handle.run(|conn| {
parse_json_message(self, msg, &conn)
});
}
.into_actor(self).map(|res, actor, ctx| {
// match res {
// Ok(m) => {
// info!("Message Sent: {}", m);
// MessageResult(m);
// Ok(())
// },
// Err(e) => {
// error!("Error during message handling {}", e);
// MessageResult(e.to_string());
// Err(())
// },
// Ok(())
MessageResult("hi".to_string())
}))
}
}
@ -494,14 +507,13 @@ where
to_json_string(&op, &res)
}
fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage, conn: &PgConnection) -> Result<String, Error> {
let json: Value = serde_json::from_str(&msg.msg)?;
let data = &json["data"].to_string();
let op = &json["op"].as_str().ok_or(APIError {
message: "Unknown op type".to_string(),
})?;
let conn = chat.db.get()?;
let user_operation: UserOperation = UserOperation::from_str(&op)?;