Merge pull request 'Prevent postgres errors by explicitly allowing conflicts' (#72) from asonix/on-conflict-do-nothing into main

Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/72
This commit is contained in:
asonix 2024-10-17 02:23:36 +00:00
commit 46b12895d5
1 changed files with 54 additions and 69 deletions

View File

@ -428,20 +428,19 @@ impl PostgresRepo {
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
let res = diesel::insert_into(keyed_notifications) let inserted = diesel::insert_into(keyed_notifications)
.values(key.eq(input_key)) .values(key.eq(input_key))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { if inserted == 1 {
Ok(_) => Ok(Ok(())), Ok(Ok(()))
Err(diesel::result::Error::DatabaseError( } else {
diesel::result::DatabaseErrorKind::UniqueViolation, Ok(Err(AlreadyInserted))
_,
)) => Ok(Err(AlreadyInserted)),
Err(e) => Err(PostgresError::Diesel(e)),
} }
} }
@ -947,25 +946,24 @@ impl HashRepo for PostgresRepo {
let timestamp = to_primitive(timestamp); let timestamp = to_primitive(timestamp);
let res = diesel::insert_into(hashes) let inserted = diesel::insert_into(hashes)
.values(( .values((
hash.eq(&input_hash), hash.eq(&input_hash),
identifier.eq(input_identifier.as_ref()), identifier.eq(input_identifier.as_ref()),
created_at.eq(&timestamp), created_at.eq(&timestamp),
)) ))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_HASHES_CREATE_HASH) .with_metrics(crate::init_metrics::POSTGRES_HASHES_CREATE_HASH)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { if inserted == 1 {
Ok(_) => Ok(Ok(())), Ok(Ok(()))
Err(diesel::result::Error::DatabaseError( } else {
diesel::result::DatabaseErrorKind::UniqueViolation, Ok(Err(HashAlreadyExists))
_,
)) => Ok(Err(HashAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
} }
} }
@ -1195,25 +1193,24 @@ impl VariantRepo for PostgresRepo {
let mut conn = self.get_connection().await?; let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants) let inserted = diesel::insert_into(variants)
.values(( .values((
hash.eq(&input_hash), hash.eq(&input_hash),
variant.eq(&input_variant), variant.eq(&input_variant),
identifier.eq(input_identifier.to_string()), identifier.eq(input_identifier.to_string()),
)) ))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { if inserted == 1 {
Ok(_) => Ok(Ok(())), Ok(Ok(()))
Err(diesel::result::Error::DatabaseError( } else {
diesel::result::DatabaseErrorKind::UniqueViolation, Ok(Err(VariantAlreadyExists))
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
} }
} }
@ -1302,25 +1299,24 @@ impl AliasRepo for PostgresRepo {
let mut conn = self.get_connection().await?; let mut conn = self.get_connection().await?;
let res = diesel::insert_into(aliases) let inserted = diesel::insert_into(aliases)
.values(( .values((
alias.eq(input_alias), alias.eq(input_alias),
hash.eq(&input_hash), hash.eq(&input_hash),
token.eq(delete_token), token.eq(delete_token),
)) ))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_ALIASES_CREATE) .with_metrics(crate::init_metrics::POSTGRES_ALIASES_CREATE)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { if inserted == 1 {
Ok(_) => Ok(Ok(())), Ok(Ok(()))
Err(diesel::result::Error::DatabaseError( } else {
diesel::result::DatabaseErrorKind::UniqueViolation, Ok(Err(AliasAlreadyExists))
_,
)) => Ok(Err(AliasAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
} }
} }
@ -1467,22 +1463,17 @@ impl DetailsRepo for PostgresRepo {
let value = let value =
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?; serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
let res = diesel::insert_into(details) diesel::insert_into(details)
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value))) .values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE) .with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { Ok(())
Ok(_)
| Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(()),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
@ -1560,34 +1551,29 @@ impl QueueRepo for PostgresRepo {
let mut conn = self.get_connection().await?; let mut conn = self.get_connection().await?;
let res = diesel::insert_into(job_queue) let job_id = diesel::insert_into(job_queue)
.values(( .values((
queue.eq(queue_name), queue.eq(queue_name),
job.eq(job_json), job.eq(job_json),
unique_key.eq(in_unique_key), unique_key.eq(in_unique_key),
)) ))
.returning(id) .returning(id)
.on_conflict_do_nothing()
.get_result::<Uuid>(&mut conn) .get_result::<Uuid>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_PUSH) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_PUSH)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map(JobId) .optional()
.map(Some); .map_err(PostgresError::Diesel)?
.map(JobId);
match res { if job_id.is_some() {
Ok(job_id) => {
guard.disarm(); guard.disarm();
}
Ok(job_id) Ok(job_id)
} }
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(None),
Err(e) => Err(RepoError::from(PostgresError::Diesel(e))),
}
}
#[tracing::instrument(level = "debug", skip_all, fields(job_id))] #[tracing::instrument(level = "debug", skip_all, fields(job_id))]
async fn pop( async fn pop(
@ -1893,21 +1879,20 @@ impl ProxyRepo for PostgresRepo {
let mut conn = self.get_connection().await?; let mut conn = self.get_connection().await?;
let res = diesel::insert_into(proxies) let inserted = diesel::insert_into(proxies)
.values((url.eq(input_url.as_str()), alias.eq(&input_alias))) .values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
.on_conflict_do_nothing()
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL) .with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)?; .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res { if inserted == 1 {
Ok(_) => Ok(Ok(())), Ok(Ok(()))
Err(diesel::result::Error::DatabaseError( } else {
diesel::result::DatabaseErrorKind::UniqueViolation, Ok(Err(ProxyAlreadyExists))
_,
)) => Ok(Err(ProxyAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
} }
} }