diff --git a/src/repo.rs b/src/repo.rs index 2510715..dd986b7 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -86,19 +86,13 @@ pub(crate) trait FullRepo: } } - #[tracing::instrument] - async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> { - let hash = self.hash(alias).await?; - CachedRepo::create(self, hash).await - } - #[tracing::instrument] async fn check_cached(&self, alias: &Alias) -> Result<(), Error> { - let hash = self.hash(alias).await?; - let hashes = CachedRepo::update(self, hash).await?; + let aliases = CachedRepo::update(self, alias).await?; - for hash in hashes { - crate::queue::cleanup_hash(self, hash).await?; + for alias in aliases { + let token = self.delete_token(&alias).await?; + crate::queue::cleanup_alias(self, alias, token).await?; } Ok(()) @@ -111,9 +105,9 @@ pub(crate) trait BaseRepo { #[async_trait::async_trait(?Send)] pub(crate) trait CachedRepo: BaseRepo { - async fn create(&self, hash: Self::Bytes) -> Result<(), Error>; + async fn mark_cached(&self, alias: &Alias) -> Result<(), Error>; - async fn update(&self, hash: Self::Bytes) -> Result, Error>; + async fn update(&self, alias: &Alias) -> Result, Error>; } #[async_trait::async_trait(?Send)] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 5034625..5709f7d 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -11,21 +11,25 @@ use crate::{ use futures_util::Stream; use sled::{CompareAndSwapError, Db, IVec, Tree}; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, sync::{Arc, RwLock}, }; use tokio::sync::Notify; +mod bucket; mod datetime; +use bucket::Bucket; use datetime::DateTime; macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); - actix_rt::task::spawn_blocking(move || $expr) + let span = tracing::Span::current(); + + actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr)) .await .map_err(SledError::from)?? }}; @@ -132,129 +136,99 @@ impl From for UploadResult { } } -#[derive(Debug, serde::Serialize, serde::Deserialize)] -struct Bucket { - // each Vec represents a unique image hash - inner: HashSet>, +fn insert_cache_inverse( + cache_inverse: &Tree, + now_bytes: &[u8], + alias_bytes: &[u8], +) -> Result<(), Error> { + let mut old = cache_inverse.get(now_bytes)?; + + loop { + // unsure of whether to bail on deserialize error or fail with empty bucket + let mut bucket = old + .as_ref() + .and_then(|old| serde_cbor::from_slice::(old).ok()) + .unwrap_or_else(Bucket::empty); + + bucket.insert(alias_bytes.to_vec()); + + tracing::info!("Inserting new {:?}", bucket); + let bucket_bytes = serde_cbor::to_vec(&bucket)?; + let new = Some(bucket_bytes); + + let res = cache_inverse.compare_and_swap(now_bytes, old, new)?; + + if let Err(CompareAndSwapError { current, .. }) = res { + old = current; + } else { + break; + } + } + + Ok(()) } #[async_trait::async_trait(?Send)] impl CachedRepo for SledRepo { - #[tracing::instrument(skip(hash))] - async fn create(&self, hash: Self::Bytes) -> Result<(), Error> { + #[tracing::instrument] + async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> { let now = DateTime::now(); - let bytes = serde_json::to_vec(&now)?; + let now_bytes = serde_json::to_vec(&now)?; + + let alias_bytes = alias.to_bytes(); let cache_inverse = self.cache_inverse.clone(); b!(self.cache, { - cache.insert(hash.clone(), bytes.clone())?; + cache.insert(&alias_bytes, now_bytes.clone())?; - let mut old = cache_inverse.get(bytes.clone())?; - loop { - let mut bucket = if let Some(old) = old.as_ref() { - // unsure of whether to bail on deserialize error or fail with empty bucket - if let Ok(bucket) = serde_cbor::from_slice::(old) { - bucket - } else { - Bucket { - inner: HashSet::new(), - } - } - } else { - Bucket { - inner: HashSet::new(), - } - }; - - bucket.inner.insert(hash.as_ref().to_vec()); - tracing::info!("Inserting new {:?}", bucket); - let bucket_bytes = serde_cbor::to_vec(&bucket)?; - let new = Some(bucket_bytes); - - let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?; - - if let Err(CompareAndSwapError { current, .. }) = res { - old = current; - } else { - break; - } - } - - Ok(()) as Result<(), Error> + insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes) }); Ok(()) } - #[tracing::instrument(skip(hash))] - async fn update(&self, hash: Self::Bytes) -> Result, Error> { + #[tracing::instrument] + async fn update(&self, alias: &Alias) -> Result, Error> { let now = DateTime::now(); let now_bytes = serde_json::to_vec(&now)?; let to_clean = now.min_cache_date(); let to_clean_bytes = serde_json::to_vec(&to_clean)?; + let alias_bytes = alias.to_bytes(); + let cache_inverse = self.cache_inverse.clone(); - let hashes = b!(self.cache, { + let aliases = b!(self.cache, { let previous_datetime_opt = cache - .fetch_and_update(hash.clone(), |previous_datetime_opt| { + .fetch_and_update(&alias_bytes, |previous_datetime_opt| { previous_datetime_opt.map(|_| now_bytes.clone()) })?; if let Some(previous_datetime_bytes) = previous_datetime_opt { // Insert cached media into new date bucket - let mut old = cache_inverse.get(now_bytes.clone())?; - loop { - let mut bucket = if let Some(bucket_bytes) = old.as_ref() { - if let Ok(bucket) = serde_cbor::from_slice::(bucket_bytes) { - bucket - } else { - Bucket { - inner: HashSet::new(), - } - } - } else { - Bucket { - inner: HashSet::new(), - } - }; - - bucket.inner.insert(hash.as_ref().to_vec()); - tracing::info!("Inserting new {:?}", bucket); - let bucket_bytes = serde_cbor::to_vec(&bucket)?; - let new = Some(bucket_bytes); - - if let Err(CompareAndSwapError { current, .. }) = - cache_inverse.compare_and_swap(now_bytes.clone(), old, new)? - { - old = current; - } else { - break; - } - } + insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)?; // Remove cached media from old date bucket - let mut old = cache_inverse.get(previous_datetime_bytes.clone())?; + let mut old = cache_inverse.get(&previous_datetime_bytes)?; loop { - let new = if let Some(bucket_bytes) = old.as_ref() { - if let Ok(mut bucket) = serde_cbor::from_slice::(bucket_bytes) { - bucket.inner.remove(hash.as_ref()); - if bucket.inner.is_empty() { + let new = old + .as_ref() + .and_then(|bucket_bytes| { + let mut bucket = serde_cbor::from_slice::(bucket_bytes).ok()?; + + bucket.remove(&alias_bytes); + + if bucket.is_empty() { tracing::info!("Removed old {:?}", bucket); None } else { tracing::info!("Inserting old {:?}", bucket); - let bucket_bytes = serde_cbor::to_vec(&bucket)?; - Some(bucket_bytes) + Some(serde_cbor::to_vec(&bucket)) } - } else { - None - } - } else { - None - }; + }) + .transpose()?; if let Err(CompareAndSwapError { current, .. }) = - cache_inverse.compare_and_swap(previous_datetime_bytes.clone(), old, new)? + cache_inverse.compare_and_swap(&previous_datetime_bytes, old, new)? { old = current; } else { @@ -263,7 +237,7 @@ impl CachedRepo for SledRepo { } } - let mut hashes: Vec = Vec::new(); + let mut aliases: Vec = Vec::new(); for (date_bytes, bucket_bytes) in cache_inverse.range(..to_clean_bytes).filter_map(Result::ok) @@ -275,10 +249,12 @@ impl CachedRepo for SledRepo { } if let Ok(bucket) = serde_cbor::from_slice::(&bucket_bytes) { tracing::info!("Read for deletion: {:?}", bucket); - for hash in bucket.inner { + for alias_bytes in bucket { // Best effort cleanup - let _ = cache.remove(&hash); - hashes.push(hash.into()); + let _ = cache.remove(&alias_bytes); + if let Some(alias) = Alias::from_slice(&alias_bytes) { + aliases.push(alias); + } } } else { tracing::warn!("Invalid bucket"); @@ -296,10 +272,10 @@ impl CachedRepo for SledRepo { } } - Ok(hashes) as Result<_, Error> + Ok(aliases) as Result<_, Error> }); - Ok(hashes) + Ok(aliases) } } diff --git a/src/repo/sled/bucket.rs b/src/repo/sled/bucket.rs new file mode 100644 index 0000000..2fd89c1 --- /dev/null +++ b/src/repo/sled/bucket.rs @@ -0,0 +1,36 @@ +use std::collections::HashSet; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub(super) struct Bucket { + // each Vec represents a unique image hash + inner: HashSet>, +} + +impl Bucket { + pub(super) fn empty() -> Self { + Self { + inner: HashSet::new(), + } + } + + pub(super) fn insert(&mut self, alias_bytes: Vec) { + self.inner.insert(alias_bytes); + } + + pub(super) fn remove(&mut self, alias_bytes: &[u8]) { + self.inner.remove(alias_bytes); + } + + pub(super) fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +impl IntoIterator for Bucket { + type Item = > as IntoIterator>::Item; + type IntoIter = > as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +}