diff --git a/README.md b/README.md
index 7b5e1ac..6f3f42f 100644
--- a/README.md
+++ b/README.md
@@ -243,8 +243,6 @@ pict-rs offers the following endpoints:
       "msg": "ok"
     }
     ```
-- `GET /image/download?url=...` Download an image from a remote server, returning the same JSON
-    payload as the `POST` endpoint
 - `POST /image/backgrounded` Upload an image, like the `/image` endpoint, but don't wait to validate
     and process it. This endpoint returns the following JSON structure on success with a 202 Accepted status
     ```json
@@ -260,7 +258,16 @@ pict-rs offers the following endpoints:
       "msg": "ok"
     }
     ```
-- `GET /image/backgrounded/claim?upload_id=` Wait for a backgrounded upload to complete, claiming it's result
+- `GET /image/download?url={url}&backgrounded=(true|false)&ephemeral=(true|false)` Download an image
+    from a remote server, returning the same JSON payload as the `POST` endpoint by default.
+
+    If `backgrounded` is set to `true`, the ingest processing will be queued for later and the
+    response JSON will be the same as for the `/image/backgrounded` endpoint.
+
+    If `ephemeral` is set to `true`, the downloaded image will be marked as a "cached" image and
+    automatically removed from pict-rs N hours after its last access. The duration is configurable
+    with the `--media-cache-duration` run flag, or the `[media] cache_duration` toml option.
+- `GET /image/backgrounded/claim?upload_id={uuid}` Wait for a backgrounded upload to complete, claiming its result
   Possible results:
   - 200 Ok (validation and ingest complete):
     ```json
diff --git a/src/ingest.rs b/src/ingest.rs
index ae84a49..7419683 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -57,6 +57,7 @@ pub(crate) async fn ingest(
     stream: impl Stream<Item = Result<Bytes, Error>>,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<Session<R, S>, Error>
 where
     R: FullRepo + 'static,
@@ -95,9 +96,9 @@ where
     save_upload(repo, store, &hash, &identifier).await?;
 
     if let Some(alias) = declared_alias {
-        session.add_existing_alias(&hash, alias).await?
+        session.add_existing_alias(&hash, alias, is_cached).await?
     } else {
-        session.create_alias(&hash, input_type).await?;
+        session.create_alias(&hash, input_type, is_cached).await?;
     }
 
     Ok(session)
@@ -161,7 +162,12 @@ where
     }
 
     #[tracing::instrument]
-    async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
+    async fn add_existing_alias(
+        &mut self,
+        hash: &[u8],
+        alias: Alias,
+        is_cached: bool,
+    ) -> Result<(), Error> {
         AliasRepo::create(&self.repo, &alias)
             .await?
             .map_err(|_| UploadError::DuplicateAlias)?;
@@ -171,11 +177,20 @@ where
         self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
         self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
 
+        if is_cached {
+            self.repo.mark_cached(&alias).await?;
+        }
+
         Ok(())
     }
 
     #[tracing::instrument]
-    async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
+    async fn create_alias(
+        &mut self,
+        hash: &[u8],
+        input_type: ValidInputType,
+        is_cached: bool,
+    ) -> Result<(), Error> {
         tracing::debug!("Alias gen loop");
 
         loop {
@@ -187,6 +202,10 @@ where
             self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
             self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
 
+            if is_cached {
+                self.repo.mark_cached(&alias).await?;
+            }
+
             return Ok(());
         }
 
diff --git a/src/main.rs b/src/main.rs
index ec5ede3..5e12dc4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -164,7 +164,7 @@ async fn upload_backgrounded(
         .expect("Identifier exists")
         .to_bytes()?;
 
-    queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
+    queue::queue_ingest(&**repo, identifier, upload_id, None, true, false).await?;
 
     files.push(serde_json::json!({
         "upload_id": upload_id.to_string(),
@@ -223,6 +223,12 @@ async fn claim_upload(
 #[derive(Debug, serde::Deserialize)]
 struct UrlQuery {
     url: String,
+
+    #[serde(default)]
+    backgrounded: bool,
+
+    #[serde(default)]
+    ephemeral: bool,
 }
 
 /// download an image from a URL
@@ -233,7 +239,22 @@ async fn download(
     store: web::Data<S>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
+    if query.backgrounded {
+        do_download_backgrounded(client, repo, store, &query.url, query.ephemeral).await
+    } else {
+        do_download_inline(client, repo, store, &query.url, query.ephemeral).await
+    }
+}
+
+#[instrument(name = "Downloading file inline", skip(client, repo))]
+async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
+    client: web::Data<Client>,
+    repo: web::Data<R>,
+    store: web::Data<S>,
+    url: &str,
+    is_cached: bool,
+) -> Result<HttpResponse, Error> {
+    let res = client.get(url).send().await?;
 
     if !res.status().is_success() {
         return Err(UploadError::Download(res.status()).into());
@@ -243,7 +264,7 @@ async fn download(
         .map_err(Error::from)
         .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
 
-    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
+    let mut session = ingest::ingest(&**repo, &**store, stream, None, true, is_cached).await?;
 
     let alias = session.alias().expect("alias should exist").to_owned();
     let delete_token = session.delete_token().await?;
@@ -262,6 +283,7 @@ async fn download(
     };
 
     session.disarm();
+
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
         "files": [{
@@ -272,14 +294,15 @@ async fn download(
     })))
 }
 
-#[instrument(name = "Downloading file for background", skip(client))]
-async fn download_backgrounded(
+#[instrument(name = "Downloading file in background", skip(client))]
+async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
     client: web::Data<Client>,
     repo: web::Data<R>,
     store: web::Data<S>,
-    query: web::Query<UrlQuery>,
+    url: &str,
+    is_cached: bool,
 ) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
+    let res = client.get(url).send().await?;
 
     if !res.status().is_success() {
         return Err(UploadError::Download(res.status()).into());
@@ -297,7 +320,7 @@ async fn download_backgrounded(
         .expect("Identifier exists")
         .to_bytes()?;
 
-    queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
+    queue::queue_ingest(&**repo, identifier, upload_id, None, true, is_cached).await?;
 
     Ok(HttpResponse::Accepted().json(&serde_json::json!({
         "msg": "ok",
@@ -307,55 +330,6 @@ async fn download_backgrounded(
     })))
 }
 
-/// cache an image from a URL
-#[instrument(name = "Caching file", skip(client, repo))]
-async fn cache<R: FullRepo + 'static, S: Store + 'static>(
-    client: web::Data<Client>,
-    repo: web::Data<R>,
-    store: web::Data<S>,
-    query: web::Query<UrlQuery>,
-) -> Result<HttpResponse, Error> {
-    let res = client.get(&query.url).send().await?;
-
-    if !res.status().is_success() {
-        return Err(UploadError::Download(res.status()).into());
-    }
-
-    let stream = res
-        .map_err(Error::from)
-        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
-
-    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
-
-    let alias = session.alias().expect("alias should exist").to_owned();
-    let delete_token = session.delete_token().await?;
-
-    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
-
-    let details = repo.details(&identifier).await?;
-
-    let details = if let Some(details) = details {
-        details
-    } else {
-        let hint = details_hint(&alias);
-        let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-        repo.relate_details(&identifier, &new_details).await?;
-        new_details
-    };
-
-    repo.mark_cached(&alias).await?;
-
-    session.disarm();
-    Ok(HttpResponse::Created().json(&serde_json::json!({
-        "msg": "ok",
-        "files": [{
-            "file": alias.to_string(),
-            "delete_token": delete_token.to_string(),
-            "details": details,
-        }]
-    })))
-}
-
 /// Delete aliases and files
 #[instrument(name = "Deleting file", skip(repo))]
 async fn delete(
@@ -757,7 +731,7 @@ async fn launch(
                         let stream = stream.map_err(Error::from);
 
                         Box::pin(
-                            async move { ingest::ingest(&repo, &store, stream, None, true).await }
+                            async move { ingest::ingest(&repo, &store, stream, None, true, false).await }
                                 .instrument(span),
                         )
                     })),
@@ -790,6 +764,7 @@ async fn launch(
                         stream,
                         Some(Alias::from_existing(&filename)),
                         !CONFIG.media.skip_validate_imports,
+                        false,
                     )
                     .await
                 }
@@ -869,11 +844,6 @@ async fn launch(
                 ),
             )
             .service(web::resource("/download").route(web::get().to(download::<R, S>)))
-            .service(
-                web::resource("/download_backgrounded")
-                    .route(web::get().to(download_backgrounded::<R, S>)),
-            )
-            .service(web::resource("/cache").route(web::get().to(cache::<R, S>)))
             .service(
                 web::resource("/delete/{delete_token}/{filename}")
                     .route(web::delete().to(delete::<R>))
diff --git a/src/queue.rs b/src/queue.rs
index 22e095f..aeade42 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -37,6 +37,7 @@ enum Process {
         upload_id: Serde<UploadId>,
         declared_alias: Option<Serde<Alias>>,
         should_validate: bool,
+        is_cached: bool,
     },
     Generate {
         target_format: ImageFormat,
@@ -84,12 +85,14 @@ pub(crate) async fn queue_ingest(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<(), Error> {
     let job = serde_json::to_vec(&Process::Ingest {
         identifier,
         declared_alias: declared_alias.map(Serde::new),
         upload_id: Serde::new(upload_id),
         should_validate,
+        is_cached,
     })?;
     repo.push(PROCESS_QUEUE, job.into()).await?;
     Ok(())
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 6b31a78..5487b8b 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -27,6 +27,7 @@ where
             upload_id,
             declared_alias,
             should_validate,
+            is_cached,
         } => {
             process_ingest(
                 repo,
@@ -35,6 +36,7 @@ where
                 Serde::into_inner(upload_id),
                 declared_alias.map(Serde::into_inner),
                 should_validate,
+                is_cached,
             )
             .await?
         }
@@ -72,6 +74,7 @@ async fn process_ingest(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
+    is_cached: bool,
 ) -> Result<(), Error>
 where
     R: FullRepo + 'static,
@@ -85,8 +88,15 @@ where
         .await?
         .map_err(Error::from);
 
-    let session =
-        crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
+    let session = crate::ingest::ingest(
+        repo,
+        store,
+        stream,
+        declared_alias,
+        should_validate,
+        is_cached,
+    )
+    .await?;
 
     let token = session.delete_token().await?;
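
A note on backward compatibility: because both new `UrlQuery` fields carry `#[serde(default)]`, a plain `GET /image/download?url=...` request with neither flag still deserializes, both booleans fall back to `false`, and the handler takes the old inline path, so existing callers are unaffected. Below is a minimal sketch of that behavior using `serde_urlencoded` (the deserializer behind actix-web's `web::Query`); the example URLs are hypothetical:

```rust
// Cargo.toml (assumed): serde = { version = "1", features = ["derive"] },
// serde_urlencoded = "0.7"
use serde::Deserialize;

// Mirrors the UrlQuery struct from the diff.
#[derive(Debug, Deserialize)]
struct UrlQuery {
    url: String,
    #[serde(default)]
    backgrounded: bool,
    #[serde(default)]
    ephemeral: bool,
}

fn main() -> Result<(), serde_urlencoded::de::Error> {
    // Old-style request: no flags present, both default to false,
    // so the handler takes the pre-existing inline code path.
    let q: UrlQuery = serde_urlencoded::from_str("url=https%3A%2F%2Fexample.com%2Fa.png")?;
    assert!(!q.backgrounded && !q.ephemeral);

    // New-style request: queue the ingest and mark the resulting alias as cached.
    let q: UrlQuery = serde_urlencoded::from_str(
        "url=https%3A%2F%2Fexample.com%2Fa.png&backgrounded=true&ephemeral=true",
    )?;
    assert!(q.backgrounded && q.ephemeral);
    Ok(())
}
```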
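To see the new query parameters end to end, here is a hedged client sketch: it queues a backgrounded, ephemeral download and then claims the result. The pict-rs base address, the remote image URL, the `reqwest` dependency, and the exact response keys (`uploads`/`upload_id`, mirroring the `/image/backgrounded` payload) are illustrative assumptions, not part of this patch:

```rust
// Cargo.toml (assumed): tokio = { version = "1", features = ["full"] },
// reqwest = { version = "0.11", features = ["json"] }, serde_json = "1"
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let base = "http://localhost:8080"; // assumed pict-rs address

    // Queue a background download of a remote image, marked ephemeral
    // so it expires after the configured cache duration.
    let resp: Value = client
        .get(format!("{base}/image/download"))
        .query(&[
            ("url", "https://example.com/cat.png"), // hypothetical remote image
            ("backgrounded", "true"),
            ("ephemeral", "true"),
        ])
        .send()
        .await?
        .json()
        .await?;

    // Assumed 202 body shape, mirroring /image/backgrounded:
    // { "msg": "ok", "uploads": [{ "upload_id": "..." }] }
    let upload_id = resp["uploads"][0]["upload_id"]
        .as_str()
        .ok_or("missing upload_id")?
        .to_owned();

    // Claim the result once validation and ingest complete; retry if the
    // server reports the upload is not ready yet.
    let claimed: Value = client
        .get(format!("{base}/image/backgrounded/claim"))
        .query(&[("upload_id", upload_id.as_str())])
        .send()
        .await?
        .json()
        .await?;

    println!("{claimed:#}");
    Ok(())
}
```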