mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Fix dangling unprocessed uploads
Adds error boundary around backgrounded ingest
This commit is contained in:
parent
42c801f0fe
commit
558605381d
1 changed files with 20 additions and 10 deletions
|
@ -1,5 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::{Error, UploadError},
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
ingest::Session,
|
ingest::Session,
|
||||||
queue::{Base64Bytes, LocalBoxFuture, Process},
|
queue::{Base64Bytes, LocalBoxFuture, Process},
|
||||||
|
@ -71,27 +71,37 @@ async fn process_ingest<R, S>(
|
||||||
unprocessed_identifier: Vec<u8>,
|
unprocessed_identifier: Vec<u8>,
|
||||||
upload_id: UploadId,
|
upload_id: UploadId,
|
||||||
declared_alias: Option<Alias>,
|
declared_alias: Option<Alias>,
|
||||||
media: &crate::config::Media,
|
media: &'static crate::config::Media,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
R: FullRepo + 'static,
|
R: FullRepo + 'static,
|
||||||
S: Store,
|
S: Store + 'static,
|
||||||
{
|
{
|
||||||
let fut = async {
|
let fut = async {
|
||||||
let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
|
let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
|
||||||
|
|
||||||
let stream = store
|
let ident = unprocessed_identifier.clone();
|
||||||
.to_stream(&unprocessed_identifier, None, None)
|
let store2 = store.clone();
|
||||||
.await?
|
let repo = repo.clone();
|
||||||
.map_err(Error::from);
|
|
||||||
|
|
||||||
let session = crate::ingest::ingest(repo, store, stream, declared_alias, media).await?;
|
let error_boundary = actix_rt::spawn(async move {
|
||||||
|
let stream = store2
|
||||||
|
.to_stream(&ident, None, None)
|
||||||
|
.await?
|
||||||
|
.map_err(Error::from);
|
||||||
|
|
||||||
let token = session.delete_token().await?;
|
let session =
|
||||||
|
crate::ingest::ingest(&repo, &store2, stream, declared_alias, media).await?;
|
||||||
|
|
||||||
|
let token = session.delete_token().await?;
|
||||||
|
|
||||||
|
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
store.remove(&unprocessed_identifier).await?;
|
store.remove(&unprocessed_identifier).await?;
|
||||||
|
|
||||||
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
|
error_boundary.map_err(|_| UploadError::Canceled)?
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = match fut.await {
|
let result = match fut.await {
|
||||||
|
|
Loading…
Reference in a new issue