From aac1bb7bc4dcd11ab05e28d6018cc3442b19e7e4 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 23 Jul 2023 15:45:52 -0500
Subject: [PATCH] Finish media proxy implementation

---
 src/lib.rs           | 120 ++++++++++++++++++++++++++++++++-----------
 src/queue.rs         |   7 +++
 src/queue/cleanup.rs |  33 +++++++++++-
 3 files changed, 128 insertions(+), 32 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index daf3334..2aad02f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,6 +40,7 @@ use futures_util::{
 use metrics_exporter_prometheus::PrometheusBuilder;
 use middleware::Metrics;
 use once_cell::sync::Lazy;
+use repo::AliasAccessRepo;
 use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
 use reqwest_tracing::TracingMiddleware;
 use rusty_s3::UrlStyle;
@@ -482,6 +483,24 @@ struct UrlQuery {
     backgrounded: bool,
 }
 
+async fn ingest_inline<R: FullRepo, S: Store + 'static>(
+    stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
+    repo: &R,
+    store: &S,
+    config: &Configuration,
+) -> Result<(Alias, DeleteToken, Details), Error> {
+    let mut session = ingest::ingest(repo, store, stream, None, &config.media).await?;
+
+    let alias = session.alias().expect("alias should exist").to_owned();
+    let delete_token = session.delete_token().await?;
+
+    let details = ensure_details(repo, store, config, &alias).await?;
+
+    session.disarm();
+
+    Ok((alias, delete_token, details))
+}
+
 /// download an image from a URL
 #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
 async fn download<R: FullRepo + 'static, S: Store + 'static>(
@@ -491,11 +510,25 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     client: web::Data<ClientWithMiddleware>,
     repo: web::Data<R>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
+    let stream = download_stream(client, &query.url, &config).await?;
+
+    if query.backgrounded {
+        do_download_backgrounded(stream, repo, store).await
+    } else {
+        do_download_inline(stream, repo, store, config).await
+    }
+}
+
+async fn download_stream(
+    client: web::Data<ClientWithMiddleware>,
+    url: &str,
+    config: &Configuration,
+) -> Result<impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static, Error> {
     if config.server.read_only {
         return Err(UploadError::ReadOnly.into());
     }
 
-    let res = client.get(&query.url).send().await?;
+    let res = client.get(url).send().await?;
 
     if !res.status().is_success() {
         return Err(UploadError::Download(res.status()).into());
     }
 
@@ -506,15 +539,11 @@
     let stream = res
         .bytes_stream()
         .map_err(Error::from)
        .limit((config.media.max_file_size * MEGABYTES) as u64);
 
-    if query.backgrounded {
-        do_download_backgrounded(stream, repo, store).await
-    } else {
-        do_download_inline(stream, repo, store, config).await
-    }
+    Ok(stream)
 }
 
 #[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
-async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
+async fn do_download_inline<R: FullRepo, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
     store: web::Data<S>,
@@ -522,14 +551,7 @@
 ) -> Result<HttpResponse, Error> {
     metrics::increment_counter!("pict-rs.files", "download" => "inline");
 
-    let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?;
-
-    let alias = session.alias().expect("alias should exist").to_owned();
-    let delete_token = session.delete_token().await?;
-
-    let details = ensure_details(&repo, &store, &config, &alias).await?;
-
-    session.disarm();
+    let (alias, delete_token, details) = ingest_inline(stream, &repo, &store, &config).await?;
 
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
@@ -660,8 +682,7 @@ async fn process_details<R: FullRepo, S: Store + 'static>(
     let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
 
     if !config.server.read_only {
-        repo.accessed(hash.clone(), thumbnail_string.clone())
-            .await?;
+        VariantAccessRepo::accessed(&repo, hash.clone(), thumbnail_string.clone()).await?;
     }
 
     let identifier = repo
@@ -697,7 +718,7 @@ async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, R::Byte
 
 #[tracing::instrument(name = "Serving processed image", skip(repo, store, config, process_map))]
-async fn process<R: FullRepo, S: Store + 'static>(
+async fn process<R: FullRepo + 'static, S: Store + 'static>(
     range: Option<web::Header<Range>>,
@@ -705,6 +726,7 @@ async fn process<R: FullRepo + 'static, S: Store + 'static>(
     ext: web::Path<String>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    client: web::Data<ClientWithMiddleware>,
     config: web::Data<Configuration>,
     process_map: web::Data<ProcessMap>,
 ) -> Result<HttpResponse, Error> {
@@ -713,11 +735,25 @@ async fn process<R: FullRepo + 'static, S: Store + 'static>(
             Serde::into_inner(alias)
         }
         ProcessSource::Proxy { proxy } => {
-            if let Some(alias) = repo.related(proxy).await? {
+            let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
+                alias
+            } else if !config.server.read_only {
+                let stream = download_stream(client, proxy.as_str(), &config).await?;
+
+                let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
+
+                repo.relate_url(proxy, alias.clone()).await?;
+
                 alias
             } else {
-                todo!("proxy URL")
+                return Err(UploadError::ReadOnly.into());
+            };
+
+            if !config.server.read_only {
+                AliasAccessRepo::accessed(&repo, alias.clone()).await?;
             }
+
+            alias
         }
     };
@@ -737,7 +773,7 @@ async fn process<R: FullRepo + 'static, S: Store + 'static>(
     };
 
     if !config.server.read_only {
-        repo.accessed(hash.clone(), path_string.clone()).await?;
+        VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
     }
 
     let identifier_opt = repo
@@ -866,7 +902,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
     };
 
     if !config.server.read_only {
-        repo.accessed(hash.clone(), path_string.clone()).await?;
+        VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
     }
 
     let identifier_opt = repo
@@ -1002,22 +1038,37 @@ async fn do_details<R: FullRepo, S: Store + 'static>(
 }
 
 /// Serve files based on alias query
-#[tracing::instrument(name = "Serving file query", skip(repo, store, config))]
+#[tracing::instrument(name = "Serving file query", skip(repo, store, client, config))]
 async fn serve_query<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     web::Query(alias_query): web::Query<AliasQuery>,
     repo: web::Data<R>,
     store: web::Data<S>,
+    client: web::Data<ClientWithMiddleware>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     let alias = match alias_query {
         AliasQuery::Alias { alias } => Serde::into_inner(alias),
         AliasQuery::Proxy { proxy } => {
-            if let Some(alias) = repo.related(proxy).await? {
+            let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
+                alias
+            } else if !config.server.read_only {
+                let stream = download_stream(client, proxy.as_str(), &config).await?;
+
+                let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
+
+                repo.relate_url(proxy, alias.clone()).await?;
+
                 alias
             } else {
-                todo!("proxy URL")
+                return Err(UploadError::ReadOnly.into());
+            };
+
+            if !config.server.read_only {
+                AliasAccessRepo::accessed(&repo, alias.clone()).await?;
             }
+
+            alias
         }
     };
@@ -1288,6 +1339,7 @@ async fn clean_variants<R: FullRepo>(
 }
 
 #[derive(Debug, serde::Deserialize)]
+#[serde(untagged)]
 enum AliasQuery {
     Proxy { proxy: url::Url },
     Alias { alias: Serde<Alias> },
@@ -1297,6 +1349,7 @@ async fn set_not_found<R: FullRepo>(
     json: web::Json<AliasQuery>,
     repo: web::Data<R>,
+    client: web::Data<ClientWithMiddleware>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     if config.server.read_only {
@@ -1305,12 +1358,10 @@ async fn set_not_found<R: FullRepo>(
 
     let alias = match json.into_inner() {
         AliasQuery::Alias { alias } => Serde::into_inner(alias),
-        AliasQuery::Proxy { proxy } => {
-            if let Some(alias) = repo.related(proxy).await? {
-                alias
-            } else {
-                todo!("proxy URL")
-            }
+        AliasQuery::Proxy { .. } => {
+            return Ok(HttpResponse::BadRequest().json(serde_json::json!({
+                "msg": "Cannot use proxied media as Not Found image",
+            })));
         }
     };
@@ -1567,6 +1618,13 @@ where
                     format!("\n{e}\n{e:?}")
                 );
             }
+
+            if let Err(e) = queue::cleanup_outdated_proxies(&repo).await {
+                tracing::warn!(
+                    "Failed to spawn cleanup for outdated proxies:{}",
+                    format!("\n{e}\n{e:?}")
+                );
+            }
         }
     });
 })
diff --git a/src/queue.rs b/src/queue.rs
index d36c758..211d415 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -64,6 +64,7 @@ enum Cleanup {
     },
     AllVariants,
     OutdatedVariants,
+    OutdatedProxies,
 }
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -126,6 +127,12 @@ async fn cleanup_variants<R: QueueRepo>(
     Ok(())
 }
 
+pub(crate) async fn cleanup_outdated_proxies<R: QueueRepo>(repo: &R) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Cleanup::OutdatedProxies)?;
+    repo.push(CLEANUP_QUEUE, job.into()).await?;
+    Ok(())
+}
+
 pub(crate) async fn cleanup_outdated_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
     let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?;
     repo.push(CLEANUP_QUEUE, job.into()).await?;
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index cd71a51..4c33344 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -2,7 +2,10 @@ use crate::{
     config::Configuration,
     error::{Error, UploadError},
     queue::{Base64Bytes, Cleanup, LocalBoxFuture},
-    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, VariantAccessRepo},
+    repo::{
+        Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
+        VariantAccessRepo,
+    },
     serde_str::Serde,
     store::{Identifier, Store},
 };
@@ -44,6 +47,7 @@ where
         } => hash_variant::<R, S>(repo, hash, variant).await?,
         Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
         Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?,
+        Cleanup::OutdatedProxies => outdated_proxies::<R, S>(repo, configuration).await?,
     },
     Err(e) => {
         tracing::warn!("Invalid job: {}", format!("{e}"));
@@ -139,6 +143,8 @@ where
     let hash = repo.hash(&alias).await?;
 
     AliasRepo::cleanup(repo, &alias).await?;
+    repo.remove_relation(alias.clone()).await?;
+    AliasAccessRepo::remove_access(repo, alias.clone()).await?;
 
     let Some(hash) = hash else {
         // hash doesn't exist, nothing to do
@@ -189,6 +195,31 @@ where
     Ok(())
 }
 
+#[tracing::instrument(skip_all)]
+async fn outdated_proxies<R, S>(repo: &R, config: &Configuration) -> Result<(), Error>
+where
+    R: FullRepo,
+    S: Store,
+{
+    let now = time::OffsetDateTime::now_utc();
+    let since = now.saturating_sub(config.media.retention.proxy.to_duration());
+
+    let mut alias_stream = Box::pin(repo.older_aliases(since).await?);
+
+    while let Some(res) = alias_stream.next().await {
+        let alias = res?;
+        if let Some(token) = repo.delete_token(&alias).await? {
+            super::cleanup_alias(repo, alias, token).await?;
+        } else {
+            tracing::warn!("Skipping alias cleanup - no delete token");
+            repo.remove_relation(alias.clone()).await?;
+            AliasAccessRepo::remove_access(repo, alias).await?;
+        }
+    }
+
+    Ok(())
+}
+
 #[tracing::instrument(skip_all)]
 async fn hash_variant<R, S>(
     repo: &R,
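
Reviewer note, not part of the commit: the #[serde(untagged)] attribute added to AliasQuery is what lets the serve and process endpoints accept either ?alias=... or ?proxy=... in the same query position. Below is a minimal standalone sketch of that behavior, under these assumptions: dependencies on serde (derive feature), serde_urlencoded 0.7, and url (serde feature); serde_urlencoded is the parser actix-web's web::Query uses internally; and the Alias variant is simplified to a plain String in place of pict-rs's Serde<Alias>.

    use serde::Deserialize;

    // Mirror of the patch's AliasQuery, with Alias simplified to a String.
    #[derive(Debug, Deserialize)]
    #[serde(untagged)]
    enum AliasQuery {
        Proxy { proxy: url::Url },
        Alias { alias: String },
    }

    fn main() {
        // Untagged deserialization tries each variant in declaration order;
        // whichever field name is present in the query string ("proxy" or
        // "alias") determines which variant matches.
        let q: AliasQuery =
            serde_urlencoded::from_str("proxy=https://example.com/cat.png").unwrap();
        println!("{q:?}"); // -> Proxy variant

        let q: AliasQuery = serde_urlencoded::from_str("alias=some-alias.png").unwrap();
        println!("{q:?}"); // -> Alias variant
    }

Because the Proxy arm is listed first, a query supplying both fields resolves as a proxy request; a query supplying neither fails to deserialize and is rejected before the handler runs.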