use crate::{ bytes_stream::BytesStream, either::Either, error::{Error, UploadError}, magick::ValidInputType, repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo}, store::Store, CONFIG, }; use actix_web::web::Bytes; use futures_util::{Stream, StreamExt}; use sha2::{Digest, Sha256}; use tracing::{Instrument, Span}; mod hasher; use hasher::Hasher; #[derive(Debug)] pub(crate) struct Session where R: FullRepo + 'static, S: Store, { repo: R, hash: Option>, alias: Option, identifier: Option, } #[tracing::instrument(name = "Aggregate", skip(stream))] async fn aggregate(mut stream: S) -> Result where S: Stream> + Unpin, { let mut buf = BytesStream::new(); while let Some(res) = stream.next().await { buf.add_bytes(res?); } Ok(buf.into_bytes()) } #[tracing::instrument(name = "Ingest", skip(stream))] pub(crate) async fn ingest( repo: &R, store: &S, stream: impl Stream> + Unpin + 'static, declared_alias: Option, should_validate: bool, is_cached: bool, ) -> Result, Error> where R: FullRepo + 'static, S: Store, { let permit = crate::PROCESS_SEMAPHORE.acquire().await; let bytes = aggregate(stream).await?; tracing::debug!("Validating bytes"); let (input_type, validated_reader) = crate::validate::validate_bytes( bytes, CONFIG.media.format, CONFIG.media.enable_silent_video, CONFIG.media.enable_full_video, CONFIG.media.video_codec, CONFIG.media.audio_codec, should_validate, ) .await?; let processed_reader = if let Some(operations) = CONFIG.media.preprocess_steps() { if let Some(format) = input_type.to_format() { let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?; let processed_reader = crate::magick::process_image_async_read(validated_reader, magick_args, format)?; Either::left(processed_reader) } else { Either::right(validated_reader) } } else { Either::right(validated_reader) }; let hasher_reader = Hasher::new(processed_reader, Sha256::new()); let hasher = hasher_reader.hasher(); let identifier = store.save_async_read(hasher_reader).await?; drop(permit); let mut session = Session { repo: repo.clone(), hash: None, alias: None, identifier: Some(identifier.clone()), }; let hash = hasher.borrow_mut().finalize_reset().to_vec(); session.hash = Some(hash.clone()); save_upload(repo, store, &hash, &identifier).await?; if let Some(alias) = declared_alias { session.add_existing_alias(&hash, alias, is_cached).await? } else { session.create_alias(&hash, input_type, is_cached).await?; } Ok(session) } #[tracing::instrument] async fn save_upload( repo: &R, store: &S, hash: &[u8], identifier: &S::Identifier, ) -> Result<(), Error> where S: Store, R: FullRepo, { if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() { store.remove(identifier).await?; return Ok(()); } repo.relate_identifier(hash.to_vec().into(), identifier) .await?; Ok(()) } impl Session where R: FullRepo + 'static, S: Store, { pub(crate) fn disarm(&mut self) { let _ = self.hash.take(); let _ = self.alias.take(); let _ = self.identifier.take(); } pub(crate) fn alias(&self) -> Option<&Alias> { self.alias.as_ref() } #[tracing::instrument] pub(crate) async fn delete_token(&self) -> Result { let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; tracing::debug!("Generating delete token"); let delete_token = DeleteToken::generate(); tracing::debug!("Saving delete token"); let res = self.repo.relate_delete_token(&alias, &delete_token).await?; if res.is_err() { let delete_token = self.repo.delete_token(&alias).await?; tracing::debug!("Returning existing delete token, {:?}", delete_token); return Ok(delete_token); } tracing::debug!("Returning new delete token, {:?}", delete_token); Ok(delete_token) } #[tracing::instrument] async fn add_existing_alias( &mut self, hash: &[u8], alias: Alias, is_cached: bool, ) -> Result<(), Error> { AliasRepo::create(&self.repo, &alias) .await? .map_err(|_| UploadError::DuplicateAlias)?; self.alias = Some(alias.clone()); self.repo.relate_hash(&alias, hash.to_vec().into()).await?; self.repo.relate_alias(hash.to_vec().into(), &alias).await?; if is_cached { self.repo.mark_cached(&alias).await?; } Ok(()) } #[tracing::instrument] async fn create_alias( &mut self, hash: &[u8], input_type: ValidInputType, is_cached: bool, ) -> Result<(), Error> { tracing::debug!("Alias gen loop"); loop { let alias = Alias::generate(input_type.as_ext().to_string()); if AliasRepo::create(&self.repo, &alias).await?.is_ok() { self.alias = Some(alias.clone()); self.repo.relate_hash(&alias, hash.to_vec().into()).await?; self.repo.relate_alias(hash.to_vec().into(), &alias).await?; if is_cached { self.repo.mark_cached(&alias).await?; } return Ok(()); } tracing::debug!("Alias exists, regenerating"); } } } impl Drop for Session where R: FullRepo + 'static, S: Store, { #[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))] fn drop(&mut self) { if let Some(hash) = self.hash.take() { let repo = self.repo.clone(); let cleanup_span = tracing::info_span!(parent: None, "Session cleanup hash", hash = ?hash); cleanup_span.follows_from(Span::current()); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; } .instrument(cleanup_span), ) }); } if let Some(alias) = self.alias.take() { let repo = self.repo.clone(); let cleanup_span = tracing::info_span!(parent: None, "Session cleanup alias", alias = ?alias); cleanup_span.follows_from(Span::current()); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { if let Ok(token) = repo.delete_token(&alias).await { let _ = crate::queue::cleanup_alias(&repo, alias, token).await; } else { let token = DeleteToken::generate(); if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { let _ = crate::queue::cleanup_alias(&repo, alias, token).await; } } } .instrument(cleanup_span), ) }); } if let Some(identifier) = self.identifier.take() { let repo = self.repo.clone(); let cleanup_span = tracing::info_span!(parent: None, "Session cleanup identifier", identifier = ?identifier); cleanup_span.follows_from(Span::current()); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { let _ = crate::queue::cleanup_identifier(&repo, identifier).await; } .instrument(cleanup_span), ) }); } } }