
Attempt to implement concurrent hash migrations

asonix 2023-07-15 14:37:17 -05:00
parent e23b33e245
commit 25c717771c
3 changed files with 302 additions and 169 deletions
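
The concurrency in this change comes from spawning each hash's migration onto a tokio JoinSet and capping the number of in-flight tasks at 32. Below is a minimal, self-contained sketch of that pattern, not pict-rs code: migrate_one and MAX_IN_FLIGHT are illustrative stand-ins, and it uses spawn rather than the spawn_local the commit itself relies on for non-Send futures.

// Sketch of the bounded-concurrency loop used by the new do_migrate_store:
// spawn work onto a JoinSet, but reap one finished task whenever the set
// already holds MAX_IN_FLIGHT tasks, so at most that many migrations run at once.
use std::time::Duration;

use tokio::task::JoinSet;

async fn migrate_one(hash: u64) -> Result<(), String> {
    // stand-in for migrate_hash: pretend to copy one hash's files to the new store
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("migrated hash {hash}");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    const MAX_IN_FLIGHT: usize = 32;

    let mut joinset = JoinSet::new();

    for hash in 0u64..1000 {
        // once the set is full, wait for one task to finish before spawning another
        if joinset.len() >= MAX_IN_FLIGHT {
            if let Some(res) = joinset.join_next().await {
                res.map_err(|_| "task panicked or was cancelled".to_string())??;
            }
        }

        joinset.spawn(migrate_one(hash));
    }

    // drain whatever is still running before declaring the migration done
    while let Some(res) = joinset.join_next().await {
        res.map_err(|_| "task panicked or was cancelled".to_string())??;
    }

    Ok(())
}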


@@ -42,6 +42,7 @@ use std::{
     future::ready,
     path::Path,
     path::PathBuf,
+    rc::Rc,
     sync::atomic::{AtomicU64, Ordering},
     time::{Duration, Instant, SystemTime},
 };
@@ -62,7 +63,7 @@ use self::{
     middleware::{Deadline, Internal},
     queue::queue_generate,
     repo::{
-        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo,
+        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo, Repo,
         UploadId, UploadResult,
     },
     serde_str::Serde,
@@ -1263,14 +1264,14 @@ async fn launch_object_store<
 }
 
 async fn migrate_inner<S1>(
-    repo: &Repo,
+    repo: Repo,
     client: Client,
     from: S1,
     to: config::primitives::Store,
     skip_missing_files: bool,
 ) -> color_eyre::Result<()>
 where
-    S1: Store,
+    S1: Store + 'static,
 {
     match to {
         config::primitives::Store::Filesystem(config::Filesystem { path }) => {
@@ -1405,7 +1406,7 @@ pub async fn run() -> color_eyre::Result<()> {
             match from {
                 config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                     let from = FileStore::build(path.clone(), repo.clone()).await?;
-                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
                 }
                 config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
                     endpoint,
@@ -1437,7 +1438,7 @@ pub async fn run() -> color_eyre::Result<()> {
                     .await?
                     .build(client.clone());
 
-                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                    migrate_inner(repo, client, from, to, skip_missing_files).await?;
                 }
             }
@@ -1506,20 +1507,16 @@ pub async fn run() -> color_eyre::Result<()> {
     Ok(())
 }
 
-const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
-const STORE_MIGRATION_MOTION: &str = "store-migration-motion";
-const STORE_MIGRATION_VARIANT: &str = "store-migration-variant";
-
 async fn migrate_store<R, S1, S2>(
-    repo: &R,
+    repo: R,
     from: S1,
     to: S2,
     skip_missing_files: bool,
 ) -> Result<(), Error>
 where
-    S1: Store + Clone,
-    S2: Store + Clone,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
+    S1: Store + Clone + 'static,
+    S2: Store + Clone + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
 {
     tracing::warn!("Running checks");
@@ -1536,7 +1533,9 @@ where
     let mut failure_count = 0;
 
-    while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await {
+    while let Err(e) =
+        do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
+    {
         tracing::error!("Migration failed with {}", format!("{e:?}"));
 
         failure_count += 1;
@@ -1553,92 +1552,164 @@ where
     Ok(())
 }
 
+struct MigrateState<R, S1, S2> {
+    repo: R,
+    from: S1,
+    to: S2,
+    continuing_migration: bool,
+    skip_missing_files: bool,
+    initial_repo_size: u64,
+    repo_size: AtomicU64,
+    pct: AtomicU64,
+    index: AtomicU64,
+    started_at: Instant,
+}
+
 async fn do_migrate_store<R, S1, S2>(
-    repo: &R,
+    repo: R,
     from: S1,
     to: S2,
     skip_missing_files: bool,
 ) -> Result<(), Error>
 where
-    S1: Store,
-    S2: Store,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
+    S1: Store + 'static,
+    S2: Store + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
 {
-    let mut repo_size = repo.size().await?;
-
-    let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?;
-
-    if progress_opt.is_some() {
-        tracing::warn!("Continuing previous migration of {repo_size} total hashes");
+    let continuing_migration = repo.is_continuing_migration().await?;
+    let initial_repo_size = repo.size().await?;
+
+    if continuing_migration {
+        tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes");
     } else {
-        tracing::warn!("{repo_size} hashes will be migrated");
+        tracing::warn!("{initial_repo_size} hashes will be migrated");
     }
 
-    if repo_size == 0 {
+    if initial_repo_size == 0 {
         return Ok(());
     }
 
-    let mut pct = repo_size / 100;
-
     // Hashes are read in a consistent order
     let stream = repo.hashes().await;
     let mut stream = Box::pin(stream);
 
-    let now = Instant::now();
-    let mut index = 0;
+    let state = Rc::new(MigrateState {
+        repo: repo.clone(),
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size: AtomicU64::new(initial_repo_size),
+        pct: AtomicU64::new(initial_repo_size / 100),
+        index: AtomicU64::new(0),
+        started_at: Instant::now(),
+    });
+
+    let mut joinset = tokio::task::JoinSet::new();
     while let Some(hash) = stream.next().await {
-        index += 1;
-        let hash = hash?;
-
-        if let Some(progress) = &progress_opt {
-            // we've reached the most recently migrated hash.
-            if progress.as_ref() == hash.as_ref() {
-                progress_opt.take();
-                // update repo size to remaining size
-                repo_size = repo_size.saturating_sub(index);
-                // update pct to be proportional to remainging size
-                pct = repo_size / 100;
-                // reset index to 0 for proper percent scaling
-                index = 0;
-                tracing::warn!(
-                    "Caught up to previous migration's end. {repo_size} hashes will be migrated"
-                );
-            }
-            continue;
-        }
-
-        let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await {
+        let hash = hash?.as_ref().to_vec();
+
+        if joinset.len() >= 32 {
+            if let Some(res) = joinset.join_next().await {
+                res.map_err(|_| UploadError::Canceled)??;
+            }
+        }
+
+        let state = Rc::clone(&state);
+        joinset.spawn_local(async move { migrate_hash(&state, hash).await });
+    }
+
+    // clean up the migration table to avoid interfering with future migrations
+    repo.clear().await?;
+
+    tracing::warn!("Migration completed successfully");
+
+    Ok(())
+}
+
+#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
+async fn migrate_hash<R, S1, S2>(
+    state: &MigrateState<R, S1, S2>,
+    hash: Vec<u8>,
+) -> Result<(), Error>
+where
+    S1: Store,
+    S2: Store,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
+{
+    let MigrateState {
+        repo,
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size,
+        pct,
+        index,
+        started_at,
+    } = state;
+
+    let current_index = index.fetch_add(1, Ordering::Relaxed);
+
+    let original_identifier = match repo.identifier(hash.clone().into()).await {
         Ok(Some(identifier)) => identifier,
         Ok(None) => {
             tracing::warn!(
                 "Original File identifier for hash {} is missing, queue cleanup task",
                 hex::encode(&hash)
             );
-            crate::queue::cleanup_hash(repo, hash).await?;
-            continue;
+            crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
+            return Ok(());
         }
         Err(e) => return Err(e.into()),
     };
 
-        if let Some(identifier) = repo
-            .motion_identifier(hash.as_ref().to_vec().into())
-            .await?
-        {
-            if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
-                match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
-                    Ok(new_identifier) => {
-                        migrate_details(repo, identifier, &new_identifier).await?;
-                        repo.relate_motion_identifier(
-                            hash.as_ref().to_vec().into(),
-                            &new_identifier,
-                        )
-                        .await?;
-                        repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
-                            .await?;
-                    }
-                    Err(MigrateError::Details(e)) => {
-                        tracing::warn!(
+    if repo.is_migrated(&original_identifier).await? {
+        // migrated original for hash - this means we can skip
+        return Ok(());
+    }
+
+    let current_repo_size = repo_size.load(Ordering::Acquire);
+
+    if *continuing_migration && current_repo_size == *initial_repo_size {
+        // first time reaching unmigrated hash
+
+        let new_repo_size = initial_repo_size.saturating_sub(current_index);
+
+        if repo_size
+            .compare_exchange(
+                current_repo_size,
+                new_repo_size,
+                Ordering::AcqRel,
+                Ordering::Relaxed,
+            )
+            .is_ok()
+        {
+            // we successfully updated the count, we're now in charge of setting up pct and
+            // index and printing migration message
+            pct.store(new_repo_size / 100, Ordering::Release);
+            index.store(0, Ordering::Release);
+
+            tracing::warn!(
+                "Caught up to previous migration's end. {new_repo_size} hashes will be migrated"
+            );
+        }
+    }
+
+    if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await? {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
+                Ok(new_identifier) => {
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
+                        .await?;
+
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
+                }
+                Err(MigrateError::Details(e)) => {
+                    tracing::warn!(
@@ -1647,7 +1718,7 @@ where
                     );
                     return Err(e.into());
                 }
-                Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
                     tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
                 }
                 Err(MigrateError::From(e)) => {
@@ -1662,30 +1733,17 @@ where
             }
         }
 
-        let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?;
-
-        for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
-            if let Some(variant_progress) = &variant_progress_opt {
-                if variant.as_bytes() == variant_progress.as_ref() {
-                    variant_progress_opt.take();
-                }
-                continue;
-            }
-
-            match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
+    for (variant, identifier) in repo.variants(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await? {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
                 Ok(new_identifier) => {
-                    migrate_details(repo, identifier, &new_identifier).await?;
-                    repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.remove_variant(hash.clone().into(), variant.clone())
                         .await?;
-                    repo.relate_variant_identifier(
-                        hash.as_ref().to_vec().into(),
-                        variant,
-                        &new_identifier,
-                    )
+                    repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
                         .await?;
-                    repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
-                        .await?;
+
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
                 }
                 Err(MigrateError::Details(e)) => {
                     tracing::warn!(
@@ -1694,7 +1752,7 @@ where
                     );
                     return Err(e.into());
                 }
-                Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
                     tracing::warn!(
                         "Skipping variant {} for hash {}",
                         variant,
@@ -1711,11 +1769,12 @@ where
                 }
             }
         }
+    }
 
-        match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await {
+    match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
         Ok(new_identifier) => {
-            migrate_details(repo, original_identifier, &new_identifier).await?;
-            repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
+            migrate_details(repo, &original_identifier, &new_identifier).await?;
+            repo.relate_identifier(hash.clone().into(), &new_identifier)
                 .await?;
         }
         Err(MigrateError::Details(e)) => {
@@ -1725,7 +1784,7 @@ where
             );
             return Err(e.into());
         }
-        Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
+        Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
             tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
         }
         Err(MigrateError::From(e)) => {
@@ -1738,31 +1797,26 @@ where
         }
     }
 
-        repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
-            .await?;
-        repo.remove(STORE_MIGRATION_VARIANT).await?;
-        repo.remove(STORE_MIGRATION_MOTION).await?;
-
-        if pct > 0 && index % pct == 0 {
-            let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range");
+    let current_pct = pct.load(Ordering::Relaxed);
+    if current_pct > 0 && current_index % current_pct == 0 {
+        let percent = u32::try_from(current_index / current_pct)
+            .expect("values 0-100 are always in u32 range");
         if percent == 0 {
-            continue;
+            return Ok(());
         }
 
-        let elapsed = now.elapsed();
+        let elapsed = started_at.elapsed();
         let estimated_duration_percent = elapsed / percent;
         let estimated_duration_remaining =
             (100u32.saturating_sub(percent)) * estimated_duration_percent;
 
-        tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)");
+        let current_repo_size = repo_size.load(Ordering::Relaxed);
+
+        tracing::warn!(
+            "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)"
+        );
         tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
     }
-    }
-
-    // clean up the migration key to avoid interfering with future migrations
-    repo.remove(STORE_MIGRATION_PROGRESS).await?;
-
-    tracing::warn!("Migration completed successfully");
 
     Ok(())
 }
@@ -1845,15 +1899,15 @@ where
     Ok(new_identifier)
 }
 
-async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
+async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
 where
     R: IdentifierRepo,
     I1: Identifier,
     I2: Identifier,
 {
-    if let Some(details) = repo.details(&from).await? {
+    if let Some(details) = repo.details(from).await? {
         repo.relate_details(to, &details).await?;
-        repo.cleanup(&from).await?;
+        repo.cleanup(from).await?;
     }
 
     Ok(())
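
When a migration resumes, several concurrent tasks may all notice that they have just caught up to where the previous run stopped, but only one of them should rescale the shared counters. The compare_exchange on repo_size is what elects that single task. A minimal sketch of the same hand-off, using standalone atomics instead of the MigrateState fields above, is:

use std::sync::atomic::{AtomicU64, Ordering};

// Sketch of the catch-up hand-off in migrate_hash: the task whose CAS on
// repo_size succeeds is the only one that rescales pct, resets index and logs.
fn catch_up(
    repo_size: &AtomicU64,
    pct: &AtomicU64,
    index: &AtomicU64,
    initial_repo_size: u64,
    current_index: u64,
) {
    let current = repo_size.load(Ordering::Acquire);

    // another task already adjusted the counters
    if current != initial_repo_size {
        return;
    }

    let remaining = initial_repo_size.saturating_sub(current_index);

    if repo_size
        .compare_exchange(current, remaining, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
    {
        // the winner sets up the progress counters for the rest of the run
        pct.store(remaining / 100, Ordering::Release);
        index.store(0, Ordering::Release);
        println!("caught up: {remaining} hashes left to migrate");
    }
}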


@@ -72,6 +72,7 @@ pub(crate) trait FullRepo:
     + AliasRepo
     + QueueRepo
     + HashRepo
+    + MigrationRepo
     + Send
     + Sync
     + Clone
@@ -261,6 +262,47 @@ where
     }
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait MigrationRepo: BaseRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError>;
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError>;
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError>;
+
+    async fn clear(&self) -> Result<(), RepoError>;
+}
+
+#[async_trait::async_trait(?Send)]
+impl<T> MigrationRepo for actix_web::web::Data<T>
+where
+    T: MigrationRepo,
+{
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        T::is_continuing_migration(self).await
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        T::mark_migrated(self, old_identifier, new_identifier).await
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        T::is_migrated(self, identifier).await
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        T::clear(self).await
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait HashRepo: BaseRepo {
     type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;


@@ -2,8 +2,8 @@ use crate::{
     details::MaybeHumanDate,
     repo::{
         Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
-        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo,
-        UploadId, UploadRepo, UploadResult,
+        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo,
+        QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
     },
     serde_str::Serde,
     store::StoreError,
@@ -69,6 +69,7 @@ pub(crate) struct SledRepo {
     in_progress_queue: Tree,
     queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     uploads: Tree,
+    migration_identifiers: Tree,
     cache_capacity: u64,
     export_path: PathBuf,
     db: Db,
@@ -100,6 +101,7 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
             cache_capacity,
             export_path,
             db,
@@ -461,6 +463,41 @@ impl IdentifierRepo for SledRepo {
     }
 }
 
+#[async_trait::async_trait(?Send)]
+impl MigrationRepo for SledRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        Ok(!self.migration_identifiers.is_empty())
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        let key = new_identifier.to_bytes()?;
+        let value = old_identifier.to_bytes()?;
+
+        b!(
+            self.migration_identifiers,
+            migration_identifiers.insert(key, value)
+        );
+
+        Ok(())
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        let key = identifier.to_bytes()?;
+
+        Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        b!(self.migration_identifiers, migration_identifiers.clear());
+
+        Ok(())
+    }
+}
+
 type StreamItem = Result<IVec, RepoError>;
 type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
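
The MigrationRepo trait replaces the three settings keys with a table of identifiers that already exist in the new store. A rough in-memory model of what the sled-backed implementation above records, not pict-rs code and with identifiers reduced to plain byte strings, is:

use std::collections::HashMap;

// In-memory model of the migration_identifiers tree: a map from an identifier
// in the new store to the identifier it replaced in the old store.
#[derive(Default)]
struct MigrationTable {
    migrated: HashMap<Vec<u8>, Vec<u8>>,
}

impl MigrationTable {
    // a non-empty table means a previous migration run left work behind
    fn is_continuing_migration(&self) -> bool {
        !self.migrated.is_empty()
    }

    // record that old_identifier has been copied and now lives at new_identifier
    fn mark_migrated(&mut self, old_identifier: &[u8], new_identifier: &[u8]) {
        self.migrated
            .insert(new_identifier.to_vec(), old_identifier.to_vec());
    }

    // anything present as a key has already been migrated and can be skipped
    fn is_migrated(&self, identifier: &[u8]) -> bool {
        self.migrated.contains_key(identifier)
    }

    // wipe the table once the migration completes so future runs start clean
    fn clear(&mut self) {
        self.migrated.clear();
    }
}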