diff --git a/src/error.rs b/src/error.rs index 22e136d..c8b077b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -171,7 +171,7 @@ impl ResponseError for Error { ) => StatusCode::BAD_REQUEST, Some( UploadError::Repo(crate::repo::RepoError::SledError( - crate::repo::sled::SledError::Missing, + crate::repo::sled::SledError::Missing(_), )) | UploadError::MissingAlias, ) => StatusCode::NOT_FOUND, diff --git a/src/lib.rs b/src/lib.rs index 0988612..d047198 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1077,6 +1077,22 @@ fn configure_endpoints( ); } +fn spawn_workers(repo: R, store: S) +where + R: FullRepo + 'static, + S: Store + 'static, +{ + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn(queue::process_cleanup( + repo.clone(), + store.clone(), + next_worker_id(), + )) + }); + tracing::trace_span!(parent: None, "Spawn task") + .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id()))); +} + async fn launch_file_store( repo: R, store: FileStore, @@ -1087,20 +1103,7 @@ async fn launch_file_store( let store = store.clone(); let repo = repo.clone(); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(queue::process_cleanup( - repo.clone(), - store.clone(), - next_worker_id(), - )) - }); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(queue::process_images( - repo.clone(), - store.clone(), - next_worker_id(), - )) - }); + spawn_workers(repo.clone(), store.clone()); App::new() .wrap(TracingLogger::default()) @@ -1122,20 +1125,7 @@ async fn launch_object_store( let store = store_config.clone().build(client.clone()); let repo = repo.clone(); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(queue::process_cleanup( - repo.clone(), - store.clone(), - next_worker_id(), - )) - }); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(queue::process_images( - repo.clone(), - store.clone(), - next_worker_id(), - )) - }); + spawn_workers(repo.clone(), store.clone()); App::new() .wrap(TracingLogger::default()) @@ -1380,7 +1370,7 @@ async fn migrate_store( where S1: Store + Clone, S2: Store + Clone, - R: IdentifierRepo + HashRepo + SettingsRepo, + R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, { tracing::warn!("Migrating store"); @@ -1412,7 +1402,7 @@ async fn do_migrate_store( where S1: Store, S2: Store, - R: IdentifierRepo + HashRepo + SettingsRepo, + R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, { let repo_size = repo.size().await?; @@ -1443,6 +1433,19 @@ where continue; } + let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await { + Ok(identifier) => identifier, + Err(e) if e.is_missing() => { + tracing::warn!( + "Original File identifier for hash {} is missing, queue cleanup task", + hex::encode(&hash) + ); + crate::queue::cleanup_hash(repo, hash).await?; + continue; + } + Err(e) => return Err(e.into()), + }; + if let Some(identifier) = repo .motion_identifier(hash.as_ref().to_vec().into()) .await? @@ -1503,11 +1506,9 @@ where } } - let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; - - match migrate_file(&from, &to, &identifier, skip_missing_files).await { + match migrate_file(&from, &to, &original_identifier, skip_missing_files).await { Ok(new_identifier) => { - migrate_details(repo, identifier, &new_identifier).await?; + migrate_details(repo, original_identifier, &new_identifier).await?; repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) .await?; } diff --git a/src/repo.rs b/src/repo.rs index f6bd9d9..84e8fb8 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -58,6 +58,15 @@ pub(crate) enum RepoError { Canceled, } +impl RepoError { + pub(crate) const fn is_missing(&self) -> bool { + match self { + Self::SledError(e) => e.is_missing(), + _ => false, + } + } +} + #[async_trait::async_trait(?Send)] pub(crate) trait FullRepo: UploadRepo diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 8d4696d..04c0623 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -44,13 +44,19 @@ pub(crate) enum SledError { #[error("Invalid details json")] Details(#[from] serde_json::Error), - #[error("Required field was not present")] - Missing, + #[error("Required field was not present: {0}")] + Missing(&'static str), #[error("Operation panicked")] Panic, } +impl SledError { + pub(super) const fn is_missing(&self) -> bool { + matches!(self, Self::Missing(_)) + } +} + #[derive(Clone)] pub(crate) struct SledRepo { healthz_count: Arc, @@ -512,7 +518,7 @@ impl HashRepo for SledRepo { ) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); - opt.ok_or(SledError::Missing) + opt.ok_or(SledError::Missing("hash -> identifier")) .map_err(RepoError::from) .map_err(StoreError::from) .and_then(|ivec| I::from_bytes(ivec.to_vec())) @@ -709,7 +715,7 @@ impl AliasRepo for SledRepo { let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); opt.and_then(|ivec| DeleteToken::from_slice(&ivec)) - .ok_or(SledError::Missing) + .ok_or(SledError::Missing("alias -> delete-token")) .map_err(RepoError::from) } @@ -728,7 +734,8 @@ impl AliasRepo for SledRepo { let opt = b!(self.alias_hashes, alias_hashes.get(key)); - opt.ok_or(SledError::Missing).map_err(RepoError::from) + opt.ok_or(SledError::Missing("alias -> hash")) + .map_err(RepoError::from) } #[tracing::instrument(skip(self))] diff --git a/src/store.rs b/src/store.rs index 553e79e..550b531 100644 --- a/src/store.rs +++ b/src/store.rs @@ -28,6 +28,13 @@ impl StoreError { pub(crate) const fn is_not_found(&self) -> bool { matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) } + + pub(crate) const fn is_missing(&self) -> bool { + match self { + Self::Repo(e) => e.is_missing(), + _ => false, + } + } } impl From for StoreError {