diff --git a/src/queue/process.rs b/src/queue/process.rs index 0c055e1..2e22f1d 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -1,5 +1,5 @@ use crate::{ - error::Error, + error::{Error, UploadError}, formats::InputProcessableFormat, ingest::Session, queue::{Base64Bytes, LocalBoxFuture, Process}, @@ -71,27 +71,37 @@ async fn process_ingest( unprocessed_identifier: Vec, upload_id: UploadId, declared_alias: Option, - media: &crate::config::Media, + media: &'static crate::config::Media, ) -> Result<(), Error> where R: FullRepo + 'static, - S: Store, + S: Store + 'static, { let fut = async { let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?; - let stream = store - .to_stream(&unprocessed_identifier, None, None) - .await? - .map_err(Error::from); + let ident = unprocessed_identifier.clone(); + let store2 = store.clone(); + let repo = repo.clone(); - let session = crate::ingest::ingest(repo, store, stream, declared_alias, media).await?; + let error_boundary = actix_rt::spawn(async move { + let stream = store2 + .to_stream(&ident, None, None) + .await? + .map_err(Error::from); - let token = session.delete_token().await?; + let session = + crate::ingest::ingest(&repo, &store2, stream, declared_alias, media).await?; + + let token = session.delete_token().await?; + + Ok((session, token)) as Result<(Session, DeleteToken), Error> + }) + .await; store.remove(&unprocessed_identifier).await?; - Ok((session, token)) as Result<(Session, DeleteToken), Error> + error_boundary.map_err(|_| UploadError::Canceled)? }; let result = match fut.await {