pub mod uplete; use crate::{newtypes::DbUrl, CommentSortType, PostSortType}; use chrono::TimeDelta; use deadpool::Runtime; use diesel::{ dsl, expression::AsExpression, helper_types::AsExprOf, pg::Pg, query_builder::{Query, QueryFragment}, query_dsl::methods::{FilterDsl, FindDsl, LimitDsl}, query_source::{Alias, AliasSource, AliasedField}, result::{ ConnectionError, ConnectionResult, Error::{self as DieselError, QueryBuilderError}, }, sql_types::{self, SingleValue, Timestamptz}, Column, Expression, ExpressionMethods, IntoSql, JoinOnDsl, NullableExpressionMethods, QuerySource, Table, }; use diesel_async::{ pg::AsyncPgConnection, pooled_connection::{ deadpool::{Hook, HookError, Object as PooledConnection, Pool}, AsyncDieselConnectionManager, ManagerConfig, }, AsyncConnection, }; use diesel_bind_if_some::BindIfSome; use futures_util::{future::BoxFuture, Future, FutureExt}; use i_love_jesus::CursorKey; use lemmy_utils::{ error::{LemmyErrorExt, LemmyErrorType, LemmyResult}, settings::SETTINGS, utils::validation::clean_url, }; use regex::Regex; use rustls::{ client::danger::{ DangerousClientConfigBuilder, HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier, }, crypto::{self, verify_tls12_signature, verify_tls13_signature}, pki_types::{CertificateDer, ServerName, UnixTime}, ClientConfig, DigitallySignedStruct, SignatureScheme, }; use std::{ ops::{Deref, DerefMut}, sync::{Arc, LazyLock, OnceLock}, time::Duration, }; use tracing::error; use url::Url; const FETCH_LIMIT_DEFAULT: i64 = 10; pub const FETCH_LIMIT_MAX: i64 = 50; pub const SITEMAP_LIMIT: i64 = 50000; pub const SITEMAP_DAYS: TimeDelta = TimeDelta::days(31); pub const RANK_DEFAULT: f64 = 0.0001; /// Some connection options to speed up queries const CONNECTION_OPTIONS: [&str; 1] = ["geqo_threshold=12"]; pub type ActualDbPool = Pool; /// References a pool or connection. Functions must take `&mut DbPool<'_>` to allow implicit /// reborrowing. /// /// https://github.com/rust-lang/rfcs/issues/1403 pub enum DbPool<'a> { Pool(&'a ActualDbPool), Conn(&'a mut AsyncPgConnection), } pub enum DbConn<'a> { Pool(PooledConnection), Conn(&'a mut AsyncPgConnection), } pub async fn get_conn<'a, 'b: 'a>(pool: &'a mut DbPool<'b>) -> Result, DieselError> { Ok(match pool { DbPool::Pool(pool) => DbConn::Pool(pool.get().await.map_err(|e| QueryBuilderError(e.into()))?), DbPool::Conn(conn) => DbConn::Conn(conn), }) } impl Deref for DbConn<'_> { type Target = AsyncPgConnection; fn deref(&self) -> &Self::Target { match self { DbConn::Pool(conn) => conn.deref(), DbConn::Conn(conn) => conn.deref(), } } } impl DerefMut for DbConn<'_> { fn deref_mut(&mut self) -> &mut Self::Target { match self { DbConn::Pool(conn) => conn.deref_mut(), DbConn::Conn(conn) => conn.deref_mut(), } } } // Allows functions that take `DbPool<'_>` to be called in a transaction by passing `&mut // conn.into()` impl<'a> From<&'a mut AsyncPgConnection> for DbPool<'a> { fn from(value: &'a mut AsyncPgConnection) -> Self { DbPool::Conn(value) } } impl<'a, 'b: 'a> From<&'a mut DbConn<'b>> for DbPool<'a> { fn from(value: &'a mut DbConn<'b>) -> Self { DbPool::Conn(value.deref_mut()) } } impl<'a> From<&'a ActualDbPool> for DbPool<'a> { fn from(value: &'a ActualDbPool) -> Self { DbPool::Pool(value) } } /// Runs multiple async functions that take `&mut DbPool<'_>` as input and return `Result`. Only /// works when the `futures` crate is listed in `Cargo.toml`. /// /// `$pool` is the value given to each function. /// /// A `Result` is returned (not in a `Future`, so don't use `.await`). The `Ok` variant contains a /// tuple with the values returned by the given functions. /// /// The functions run concurrently if `$pool` has the `DbPool::Pool` variant. #[macro_export] macro_rules! try_join_with_pool { ($pool:ident => ($($func:expr),+)) => {{ // Check type let _: &mut $crate::utils::DbPool<'_> = $pool; match $pool { // Run concurrently with `try_join` $crate::utils::DbPool::Pool(__pool) => ::futures::try_join!( $(async { let mut __dbpool = $crate::utils::DbPool::Pool(__pool); ($func)(&mut __dbpool).await }),+ ), // Run sequentially $crate::utils::DbPool::Conn(__conn) => async { Ok(($({ let mut __dbpool = $crate::utils::DbPool::Conn(__conn); // `?` prevents the error type from being inferred in an `async` block, so `match` is used instead match ($func)(&mut __dbpool).await { ::core::result::Result::Ok(__v) => __v, ::core::result::Result::Err(__v) => return ::core::result::Result::Err(__v), } }),+)) }.await, } }}; } pub struct ReverseTimestampKey(pub K); impl CursorKey for ReverseTimestampKey where K: CursorKey, { type SqlType = sql_types::BigInt; type CursorValue = functions::reverse_timestamp_sort; type SqlValue = functions::reverse_timestamp_sort; fn get_cursor_value(cursor: &C) -> Self::CursorValue { functions::reverse_timestamp_sort(K::get_cursor_value(cursor)) } fn get_sql_value() -> Self::SqlValue { functions::reverse_timestamp_sort(K::get_sql_value()) } } /// Includes an SQL comment before `T`, which can be used to label auto_explain output #[derive(QueryId)] pub struct Commented { comment: String, inner: T, } impl Commented { pub fn new(inner: T) -> Self { Commented { comment: String::new(), inner, } } /// Adds `text` to the comment if `condition` is true pub fn text_if(mut self, text: &str, condition: bool) -> Self { if condition { if !self.comment.is_empty() { self.comment.push_str(", "); } self.comment.push_str(text); } self } /// Adds `text` to the comment pub fn text(self, text: &str) -> Self { self.text_if(text, true) } } impl Query for Commented { type SqlType = T::SqlType; } impl> QueryFragment for Commented { fn walk_ast<'b>( &'b self, mut out: diesel::query_builder::AstPass<'_, 'b, Pg>, ) -> Result<(), DieselError> { for line in self.comment.lines() { out.push_sql("\n-- "); out.push_sql(line); } out.push_sql("\n"); self.inner.walk_ast(out.reborrow()) } } impl LimitDsl for Commented { type Output = Commented; fn limit(self, limit: i64) -> Self::Output { Commented { comment: self.comment, inner: self.inner.limit(limit), } } } pub fn fuzzy_search(q: &str) -> String { let replaced = q .replace('\\', "\\\\") .replace('%', "\\%") .replace('_', "\\_") .replace(' ', "%"); format!("%{replaced}%") } pub fn limit_and_offset( page: Option, limit: Option, ) -> Result<(i64, i64), diesel::result::Error> { let page = match page { Some(page) => { if page < 1 { return Err(QueryBuilderError("Page is < 1".into())); } page } None => 1, }; let limit = match limit { Some(limit) => { if !(1..=FETCH_LIMIT_MAX).contains(&limit) { return Err(QueryBuilderError( format!("Fetch limit is > {FETCH_LIMIT_MAX}").into(), )); } limit } None => FETCH_LIMIT_DEFAULT, }; let offset = limit * (page - 1); Ok((limit, offset)) } pub fn limit_and_offset_unlimited(page: Option, limit: Option) -> (i64, i64) { let limit = limit.unwrap_or(FETCH_LIMIT_DEFAULT); let offset = limit * (page.unwrap_or(1) - 1); (limit, offset) } pub fn is_email_regex(test: &str) -> bool { EMAIL_REGEX.is_match(test) } /// Takes an API optional text input, and converts it to an optional diesel DB update. pub fn diesel_string_update(opt: Option<&str>) -> Option> { match opt { // An empty string is an erase Some("") => Some(None), Some(str) => Some(Some(str.into())), None => None, } } /// Takes an API optional text input, and converts it to an optional diesel DB update (for non /// nullable properties). pub fn diesel_required_string_update(opt: Option<&str>) -> Option { match opt { // An empty string is no change Some("") => None, Some(str) => Some(str.into()), None => None, } } /// Takes an optional API URL-type input, and converts it to an optional diesel DB update. /// Also cleans the url params. pub fn diesel_url_update(opt: Option<&str>) -> LemmyResult>> { match opt { // An empty string is an erase Some("") => Ok(Some(None)), Some(str_url) => Url::parse(str_url) .map(|u| Some(Some(clean_url(&u).into()))) .with_lemmy_type(LemmyErrorType::InvalidUrl), None => Ok(None), } } /// Takes an optional API URL-type input, and converts it to an optional diesel DB update (for non /// nullable properties). Also cleans the url params. pub fn diesel_required_url_update(opt: Option<&str>) -> LemmyResult> { match opt { // An empty string is no change Some("") => Ok(None), Some(str_url) => Url::parse(str_url) .map(|u| Some(clean_url(&u).into())) .with_lemmy_type(LemmyErrorType::InvalidUrl), None => Ok(None), } } /// Takes an optional API URL-type input, and converts it to an optional diesel DB create. /// Also cleans the url params. pub fn diesel_url_create(opt: Option<&str>) -> LemmyResult> { match opt { Some(str_url) => Url::parse(str_url) .map(|u| Some(clean_url(&u).into())) .with_lemmy_type(LemmyErrorType::InvalidUrl), None => Ok(None), } } /// Sets a few additional config options necessary for starting lemmy fn build_config_options_uri_segment(config: &str) -> LemmyResult { let mut url = Url::parse(config)?; // Set `lemmy.protocol_and_hostname` so triggers can use it let lemmy_protocol_and_hostname_option = "lemmy.protocol_and_hostname=".to_owned() + &SETTINGS.get_protocol_and_hostname(); let mut options = CONNECTION_OPTIONS.to_vec(); options.push(&lemmy_protocol_and_hostname_option); // Create the connection uri portion let options_segments = options .iter() .map(|o| "-c ".to_owned() + o) .collect::>() .join(" "); url.set_query(Some(&format!("options={options_segments}"))); Ok(url.into()) } fn establish_connection(config: &str) -> BoxFuture> { let fut = async { /// Use a once_lock to create the postgres connection config, since this config never changes static POSTGRES_CONFIG_WITH_OPTIONS: OnceLock = OnceLock::new(); let config = POSTGRES_CONFIG_WITH_OPTIONS.get_or_init(|| { build_config_options_uri_segment(config) .inspect_err(|e| error!("Couldn't parse postgres connection URI: {e}")) .unwrap_or_default() }); // We only support TLS with sslmode=require currently let conn = if config.contains("sslmode=require") { let rustls_config = DangerousClientConfigBuilder { cfg: ClientConfig::builder(), } .with_custom_certificate_verifier(Arc::new(NoCertVerifier {})) .with_no_client_auth(); let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config); let (client, conn) = tokio_postgres::connect(config, tls) .await .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; tokio::spawn(async move { if let Err(e) = conn.await { error!("Database connection failed: {e}"); } }); AsyncPgConnection::try_from(client).await? } else { AsyncPgConnection::establish(config).await? }; Ok(conn) }; fut.boxed() } #[derive(Debug)] struct NoCertVerifier {} impl ServerCertVerifier for NoCertVerifier { fn verify_server_cert( &self, _end_entity: &CertificateDer, _intermediates: &[CertificateDer], _server_name: &ServerName, _ocsp: &[u8], _now: UnixTime, ) -> Result { // Will verify all (even invalid) certs without any checks (sslmode=require) Ok(ServerCertVerified::assertion()) } fn verify_tls12_signature( &self, message: &[u8], cert: &CertificateDer, dss: &DigitallySignedStruct, ) -> Result { verify_tls12_signature( message, cert, dss, &crypto::ring::default_provider().signature_verification_algorithms, ) } fn verify_tls13_signature( &self, message: &[u8], cert: &CertificateDer, dss: &DigitallySignedStruct, ) -> Result { verify_tls13_signature( message, cert, dss, &crypto::ring::default_provider().signature_verification_algorithms, ) } fn supported_verify_schemes(&self) -> Vec { crypto::ring::default_provider() .signature_verification_algorithms .supported_schemes() } } pub fn build_db_pool() -> LemmyResult { let db_url = SETTINGS.get_database_url(); // diesel-async does not support any TLS connections out of the box, so we need to manually // provide a setup function which handles creating the connection let mut config = ManagerConfig::default(); config.custom_setup = Box::new(establish_connection); let manager = AsyncDieselConnectionManager::::new_with_config(&db_url, config); let pool = Pool::builder(manager) .max_size(SETTINGS.database.pool_size) .runtime(Runtime::Tokio1) // Limit connection age to prevent use of prepared statements that have query plans based on // very old statistics .pre_recycle(Hook::sync_fn(|_conn, metrics| { // Preventing the first recycle can cause an infinite loop when trying to get a new connection // from the pool let conn_was_used = metrics.recycled.is_some(); if metrics.age() > Duration::from_secs(3 * 24 * 60 * 60) && conn_was_used { Err(HookError::Message("Connection is too old".into())) } else { Ok(()) } })) .build()?; crate::schema_setup::run(&db_url)?; Ok(pool) } #[allow(clippy::expect_used)] pub fn build_db_pool_for_tests() -> ActualDbPool { build_db_pool().expect("db pool missing") } pub fn post_to_comment_sort_type(sort: PostSortType) -> CommentSortType { use PostSortType::*; match sort { Active | Hot | Scaled => CommentSortType::Hot, New | NewComments | MostComments => CommentSortType::New, Old => CommentSortType::Old, Controversial => CommentSortType::Controversial, TopHour | TopSixHour | TopTwelveHour | TopDay | TopAll | TopWeek | TopYear | TopMonth | TopThreeMonths | TopSixMonths | TopNineMonths => CommentSortType::Top, } } #[allow(clippy::expect_used)] static EMAIL_REGEX: LazyLock = LazyLock::new(|| { Regex::new(r"^[a-zA-Z0-9.!#$%&’*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$") .expect("compile email regex") }); pub mod functions { use diesel::sql_types::{BigInt, Text, Timestamptz}; define_sql_function! { #[sql_name = "r.hot_rank"] fn hot_rank(score: BigInt, time: Timestamptz) -> Double; } define_sql_function! { #[sql_name = "r.scaled_rank"] fn scaled_rank(score: BigInt, time: Timestamptz, users_active_month: BigInt) -> Double; } define_sql_function! { #[sql_name = "r.controversy_rank"] fn controversy_rank(upvotes: BigInt, downvotes: BigInt, score: BigInt) -> Double; } define_sql_function!(fn reverse_timestamp_sort(time: Timestamptz) -> BigInt); define_sql_function!(fn lower(x: Text) -> Text); define_sql_function!(fn random() -> Text); // really this function is variadic, this just adds the two-argument version define_sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; pub fn now() -> AsExprOf { // https://github.com/diesel-rs/diesel/issues/1514 diesel::dsl::now.into_sql::() } /// Trait alias for a type that can be converted to an SQL tuple using `IntoSql::into_sql` pub trait AsRecord: Expression + AsExpression> where Self::SqlType: 'static, { } impl>> AsRecord for T where T::SqlType: 'static { } /// Output of `IntoSql::into_sql` for a type that implements `AsRecord` pub type AsRecordOutput = dsl::AsExprOf::SqlType>>; /// Output of `t.on((l0, l1).into_sql().eq((r0, r1)))` type OnTupleEq = dsl::On, (R0, R1)>>; /// Creates an `ON` clause for a table where a person ID and another column are used as the /// primary key. Use with the `QueryDsl::left_join` method. /// /// This example modifies a query to make columns in `community_actions` available: /// /// ``` /// community::table /// .left_join(actions( /// community_actions::table, /// my_person_id, /// community::id, /// )) /// ``` pub fn actions( actions_table: T, person_id: Option

, target_id: C, ) -> OnTupleEq, K1, BindIfSome>, C> where T: Table + Copy, K0: Expression, P: AsExpression, (dsl::Nullable, K1): AsRecord, (BindIfSome>, C): AsExpression<, K1)> as Expression>::SqlType>, { let (k0, k1) = actions_table.primary_key(); actions_table.on((k0.nullable(), k1).into_sql().eq(( BindIfSome(person_id.map(diesel::IntoSql::into_sql)), target_id, ))) } /// Like `actions` but `actions_table` is an alias and person id is not nullable #[allow(clippy::type_complexity)] pub fn actions_alias( actions_table: Alias, person_id: P, target_id: C, ) -> OnTupleEq, AliasedField, AliasedField, P, C> where Alias: QuerySource + Copy, T: AliasSource> + Default, K0: Column, K1: Column
, (AliasedField, AliasedField): AsRecord, (P, C): AsExpression< , AliasedField)> as Expression>::SqlType, >, { let (k0, k1) = T::default().target().primary_key(); actions_table.on( (actions_table.field(k0), actions_table.field(k1)) .into_sql() .eq((person_id, target_id)), ) } /// `action_query(table_name::action_name)` is the same as /// `table_name::table.filter(table_name::action_name.is_not_null())`. pub fn action_query(column: C) -> dsl::Filter> where C: Column>, SqlType: SingleValue>, { action_query_with_fn(column, |t| t) } /// `find_action(table_name::action_name, key)` is the same as /// `table_name::table.find(key).filter(table_name::action_name.is_not_null())`. pub fn find_action( column: C, key: K, ) -> dsl::Filter, dsl::IsNotNull> where C: Column>>, SqlType: SingleValue>, { action_query_with_fn(column, |t| t.find(key)) } /// `action_query_with_fn(table_name::action_name, f)` is the same as /// `f(table_name::table).filter(table_name::action_name.is_not_null())`. fn action_query_with_fn( column: C, f: impl FnOnce(C::Table) -> Q, ) -> dsl::Filter> where C: Column, Q: FilterDsl>, { f(C::Table::default()).filter(column.is_not_null()) } pub type ResultFuture<'a, T> = BoxFuture<'a, Result>; pub trait ReadFn<'a, T, Args>: Fn(DbConn<'a>, Args) -> ResultFuture<'a, T> {} impl<'a, T, Args, F: Fn(DbConn<'a>, Args) -> ResultFuture<'a, T>> ReadFn<'a, T, Args> for F {} pub trait ListFn<'a, T, Args>: Fn(DbConn<'a>, Args) -> ResultFuture<'a, Vec> {} impl<'a, T, Args, F: Fn(DbConn<'a>, Args) -> ResultFuture<'a, Vec>> ListFn<'a, T, Args> for F {} /// Allows read and list functions to capture a shared closure that has an inferred return type, /// which is useful for join logic pub struct Queries { pub read_fn: RF, pub list_fn: LF, } // `()` is used to prevent type inference error impl Queries<(), ()> { pub fn new<'a, RFut, LFut, RT, LT, RA, LA, RF2, LF2>( read_fn: RF2, list_fn: LF2, ) -> Queries, impl ListFn<'a, LT, LA>> where RFut: Future> + Sized + Send + 'a, LFut: Future, DieselError>> + Sized + Send + 'a, RF2: Fn(DbConn<'a>, RA) -> RFut, LF2: Fn(DbConn<'a>, LA) -> LFut, { Queries { read_fn: move |conn, args| read_fn(conn, args).boxed(), list_fn: move |conn, args| list_fn(conn, args).boxed(), } } } impl Queries { pub async fn read<'a, T, Args>( self, pool: &'a mut DbPool<'_>, args: Args, ) -> Result where RF: ReadFn<'a, T, Args>, { let conn = get_conn(pool).await?; (self.read_fn)(conn, args).await } pub async fn list<'a, T, Args>( self, pool: &'a mut DbPool<'_>, args: Args, ) -> Result, DieselError> where LF: ListFn<'a, T, Args>, { let conn = get_conn(pool).await?; (self.list_fn)(conn, args).await } } #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; #[test] fn test_fuzzy_search() { let test = "This %is% _a_ fuzzy search"; assert_eq!( fuzzy_search(test), "%This%\\%is\\%%\\_a\\_%fuzzy%search%".to_string() ); } #[test] fn test_email() { assert!(is_email_regex("gush@gmail.com")); assert!(!is_email_regex("nada_neutho")); } #[test] fn test_diesel_option_overwrite() { assert_eq!(diesel_string_update(None), None); assert_eq!(diesel_string_update(Some("")), Some(None)); assert_eq!( diesel_string_update(Some("test")), Some(Some("test".to_string())) ); } #[test] fn test_diesel_option_overwrite_to_url() -> LemmyResult<()> { assert!(matches!(diesel_url_update(None), Ok(None))); assert!(matches!(diesel_url_update(Some("")), Ok(Some(None)))); assert!(diesel_url_update(Some("invalid_url")).is_err()); let example_url = "https://example.com"; assert!(matches!( diesel_url_update(Some(example_url)), Ok(Some(Some(url))) if url == Url::parse(example_url)?.into() )); Ok(()) } }