Compare commits
2 commits
main
...
sync_actor
Author | SHA1 | Date | |
---|---|---|---|
6a6ed19cc6 | |||
fa5b1caa84 |
6 changed files with 92 additions and 38 deletions
10
server/Cargo.lock
generated
vendored
10
server/Cargo.lock
generated
vendored
|
@ -832,7 +832,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "diesel"
|
||||
version = "1.4.3"
|
||||
version = "1.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1416,13 +1416,14 @@ dependencies = [
|
|||
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-files 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-threadpool 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"bcrypt 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"comrak 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1436,6 +1437,7 @@ dependencies = [
|
|||
"lettre_email 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rss 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1589,7 +1591,7 @@ name = "migrations_internals"
|
|||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3050,7 +3052,7 @@ dependencies = [
|
|||
"checksum derive_builder 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0"
|
||||
"checksum derive_builder_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef"
|
||||
"checksum derive_more 0.99.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c"
|
||||
"checksum diesel 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9d7cc03b910de9935007861dce440881f69102aaaedfd4bc5a6f40340ca5840c"
|
||||
"checksum diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "33d7ca63eb2efea87a7f56a283acc49e2ce4b2bd54adf7465dc1d81fef13d8fc"
|
||||
"checksum diesel_derives 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3"
|
||||
"checksum diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c"
|
||||
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
|
||||
|
|
4
server/Cargo.toml
vendored
4
server/Cargo.toml
vendored
|
@ -5,7 +5,8 @@ authors = ["Dessalines <happydooby@gmail.com>"]
|
|||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
diesel = { version = "1.4.2", features = ["postgres","chrono", "r2d2", "64-column-tables"] }
|
||||
diesel = { version = "1.4.4", features = ["postgres","chrono", "r2d2", "64-column-tables"] }
|
||||
r2d2 = "0.8.8"
|
||||
diesel_migrations = "1.4.0"
|
||||
dotenv = "0.15.0"
|
||||
bcrypt = "0.6.2"
|
||||
|
@ -19,6 +20,7 @@ actix-web = "2.0.0"
|
|||
actix-files = "0.2.1"
|
||||
actix-web-actors = "2.0.0"
|
||||
actix-rt = "1.0.0"
|
||||
actix-threadpool = "0.3.1"
|
||||
log = "0.4.0"
|
||||
env_logger = "0.7.1"
|
||||
rand = "0.7.3"
|
||||
|
|
|
@ -47,6 +47,13 @@ use rand::distributions::Alphanumeric;
|
|||
use rand::{thread_rng, Rng};
|
||||
use regex::{Regex, RegexBuilder};
|
||||
use serde::Deserialize;
|
||||
use actix_threadpool::{run, BlockingError};
|
||||
use diesel::{
|
||||
r2d2::{ConnectionManager, Pool},
|
||||
PgConnection,
|
||||
};
|
||||
use failure::Error;
|
||||
use std::fmt::Debug;
|
||||
|
||||
pub fn to_datetime_utc(ndt: NaiveDateTime) -> DateTime<Utc> {
|
||||
DateTime::<Utc>::from_utc(ndt, Utc)
|
||||
|
@ -224,6 +231,39 @@ pub fn markdown_to_html(text: &str) -> String {
|
|||
comrak::markdown_to_html(text, &comrak::ComrakOptions::default())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DbHandle(Pool<ConnectionManager<PgConnection>>);
|
||||
|
||||
impl DbHandle {
|
||||
/// Start the DbActor with a SyncArbiter.
|
||||
pub fn start(settings: &Settings) -> Result<Self, Error> {
|
||||
|
||||
// Set up the r2d2 connection pool
|
||||
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
|
||||
let pool = Pool::builder()
|
||||
.max_size(settings.database.pool_size)
|
||||
.build(manager)
|
||||
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
|
||||
|
||||
Ok(DbHandle(pool))
|
||||
}
|
||||
|
||||
pub async fn run<F, T, E>(&self, f: F) -> Result<T, BlockingError<E>>
|
||||
where
|
||||
F: FnOnce(&PgConnection) -> Result<T, E> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
E: Debug + From<r2d2::Error> + Send + 'static,
|
||||
{
|
||||
let pool = self.0.clone();
|
||||
run(move || {
|
||||
let conn = pool.get()?;
|
||||
|
||||
(f)(&conn)
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{extract_usernames, is_email_regex, remove_slurs, slur_check, slurs_vec_to_str};
|
||||
|
|
|
@ -4,33 +4,28 @@ extern crate diesel_migrations;
|
|||
|
||||
use actix::prelude::*;
|
||||
use actix_web::*;
|
||||
use diesel::r2d2::{ConnectionManager, Pool};
|
||||
use diesel::PgConnection;
|
||||
use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
|
||||
use lemmy_server::settings::Settings;
|
||||
use lemmy_server::websocket::server::*;
|
||||
use lemmy_server::db::establish_unpooled_connection;
|
||||
use lemmy_server::DbHandle;
|
||||
use std::io;
|
||||
|
||||
embed_migrations!();
|
||||
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
let settings = Settings::get();
|
||||
|
||||
// Set up the r2d2 connection pool
|
||||
let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
|
||||
let pool = Pool::builder()
|
||||
.max_size(settings.database.pool_size)
|
||||
.build(manager)
|
||||
.unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
|
||||
let db_handle = DbHandle::start(&settings).unwrap();
|
||||
|
||||
// Run the migrations from code
|
||||
let conn = pool.get().unwrap();
|
||||
embedded_migrations::run(&conn).unwrap();
|
||||
embedded_migrations::run(&establish_unpooled_connection()).unwrap();
|
||||
|
||||
// Set up websocket server
|
||||
let server = ChatServer::startup(pool.clone()).start();
|
||||
let server = ChatServer::startup(db_handle.clone()).start();
|
||||
|
||||
println!(
|
||||
"Starting http server at {}:{}",
|
||||
|
@ -42,7 +37,7 @@ async fn main() -> io::Result<()> {
|
|||
let settings = Settings::get();
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.data(pool.clone())
|
||||
.data(db_handle.clone())
|
||||
.data(server.clone())
|
||||
// The routes
|
||||
.configure(api::config)
|
||||
|
|
|
@ -100,6 +100,7 @@ impl Handler<WSMessage> for WSSession {
|
|||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
|
||||
info!("Message sent out: {}", msg.0);
|
||||
ctx.text(msg.0);
|
||||
}
|
||||
}
|
||||
|
@ -142,6 +143,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
|
|||
}
|
||||
actix::fut::ready(())
|
||||
})
|
||||
// Testing out spawn instead of wait
|
||||
// .spawn(ctx);
|
||||
.wait(ctx);
|
||||
}
|
||||
ws::Message::Binary(_bin) => info!("Unexpected binary"),
|
||||
|
|
|
@ -22,7 +22,7 @@ use crate::api::site::*;
|
|||
use crate::api::user::*;
|
||||
use crate::api::*;
|
||||
use crate::websocket::UserOperation;
|
||||
use crate::Settings;
|
||||
use crate::{Settings, DbHandle};
|
||||
|
||||
type ConnectionId = usize;
|
||||
type PostId = i32;
|
||||
|
@ -53,7 +53,7 @@ pub struct Disconnect {
|
|||
pub ip: IPAddr,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Message)]
|
||||
#[derive(Serialize, Deserialize, Message, Clone)]
|
||||
#[rtype(String)]
|
||||
pub struct StandardMessage {
|
||||
/// Id of the client session
|
||||
|
@ -82,6 +82,7 @@ pub enum RateLimitType {
|
|||
|
||||
/// `ChatServer` manages chat rooms and responsible for coordinating chat
|
||||
/// session.
|
||||
#[derive(Clone)]
|
||||
pub struct ChatServer {
|
||||
/// A map from generated random ID to session addr
|
||||
sessions: HashMap<ConnectionId, SessionInfo>,
|
||||
|
@ -99,20 +100,18 @@ pub struct ChatServer {
|
|||
/// Rate limiting based on rate type and IP addr
|
||||
rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
|
||||
|
||||
rng: ThreadRng,
|
||||
db: Pool<ConnectionManager<PgConnection>>,
|
||||
db_handle: DbHandle,
|
||||
}
|
||||
|
||||
impl ChatServer {
|
||||
pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
|
||||
pub fn startup(db_handle: DbHandle) -> ChatServer {
|
||||
ChatServer {
|
||||
sessions: HashMap::new(),
|
||||
rate_limit_buckets: HashMap::new(),
|
||||
post_rooms: HashMap::new(),
|
||||
community_rooms: HashMap::new(),
|
||||
user_rooms: HashMap::new(),
|
||||
rng: rand::thread_rng(),
|
||||
db,
|
||||
db_handle,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -387,7 +386,7 @@ impl Handler<Connect> for ChatServer {
|
|||
|
||||
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
|
||||
// register session with random id
|
||||
let id = self.rng.gen::<usize>();
|
||||
let id = ThreadRng::default().gen::<usize>();
|
||||
info!("{} joined", &msg.ip);
|
||||
|
||||
self.sessions.insert(
|
||||
|
@ -444,21 +443,35 @@ impl Handler<Disconnect> for ChatServer {
|
|||
}
|
||||
}
|
||||
|
||||
// type DeferredStandardMessage = MessageResult<StandardMessage>;
|
||||
|
||||
/// Handler for Message message.
|
||||
impl Handler<StandardMessage> for ChatServer {
|
||||
type Result = MessageResult<StandardMessage>;
|
||||
// type Result = MessageResult<StandardMessage>;
|
||||
type Result = ResponseActFuture<Self, MessageResult<StandardMessage>>;
|
||||
|
||||
fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
|
||||
match parse_json_message(self, msg) {
|
||||
Ok(m) => {
|
||||
info!("Message Sent: {}", m);
|
||||
MessageResult(m)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error during message handling {}", e);
|
||||
MessageResult(e.to_string())
|
||||
}
|
||||
fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
|
||||
|
||||
Box::new(async move {
|
||||
self.db_handle.run(|conn| {
|
||||
parse_json_message(self, msg, &conn)
|
||||
});
|
||||
}
|
||||
.into_actor(self).map(|res, actor, ctx| {
|
||||
// match res {
|
||||
// Ok(m) => {
|
||||
// info!("Message Sent: {}", m);
|
||||
// MessageResult(m);
|
||||
// Ok(())
|
||||
// },
|
||||
// Err(e) => {
|
||||
// error!("Error during message handling {}", e);
|
||||
// MessageResult(e.to_string());
|
||||
// Err(())
|
||||
// },
|
||||
// Ok(())
|
||||
MessageResult("hi".to_string())
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -494,14 +507,13 @@ where
|
|||
to_json_string(&op, &res)
|
||||
}
|
||||
|
||||
fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
|
||||
fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage, conn: &PgConnection) -> Result<String, Error> {
|
||||
let json: Value = serde_json::from_str(&msg.msg)?;
|
||||
let data = &json["data"].to_string();
|
||||
let op = &json["op"].as_str().ok_or(APIError {
|
||||
message: "Unknown op type".to_string(),
|
||||
})?;
|
||||
|
||||
let conn = chat.db.get()?;
|
||||
|
||||
let user_operation: UserOperation = UserOperation::from_str(&op)?;
|
||||
|
||||
|
|
Loading…
Reference in a new issue