Fix dangling unprocessed uploads

Adds error boundary around backgrounded ingest
This commit is contained in:
asonix 2023-07-17 22:30:10 -05:00
parent 20daced620
commit b32e1cc6f6
1 changed files with 20 additions and 10 deletions

View File

@ -1,6 +1,6 @@
use crate::{
config::ImageFormat,
error::Error,
error::{Error, UploadError},
ingest::Session,
queue::{Base64Bytes, LocalBoxFuture, Process},
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
@ -75,24 +75,34 @@ async fn process_ingest<R, S>(
) -> Result<(), Error>
where
R: FullRepo + 'static,
S: Store,
S: Store + 'static,
{
let fut = async {
let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
let stream = store
.to_stream(&unprocessed_identifier, None, None)
.await?
.map_err(Error::from);
let ident = unprocessed_identifier.clone();
let session =
crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
let store2 = store.clone();
let repo = repo.clone();
let error_boundary = actix_rt::spawn(async move {
let stream = store2
.to_stream(&ident, None, None)
.await?
.map_err(Error::from);
let token = session.delete_token().await?;
let session =
crate::ingest::ingest(&repo, &store2, stream, declared_alias, should_validate)
.await?;
let token = session.delete_token().await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
})
.await;
store.remove(&unprocessed_identifier).await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
error_boundary.map_err(|_| UploadError::Canceled)?
};
let result = match fut.await {