Ensure access values are unique

This commit is contained in:
asonix 2023-08-15 21:18:25 -05:00
parent e3b8282f41
commit 1de257bb07
1 changed files with 92 additions and 54 deletions

View File

@ -222,23 +222,36 @@ impl ProxyRepo for SledRepo {
impl AliasAccessRepo for SledRepo { impl AliasAccessRepo for SledRepo {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, alias: Alias) -> Result<(), RepoError> { async fn accessed(&self, alias: Alias) -> Result<(), RepoError> {
let now_string = time::OffsetDateTime::now_utc() let mut value_bytes = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339) .unix_timestamp_nanos()
.map_err(SledError::Format)?; .to_be_bytes()
.to_vec();
value_bytes.extend_from_slice(&alias.to_bytes());
let value_bytes = IVec::from(value_bytes);
let alias_access = self.alias_access.clone(); let alias_access = self.alias_access.clone();
let inverse_alias_access = self.inverse_alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone();
actix_rt::task::spawn_blocking(move || { let res = actix_rt::task::spawn_blocking(move || {
if let Some(old) = alias_access.insert(alias.to_bytes(), now_string.as_bytes())? { (&alias_access, &inverse_alias_access).transaction(
inverse_alias_access.remove(old)?; |(alias_access, inverse_alias_access)| {
} if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? {
inverse_alias_access.insert(now_string, alias.to_bytes())?; inverse_alias_access.remove(old)?;
Ok(()) as Result<(), SledError> }
inverse_alias_access.insert(&value_bytes, alias.to_bytes())?;
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)? .map_err(|_| RepoError::Canceled)?;
.map_err(RepoError::from)
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
return Err(RepoError::from(SledError::from(e)));
}
Ok(())
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
@ -246,13 +259,11 @@ impl AliasAccessRepo for SledRepo {
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> { ) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
let time_string = timestamp let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec();
.format(&time::format_description::well_known::Rfc3339)
.map_err(SledError::Format)?;
let iterator = self let iterator = self
.inverse_alias_access .inverse_alias_access
.range(..=time_string) .range(..=time_bytes)
.filter_map(|res| { .filter_map(|res| {
res.map_err(SledError::from) res.map_err(SledError::from)
.map_err(RepoError::from) .map_err(RepoError::from)
@ -268,15 +279,24 @@ impl AliasAccessRepo for SledRepo {
let alias_access = self.alias_access.clone(); let alias_access = self.alias_access.clone();
let inverse_alias_access = self.inverse_alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone();
actix_rt::task::spawn_blocking(move || { let res = actix_rt::task::spawn_blocking(move || {
if let Some(old) = alias_access.remove(alias.to_bytes())? { (&alias_access, &inverse_alias_access).transaction(
inverse_alias_access.remove(old)?; |(alias_access, inverse_alias_access)| {
} if let Some(old) = alias_access.remove(alias.to_bytes())? {
Ok(()) as Result<(), SledError> inverse_alias_access.remove(old)?;
}
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)? .map_err(|_| RepoError::Canceled)?;
.map_err(RepoError::from)
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
return Err(RepoError::from(SledError::from(e)));
}
Ok(())
} }
} }
@ -285,25 +305,38 @@ impl VariantAccessRepo for SledRepo {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes(); let hash = hash.to_bytes();
let key = variant_access_key(&hash, &variant); let key = IVec::from(variant_access_key(&hash, &variant));
let now_string = time::OffsetDateTime::now_utc() let mut value_bytes = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339) .unix_timestamp_nanos()
.map_err(SledError::Format)?; .to_be_bytes()
.to_vec();
value_bytes.extend_from_slice(&key);
let value_bytes = IVec::from(value_bytes);
let variant_access = self.variant_access.clone(); let variant_access = self.variant_access.clone();
let inverse_variant_access = self.inverse_variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone();
actix_rt::task::spawn_blocking(move || { let res = actix_rt::task::spawn_blocking(move || {
if let Some(old) = variant_access.insert(&key, now_string.as_bytes())? { (&variant_access, &inverse_variant_access).transaction(
inverse_variant_access.remove(old)?; |(variant_access, inverse_variant_access)| {
} if let Some(old) = variant_access.insert(&key, &value_bytes)? {
inverse_variant_access.insert(now_string, key)?; inverse_variant_access.remove(old)?;
Ok(()) as Result<(), SledError> }
inverse_variant_access.insert(&value_bytes, &key)?;
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)? .map_err(|_| RepoError::Canceled)?;
.map_err(RepoError::from)
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
return Err(RepoError::from(SledError::from(e)));
}
Ok(())
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
@ -321,20 +354,15 @@ impl VariantAccessRepo for SledRepo {
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> { ) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
let time_string = timestamp let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec();
.format(&time::format_description::well_known::Rfc3339)
.map_err(SledError::Format)?;
let iterator = self let iterator = self.inverse_variant_access.range(..=time_bytes).map(|res| {
.inverse_variant_access let (_, bytes) = res.map_err(SledError::from)?;
.range(..=time_string)
.map(|res| {
let (_, bytes) = res.map_err(SledError::from)?;
parse_variant_access_key(bytes) parse_variant_access_key(bytes)
.map_err(SledError::from) .map_err(SledError::from)
.map_err(RepoError::from) .map_err(RepoError::from)
}); });
Ok(Box::pin(from_iterator(iterator, 8))) Ok(Box::pin(from_iterator(iterator, 8)))
} }
@ -342,20 +370,30 @@ impl VariantAccessRepo for SledRepo {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes(); let hash = hash.to_bytes();
let key = variant_access_key(&hash, &variant); let key = IVec::from(variant_access_key(&hash, &variant));
let variant_access = self.variant_access.clone(); let variant_access = self.variant_access.clone();
let inverse_variant_access = self.inverse_variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone();
actix_rt::task::spawn_blocking(move || { let res = actix_rt::task::spawn_blocking(move || {
if let Some(old) = variant_access.remove(key)? { (&variant_access, &inverse_variant_access).transaction(
inverse_variant_access.remove(old)?; |(variant_access, inverse_variant_access)| {
} if let Some(old) = variant_access.remove(&key)? {
Ok(()) as Result<(), SledError> inverse_variant_access.remove(old)?;
}
Ok(())
},
)
}) })
.await .await
.map_err(|_| RepoError::Canceled)? .map_err(|_| RepoError::Canceled)?;
.map_err(RepoError::from)
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
return Err(RepoError::from(SledError::from(e)));
}
Ok(())
} }
} }