diff --git a/src/lib.rs b/src/lib.rs index 83c38ad..feafb89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ mod ingest; mod init_tracing; mod magick; mod middleware; +mod migrate_store; mod process; mod processor; mod queue; @@ -36,15 +37,13 @@ use futures_util::{ Stream, StreamExt, TryStreamExt, }; use once_cell::sync::{Lazy, OnceCell}; -use repo::sled::SledRepo; use rusty_s3::UrlStyle; use std::{ future::ready, path::Path, path::PathBuf, - rc::Rc, sync::atomic::{AtomicU64, Ordering}, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime}, }; use tokio::sync::Semaphore; use tracing_actix_web::TracingLogger; @@ -61,9 +60,10 @@ use self::{ init_tracing::init_tracing, magick::{details_hint, ValidInputType}, middleware::{Deadline, Internal}, + migrate_store::migrate_store, queue::queue_generate, repo::{ - Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo, Repo, + sled::SledRepo, Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, UploadId, UploadResult, }, serde_str::Serde, @@ -1506,409 +1506,3 @@ pub async fn run() -> color_eyre::Result<()> { Ok(()) } - -async fn migrate_store( - repo: R, - from: S1, - to: S2, - skip_missing_files: bool, -) -> Result<(), Error> -where - S1: Store + Clone + 'static, - S2: Store + Clone + 'static, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, -{ - tracing::warn!("Running checks"); - - if let Err(e) = from.health_check().await { - tracing::warn!("Old store is not configured correctly"); - return Err(e.into()); - } - if let Err(e) = to.health_check().await { - tracing::warn!("New store is not configured correctly"); - return Err(e.into()); - } - - tracing::warn!("Checks complete, migrating store"); - - let mut failure_count = 0; - - while let Err(e) = - do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await - { - tracing::error!("Migration failed with {}", format!("{e:?}")); - - failure_count += 1; - - if failure_count >= 50 { - tracing::error!("Exceeded 50 errors"); - return Err(e); - } else { - tracing::warn!("Retrying migration +{failure_count}"); - } - - tokio::time::sleep(Duration::from_secs(3)).await; - } - - Ok(()) -} - -struct MigrateState { - repo: R, - from: S1, - to: S2, - continuing_migration: bool, - skip_missing_files: bool, - initial_repo_size: u64, - repo_size: AtomicU64, - pct: AtomicU64, - index: AtomicU64, - started_at: Instant, -} - -async fn do_migrate_store( - repo: R, - from: S1, - to: S2, - skip_missing_files: bool, -) -> Result<(), Error> -where - S1: Store + 'static, - S2: Store + 'static, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, -{ - let continuing_migration = repo.is_continuing_migration().await?; - let initial_repo_size = repo.size().await?; - - if continuing_migration { - tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes"); - } else { - tracing::warn!("{initial_repo_size} hashes will be migrated"); - } - - if initial_repo_size == 0 { - return Ok(()); - } - - // Hashes are read in a consistent order - let stream = repo.hashes().await; - let mut stream = Box::pin(stream); - - let state = Rc::new(MigrateState { - repo: repo.clone(), - from, - to, - continuing_migration, - skip_missing_files, - initial_repo_size, - repo_size: AtomicU64::new(initial_repo_size), - pct: AtomicU64::new(initial_repo_size / 100), - index: AtomicU64::new(0), - started_at: Instant::now(), - }); - - let mut joinset = tokio::task::JoinSet::new(); - - while let Some(hash) = stream.next().await { - let hash = hash?.as_ref().to_vec(); - - if joinset.len() >= 32 { - if let Some(res) = joinset.join_next().await { - res.map_err(|_| UploadError::Canceled)??; - } - } - - let state = Rc::clone(&state); - joinset.spawn_local(async move { migrate_hash(&state, hash).await }); - } - - // clean up the migration table to avoid interfering with future migrations - repo.clear().await?; - - tracing::warn!("Migration completed successfully"); - - Ok(()) -} - -#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))] -async fn migrate_hash( - state: &MigrateState, - hash: Vec, -) -> Result<(), Error> -where - S1: Store, - S2: Store, - R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo, -{ - let MigrateState { - repo, - from, - to, - continuing_migration, - skip_missing_files, - initial_repo_size, - repo_size, - pct, - index, - started_at, - } = state; - - let current_index = index.fetch_add(1, Ordering::Relaxed); - - let original_identifier = match repo.identifier(hash.clone().into()).await { - Ok(Some(identifier)) => identifier, - Ok(None) => { - tracing::warn!( - "Original File identifier for hash {} is missing, queue cleanup task", - hex::encode(&hash) - ); - crate::queue::cleanup_hash(repo, hash.clone().into()).await?; - return Ok(()); - } - Err(e) => return Err(e.into()), - }; - - if repo.is_migrated(&original_identifier).await? { - // migrated original for hash - this means we can skip - return Ok(()); - } - - let current_repo_size = repo_size.load(Ordering::Acquire); - - if *continuing_migration && current_repo_size == *initial_repo_size { - // first time reaching unmigrated hash - - let new_repo_size = initial_repo_size.saturating_sub(current_index); - - if repo_size - .compare_exchange( - current_repo_size, - new_repo_size, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - // we successfully updated the count, we're now in charge of setting up pct and - // index and printing migration message - - pct.store(new_repo_size / 100, Ordering::Release); - index.store(0, Ordering::Release); - - tracing::warn!( - "Caught up to previous migration's end. {new_repo_size} hashes will be migrated" - ); - } - } - - if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? { - if !repo.is_migrated(&identifier).await? { - match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, &identifier, &new_identifier).await?; - repo.relate_motion_identifier(hash.clone().into(), &new_identifier) - .await?; - - repo.mark_migrated(&identifier, &new_identifier).await?; - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error fetching details for motion file for hash {}", - hex::encode(&hash) - ); - return Err(e.into()); - } - Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating motion file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating motion file to new store"); - return Err(e.into()); - } - } - } - } - - for (variant, identifier) in repo.variants(hash.clone().into()).await? { - if !repo.is_migrated(&identifier).await? { - match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, &identifier, &new_identifier).await?; - repo.remove_variant(hash.clone().into(), variant.clone()) - .await?; - repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier) - .await?; - - repo.mark_migrated(&identifier, &new_identifier).await?; - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error fetching details for motion file for hash {}", - hex::encode(&hash) - ); - return Err(e.into()); - } - Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!( - "Skipping variant {} for hash {}", - variant, - hex::encode(&hash) - ); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating variant file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating variant file to new store"); - return Err(e.into()); - } - } - } - } - - match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, &original_identifier, &new_identifier).await?; - repo.relate_identifier(hash.clone().into(), &new_identifier) - .await?; - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error fetching details for motion file for hash {}", - hex::encode(&hash) - ); - return Err(e.into()); - } - Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating original file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating original file to new store"); - return Err(e.into()); - } - } - - let current_pct = pct.load(Ordering::Relaxed); - if current_pct > 0 && current_index % current_pct == 0 { - let percent = u32::try_from(current_index / current_pct) - .expect("values 0-100 are always in u32 range"); - if percent == 0 { - return Ok(()); - } - - let elapsed = started_at.elapsed(); - let estimated_duration_percent = elapsed / percent; - let estimated_duration_remaining = - (100u32.saturating_sub(percent)) * estimated_duration_percent; - - let current_repo_size = repo_size.load(Ordering::Relaxed); - - tracing::warn!( - "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)" - ); - tracing::warn!("ETA: {estimated_duration_remaining:?} from now"); - } - - Ok(()) -} - -async fn migrate_file( - repo: &R, - from: &S1, - to: &S2, - identifier: &S1::Identifier, - skip_missing_files: bool, -) -> Result -where - R: IdentifierRepo, - S1: Store, - S2: Store, -{ - let mut failure_count = 0; - - loop { - match do_migrate_file(repo, from, to, identifier).await { - Ok(identifier) => return Ok(identifier), - Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { - return Err(MigrateError::From(e)); - } - Err(migrate_error) => { - failure_count += 1; - - if failure_count > 10 { - tracing::error!("Error migrating file, not retrying"); - return Err(migrate_error); - } else { - tracing::warn!("Failed moving file. Retrying +{failure_count}"); - } - - tokio::time::sleep(Duration::from_secs(3)).await; - } - } - } -} - -#[derive(Debug)] -enum MigrateError { - From(crate::store::StoreError), - Details(crate::store::StoreError), - To(crate::store::StoreError), -} - -async fn do_migrate_file( - repo: &R, - from: &S1, - to: &S2, - identifier: &S1::Identifier, -) -> Result -where - R: IdentifierRepo, - S1: Store, - S2: Store, -{ - let stream = from - .to_stream(identifier, None, None) - .await - .map_err(MigrateError::From)?; - - let details_opt = repo - .details(identifier) - .await - .map_err(MigrateError::Details)?; - - let content_type = if let Some(details) = details_opt { - details.content_type() - } else { - mime::APPLICATION_OCTET_STREAM - }; - - let new_identifier = to - .save_stream(stream, content_type) - .await - .map_err(MigrateError::To)?; - - Ok(new_identifier) -} - -async fn migrate_details(repo: &R, from: &I1, to: &I2) -> Result<(), Error> -where - R: IdentifierRepo, - I1: Identifier, - I2: Identifier, -{ - if let Some(details) = repo.details(from).await? { - repo.relate_details(to, &details).await?; - repo.cleanup(from).await?; - } - - Ok(()) -} diff --git a/src/migrate_store.rs b/src/migrate_store.rs new file mode 100644 index 0000000..e606287 --- /dev/null +++ b/src/migrate_store.rs @@ -0,0 +1,418 @@ +use futures_util::StreamExt; +use std::{ + rc::Rc, + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, Instant}, +}; + +use crate::{ + error::{Error, UploadError}, + repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo}, + store::{Identifier, Store}, +}; + +pub(super) async fn migrate_store( + repo: R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> +where + S1: Store + Clone + 'static, + S2: Store + Clone + 'static, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, +{ + tracing::warn!("Running checks"); + + if let Err(e) = from.health_check().await { + tracing::warn!("Old store is not configured correctly"); + return Err(e.into()); + } + if let Err(e) = to.health_check().await { + tracing::warn!("New store is not configured correctly"); + return Err(e.into()); + } + + tracing::warn!("Checks complete, migrating store"); + + let mut failure_count = 0; + + while let Err(e) = + do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await + { + tracing::error!("Migration failed with {}", format!("{e:?}")); + + failure_count += 1; + + if failure_count >= 50 { + tracing::error!("Exceeded 50 errors"); + return Err(e); + } else { + tracing::warn!("Retrying migration +{failure_count}"); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + } + + Ok(()) +} + +struct MigrateState { + repo: R, + from: S1, + to: S2, + continuing_migration: bool, + skip_missing_files: bool, + initial_repo_size: u64, + repo_size: AtomicU64, + pct: AtomicU64, + index: AtomicU64, + started_at: Instant, +} + +async fn do_migrate_store( + repo: R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> +where + S1: Store + 'static, + S2: Store + 'static, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, +{ + let continuing_migration = repo.is_continuing_migration().await?; + let initial_repo_size = repo.size().await?; + + if continuing_migration { + tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes"); + } else { + tracing::warn!("{initial_repo_size} hashes will be migrated"); + } + + if initial_repo_size == 0 { + return Ok(()); + } + + // Hashes are read in a consistent order + let stream = repo.hashes().await; + let mut stream = Box::pin(stream); + + let state = Rc::new(MigrateState { + repo: repo.clone(), + from, + to, + continuing_migration, + skip_missing_files, + initial_repo_size, + repo_size: AtomicU64::new(initial_repo_size), + pct: AtomicU64::new(initial_repo_size / 100), + index: AtomicU64::new(0), + started_at: Instant::now(), + }); + + let mut joinset = tokio::task::JoinSet::new(); + + while let Some(hash) = stream.next().await { + let hash = hash?.as_ref().to_vec(); + + if joinset.len() >= 32 { + if let Some(res) = joinset.join_next().await { + res.map_err(|_| UploadError::Canceled)??; + } + } + + let state = Rc::clone(&state); + joinset.spawn_local(async move { migrate_hash(&state, hash).await }); + } + + // clean up the migration table to avoid interfering with future migrations + repo.clear().await?; + + tracing::warn!("Migration completed successfully"); + + Ok(()) +} + +#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))] +async fn migrate_hash( + state: &MigrateState, + hash: Vec, +) -> Result<(), Error> +where + S1: Store, + S2: Store, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo, +{ + let MigrateState { + repo, + from, + to, + continuing_migration, + skip_missing_files, + initial_repo_size, + repo_size, + pct, + index, + started_at, + } = state; + + let current_index = index.fetch_add(1, Ordering::Relaxed); + + let original_identifier = match repo.identifier(hash.clone().into()).await { + Ok(Some(identifier)) => identifier, + Ok(None) => { + tracing::warn!( + "Original File identifier for hash {} is missing, queue cleanup task", + hex::encode(&hash) + ); + crate::queue::cleanup_hash(repo, hash.clone().into()).await?; + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + + if repo.is_migrated(&original_identifier).await? { + // migrated original for hash - this means we can skip + return Ok(()); + } + + let current_repo_size = repo_size.load(Ordering::Acquire); + + if *continuing_migration && current_repo_size == *initial_repo_size { + // first time reaching unmigrated hash + + let new_repo_size = initial_repo_size.saturating_sub(current_index); + + if repo_size + .compare_exchange( + current_repo_size, + new_repo_size, + Ordering::AcqRel, + Ordering::Relaxed, + ) + .is_ok() + { + // we successfully updated the count, we're now in charge of setting up pct and + // index and printing migration message + + pct.store(new_repo_size / 100, Ordering::Release); + index.store(0, Ordering::Release); + + tracing::warn!( + "Caught up to previous migration's end. {new_repo_size} hashes will be migrated" + ); + } + } + + if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? { + if !repo.is_migrated(&identifier).await? { + match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &identifier, &new_identifier).await?; + repo.relate_motion_identifier(hash.clone().into(), &new_identifier) + .await?; + + repo.mark_migrated(&identifier, &new_identifier).await?; + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error fetching details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating motion file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating motion file to new store"); + return Err(e.into()); + } + } + } + } + + for (variant, identifier) in repo.variants(hash.clone().into()).await? { + if !repo.is_migrated(&identifier).await? { + match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &identifier, &new_identifier).await?; + repo.remove_variant(hash.clone().into(), variant.clone()) + .await?; + repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier) + .await?; + + repo.mark_migrated(&identifier, &new_identifier).await?; + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error fetching details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!( + "Skipping variant {} for hash {}", + variant, + hex::encode(&hash) + ); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating variant file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating variant file to new store"); + return Err(e.into()); + } + } + } + } + + match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &original_identifier, &new_identifier).await?; + repo.relate_identifier(hash.clone().into(), &new_identifier) + .await?; + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error fetching details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating original file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating original file to new store"); + return Err(e.into()); + } + } + + let current_pct = pct.load(Ordering::Relaxed); + if current_pct > 0 && current_index % current_pct == 0 { + let percent = u32::try_from(current_index / current_pct) + .expect("values 0-100 are always in u32 range"); + if percent == 0 { + return Ok(()); + } + + let elapsed = started_at.elapsed(); + let estimated_duration_percent = elapsed / percent; + let estimated_duration_remaining = + (100u32.saturating_sub(percent)) * estimated_duration_percent; + + let current_repo_size = repo_size.load(Ordering::Relaxed); + + tracing::warn!( + "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)" + ); + tracing::warn!("ETA: {estimated_duration_remaining:?} from now"); + } + + Ok(()) +} + +async fn migrate_file( + repo: &R, + from: &S1, + to: &S2, + identifier: &S1::Identifier, + skip_missing_files: bool, +) -> Result +where + R: IdentifierRepo, + S1: Store, + S2: Store, +{ + let mut failure_count = 0; + + loop { + match do_migrate_file(repo, from, to, identifier).await { + Ok(identifier) => return Ok(identifier), + Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { + return Err(MigrateError::From(e)); + } + Err(migrate_error) => { + failure_count += 1; + + if failure_count > 10 { + tracing::error!("Error migrating file, not retrying"); + return Err(migrate_error); + } else { + tracing::warn!("Failed moving file. Retrying +{failure_count}"); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + } +} + +#[derive(Debug)] +enum MigrateError { + From(crate::store::StoreError), + Details(crate::store::StoreError), + To(crate::store::StoreError), +} + +async fn do_migrate_file( + repo: &R, + from: &S1, + to: &S2, + identifier: &S1::Identifier, +) -> Result +where + R: IdentifierRepo, + S1: Store, + S2: Store, +{ + let stream = from + .to_stream(identifier, None, None) + .await + .map_err(MigrateError::From)?; + + let details_opt = repo + .details(identifier) + .await + .map_err(MigrateError::Details)?; + + let content_type = if let Some(details) = details_opt { + details.content_type() + } else { + mime::APPLICATION_OCTET_STREAM + }; + + let new_identifier = to + .save_stream(stream, content_type) + .await + .map_err(MigrateError::To)?; + + Ok(new_identifier) +} + +async fn migrate_details(repo: &R, from: &I1, to: &I2) -> Result<(), Error> +where + R: IdentifierRepo, + I1: Identifier, + I2: Identifier, +{ + if let Some(details) = repo.details(from).await? { + repo.relate_details(to, &details).await?; + repo.cleanup(from).await?; + } + + Ok(()) +}