Enhance migration handling of missing identifiers

Queue a cleanup task for hashes that don't have original file identifiers
Be more specific about which field is missing when reporting a missing-field error
This commit is contained in:
asonix 2023-07-05 09:52:19 -05:00
parent 2274dfecb4
commit 23e67b9697
5 changed files with 64 additions and 40 deletions

View File

@ -171,7 +171,7 @@ impl ResponseError for Error {
) => StatusCode::BAD_REQUEST, ) => StatusCode::BAD_REQUEST,
Some( Some(
UploadError::Repo(crate::repo::RepoError::SledError( UploadError::Repo(crate::repo::RepoError::SledError(
crate::repo::sled::SledError::Missing, crate::repo::sled::SledError::Missing(_),
)) ))
| UploadError::MissingAlias, | UploadError::MissingAlias,
) => StatusCode::NOT_FOUND, ) => StatusCode::NOT_FOUND,

View File

@ -1077,6 +1077,22 @@ fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
); );
} }
/// Spawns the two long-lived background worker tasks for this (repo, store)
/// pair: one cleanup-queue processor and one image-processing-queue processor.
///
/// Each spawn happens inside a fresh root span (`parent: None`) so the
/// detached worker tasks do not keep whatever span is current at startup
/// alive for the lifetime of the process.
fn spawn_workers<R, S>(repo: R, store: S)
where
R: FullRepo + 'static,
S: Store + 'static,
{
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
// NOTE(review): the worker future is constructed *inside* `in_scope` —
// presumably so that, if the queue fns are `#[instrument]`ed, their span
// parent is captured at creation time under the detached root span.
// TODO confirm against the queue module before refactoring.
actix_rt::spawn(queue::process_cleanup(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
// Second worker consumes `repo`/`store` by value — no clone needed.
tracing::trace_span!(parent: None, "Spawn task")
.in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id())));
}
async fn launch_file_store<R: FullRepo + 'static>( async fn launch_file_store<R: FullRepo + 'static>(
repo: R, repo: R,
store: FileStore, store: FileStore,
@ -1087,20 +1103,7 @@ async fn launch_file_store<R: FullRepo + 'static>(
let store = store.clone(); let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { spawn_workers(repo.clone(), store.clone());
actix_rt::spawn(queue::process_cleanup(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(queue::process_images(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
@ -1122,20 +1125,7 @@ async fn launch_object_store<R: FullRepo + 'static>(
let store = store_config.clone().build(client.clone()); let store = store_config.clone().build(client.clone());
let repo = repo.clone(); let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { spawn_workers(repo.clone(), store.clone());
actix_rt::spawn(queue::process_cleanup(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(queue::process_images(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
@ -1380,7 +1370,7 @@ async fn migrate_store<R, S1, S2>(
where where
S1: Store + Clone, S1: Store + Clone,
S2: Store + Clone, S2: Store + Clone,
R: IdentifierRepo + HashRepo + SettingsRepo, R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
{ {
tracing::warn!("Migrating store"); tracing::warn!("Migrating store");
@ -1412,7 +1402,7 @@ async fn do_migrate_store<R, S1, S2>(
where where
S1: Store, S1: Store,
S2: Store, S2: Store,
R: IdentifierRepo + HashRepo + SettingsRepo, R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
{ {
let repo_size = repo.size().await?; let repo_size = repo.size().await?;
@ -1443,6 +1433,19 @@ where
continue; continue;
} }
let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await {
Ok(identifier) => identifier,
Err(e) if e.is_missing() => {
tracing::warn!(
"Original File identifier for hash {} is missing, queue cleanup task",
hex::encode(&hash)
);
crate::queue::cleanup_hash(repo, hash).await?;
continue;
}
Err(e) => return Err(e.into()),
};
if let Some(identifier) = repo if let Some(identifier) = repo
.motion_identifier(hash.as_ref().to_vec().into()) .motion_identifier(hash.as_ref().to_vec().into())
.await? .await?
@ -1503,11 +1506,9 @@ where
} }
} }
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; match migrate_file(&from, &to, &original_identifier, skip_missing_files).await {
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
Ok(new_identifier) => { Ok(new_identifier) => {
migrate_details(repo, identifier, &new_identifier).await?; migrate_details(repo, original_identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?; .await?;
} }

View File

@ -58,6 +58,15 @@ pub(crate) enum RepoError {
Canceled, Canceled,
} }
impl RepoError {
    /// Reports whether this error ultimately represents a missing required
    /// field. Only the sled backend can produce a missing-field error; every
    /// other variant answers `false`.
    pub(crate) const fn is_missing(&self) -> bool {
        if let Self::SledError(e) = self {
            e.is_missing()
        } else {
            false
        }
    }
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait FullRepo: pub(crate) trait FullRepo:
UploadRepo UploadRepo

View File

@ -44,13 +44,19 @@ pub(crate) enum SledError {
#[error("Invalid details json")] #[error("Invalid details json")]
Details(#[from] serde_json::Error), Details(#[from] serde_json::Error),
#[error("Required field was not present")] #[error("Required field was not present: {0}")]
Missing, Missing(&'static str),
#[error("Operation panicked")] #[error("Operation panicked")]
Panic, Panic,
} }
impl SledError {
    /// True exactly when this error is the `Missing` variant, i.e. a
    /// required field was absent from the sled tree.
    pub(super) const fn is_missing(&self) -> bool {
        match self {
            Self::Missing(_) => true,
            _ => false,
        }
    }
}
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct SledRepo { pub(crate) struct SledRepo {
healthz_count: Arc<AtomicU64>, healthz_count: Arc<AtomicU64>,
@ -512,7 +518,7 @@ impl HashRepo for SledRepo {
) -> Result<I, StoreError> { ) -> Result<I, StoreError> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
opt.ok_or(SledError::Missing) opt.ok_or(SledError::Missing("hash -> identifier"))
.map_err(RepoError::from) .map_err(RepoError::from)
.map_err(StoreError::from) .map_err(StoreError::from)
.and_then(|ivec| I::from_bytes(ivec.to_vec())) .and_then(|ivec| I::from_bytes(ivec.to_vec()))
@ -709,7 +715,7 @@ impl AliasRepo for SledRepo {
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
opt.and_then(|ivec| DeleteToken::from_slice(&ivec)) opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
.ok_or(SledError::Missing) .ok_or(SledError::Missing("alias -> delete-token"))
.map_err(RepoError::from) .map_err(RepoError::from)
} }
@ -728,7 +734,8 @@ impl AliasRepo for SledRepo {
let opt = b!(self.alias_hashes, alias_hashes.get(key)); let opt = b!(self.alias_hashes, alias_hashes.get(key));
opt.ok_or(SledError::Missing).map_err(RepoError::from) opt.ok_or(SledError::Missing("alias -> hash"))
.map_err(RepoError::from)
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]

View File

@ -28,6 +28,13 @@ impl StoreError {
pub(crate) const fn is_not_found(&self) -> bool { pub(crate) const fn is_not_found(&self) -> bool {
matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_))
} }
/// Reports whether this store error wraps a repo-level "missing field"
/// error; missing-ness only ever originates from the repo layer, so every
/// other variant answers `false`.
pub(crate) const fn is_missing(&self) -> bool {
    if let Self::Repo(e) = self {
        e.is_missing()
    } else {
        false
    }
}
} }
impl From<crate::store::file_store::FileError> for StoreError { impl From<crate::store::file_store::FileError> for StoreError {