From a4228135b48aefb99c1d28b7444ca7f5043655be Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 16 Oct 2024 18:11:53 -0500 Subject: [PATCH] Prevent postgres errors by explicitly allowing conflicts --- src/repo/postgres.rs | 123 +++++++++++++++++++------------------------ 1 file changed, 54 insertions(+), 69 deletions(-) diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index ddea813..89e952c 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -428,20 +428,19 @@ impl PostgresRepo { .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; - let res = diesel::insert_into(keyed_notifications) + let inserted = diesel::insert_into(keyed_notifications) .values(key.eq(input_key)) + .on_conflict_do_nothing() .execute(&mut conn) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(AlreadyInserted)), - Err(e) => Err(PostgresError::Diesel(e)), + if inserted == 1 { + Ok(Ok(())) + } else { + Ok(Err(AlreadyInserted)) } } @@ -947,25 +946,24 @@ impl HashRepo for PostgresRepo { let timestamp = to_primitive(timestamp); - let res = diesel::insert_into(hashes) + let inserted = diesel::insert_into(hashes) .values(( hash.eq(&input_hash), identifier.eq(input_identifier.as_ref()), created_at.eq(×tamp), )) + .on_conflict_do_nothing() .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_HASHES_CREATE_HASH) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(HashAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), + if inserted == 1 { + Ok(Ok(())) + } else { + Ok(Err(HashAlreadyExists)) } } @@ -1195,25 +1193,24 @@ impl VariantRepo for PostgresRepo { let mut conn = self.get_connection().await?; - let res = diesel::insert_into(variants) + let inserted = diesel::insert_into(variants) .values(( hash.eq(&input_hash), variant.eq(&input_variant), identifier.eq(input_identifier.to_string()), )) + .on_conflict_do_nothing() .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(VariantAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), + if inserted == 1 { + Ok(Ok(())) + } else { + Ok(Err(VariantAlreadyExists)) } } @@ -1302,25 +1299,24 @@ impl AliasRepo for PostgresRepo { let mut conn = self.get_connection().await?; - let res = diesel::insert_into(aliases) + let inserted = diesel::insert_into(aliases) .values(( alias.eq(input_alias), hash.eq(&input_hash), token.eq(delete_token), )) + .on_conflict_do_nothing() .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_ALIASES_CREATE) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(AliasAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), + if inserted == 1 { + Ok(Ok(())) + } else { + Ok(Err(AliasAlreadyExists)) } } @@ -1467,22 +1463,17 @@ impl DetailsRepo for PostgresRepo { let value = serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?; - let res = diesel::insert_into(details) + diesel::insert_into(details) .values((identifier.eq(input_identifier.as_ref()), json.eq(&value))) + .on_conflict_do_nothing() .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) - | Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(()), - Err(e) => Err(PostgresError::Diesel(e).into()), - } + Ok(()) } #[tracing::instrument(level = "debug", skip(self))] @@ -1560,33 +1551,28 @@ impl QueueRepo for PostgresRepo { let mut conn = self.get_connection().await?; - let res = diesel::insert_into(job_queue) + let job_id = diesel::insert_into(job_queue) .values(( queue.eq(queue_name), job.eq(job_json), unique_key.eq(in_unique_key), )) .returning(id) + .on_conflict_do_nothing() .get_result::(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_PUSH) .with_timeout(Duration::from_secs(5)) .await .map_err(|_| PostgresError::DbTimeout)? - .map(JobId) - .map(Some); + .optional() + .map_err(PostgresError::Diesel)? + .map(JobId); - match res { - Ok(job_id) => { - guard.disarm(); - - Ok(job_id) - } - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(None), - Err(e) => Err(RepoError::from(PostgresError::Diesel(e))), + if job_id.is_some() { + guard.disarm(); } + + Ok(job_id) } #[tracing::instrument(level = "debug", skip_all, fields(job_id))] @@ -1893,21 +1879,20 @@ impl ProxyRepo for PostgresRepo { let mut conn = self.get_connection().await?; - let res = diesel::insert_into(proxies) + let inserted = diesel::insert_into(proxies) .values((url.eq(input_url.as_str()), alias.eq(&input_alias))) + .on_conflict_do_nothing() .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)?; + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(ProxyAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), + if inserted == 1 { + Ok(Ok(())) + } else { + Ok(Err(ProxyAlreadyExists)) } }