Use db pool

This commit is contained in:
Felix Ableitner 2024-02-27 17:49:36 +01:00
parent dd56a3f0d5
commit d22654261f
31 changed files with 211 additions and 182 deletions

21
Cargo.lock generated
View File

@ -906,6 +906,7 @@ dependencies = [
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"uuid",
]
@ -2750,6 +2751,17 @@ dependencies = [
"syn 2.0.51",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -3033,6 +3045,15 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.2.0"

View File

@ -44,6 +44,7 @@ diesel = { version = "2.1.4", features = [
"postgres",
"chrono",
"uuid",
"r2d2"
], optional = true }
diesel-derive-newtype = { version = "2.1.0", optional = true }
diesel_migrations = { version = "2.1.0", optional = true }

View File

@ -1,12 +1,17 @@
# Address where ibis should listen for incoming requests
bind = "127.0.0.1:8081"
# Database connection url
database_url = "postgres://ibis:password@localhost:5432/ibis"
# Whether users can create new accounts
registration_open = true
# Details about the PostgreSQL database connection
[database]
# Database connection url
connection_url = "postgres://ibis:password@localhost:5432/ibis"
# Database connection pool size
pool_size = 5
# Details of the initial admin account
[setup]
admin_username = "ibis"
@ -22,4 +27,4 @@ domain = "example.com"
allowlist = "good.com,friends.org"
# Comma separated list of instances which are blocked for federation; optional
blocklist = "evil.com,bad.org"
blocklist = "evil.com,bad.org"

View File

@ -36,7 +36,7 @@ pub(in crate::backend::api) async fn create_article(
return Err(anyhow!("Title must not be empty").into());
}
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(&data)?;
let ap_id = ObjectId::parse(&format!(
"{}://{}:{}/article/{}",
http_protocol_str(),
@ -51,18 +51,18 @@ pub(in crate::backend::api) async fn create_article(
instance_id: local_instance.id,
local: true,
};
let article = DbArticle::create(form, &data.db_connection)?;
let article = DbArticle::create(form, &data)?;
let edit_data = EditArticleData {
article_id: article.id,
new_text: create_article.text,
summary: create_article.summary,
previous_version_id: article.latest_edit_version(&data.db_connection)?,
previous_version_id: article.latest_edit_version(&data)?,
resolve_conflict_id: None,
};
let _ = edit_article(Extension(user), data.reset_request_count(), Form(edit_data)).await?;
let article_view = DbArticle::read_view(article.id, &data.db_connection)?;
let article_view = DbArticle::read_view(article.id, &data)?;
CreateArticle::send_to_followers(article_view.article.clone(), &data).await?;
Ok(Json(article_view))
@ -85,9 +85,9 @@ pub(in crate::backend::api) async fn edit_article(
) -> MyResult<Json<Option<ApiConflict>>> {
// resolve conflict if any
if let Some(resolve_conflict_id) = edit_form.resolve_conflict_id {
DbConflict::delete(resolve_conflict_id, &data.db_connection)?;
DbConflict::delete(resolve_conflict_id, &data)?;
}
let original_article = DbArticle::read_view(edit_form.article_id, &data.db_connection)?;
let original_article = DbArticle::read_view(edit_form.article_id, &data)?;
if edit_form.new_text == original_article.article.text {
return Err(anyhow!("Edit contains no changes").into());
}
@ -119,7 +119,7 @@ pub(in crate::backend::api) async fn edit_article(
generate_article_version(&original_article.edits, &edit_form.previous_version_id)?;
let patch = create_patch(&ancestor, &edit_form.new_text);
let previous_version = DbEdit::read(&edit_form.previous_version_id, &data.db_connection)?;
let previous_version = DbEdit::read(&edit_form.previous_version_id, &data)?;
let form = DbConflictForm {
hash: EditVersion::new(&patch.to_string()),
diff: patch.to_string(),
@ -128,7 +128,7 @@ pub(in crate::backend::api) async fn edit_article(
article_id: original_article.article.id,
previous_version_id: previous_version.hash,
};
let conflict = DbConflict::create(&form, &data.db_connection)?;
let conflict = DbConflict::create(&form, &data)?;
Ok(Json(conflict.to_api_conflict(&data).await?))
}
}
@ -143,13 +143,13 @@ pub(in crate::backend::api) async fn get_article(
(Some(title), None) => Ok(Json(DbArticle::read_view_title(
&title,
query.domain,
&data.db_connection,
&data,
)?)),
(None, Some(id)) => {
if query.domain.is_some() {
return Err(anyhow!("Cant combine id and instance_domain").into());
}
Ok(Json(DbArticle::read_view(id, &data.db_connection)?))
Ok(Json(DbArticle::read_view(id, &data)?))
}
_ => Err(anyhow!("Must pass exactly one of title, id").into()),
}
@ -161,7 +161,7 @@ pub(in crate::backend::api) async fn list_articles(
data: Data<IbisData>,
) -> MyResult<Json<Vec<DbArticle>>> {
let only_local = query.only_local.unwrap_or(false);
Ok(Json(DbArticle::read_all(only_local, &data.db_connection)?))
Ok(Json(DbArticle::read_all(only_local, &data)?))
}
/// Fork a remote article to local instance. This is useful if there are disagreements about
@ -173,9 +173,9 @@ pub(in crate::backend::api) async fn fork_article(
Form(fork_form): Form<ForkArticleData>,
) -> MyResult<Json<ArticleView>> {
// TODO: lots of code duplicated from create_article(), can move it into helper
let original_article = DbArticle::read(fork_form.article_id, &data.db_connection)?;
let original_article = DbArticle::read(fork_form.article_id, &data)?;
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(&data)?;
let ap_id = ObjectId::parse(&format!(
"{}://{}:{}/article/{}",
http_protocol_str(),
@ -190,11 +190,11 @@ pub(in crate::backend::api) async fn fork_article(
instance_id: local_instance.id,
local: true,
};
let article = DbArticle::create(form, &data.db_connection)?;
let article = DbArticle::create(form, &data)?;
// copy edits to new article
// this could also be done in sql
let edits = DbEdit::read_for_article(&original_article, &data.db_connection)?
let edits = DbEdit::read_for_article(&original_article, &data)?
.into_iter()
.map(|e| e.edit)
.collect::<Vec<_>>();
@ -210,12 +210,12 @@ pub(in crate::backend::api) async fn fork_article(
previous_version_id: e.previous_version_id,
created: Utc::now(),
};
DbEdit::create(&form, &data.db_connection)?;
DbEdit::create(&form, &data)?;
}
CreateArticle::send_to_followers(article.clone(), &data).await?;
Ok(Json(DbArticle::read_view(article.id, &data.db_connection)?))
Ok(Json(DbArticle::read_view(article.id, &data)?))
}
/// Fetch a remote article, including edits collection. Allows viewing and editing. Note that new
@ -226,7 +226,7 @@ pub(super) async fn resolve_article(
data: Data<IbisData>,
) -> MyResult<Json<ArticleView>> {
let article: DbArticle = ObjectId::from(query.id).dereference(&data).await?;
let edits = DbEdit::read_for_article(&article, &data.db_connection)?;
let edits = DbEdit::read_for_article(&article, &data)?;
let latest_version = edits.last().unwrap().edit.hash.clone();
Ok(Json(ArticleView {
article,
@ -244,6 +244,6 @@ pub(super) async fn search_article(
if query.query.is_empty() {
return Err(anyhow!("Query is empty").into());
}
let article = DbArticle::search(&query.query, &data.db_connection)?;
let article = DbArticle::search(&query.query, &data)?;
Ok(Json(article))
}

View File

@ -27,10 +27,10 @@ pub(in crate::backend::api) async fn follow_instance(
data: Data<IbisData>,
Form(query): Form<FollowInstance>,
) -> MyResult<()> {
let target = DbInstance::read(query.id, &data.db_connection)?;
let target = DbInstance::read(query.id, &data)?;
let pending = !target.local;
DbInstance::follow(&user.person, &target, pending, &data)?;
let instance = DbInstance::read(query.id, &data.db_connection)?;
let instance = DbInstance::read(query.id, &data)?;
Follow::send(user.person, &instance, &data).await?;
Ok(())
}

View File

@ -74,7 +74,7 @@ async fn edit_conflicts(
Extension(user): Extension<LocalUserView>,
data: Data<IbisData>,
) -> MyResult<Json<Vec<ApiConflict>>> {
let conflicts = DbConflict::list(&user.local_user, &data.db_connection)?;
let conflicts = DbConflict::list(&user.local_user, &data)?;
let conflicts: Vec<ApiConflict> = try_join_all(conflicts.into_iter().map(|c| {
let data = data.reset_request_count();
async move { c.to_api_conflict(&data).await }

View File

@ -3,18 +3,17 @@ use doku::Document;
use serde::Deserialize;
use smart_default::SmartDefault;
use std::net::SocketAddr;
use crate::backend::error::MyResult;
#[derive(Debug, Deserialize, PartialEq, Eq, Clone, Document, SmartDefault)]
#[serde(default)]
pub struct IbisConfig {
/// Address where ibis should listen for incoming requests
#[default("127.0.0.1:8081".parse().unwrap())]
#[default("127.0.0.1:8081".parse().expect("parse config bind"))]
#[doku(as = "String", example = "127.0.0.1:8081")]
pub bind: SocketAddr,
/// Database connection url
#[default("postgres://ibis:password@localhost:5432/ibis")]
#[doku(example = "postgres://ibis:password@localhost:5432/ibis")]
pub database_url: String,
/// Details about the PostgreSQL database connection
pub database: IbisConfigDatabase,
/// Whether users can create new accounts
#[default = true]
#[doku(example = "true")]
@ -25,18 +24,30 @@ pub struct IbisConfig {
}
impl IbisConfig {
pub fn read() -> Self {
pub fn read() -> MyResult<Self> {
let config = Config::builder()
.add_source(config::File::with_name("config/config.toml"))
// Cant use _ as separator due to https://github.com/mehcode/config-rs/issues/391
.add_source(config::Environment::with_prefix("IBIS").separator("__"))
.build()
.unwrap();
.build()?;
config.try_deserialize().unwrap()
Ok(config.try_deserialize()?)
}
}
#[derive(Debug, Deserialize, PartialEq, Eq, Clone, Document, SmartDefault)]
#[serde(default)]
pub struct IbisConfigDatabase {
/// Database connection url
#[default("postgres://ibis:password@localhost:5432/ibis")]
#[doku(example = "postgres://ibis:password@localhost:5432/ibis")]
pub connection_url: String,
/// Database connection pool size
#[default(5)]
#[doku(example = "5")]
pub pool_size: u32,
}
#[derive(Debug, Deserialize, PartialEq, Eq, Clone, Document, SmartDefault)]
#[serde(default)]
pub struct IbisConfigSetup {

View File

@ -1,4 +1,5 @@
use crate::backend::database::schema::{article, edit, instance};
use crate::backend::database::IbisData;
use crate::backend::error::MyResult;
use crate::backend::federation::objects::edits_collection::DbEditCollection;
use crate::common::DbEdit;
@ -7,14 +8,14 @@ use crate::common::{ArticleView, DbArticle};
use activitypub_federation::fetch::collection_id::CollectionId;
use activitypub_federation::fetch::object_id::ObjectId;
use diesel::dsl::max;
use diesel::pg::PgConnection;
use diesel::ExpressionMethods;
use diesel::{
insert_into, AsChangeset, BoolExpressionMethods, Insertable, PgTextExpressionMethods, QueryDsl,
RunQueryDsl,
};
use std::ops::DerefMut;
use std::sync::Mutex;
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = article, check_for_backend(diesel::pg::Pg))]
@ -32,17 +33,17 @@ impl DbArticle {
Ok(CollectionId::parse(&format!("{}/edits", self.ap_id))?)
}
pub fn create(mut form: DbArticleForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
pub fn create(mut form: DbArticleForm, data: &IbisData) -> MyResult<Self> {
form.title = form.title.replace(' ', "_");
let mut conn = conn.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(insert_into(article::table)
.values(form)
.get_result(conn.deref_mut())?)
}
pub fn create_or_update(mut form: DbArticleForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
pub fn create_or_update(mut form: DbArticleForm, data: &IbisData) -> MyResult<Self> {
form.title = form.title.replace(' ', "_");
let mut conn = conn.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(insert_into(article::table)
.values(&form)
.on_conflict(article::dsl::ap_id)
@ -51,25 +52,23 @@ impl DbArticle {
.get_result(conn.deref_mut())?)
}
pub fn update_text(id: i32, text: &str, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn update_text(id: i32, text: &str, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(diesel::update(article::dsl::article.find(id))
.set(article::dsl::text.eq(text))
.get_result::<Self>(conn.deref_mut())?)
}
pub fn read(id: i32, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read(id: i32, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(article::table.find(id).get_result(conn.deref_mut())?)
}
pub fn read_view(id: i32, conn: &Mutex<PgConnection>) -> MyResult<ArticleView> {
let article: DbArticle = {
let mut conn = conn.lock().unwrap();
article::table.find(id).get_result(conn.deref_mut())?
};
let latest_version = article.latest_edit_version(conn)?;
let edits = DbEdit::read_for_article(&article, conn)?;
pub fn read_view(id: i32, data: &IbisData) -> MyResult<ArticleView> {
let mut conn = data.db_pool.get()?;
let article: DbArticle = { article::table.find(id).get_result(conn.deref_mut())? };
let latest_version = article.latest_edit_version(data)?;
let edits = DbEdit::read_for_article(&article, data)?;
Ok(ArticleView {
article,
edits,
@ -80,10 +79,10 @@ impl DbArticle {
pub fn read_view_title(
title: &str,
domain: Option<String>,
conn: &Mutex<PgConnection>,
data: &IbisData,
) -> MyResult<ArticleView> {
let mut conn = data.db_pool.get()?;
let article: DbArticle = {
let mut conn = conn.lock().unwrap();
let query = article::table
.inner_join(instance::table)
.filter(article::dsl::title.eq(title))
@ -99,8 +98,8 @@ impl DbArticle {
.select(article::all_columns)
.get_result(conn.deref_mut())?
};
let latest_version = article.latest_edit_version(conn)?;
let edits = DbEdit::read_for_article(&article, conn)?;
let latest_version = article.latest_edit_version(data)?;
let edits = DbEdit::read_for_article(&article, data)?;
Ok(ArticleView {
article,
edits,
@ -108,18 +107,15 @@ impl DbArticle {
})
}
pub fn read_from_ap_id(
ap_id: &ObjectId<DbArticle>,
conn: &Mutex<PgConnection>,
) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read_from_ap_id(ap_id: &ObjectId<DbArticle>, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(article::table
.filter(article::dsl::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
}
pub fn read_local_title(title: &str, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read_local_title(title: &str, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(article::table
.filter(article::dsl::title.eq(title))
.filter(article::dsl::local.eq(true))
@ -127,8 +123,8 @@ impl DbArticle {
}
/// Read all articles, ordered by most recently edited first.
pub fn read_all(only_local: bool, conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
let mut conn = conn.lock().unwrap();
pub fn read_all(only_local: bool, data: &IbisData) -> MyResult<Vec<Self>> {
let mut conn = data.db_pool.get()?;
let query = article::table
.inner_join(edit::table)
.group_by(article::dsl::id)
@ -137,14 +133,14 @@ impl DbArticle {
Ok(if only_local {
query
.filter(article::dsl::local.eq(true))
.get_results(conn.deref_mut())?
.get_results(&mut conn)?
} else {
query.get_results(conn.deref_mut())?
query.get_results(&mut conn)?
})
}
pub fn search(query: &str, conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
let mut conn = conn.lock().unwrap();
pub fn search(query: &str, data: &IbisData) -> MyResult<Vec<Self>> {
let mut conn = data.db_pool.get()?;
let replaced = query
.replace('%', "\\%")
.replace('_', "\\_")
@ -159,8 +155,8 @@ impl DbArticle {
.get_results(conn.deref_mut())?)
}
pub fn latest_edit_version(&self, conn: &Mutex<PgConnection>) -> MyResult<EditVersion> {
let mut conn = conn.lock().unwrap();
pub fn latest_edit_version(&self, data: &IbisData) -> MyResult<EditVersion> {
let mut conn = data.db_pool.get()?;
let latest_version: Option<EditVersion> = edit::table
.filter(edit::dsl::article_id.eq(self.id))
.order_by(edit::dsl::id.desc())

View File

@ -10,13 +10,13 @@ use crate::common::{ApiConflict, DbArticle};
use activitypub_federation::config::Data;
use diesel::ExpressionMethods;
use diesel::{
delete, insert_into, Identifiable, Insertable, PgConnection, QueryDsl, Queryable, RunQueryDsl,
delete, insert_into, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl,
Selectable,
};
use diffy::{apply, merge, Patch};
use serde::{Deserialize, Serialize};
use std::ops::DerefMut;
use std::sync::Mutex;
/// A local only object which represents a merge conflict. It is created
/// when a local user edit conflicts with another concurrent edit.
@ -44,33 +44,33 @@ pub struct DbConflictForm {
}
impl DbConflict {
pub fn create(form: &DbConflictForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn create(form: &DbConflictForm, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(insert_into(conflict::table)
.values(form)
.get_result(conn.deref_mut())?)
}
pub fn list(local_user: &DbLocalUser, conn: &Mutex<PgConnection>) -> MyResult<Vec<Self>> {
let mut conn = conn.lock().unwrap();
pub fn list(local_user: &DbLocalUser, data: &IbisData) -> MyResult<Vec<Self>> {
let mut conn = data.db_pool.get()?;
Ok(conflict::table
.filter(conflict::dsl::creator_id.eq(local_user.id))
.get_results(conn.deref_mut())?)
}
/// Delete a merge conflict after it is resolved.
pub fn delete(id: i32, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn delete(id: i32, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(delete(conflict::table.find(id)).get_result(conn.deref_mut())?)
}
pub async fn to_api_conflict(&self, data: &Data<IbisData>) -> MyResult<Option<ApiConflict>> {
let article = DbArticle::read(self.article_id, &data.db_connection)?;
let article = DbArticle::read(self.article_id, data)?;
// Make sure to get latest version from origin so that all conflicts can be resolved
let original_article = article.ap_id.dereference_forced(data).await?;
// create common ancestor version
let edits = DbEdit::read_for_article(&original_article, &data.db_connection)?;
let edits = DbEdit::read_for_article(&original_article, data)?;
let ancestor = generate_article_version(&edits, &self.previous_version_id)?;
let patch = Patch::from_str(&self.diff)?;
@ -89,7 +89,7 @@ impl DbConflict {
data,
)
.await?;
DbConflict::delete(self.id, &data.db_connection)?;
DbConflict::delete(self.id, data)?;
Ok(None)
}
Err(three_way_merge) => {
@ -100,8 +100,7 @@ impl DbConflict {
three_way_merge,
summary: self.summary.clone(),
article: original_article.clone(),
previous_version_id: original_article
.latest_edit_version(&data.db_connection)?,
previous_version_id: original_article.latest_edit_version(data)?,
}))
}
}

View File

@ -1,14 +1,15 @@
use crate::backend::database::schema::{edit, person};
use crate::backend::error::MyResult;
use crate::backend::IbisData;
use crate::common::{DbArticle, DbEdit};
use crate::common::{EditVersion, EditView};
use activitypub_federation::fetch::object_id::ObjectId;
use chrono::{DateTime, Utc};
use diesel::ExpressionMethods;
use diesel::{insert_into, AsChangeset, Insertable, PgConnection, QueryDsl, RunQueryDsl};
use diesel::{insert_into, AsChangeset, Insertable, QueryDsl, RunQueryDsl};
use diffy::create_patch;
use std::ops::DerefMut;
use std::sync::Mutex;
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = edit, check_for_backend(diesel::pg::Pg))]
@ -59,8 +60,8 @@ impl DbEditForm {
}
impl DbEdit {
pub fn create(form: &DbEditForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn create(form: &DbEditForm, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(insert_into(edit::table)
.values(form)
.on_conflict(edit::dsl::ap_id)
@ -69,26 +70,23 @@ impl DbEdit {
.get_result(conn.deref_mut())?)
}
pub fn read(version: &EditVersion, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read(version: &EditVersion, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(edit::table
.filter(edit::dsl::hash.eq(version))
.get_result(conn.deref_mut())?)
}
pub fn read_from_ap_id(ap_id: &ObjectId<DbEdit>, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read_from_ap_id(ap_id: &ObjectId<DbEdit>, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(edit::table
.filter(edit::dsl::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
}
// TODO: create internal variant which doesnt return person?
pub fn read_for_article(
article: &DbArticle,
conn: &Mutex<PgConnection>,
) -> MyResult<Vec<EditView>> {
let mut conn = conn.lock().unwrap();
pub fn read_for_article(article: &DbArticle, data: &IbisData) -> MyResult<Vec<EditView>> {
let mut conn = data.db_pool.get()?;
Ok(edit::table
.inner_join(person::table)
.filter(edit::article_id.eq(article.id))

View File

@ -9,11 +9,11 @@ use activitypub_federation::fetch::object_id::ObjectId;
use chrono::{DateTime, Utc};
use diesel::ExpressionMethods;
use diesel::{
insert_into, AsChangeset, Insertable, JoinOnDsl, PgConnection, QueryDsl, RunQueryDsl,
insert_into, AsChangeset, Insertable, JoinOnDsl, QueryDsl, RunQueryDsl,
};
use std::fmt::Debug;
use std::ops::DerefMut;
use std::sync::Mutex;
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))]
@ -30,8 +30,8 @@ pub struct DbInstanceForm {
}
impl DbInstance {
pub fn create(form: &DbInstanceForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn create(form: &DbInstanceForm, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(insert_into(instance::table)
.values(form)
.on_conflict(instance::ap_id)
@ -40,8 +40,8 @@ impl DbInstance {
.get_result(conn.deref_mut())?)
}
pub fn read(id: i32, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read(id: i32, data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(instance::table.find(id).get_result(conn.deref_mut())?)
}
@ -49,22 +49,22 @@ impl DbInstance {
ap_id: &ObjectId<DbInstance>,
data: &Data<IbisData>,
) -> MyResult<DbInstance> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(instance::table
.filter(instance::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
}
pub fn read_local_instance(conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn read_local_instance(data: &IbisData) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(instance::table
.filter(instance::local.eq(true))
.get_result(conn.deref_mut())?)
}
pub fn read_local_view(data: &Data<IbisData>) -> MyResult<InstanceView> {
let instance = DbInstance::read_local_instance(&data.db_connection)?;
let followers = DbInstance::read_followers(instance.id, &data.db_connection)?;
let instance = DbInstance::read_local_instance(data)?;
let followers = DbInstance::read_followers(instance.id, data)?;
Ok(InstanceView {
instance,
@ -80,7 +80,7 @@ impl DbInstance {
data: &Data<IbisData>,
) -> MyResult<()> {
use instance_follow::dsl::{follower_id, instance_id, pending};
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
let form = (
instance_id.eq(instance.id),
follower_id.eq(follower.id),
@ -96,10 +96,10 @@ impl DbInstance {
Ok(())
}
pub fn read_followers(id_: i32, conn: &Mutex<PgConnection>) -> MyResult<Vec<DbPerson>> {
pub fn read_followers(id_: i32, data: &IbisData) -> MyResult<Vec<DbPerson>> {
use crate::backend::database::schema::person;
use instance_follow::dsl::{follower_id, instance_id};
let mut conn = conn.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(instance_follow::table
.inner_join(person::table.on(follower_id.eq(person::id)))
.filter(instance_id.eq(id_))

View File

@ -1,6 +1,8 @@
use crate::backend::config::IbisConfig;
use crate::backend::database::schema::jwt_secret;
use crate::backend::error::MyResult;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use diesel::{QueryDsl, RunQueryDsl};
use std::ops::Deref;
@ -13,24 +15,15 @@ pub mod edit;
pub mod instance;
pub(crate) mod schema;
pub mod user;
pub mod version;
#[derive(Clone)]
pub struct IbisData {
pub db_connection: Arc<Mutex<PgConnection>>,
pub db_pool: Pool<ConnectionManager<PgConnection>>,
pub config: IbisConfig,
}
impl Deref for IbisData {
type Target = Arc<Mutex<PgConnection>>;
fn deref(&self) -> &Self::Target {
&self.db_connection
}
}
pub fn read_jwt_secret(conn: &Mutex<PgConnection>) -> MyResult<String> {
let mut conn = conn.lock().unwrap();
pub fn read_jwt_secret(data: &IbisData) -> MyResult<String> {
let mut conn = data.db_pool.get()?;
Ok(jwt_secret::table
.select(jwt_secret::dsl::secret)
.first(conn.deref_mut())?)

View File

@ -10,11 +10,11 @@ use activitypub_federation::http_signatures::generate_actor_keypair;
use bcrypt::hash;
use bcrypt::DEFAULT_COST;
use chrono::{DateTime, Local, Utc};
use diesel::{insert_into, AsChangeset, Insertable, PgConnection, RunQueryDsl};
use diesel::{insert_into, AsChangeset, Insertable, RunQueryDsl};
use diesel::{ExpressionMethods, JoinOnDsl};
use diesel::{PgTextExpressionMethods, QueryDsl};
use std::ops::DerefMut;
use std::sync::{Mutex, MutexGuard};
#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = local_user, check_for_backend(diesel::pg::Pg))]
@ -37,8 +37,8 @@ pub struct DbPersonForm {
}
impl DbPerson {
pub fn create(person_form: &DbPersonForm, conn: &Mutex<PgConnection>) -> MyResult<Self> {
let mut conn = conn.lock().unwrap();
pub fn create(person_form: &DbPersonForm, data: &Data<IbisData>) -> MyResult<Self> {
let mut conn = data.db_pool.get()?;
Ok(insert_into(person::table)
.values(person_form)
.on_conflict(person::dsl::ap_id)
@ -48,7 +48,7 @@ impl DbPerson {
}
pub fn read(id: i32, data: &Data<IbisData>) -> MyResult<DbPerson> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(person::table.find(id).get_result(conn.deref_mut())?)
}
@ -58,7 +58,7 @@ impl DbPerson {
admin: bool,
data: &IbisData,
) -> MyResult<LocalUserView> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
let domain = &data.config.federation.domain;
let ap_id = ObjectId::parse(&format!(
"{}://{domain}/user/{username}",
@ -101,7 +101,7 @@ impl DbPerson {
ap_id: &ObjectId<DbPerson>,
data: &Data<IbisData>,
) -> MyResult<DbPerson> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
Ok(person::table
.filter(person::dsl::ap_id.eq(ap_id))
.get_result(conn.deref_mut())?)
@ -112,7 +112,7 @@ impl DbPerson {
domain: &Option<String>,
data: &Data<IbisData>,
) -> MyResult<DbPerson> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
let mut query = person::table
.filter(person::username.eq(username))
.select(person::all_columns)
@ -129,14 +129,14 @@ impl DbPerson {
}
pub fn read_local_from_name(username: &str, data: &Data<IbisData>) -> MyResult<LocalUserView> {
let mut conn = data.db_connection.lock().unwrap();
let mut conn = data.db_pool.get()?;
let (person, local_user) = person::table
.inner_join(local_user::table)
.filter(person::dsl::local)
.filter(person::dsl::username.eq(username))
.get_result::<(DbPerson, DbLocalUser)>(conn.deref_mut())?;
// TODO: handle this in single query
let following = Self::read_following(person.id, conn)?;
let following = Self::read_following(person.id, data)?;
Ok(LocalUserView {
person,
local_user,
@ -144,8 +144,9 @@ impl DbPerson {
})
}
fn read_following(id_: i32, mut conn: MutexGuard<PgConnection>) -> MyResult<Vec<DbInstance>> {
fn read_following(id_: i32, data: &Data<IbisData>) -> MyResult<Vec<DbInstance>> {
use instance_follow::dsl::{follower_id, instance_id};
let mut conn = data.db_pool.get()?;
Ok(instance_follow::table
.inner_join(instance::table.on(instance_id.eq(instance::dsl::id)))
.filter(follower_id.eq(id_))

View File

@ -1 +0,0 @@

View File

@ -28,7 +28,7 @@ pub struct CreateArticle {
impl CreateArticle {
pub async fn send_to_followers(article: DbArticle, data: &Data<IbisData>) -> MyResult<()> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
let object = article.clone().into_json(data).await?;
let id = generate_activity_id(&local_instance.ap_id)?;
let to = local_instance.follower_ids(data)?;
@ -65,7 +65,7 @@ impl ActivityHandler for CreateArticle {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let article = DbArticle::from_json(self.object.clone(), data).await?;
if article.local {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
local_instance.send_to_followers(self, vec![], data).await?;
}
Ok(())

View File

@ -58,7 +58,7 @@ impl ActivityHandler for Follow {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let actor = self.actor.dereference(data).await?;
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
verify_urls_match(self.object.inner(), local_instance.ap_id.inner())?;
DbInstance::follow(&actor, &local_instance, false, data)?;

View File

@ -32,9 +32,8 @@ pub async fn submit_article_update(
previous_version,
)?;
if original_article.local {
let edit = DbEdit::create(&form, &data.db_connection)?;
let updated_article =
DbArticle::update_text(edit.article_id, &new_text, &data.db_connection)?;
let edit = DbEdit::create(&form, data)?;
let updated_article = DbArticle::update_text(edit.article_id, &new_text, data)?;
UpdateLocalArticle::send(updated_article, vec![], data).await?;
} else {
@ -50,7 +49,7 @@ pub async fn submit_article_update(
previous_version_id: form.previous_version_id,
created: Utc::now(),
};
let instance = DbInstance::read(original_article.instance_id, &data.db_connection)?;
let instance = DbInstance::read(original_article.instance_id, data)?;
UpdateRemoteArticle::send(edit, instance, data).await?;
}
Ok(())

View File

@ -33,7 +33,7 @@ impl RejectEdit {
user_instance: DbInstance,
data: &Data<IbisData>,
) -> MyResult<()> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
let id = generate_activity_id(&local_instance.ap_id)?;
let reject = RejectEdit {
actor: local_instance.ap_id.clone(),
@ -82,7 +82,7 @@ impl ActivityHandler for RejectEdit {
article_id: article.id,
previous_version_id: self.object.previous_version,
};
DbConflict::create(&form, &data.db_connection)?;
DbConflict::create(&form, data)?;
Ok(())
}
}

View File

@ -35,7 +35,7 @@ impl UpdateLocalArticle {
data: &Data<IbisData>,
) -> MyResult<()> {
debug_assert!(article.local);
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
let id = generate_activity_id(&local_instance.ap_id)?;
let mut to = local_instance.follower_ids(data)?;
to.extend(extra_recipients.iter().map(|i| i.ap_id.inner().clone()));

View File

@ -40,7 +40,7 @@ impl UpdateRemoteArticle {
article_instance: DbInstance,
data: &Data<IbisData>,
) -> MyResult<()> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
let id = generate_activity_id(&local_instance.ap_id)?;
let update = UpdateRemoteArticle {
actor: local_instance.ap_id.clone(),
@ -74,21 +74,20 @@ impl ActivityHandler for UpdateRemoteArticle {
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let article = DbArticle::read_from_ap_id(&self.object.object, &data.db_connection)?;
let article = DbArticle::read_from_ap_id(&self.object.object, data)?;
can_edit_article(&article, false)?;
Ok(())
}
/// Received on article origin instances
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let local_article = DbArticle::read_from_ap_id(&self.object.object, &data.db_connection)?;
let local_article = DbArticle::read_from_ap_id(&self.object.object, data)?;
let patch = Patch::from_str(&self.object.content)?;
match apply(&local_article.text, &patch) {
Ok(applied) => {
let edit = DbEdit::from_json(self.object.clone(), data).await?;
let article =
DbArticle::update_text(edit.article_id, &applied, &data.db_connection)?;
let article = DbArticle::update_text(edit.article_id, &applied, data)?;
UpdateLocalArticle::send(article, vec![self.actor.dereference(data).await?], data)
.await?;
}

View File

@ -41,19 +41,19 @@ impl Object for DbArticle {
object_id: Url,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
let article = DbArticle::read_from_ap_id(&object_id.into(), &data.db_connection).ok();
let article = DbArticle::read_from_ap_id(&object_id.into(), data).ok();
Ok(article)
}
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
Ok(ApubArticle {
kind: Default::default(),
id: self.ap_id.clone(),
attributed_to: local_instance.ap_id.clone(),
to: vec![public(), local_instance.followers_url()?],
edits: self.edits_id()?,
latest_version: self.latest_edit_version(&data.db_connection)?,
latest_version: self.latest_edit_version(data)?,
content: self.text,
name: self.title,
})
@ -77,7 +77,7 @@ impl Object for DbArticle {
local: false,
instance_id: instance.id,
};
let article = DbArticle::create_or_update(form, &data.db_connection)?;
let article = DbArticle::create_or_update(form, data)?;
json.edits.dereference(&article, data).await?;

View File

@ -38,7 +38,7 @@ impl Collection for DbArticleCollection {
owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> {
let local_articles = DbArticle::read_all(true, &data.db_connection)?;
let local_articles = DbArticle::read_all(true, data)?;
let articles = future::try_join_all(
local_articles
.into_iter()

View File

@ -48,7 +48,7 @@ impl Object for DbEdit {
}
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let article = DbArticle::read(self.article_id, &data.db_connection)?;
let article = DbArticle::read(self.article_id, data)?;
let creator = DbPerson::read(self.creator_id, data)?;
Ok(ApubEdit {
kind: PatchType::Patch,
@ -85,7 +85,7 @@ impl Object for DbEdit {
previous_version_id: json.previous_version,
created: json.published,
};
let edit = DbEdit::create(&form, &data.db_connection)?;
let edit = DbEdit::create(&form, data)?;
Ok(edit)
}
}

View File

@ -39,7 +39,7 @@ impl Collection for DbEditCollection {
owner: &Self::Owner,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error> {
let article = DbArticle::read_view(owner.id, &data.db_connection)?;
let article = DbArticle::read_view(owner.id, data)?;
let edits = future::try_join_all(
article
@ -49,7 +49,7 @@ impl Collection for DbEditCollection {
.collect::<Vec<_>>(),
)
.await?;
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(data)?;
let collection = ApubEditCollection {
r#type: Default::default(),
id: Url::from(local_instance.articles_url),

View File

@ -38,7 +38,7 @@ impl DbInstance {
}
pub fn follower_ids(&self, data: &Data<IbisData>) -> MyResult<Vec<Url>> {
Ok(DbInstance::read_followers(self.id, &data.db_connection)?
Ok(DbInstance::read_followers(self.id, data)?
.into_iter()
.map(|f| f.ap_id.into())
.collect())
@ -55,7 +55,7 @@ impl DbInstance {
<Activity as ActivityHandler>::Error: From<activitypub_federation::error::Error>,
<Activity as ActivityHandler>::Error: From<Error>,
{
let mut inboxes: Vec<_> = DbInstance::read_followers(self.id, &data.db_connection)?
let mut inboxes: Vec<_> = DbInstance::read_followers(self.id, data)?
.iter()
.map(|f| Url::parse(&f.inbox_url).unwrap())
.collect();
@ -119,7 +119,7 @@ impl Object for DbInstance {
last_refreshed_at: Local::now().into(),
local: false,
};
let instance = DbInstance::create(&form, &data.db_connection)?;
let instance = DbInstance::create(&form, data)?;
// TODO: very inefficient to sync all articles every time
instance.articles_url.dereference(&instance, data).await?;
Ok(instance)

View File

@ -71,7 +71,7 @@ impl Object for DbPerson {
last_refreshed_at: Local::now().into(),
local: false,
};
DbPerson::create(&form, &data.db_connection)
DbPerson::create(&form, data)
}
}

View File

@ -47,7 +47,7 @@ pub fn federation_routes() -> Router {
async fn http_get_instance(
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<ApubInstance>>> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(&data)?;
let json_instance = local_instance.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json_instance)))
}
@ -66,7 +66,7 @@ async fn http_get_person(
async fn http_get_all_articles(
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<ArticleCollection>>> {
let local_instance = DbInstance::read_local_instance(&data.db_connection)?;
let local_instance = DbInstance::read_local_instance(&data)?;
let collection = DbArticleCollection::read_local(&local_instance, &data).await?;
Ok(FederationJson(WithContext::new_default(collection)))
}
@ -76,7 +76,7 @@ async fn http_get_article(
Path(title): Path<String>,
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<ApubArticle>>> {
let article = DbArticle::read_local_title(&title, &data.db_connection)?;
let article = DbArticle::read_local_title(&title, &data)?;
let json = article.into_json(&data).await?;
Ok(FederationJson(WithContext::new_default(json)))
}
@ -86,7 +86,7 @@ async fn http_get_article_edits(
Path(title): Path<String>,
data: Data<IbisData>,
) -> MyResult<FederationJson<WithContext<ApubEditCollection>>> {
let article = DbArticle::read_local_title(&title, &data.db_connection)?;
let article = DbArticle::read_local_title(&title, &data)?;
let json = DbEditCollection::read_local(&article, &data).await?;
Ok(FederationJson(WithContext::new_default(json)))
}

View File

@ -21,6 +21,8 @@ use axum::Server;
use axum::ServiceExt;
use axum::{middleware::Next, response::Response, Router};
use chrono::Local;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::Connection;
use diesel::PgConnection;
use diesel_migrations::embed_migrations;
@ -46,15 +48,17 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
const FEDERATION_ROUTES_PREFIX: &str = "/federation_routes";
pub async fn start(config: IbisConfig) -> MyResult<()> {
let db_connection = Arc::new(Mutex::new(PgConnection::establish(&config.database_url)?));
db_connection
.lock()
.unwrap()
.run_pending_migrations(MIGRATIONS)
.unwrap();
let manager = ConnectionManager::<PgConnection>::new(&config.database.connection_url);
let db_pool = Pool::builder()
.max_size(config.database.pool_size)
.build(manager)?;
db_pool
.get()?
.run_pending_migrations(MIGRATIONS)
.expect("run migrations");
let data = IbisData {
db_connection,
db_pool,
config,
};
let data = FederationConfig::builder()
@ -66,7 +70,7 @@ pub async fn start(config: IbisConfig) -> MyResult<()> {
.await?;
// Create local instance if it doesnt exist yet
if DbInstance::read_local_instance(&data.db_connection).is_err() {
if DbInstance::read_local_instance(&data).is_err() {
setup(&data.to_request_data()).await?;
}
@ -129,7 +133,7 @@ async fn setup(data: &Data<IbisData>) -> Result<(), Error> {
last_refreshed_at: Local::now().into(),
local: true,
};
let instance = DbInstance::create(&form, &data.db_connection)?;
let instance = DbInstance::create(&form, data)?;
let person = DbPerson::create_local(
data.config.setup.admin_username.clone(),
@ -149,7 +153,7 @@ async fn setup(data: &Data<IbisData>) -> Result<(), Error> {
instance_id: instance.id,
local: true,
};
let article = DbArticle::create(form, &data.db_connection)?;
let article = DbArticle::create(form, data)?;
// also create an article so its included in most recently edited list
submit_article_update(
MAIN_PAGE_DEFAULT_TEXT.to_string(),

View File

@ -57,7 +57,7 @@ fn backend_hostname() -> String {
}
#[cfg(feature = "ssr")]
{
backend_hostname = crate::backend::config::IbisConfig::read().bind.to_string();
backend_hostname = crate::backend::config::IbisConfig::read().unwrap().bind.to_string();
}
backend_hostname
}

View File

@ -15,7 +15,7 @@ pub async fn main() -> ibis_lib::backend::error::MyResult<()> {
.filter_module("ibis", LevelFilter::Info)
.init();
let ibis_config = IbisConfig::read();
let ibis_config = IbisConfig::read()?;
ibis_lib::backend::start(ibis_config).await?;
Ok(())
}

View File

@ -1,4 +1,4 @@
use ibis_lib::backend::config::{IbisConfig, IbisConfigFederation};
use ibis_lib::backend::config::{IbisConfig, IbisConfigDatabase, IbisConfigFederation};
use ibis_lib::backend::start;
use ibis_lib::common::RegisterUserData;
use ibis_lib::frontend::api::ApiClient;
@ -105,12 +105,15 @@ impl IbisInstance {
}
async fn start(db_path: String, port: i32, username: &str) -> Self {
let database_url = format!("postgresql://ibis:password@/ibis?host={db_path}");
let connection_url = format!("postgresql://ibis:password@/ibis?host={db_path}");
let hostname = format!("localhost:{port}");
let bind = format!("127.0.0.1:{port}").parse().unwrap();
let config = IbisConfig {
bind,
database_url,
database: IbisConfigDatabase {
connection_url,
..Default::default()
},
registration_open: true,
federation: IbisConfigFederation {
domain: hostname.clone(),