diff --git a/src/lib.rs b/src/lib.rs index 946656b..8fef44a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ use std::{ path::Path, path::PathBuf, sync::atomic::{AtomicU64, Ordering}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use tokio::sync::Semaphore; use tracing_actix_web::TracingLogger; @@ -1382,7 +1382,7 @@ where S2: Store + Clone, R: IdentifierRepo + HashRepo + SettingsRepo, { - tracing::info!("Migrating store"); + tracing::warn!("Migrating store"); let mut failure_count = 0; @@ -1411,12 +1411,26 @@ where S2: Store, R: IdentifierRepo + HashRepo + SettingsRepo, { + let repo_size = repo.size().await?; + + tracing::warn!("{repo_size} Hashes will be migrated"); + + if repo_size == 0 { + return Ok(()); + } + + let pct = repo_size / 100; + let stream = repo.hashes().await; let mut stream = Box::pin(stream); let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?; + let now = Instant::now(); + let mut index = 0; while let Some(hash) = stream.next().await { + index += 1; + let hash = hash?; if let Some(progress) = &progress_opt { @@ -1504,6 +1518,21 @@ where .await?; repo.remove(STORE_MIGRATION_VARIANT).await?; repo.remove(STORE_MIGRATION_MOTION).await?; + + if index % pct == 0 { + let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range"); + if percent == 0 { + continue; + } + + let elapsed = now.elapsed(); + let estimated_duration_percent = elapsed / percent; + let estimated_duration_remaining = + (100u32.saturating_sub(percent)) * estimated_duration_percent; + + tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)"); + tracing::warn!("ETA: {estimated_duration_remaining:?} from now"); + } } // clean up the migration key to avoid interfering with future migrations diff --git a/src/repo.rs b/src/repo.rs index 6e85133..f6bd9d9 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -248,6 +248,8 @@ where pub(crate) trait HashRepo: BaseRepo { type Stream: Stream>; + async fn size(&self) -> Result; + async fn hashes(&self) -> Self::Stream; async fn create(&self, hash: Self::Bytes) -> Result, RepoError>; @@ -301,6 +303,10 @@ where { type Stream = T::Stream; + async fn size(&self) -> Result { + T::size(self).await + } + async fn hashes(&self) -> Self::Stream { T::hashes(self).await } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ed47afa..8d4696d 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -431,6 +431,14 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec { impl HashRepo for SledRepo { type Stream = LocalBoxStream<'static, StreamItem>; + async fn size(&self) -> Result { + Ok(b!( + self.hashes, + Ok(u64::try_from(hashes.len()).expect("Length is reasonable")) + as Result + )) + } + async fn hashes(&self) -> Self::Stream { let iter = self .hashes