From 281ac43dff2167e8197836cfb8231404baebe168 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 25 Feb 2023 11:34:48 -0600
Subject: [PATCH] Remove cache functionality

---
 README.md                 |   8 +-
 src/ingest.rs             |  15 +---
 src/lib.rs                |  30 ++-----
 src/queue.rs              |   3 -
 src/queue/process.rs      |  14 +---
 src/repo.rs               |  36 +--------
 src/repo/sled.rs          | 160 +-------------------------------------
 src/repo/sled/bucket.rs   |  36 ---------
 src/repo/sled/datetime.rs |  58 --------------
 9 files changed, 16 insertions(+), 344 deletions(-)
 delete mode 100644 src/repo/sled/bucket.rs
 delete mode 100644 src/repo/sled/datetime.rs

diff --git a/README.md b/README.md
index 8e6a209..ea8abaf 100644
--- a/README.md
+++ b/README.md
@@ -91,8 +91,6 @@ Options:
           Which media filters should be enabled on the `process` endpoint
       --media-format
           Enforce uploaded media is transcoded to the provided format [possible values: jpeg, webp, png]
-      --media-cache-duration
-          How long, in hours, to keep media ingested through the "cached" endpoint
   -h, --help
           Print help information (use `--help` for more detail)
 ```
@@ -230,15 +228,11 @@ pict-rs offers the following endpoints:
       "msg": "ok"
     }
     ```
-- `GET /image/download?url={url}&backgrounded=(true|false)&ephemeral=(true|false)` Download an image
+- `GET /image/download?url={url}&backgrounded=(true|false)` Download an image
     from a remote server, returning the same JSON payload as the `POST /image` endpoint by default.
     if `backgrounded` is set to `true`, then the ingest processing will be queued for later and
     the response json will be the same as the `POST /image/backgrounded` endpoint.
-    - if `ephemeral` is set to true, the downloaded image will be marked as a "cached" image, and
-      automatically removed from pict-rs N hours after its last access. The duration is configurable
-      with the `--media-cache-duration` run flag, or the `[media] cache_duration` toml option.
 - `GET /image/backgrounded/claim?upload_id={uuid}` Wait for a backgrounded upload to complete, claiming it's result
   Possible results:
   - 200 Ok (validation and ingest complete):
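With `ephemeral` gone, the download endpoint documented above takes only `url` and `backgrounded`. A minimal client sketch under stated assumptions: the pict-rs instance, its `localhost:8080` address, and the image URL are hypothetical, and `reqwest`/`tokio` are assumed as client-side dependencies (they are not part of pict-rs's own API):

```rust
// Hypothetical caller of `GET /image/download` after this patch; the only
// query parameters left are `url` and `backgrounded`.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // Ask pict-rs (assumed to listen on localhost:8080) to fetch a remote
    // image; `backgrounded=true` queues ingest for later.
    let response = client
        .get("http://localhost:8080/image/download")
        .query(&[
            ("url", "https://example.com/image.png"),
            ("backgrounded", "true"),
        ])
        .send()
        .await?;

    // Per the README, with `backgrounded=true` the JSON payload matches the
    // `POST /image/backgrounded` response shape.
    println!("{}", response.text().await?);

    Ok(())
}
```
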
diff --git a/src/ingest.rs b/src/ingest.rs
index 9bd0d63..8b1eb9a 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -48,7 +48,6 @@ pub(crate) async fn ingest<R, S>(
     stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     declared_alias: Option<Alias>,
     should_validate: bool,
-    is_cached: bool,
 ) -> Result<Session<R, S>, Error>
 where
     R: FullRepo + 'static,
@@ -98,9 +97,9 @@ where
     save_upload(repo, store, &hash, &identifier).await?;
 
     if let Some(alias) = declared_alias {
-        session.add_existing_alias(&hash, alias, is_cached).await?
+        session.add_existing_alias(&hash, alias).await?
     } else {
-        session.create_alias(&hash, input_type, is_cached).await?;
+        session.create_alias(&hash, input_type).await?;
     }
 
     Ok(session)
@@ -168,7 +167,6 @@ where
         &mut self,
         hash: &[u8],
         alias: Alias,
-        is_cached: bool,
     ) -> Result<(), Error> {
         AliasRepo::create(&self.repo, &alias)
             .await?
@@ -179,10 +177,6 @@ where
         self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
         self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
 
-        if is_cached {
-            self.repo.mark_cached(&alias).await?;
-        }
-
         Ok(())
     }
 
@@ -191,7 +185,6 @@ where
         &mut self,
         hash: &[u8],
         input_type: ValidInputType,
-        is_cached: bool,
     ) -> Result<(), Error> {
         loop {
             let alias = Alias::generate(input_type.as_ext().to_string());
@@ -202,10 +195,6 @@ where
             self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
             self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
 
-            if is_cached {
-                self.repo.mark_cached(&alias).await?;
-            }
-
             return Ok(());
         }

diff --git a/src/lib.rs b/src/lib.rs
index 348e1cf..952648e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -156,10 +156,8 @@ impl FormData for Upload {
                     let stream = stream.map_err(Error::from);
 
                     Box::pin(
-                        async move {
-                            ingest::ingest(&**repo, &**store, stream, None, true, false).await
-                        }
-                        .instrument(span),
+                        async move { ingest::ingest(&**repo, &**store, stream, None, true).await }
+                            .instrument(span),
                     )
                 })),
             )
@@ -211,7 +209,6 @@ impl FormData for Import {
                         stream,
                         Some(Alias::from_existing(&filename)),
                         !CONFIG.media.skip_validate_imports,
-                        false,
                     )
                     .await
                 }
@@ -367,7 +364,7 @@ async fn upload_backgrounded<R: FullRepo, S: Store>(
         .expect("Identifier exists")
         .to_bytes()?;
 
-    queue::queue_ingest(&repo, identifier, upload_id, None, true, false).await?;
+    queue::queue_ingest(&repo, identifier, upload_id, None, true).await?;
 
     files.push(serde_json::json!({
         "upload_id": upload_id.to_string(),
@@ -433,9 +430,6 @@ struct UrlQuery {
 
     #[serde(default)]
     backgrounded: bool,
-
-    #[serde(default)]
-    ephemeral: bool,
 }
 
 /// download an image from a URL
@@ -457,9 +451,9 @@ async fn download<R: FullRepo, S: Store + 'static>(
         .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
 
     if query.backgrounded {
-        do_download_backgrounded(stream, repo, store, query.ephemeral).await
+        do_download_backgrounded(stream, repo, store).await
     } else {
-        do_download_inline(stream, repo, store, query.ephemeral).await
+        do_download_inline(stream, repo, store).await
     }
 }
 
@@ -468,9 +462,8 @@ async fn do_download_inline<R: FullRepo, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
     store: web::Data<S>,
-    is_cached: bool,
 ) -> Result<HttpResponse, Error> {
-    let mut session = ingest::ingest(&repo, &store, stream, None, true, is_cached).await?;
+    let mut session = ingest::ingest(&repo, &store, stream, None, true).await?;
 
     let alias = session.alias().expect("alias should exist").to_owned();
     let delete_token = session.delete_token().await?;
@@ -494,7 +487,6 @@ async fn do_download_backgrounded<R: FullRepo, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
     store: web::Data<S>,
-    is_cached: bool,
 ) -> Result<HttpResponse, Error> {
     let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?;
 
@@ -504,7 +496,7 @@ async fn do_download_backgrounded<R: FullRepo, S: Store + 'static>(
         .expect("Identifier exists")
         .to_bytes()?;
 
-    queue::queue_ingest(&repo, identifier, upload_id, None, true, is_cached).await?;
+    queue::queue_ingest(&repo, identifier, upload_id, None, true).await?;
 
     backgrounded.disarm();
 
@@ -605,8 +597,6 @@ async fn process<R: FullRepo, S: Store + 'static>(
 ) -> Result<HttpResponse, Error> {
     let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
 
-    repo.check_cached(&alias).await?;
-
     let path_string = thumbnail_path.to_string_lossy().to_string();
     let hash = repo.hash(&alias).await?;
 
@@ -694,8 +684,6 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
 ) -> Result<HttpResponse, Error> {
     let (format, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
 
-    repo.check_cached(&alias).await?;
-
     let path_string = thumbnail_path.to_string_lossy().to_string();
     let hash = repo.hash(&alias).await?;
     let identifier_opt = repo
@@ -776,8 +764,6 @@ async fn serve<R: FullRepo, S: Store + 'static>(
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
 
-    repo.check_cached(&alias).await?;
-
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
     let details = ensure_details(&repo, &store, &alias).await?;
@@ -794,8 +780,6 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
 
-    repo.check_cached(&alias).await?;
-
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
     let details = ensure_details(&repo, &store, &alias).await?;

diff --git a/src/queue.rs b/src/queue.rs
index ca48bd1..ba0dea3 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -68,7 +68,6 @@ enum Process {
         upload_id: Serde<UploadId>,
         declared_alias: Option<Serde<Alias>>,
         should_validate: bool,
-        is_cached: bool,
     },
     Generate {
         target_format: ImageFormat,
@@ -130,14 +129,12 @@ pub(crate) async fn queue_ingest<R: QueueRepo>(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
-    is_cached: bool,
 ) -> Result<(), Error> {
     let job = serde_json::to_vec(&Process::Ingest {
         identifier: Base64Bytes(identifier),
         declared_alias: declared_alias.map(Serde::new),
         upload_id: Serde::new(upload_id),
         should_validate,
-        is_cached,
     })?;
     repo.push(PROCESS_QUEUE, job.into()).await?;
     Ok(())
 }
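The hunks above drop `is_cached` from the serialized job payload: queued jobs are an enum encoded with `serde_json` and pushed to the repo as bytes. A standalone sketch of that round-trip, with a simplified stand-in for the real `Process` type (the actual field types such as `Base64Bytes` and `Serde<UploadId>` are replaced with plain ones here):

```rust
// Simplified stand-in for the queue's job payload, not pict-rs's real type.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
enum Process {
    Ingest {
        identifier: Vec<u8>,
        declared_alias: Option<String>,
        should_validate: bool,
    },
}

fn main() -> Result<(), serde_json::Error> {
    let job = Process::Ingest {
        identifier: b"some-store-identifier".to_vec(),
        declared_alias: None,
        should_validate: true,
    };

    // What gets pushed onto the queue...
    let bytes = serde_json::to_vec(&job)?;

    // ...and what a worker later pops and decodes. serde's derived
    // deserializer ignores unknown fields by default, so a worker built
    // without `is_cached` can still decode older jobs that carry it.
    let decoded: Process = serde_json::from_slice(&bytes)?;
    println!("{decoded:?}");

    Ok(())
}
```

That default behavior is why dropping a field from the job enum, as this patch does, is the compatible direction for jobs already sitting in the queue.
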
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 034fbc8..636129a 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -27,7 +27,6 @@ where
             upload_id,
             declared_alias,
             should_validate,
-            is_cached,
         } => {
             process_ingest(
                 repo,
@@ -36,7 +35,6 @@ where
                 Serde::into_inner(upload_id),
                 declared_alias.map(Serde::into_inner),
                 should_validate,
-                is_cached,
             )
             .await?
         }
@@ -74,7 +72,6 @@ async fn process_ingest<R, S>(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
-    is_cached: bool,
 ) -> Result<(), Error>
 where
     R: FullRepo + 'static,
@@ -88,15 +85,8 @@ where
         .await?
         .map_err(Error::from);
 
-    let session = crate::ingest::ingest(
-        repo,
-        store,
-        stream,
-        declared_alias,
-        should_validate,
-        is_cached,
-    )
-    .await?;
+    let session =
+        crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
 
     let token = session.delete_token().await?;

diff --git a/src/repo.rs b/src/repo.rs
index c1e8b0f..638b5c0 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -49,8 +49,7 @@ pub(crate) enum UploadResult {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait FullRepo:
-    CachedRepo
-    + UploadRepo
+    UploadRepo
     + SettingsRepo
     + IdentifierRepo
     + AliasRepo
@@ -92,18 +91,6 @@ pub(crate) trait FullRepo:
             None => Ok(None),
         }
     }
-
-    #[tracing::instrument(skip(self))]
-    async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
-        let aliases = CachedRepo::update(self, alias).await?;
-
-        for alias in aliases {
-            let token = self.delete_token(&alias).await?;
-            crate::queue::cleanup_alias(self, alias, token).await?;
-        }
-
-        Ok(())
-    }
 }
 
 #[async_trait::async_trait(?Send)]
@@ -127,27 +114,6 @@ where
     type Bytes = T::Bytes;
 }
 
-#[async_trait::async_trait(?Send)]
-pub(crate) trait CachedRepo: BaseRepo {
-    async fn mark_cached(&self, alias: &Alias) -> Result<(), Error>;
-
-    async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error>;
-}
-
-#[async_trait::async_trait(?Send)]
-impl<T> CachedRepo for actix_web::web::Data<T>
-where
-    T: CachedRepo,
-{
-    async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
-        T::mark_cached(self, alias).await
-    }
-
-    async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
-        T::update(self, alias).await
-    }
-}
-
 #[async_trait::async_trait(?Send)]
 pub(crate) trait UploadRepo: BaseRepo {
     async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); @@ -71,8 +64,6 @@ pub(crate) struct SledRepo { in_progress_queue: Tree, queue_notifier: Arc>>>, uploads: Tree, - cache: Tree, - cache_inverse: Tree, db: Db, } @@ -95,8 +86,6 @@ impl SledRepo { in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, queue_notifier: Arc::new(RwLock::new(HashMap::new())), uploads: db.open_tree("pict-rs-uploads-tree")?, - cache: db.open_tree("pict-rs-cache-tree")?, - cache_inverse: db.open_tree("pict-rs-cache-inverse-tree")?, db, }) } @@ -154,149 +143,6 @@ impl From for UploadResult { } } -fn insert_cache_inverse( - cache_inverse: &Tree, - now_bytes: &[u8], - alias_bytes: &[u8], -) -> Result<(), Error> { - let mut old = cache_inverse.get(now_bytes)?; - - loop { - // unsure of whether to bail on deserialize error or fail with empty bucket - let mut bucket = old - .as_ref() - .and_then(|old| serde_cbor::from_slice::(old).ok()) - .unwrap_or_else(Bucket::empty); - - bucket.insert(alias_bytes.to_vec()); - - tracing::trace!("Inserting new {:?}", bucket); - let bucket_bytes = serde_cbor::to_vec(&bucket)?; - let new = Some(bucket_bytes); - - let res = cache_inverse.compare_and_swap(now_bytes, old, new)?; - - if let Err(CompareAndSwapError { current, .. }) = res { - old = current; - } else { - break; - } - } - - Ok(()) -} - -#[async_trait::async_trait(?Send)] -impl CachedRepo for SledRepo { - #[tracing::instrument(skip(self))] - async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> { - let now = DateTime::now(); - let now_bytes = serde_json::to_vec(&now)?; - - let alias_bytes = alias.to_bytes(); - - let cache_inverse = self.cache_inverse.clone(); - b!(self.cache, { - cache.insert(&alias_bytes, now_bytes.clone())?; - - insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes) - }); - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn update(&self, alias: &Alias) -> Result, Error> { - let now = DateTime::now(); - let now_bytes = serde_json::to_vec(&now)?; - - let to_clean = now.min_cache_date(); - let to_clean_bytes = serde_json::to_vec(&to_clean)?; - - let alias_bytes = alias.to_bytes(); - - let cache_inverse = self.cache_inverse.clone(); - let aliases = b!(self.cache, { - let previous_datetime_opt = cache - .fetch_and_update(&alias_bytes, |previous_datetime_opt| { - previous_datetime_opt.map(|_| now_bytes.clone()) - })?; - - if let Some(previous_datetime_bytes) = previous_datetime_opt { - // Insert cached media into new date bucket - insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)?; - - // Remove cached media from old date bucket - let mut old = cache_inverse.get(&previous_datetime_bytes)?; - loop { - let new = old - .as_ref() - .and_then(|bucket_bytes| { - let mut bucket = serde_cbor::from_slice::(bucket_bytes).ok()?; - - bucket.remove(&alias_bytes); - - if bucket.is_empty() { - tracing::trace!("Removed old {:?}", bucket); - None - } else { - tracing::trace!("Inserting old {:?}", bucket); - Some(serde_cbor::to_vec(&bucket)) - } - }) - .transpose()?; - - if let Err(CompareAndSwapError { current, .. }) = - cache_inverse.compare_and_swap(&previous_datetime_bytes, old, new)? 
-                    {
-                        old = current;
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            let mut aliases: Vec<Alias> = Vec::new();
-
-            for (date_bytes, bucket_bytes) in
-                cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
-            {
-                if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
-                    tracing::trace!("Checking {}", datetime);
-                } else {
-                    tracing::warn!("Invalid date bytes");
-                }
-                if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
-                    tracing::trace!("Read for deletion: {:?}", bucket);
-                    for alias_bytes in bucket {
-                        // Best effort cleanup
-                        let _ = cache.remove(&alias_bytes);
-                        if let Some(alias) = Alias::from_slice(&alias_bytes) {
-                            aliases.push(alias);
-                        }
-                    }
-                } else {
-                    tracing::warn!("Invalid bucket");
-                }
-
-                cache_inverse.remove(date_bytes)?;
-            }
-
-            #[cfg(debug)]
-            for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) {
-                if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
-                    tracing::trace!("Not cleaning for {}", datetime);
-                } else {
-                    tracing::warn!("Invalid date bytes");
-                }
-            }
-
-            Ok(aliases) as Result<_, Error>
-        });
-
-        Ok(aliases)
-    }
-}
-
 #[async_trait::async_trait(?Send)]
 impl UploadRepo for SledRepo {
     #[tracing::instrument(level = "trace", skip(self))]

diff --git a/src/repo/sled/bucket.rs b/src/repo/sled/bucket.rs
deleted file mode 100644
index 2fd89c1..0000000
--- a/src/repo/sled/bucket.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-use std::collections::HashSet;
-
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-pub(super) struct Bucket {
-    // each Vec<u8> represents a unique image hash
-    inner: HashSet<Vec<u8>>,
-}
-
-impl Bucket {
-    pub(super) fn empty() -> Self {
-        Self {
-            inner: HashSet::new(),
-        }
-    }
-
-    pub(super) fn insert(&mut self, alias_bytes: Vec<u8>) {
-        self.inner.insert(alias_bytes);
-    }
-
-    pub(super) fn remove(&mut self, alias_bytes: &[u8]) {
-        self.inner.remove(alias_bytes);
-    }
-
-    pub(super) fn is_empty(&self) -> bool {
-        self.inner.is_empty()
-    }
-}
-
-impl IntoIterator for Bucket {
-    type Item = <HashSet<Vec<u8>> as IntoIterator>::Item;
-    type IntoIter = <HashSet<Vec<u8>> as IntoIterator>::IntoIter;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.inner.into_iter()
-    }
-}

diff --git a/src/repo/sled/datetime.rs b/src/repo/sled/datetime.rs
deleted file mode 100644
index 0c75ea0..0000000
--- a/src/repo/sled/datetime.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use std::ops::Deref;
-use time::{Duration, OffsetDateTime};
-
-use crate::CONFIG;
-
-const SECONDS: i64 = 1;
-const MINUTES: i64 = 60 * SECONDS;
-const HOURS: i64 = 60 * MINUTES;
-
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize)]
-pub(super) struct DateTime {
-    #[serde(with = "time::serde::rfc3339")]
-    inner_date: OffsetDateTime,
-}
-
-impl DateTime {
-    pub(super) fn now() -> Self {
-        DateTime {
-            inner_date: OffsetDateTime::now_utc(),
-        }
-    }
-
-    pub(super) fn min_cache_date(&self) -> Self {
-        let cache_duration = Duration::new(CONFIG.media.cache_duration * HOURS, 0);
-
-        Self {
-            inner_date: self
-                .checked_sub(cache_duration)
-                .expect("Should never be older than Jan 7, 1970"),
-        }
-    }
-}
-
-impl std::fmt::Display for DateTime {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.inner_date.fmt(f)
-    }
-}
-
-impl AsRef<OffsetDateTime> for DateTime {
-    fn as_ref(&self) -> &OffsetDateTime {
-        &self.inner_date
-    }
-}
-
-impl Deref for DateTime {
-    type Target = OffsetDateTime;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner_date
-    }
-}
-
-impl From<OffsetDateTime> for DateTime {
-    fn from(inner_date: OffsetDateTime) -> Self {
-        Self { inner_date }
-    }
-}
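One property the deleted code leaned on: `cache_inverse` keys were JSON-encoded RFC3339 timestamps, and the cleanup scan `cache_inverse.range(..to_clean_bytes)` is only correct because, for UTC timestamps of equal sub-second precision, lexicographic byte order agrees with chronological order. A quick standalone check of that assumption, using the `time` crate as the deleted `datetime` module did (the key names are illustrative):

```rust
// RFC3339 strings like "2023-02-25T17:34:48Z" sort chronologically as bytes
// (given a fixed UTC offset and equal precision), so a sled range scan over
// `..cutoff_key` visits exactly the buckets older than the cutoff.
use time::format_description::well_known::Rfc3339;
use time::{Duration, OffsetDateTime};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let earlier = OffsetDateTime::now_utc();
    let later = earlier + Duration::hours(1);

    // JSON-encode the formatted timestamps, mirroring how the removed code
    // produced tree keys with serde_json.
    let earlier_key = serde_json::to_vec(&earlier.format(&Rfc3339)?)?;
    let later_key = serde_json::to_vec(&later.format(&Rfc3339)?)?;

    // Chronological order survives the encoding.
    assert!(earlier_key < later_key);
    Ok(())
}
```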