2023-09-02 23:30:45 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2022-04-03 01:56:29 +00:00
|
|
|
use crate::{
|
|
|
|
error::Error,
|
2023-08-16 21:09:40 +00:00
|
|
|
repo::{ArcRepo, UploadId},
|
2024-02-04 00:18:13 +00:00
|
|
|
state::State,
|
2022-04-03 01:56:29 +00:00
|
|
|
store::Store,
|
|
|
|
};
|
|
|
|
use actix_web::web::Bytes;
|
2023-08-23 16:59:42 +00:00
|
|
|
use futures_core::Stream;
|
2023-07-14 19:53:37 +00:00
|
|
|
use mime::APPLICATION_OCTET_STREAM;
|
2022-04-08 17:51:33 +00:00
|
|
|
use tracing::{Instrument, Span};
|
2022-04-03 01:56:29 +00:00
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
pub(crate) struct Backgrounded {
|
2023-08-16 00:19:03 +00:00
|
|
|
repo: ArcRepo,
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier: Option<Arc<str>>,
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: Option<UploadId>,
|
|
|
|
}
|
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
impl Backgrounded {
|
2022-04-03 01:56:29 +00:00
|
|
|
pub(crate) fn disarm(mut self) {
|
|
|
|
let _ = self.identifier.take();
|
|
|
|
let _ = self.upload_id.take();
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn upload_id(&self) -> Option<UploadId> {
|
|
|
|
self.upload_id
|
|
|
|
}
|
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
pub(crate) fn identifier(&self) -> Option<&Arc<str>> {
|
2022-04-03 01:56:29 +00:00
|
|
|
self.identifier.as_ref()
|
|
|
|
}
|
|
|
|
|
2024-02-04 00:18:13 +00:00
|
|
|
pub(crate) async fn proxy<S, P>(state: &State<S>, stream: P) -> Result<Self, Error>
|
2022-04-03 01:56:29 +00:00
|
|
|
where
|
2023-09-02 23:30:45 +00:00
|
|
|
S: Store,
|
2023-09-10 22:55:13 +00:00
|
|
|
P: Stream<Item = Result<Bytes, Error>> + 'static,
|
2022-04-03 01:56:29 +00:00
|
|
|
{
|
|
|
|
let mut this = Self {
|
2024-02-04 00:18:13 +00:00
|
|
|
repo: state.repo.clone(),
|
2022-04-03 01:56:29 +00:00
|
|
|
identifier: None,
|
2023-09-01 23:41:04 +00:00
|
|
|
upload_id: None,
|
2022-04-03 01:56:29 +00:00
|
|
|
};
|
|
|
|
|
2024-02-04 00:18:13 +00:00
|
|
|
this.do_proxy(&state.store, stream).await?;
|
2022-04-03 01:56:29 +00:00
|
|
|
|
|
|
|
Ok(this)
|
|
|
|
}
|
|
|
|
|
2024-02-04 00:18:13 +00:00
|
|
|
async fn do_proxy<S, P>(&mut self, store: &S, stream: P) -> Result<(), Error>
|
2022-04-03 01:56:29 +00:00
|
|
|
where
|
2023-09-02 23:30:45 +00:00
|
|
|
S: Store,
|
2023-09-10 22:55:13 +00:00
|
|
|
P: Stream<Item = Result<Bytes, Error>> + 'static,
|
2022-04-03 01:56:29 +00:00
|
|
|
{
|
2023-09-01 23:41:04 +00:00
|
|
|
self.upload_id = Some(self.repo.create_upload().await?);
|
2022-04-03 01:56:29 +00:00
|
|
|
|
2023-09-10 22:55:13 +00:00
|
|
|
let stream = Box::pin(crate::stream::map_err(stream, |e| {
|
|
|
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
|
|
|
}));
|
2022-04-03 01:56:29 +00:00
|
|
|
|
2023-07-14 19:53:37 +00:00
|
|
|
// use octet-stream, we don't know the upload's real type yet
|
|
|
|
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;
|
2022-04-03 01:56:29 +00:00
|
|
|
|
2022-04-03 02:15:39 +00:00
|
|
|
self.identifier = Some(identifier);
|
2022-04-03 01:56:29 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
impl Drop for Backgrounded {
|
2022-04-03 01:56:29 +00:00
|
|
|
fn drop(&mut self) {
|
2023-07-23 02:11:28 +00:00
|
|
|
let any_items = self.identifier.is_some() || self.upload_id.is_some();
|
2023-07-22 21:47:59 +00:00
|
|
|
|
2023-12-27 00:06:38 +00:00
|
|
|
metrics::counter!("pict-rs.background.upload", "completed" => (!any_items).to_string())
|
|
|
|
.increment(1);
|
2023-07-23 02:11:28 +00:00
|
|
|
|
|
|
|
if any_items {
|
2022-10-02 03:47:52 +00:00
|
|
|
let cleanup_parent_span =
|
|
|
|
tracing::info_span!(parent: None, "Dropped backgrounded cleanup");
|
|
|
|
cleanup_parent_span.follows_from(Span::current());
|
|
|
|
|
|
|
|
if let Some(identifier) = self.identifier.take() {
|
|
|
|
let repo = self.repo.clone();
|
|
|
|
|
2022-12-08 04:43:12 +00:00
|
|
|
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier);
|
2022-10-02 03:47:52 +00:00
|
|
|
|
2023-09-04 02:30:47 +00:00
|
|
|
crate::sync::spawn(
|
2023-10-21 00:08:11 +00:00
|
|
|
"backgrounded-cleanup-identifier",
|
2023-09-04 02:30:47 +00:00
|
|
|
async move {
|
|
|
|
let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
|
|
|
|
}
|
|
|
|
.instrument(cleanup_span),
|
|
|
|
);
|
2022-10-02 03:47:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(upload_id) = self.upload_id {
|
|
|
|
let repo = self.repo.clone();
|
|
|
|
|
2022-12-08 04:43:12 +00:00
|
|
|
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
|
2022-10-02 03:47:52 +00:00
|
|
|
|
2023-09-04 02:30:47 +00:00
|
|
|
crate::sync::spawn(
|
2023-10-21 00:08:11 +00:00
|
|
|
"backgrounded-claim-upload",
|
2023-09-04 02:30:47 +00:00
|
|
|
async move {
|
|
|
|
let _ = repo.claim(upload_id).await;
|
|
|
|
}
|
|
|
|
.instrument(cleanup_span),
|
|
|
|
);
|
2022-10-02 03:47:52 +00:00
|
|
|
}
|
2022-04-03 01:56:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|