Mirror of https://git.asonix.dog/asonix/pict-rs

Consolidate endpoints for downloading media, cached or backgrounded

Aode (lion) 2022-04-08 12:05:14 -05:00
parent 07c61cbe87
commit 55d5e43cd2
5 changed files with 81 additions and 72 deletions

README.md

@@ -243,8 +243,6 @@ pict-rs offers the following endpoints:
       "msg": "ok"
     }
     ```
-- `GET /image/download?url=...` Download an image from a remote server, returning the same JSON
-  payload as the `POST` endpoint
 - `POST /image/backgrounded` Upload an image, like the `/image` endpoint, but don't wait to validate and process it.
   This endpoint returns the following JSON structure on success with a 202 Accepted status
   ```json
@@ -260,7 +258,16 @@ pict-rs offers the following endpoints:
       "msg": "ok"
     }
     ```
-- `GET /image/backgrounded/claim?upload_id=` Wait for a backgrounded upload to complete, claiming it's result
+- `GET /image/download?url={url}&backgrounded=(true|false)&ephemeral=(true|false)` Download an image
+  from a remote server, returning the same JSON payload as the `POST` endpoint by default.
+  If `backgrounded` is set to `true`, the ingest processing will be queued for later and the
+  response JSON will be the same as that of the `/image/backgrounded` endpoint.
+  If `ephemeral` is set to `true`, the downloaded image will be marked as a "cached" image and
+  automatically removed from pict-rs N hours after its last access. The duration is configurable
+  with the `--media-cache-duration` run flag, or the `[media] cache_duration` toml option.
+- `GET /image/backgrounded/claim?upload_id={uuid}` Wait for a backgrounded upload to complete, claiming its result
   Possible results:
   - 200 Ok (validation and ingest complete):
   ```json
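
For reference, a minimal client sketch of the consolidated endpoint documented above. It is not part of this commit: the server address, the image URL, and the `reqwest`/`tokio`/`serde` dependencies are all illustrative assumptions.

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct DownloadResponse {
    msg: String,
    // Present on 200/201 responses; a backgrounded (202) response carries
    // upload ids instead of finished files.
    #[serde(default)]
    files: Vec<serde_json::Value>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Inline download (backgrounded defaults to false), marked ephemeral so
    // pict-rs may expire it after the configured cache duration.
    let res = reqwest::Client::new()
        .get("http://localhost:8080/image/download")
        .query(&[
            ("url", "https://example.com/cat.png"),
            ("ephemeral", "true"),
        ])
        .send()
        .await?;

    // 201 Created for inline ingest, 202 Accepted when backgrounded=true.
    println!("status: {}", res.status());

    let body: DownloadResponse = res.json().await?;
    println!("{}: {} file(s)", body.msg, body.files.len());
    Ok(())
}
```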

src/ingest.rs

@@ -57,6 +57,7 @@ pub(crate) async fn ingest<R, S>(
     stream: impl Stream<Item = Result<Bytes, Error>>,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<Session<R, S>, Error>
 where
     R: FullRepo + 'static,
@@ -95,9 +96,9 @@ where
     save_upload(repo, store, &hash, &identifier).await?;

     if let Some(alias) = declared_alias {
-        session.add_existing_alias(&hash, alias).await?
+        session.add_existing_alias(&hash, alias, is_cached).await?
     } else {
-        session.create_alias(&hash, input_type).await?;
+        session.create_alias(&hash, input_type, is_cached).await?;
     }

     Ok(session)
@@ -161,7 +162,12 @@ where
     }

     #[tracing::instrument]
-    async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
+    async fn add_existing_alias(
+        &mut self,
+        hash: &[u8],
+        alias: Alias,
+        is_cached: bool,
+    ) -> Result<(), Error> {
         AliasRepo::create(&self.repo, &alias)
             .await?
             .map_err(|_| UploadError::DuplicateAlias)?;
@@ -171,11 +177,20 @@ where
         self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
         self.repo.relate_alias(hash.to_vec().into(), &alias).await?;

+        if is_cached {
+            self.repo.mark_cached(&alias).await?;
+        }
+
         Ok(())
     }

     #[tracing::instrument]
-    async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
+    async fn create_alias(
+        &mut self,
+        hash: &[u8],
+        input_type: ValidInputType,
+        is_cached: bool,
+    ) -> Result<(), Error> {
         tracing::debug!("Alias gen loop");

         loop {
@@ -187,6 +202,10 @@ where
             self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
             self.repo.relate_alias(hash.to_vec().into(), &alias).await?;

+            if is_cached {
+                self.repo.mark_cached(&alias).await?;
+            }
+
             return Ok(());
         }
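
The new `mark_cached` calls flag an alias for the expiry behavior the README describes (removal N hours after last access). Below is a loose, self-contained sketch of that idea only; the real pict-rs repo trait is not shown in this diff, so the names and structure here are hypothetical.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical in-memory stand-in for the repo's cache bookkeeping.
#[derive(Default)]
struct CacheIndex {
    last_access: HashMap<String, Instant>,
}

impl CacheIndex {
    // Analog of `mark_cached`: start tracking an alias for expiry.
    fn mark_cached(&mut self, alias: &str) {
        self.last_access.insert(alias.to_owned(), Instant::now());
    }

    // Serving a cached alias refreshes its last-access time.
    fn touch(&mut self, alias: &str) {
        if let Some(accessed) = self.last_access.get_mut(alias) {
            *accessed = Instant::now();
        }
    }

    // A periodic sweep returns aliases idle longer than the configured
    // cache duration, whose media should then be deleted.
    fn sweep(&mut self, cache_duration: Duration) -> Vec<String> {
        let now = Instant::now();
        let expired: Vec<String> = self
            .last_access
            .iter()
            .filter(|(_, accessed)| now.duration_since(**accessed) > cache_duration)
            .map(|(alias, _)| alias.clone())
            .collect();
        for alias in &expired {
            self.last_access.remove(alias);
        }
        expired
    }
}
```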

src/main.rs

@@ -164,7 +164,7 @@ async fn upload_backgrounded<R: FullRepo, S: Store>(
             .expect("Identifier exists")
             .to_bytes()?;

-        queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
+        queue::queue_ingest(&**repo, identifier, upload_id, None, true, false).await?;

         files.push(serde_json::json!({
             "upload_id": upload_id.to_string(),
@@ -223,6 +223,12 @@ async fn claim_upload<R: FullRepo>(
 #[derive(Debug, serde::Deserialize)]
 struct UrlQuery {
     url: String,
+
+    #[serde(default)]
+    backgrounded: bool,
+
+    #[serde(default)]
+    ephemeral: bool,
 }

 /// download an image from a URL
@@ -233,7 +239,22 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     store: web::Data<S>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
+    if query.backgrounded {
+        do_download_backgrounded(client, repo, store, &query.url, query.ephemeral).await
+    } else {
+        do_download_inline(client, repo, store, &query.url, query.ephemeral).await
+    }
+}
+
+#[instrument(name = "Downloading file inline", skip(client, repo))]
+async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
+    client: web::Data<Client>,
+    repo: web::Data<R>,
+    store: web::Data<S>,
+    url: &str,
+    is_cached: bool,
+) -> Result<HttpResponse, Error> {
+    let res = client.get(url).send().await?;

     if !res.status().is_success() {
         return Err(UploadError::Download(res.status()).into());
@@ -243,7 +264,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
         .map_err(Error::from)
         .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);

-    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
+    let mut session = ingest::ingest(&**repo, &**store, stream, None, true, is_cached).await?;

     let alias = session.alias().expect("alias should exist").to_owned();
     let delete_token = session.delete_token().await?;
@@ -262,6 +283,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     };

     session.disarm();
+
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
         "files": [{
@@ -272,14 +294,15 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     })))
 }

-#[instrument(name = "Downloading file for background", skip(client))]
-async fn download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
+#[instrument(name = "Downloading file in background", skip(client))]
+async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
     client: web::Data<Client>,
     repo: web::Data<R>,
     store: web::Data<S>,
-    query: web::Query<UrlQuery>,
+    url: &str,
+    is_cached: bool,
 ) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
+    let res = client.get(url).send().await?;

     if !res.status().is_success() {
         return Err(UploadError::Download(res.status()).into());
@@ -297,7 +320,7 @@ async fn download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
         .expect("Identifier exists")
         .to_bytes()?;

-    queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
+    queue::queue_ingest(&**repo, identifier, upload_id, None, true, is_cached).await?;

     Ok(HttpResponse::Accepted().json(&serde_json::json!({
         "msg": "ok",
@@ -307,55 +330,6 @@ async fn download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
     })))
 }

-/// cache an image from a URL
-#[instrument(name = "Caching file", skip(client, repo))]
-async fn cache<R: FullRepo + 'static, S: Store + 'static>(
-    client: web::Data<Client>,
-    repo: web::Data<R>,
-    store: web::Data<S>,
-    query: web::Query<UrlQuery>,
-) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
-
-    if !res.status().is_success() {
-        return Err(UploadError::Download(res.status()).into());
-    }
-
-    let stream = res
-        .map_err(Error::from)
-        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
-
-    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
-    let alias = session.alias().expect("alias should exist").to_owned();
-    let delete_token = session.delete_token().await?;
-
-    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
-
-    let details = repo.details(&identifier).await?;
-
-    let details = if let Some(details) = details {
-        details
-    } else {
-        let hint = details_hint(&alias);
-        let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-        repo.relate_details(&identifier, &new_details).await?;
-        new_details
-    };
-
-    repo.mark_cached(&alias).await?;
-
-    session.disarm();
-
-    Ok(HttpResponse::Created().json(&serde_json::json!({
-        "msg": "ok",
-        "files": [{
-            "file": alias.to_string(),
-            "delete_token": delete_token.to_string(),
-            "details": details,
-        }]
-    })))
-}
-
 /// Delete aliases and files
 #[instrument(name = "Deleting file", skip(repo))]
 async fn delete<R: FullRepo>(
@@ -757,7 +731,7 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
                     let stream = stream.map_err(Error::from);

                     Box::pin(
-                        async move { ingest::ingest(&repo, &store, stream, None, true).await }
+                        async move { ingest::ingest(&repo, &store, stream, None, true, false).await }
                             .instrument(span),
                     )
                 })),
@@ -790,6 +764,7 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
                             stream,
                             Some(Alias::from_existing(&filename)),
                             !CONFIG.media.skip_validate_imports,
+                            false,
                         )
                         .await
                     }
@@ -869,11 +844,6 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
                 ),
             )
             .service(web::resource("/download").route(web::get().to(download::<R, S>)))
-            .service(
-                web::resource("/download_backgrounded")
-                    .route(web::get().to(download_backgrounded::<R, S>)),
-            )
-            .service(web::resource("/cache").route(web::get().to(cache::<R, S>)))
             .service(
                 web::resource("/delete/{delete_token}/{filename}")
                     .route(web::delete().to(delete::<R>))
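
A quick check of how the new optional flags deserialize. This sketch mirrors the `UrlQuery` struct added above and assumes the `serde_urlencoded` crate, the parser actix-web's `web::Query` extractor builds on; it is illustrative, not part of the commit.

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct UrlQuery {
    url: String,
    #[serde(default)]
    backgrounded: bool,
    #[serde(default)]
    ephemeral: bool,
}

fn main() {
    // With the flags absent, `#[serde(default)]` fills in `false`, so the
    // pre-existing `GET /image/download?url=...` form keeps its old inline,
    // non-cached behavior.
    let q: UrlQuery = serde_urlencoded::from_str("url=https://example.com/cat.png").unwrap();
    assert!(!q.backgrounded && !q.ephemeral);

    // Both flags supplied: queue the ingest and mark the result as cached.
    let q: UrlQuery =
        serde_urlencoded::from_str("url=x&backgrounded=true&ephemeral=true").unwrap();
    assert!(q.backgrounded && q.ephemeral);
}
```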

src/queue.rs

@@ -37,6 +37,7 @@ enum Process {
         upload_id: Serde<UploadId>,
         declared_alias: Option<Serde<Alias>>,
         should_validate: bool,
+        is_cached: bool,
     },
     Generate {
         target_format: ImageFormat,
@@ -84,12 +85,14 @@ pub(crate) async fn queue_ingest<R: QueueRepo>(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<(), Error> {
     let job = serde_json::to_vec(&Process::Ingest {
         identifier,
         declared_alias: declared_alias.map(Serde::new),
         upload_id: Serde::new(upload_id),
         should_validate,
+        is_cached,
     })?;
     repo.push(PROCESS_QUEUE, job.into()).await?;
     Ok(())

src/queue/process.rs

@@ -27,6 +27,7 @@ where
                 upload_id,
                 declared_alias,
                 should_validate,
+                is_cached,
             } => {
                 process_ingest(
                     repo,
@@ -35,6 +36,7 @@ where
                     Serde::into_inner(upload_id),
                     declared_alias.map(Serde::into_inner),
                     should_validate,
+                    is_cached,
                 )
                 .await?
             }
@@ -72,6 +74,7 @@ async fn process_ingest<R, S>(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<(), Error>
 where
     R: FullRepo + 'static,
@@ -85,8 +88,15 @@ where
         .await?
         .map_err(Error::from);

-    let session =
-        crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
+    let session = crate::ingest::ingest(
+        repo,
+        store,
+        stream,
+        declared_alias,
+        should_validate,
+        is_cached,
+    )
+    .await?;

     let token = session.delete_token().await?;