From f6926f8af2079cd2105ef797bfc4544bff575042 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Tue, 3 Aug 2021 15:59:20 -0400 Subject: [PATCH] Beginning work on adding tokio-diesel. #1684 --- Cargo.lock | 166 ++++++++++++----- crates/api_crud/Cargo.toml | 1 + crates/db_queries/Cargo.toml | 2 + .../src/aggregates/comment_aggregates.rs | 42 ++--- crates/db_queries/src/lib.rs | 103 ++++++---- crates/db_queries/src/source/post.rs | 176 +++++++++--------- src/main.rs | 17 +- 7 files changed, 293 insertions(+), 214 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 622c260c6..51ff3626e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,7 +36,7 @@ dependencies = [ "actix-rt", "actix_derive", "bitflags", - "bytes", + "bytes 1.0.1", "crossbeam-channel", "futures-core", "futures-sink", @@ -45,9 +45,9 @@ dependencies = [ "log", "once_cell", "parking_lot", - "pin-project-lite", + "pin-project-lite 0.2.7", "smallvec", - "tokio", + "tokio 1.8.0", "tokio-util", ] @@ -58,12 +58,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d5dbeb2d9e51344cb83ca7cc170f1217f9fe25bfc50160e6e200b5c31c1019a" dependencies = [ "bitflags", - "bytes", + "bytes 1.0.1", "futures-core", "futures-sink", "log", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.7", + "tokio 1.8.0", "tokio-util", ] @@ -81,7 +81,7 @@ dependencies = [ "ahash 0.7.4", "base64 0.13.0", "bitflags", - "bytes", + "bytes 1.0.1", "bytestring", "derive_more", "encoding_rs", @@ -98,14 +98,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project", - "pin-project-lite", + "pin-project-lite 0.2.7", "rand 0.8.4", "regex", "serde", "sha-1 0.9.6", "smallvec", "time 0.2.27", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -139,7 +139,7 @@ checksum = "bc7d7cd957c9ed92288a7c3c96af81fa5291f65247a76a34dac7b6af74e52ba0" dependencies = [ "actix-macros", "futures-core", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -156,7 +156,7 @@ dependencies = [ "mio", "num_cpus", "slab", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -167,7 +167,7 @@ checksum = "77f5f9d66a8730d0fae62c26f3424f5751e5518086628a40b7ab6fca4a705034" dependencies = [ "futures-core", "paste", - "pin-project-lite", + "pin-project-lite 0.2.7", ] [[package]] @@ -196,7 +196,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e491cbaac2e7fc788dfff99ff48ef317e23b3cf63dbaf7aaab6418f40f92aa94" dependencies = [ "local-waker", - "pin-project-lite", + "pin-project-lite 0.2.7", ] [[package]] @@ -216,7 +216,7 @@ dependencies = [ "actix-utils", "actix-web-codegen", "ahash 0.7.4", - "bytes", + "bytes 1.0.1", "cfg-if", "cookie", "derive_more", @@ -251,11 +251,11 @@ dependencies = [ "actix-codec", "actix-http", "actix-web", - "bytes", + "bytes 1.0.1", "bytestring", "futures-core", "pin-project", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -393,7 +393,7 @@ dependencies = [ "actix-rt", "actix-service", "base64 0.13.0", - "bytes", + "bytes 1.0.1", "cfg-if", "derive_more", "futures-core", @@ -401,7 +401,7 @@ dependencies = [ "log", "mime", "percent-encoding", - "pin-project-lite", + "pin-project-lite 0.2.7", "rand 0.8.4", "serde", "serde_json", @@ -435,7 +435,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 1.8.0", "uuid", ] @@ -454,7 +454,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 1.8.0", "uuid", ] @@ -585,6 +585,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.0.1" @@ -597,7 +603,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90706ba19e97b90786e19dc0d5e2abd80008d99d4c0c5d1ad0b5e72cec7c494d" dependencies = [ - "bytes", + "bytes 1.0.1", ] [[package]] @@ -1176,7 +1182,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite", + "pin-project-lite 0.2.7", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1252,7 +1258,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726" dependencies = [ - "bytes", + "bytes 1.0.1", "fnv", "futures-core", "futures-sink", @@ -1260,7 +1266,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", + "tokio 1.8.0", "tokio-util", "tracing", ] @@ -1321,7 +1327,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" dependencies = [ - "bytes", + "bytes 1.0.1", "fnv", "itoa", ] @@ -1332,9 +1338,9 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ - "bytes", + "bytes 1.0.1", "http", - "pin-project-lite", + "pin-project-lite 0.2.7", ] [[package]] @@ -1371,7 +1377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a1e24f3c8b2c8b5eddb82d4cdf07dafe01f5b87f92b81a369dd520a107d33e8" dependencies = [ "base64 0.13.0", - "bytes", + "bytes 1.0.1", "chrono", "futures", "http", @@ -1379,7 +1385,7 @@ dependencies = [ "reqwest", "sha2", "thiserror", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -1406,7 +1412,7 @@ version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07d6baa1b441335f3ce5098ac421fb6547c46dda735ca1bc6d0153c838f9dd83" dependencies = [ - "bytes", + "bytes 1.0.1", "futures-channel", "futures-core", "futures-util", @@ -1416,9 +1422,9 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project-lite", + "pin-project-lite 0.2.7", "socket2", - "tokio", + "tokio 1.8.0", "tower-service", "tracing", "want", @@ -1430,10 +1436,10 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes", + "bytes 1.0.1", "hyper", "native-tls", - "tokio", + "tokio 1.8.0", "tokio-native-tls", ] @@ -1597,7 +1603,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", - "tokio", + "tokio 1.8.0", "url", "uuid", ] @@ -1660,7 +1666,8 @@ dependencies = [ "strum", "strum_macros", "thiserror", - "tokio", + "tokio 1.8.0", + "tokio-diesel 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "url", "uuid", ] @@ -1708,7 +1715,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", - "tokio", + "tokio 1.8.0", "url", "uuid", ] @@ -1757,6 +1764,8 @@ dependencies = [ "sha2", "strum", "strum_macros", + "tokio 1.8.0", + "tokio-diesel 0.3.0 (git+https://github.com/mehcode/tokio-diesel)", "url", ] @@ -1867,7 +1876,7 @@ dependencies = [ "serde", "serde_json", "strum", - "tokio", + "tokio 1.8.0", "url", ] @@ -1901,7 +1910,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", - "tokio", + "tokio 1.8.0", "url", ] @@ -1927,7 +1936,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -2399,6 +2408,12 @@ dependencies = [ "syn 1.0.73", ] +[[package]] +name = "pin-project-lite" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -2697,7 +2712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" dependencies = [ "base64 0.13.0", - "bytes", + "bytes 1.0.1", "encoding_rs", "futures-core", "futures-util", @@ -2712,11 +2727,11 @@ dependencies = [ "mime", "native-tls", "percent-encoding", - "pin-project-lite", + "pin-project-lite 0.2.7", "serde", "serde_json", "serde_urlencoded", - "tokio", + "tokio 1.8.0", "tokio-native-tls", "url", "wasm-bindgen", @@ -3265,6 +3280,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" +dependencies = [ + "bytes 0.5.6", + "pin-project-lite 0.1.12", + "slab", +] + [[package]] name = "tokio" version = "1.8.0" @@ -3272,17 +3298,55 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "570c2eb13b3ab38208130eccd41be92520388791207fde783bda7c1e8ace28d4" dependencies = [ "autocfg", - "bytes", + "bytes 1.0.1", "libc", "memchr", "mio", + "num_cpus", "once_cell", "parking_lot", - "pin-project-lite", + "pin-project-lite 0.2.7", "signal-hook-registry", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-diesel" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713f06058e12ed5adb542401b9d20f2b18b3fccf6184c5d77b57a31b3d21cc69" +dependencies = [ + "async-trait", + "diesel", + "futures", + "r2d2", + "tokio 0.2.25", +] + +[[package]] +name = "tokio-diesel" +version = "0.3.0" +source = "git+https://github.com/mehcode/tokio-diesel#f4af42558246ab323600622ba8d08803d3c18842" +dependencies = [ + "async-trait", + "diesel", + "futures", + "r2d2", + "tokio 1.8.0", +] + +[[package]] +name = "tokio-macros" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" +dependencies = [ + "proc-macro2 1.0.27", + "quote 1.0.9", + "syn 1.0.73", +] + [[package]] name = "tokio-native-tls" version = "0.3.0" @@ -3290,7 +3354,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio", + "tokio 1.8.0", ] [[package]] @@ -3300,7 +3364,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls", - "tokio", + "tokio 1.8.0", "webpki", ] @@ -3310,12 +3374,12 @@ version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" dependencies = [ - "bytes", + "bytes 1.0.1", "futures-core", "futures-sink", "log", - "pin-project-lite", - "tokio", + "pin-project-lite 0.2.7", + "tokio 1.8.0", ] [[package]] @@ -3340,7 +3404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if", - "pin-project-lite", + "pin-project-lite 0.2.7", "tracing-core", ] diff --git a/crates/api_crud/Cargo.toml b/crates/api_crud/Cargo.toml index e64d50b11..45a595d25 100644 --- a/crates/api_crud/Cargo.toml +++ b/crates/api_crud/Cargo.toml @@ -42,3 +42,4 @@ anyhow = "1.0.41" thiserror = "1.0.26" background-jobs = "0.9.0" reqwest = { version = "0.11.4", features = ["json"] } +tokio-diesel = "0.3.0" diff --git a/crates/db_queries/Cargo.toml b/crates/db_queries/Cargo.toml index 1fa57ef84..2c6804024 100644 --- a/crates/db_queries/Cargo.toml +++ b/crates/db_queries/Cargo.toml @@ -24,6 +24,8 @@ url = { version = "2.2.2", features = ["serde"] } lazy_static = "1.4.0" regex = "1.5.4" bcrypt = "0.10.0" +tokio = { version = "1.8.0", features = ["rt", "macros"] } +tokio-diesel = { git = "https://github.com/mehcode/tokio-diesel" } [dev-dependencies] serial_test = "0.5.1" diff --git a/crates/db_queries/src/aggregates/comment_aggregates.rs b/crates/db_queries/src/aggregates/comment_aggregates.rs index 898c57da3..fba0dac1c 100644 --- a/crates/db_queries/src/aggregates/comment_aggregates.rs +++ b/crates/db_queries/src/aggregates/comment_aggregates.rs @@ -14,10 +14,10 @@ pub struct CommentAggregates { } impl CommentAggregates { - pub fn read(conn: &PgConnection, comment_id: CommentId) -> Result { + pub fn read(pool: &PgConnection, comment_id: CommentId) -> Result { comment_aggregates::table .filter(comment_aggregates::comment_id.eq(comment_id)) - .first::(conn) + .first::(pool) } } @@ -25,7 +25,7 @@ impl CommentAggregates { mod tests { use crate::{ aggregates::comment_aggregates::CommentAggregates, - establish_unpooled_connection, + setup_connection_pool_for_tests, Crud, Likeable, }; @@ -40,21 +40,21 @@ mod tests { #[test] #[serial] fn test_crud() { - let conn = establish_unpooled_connection(); + let pool = setup_connection_pool_for_tests(); let new_person = PersonForm { name: "thommy_comment_agg".into(), ..PersonForm::default() }; - let inserted_person = Person::create(&conn, &new_person).unwrap(); + let inserted_person = Person::create(&pool, &new_person).unwrap(); let another_person = PersonForm { name: "jerry_comment_agg".into(), ..PersonForm::default() }; - let another_inserted_person = Person::create(&conn, &another_person).unwrap(); + let another_inserted_person = Person::create(&pool, &another_person).unwrap(); let new_community = CommunityForm { name: "TIL_comment_agg".into(), @@ -62,7 +62,7 @@ mod tests { ..CommunityForm::default() }; - let inserted_community = Community::create(&conn, &new_community).unwrap(); + let inserted_community = Community::create(&pool, &new_community).unwrap(); let new_post = PostForm { name: "A test post".into(), @@ -71,7 +71,7 @@ mod tests { ..PostForm::default() }; - let inserted_post = Post::create(&conn, &new_post).unwrap(); + let inserted_post = Post::create(&pool, &new_post).unwrap(); let comment_form = CommentForm { content: "A test comment".into(), @@ -80,7 +80,7 @@ mod tests { ..CommentForm::default() }; - let inserted_comment = Comment::create(&conn, &comment_form).unwrap(); + let inserted_comment = Comment::create(&pool, &comment_form).unwrap(); let child_comment_form = CommentForm { content: "A test comment".into(), @@ -90,7 +90,7 @@ mod tests { ..CommentForm::default() }; - let _inserted_child_comment = Comment::create(&conn, &child_comment_form).unwrap(); + let _inserted_child_comment = Comment::create(&pool, &child_comment_form).unwrap(); let comment_like = CommentLikeForm { comment_id: inserted_comment.id, @@ -99,9 +99,9 @@ mod tests { score: 1, }; - CommentLike::like(&conn, &comment_like).unwrap(); + CommentLike::like(&pool, &comment_like).unwrap(); - let comment_aggs_before_delete = CommentAggregates::read(&conn, inserted_comment.id).unwrap(); + let comment_aggs_before_delete = CommentAggregates::read(&pool, inserted_comment.id).unwrap(); assert_eq!(1, comment_aggs_before_delete.score); assert_eq!(1, comment_aggs_before_delete.upvotes); @@ -115,35 +115,35 @@ mod tests { score: -1, }; - CommentLike::like(&conn, &comment_dislike).unwrap(); + CommentLike::like(&pool, &comment_dislike).unwrap(); - let comment_aggs_after_dislike = CommentAggregates::read(&conn, inserted_comment.id).unwrap(); + let comment_aggs_after_dislike = CommentAggregates::read(&pool, inserted_comment.id).unwrap(); assert_eq!(0, comment_aggs_after_dislike.score); assert_eq!(1, comment_aggs_after_dislike.upvotes); assert_eq!(1, comment_aggs_after_dislike.downvotes); // Remove the first comment like - CommentLike::remove(&conn, inserted_person.id, inserted_comment.id).unwrap(); - let after_like_remove = CommentAggregates::read(&conn, inserted_comment.id).unwrap(); + CommentLike::remove(&pool, inserted_person.id, inserted_comment.id).unwrap(); + let after_like_remove = CommentAggregates::read(&pool, inserted_comment.id).unwrap(); assert_eq!(-1, after_like_remove.score); assert_eq!(0, after_like_remove.upvotes); assert_eq!(1, after_like_remove.downvotes); // Remove the parent post - Post::delete(&conn, inserted_post.id).unwrap(); + Post::delete(&pool, inserted_post.id).unwrap(); // Should be none found, since the post was deleted - let after_delete = CommentAggregates::read(&conn, inserted_comment.id); + let after_delete = CommentAggregates::read(&pool, inserted_comment.id); assert!(after_delete.is_err()); // This should delete all the associated rows, and fire triggers - Person::delete(&conn, another_inserted_person.id).unwrap(); - let person_num_deleted = Person::delete(&conn, inserted_person.id).unwrap(); + Person::delete(&pool, another_inserted_person.id).unwrap(); + let person_num_deleted = Person::delete(&pool, inserted_person.id).unwrap(); assert_eq!(1, person_num_deleted); // Delete the community - let community_num_deleted = Community::delete(&conn, inserted_community.id).unwrap(); + let community_num_deleted = Community::delete(&pool, inserted_community.id).unwrap(); assert_eq!(1, community_num_deleted); } } diff --git a/crates/db_queries/src/lib.rs b/crates/db_queries/src/lib.rs index adc0717ac..74db2f9ce 100644 --- a/crates/db_queries/src/lib.rs +++ b/crates/db_queries/src/lib.rs @@ -12,30 +12,34 @@ extern crate diesel_migrations; #[cfg(test)] extern crate serial_test; -use diesel::{result::Error, *}; +use diesel::{*, r2d2::{ConnectionManager, Pool}, result::Error}; use lemmy_db_schema::{CommunityId, DbUrl, PersonId}; -use lemmy_utils::ApiError; +use lemmy_utils::{ApiError, settings::structs::Settings}; use regex::Regex; use serde::{Deserialize, Serialize}; use std::{env, env::VarError}; use url::Url; +use core::pin::Pin; +use core::future::Future; +use tokio_diesel::*; pub mod aggregates; pub mod source; pub type DbPool = diesel::r2d2::Pool>; +pub type TokioDieselFuture<'a, T>= Pin> + Send + 'a>>; -pub trait Crud { - fn create(conn: &PgConnection, form: &Form) -> Result +pub trait Crud<'a, Form, IdType> { + fn create(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn read(conn: &PgConnection, id: IdType) -> Result + fn read(pool: &'a DbPool, id: IdType) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn update(conn: &PgConnection, id: IdType, form: &Form) -> Result + fn update(pool: &'a DbPool, id: IdType, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn delete(_conn: &PgConnection, _id: IdType) -> Result + fn delete(_pool: &'a DbPool, _id: IdType) -> TokioDieselFuture<'a, usize> where Self: Sized, { @@ -43,76 +47,76 @@ pub trait Crud { } } -pub trait Followable
{ - fn follow(conn: &PgConnection, form: &Form) -> Result +pub trait Followable<'a, Form> { + fn follow(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, Self> where Self: Sized; fn follow_accepted( - conn: &PgConnection, + pool: &'a DbPool, community_id: CommunityId, person_id: PersonId, - ) -> Result + ) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn unfollow(conn: &PgConnection, form: &Form) -> Result + fn unfollow(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, usize> where Self: Sized; - fn has_local_followers(conn: &PgConnection, community_id: CommunityId) -> Result; + fn has_local_followers(pool: &'a DbPool, community_id: CommunityId) -> Result; } -pub trait Joinable { - fn join(conn: &PgConnection, form: &Form) -> Result +pub trait Joinable<'a, Form> { + fn join(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn leave(conn: &PgConnection, form: &Form) -> Result + fn leave(pool: &'a DbPool, form: &Form) -> TokioDieselFuture<'a, usize> where Self: Sized; } -pub trait Likeable { - fn like(conn: &PgConnection, form: &Form) -> Result +pub trait Likeable<'a, Form, IdType> { + fn like(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn remove(conn: &PgConnection, person_id: PersonId, item_id: IdType) -> Result + fn remove(pool: &'a DbPool, person_id: PersonId, item_id: IdType) -> TokioDieselFuture<'a, usize> where Self: Sized; } -pub trait Bannable { - fn ban(conn: &PgConnection, form: &Form) -> Result +pub trait Bannable<'a, Form> { + fn ban(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn unban(conn: &PgConnection, form: &Form) -> Result + fn unban(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize> where Self: Sized; } -pub trait Saveable { - fn save(conn: &PgConnection, form: &Form) -> Result +pub trait Saveable<'a, Form> { + fn save(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn unsave(conn: &PgConnection, form: &Form) -> Result + fn unsave(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize> where Self: Sized; } -pub trait Readable { - fn mark_as_read(conn: &PgConnection, form: &Form) -> Result +pub trait Readable<'a, Form> { + fn mark_as_read(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn mark_as_unread(conn: &PgConnection, form: &Form) -> Result + fn mark_as_unread(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, usize> where Self: Sized; } -pub trait Reportable { - fn report(conn: &PgConnection, form: &Form) -> Result +pub trait Reportable<'a, Form> { + fn report(pool: &'a DbPool, form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn resolve(conn: &PgConnection, report_id: i32, resolver_id: PersonId) -> Result + fn resolve(pool: &'a DbPool, report_id: i32, resolver_id: PersonId) -> TokioDieselFuture<'a, usize> where Self: Sized; - fn unresolve(conn: &PgConnection, report_id: i32, resolver_id: PersonId) -> Result + fn unresolve(pool: &'a DbPool, report_id: i32, resolver_id: PersonId) -> TokioDieselFuture<'a, usize> where Self: Sized; } @@ -121,11 +125,11 @@ pub trait DeleteableOrRemoveable { fn blank_out_deleted_or_removed_info(self) -> Self; } -pub trait ApubObject { - fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result +pub trait ApubObject<'a, Form> { + fn read_from_apub_id(pool: &'a DbPool, object_id: &'a DbUrl) -> TokioDieselFuture<'a, Self> where Self: Sized; - fn upsert(conn: &PgConnection, user_form: &Form) -> Result + fn upsert(pool: &'a DbPool, user_form: &'a Form) -> TokioDieselFuture<'a, Self> where Self: Sized; } @@ -249,7 +253,17 @@ pub fn diesel_option_overwrite_to_url( embed_migrations!(); -pub fn establish_unpooled_connection() -> PgConnection { +/// Set up the r2d2 connection pool +pub fn setup_connection_pool() -> DbPool { + let db_url = match get_database_url_from_env() { + Ok(url) => url, + Err(_) => Settings::get().get_database_url(), + }; + build_connection_pool(&db_url, Settings::get().database().pool_size()) +} + +/// Set up the r2d2 connection pool for tests +pub fn setup_connection_pool_for_tests() -> DbPool { let db_url = match get_database_url_from_env() { Ok(url) => url, Err(e) => panic!( @@ -257,10 +271,21 @@ pub fn establish_unpooled_connection() -> PgConnection { e ), }; - let conn = - PgConnection::establish(&db_url).unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); + build_connection_pool(&db_url, 10) +} + +fn build_connection_pool(db_url: &str, pool_size: u32) -> DbPool { + let manager = ConnectionManager::::new(db_url); + let pool = Pool::builder() + .max_size(pool_size) + .build(manager) + .unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); + let conn = pool.get().expect("Missing connection in pool"); + + // Run the migrations embedded_migrations::run(&conn).expect("load migrations"); - conn + + pool } lazy_static! { diff --git a/crates/db_queries/src/source/post.rs b/crates/db_queries/src/source/post.rs index f0cf10935..168c779f3 100644 --- a/crates/db_queries/src/source/post.rs +++ b/crates/db_queries/src/source/post.rs @@ -1,4 +1,5 @@ -use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Readable, Saveable}; +use crate::{ApubObject, Crud, DbPool, TokioDieselFuture, DeleteableOrRemoveable, Likeable, Readable, Saveable}; +use tokio_diesel::*; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{ naive_now, @@ -18,86 +19,85 @@ use lemmy_db_schema::{ PostId, }; -impl Crud for Post { - fn read(conn: &PgConnection, post_id: PostId) -> Result { +impl<'a> Crud<'a, PostForm, PostId> for Post { + fn read(pool: &'a DbPool, post_id: PostId) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; - post.find(post_id).first::(conn) + post.find(post_id).first_async(pool) } - fn delete(conn: &PgConnection, post_id: PostId) -> Result { + fn delete(pool: &'a DbPool, post_id: PostId) -> TokioDieselFuture<'a, usize> { use lemmy_db_schema::schema::post::dsl::*; - diesel::delete(post.find(post_id)).execute(conn) + diesel::delete(post.find(post_id)).execute_async(pool) } - fn create(conn: &PgConnection, new_post: &PostForm) -> Result { + fn create(pool: &'a DbPool, new_post: &'a PostForm) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; - insert_into(post).values(new_post).get_result::(conn) + insert_into(post).values(new_post).get_result_async(pool) } - fn update(conn: &PgConnection, post_id: PostId, new_post: &PostForm) -> Result { + fn update(pool: &'a DbPool, post_id: PostId, new_post: &'a PostForm) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set(new_post) - .get_result::(conn) + .get_result_async(pool) } } -pub trait Post_ { - //fn read(conn: &PgConnection, post_id: i32) -> Result; +pub trait Post_<'a>{ fn list_for_community( - conn: &PgConnection, + pool: &'a DbPool, the_community_id: CommunityId, - ) -> Result, Error>; - fn update_ap_id(conn: &PgConnection, post_id: PostId, apub_id: DbUrl) -> Result; + ) -> TokioDieselFuture<'a, Vec>; + fn update_ap_id(pool: &'a DbPool, post_id: PostId, apub_id: DbUrl) -> TokioDieselFuture<'a, Post>; fn permadelete_for_creator( - conn: &PgConnection, + pool: &'a DbPool, for_creator_id: PersonId, - ) -> Result, Error>; - fn update_deleted(conn: &PgConnection, post_id: PostId, new_deleted: bool) - -> Result; - fn update_removed(conn: &PgConnection, post_id: PostId, new_removed: bool) - -> Result; + ) -> TokioDieselFuture<'a, Vec>; + fn update_deleted(pool: &'a DbPool, post_id: PostId, new_deleted: bool) + -> TokioDieselFuture<'a, Post>; + fn update_removed(pool: &'a DbPool, post_id: PostId, new_removed: bool) + -> TokioDieselFuture<'a, Post>; fn update_removed_for_creator( - conn: &PgConnection, + pool: &'a DbPool, for_creator_id: PersonId, for_community_id: Option, new_removed: bool, - ) -> Result, Error>; - fn update_locked(conn: &PgConnection, post_id: PostId, new_locked: bool) -> Result; + ) -> TokioDieselFuture<'a, Vec>; + fn update_locked(pool: &'a DbPool, post_id: PostId, new_locked: bool) -> TokioDieselFuture<'a, Post>; fn update_stickied( - conn: &PgConnection, + pool: &'a DbPool, post_id: PostId, new_stickied: bool, - ) -> Result; + ) -> TokioDieselFuture<'a, Post>; fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool; } -impl Post_ for Post { +impl<'a> Post_<'a> for Post { fn list_for_community( - conn: &PgConnection, + pool: &'a DbPool, the_community_id: CommunityId, - ) -> Result, Error> { + ) -> TokioDieselFuture<'a, Vec> { use lemmy_db_schema::schema::post::dsl::*; post .filter(community_id.eq(the_community_id)) .then_order_by(published.desc()) .then_order_by(stickied.desc()) .limit(20) - .load::(conn) + .load_async(pool) } - fn update_ap_id(conn: &PgConnection, post_id: PostId, apub_id: DbUrl) -> Result { + fn update_ap_id(pool: &'a DbPool, post_id: PostId, apub_id: DbUrl) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set(ap_id.eq(apub_id)) - .get_result::(conn) + .get_result_async(pool) } fn permadelete_for_creator( - conn: &PgConnection, + pool: &'a DbPool, for_creator_id: PersonId, - ) -> Result, Error> { + ) -> TokioDieselFuture<'a, Vec> { use lemmy_db_schema::schema::post::dsl::*; let perma_deleted = "*Permananently Deleted*"; @@ -111,37 +111,37 @@ impl Post_ for Post { deleted.eq(true), updated.eq(naive_now()), )) - .get_results::(conn) + .get_results_async(pool) } fn update_deleted( - conn: &PgConnection, + pool: &'a DbPool, post_id: PostId, new_deleted: bool, - ) -> Result { + ) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set((deleted.eq(new_deleted), updated.eq(naive_now()))) - .get_result::(conn) + .get_result_async(pool) } fn update_removed( - conn: &PgConnection, + pool: &'a DbPool, post_id: PostId, new_removed: bool, - ) -> Result { + ) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set((removed.eq(new_removed), updated.eq(naive_now()))) - .get_result::(conn) + .get_result_async(pool) } fn update_removed_for_creator( - conn: &PgConnection, + pool: &'a DbPool, for_creator_id: PersonId, for_community_id: Option, new_removed: bool, - ) -> Result, Error> { + ) -> TokioDieselFuture<'a, Vec> { use lemmy_db_schema::schema::post::dsl::*; let mut update = diesel::update(post).into_boxed(); @@ -153,25 +153,25 @@ impl Post_ for Post { update .set((removed.eq(new_removed), updated.eq(naive_now()))) - .get_results::(conn) + .get_results_async(pool) } - fn update_locked(conn: &PgConnection, post_id: PostId, new_locked: bool) -> Result { + fn update_locked(pool: &'a DbPool, post_id: PostId, new_locked: bool) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set(locked.eq(new_locked)) - .get_result::(conn) + .get_result_async(pool) } fn update_stickied( - conn: &PgConnection, + pool: &'a DbPool, post_id: PostId, new_stickied: bool, - ) -> Result { + ) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; diesel::update(post.find(post_id)) .set(stickied.eq(new_stickied)) - .get_result::(conn) + .get_result_async(pool) } fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool { @@ -179,84 +179,84 @@ impl Post_ for Post { } } -impl ApubObject for Post { - fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { +impl<'a> ApubObject<'a, PostForm> for Post { + fn read_from_apub_id(pool: &'a DbPool, object_id: &'a DbUrl) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post::dsl::*; - post.filter(ap_id.eq(object_id)).first::(conn) + post.filter(ap_id.eq(object_id)).first_async(pool) } - fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result { + fn upsert(pool: &'a DbPool, post_form: &'a PostForm) -> TokioDieselFuture<'a, Post> { use lemmy_db_schema::schema::post::dsl::*; insert_into(post) .values(post_form) .on_conflict(ap_id) .do_update() .set(post_form) - .get_result::(conn) + .get_result_async(pool) } } -impl Likeable for PostLike { - fn like(conn: &PgConnection, post_like_form: &PostLikeForm) -> Result { +impl<'a> Likeable<'a, PostLikeForm, PostId> for PostLike { + fn like(pool: &'a DbPool, post_like_form: &'a PostLikeForm) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post_like::dsl::*; insert_into(post_like) .values(post_like_form) .on_conflict((post_id, person_id)) .do_update() .set(post_like_form) - .get_result::(conn) + .get_result_async(pool) } - fn remove(conn: &PgConnection, person_id: PersonId, post_id: PostId) -> Result { + fn remove(pool: &'a DbPool, person_id: PersonId, post_id: PostId) -> TokioDieselFuture<'a, usize> { use lemmy_db_schema::schema::post_like::dsl; diesel::delete( dsl::post_like .filter(dsl::post_id.eq(post_id)) .filter(dsl::person_id.eq(person_id)), ) - .execute(conn) + .execute_async(pool) } } -impl Saveable for PostSaved { - fn save(conn: &PgConnection, post_saved_form: &PostSavedForm) -> Result { +impl<'a> Saveable<'a, PostSavedForm> for PostSaved { + fn save(pool: &'a DbPool, post_saved_form: &'a PostSavedForm) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post_saved::dsl::*; insert_into(post_saved) .values(post_saved_form) .on_conflict((post_id, person_id)) .do_update() .set(post_saved_form) - .get_result::(conn) + .get_result_async(pool) } - fn unsave(conn: &PgConnection, post_saved_form: &PostSavedForm) -> Result { + fn unsave(pool: &'a DbPool, post_saved_form: &PostSavedForm) -> TokioDieselFuture<'a, usize> { use lemmy_db_schema::schema::post_saved::dsl::*; diesel::delete( post_saved .filter(post_id.eq(post_saved_form.post_id)) .filter(person_id.eq(post_saved_form.person_id)), ) - .execute(conn) + .execute_async(pool) } } -impl Readable for PostRead { - fn mark_as_read(conn: &PgConnection, post_read_form: &PostReadForm) -> Result { +impl<'a> Readable<'a, PostReadForm> for PostRead { + fn mark_as_read(pool: &'a DbPool, post_read_form: &'a PostReadForm) -> TokioDieselFuture<'a, Self> { use lemmy_db_schema::schema::post_read::dsl::*; insert_into(post_read) .values(post_read_form) .on_conflict((post_id, person_id)) .do_update() .set(post_read_form) - .get_result::(conn) + .get_result_async(pool) } - fn mark_as_unread(conn: &PgConnection, post_read_form: &PostReadForm) -> Result { + fn mark_as_unread(pool: &'a DbPool, post_read_form: &'a PostReadForm) -> TokioDieselFuture<'a, usize> { use lemmy_db_schema::schema::post_read::dsl::*; diesel::delete( post_read .filter(post_id.eq(post_read_form.post_id)) .filter(person_id.eq(post_read_form.person_id)), ) - .execute(conn) + .execute_async(pool) } } @@ -276,24 +276,22 @@ impl DeleteableOrRemoveable for Post { #[cfg(test)] mod tests { - use crate::{establish_unpooled_connection, source::post::*}; + use crate::{setup_connection_pool_for_tests, source::post::*}; use lemmy_db_schema::source::{ community::{Community, CommunityForm}, person::*, }; - use serial_test::serial; - #[test] - #[serial] - fn test_crud() { - let conn = establish_unpooled_connection(); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_crud() { + let pool = setup_connection_pool_for_tests(); let new_person = PersonForm { name: "jim".into(), ..PersonForm::default() }; - let inserted_person = Person::create(&conn, &new_person).unwrap(); + let inserted_person = Person::create(&pool, &new_person).await.unwrap(); let new_community = CommunityForm { name: "test community_3".to_string(), @@ -301,7 +299,7 @@ mod tests { ..CommunityForm::default() }; - let inserted_community = Community::create(&conn, &new_community).unwrap(); + let inserted_community = Community::create(&pool, &new_community).await.unwrap(); let new_post = PostForm { name: "A test post".into(), @@ -310,7 +308,7 @@ mod tests { ..PostForm::default() }; - let inserted_post = Post::create(&conn, &new_post).unwrap(); + let inserted_post = Post::create(&pool, &new_post).await.unwrap(); let expected_post = Post { id: inserted_post.id, @@ -341,7 +339,7 @@ mod tests { score: 1, }; - let inserted_post_like = PostLike::like(&conn, &post_like_form).unwrap(); + let inserted_post_like = PostLike::like(&pool, &post_like_form).await.unwrap(); let expected_post_like = PostLike { id: inserted_post_like.id, @@ -357,7 +355,7 @@ mod tests { person_id: inserted_person.id, }; - let inserted_post_saved = PostSaved::save(&conn, &post_saved_form).unwrap(); + let inserted_post_saved = PostSaved::save(&pool, &post_saved_form).await.unwrap(); let expected_post_saved = PostSaved { id: inserted_post_saved.id, @@ -372,7 +370,7 @@ mod tests { person_id: inserted_person.id, }; - let inserted_post_read = PostRead::mark_as_read(&conn, &post_read_form).unwrap(); + let inserted_post_read = PostRead::mark_as_read(&pool, &post_read_form).await.unwrap(); let expected_post_read = PostRead { id: inserted_post_read.id, @@ -381,14 +379,14 @@ mod tests { published: inserted_post_read.published, }; - let read_post = Post::read(&conn, inserted_post.id).unwrap(); - let updated_post = Post::update(&conn, inserted_post.id, &new_post).unwrap(); - let like_removed = PostLike::remove(&conn, inserted_person.id, inserted_post.id).unwrap(); - let saved_removed = PostSaved::unsave(&conn, &post_saved_form).unwrap(); - let read_removed = PostRead::mark_as_unread(&conn, &post_read_form).unwrap(); - let num_deleted = Post::delete(&conn, inserted_post.id).unwrap(); - Community::delete(&conn, inserted_community.id).unwrap(); - Person::delete(&conn, inserted_person.id).unwrap(); + let read_post = Post::read(&pool, inserted_post.id).await.unwrap(); + let updated_post = Post::update(&pool, inserted_post.id, &new_post).await.unwrap(); + let like_removed = PostLike::remove(&pool, inserted_person.id, inserted_post.id).await.unwrap(); + let saved_removed = PostSaved::unsave(&pool, &post_saved_form).await.unwrap(); + let read_removed = PostRead::mark_as_unread(&pool, &post_read_form).await.unwrap(); + let num_deleted = Post::delete(&pool, inserted_post.id).await.unwrap(); + Community::delete(&pool, inserted_community.id).await.unwrap(); + Person::delete(&pool, inserted_person.id).await.unwrap(); assert_eq!(expected_post, read_post); assert_eq!(expected_post, inserted_post); diff --git a/src/main.rs b/src/main.rs index 8076fb365..24c09d64e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,15 +3,11 @@ extern crate diesel_migrations; use actix::prelude::*; use actix_web::{web::Data, *}; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; use lemmy_api::match_websocket_operation; use lemmy_api_common::blocking; use lemmy_api_crud::match_websocket_operation_crud; use lemmy_apub::activity_queue::create_activity_queue; -use lemmy_db_queries::get_database_url_from_env; +use lemmy_db_queries::setup_connection_pool; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; use lemmy_server::{api_routes, code_migrations::run_advanced_migrations, scheduled_tasks}; use lemmy_utils::{ @@ -32,18 +28,11 @@ async fn main() -> Result<(), LemmyError> { let settings = Settings::get(); // Set up the r2d2 connection pool - let db_url = match get_database_url_from_env() { - Ok(url) => url, - Err(_) => settings.get_database_url(), - }; - let manager = ConnectionManager::::new(&db_url); - let pool = Pool::builder() - .max_size(settings.database().pool_size()) - .build(manager) - .unwrap_or_else(|_| panic!("Error connecting to {}", db_url)); + let pool = setup_connection_pool(); // Run the migrations from code blocking(&pool, move |conn| { + // TODO this is already done from the pool embedded_migrations::run(conn)?; run_advanced_migrations(conn)?; Ok(()) as Result<(), LemmyError>