use crate::database::schema::{instance, instance_follow}; use crate::database::MyDataHandle; use crate::error::{Error, MyResult}; use crate::federation::objects::articles_collection::DbArticleCollection; use activitypub_federation::activity_sending::SendActivityTask; use activitypub_federation::config::Data; use activitypub_federation::fetch::collection_id::CollectionId; use activitypub_federation::fetch::object_id::ObjectId; use activitypub_federation::protocol::context::WithContext; use activitypub_federation::traits::ActivityHandler; use chrono::{DateTime, Utc}; use diesel::ExpressionMethods; use diesel::{ insert_into, AsChangeset, Identifiable, Insertable, JoinOnDsl, PgConnection, QueryDsl, Queryable, RunQueryDsl, Selectable, }; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::ops::DerefMut; use std::sync::Mutex; use tracing::warn; use url::Url; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable, Selectable, Identifiable)] #[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))] pub struct DbInstance { pub id: i32, pub ap_id: ObjectId, pub articles_url: CollectionId, pub inbox_url: String, #[serde(skip)] pub(crate) public_key: String, #[serde(skip)] pub(crate) private_key: Option, #[serde(skip)] pub(crate) last_refreshed_at: DateTime, pub local: bool, } #[derive(Debug, Clone, Insertable, AsChangeset)] #[diesel(table_name = instance, check_for_backend(diesel::pg::Pg))] pub struct DbInstanceForm { pub ap_id: ObjectId, pub articles_url: CollectionId, pub inbox_url: String, pub(crate) public_key: String, pub(crate) private_key: Option, pub(crate) last_refreshed_at: DateTime, pub local: bool, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Queryable)] #[diesel(table_name = article, check_for_backend(diesel::pg::Pg))] pub struct InstanceView { pub instance: DbInstance, pub followers: Vec, pub following: Vec, } impl DbInstance { pub fn followers_url(&self) -> MyResult { Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?) } pub fn follower_ids(&self, data: &Data) -> MyResult> { Ok(DbInstance::read_followers(self.id, &data.db_connection)? .into_iter() .map(|f| f.ap_id.into()) .collect()) } pub async fn send_to_followers( &self, activity: Activity, extra_recipients: Vec, data: &Data, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize + Debug + Send + Sync, ::Error: From, ::Error: From, { let mut inboxes: Vec<_> = DbInstance::read_followers(self.id, &data.db_connection)? .iter() .map(|f| Url::parse(&f.inbox_url).unwrap()) .collect(); inboxes.extend( extra_recipients .into_iter() .map(|i| Url::parse(&i.inbox_url).unwrap()), ); self.send(activity, inboxes, data).await?; Ok(()) } pub async fn send( &self, activity: Activity, recipients: Vec, data: &Data, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize + Debug + Send + Sync, ::Error: From, { let activity = WithContext::new_default(activity); let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; for send in sends { let send = send.sign_and_send(data).await; if let Err(e) = send { warn!("Failed to send activity {:?}: {e}", activity); } } Ok(()) } pub fn create(form: &DbInstanceForm, conn: &Mutex) -> MyResult { let mut conn = conn.lock().unwrap(); Ok(insert_into(instance::table) .values(form) .on_conflict(instance::dsl::ap_id) .do_update() .set(form) .get_result(conn.deref_mut())?) } pub fn read(id: i32, conn: &Mutex) -> MyResult { let mut conn = conn.lock().unwrap(); Ok(instance::table.find(id).get_result(conn.deref_mut())?) } pub fn read_from_ap_id( ap_id: &ObjectId, data: &Data, ) -> MyResult { let mut conn = data.db_connection.lock().unwrap(); Ok(instance::table .filter(instance::dsl::ap_id.eq(ap_id)) .get_result(conn.deref_mut())?) } pub fn read_local_instance(conn: &Mutex) -> MyResult { let mut conn = conn.lock().unwrap(); Ok(instance::table .filter(instance::dsl::local.eq(true)) .get_result(conn.deref_mut())?) } pub fn read_local_view(conn: &Mutex) -> MyResult { let instance = DbInstance::read_local_instance(conn)?; let followers = DbInstance::read_followers(instance.id, conn)?; let following = DbInstance::read_following(instance.id, conn)?; Ok(InstanceView { instance, followers, following, }) } pub fn follow( follower_id_: i32, instance_id_: i32, pending_: bool, data: &Data, ) -> MyResult<()> { debug_assert_ne!(follower_id_, instance_id_); use instance_follow::dsl::{follower_id, instance_id, pending}; let mut conn = data.db_connection.lock().unwrap(); let form = ( instance_id.eq(instance_id_), follower_id.eq(follower_id_), pending.eq(pending_), ); dbg!(follower_id_, instance_id_, pending_); insert_into(instance_follow::table) .values(form) .on_conflict((instance_id, follower_id)) .do_update() .set(form) .execute(conn.deref_mut())?; Ok(()) } pub fn read_followers(id_: i32, conn: &Mutex) -> MyResult> { use instance_follow::dsl::{follower_id, instance_id}; let mut conn = conn.lock().unwrap(); Ok(instance_follow::table .inner_join(instance::table.on(follower_id.eq(instance::dsl::id))) .filter(instance_id.eq(id_)) .select(instance::all_columns) .get_results(conn.deref_mut())?) } pub fn read_following(id_: i32, conn: &Mutex) -> MyResult> { use instance_follow::dsl::{follower_id, instance_id}; let mut conn = conn.lock().unwrap(); Ok(instance_follow::table .inner_join(instance::table.on(instance_id.eq(instance::dsl::id))) .filter(follower_id.eq(id_)) .select(instance::all_columns) .get_results(conn.deref_mut())?) } }