Merge remote-tracking branch 'upstream/main' into migration-runner

This commit is contained in:
Dull Bananas 2024-05-21 20:20:14 +00:00
commit 5fca4ea918
9 changed files with 71 additions and 108 deletions

View file

@ -3,13 +3,13 @@
# it is expected that this script is called by run-federation-test.sh script. # it is expected that this script is called by run-federation-test.sh script.
set -e set -e
if [ -n "$LEMMY_LOG_LEVEL" ]; if [ -z "$LEMMY_LOG_LEVEL" ];
then then
LEMMY_LOG_LEVEL=warn LEMMY_LOG_LEVEL=info
fi fi
export RUST_BACKTRACE=1 export RUST_BACKTRACE=1
#export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL" export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min

View file

@ -19,7 +19,7 @@ pub async fn change_password_after_reset(
) -> LemmyResult<Json<SuccessResponse>> { ) -> LemmyResult<Json<SuccessResponse>> {
// Fetch the user_id from the token // Fetch the user_id from the token
let token = data.token.clone(); let token = data.token.clone();
let local_user_id = PasswordResetRequest::read_from_token(&mut context.pool(), &token) let local_user_id = PasswordResetRequest::read_and_delete(&mut context.pool(), &token)
.await? .await?
.ok_or(LemmyErrorType::TokenNotFound)? .ok_or(LemmyErrorType::TokenNotFound)?
.local_user_id; .local_user_id;

View file

@ -6,7 +6,6 @@ use lemmy_api_common::{
utils::send_password_reset_email, utils::send_password_reset_email,
SuccessResponse, SuccessResponse,
}; };
use lemmy_db_schema::source::password_reset_request::PasswordResetRequest;
use lemmy_db_views::structs::{LocalUserView, SiteView}; use lemmy_db_views::structs::{LocalUserView, SiteView};
use lemmy_utils::error::{LemmyErrorType, LemmyResult}; use lemmy_utils::error::{LemmyErrorType, LemmyResult};
@ -21,15 +20,6 @@ pub async fn reset_password(
.await? .await?
.ok_or(LemmyErrorType::IncorrectLogin)?; .ok_or(LemmyErrorType::IncorrectLogin)?;
// Check for too many attempts (to limit potential abuse)
let recent_resets_count = PasswordResetRequest::get_recent_password_resets_count(
&mut context.pool(),
local_user_view.local_user.id,
)
.await?;
if recent_resets_count >= 3 {
Err(LemmyErrorType::PasswordResetLimitReached)?
}
let site_view = SiteView::read_local(&mut context.pool()) let site_view = SiteView::read_local(&mut context.pool())
.await? .await?
.ok_or(LemmyErrorType::LocalSiteNotSetup)?; .ok_or(LemmyErrorType::LocalSiteNotSetup)?;

View file

@ -440,7 +440,7 @@ pub async fn send_password_reset_email(
// Insert the row after successful send, to avoid using daily reset limit while // Insert the row after successful send, to avoid using daily reset limit while
// email sending is broken. // email sending is broken.
let local_user_id = user.local_user.id; let local_user_id = user.local_user.id;
PasswordResetRequest::create_token(pool, local_user_id, token.clone()).await?; PasswordResetRequest::create(pool, local_user_id, token.clone()).await?;
Ok(()) Ok(())
} }

View file

@ -1,49 +1,22 @@
use crate::{ use crate::{
diesel::OptionalExtension, diesel::OptionalExtension,
newtypes::LocalUserId, newtypes::LocalUserId,
schema::password_reset_request::dsl::{local_user_id, password_reset_request, published, token}, schema::password_reset_request::dsl::{password_reset_request, published, token},
source::password_reset_request::{PasswordResetRequest, PasswordResetRequestForm}, source::password_reset_request::{PasswordResetRequest, PasswordResetRequestForm},
traits::Crud,
utils::{get_conn, DbPool}, utils::{get_conn, DbPool},
}; };
use diesel::{ use diesel::{
delete,
dsl::{insert_into, now, IntervalDsl}, dsl::{insert_into, now, IntervalDsl},
result::Error, result::Error,
sql_types::Timestamptz, sql_types::Timestamptz,
ExpressionMethods, ExpressionMethods,
IntoSql, IntoSql,
QueryDsl,
}; };
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
#[async_trait]
impl Crud for PasswordResetRequest {
type InsertForm = PasswordResetRequestForm;
type UpdateForm = PasswordResetRequestForm;
type IdType = i32;
async fn create(pool: &mut DbPool<'_>, form: &PasswordResetRequestForm) -> Result<Self, Error> {
let conn = &mut get_conn(pool).await?;
insert_into(password_reset_request)
.values(form)
.get_result::<Self>(conn)
.await
}
async fn update(
pool: &mut DbPool<'_>,
password_reset_request_id: i32,
form: &PasswordResetRequestForm,
) -> Result<Self, Error> {
let conn = &mut get_conn(pool).await?;
diesel::update(password_reset_request.find(password_reset_request_id))
.set(form)
.get_result::<Self>(conn)
.await
}
}
impl PasswordResetRequest { impl PasswordResetRequest {
pub async fn create_token( pub async fn create(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
from_local_user_id: LocalUserId, from_local_user_id: LocalUserId,
token_: String, token_: String,
@ -52,30 +25,21 @@ impl PasswordResetRequest {
local_user_id: from_local_user_id, local_user_id: from_local_user_id,
token: token_.into(), token: token_.into(),
}; };
Self::create(pool, &form).await
}
pub async fn read_from_token(pool: &mut DbPool<'_>, token_: &str) -> Result<Option<Self>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
password_reset_request insert_into(password_reset_request)
.values(form)
.get_result::<Self>(conn)
.await
}
pub async fn read_and_delete(pool: &mut DbPool<'_>, token_: &str) -> Result<Option<Self>, Error> {
let conn = &mut get_conn(pool).await?;
delete(password_reset_request)
.filter(token.eq(token_)) .filter(token.eq(token_))
.filter(published.gt(now.into_sql::<Timestamptz>() - 1.days())) .filter(published.gt(now.into_sql::<Timestamptz>() - 1.days()))
.first(conn)
.await
.optional()
}
pub async fn get_recent_password_resets_count(
pool: &mut DbPool<'_>,
user_id: LocalUserId,
) -> Result<i64, Error> {
let conn = &mut get_conn(pool).await?;
password_reset_request
.filter(local_user_id.eq(user_id))
.filter(published.gt(now.into_sql::<Timestamptz>() - 1.days()))
.count()
.get_result(conn) .get_result(conn)
.await .await
.optional()
} }
} }
@ -94,62 +58,64 @@ mod tests {
traits::Crud, traits::Crud,
utils::build_db_pool_for_tests, utils::build_db_pool_for_tests,
}; };
use lemmy_utils::error::LemmyResult;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use serial_test::serial; use serial_test::serial;
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn test_crud() { async fn test_password_reset() -> LemmyResult<()> {
let pool = &build_db_pool_for_tests().await; let pool = &build_db_pool_for_tests().await;
let pool = &mut pool.into(); let pool = &mut pool.into();
let inserted_instance = Instance::read_or_create(pool, "my_domain.tld".to_string()) // Setup
.await let inserted_instance = Instance::read_or_create(pool, "my_domain.tld".to_string()).await?;
.unwrap();
let new_person = PersonInsertForm::builder() let new_person = PersonInsertForm::builder()
.name("thommy prw".into()) .name("thommy prw".into())
.public_key("pubkey".to_string()) .public_key("pubkey".to_string())
.instance_id(inserted_instance.id) .instance_id(inserted_instance.id)
.build(); .build();
let inserted_person = Person::create(pool, &new_person).await?;
let inserted_person = Person::create(pool, &new_person).await.unwrap();
let new_local_user = LocalUserInsertForm::builder() let new_local_user = LocalUserInsertForm::builder()
.person_id(inserted_person.id) .person_id(inserted_person.id)
.password_encrypted("pass".to_string()) .password_encrypted("pass".to_string())
.build(); .build();
let inserted_local_user = LocalUser::create(pool, &new_local_user, vec![]).await?;
let inserted_local_user = LocalUser::create(pool, &new_local_user, vec![]) // Create password reset token
.await
.unwrap();
let token = "nope"; let token = "nope";
let inserted_password_reset_request = let inserted_password_reset_request =
PasswordResetRequest::create_token(pool, inserted_local_user.id, token.to_string()) PasswordResetRequest::create(pool, inserted_local_user.id, token.to_string()).await?;
.await
.unwrap();
let expected_password_reset_request = PasswordResetRequest { // Read it and verify
id: inserted_password_reset_request.id, let read_password_reset_request = PasswordResetRequest::read_and_delete(pool, token)
local_user_id: inserted_local_user.id, .await?
token: token.to_string().into(),
published: inserted_password_reset_request.published,
};
let read_password_reset_request = PasswordResetRequest::read_from_token(pool, token)
.await
.unwrap()
.unwrap(); .unwrap();
let num_deleted = Person::delete(pool, inserted_person.id).await.unwrap();
Instance::delete(pool, inserted_instance.id).await.unwrap();
assert_eq!(expected_password_reset_request, read_password_reset_request);
assert_eq!( assert_eq!(
expected_password_reset_request, inserted_password_reset_request.id,
inserted_password_reset_request read_password_reset_request.id
); );
assert_eq!(
inserted_password_reset_request.local_user_id,
read_password_reset_request.local_user_id
);
assert_eq!(
inserted_password_reset_request.token,
read_password_reset_request.token
);
assert_eq!(
inserted_password_reset_request.published,
read_password_reset_request.published
);
// Cannot reuse same token again
let read_password_reset_request = PasswordResetRequest::read_and_delete(pool, token).await?;
assert!(read_password_reset_request.is_none());
// Cleanup
let num_deleted = Person::delete(pool, inserted_person.id).await?;
Instance::delete(pool, inserted_instance.id).await?;
assert_eq!(1, num_deleted); assert_eq!(1, num_deleted);
Ok(())
} }
} }

View file

@ -13,6 +13,7 @@ use tokio::{
time::sleep, time::sleep,
}; };
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::info;
mod util; mod util;
mod worker; mod worker;
@ -44,6 +45,10 @@ async fn start_stop_federation_workers(
let pool2 = &mut DbPool::Pool(&pool); let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1; let process_index = opts.process_index - 1;
let local_domain = federation_config.settings().get_hostname_without_port()?; let local_domain = federation_config.settings().get_hostname_without_port()?;
info!(
"Starting federation workers for process count {} and index {}",
opts.process_count, process_index
);
loop { loop {
let mut total_count = 0; let mut total_count = 0;
let mut dead_count = 0; let mut dead_count = 0;

View file

@ -34,6 +34,7 @@ use std::{
}; };
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
/// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt) /// Check whether to save state to db every n sends if there's no failures (during failures state is saved after every attempt)
/// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB. /// This determines the batch size for loop_batch. After a batch ends and SAVE_STATE_EVERY_TIME has passed, the federation_queue_state is updated in the DB.
@ -105,6 +106,7 @@ impl InstanceWorker {
&mut self, &mut self,
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
debug!("Starting federation worker for {}", self.instance.domain);
let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative");
self.update_communities(pool).await?; self.update_communities(pool).await?;
@ -176,15 +178,14 @@ impl InstanceWorker {
.await .await
.context("failed reading activity from db")? .context("failed reading activity from db")?
else { else {
tracing::debug!("{}: {:?} does not exist", self.instance.domain, id); debug!("{}: {:?} does not exist", self.instance.domain, id);
self.state.last_successful_id = Some(id); self.state.last_successful_id = Some(id);
continue; continue;
}; };
if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await {
tracing::warn!( warn!(
"sending {} errored internally, skipping activity: {:?}", "sending {} errored internally, skipping activity: {:?}",
ele.0.ap_id, ele.0.ap_id, e
e
); );
} }
if self.stop.is_cancelled() { if self.stop.is_cancelled() {
@ -211,7 +212,7 @@ impl InstanceWorker {
.await .await
.context("failed figuring out inbox urls")?; .context("failed figuring out inbox urls")?;
if inbox_urls.is_empty() { if inbox_urls.is_empty() {
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id); trace!("{}: {:?} no inboxes", self.instance.domain, activity.id);
self.state.last_successful_id = Some(activity.id); self.state.last_successful_id = Some(activity.id);
self.state.last_successful_published_time = Some(activity.published); self.state.last_successful_published_time = Some(activity.published);
return Ok(()); return Ok(());
@ -229,16 +230,14 @@ impl InstanceWorker {
SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?; SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &self.context).await?;
for task in requests { for task in requests {
// usually only one due to shared inbox // usually only one due to shared inbox
tracing::debug!("sending out {}", task); trace!("sending out {}", task);
while let Err(e) = task.sign_and_send(&self.context).await { while let Err(e) = task.sign_and_send(&self.context).await {
self.state.fail_count += 1; self.state.fail_count += 1;
self.state.last_retry = Some(Utc::now()); self.state.last_retry = Some(Utc::now());
let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count); let retry_delay: Duration = federate_retry_sleep_duration(self.state.fail_count);
tracing::info!( info!(
"{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})", "{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
self.instance.domain, self.instance.domain, activity.id, self.state.fail_count
activity.id,
self.state.fail_count
); );
self.save_and_send_state(pool).await?; self.save_and_send_state(pool).await?;
tokio::select! { tokio::select! {

View file

@ -161,7 +161,6 @@ pub enum LemmyErrorType {
PermissiveRegex, PermissiveRegex,
InvalidRegex, InvalidRegex,
CaptchaIncorrect, CaptchaIncorrect,
PasswordResetLimitReached,
CouldntCreateAudioCaptcha, CouldntCreateAudioCaptcha,
InvalidUrlScheme, InvalidUrlScheme,
CouldntSendWebmention, CouldntSendWebmention,

View file

@ -278,6 +278,11 @@ pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
.wrap(rate_limit.register()) .wrap(rate_limit.register())
.route(web::post().to(login)), .route(web::post().to(login)),
) )
.service(
web::resource("/user/password_reset")
.wrap(rate_limit.register())
.route(web::post().to(reset_password)),
)
.service( .service(
// Handle captcha separately // Handle captcha separately
web::resource("/user/get_captcha") web::resource("/user/get_captcha")
@ -318,7 +323,6 @@ pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
// TODO Account actions. I don't like that they're in /user maybe /accounts // TODO Account actions. I don't like that they're in /user maybe /accounts
.route("/logout", web::post().to(logout)) .route("/logout", web::post().to(logout))
.route("/delete_account", web::post().to(delete_account)) .route("/delete_account", web::post().to(delete_account))
.route("/password_reset", web::post().to(reset_password))
.route( .route(
"/password_change", "/password_change",
web::post().to(change_password_after_reset), web::post().to(change_password_after_reset),