From c55b1115d147b4be5bcdd0125cc5ceec7e1c15e3 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 4 Jan 2023 15:58:32 -0600 Subject: [PATCH] Increase client timeout, attempt to keep track of migration progress for resuming --- src/lib.rs | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index efed8dc..1f9f2ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -995,6 +995,7 @@ fn build_client() -> awc::Client { Client::builder() .wrap(Tracing) .add_default_header(("User-Agent", "pict-rs v0.4.0-main")) + .timeout(Duration::from_secs(30)) .finish() } @@ -1310,6 +1311,8 @@ pub async fn run() -> color_eyre::Result<()> { } const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; +const STORE_MIGRATION_MOTION: &str = "store-migration-motion"; +const STORE_MIGRATION_VARIANT: &str = "store-migration-variant"; async fn migrate_store(repo: &R, from: S1, to: S2) -> Result<(), Error> where @@ -1320,23 +1323,49 @@ where let stream = repo.hashes().await; let mut stream = Box::pin(stream); + let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?; + while let Some(hash) = stream.next().await { let hash = hash?; + + if let Some(progress) = &progress_opt { + if progress.as_ref() == hash.as_ref() { + progress_opt.take(); + } + continue; + } + if let Some(identifier) = repo .motion_identifier(hash.as_ref().to_vec().into()) .await? { - let new_identifier = migrate_file(&from, &to, &identifier).await?; - migrate_details(repo, identifier, &new_identifier).await?; - repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier) - .await?; + if repo.get(STORE_MIGRATION_MOTION).await?.is_none() { + let new_identifier = migrate_file(&from, &to, &identifier).await?; + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier) + .await?; + repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into()) + .await?; + } } + let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?; + for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? { + if let Some(variant_progress) = &variant_progress_opt { + if variant.as_bytes() == variant_progress.as_ref() { + variant_progress_opt.take(); + } + continue; + } + let new_identifier = migrate_file(&from, &to, &identifier).await?; migrate_details(repo, identifier, &new_identifier).await?; repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier) .await?; + + repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into()) + .await?; } let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; @@ -1347,6 +1376,8 @@ where repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into()) .await?; + repo.remove(STORE_MIGRATION_VARIANT).await?; + repo.remove(STORE_MIGRATION_MOTION).await?; } // clean up the migration key to avoid interfering with future migrations