Finish media proxy implementation

This commit is contained in:
asonix 2023-07-23 15:45:52 -05:00
parent 4cca7d0f86
commit aac1bb7bc4
3 changed files with 128 additions and 32 deletions

View File

@ -40,6 +40,7 @@ use futures_util::{
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
use middleware::Metrics; use middleware::Metrics;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use repo::AliasAccessRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware; use reqwest_tracing::TracingMiddleware;
use rusty_s3::UrlStyle; use rusty_s3::UrlStyle;
@ -482,6 +483,24 @@ struct UrlQuery {
backgrounded: bool, backgrounded: bool,
} }
async fn ingest_inline<R: FullRepo, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: &R,
store: &S,
config: &Configuration,
) -> Result<(Alias, DeleteToken, Details), Error> {
let mut session = ingest::ingest(repo, store, stream, None, &config.media).await?;
let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?;
let details = ensure_details(repo, store, config, &alias).await?;
session.disarm();
Ok((alias, delete_token, details))
}
/// download an image from a URL /// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))] #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
async fn download<R: FullRepo + 'static, S: Store + 'static>( async fn download<R: FullRepo + 'static, S: Store + 'static>(
@ -491,11 +510,25 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
config: web::Data<Configuration>, config: web::Data<Configuration>,
query: web::Query<UrlQuery>, query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let stream = download_stream(client, &query.url, &config).await?;
if query.backgrounded {
do_download_backgrounded(stream, repo, store).await
} else {
do_download_inline(stream, repo, store, config).await
}
}
async fn download_stream(
client: web::Data<ClientWithMiddleware>,
url: &str,
config: &Configuration,
) -> Result<impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static, Error> {
if config.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
let res = client.get(&query.url).send().await?; let res = client.get(url).send().await?;
if !res.status().is_success() { if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into()); return Err(UploadError::Download(res.status()).into());
@ -506,15 +539,11 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
.map_err(Error::from) .map_err(Error::from)
.limit((config.media.max_file_size * MEGABYTES) as u64); .limit((config.media.max_file_size * MEGABYTES) as u64);
if query.backgrounded { Ok(stream)
do_download_backgrounded(stream, repo, store).await
} else {
do_download_inline(stream, repo, store, config).await
}
} }
#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))] #[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>( async fn do_download_inline<R: FullRepo, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static, stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
@ -522,14 +551,7 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
metrics::increment_counter!("pict-rs.files", "download" => "inline"); metrics::increment_counter!("pict-rs.files", "download" => "inline");
let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?; let (alias, delete_token, details) = ingest_inline(stream, &repo, &store, &config).await?;
let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?;
let details = ensure_details(&repo, &store, &config, &alias).await?;
session.disarm();
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
@ -660,8 +682,7 @@ async fn process_details<R: FullRepo, S: Store>(
let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
if !config.server.read_only { if !config.server.read_only {
repo.accessed(hash.clone(), thumbnail_string.clone()) VariantAccessRepo::accessed(&repo, hash.clone(), thumbnail_string.clone()).await?;
.await?;
} }
let identifier = repo let identifier = repo
@ -697,7 +718,7 @@ async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, R::Bytes
/// Process files /// Process files
#[tracing::instrument( #[tracing::instrument(
name = "Serving processed image", name = "Serving processed image",
skip(repo, store, config, process_map) skip(repo, store, client, config, process_map)
)] )]
async fn process<R: FullRepo, S: Store + 'static>( async fn process<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
@ -705,6 +726,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
ext: web::Path<String>, ext: web::Path<String>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>, config: web::Data<Configuration>,
process_map: web::Data<ProcessMap>, process_map: web::Data<ProcessMap>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
@ -713,11 +735,25 @@ async fn process<R: FullRepo, S: Store + 'static>(
Serde::into_inner(alias) Serde::into_inner(alias)
} }
ProcessSource::Proxy { proxy } => { ProcessSource::Proxy { proxy } => {
if let Some(alias) = repo.related(proxy).await? { let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
alias
} else if !config.server.read_only {
let stream = download_stream(client, proxy.as_str(), &config).await?;
let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
repo.relate_url(proxy, alias.clone()).await?;
alias alias
} else { } else {
todo!("proxy URL") return Err(UploadError::ReadOnly.into());
};
if !config.server.read_only {
AliasAccessRepo::accessed(&repo, alias.clone()).await?;
} }
alias
} }
}; };
@ -737,7 +773,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
repo.accessed(hash.clone(), path_string.clone()).await?; VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
} }
let identifier_opt = repo let identifier_opt = repo
@ -866,7 +902,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
}; };
if !config.server.read_only { if !config.server.read_only {
repo.accessed(hash.clone(), path_string.clone()).await?; VariantAccessRepo::accessed(&repo, hash.clone(), path_string.clone()).await?;
} }
let identifier_opt = repo let identifier_opt = repo
@ -1002,22 +1038,37 @@ async fn do_details<R: FullRepo, S: Store + 'static>(
} }
/// Serve files based on alias query /// Serve files based on alias query
#[tracing::instrument(name = "Serving file query", skip(repo, store, config))] #[tracing::instrument(name = "Serving file query", skip(repo, store, client, config))]
async fn serve_query<R: FullRepo, S: Store + 'static>( async fn serve_query<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>, web::Query(alias_query): web::Query<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>, config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = match alias_query { let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => { AliasQuery::Proxy { proxy } => {
if let Some(alias) = repo.related(proxy).await? { let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
alias
} else if !config.server.read_only {
let stream = download_stream(client, proxy.as_str(), &config).await?;
let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
repo.relate_url(proxy, alias.clone()).await?;
alias alias
} else { } else {
todo!("proxy URL") return Err(UploadError::ReadOnly.into());
};
if !config.server.read_only {
AliasAccessRepo::accessed(&repo, alias.clone()).await?;
} }
alias
} }
}; };
@ -1288,6 +1339,7 @@ async fn clean_variants<R: FullRepo>(
} }
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum AliasQuery { enum AliasQuery {
Proxy { proxy: url::Url }, Proxy { proxy: url::Url },
Alias { alias: Serde<Alias> }, Alias { alias: Serde<Alias> },
@ -1297,6 +1349,7 @@ enum AliasQuery {
async fn set_not_found<R: FullRepo>( async fn set_not_found<R: FullRepo>(
json: web::Json<AliasQuery>, json: web::Json<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
client: web::Data<ClientWithMiddleware>,
config: web::Data<Configuration>, config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
if config.server.read_only { if config.server.read_only {
@ -1305,12 +1358,10 @@ async fn set_not_found<R: FullRepo>(
let alias = match json.into_inner() { let alias = match json.into_inner() {
AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => { AliasQuery::Proxy { .. } => {
if let Some(alias) = repo.related(proxy).await? { return Ok(HttpResponse::BadRequest().json(serde_json::json!({
alias "msg": "Cannot use proxied media as Not Found image",
} else { })));
todo!("proxy URL")
}
} }
}; };
@ -1567,6 +1618,13 @@ where
format!("\n{e}\n{e:?}") format!("\n{e}\n{e:?}")
); );
} }
if let Err(e) = queue::cleanup_outdated_proxies(&repo).await {
tracing::warn!(
"Failed to spawn cleanup for outdated proxies:{}",
format!("\n{e}\n{e:?}")
);
}
} }
}); });
}) })

View File

@ -64,6 +64,7 @@ enum Cleanup {
}, },
AllVariants, AllVariants,
OutdatedVariants, OutdatedVariants,
OutdatedProxies,
} }
#[derive(Debug, serde::Deserialize, serde::Serialize)] #[derive(Debug, serde::Deserialize, serde::Serialize)]
@ -126,6 +127,12 @@ async fn cleanup_variants<R: QueueRepo>(
Ok(()) Ok(())
} }
pub(crate) async fn cleanup_outdated_proxies<R: QueueRepo>(repo: &R) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::OutdatedProxies)?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn cleanup_outdated_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> { pub(crate) async fn cleanup_outdated_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?; let job = serde_json::to_vec(&Cleanup::OutdatedVariants)?;
repo.push(CLEANUP_QUEUE, job.into()).await?; repo.push(CLEANUP_QUEUE, job.into()).await?;

View File

@ -2,7 +2,10 @@ use crate::{
config::Configuration, config::Configuration,
error::{Error, UploadError}, error::{Error, UploadError},
queue::{Base64Bytes, Cleanup, LocalBoxFuture}, queue::{Base64Bytes, Cleanup, LocalBoxFuture},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, VariantAccessRepo}, repo::{
Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo,
VariantAccessRepo,
},
serde_str::Serde, serde_str::Serde,
store::{Identifier, Store}, store::{Identifier, Store},
}; };
@ -44,6 +47,7 @@ where
} => hash_variant::<R, S>(repo, hash, variant).await?, } => hash_variant::<R, S>(repo, hash, variant).await?,
Cleanup::AllVariants => all_variants::<R, S>(repo).await?, Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?, Cleanup::OutdatedVariants => outdated_variants::<R, S>(repo, configuration).await?,
Cleanup::OutdatedProxies => outdated_proxies::<R, S>(repo, configuration).await?,
}, },
Err(e) => { Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}")); tracing::warn!("Invalid job: {}", format!("{e}"));
@ -139,6 +143,8 @@ where
let hash = repo.hash(&alias).await?; let hash = repo.hash(&alias).await?;
AliasRepo::cleanup(repo, &alias).await?; AliasRepo::cleanup(repo, &alias).await?;
repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo, alias.clone()).await?;
let Some(hash) = hash else { let Some(hash) = hash else {
// hash doesn't exist, nothing to do // hash doesn't exist, nothing to do
@ -189,6 +195,31 @@ where
Ok(()) Ok(())
} }
#[tracing::instrument(skip_all)]
async fn outdated_proxies<R, S>(repo: &R, config: &Configuration) -> Result<(), Error>
where
R: FullRepo,
S: Store,
{
let now = time::OffsetDateTime::now_utc();
let since = now.saturating_sub(config.media.retention.proxy.to_duration());
let mut alias_stream = Box::pin(repo.older_aliases(since).await?);
while let Some(res) = alias_stream.next().await {
let alias = res?;
if let Some(token) = repo.delete_token(&alias).await? {
super::cleanup_alias(repo, alias, token).await?;
} else {
tracing::warn!("Skipping alias cleanup - no delete token");
repo.remove_relation(alias.clone()).await?;
AliasAccessRepo::remove_access(repo, alias).await?;
}
}
Ok(())
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn hash_variant<R, S>( async fn hash_variant<R, S>(
repo: &R, repo: &R,