2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Add basic progress indication for store migration

This commit is contained in:
asonix 2023-06-29 11:39:47 -05:00
parent fb3bf2df71
commit 01d3610f46
3 changed files with 45 additions and 2 deletions

View file

@ -42,7 +42,7 @@ use std::{
path::Path, path::Path,
path::PathBuf, path::PathBuf,
sync::atomic::{AtomicU64, Ordering}, sync::atomic::{AtomicU64, Ordering},
time::{Duration, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
@ -1382,7 +1382,7 @@ where
S2: Store + Clone, S2: Store + Clone,
R: IdentifierRepo + HashRepo + SettingsRepo, R: IdentifierRepo + HashRepo + SettingsRepo,
{ {
tracing::info!("Migrating store"); tracing::warn!("Migrating store");
let mut failure_count = 0; let mut failure_count = 0;
@ -1411,12 +1411,26 @@ where
S2: Store, S2: Store,
R: IdentifierRepo + HashRepo + SettingsRepo, R: IdentifierRepo + HashRepo + SettingsRepo,
{ {
let repo_size = repo.size().await?;
tracing::warn!("{repo_size} Hashes will be migrated");
if repo_size == 0 {
return Ok(());
}
let pct = repo_size / 100;
let stream = repo.hashes().await; let stream = repo.hashes().await;
let mut stream = Box::pin(stream); let mut stream = Box::pin(stream);
let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?; let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?;
let now = Instant::now();
let mut index = 0;
while let Some(hash) = stream.next().await { while let Some(hash) = stream.next().await {
index += 1;
let hash = hash?; let hash = hash?;
if let Some(progress) = &progress_opt { if let Some(progress) = &progress_opt {
@ -1504,6 +1518,21 @@ where
.await?; .await?;
repo.remove(STORE_MIGRATION_VARIANT).await?; repo.remove(STORE_MIGRATION_VARIANT).await?;
repo.remove(STORE_MIGRATION_MOTION).await?; repo.remove(STORE_MIGRATION_MOTION).await?;
if index % pct == 0 {
let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range");
if percent == 0 {
continue;
}
let elapsed = now.elapsed();
let estimated_duration_percent = elapsed / percent;
let estimated_duration_remaining =
(100u32.saturating_sub(percent)) * estimated_duration_percent;
tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)");
tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
}
} }
// clean up the migration key to avoid interfering with future migrations // clean up the migration key to avoid interfering with future migrations

View file

@ -248,6 +248,8 @@ where
pub(crate) trait HashRepo: BaseRepo { pub(crate) trait HashRepo: BaseRepo {
type Stream: Stream<Item = Result<Self::Bytes, RepoError>>; type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;
async fn size(&self) -> Result<u64, RepoError>;
async fn hashes(&self) -> Self::Stream; async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError>; async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError>;
@ -301,6 +303,10 @@ where
{ {
type Stream = T::Stream; type Stream = T::Stream;
async fn size(&self) -> Result<u64, RepoError> {
T::size(self).await
}
async fn hashes(&self) -> Self::Stream { async fn hashes(&self) -> Self::Stream {
T::hashes(self).await T::hashes(self).await
} }

View file

@ -431,6 +431,14 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
impl HashRepo for SledRepo { impl HashRepo for SledRepo {
type Stream = LocalBoxStream<'static, StreamItem>; type Stream = LocalBoxStream<'static, StreamItem>;
async fn size(&self) -> Result<u64, RepoError> {
Ok(b!(
self.hashes,
Ok(u64::try_from(hashes.len()).expect("Length is reasonable"))
as Result<u64, SledError>
))
}
async fn hashes(&self) -> Self::Stream { async fn hashes(&self) -> Self::Stream {
let iter = self let iter = self
.hashes .hashes