2022-04-02 21:44:03 +00:00
|
|
|
use crate::{
|
2022-10-01 17:06:33 +00:00
|
|
|
bytes_stream::BytesStream,
|
2022-09-25 20:17:33 +00:00
|
|
|
either::Either,
|
2022-04-02 21:44:03 +00:00
|
|
|
error::{Error, UploadError},
|
|
|
|
magick::ValidInputType,
|
|
|
|
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
|
|
|
|
store::Store,
|
|
|
|
CONFIG,
|
|
|
|
};
|
2022-10-01 17:06:33 +00:00
|
|
|
use actix_web::web::Bytes;
|
2022-04-02 21:44:03 +00:00
|
|
|
use futures_util::{Stream, StreamExt};
|
|
|
|
use sha2::{Digest, Sha256};
|
2022-04-08 17:51:33 +00:00
|
|
|
use tracing::{Instrument, Span};
|
2022-04-02 21:44:03 +00:00
|
|
|
|
|
|
|
mod hasher;
|
|
|
|
use hasher::Hasher;
|
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
#[derive(Debug)]
|
2022-04-02 21:44:03 +00:00
|
|
|
pub(crate) struct Session<R, S>
|
|
|
|
where
|
|
|
|
R: FullRepo + 'static,
|
|
|
|
S: Store,
|
|
|
|
{
|
|
|
|
repo: R,
|
|
|
|
hash: Option<Vec<u8>>,
|
|
|
|
alias: Option<Alias>,
|
|
|
|
identifier: Option<S::Identifier>,
|
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(skip(stream))]
|
2022-10-01 17:06:33 +00:00
|
|
|
async fn aggregate<S>(mut stream: S) -> Result<Bytes, Error>
|
2022-04-07 02:40:49 +00:00
|
|
|
where
|
2022-10-01 17:06:33 +00:00
|
|
|
S: Stream<Item = Result<Bytes, Error>> + Unpin,
|
2022-04-07 02:40:49 +00:00
|
|
|
{
|
2022-10-01 17:06:33 +00:00
|
|
|
let mut buf = BytesStream::new();
|
2022-04-07 02:40:49 +00:00
|
|
|
|
|
|
|
while let Some(res) = stream.next().await {
|
2022-10-01 17:06:33 +00:00
|
|
|
buf.add_bytes(res?);
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
|
|
|
|
2022-10-01 17:06:33 +00:00
|
|
|
Ok(buf.into_bytes())
|
2022-04-07 02:40:49 +00:00
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
#[tracing::instrument(skip(repo, store, stream))]
|
2022-04-02 21:44:03 +00:00
|
|
|
pub(crate) async fn ingest<R, S>(
|
|
|
|
repo: &R,
|
|
|
|
store: &S,
|
2022-09-24 22:18:53 +00:00
|
|
|
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
|
2022-04-02 21:44:03 +00:00
|
|
|
declared_alias: Option<Alias>,
|
|
|
|
should_validate: bool,
|
|
|
|
) -> Result<Session<R, S>, Error>
|
|
|
|
where
|
|
|
|
R: FullRepo + 'static,
|
|
|
|
S: Store,
|
|
|
|
{
|
2022-04-07 17:56:40 +00:00
|
|
|
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-04-07 02:40:49 +00:00
|
|
|
let bytes = aggregate(stream).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Validating bytes");
|
2023-02-04 23:32:36 +00:00
|
|
|
let (input_type, validated_reader) =
|
|
|
|
crate::validate::validate_bytes(bytes, &CONFIG.media, should_validate).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-09-25 20:17:33 +00:00
|
|
|
let processed_reader = if let Some(operations) = CONFIG.media.preprocess_steps() {
|
|
|
|
if let Some(format) = input_type.to_format() {
|
|
|
|
let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?;
|
|
|
|
|
|
|
|
let processed_reader =
|
|
|
|
crate::magick::process_image_async_read(validated_reader, magick_args, format)?;
|
|
|
|
|
|
|
|
Either::left(processed_reader)
|
|
|
|
} else {
|
|
|
|
Either::right(validated_reader)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
Either::right(validated_reader)
|
|
|
|
};
|
|
|
|
|
|
|
|
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
|
2022-09-24 22:18:53 +00:00
|
|
|
let hasher = hasher_reader.hasher();
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-09-24 22:18:53 +00:00
|
|
|
let identifier = store.save_async_read(hasher_reader).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
|
|
|
drop(permit);
|
|
|
|
|
|
|
|
let mut session = Session {
|
|
|
|
repo: repo.clone(),
|
|
|
|
hash: None,
|
|
|
|
alias: None,
|
|
|
|
identifier: Some(identifier.clone()),
|
|
|
|
};
|
|
|
|
|
2022-09-24 22:18:53 +00:00
|
|
|
let hash = hasher.borrow_mut().finalize_reset().to_vec();
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2023-06-23 16:39:43 +00:00
|
|
|
save_upload(&mut session, repo, store, &hash, &identifier).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
|
|
|
if let Some(alias) = declared_alias {
|
2023-02-25 17:34:48 +00:00
|
|
|
session.add_existing_alias(&hash, alias).await?
|
2022-04-02 21:44:03 +00:00
|
|
|
} else {
|
2023-02-25 17:34:48 +00:00
|
|
|
session.create_alias(&hash, input_type).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(session)
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip_all)]
|
2022-04-02 21:44:03 +00:00
|
|
|
async fn save_upload<R, S>(
|
2023-06-23 16:39:43 +00:00
|
|
|
session: &mut Session<R, S>,
|
2022-04-02 21:44:03 +00:00
|
|
|
repo: &R,
|
|
|
|
store: &S,
|
|
|
|
hash: &[u8],
|
|
|
|
identifier: &S::Identifier,
|
|
|
|
) -> Result<(), Error>
|
|
|
|
where
|
|
|
|
S: Store,
|
|
|
|
R: FullRepo,
|
|
|
|
{
|
|
|
|
if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
|
2023-06-23 16:39:43 +00:00
|
|
|
// duplicate upload
|
2022-04-02 21:44:03 +00:00
|
|
|
store.remove(identifier).await?;
|
2023-06-23 16:43:15 +00:00
|
|
|
session.identifier.take();
|
2022-04-02 21:44:03 +00:00
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2023-06-23 16:39:43 +00:00
|
|
|
// Set hash after upload uniquness check so we don't clean existing files on failure
|
|
|
|
session.hash = Some(Vec::from(hash));
|
|
|
|
|
2022-04-02 21:44:03 +00:00
|
|
|
repo.relate_identifier(hash.to_vec().into(), identifier)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<R, S> Session<R, S>
|
|
|
|
where
|
|
|
|
R: FullRepo + 'static,
|
|
|
|
S: Store,
|
|
|
|
{
|
|
|
|
pub(crate) fn disarm(&mut self) {
|
2022-04-06 02:47:35 +00:00
|
|
|
let _ = self.hash.take();
|
2022-04-02 21:44:03 +00:00
|
|
|
let _ = self.alias.take();
|
|
|
|
let _ = self.identifier.take();
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn alias(&self) -> Option<&Alias> {
|
|
|
|
self.alias.as_ref()
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip_all)]
|
2022-04-02 21:44:03 +00:00
|
|
|
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
|
|
|
|
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Generating delete token");
|
2022-04-02 21:44:03 +00:00
|
|
|
let delete_token = DeleteToken::generate();
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Saving delete token");
|
2022-04-02 21:44:03 +00:00
|
|
|
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
|
|
|
|
|
|
|
|
if res.is_err() {
|
|
|
|
let delete_token = self.repo.delete_token(&alias).await?;
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Returning existing delete token, {:?}", delete_token);
|
2022-04-02 21:44:03 +00:00
|
|
|
return Ok(delete_token);
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Returning new delete token, {:?}", delete_token);
|
2022-04-02 21:44:03 +00:00
|
|
|
Ok(delete_token)
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
#[tracing::instrument(skip(self, hash))]
|
2023-02-25 20:31:57 +00:00
|
|
|
async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
|
2022-04-02 21:44:03 +00:00
|
|
|
AliasRepo::create(&self.repo, &alias)
|
|
|
|
.await?
|
|
|
|
.map_err(|_| UploadError::DuplicateAlias)?;
|
|
|
|
|
|
|
|
self.alias = Some(alias.clone());
|
|
|
|
|
|
|
|
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
|
|
|
|
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self, hash))]
|
2023-02-25 20:31:57 +00:00
|
|
|
async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
|
2022-04-02 21:44:03 +00:00
|
|
|
loop {
|
|
|
|
let alias = Alias::generate(input_type.as_ext().to_string());
|
|
|
|
|
|
|
|
if AliasRepo::create(&self.repo, &alias).await?.is_ok() {
|
|
|
|
self.alias = Some(alias.clone());
|
|
|
|
|
|
|
|
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
|
|
|
|
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
|
|
|
|
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace!("Alias exists, regenerating");
|
2022-04-02 21:44:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<R, S> Drop for Session<R, S>
|
|
|
|
where
|
|
|
|
R: FullRepo + 'static,
|
|
|
|
S: Store,
|
|
|
|
{
|
|
|
|
fn drop(&mut self) {
|
2022-10-02 03:47:52 +00:00
|
|
|
if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() {
|
|
|
|
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
|
|
|
|
cleanup_parent_span.follows_from(Span::current());
|
|
|
|
|
|
|
|
if let Some(hash) = self.hash.take() {
|
|
|
|
let repo = self.repo.clone();
|
|
|
|
|
2022-12-08 04:43:12 +00:00
|
|
|
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup hash", hash = ?hash);
|
2022-10-02 03:47:52 +00:00
|
|
|
|
|
|
|
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
|
|
|
|
}
|
|
|
|
.instrument(cleanup_span),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
if let Some(alias) = self.alias.take() {
|
|
|
|
let repo = self.repo.clone();
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-12-08 04:43:12 +00:00
|
|
|
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias);
|
2022-04-08 17:51:33 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
if let Ok(token) = repo.delete_token(&alias).await {
|
2022-04-08 17:51:33 +00:00
|
|
|
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
|
2022-10-02 03:47:52 +00:00
|
|
|
} else {
|
|
|
|
let token = DeleteToken::generate();
|
|
|
|
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
|
|
|
|
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
|
|
|
|
}
|
2022-04-08 17:51:33 +00:00
|
|
|
}
|
2022-04-07 17:56:40 +00:00
|
|
|
}
|
2022-10-02 03:47:52 +00:00
|
|
|
.instrument(cleanup_span),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
if let Some(identifier) = self.identifier.take() {
|
|
|
|
let repo = self.repo.clone();
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-12-08 04:43:12 +00:00
|
|
|
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
|
2022-04-08 17:51:33 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
|
|
|
actix_rt::spawn(
|
|
|
|
async move {
|
|
|
|
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
|
|
|
|
}
|
|
|
|
.instrument(cleanup_span),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
2022-04-02 21:44:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|