diff --git a/src/error.rs b/src/error.rs index c8b077b..348ac54 100644 --- a/src/error.rs +++ b/src/error.rs @@ -81,6 +81,9 @@ pub(crate) enum UploadError { #[error("Requested a file that doesn't exist")] MissingAlias, + #[error("Requested a file that pict-rs lost track of")] + MissingIdentifier, + #[error("Provided token did not match expected token")] InvalidToken, @@ -169,12 +172,7 @@ impl ResponseError for Error { | UploadError::Repo(crate::repo::RepoError::AlreadyClaimed) | UploadError::SilentVideoDisabled, ) => StatusCode::BAD_REQUEST, - Some( - UploadError::Repo(crate::repo::RepoError::SledError( - crate::repo::sled::SledError::Missing(_), - )) - | UploadError::MissingAlias, - ) => StatusCode::NOT_FOUND, + Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND, Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN, Some(UploadError::Range) => StatusCode::RANGE_NOT_SATISFIABLE, _ => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/src/generate.rs b/src/generate.rs index 4ebfe14..c4db291 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -2,7 +2,7 @@ use crate::{ concurrent_processor::CancelSafeProcessor, config::ImageFormat, details::Details, - error::Error, + error::{Error, UploadError}, ffmpeg::{ThumbnailFormat, VideoFormat}, repo::{Alias, FullRepo}, store::Store, @@ -64,7 +64,10 @@ async fn process( { identifier } else { - let identifier = repo.identifier(hash.clone()).await?; + let Some(identifier) = repo.identifier(hash.clone()).await? else { + return Err(UploadError::MissingIdentifier.into()); + }; + let reader = crate::ffmpeg::thumbnail( store.clone(), identifier, diff --git a/src/ingest.rs b/src/ingest.rs index 47a2878..fd53e27 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -3,7 +3,7 @@ use crate::{ either::Either, error::{Error, UploadError}, magick::ValidInputType, - repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo}, + repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, FullRepo, HashRepo}, store::Store, CONFIG, }; @@ -156,8 +156,7 @@ where tracing::trace!("Saving delete token"); let res = self.repo.relate_delete_token(&alias, &delete_token).await?; - if res.is_err() { - let delete_token = self.repo.delete_token(&alias).await?; + if let Err(AlreadyExists(delete_token)) = res { tracing::trace!("Returning existing delete token, {:?}", delete_token); return Ok(delete_token); } @@ -232,7 +231,7 @@ where tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - if let Ok(token) = repo.delete_token(&alias).await { + if let Ok(Some(token)) = repo.delete_token(&alias).await { let _ = crate::queue::cleanup_alias(&repo, alias, token).await; } else { let token = DeleteToken::generate(); diff --git a/src/lib.rs b/src/lib.rs index 3bd6bfe..ba051cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -822,7 +822,14 @@ async fn serve( (hash, alias, true) }; - let identifier = repo.identifier(hash).await?; + let Some(identifier) = repo.identifier(hash.clone()).await? else { + tracing::warn!( + "Original File identifier for hash {} is missing, queue cleanup task", + hex::encode(&hash) + ); + crate::queue::cleanup_hash(&repo, hash).await?; + return Ok(HttpResponse::NotFound().finish()); + }; let details = ensure_details(&repo, &store, &alias).await?; @@ -1548,8 +1555,8 @@ where } let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await { - Ok(identifier) => identifier, - Err(e) if e.is_missing() => { + Ok(Some(identifier)) => identifier, + Ok(None) => { tracing::warn!( "Original File identifier for hash {} is missing, queue cleanup task", hex::encode(&hash) diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index dd3c667..8d801a7 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -92,8 +92,10 @@ where if !aliases.is_empty() { for alias in aliases { - let token = repo.delete_token(&alias).await?; - super::cleanup_alias(repo, alias, token).await?; + // TODO: decide if it is okay to skip aliases without tokens + if let Some(token) = repo.delete_token(&alias).await? { + super::cleanup_alias(repo, alias, token).await?; + } } // Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned return Ok(()); @@ -105,7 +107,7 @@ where .into_iter() .map(|(_, v)| v) .collect::>(); - idents.push(repo.identifier(hash.clone()).await?); + idents.extend(repo.identifier(hash.clone()).await?); idents.extend(repo.motion_identifier(hash.clone()).await?); for identifier in idents { @@ -123,7 +125,8 @@ where R: FullRepo, { let saved_delete_token = repo.delete_token(&alias).await?; - if saved_delete_token != token { + + if saved_delete_token.is_some() && saved_delete_token != Some(token) { return Err(UploadError::InvalidToken.into()); } diff --git a/src/repo.rs b/src/repo.rs index a8ece2a..cf60d87 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -34,7 +34,10 @@ pub(crate) struct DeleteToken { id: MaybeUuid, } -pub(crate) struct AlreadyExists; +pub(crate) struct HashAlreadyExists; +pub(crate) struct AliasAlreadyExists; + +pub(crate) struct AlreadyExists(pub(super) DeleteToken); #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct UploadId { @@ -56,15 +59,9 @@ pub(crate) enum RepoError { #[error("Panic in blocking operation")] Canceled, -} -impl RepoError { - pub(crate) const fn is_missing(&self) -> bool { - match self { - Self::SledError(e) => e.is_missing(), - _ => false, - } - } + #[error("Required field is missing {0}")] + Missing(&'static str), } #[async_trait::async_trait(?Send)] @@ -91,7 +88,7 @@ pub(crate) trait FullRepo: return Ok(None); }; - self.identifier(hash).await.map(Some) + self.identifier(hash).await } #[tracing::instrument(skip(self))] @@ -112,7 +109,9 @@ pub(crate) trait FullRepo: return Ok(None); }; - let identifier = self.identifier::(hash.clone()).await?; + let Some(identifier) = self.identifier::(hash.clone()).await? else { + return Ok(None); + }; match self.details(&identifier).await? { Some(details) if details.is_motion() => self.motion_identifier::(hash).await, @@ -270,7 +269,7 @@ pub(crate) trait HashRepo: BaseRepo { async fn hashes(&self) -> Self::Stream; - async fn create(&self, hash: Self::Bytes) -> Result, RepoError>; + async fn create(&self, hash: Self::Bytes) -> Result, RepoError>; async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; @@ -281,8 +280,10 @@ pub(crate) trait HashRepo: BaseRepo { hash: Self::Bytes, identifier: &I, ) -> Result<(), StoreError>; - async fn identifier(&self, hash: Self::Bytes) - -> Result; + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result, StoreError>; async fn relate_variant_identifier( &self, @@ -329,7 +330,7 @@ where T::hashes(self).await } - async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { + async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { T::create(self, hash).await } @@ -356,7 +357,7 @@ where async fn identifier( &self, hash: Self::Bytes, - ) -> Result { + ) -> Result, StoreError> { T::identifier(self, hash).await } @@ -410,14 +411,14 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo: BaseRepo { - async fn create(&self, alias: &Alias) -> Result, RepoError>; + async fn create(&self, alias: &Alias) -> Result, RepoError>; async fn relate_delete_token( &self, alias: &Alias, delete_token: &DeleteToken, ) -> Result, RepoError>; - async fn delete_token(&self, alias: &Alias) -> Result; + async fn delete_token(&self, alias: &Alias) -> Result, RepoError>; async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>; async fn hash(&self, alias: &Alias) -> Result, RepoError>; @@ -430,7 +431,7 @@ impl AliasRepo for actix_web::web::Data where T: AliasRepo, { - async fn create(&self, alias: &Alias) -> Result, RepoError> { + async fn create(&self, alias: &Alias) -> Result, RepoError> { T::create(self, alias).await } @@ -442,7 +443,7 @@ where T::relate_delete_token(self, alias, delete_token).await } - async fn delete_token(&self, alias: &Alias) -> Result { + async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { T::delete_token(self, alias).await } @@ -612,7 +613,11 @@ where } } - let main_identifier = repo.identifier::(hash.clone()).await?; + let Some(main_identifier) = repo.identifier::(hash.clone()).await? else { + tracing::warn!("Missing identifier for hash {}, queueing cleanup", hex::encode(&hash)); + crate::queue::cleanup_hash(repo, hash.clone()).await?; + return Err(RepoError::Missing("hash -> identifier").into()); + }; if let Some(new_main_identifier) = main_identifier.normalize_for_migration() { migrate_identifier_details(repo, &main_identifier, &new_main_identifier).await?; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ab41230..de4e4d6 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,14 +1,15 @@ use crate::{ repo::{ - Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo, - Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, + Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, + FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, + UploadId, UploadRepo, UploadResult, }, serde_str::Serde, store::StoreError, stream::from_iterator, }; use futures_util::Stream; -use sled::{Db, IVec, Tree}; +use sled::{CompareAndSwapError, Db, IVec, Tree}; use std::{ collections::HashMap, pin::Pin, @@ -44,19 +45,10 @@ pub(crate) enum SledError { #[error("Invalid details json")] Details(#[from] serde_json::Error), - #[error("Required field was not present: {0}")] - Missing(&'static str), - #[error("Operation panicked")] Panic, } -impl SledError { - pub(super) const fn is_missing(&self) -> bool { - matches!(self, Self::Missing(_)) - } -} - #[derive(Clone)] pub(crate) struct SledRepo { healthz_count: Arc, @@ -456,13 +448,13 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { + async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { let res = b!(self.hashes, { let hash2 = hash.clone(); hashes.compare_and_swap(hash, None as Option, Some(hash2)) }); - Ok(res.map_err(|_| AlreadyExists)) + Ok(res.map_err(|_| HashAlreadyExists)) } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] @@ -515,13 +507,12 @@ impl HashRepo for SledRepo { async fn identifier( &self, hash: Self::Bytes, - ) -> Result { - let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); + ) -> Result, StoreError> { + let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { + return Ok(None); + }; - opt.ok_or(SledError::Missing("hash -> identifier")) - .map_err(RepoError::from) - .map_err(StoreError::from) - .and_then(|ivec| I::from_bytes(ivec.to_vec())) + Ok(Some(I::from_bytes(ivec.to_vec())?)) } #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] @@ -679,7 +670,7 @@ impl HashRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create(&self, alias: &Alias) -> Result, RepoError> { + async fn create(&self, alias: &Alias) -> Result, RepoError> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -688,7 +679,7 @@ impl AliasRepo for SledRepo { aliases.compare_and_swap(bytes, None as Option, Some(bytes2)) ); - Ok(res.map_err(|_| AlreadyExists)) + Ok(res.map_err(|_| AliasAlreadyExists)) } #[tracing::instrument(level = "trace", skip(self))] @@ -700,23 +691,50 @@ impl AliasRepo for SledRepo { let key = alias.to_bytes(); let token = delete_token.to_bytes(); - let res = b!( - self.alias_delete_tokens, - alias_delete_tokens.compare_and_swap(key, None as Option, Some(token)) - ); + let res = b!(self.alias_delete_tokens, { + let mut prev: Option = None; - Ok(res.map_err(|_| AlreadyExists)) + loop { + let key = key.clone(); + let token = token.clone(); + + let res = alias_delete_tokens.compare_and_swap(key, prev, Some(token))?; + + match res { + Ok(()) => return Ok(Ok(())) as Result<_, SledError>, + Err(CompareAndSwapError { + current: Some(token), + .. + }) => { + if let Some(token) = DeleteToken::from_slice(&token) { + return Ok(Err(AlreadyExists(token))); + } else { + prev = Some(token); + }; + } + Err(CompareAndSwapError { current: None, .. }) => { + prev = None; + } + } + } + }); + + Ok(res) } #[tracing::instrument(level = "trace", skip(self))] - async fn delete_token(&self, alias: &Alias) -> Result { + async fn delete_token(&self, alias: &Alias) -> Result, RepoError> { let key = alias.to_bytes(); - let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); + let Some(ivec) = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)) else { + return Ok(None); + }; - opt.and_then(|ivec| DeleteToken::from_slice(&ivec)) - .ok_or(SledError::Missing("alias -> delete-token")) - .map_err(RepoError::from) + let Some(token) = DeleteToken::from_slice(&ivec) else { + return Ok(None); + }; + + Ok(Some(token)) } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] diff --git a/src/store.rs b/src/store.rs index 467d105..180c1af 100644 --- a/src/store.rs +++ b/src/store.rs @@ -28,13 +28,6 @@ impl StoreError { pub(crate) const fn is_not_found(&self) -> bool { matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) } - - pub(crate) const fn is_missing(&self) -> bool { - match self { - Self::Repo(e) => e.is_missing(), - _ => false, - } - } } impl From for StoreError {