Optimize federated language updates to avoid unnecessary db writes (#2786)
* Optimize federated language updates to avoid unnecessary db writes (fixes #2772) * fix tests * fix test, rename functions --------- Co-authored-by: Dessalines <dessalines@users.noreply.github.com>
This commit is contained in:
parent
fdaa2bcf09
commit
165b19e75c
12 changed files with 86 additions and 39 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -2517,6 +2517,7 @@ dependencies = [
|
||||||
"diesel-derive-newtype",
|
"diesel-derive-newtype",
|
||||||
"diesel_ltree",
|
"diesel_ltree",
|
||||||
"diesel_migrations",
|
"diesel_migrations",
|
||||||
|
"futures",
|
||||||
"lemmy_utils",
|
"lemmy_utils",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"regex",
|
"regex",
|
||||||
|
|
|
@ -63,7 +63,7 @@ impl Perform for LeaveAdmin {
|
||||||
let admins = PersonView::admins(context.pool()).await?;
|
let admins = PersonView::admins(context.pool()).await?;
|
||||||
|
|
||||||
let all_languages = Language::read_all(context.pool()).await?;
|
let all_languages = Language::read_all(context.pool()).await?;
|
||||||
let discussion_languages = SiteLanguage::read_local(context.pool()).await?;
|
let discussion_languages = SiteLanguage::read_local_raw(context.pool()).await?;
|
||||||
let taglines = Tagline::get_all(context.pool(), site_view.local_site.id).await?;
|
let taglines = Tagline::get_all(context.pool(), site_view.local_site.id).await?;
|
||||||
let custom_emojis = CustomEmojiView::get_all(context.pool(), site_view.local_site.id).await?;
|
let custom_emojis = CustomEmojiView::get_all(context.pool(), site_view.local_site.id).await?;
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ impl PerformCrud for CreateCommunity {
|
||||||
// Update the discussion_languages if that's provided
|
// Update the discussion_languages if that's provided
|
||||||
let community_id = inserted_community.id;
|
let community_id = inserted_community.id;
|
||||||
if let Some(languages) = data.discussion_languages.clone() {
|
if let Some(languages) = data.discussion_languages.clone() {
|
||||||
let site_languages = SiteLanguage::read_local(context.pool()).await?;
|
let site_languages = SiteLanguage::read_local_raw(context.pool()).await?;
|
||||||
// check that community languages are a subset of site languages
|
// check that community languages are a subset of site languages
|
||||||
// https://stackoverflow.com/a/64227550
|
// https://stackoverflow.com/a/64227550
|
||||||
let is_subset = languages.iter().all(|item| site_languages.contains(item));
|
let is_subset = languages.iter().all(|item| site_languages.contains(item));
|
||||||
|
|
|
@ -53,7 +53,7 @@ impl PerformCrud for EditCommunity {
|
||||||
|
|
||||||
let community_id = data.community_id;
|
let community_id = data.community_id;
|
||||||
if let Some(languages) = data.discussion_languages.clone() {
|
if let Some(languages) = data.discussion_languages.clone() {
|
||||||
let site_languages = SiteLanguage::read_local(context.pool()).await?;
|
let site_languages = SiteLanguage::read_local_raw(context.pool()).await?;
|
||||||
// check that community languages are a subset of site languages
|
// check that community languages are a subset of site languages
|
||||||
// https://stackoverflow.com/a/64227550
|
// https://stackoverflow.com/a/64227550
|
||||||
let is_subset = languages.iter().all(|item| site_languages.contains(item));
|
let is_subset = languages.iter().all(|item| site_languages.contains(item));
|
||||||
|
|
|
@ -87,7 +87,7 @@ impl PerformCrud for GetSite {
|
||||||
build_federated_instances(&site_view.local_site, context.pool()).await?;
|
build_federated_instances(&site_view.local_site, context.pool()).await?;
|
||||||
|
|
||||||
let all_languages = Language::read_all(context.pool()).await?;
|
let all_languages = Language::read_all(context.pool()).await?;
|
||||||
let discussion_languages = SiteLanguage::read_local(context.pool()).await?;
|
let discussion_languages = SiteLanguage::read_local_raw(context.pool()).await?;
|
||||||
let taglines = Tagline::get_all(context.pool(), site_view.local_site.id).await?;
|
let taglines = Tagline::get_all(context.pool(), site_view.local_site.id).await?;
|
||||||
let custom_emojis = CustomEmojiView::get_all(context.pool(), site_view.local_site.id).await?;
|
let custom_emojis = CustomEmojiView::get_all(context.pool(), site_view.local_site.id).await?;
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ impl PerformApub for GetCommunity {
|
||||||
|
|
||||||
let site_id =
|
let site_id =
|
||||||
Site::instance_actor_id_from_url(community_view.community.actor_id.clone().into());
|
Site::instance_actor_id_from_url(community_view.community.actor_id.clone().into());
|
||||||
let mut site = Site::read_from_apub_id(context.pool(), site_id).await?;
|
let mut site = Site::read_from_apub_id(context.pool(), &site_id.into()).await?;
|
||||||
// no need to include metadata for local site (its already available through other endpoints).
|
// no need to include metadata for local site (its already available through other endpoints).
|
||||||
// this also prevents us from leaking the federation private key.
|
// this also prevents us from leaking the federation private key.
|
||||||
if let Some(s) = &site {
|
if let Some(s) = &site {
|
||||||
|
|
|
@ -71,7 +71,7 @@ impl Object for ApubSite {
|
||||||
data: &Data<Self::DataType>,
|
data: &Data<Self::DataType>,
|
||||||
) -> Result<Option<Self>, LemmyError> {
|
) -> Result<Option<Self>, LemmyError> {
|
||||||
Ok(
|
Ok(
|
||||||
Site::read_from_apub_id(data.pool(), object_id)
|
Site::read_from_apub_id(data.pool(), &object_id.into())
|
||||||
.await?
|
.await?
|
||||||
.map(Into::into),
|
.map(Into::into),
|
||||||
)
|
)
|
||||||
|
|
|
@ -41,6 +41,7 @@ async-trait = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tracing-error = { workspace = true }
|
tracing-error = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
deadpool = { version = "0.9.5", features = ["rt_tokio_1"], optional = true }
|
deadpool = { version = "0.9.5", features = ["rt_tokio_1"], optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
@ -25,7 +25,11 @@ use diesel::{
|
||||||
ExpressionMethods,
|
ExpressionMethods,
|
||||||
QueryDsl,
|
QueryDsl,
|
||||||
};
|
};
|
||||||
use diesel_async::{AsyncPgConnection, RunQueryDsl};
|
use diesel_async::{
|
||||||
|
pooled_connection::deadpool::Object as PooledConnection,
|
||||||
|
AsyncPgConnection,
|
||||||
|
RunQueryDsl,
|
||||||
|
};
|
||||||
use lemmy_utils::error::LemmyError;
|
use lemmy_utils::error::LemmyError;
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
|
|
||||||
|
@ -68,6 +72,13 @@ impl LocalUserLanguage {
|
||||||
for_local_user_id: LocalUserId,
|
for_local_user_id: LocalUserId,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let lang_ids = convert_update_languages(conn, language_ids).await?;
|
||||||
|
|
||||||
|
// No need to update if languages are unchanged
|
||||||
|
let current = LocalUserLanguage::read(pool, for_local_user_id).await?;
|
||||||
|
if current == lang_ids {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
conn
|
conn
|
||||||
.build_transaction()
|
.build_transaction()
|
||||||
|
@ -79,7 +90,6 @@ impl LocalUserLanguage {
|
||||||
.execute(conn)
|
.execute(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let lang_ids = convert_update_languages(conn, language_ids).await?;
|
|
||||||
for l in lang_ids {
|
for l in lang_ids {
|
||||||
let form = LocalUserLanguageForm {
|
let form = LocalUserLanguageForm {
|
||||||
local_user_id: for_local_user_id,
|
local_user_id: for_local_user_id,
|
||||||
|
@ -98,7 +108,7 @@ impl LocalUserLanguage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SiteLanguage {
|
impl SiteLanguage {
|
||||||
pub async fn read_local(pool: &DbPool) -> Result<Vec<LanguageId>, Error> {
|
pub async fn read_local_raw(pool: &DbPool) -> Result<Vec<LanguageId>, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
site::table
|
site::table
|
||||||
.inner_join(local_site::table)
|
.inner_join(local_site::table)
|
||||||
|
@ -109,15 +119,22 @@ impl SiteLanguage {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read(pool: &DbPool, for_site_id: SiteId) -> Result<Vec<LanguageId>, Error> {
|
async fn read_raw(
|
||||||
let conn = &mut get_conn(pool).await?;
|
conn: &mut PooledConnection<AsyncPgConnection>,
|
||||||
|
for_site_id: SiteId,
|
||||||
let langs = site_language::table
|
) -> Result<Vec<LanguageId>, Error> {
|
||||||
|
site_language::table
|
||||||
.filter(site_language::site_id.eq(for_site_id))
|
.filter(site_language::site_id.eq(for_site_id))
|
||||||
.order(site_language::language_id)
|
.order(site_language::language_id)
|
||||||
.select(site_language::language_id)
|
.select(site_language::language_id)
|
||||||
.load(conn)
|
.load(conn)
|
||||||
.await?;
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read(pool: &DbPool, for_site_id: SiteId) -> Result<Vec<LanguageId>, Error> {
|
||||||
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let langs = Self::read_raw(conn, for_site_id).await?;
|
||||||
|
|
||||||
convert_read_languages(conn, langs).await
|
convert_read_languages(conn, langs).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +146,13 @@ impl SiteLanguage {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
let for_site_id = site.id;
|
let for_site_id = site.id;
|
||||||
let instance_id = site.instance_id;
|
let instance_id = site.instance_id;
|
||||||
|
let lang_ids = convert_update_languages(conn, language_ids).await?;
|
||||||
|
|
||||||
|
// No need to update if languages are unchanged
|
||||||
|
let current = SiteLanguage::read(pool, site.id).await?;
|
||||||
|
if current == lang_ids {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
conn
|
conn
|
||||||
.build_transaction()
|
.build_transaction()
|
||||||
|
@ -141,7 +165,6 @@ impl SiteLanguage {
|
||||||
.execute(conn)
|
.execute(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let lang_ids = convert_update_languages(conn, language_ids).await?;
|
|
||||||
for l in lang_ids {
|
for l in lang_ids {
|
||||||
let form = SiteLanguageForm {
|
let form = SiteLanguageForm {
|
||||||
site_id: for_site_id,
|
site_id: for_site_id,
|
||||||
|
@ -221,19 +244,25 @@ impl CommunityLanguage {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read(
|
async fn read_raw(
|
||||||
pool: &DbPool,
|
conn: &mut PooledConnection<AsyncPgConnection>,
|
||||||
for_community_id: CommunityId,
|
for_community_id: CommunityId,
|
||||||
) -> Result<Vec<LanguageId>, Error> {
|
) -> Result<Vec<LanguageId>, Error> {
|
||||||
use crate::schema::community_language::dsl::{community_id, community_language, language_id};
|
use crate::schema::community_language::dsl::{community_id, community_language, language_id};
|
||||||
let conn = &mut get_conn(pool).await?;
|
community_language
|
||||||
|
|
||||||
let langs = community_language
|
|
||||||
.filter(community_id.eq(for_community_id))
|
.filter(community_id.eq(for_community_id))
|
||||||
.order(language_id)
|
.order(language_id)
|
||||||
.select(language_id)
|
.select(language_id)
|
||||||
.get_results(conn)
|
.get_results(conn)
|
||||||
.await?;
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read(
|
||||||
|
pool: &DbPool,
|
||||||
|
for_community_id: CommunityId,
|
||||||
|
) -> Result<Vec<LanguageId>, Error> {
|
||||||
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let langs = Self::read_raw(conn, for_community_id).await?;
|
||||||
convert_read_languages(conn, langs).await
|
convert_read_languages(conn, langs).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,9 +272,15 @@ impl CommunityLanguage {
|
||||||
for_community_id: CommunityId,
|
for_community_id: CommunityId,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
|
||||||
if language_ids.is_empty() {
|
if language_ids.is_empty() {
|
||||||
language_ids = SiteLanguage::read_local(pool).await?;
|
language_ids = SiteLanguage::read_local_raw(pool).await?;
|
||||||
|
}
|
||||||
|
let lang_ids = convert_update_languages(conn, language_ids).await?;
|
||||||
|
|
||||||
|
// No need to update if languages are unchanged
|
||||||
|
let current = CommunityLanguage::read_raw(conn, for_community_id).await?;
|
||||||
|
if current == lang_ids {
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
conn
|
conn
|
||||||
|
@ -258,7 +293,7 @@ impl CommunityLanguage {
|
||||||
.execute(conn)
|
.execute(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
for l in language_ids {
|
for l in lang_ids {
|
||||||
let form = CommunityLanguageForm {
|
let form = CommunityLanguageForm {
|
||||||
community_id: for_community_id,
|
community_id: for_community_id,
|
||||||
language_id: l,
|
language_id: l,
|
||||||
|
@ -464,7 +499,7 @@ mod tests {
|
||||||
let pool = &build_db_pool_for_tests().await;
|
let pool = &build_db_pool_for_tests().await;
|
||||||
|
|
||||||
let (site, instance) = create_test_site(pool).await;
|
let (site, instance) = create_test_site(pool).await;
|
||||||
let site_languages1 = SiteLanguage::read_local(pool).await.unwrap();
|
let site_languages1 = SiteLanguage::read_local_raw(pool).await.unwrap();
|
||||||
// site is created with all languages
|
// site is created with all languages
|
||||||
assert_eq!(184, site_languages1.len());
|
assert_eq!(184, site_languages1.len());
|
||||||
|
|
||||||
|
@ -473,7 +508,7 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let site_languages2 = SiteLanguage::read_local(pool).await.unwrap();
|
let site_languages2 = SiteLanguage::read_local_raw(pool).await.unwrap();
|
||||||
// after update, site only has new languages
|
// after update, site only has new languages
|
||||||
assert_eq!(test_langs, site_languages2);
|
assert_eq!(test_langs, site_languages2);
|
||||||
|
|
||||||
|
@ -539,7 +574,7 @@ mod tests {
|
||||||
assert_eq!(test_langs, read_site_langs);
|
assert_eq!(test_langs, read_site_langs);
|
||||||
|
|
||||||
// Test the local ones are the same
|
// Test the local ones are the same
|
||||||
let read_local_site_langs = SiteLanguage::read_local(pool).await.unwrap();
|
let read_local_site_langs = SiteLanguage::read_local_raw(pool).await.unwrap();
|
||||||
assert_eq!(test_langs, read_local_site_langs);
|
assert_eq!(test_langs, read_local_site_langs);
|
||||||
|
|
||||||
let community_form = CommunityInsertForm::builder()
|
let community_form = CommunityInsertForm::builder()
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::{
|
||||||
newtypes::{CommunityId, DbUrl, PersonId},
|
newtypes::{CommunityId, DbUrl, PersonId},
|
||||||
schema::community::dsl::{actor_id, community, deleted, local, name, removed},
|
schema::community::dsl::{actor_id, community, deleted, local, name, removed},
|
||||||
source::{
|
source::{
|
||||||
actor_language::{CommunityLanguage, SiteLanguage},
|
actor_language::CommunityLanguage,
|
||||||
community::{
|
community::{
|
||||||
Community,
|
Community,
|
||||||
CommunityFollower,
|
CommunityFollower,
|
||||||
|
@ -41,6 +41,12 @@ impl Crud for Community {
|
||||||
|
|
||||||
async fn create(pool: &DbPool, form: &Self::InsertForm) -> Result<Self, Error> {
|
async fn create(pool: &DbPool, form: &Self::InsertForm) -> Result<Self, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let is_new_community = match &form.actor_id {
|
||||||
|
Some(id) => Community::read_from_apub_id(pool, id).await?.is_none(),
|
||||||
|
None => true,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Can't do separate insert/update commands because InsertForm/UpdateForm aren't convertible
|
||||||
let community_ = insert_into(community)
|
let community_ = insert_into(community)
|
||||||
.values(form)
|
.values(form)
|
||||||
.on_conflict(actor_id)
|
.on_conflict(actor_id)
|
||||||
|
@ -49,12 +55,8 @@ impl Crud for Community {
|
||||||
.get_result::<Self>(conn)
|
.get_result::<Self>(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let site_languages = SiteLanguage::read_local(pool).await;
|
// Initialize languages for new community
|
||||||
if let Ok(langs) = site_languages {
|
if is_new_community {
|
||||||
// if site exists, init user with site languages
|
|
||||||
CommunityLanguage::update(pool, langs, community_.id).await?;
|
|
||||||
} else {
|
|
||||||
// otherwise, init with all languages (this only happens during tests)
|
|
||||||
CommunityLanguage::update(pool, vec![], community_.id).await?;
|
CommunityLanguage::update(pool, vec![], community_.id).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,7 @@ impl Crud for LocalUser {
|
||||||
.await
|
.await
|
||||||
.expect("couldnt create local user");
|
.expect("couldnt create local user");
|
||||||
|
|
||||||
let site_languages = SiteLanguage::read_local(pool).await;
|
let site_languages = SiteLanguage::read_local_raw(pool).await;
|
||||||
if let Ok(langs) = site_languages {
|
if let Ok(langs) = site_languages {
|
||||||
// if site exists, init user with site languages
|
// if site exists, init user with site languages
|
||||||
LocalUserLanguage::update(pool, langs, local_user_.id).await?;
|
LocalUserLanguage::update(pool, langs, local_user_.id).await?;
|
||||||
|
|
|
@ -25,6 +25,12 @@ impl Crud for Site {
|
||||||
|
|
||||||
async fn create(pool: &DbPool, form: &Self::InsertForm) -> Result<Self, Error> {
|
async fn create(pool: &DbPool, form: &Self::InsertForm) -> Result<Self, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
|
let is_new_site = match &form.actor_id {
|
||||||
|
Some(id_) => Site::read_from_apub_id(pool, id_).await?.is_none(),
|
||||||
|
None => true,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Can't do separate insert/update commands because InsertForm/UpdateForm aren't convertible
|
||||||
let site_ = insert_into(site)
|
let site_ = insert_into(site)
|
||||||
.values(form)
|
.values(form)
|
||||||
.on_conflict(actor_id)
|
.on_conflict(actor_id)
|
||||||
|
@ -33,8 +39,11 @@ impl Crud for Site {
|
||||||
.get_result::<Self>(conn)
|
.get_result::<Self>(conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// initialize languages if site is newly created
|
||||||
|
if is_new_site {
|
||||||
// initialize with all languages
|
// initialize with all languages
|
||||||
SiteLanguage::update(pool, vec![], &site_).await?;
|
SiteLanguage::update(pool, vec![], &site_).await?;
|
||||||
|
}
|
||||||
Ok(site_)
|
Ok(site_)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,9 +66,8 @@ impl Crud for Site {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Site {
|
impl Site {
|
||||||
pub async fn read_from_apub_id(pool: &DbPool, object_id: Url) -> Result<Option<Self>, Error> {
|
pub async fn read_from_apub_id(pool: &DbPool, object_id: &DbUrl) -> Result<Option<Self>, Error> {
|
||||||
let conn = &mut get_conn(pool).await?;
|
let conn = &mut get_conn(pool).await?;
|
||||||
let object_id: DbUrl = object_id.into();
|
|
||||||
Ok(
|
Ok(
|
||||||
site
|
site
|
||||||
.filter(actor_id.eq(object_id))
|
.filter(actor_id.eq(object_id))
|
||||||
|
|
Loading…
Reference in a new issue