Refactoring websocket code, adding create community

- Adding create community
This commit is contained in:
Dessalines 2019-03-24 20:51:27 -07:00
parent c438f0fef1
commit fc1c16a58b
7 changed files with 440 additions and 220 deletions

View file

@ -4,7 +4,7 @@ use diesel::result::Error;
use schema::user_::dsl::*;
use serde::{Serialize, Deserialize};
use {Crud,is_email_regex};
use jsonwebtoken::{encode, decode, Header, Validation};
use jsonwebtoken::{encode, decode, Header, Validation, TokenData};
use bcrypt::{DEFAULT_COST, hash};
#[derive(Queryable, Identifiable, PartialEq, Debug)]
@ -60,9 +60,20 @@ impl Crud<UserForm> for User_ {
}
#[derive(Debug, Serialize, Deserialize)]
struct Claims {
id: i32,
username: String
pub struct Claims {
pub id: i32,
pub username: String,
pub iss: String,
}
impl Claims {
pub fn decode(jwt: &str) -> Result<TokenData<Claims>, jsonwebtoken::errors::Error> {
let v = Validation {
validate_exp: false,
..Validation::default()
};
decode::<Claims>(&jwt, "secret".as_ref(), &v)
}
}
type Jwt = String;
@ -70,7 +81,8 @@ impl User_ {
pub fn jwt(&self) -> Jwt {
let my_claims = Claims {
id: self.id,
username: self.name.to_owned()
username: self.name.to_owned(),
iss: "rrf".to_string() // TODO this should come from config file
};
encode(&Header::default(), &my_claims, "secret".as_ref()).unwrap()
}
@ -86,12 +98,13 @@ impl User_ {
}
pub fn find_by_jwt(conn: &PgConnection, jwt: &str) -> Result<Self, Error> {
let token = decode::<Claims>(&jwt, "secret".as_ref(), &Validation::default())
.expect("Couldn't decode jwt");
Self::read(&conn, token.claims.id)
let claims: Claims = Claims::decode(&jwt).expect("Invalid token").claims;
Self::read(&conn, claims.id)
}
}
#[cfg(test)]
mod tests {
use establish_connection;

View file

@ -4,7 +4,6 @@ use std::time::{Instant, Duration};
use server::actix::*;
use server::actix_web::server::HttpServer;
use server::actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse};
use std::str::FromStr;
use server::websocket_server::server::*;
@ -82,15 +81,16 @@ impl Handler<WSMessage> for WSSession {
type Result = ();
fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
println!("id: {} msg: {}", self.id, msg.0);
ctx.text(msg.0);
ctx.text("NO");
}
}
use server::serde_json::Value;
/// WebSocket message handler
impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
println!("WEBSOCKET MESSAGE: {:?}", msg);
println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
match msg {
ws::Message::Ping(msg) => {
self.hb = Instant::now();
@ -100,86 +100,29 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
self.hb = Instant::now();
}
ws::Message::Text(text) => {
let m = text.trim();
let json: Value = serde_json::from_str(m).unwrap();
// Get the OP command, and its data
let op: &str = &json["op"].as_str().unwrap();
let data: &Value = &json["data"];
let user_operation: UserOperation = UserOperation::from_str(op).unwrap();
match user_operation {
UserOperation::Login => {
let login: Login = serde_json::from_str(&data.to_string()).unwrap();
ctx.state()
.addr
.send(login)
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(response) => match response {
Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()),
Err(e) => {
let error_message_str: String = serde_json::to_string(&e).unwrap();
eprintln!("{}", &error_message_str);
ctx.text(&error_message_str);
}
},
_ => println!("Something is wrong"),
}
fut::ok(())
})
.wait(ctx)
},
UserOperation::Register => {
let register: Register = serde_json::from_str(&data.to_string()).unwrap();
ctx.state()
.addr
.send(register)
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(response) => match response {
Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()),
Err(e) => {
let error_message_str: String = serde_json::to_string(&e).unwrap();
eprintln!("{}", &error_message_str);
ctx.text(&error_message_str);
}
},
_ => println!("Something is wrong"),
}
fut::ok(())
})
.wait(ctx)
},
UserOperation::CreateCommunity => {
use server::actions::community::CommunityForm;
let auth: &str = &json["auth"].as_str().unwrap();
let community_form: CommunityForm = serde_json::from_str(&data.to_string()).unwrap();
ctx.state()
.addr
.send(community_form)
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(response) => match response {
Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()),
Err(e) => {
let error_message_str: String = serde_json::to_string(&e).unwrap();
eprintln!("{}", &error_message_str);
ctx.text(&error_message_str);
}
},
_ => println!("Something is wrong"),
}
fut::ok(())
})
.wait(ctx)
},
_ => ctx.text(format!("!!! unknown command: {:?}", m)),
}
let m = text.trim().to_owned();
ctx.state()
.addr
.send(StandardMessage {
id: self.id,
msg: m
})
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(res) => ctx.text(res),
Err(e) => {
eprintln!("{}", &e);
// ctx.text(e);
}
}
// Ok(res) => ctx.text(res),
// // something is wrong with chat server
// _ => ctx.stop(),
fut::ok(())
})
.wait(ctx);
// we check for /sss type of messages
// if m.starts_with('/') {

View file

@ -6,10 +6,13 @@ use actix::prelude::*;
use rand::{rngs::ThreadRng, Rng};
use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use serde_json::{Result, Value};
use bcrypt::{verify};
use std::str::FromStr;
use {Crud,establish_connection};
use {Crud, Joinable, establish_connection};
use actions::community::*;
use actions::user::*;
#[derive(EnumString,ToString,Debug)]
pub enum UserOperation {
@ -58,6 +61,24 @@ pub struct ClientMessage {
pub room: String,
}
#[derive(Serialize, Deserialize)]
pub struct StandardMessage {
/// Id of the client session
pub id: usize,
/// Peer message
pub msg: String,
}
impl actix::Message for StandardMessage {
type Result = String;
}
#[derive(Serialize, Deserialize)]
pub struct StandardResponse<T> {
op: String,
response: T
}
/// List of available rooms
pub struct ListRooms;
@ -80,10 +101,6 @@ pub struct Login {
pub password: String
}
impl actix::Message for Login {
type Result = Result<LoginResponse, ErrorMessage>;
}
#[derive(Serialize, Deserialize)]
pub struct Register {
username: String,
@ -98,23 +115,15 @@ pub struct LoginResponse {
jwt: String
}
impl actix::Message for Register {
type Result = Result<LoginResponse, ErrorMessage>;
#[derive(Serialize, Deserialize)]
pub struct CreateCommunity {
name: String,
}
// #[derive(Serialize, Deserialize)]
// pub struct CreateCommunity {
// name: String
// }
#[derive(Serialize, Deserialize)]
pub struct CreateCommunityResponse {
op: String,
community: Community
}
impl actix::Message for CommunityForm {
type Result = Result<CreateCommunityResponse, ErrorMessage>;
data: Community
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat
@ -152,6 +161,16 @@ impl ChatServer {
}
}
}
/// Send message only to self
fn send(&self, message: &str, id: &usize) {
// println!("{:?}", self.sessions);
if let Some(addr) = self.sessions.get(id) {
println!("msg: {}", message);
// println!("{:?}", addr.connected());
let _ = addr.do_send(WSMessage(message.to_owned()));
}
}
}
/// Make actor from `ChatServer`
@ -219,121 +238,176 @@ impl Handler<ClientMessage> for ChatServer {
}
}
/// Handler for `ListRooms` message.
impl Handler<ListRooms> for ChatServer {
type Result = MessageResult<ListRooms>;
/// Handler for Message message.
impl Handler<StandardMessage> for ChatServer {
type Result = MessageResult<StandardMessage>;
fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result {
let mut rooms = Vec::new();
fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
for key in self.rooms.keys() {
rooms.push(key.to_owned())
}
let json: Value = serde_json::from_str(&msg.msg)
.expect("Couldn't parse message");
MessageResult(rooms)
}
}
let data: &Value = &json["data"];
let op = &json["op"].as_str().unwrap();
let auth = &json["auth"].as_str();
let user_operation: UserOperation = UserOperation::from_str(&op).unwrap();
/// Join room, send disconnect message to old room
/// send join message to new room
impl Handler<Join> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Join, _: &mut Context<Self>) {
let Join { id, name } = msg;
let mut rooms = Vec::new();
// remove session from all rooms
for (n, sessions) in &mut self.rooms {
if sessions.remove(&id) {
rooms.push(n.to_owned());
let res: String = match user_operation {
UserOperation::Login => {
let login: Login = serde_json::from_str(&data.to_string()).unwrap();
login.perform()
},
UserOperation::Register => {
let register: Register = serde_json::from_str(&data.to_string()).unwrap();
register.perform()
},
UserOperation::CreateCommunity => {
let create_community: CreateCommunity = serde_json::from_str(&data.to_string()).unwrap();
match auth {
Some(auth) => {
create_community.perform(auth)
},
None => serde_json::to_string(
&ErrorMessage {
op: UserOperation::CreateCommunity.to_string(),
error: "Not logged in.".to_string()
}
)
.unwrap()
}
},
_ => {
let e = ErrorMessage {
op: "Unknown".to_string(),
error: "Unknown User Operation".to_string()
};
serde_json::to_string(&e).unwrap()
}
}
// send message to other users
for room in rooms {
self.send_room_message(&room, "Someone disconnected", 0);
}
// _ => "no".to_string()
};
if self.rooms.get_mut(&name).is_none() {
self.rooms.insert(name.clone(), HashSet::new());
}
self.send_room_message(&name, "Someone connected", id);
self.rooms.get_mut(&name).unwrap().insert(id);
// let data: &Value = &json["data"];
// let res = StandardResponse {op: "nope".to_string(), response: "hi".to_string()};
// let out = serde_json::to_string(&res).unwrap();
MessageResult(res)
}
}
impl Handler<Login> for ChatServer {
// /// Handler for `ListRooms` message.
// impl Handler<ListRooms> for ChatServer {
// type Result = MessageResult<ListRooms>;
type Result = MessageResult<Login>;
fn handle(&mut self, msg: Login, _: &mut Context<Self>) -> Self::Result {
// fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result {
// let mut rooms = Vec::new();
// for key in self.rooms.keys() {
// rooms.push(key.to_owned())
// }
// MessageResult(rooms)
// }
// }
// /// Join room, send disconnect message to old room
// /// send join message to new room
// impl Handler<Join> for ChatServer {
// type Result = ();
// fn handle(&mut self, msg: Join, _: &mut Context<Self>) {
// let Join { id, name } = msg;
// let mut rooms = Vec::new();
// // remove session from all rooms
// for (n, sessions) in &mut self.rooms {
// if sessions.remove(&id) {
// rooms.push(n.to_owned());
// }
// }
// // send message to other users
// for room in rooms {
// self.send_room_message(&room, "Someone disconnected", 0);
// }
// if self.rooms.get_mut(&name).is_none() {
// self.rooms.insert(name.clone(), HashSet::new());
// }
// self.send_room_message(&name, "Someone connected", id);
// self.rooms.get_mut(&name).unwrap().insert(id);
// }
// }
pub trait Perform {
fn perform(&self) -> String;
}
pub trait PerformAuth {
fn perform(&self, auth: &str) -> String;
}
impl Perform for Login {
fn perform(&self) -> String {
use actions::user::*;
let conn = establish_connection();
// Fetch that username / email
let user: User_ = match User_::find_by_email_or_username(&conn, &msg.username_or_email) {
let user: User_ = match User_::find_by_email_or_username(&conn, &self.username_or_email) {
Ok(user) => user,
Err(e) => return MessageResult(
Err(
ErrorMessage {
op: UserOperation::Login.to_string(),
error: "Couldn't find that username or email".to_string()
}
)
Err(e) => return serde_json::to_string(
&ErrorMessage {
op: UserOperation::Login.to_string(),
error: "Couldn't find that username or email".to_string()
}
)
.unwrap()
};
// Verify the password
let valid: bool = verify(&msg.password, &user.password_encrypted).unwrap_or(false);
let valid: bool = verify(&self.password, &user.password_encrypted).unwrap_or(false);
if !valid {
return MessageResult(
Err(
ErrorMessage {
op: UserOperation::Login.to_string(),
error: "Password incorrect".to_string()
}
)
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::Login.to_string(),
error: "Password incorrect".to_string()
}
)
.unwrap()
}
// Return the jwt
MessageResult(
Ok(
LoginResponse {
op: UserOperation::Login.to_string(),
jwt: user.jwt()
}
)
serde_json::to_string(
&LoginResponse {
op: UserOperation::Login.to_string(),
jwt: user.jwt()
}
)
.unwrap()
}
}
impl Handler<Register> for ChatServer {
impl Perform for Register {
fn perform(&self) -> String {
type Result = MessageResult<Register>;
fn handle(&mut self, msg: Register, _: &mut Context<Self>) -> Self::Result {
use actions::user::*;
let conn = establish_connection();
// Make sure passwords match
if msg.password != msg.password_verify {
return MessageResult(
Err(
ErrorMessage {
op: UserOperation::Register.to_string(),
error: "Passwords do not match.".to_string()
}
)
);
if &self.password != &self.password_verify {
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::Register.to_string(),
error: "Passwords do not match.".to_string()
}
)
.unwrap();
}
// Register the new user
let user_form = UserForm {
name: msg.username,
email: msg.email,
password_encrypted: msg.password,
name: self.username.to_owned(),
email: self.email.to_owned(),
password_encrypted: self.password.to_owned(),
preferred_username: None,
updated: None
};
@ -341,55 +415,232 @@ impl Handler<Register> for ChatServer {
// Create the user
let inserted_user = match User_::create(&conn, &user_form) {
Ok(user) => user,
Err(e) => return MessageResult(
Err(
ErrorMessage {
Err(e) => {
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::Register.to_string(),
error: "User already exists.".to_string() // overwrite the diesel error
}
)
)
.unwrap()
}
};
// Return the jwt
MessageResult(
Ok(
LoginResponse {
op: UserOperation::Register.to_string(),
jwt: inserted_user.jwt()
}
)
serde_json::to_string(
&LoginResponse {
op: UserOperation::Register.to_string(),
jwt: inserted_user.jwt()
}
)
.unwrap()
}
}
impl PerformAuth for CreateCommunity {
fn perform(&self, auth: &str) -> String {
impl Handler<CommunityForm> for ChatServer {
type Result = MessageResult<CommunityForm>;
fn handle(&mut self, form: CommunityForm, _: &mut Context<Self>) -> Self::Result {
let conn = establish_connection();
let community = match Community::create(&conn, &form) {
let claims = match Claims::decode(&auth) {
Ok(claims) => claims.claims,
Err(e) => {
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::CreateCommunity.to_string(),
error: "Community user already exists.".to_string() // overwrite the diesel error
}
)
.unwrap();
}
};
let user_id = claims.id;
let iss = claims.iss;
// Register the new user
let community_form = CommunityForm {
name: self.name.to_owned(),
updated: None
};
let inserted_community = match Community::create(&conn, &community_form) {
Ok(community) => community,
Err(e) => return MessageResult(
Err(
ErrorMessage {
Err(e) => {
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::CreateCommunity.to_string(),
error: "Community already exists.".to_string() // overwrite the diesel error
}
)
)
.unwrap()
}
};
MessageResult(
Ok(
CreateCommunityResponse {
op: UserOperation::CreateCommunity.to_string(),
community: community
}
)
let community_user_form = CommunityUserForm {
community_id: inserted_community.id,
fedi_user_id: format!("{}/{}", iss, user_id)
};
let inserted_community_user = match CommunityUser::join(&conn, &community_user_form) {
Ok(user) => user,
Err(e) => {
return serde_json::to_string(
&ErrorMessage {
op: UserOperation::CreateCommunity.to_string(),
error: "Community user already exists.".to_string() // overwrite the diesel error
}
)
.unwrap()
}
};
// Return the jwt
serde_json::to_string(
&CreateCommunityResponse {
op: UserOperation::CreateCommunity.to_string(),
data: inserted_community
}
)
.unwrap()
}
}
// impl Handler<Login> for ChatServer {
// type Result = MessageResult<Login>;
// fn handle(&mut self, msg: Login, _: &mut Context<Self>) -> Self::Result {
// let conn = establish_connection();
// // Fetch that username / email
// let user: User_ = match User_::find_by_email_or_username(&conn, &msg.username_or_email) {
// Ok(user) => user,
// Err(e) => return MessageResult(
// Err(
// ErrorMessage {
// op: UserOperation::Login.to_string(),
// error: "Couldn't find that username or email".to_string()
// }
// )
// )
// };
// // Verify the password
// let valid: bool = verify(&msg.password, &user.password_encrypted).unwrap_or(false);
// if !valid {
// return MessageResult(
// Err(
// ErrorMessage {
// op: UserOperation::Login.to_string(),
// error: "Password incorrect".to_string()
// }
// )
// )
// }
// // Return the jwt
// MessageResult(
// Ok(
// LoginResponse {
// op: UserOperation::Login.to_string(),
// jwt: user.jwt()
// }
// )
// )
// }
// }
// impl Handler<Register> for ChatServer {
// type Result = MessageResult<Register>;
// fn handle(&mut self, msg: Register, _: &mut Context<Self>) -> Self::Result {
// let conn = establish_connection();
// // Make sure passwords match
// if msg.password != msg.password_verify {
// return MessageResult(
// Err(
// ErrorMessage {
// op: UserOperation::Register.to_string(),
// error: "Passwords do not match.".to_string()
// }
// )
// );
// }
// // Register the new user
// let user_form = UserForm {
// name: msg.username,
// email: msg.email,
// password_encrypted: msg.password,
// preferred_username: None,
// updated: None
// };
// // Create the user
// let inserted_user = match User_::create(&conn, &user_form) {
// Ok(user) => user,
// Err(e) => return MessageResult(
// Err(
// ErrorMessage {
// op: UserOperation::Register.to_string(),
// error: "User already exists.".to_string() // overwrite the diesel error
// }
// )
// )
// };
// // Return the jwt
// MessageResult(
// Ok(
// LoginResponse {
// op: UserOperation::Register.to_string(),
// jwt: inserted_user.jwt()
// }
// )
// )
// }
// }
// impl Handler<CreateCommunity> for ChatServer {
// type Result = MessageResult<CreateCommunity>;
// fn handle(&mut self, msg: CreateCommunity, _: &mut Context<Self>) -> Self::Result {
// let conn = establish_connection();
// let user_id = Claims::decode(&msg.auth).id;
// let community_form = CommunityForm {
// name: msg.name,
// updated: None
// };
// let community = match Community::create(&conn, &community_form) {
// Ok(community) => community,
// Err(e) => return MessageResult(
// Err(
// ErrorMessage {
// op: UserOperation::CreateCommunity.to_string(),
// error: "Community already exists.".to_string() // overwrite the diesel error
// }
// )
// )
// };
// MessageResult(
// Ok(
// CreateCommunityResponse {
// op: UserOperation::CreateCommunity.to_string(),
// community: community
// }
// )
// )
// }
// }

View file

@ -5,6 +5,8 @@ import { CommunityForm, UserOperation } from '../interfaces';
import { WebSocketService, UserService } from '../services';
import { msgOp } from '../utils';
import { Community } from '../interfaces';
interface State {
communityForm: CommunityForm;
}
@ -28,6 +30,7 @@ export class CreateCommunity extends Component<any, State> {
.subscribe(
(msg) => this.parseMessage(msg),
(err) => console.error(err),
() => console.log("complete")
);
}
@ -80,10 +83,14 @@ export class CreateCommunity extends Component<any, State> {
parseMessage(msg: any) {
let op: UserOperation = msgOp(msg);
console.log(msg);
if (msg.error) {
alert(msg.error);
return;
} else {
if (op == UserOperation.CreateCommunity) {
let community: Community = msg.data;
}
}
}

View file

@ -35,6 +35,7 @@ export class Login extends Component<any, State> {
.subscribe(
(msg) => this.parseMessage(msg),
(err) => console.error(err),
() => console.log("complete")
);
}

View file

@ -3,10 +3,17 @@ export enum UserOperation {
}
export interface User {
id: number
id: number;
username: string;
}
export interface Community {
id: number;
name: string;
published: Date;
updated?: Date;
}
export interface LoginForm {
username_or_email: string;
password: string;
@ -21,7 +28,6 @@ export interface RegisterForm {
export interface CommunityForm {
name: string;
updated?: number
}
export interface PostForm {

View file

@ -19,9 +19,9 @@ export class UserService {
}
public login(jwt: string) {
this.setUser(jwt);
Cookies.set("jwt", jwt);
console.log("jwt cookie set");
this.setUser(jwt);
}
public logout() {
@ -42,7 +42,6 @@ export class UserService {
private setUser(jwt: string) {
this.user = jwt_decode(jwt);
this.sub.next(this.user);
console.log(this.user.username);
}
public static get Instance(){