From 1de257bb07f6bdef230dd03a0cdce09e91d6d05f Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 15 Aug 2023 21:18:25 -0500 Subject: [PATCH] Ensure access values are unique --- src/repo/sled.rs | 146 +++++++++++++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 54 deletions(-) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 99d8630..b19c338 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -222,23 +222,36 @@ impl ProxyRepo for SledRepo { impl AliasAccessRepo for SledRepo { #[tracing::instrument(level = "debug", skip(self))] async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { - let now_string = time::OffsetDateTime::now_utc() - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; + let mut value_bytes = time::OffsetDateTime::now_utc() + .unix_timestamp_nanos() + .to_be_bytes() + .to_vec(); + value_bytes.extend_from_slice(&alias.to_bytes()); + let value_bytes = IVec::from(value_bytes); let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); - actix_rt::task::spawn_blocking(move || { - if let Some(old) = alias_access.insert(alias.to_bytes(), now_string.as_bytes())? { - inverse_alias_access.remove(old)?; - } - inverse_alias_access.insert(now_string, alias.to_bytes())?; - Ok(()) as Result<(), SledError> + let res = actix_rt::task::spawn_blocking(move || { + (&alias_access, &inverse_alias_access).transaction( + |(alias_access, inverse_alias_access)| { + if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? { + inverse_alias_access.remove(old)?; + } + inverse_alias_access.insert(&value_bytes, alias.to_bytes())?; + + Ok(()) + }, + ) }) .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) + .map_err(|_| RepoError::Canceled)?; + + if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { + return Err(RepoError::from(SledError::from(e))); + } + + Ok(()) } #[tracing::instrument(level = "debug", skip(self))] @@ -246,13 +259,11 @@ impl AliasAccessRepo for SledRepo { &self, timestamp: time::OffsetDateTime, ) -> Result>, RepoError> { - let time_string = timestamp - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; + let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec(); let iterator = self .inverse_alias_access - .range(..=time_string) + .range(..=time_bytes) .filter_map(|res| { res.map_err(SledError::from) .map_err(RepoError::from) @@ -268,15 +279,24 @@ impl AliasAccessRepo for SledRepo { let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); - actix_rt::task::spawn_blocking(move || { - if let Some(old) = alias_access.remove(alias.to_bytes())? { - inverse_alias_access.remove(old)?; - } - Ok(()) as Result<(), SledError> + let res = actix_rt::task::spawn_blocking(move || { + (&alias_access, &inverse_alias_access).transaction( + |(alias_access, inverse_alias_access)| { + if let Some(old) = alias_access.remove(alias.to_bytes())? { + inverse_alias_access.remove(old)?; + } + Ok(()) + }, + ) }) .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) + .map_err(|_| RepoError::Canceled)?; + + if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { + return Err(RepoError::from(SledError::from(e))); + } + + Ok(()) } } @@ -285,25 +305,38 @@ impl VariantAccessRepo for SledRepo { #[tracing::instrument(level = "debug", skip(self))] async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let hash = hash.to_bytes(); - let key = variant_access_key(&hash, &variant); + let key = IVec::from(variant_access_key(&hash, &variant)); - let now_string = time::OffsetDateTime::now_utc() - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; + let mut value_bytes = time::OffsetDateTime::now_utc() + .unix_timestamp_nanos() + .to_be_bytes() + .to_vec(); + value_bytes.extend_from_slice(&key); + let value_bytes = IVec::from(value_bytes); let variant_access = self.variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone(); - actix_rt::task::spawn_blocking(move || { - if let Some(old) = variant_access.insert(&key, now_string.as_bytes())? { - inverse_variant_access.remove(old)?; - } - inverse_variant_access.insert(now_string, key)?; - Ok(()) as Result<(), SledError> + let res = actix_rt::task::spawn_blocking(move || { + (&variant_access, &inverse_variant_access).transaction( + |(variant_access, inverse_variant_access)| { + if let Some(old) = variant_access.insert(&key, &value_bytes)? { + inverse_variant_access.remove(old)?; + } + inverse_variant_access.insert(&value_bytes, &key)?; + + Ok(()) + }, + ) }) .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) + .map_err(|_| RepoError::Canceled)?; + + if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { + return Err(RepoError::from(SledError::from(e))); + } + + Ok(()) } #[tracing::instrument(level = "debug", skip(self))] @@ -321,20 +354,15 @@ impl VariantAccessRepo for SledRepo { &self, timestamp: time::OffsetDateTime, ) -> Result>, RepoError> { - let time_string = timestamp - .format(&time::format_description::well_known::Rfc3339) - .map_err(SledError::Format)?; + let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec(); - let iterator = self - .inverse_variant_access - .range(..=time_string) - .map(|res| { - let (_, bytes) = res.map_err(SledError::from)?; + let iterator = self.inverse_variant_access.range(..=time_bytes).map(|res| { + let (_, bytes) = res.map_err(SledError::from)?; - parse_variant_access_key(bytes) - .map_err(SledError::from) - .map_err(RepoError::from) - }); + parse_variant_access_key(bytes) + .map_err(SledError::from) + .map_err(RepoError::from) + }); Ok(Box::pin(from_iterator(iterator, 8))) } @@ -342,20 +370,30 @@ impl VariantAccessRepo for SledRepo { #[tracing::instrument(level = "debug", skip(self))] async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let hash = hash.to_bytes(); - let key = variant_access_key(&hash, &variant); + let key = IVec::from(variant_access_key(&hash, &variant)); let variant_access = self.variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone(); - actix_rt::task::spawn_blocking(move || { - if let Some(old) = variant_access.remove(key)? { - inverse_variant_access.remove(old)?; - } - Ok(()) as Result<(), SledError> + let res = actix_rt::task::spawn_blocking(move || { + (&variant_access, &inverse_variant_access).transaction( + |(variant_access, inverse_variant_access)| { + if let Some(old) = variant_access.remove(&key)? { + inverse_variant_access.remove(old)?; + } + + Ok(()) + }, + ) }) .await - .map_err(|_| RepoError::Canceled)? - .map_err(RepoError::from) + .map_err(|_| RepoError::Canceled)?; + + if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { + return Err(RepoError::from(SledError::from(e))); + } + + Ok(()) } }