mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2025-01-24 10:25:50 +00:00
Move store migration to separate file
This commit is contained in:
parent
6a05d3588c
commit
37e2012509
2 changed files with 422 additions and 410 deletions
414
src/lib.rs
414
src/lib.rs
|
@ -13,6 +13,7 @@ mod ingest;
|
|||
mod init_tracing;
|
||||
mod magick;
|
||||
mod middleware;
|
||||
mod migrate_store;
|
||||
mod process;
|
||||
mod processor;
|
||||
mod queue;
|
||||
|
@ -36,15 +37,13 @@ use futures_util::{
|
|||
Stream, StreamExt, TryStreamExt,
|
||||
};
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use repo::sled::SledRepo;
|
||||
use rusty_s3::UrlStyle;
|
||||
use std::{
|
||||
future::ready,
|
||||
path::Path,
|
||||
path::PathBuf,
|
||||
rc::Rc,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
time::{Duration, Instant, SystemTime},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing_actix_web::TracingLogger;
|
||||
|
@ -61,9 +60,10 @@ use self::{
|
|||
init_tracing::init_tracing,
|
||||
magick::{details_hint, ValidInputType},
|
||||
middleware::{Deadline, Internal},
|
||||
migrate_store::migrate_store,
|
||||
queue::queue_generate,
|
||||
repo::{
|
||||
Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo, Repo,
|
||||
sled::SledRepo, Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo,
|
||||
UploadId, UploadResult,
|
||||
},
|
||||
serde_str::Serde,
|
||||
|
@ -1506,409 +1506,3 @@ pub async fn run() -> color_eyre::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_store<R, S1, S2>(
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store + Clone + 'static,
|
||||
S2: Store + Clone + 'static,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
|
||||
{
|
||||
tracing::warn!("Running checks");
|
||||
|
||||
if let Err(e) = from.health_check().await {
|
||||
tracing::warn!("Old store is not configured correctly");
|
||||
return Err(e.into());
|
||||
}
|
||||
if let Err(e) = to.health_check().await {
|
||||
tracing::warn!("New store is not configured correctly");
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
tracing::warn!("Checks complete, migrating store");
|
||||
|
||||
let mut failure_count = 0;
|
||||
|
||||
while let Err(e) =
|
||||
do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
|
||||
{
|
||||
tracing::error!("Migration failed with {}", format!("{e:?}"));
|
||||
|
||||
failure_count += 1;
|
||||
|
||||
if failure_count >= 50 {
|
||||
tracing::error!("Exceeded 50 errors");
|
||||
return Err(e);
|
||||
} else {
|
||||
tracing::warn!("Retrying migration +{failure_count}");
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MigrateState<R, S1, S2> {
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
continuing_migration: bool,
|
||||
skip_missing_files: bool,
|
||||
initial_repo_size: u64,
|
||||
repo_size: AtomicU64,
|
||||
pct: AtomicU64,
|
||||
index: AtomicU64,
|
||||
started_at: Instant,
|
||||
}
|
||||
|
||||
async fn do_migrate_store<R, S1, S2>(
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store + 'static,
|
||||
S2: Store + 'static,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
|
||||
{
|
||||
let continuing_migration = repo.is_continuing_migration().await?;
|
||||
let initial_repo_size = repo.size().await?;
|
||||
|
||||
if continuing_migration {
|
||||
tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes");
|
||||
} else {
|
||||
tracing::warn!("{initial_repo_size} hashes will be migrated");
|
||||
}
|
||||
|
||||
if initial_repo_size == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Hashes are read in a consistent order
|
||||
let stream = repo.hashes().await;
|
||||
let mut stream = Box::pin(stream);
|
||||
|
||||
let state = Rc::new(MigrateState {
|
||||
repo: repo.clone(),
|
||||
from,
|
||||
to,
|
||||
continuing_migration,
|
||||
skip_missing_files,
|
||||
initial_repo_size,
|
||||
repo_size: AtomicU64::new(initial_repo_size),
|
||||
pct: AtomicU64::new(initial_repo_size / 100),
|
||||
index: AtomicU64::new(0),
|
||||
started_at: Instant::now(),
|
||||
});
|
||||
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
while let Some(hash) = stream.next().await {
|
||||
let hash = hash?.as_ref().to_vec();
|
||||
|
||||
if joinset.len() >= 32 {
|
||||
if let Some(res) = joinset.join_next().await {
|
||||
res.map_err(|_| UploadError::Canceled)??;
|
||||
}
|
||||
}
|
||||
|
||||
let state = Rc::clone(&state);
|
||||
joinset.spawn_local(async move { migrate_hash(&state, hash).await });
|
||||
}
|
||||
|
||||
// clean up the migration table to avoid interfering with future migrations
|
||||
repo.clear().await?;
|
||||
|
||||
tracing::warn!("Migration completed successfully");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
|
||||
async fn migrate_hash<R, S1, S2>(
|
||||
state: &MigrateState<R, S1, S2>,
|
||||
hash: Vec<u8>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
|
||||
{
|
||||
let MigrateState {
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
continuing_migration,
|
||||
skip_missing_files,
|
||||
initial_repo_size,
|
||||
repo_size,
|
||||
pct,
|
||||
index,
|
||||
started_at,
|
||||
} = state;
|
||||
|
||||
let current_index = index.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let original_identifier = match repo.identifier(hash.clone().into()).await {
|
||||
Ok(Some(identifier)) => identifier,
|
||||
Ok(None) => {
|
||||
tracing::warn!(
|
||||
"Original File identifier for hash {} is missing, queue cleanup task",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if repo.is_migrated(&original_identifier).await? {
|
||||
// migrated original for hash - this means we can skip
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let current_repo_size = repo_size.load(Ordering::Acquire);
|
||||
|
||||
if *continuing_migration && current_repo_size == *initial_repo_size {
|
||||
// first time reaching unmigrated hash
|
||||
|
||||
let new_repo_size = initial_repo_size.saturating_sub(current_index);
|
||||
|
||||
if repo_size
|
||||
.compare_exchange(
|
||||
current_repo_size,
|
||||
new_repo_size,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Relaxed,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
// we successfully updated the count, we're now in charge of setting up pct and
|
||||
// index and printing migration message
|
||||
|
||||
pct.store(new_repo_size / 100, Ordering::Release);
|
||||
index.store(0, Ordering::Release);
|
||||
|
||||
tracing::warn!(
|
||||
"Caught up to previous migration's end. {new_repo_size} hashes will be migrated"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating motion file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating motion file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (variant, identifier) in repo.variants(hash.clone().into()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.remove_variant(hash.clone().into(), variant.clone())
|
||||
.await?;
|
||||
repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!(
|
||||
"Skipping variant {} for hash {}",
|
||||
variant,
|
||||
hex::encode(&hash)
|
||||
);
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating variant file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating variant file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &original_identifier, &new_identifier).await?;
|
||||
repo.relate_identifier(hash.clone().into(), &new_identifier)
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating original file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating original file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
let current_pct = pct.load(Ordering::Relaxed);
|
||||
if current_pct > 0 && current_index % current_pct == 0 {
|
||||
let percent = u32::try_from(current_index / current_pct)
|
||||
.expect("values 0-100 are always in u32 range");
|
||||
if percent == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
let estimated_duration_percent = elapsed / percent;
|
||||
let estimated_duration_remaining =
|
||||
(100u32.saturating_sub(percent)) * estimated_duration_percent;
|
||||
|
||||
let current_repo_size = repo_size.load(Ordering::Relaxed);
|
||||
|
||||
tracing::warn!(
|
||||
"Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)"
|
||||
);
|
||||
tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
let mut failure_count = 0;
|
||||
|
||||
loop {
|
||||
match do_migrate_file(repo, from, to, identifier).await {
|
||||
Ok(identifier) => return Ok(identifier),
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
return Err(MigrateError::From(e));
|
||||
}
|
||||
Err(migrate_error) => {
|
||||
failure_count += 1;
|
||||
|
||||
if failure_count > 10 {
|
||||
tracing::error!("Error migrating file, not retrying");
|
||||
return Err(migrate_error);
|
||||
} else {
|
||||
tracing::warn!("Failed moving file. Retrying +{failure_count}");
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum MigrateError {
|
||||
From(crate::store::StoreError),
|
||||
Details(crate::store::StoreError),
|
||||
To(crate::store::StoreError),
|
||||
}
|
||||
|
||||
async fn do_migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
let stream = from
|
||||
.to_stream(identifier, None, None)
|
||||
.await
|
||||
.map_err(MigrateError::From)?;
|
||||
|
||||
let details_opt = repo
|
||||
.details(identifier)
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
|
||||
let content_type = if let Some(details) = details_opt {
|
||||
details.content_type()
|
||||
} else {
|
||||
mime::APPLICATION_OCTET_STREAM
|
||||
};
|
||||
|
||||
let new_identifier = to
|
||||
.save_stream(stream, content_type)
|
||||
.await
|
||||
.map_err(MigrateError::To)?;
|
||||
|
||||
Ok(new_identifier)
|
||||
}
|
||||
|
||||
async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
I1: Identifier,
|
||||
I2: Identifier,
|
||||
{
|
||||
if let Some(details) = repo.details(from).await? {
|
||||
repo.relate_details(to, &details).await?;
|
||||
repo.cleanup(from).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
418
src/migrate_store.rs
Normal file
418
src/migrate_store.rs
Normal file
|
@ -0,0 +1,418 @@
|
|||
use futures_util::StreamExt;
|
||||
use std::{
|
||||
rc::Rc,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
error::{Error, UploadError},
|
||||
repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
|
||||
store::{Identifier, Store},
|
||||
};
|
||||
|
||||
pub(super) async fn migrate_store<R, S1, S2>(
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store + Clone + 'static,
|
||||
S2: Store + Clone + 'static,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
|
||||
{
|
||||
tracing::warn!("Running checks");
|
||||
|
||||
if let Err(e) = from.health_check().await {
|
||||
tracing::warn!("Old store is not configured correctly");
|
||||
return Err(e.into());
|
||||
}
|
||||
if let Err(e) = to.health_check().await {
|
||||
tracing::warn!("New store is not configured correctly");
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
tracing::warn!("Checks complete, migrating store");
|
||||
|
||||
let mut failure_count = 0;
|
||||
|
||||
while let Err(e) =
|
||||
do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
|
||||
{
|
||||
tracing::error!("Migration failed with {}", format!("{e:?}"));
|
||||
|
||||
failure_count += 1;
|
||||
|
||||
if failure_count >= 50 {
|
||||
tracing::error!("Exceeded 50 errors");
|
||||
return Err(e);
|
||||
} else {
|
||||
tracing::warn!("Retrying migration +{failure_count}");
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MigrateState<R, S1, S2> {
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
continuing_migration: bool,
|
||||
skip_missing_files: bool,
|
||||
initial_repo_size: u64,
|
||||
repo_size: AtomicU64,
|
||||
pct: AtomicU64,
|
||||
index: AtomicU64,
|
||||
started_at: Instant,
|
||||
}
|
||||
|
||||
async fn do_migrate_store<R, S1, S2>(
|
||||
repo: R,
|
||||
from: S1,
|
||||
to: S2,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store + 'static,
|
||||
S2: Store + 'static,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
|
||||
{
|
||||
let continuing_migration = repo.is_continuing_migration().await?;
|
||||
let initial_repo_size = repo.size().await?;
|
||||
|
||||
if continuing_migration {
|
||||
tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes");
|
||||
} else {
|
||||
tracing::warn!("{initial_repo_size} hashes will be migrated");
|
||||
}
|
||||
|
||||
if initial_repo_size == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Hashes are read in a consistent order
|
||||
let stream = repo.hashes().await;
|
||||
let mut stream = Box::pin(stream);
|
||||
|
||||
let state = Rc::new(MigrateState {
|
||||
repo: repo.clone(),
|
||||
from,
|
||||
to,
|
||||
continuing_migration,
|
||||
skip_missing_files,
|
||||
initial_repo_size,
|
||||
repo_size: AtomicU64::new(initial_repo_size),
|
||||
pct: AtomicU64::new(initial_repo_size / 100),
|
||||
index: AtomicU64::new(0),
|
||||
started_at: Instant::now(),
|
||||
});
|
||||
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
while let Some(hash) = stream.next().await {
|
||||
let hash = hash?.as_ref().to_vec();
|
||||
|
||||
if joinset.len() >= 32 {
|
||||
if let Some(res) = joinset.join_next().await {
|
||||
res.map_err(|_| UploadError::Canceled)??;
|
||||
}
|
||||
}
|
||||
|
||||
let state = Rc::clone(&state);
|
||||
joinset.spawn_local(async move { migrate_hash(&state, hash).await });
|
||||
}
|
||||
|
||||
// clean up the migration table to avoid interfering with future migrations
|
||||
repo.clear().await?;
|
||||
|
||||
tracing::warn!("Migration completed successfully");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
|
||||
async fn migrate_hash<R, S1, S2>(
|
||||
state: &MigrateState<R, S1, S2>,
|
||||
hash: Vec<u8>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
|
||||
{
|
||||
let MigrateState {
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
continuing_migration,
|
||||
skip_missing_files,
|
||||
initial_repo_size,
|
||||
repo_size,
|
||||
pct,
|
||||
index,
|
||||
started_at,
|
||||
} = state;
|
||||
|
||||
let current_index = index.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let original_identifier = match repo.identifier(hash.clone().into()).await {
|
||||
Ok(Some(identifier)) => identifier,
|
||||
Ok(None) => {
|
||||
tracing::warn!(
|
||||
"Original File identifier for hash {} is missing, queue cleanup task",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if repo.is_migrated(&original_identifier).await? {
|
||||
// migrated original for hash - this means we can skip
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let current_repo_size = repo_size.load(Ordering::Acquire);
|
||||
|
||||
if *continuing_migration && current_repo_size == *initial_repo_size {
|
||||
// first time reaching unmigrated hash
|
||||
|
||||
let new_repo_size = initial_repo_size.saturating_sub(current_index);
|
||||
|
||||
if repo_size
|
||||
.compare_exchange(
|
||||
current_repo_size,
|
||||
new_repo_size,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Relaxed,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
// we successfully updated the count, we're now in charge of setting up pct and
|
||||
// index and printing migration message
|
||||
|
||||
pct.store(new_repo_size / 100, Ordering::Release);
|
||||
index.store(0, Ordering::Release);
|
||||
|
||||
tracing::warn!(
|
||||
"Caught up to previous migration's end. {new_repo_size} hashes will be migrated"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating motion file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating motion file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (variant, identifier) in repo.variants(hash.clone().into()).await? {
|
||||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &identifier, &new_identifier).await?;
|
||||
repo.remove_variant(hash.clone().into(), variant.clone())
|
||||
.await?;
|
||||
repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
|
||||
.await?;
|
||||
|
||||
repo.mark_migrated(&identifier, &new_identifier).await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!(
|
||||
"Skipping variant {} for hash {}",
|
||||
variant,
|
||||
hex::encode(&hash)
|
||||
);
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating variant file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating variant file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
|
||||
Ok(new_identifier) => {
|
||||
migrate_details(repo, &original_identifier, &new_identifier).await?;
|
||||
repo.relate_identifier(hash.clone().into(), &new_identifier)
|
||||
.await?;
|
||||
}
|
||||
Err(MigrateError::Details(e)) => {
|
||||
tracing::warn!(
|
||||
"Error fetching details for motion file for hash {}",
|
||||
hex::encode(&hash)
|
||||
);
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
|
||||
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
|
||||
}
|
||||
Err(MigrateError::From(e)) => {
|
||||
tracing::warn!("Error migrating original file from old store");
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(MigrateError::To(e)) => {
|
||||
tracing::warn!("Error migrating original file to new store");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
let current_pct = pct.load(Ordering::Relaxed);
|
||||
if current_pct > 0 && current_index % current_pct == 0 {
|
||||
let percent = u32::try_from(current_index / current_pct)
|
||||
.expect("values 0-100 are always in u32 range");
|
||||
if percent == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let elapsed = started_at.elapsed();
|
||||
let estimated_duration_percent = elapsed / percent;
|
||||
let estimated_duration_remaining =
|
||||
(100u32.saturating_sub(percent)) * estimated_duration_percent;
|
||||
|
||||
let current_repo_size = repo_size.load(Ordering::Relaxed);
|
||||
|
||||
tracing::warn!(
|
||||
"Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)"
|
||||
);
|
||||
tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
skip_missing_files: bool,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
let mut failure_count = 0;
|
||||
|
||||
loop {
|
||||
match do_migrate_file(repo, from, to, identifier).await {
|
||||
Ok(identifier) => return Ok(identifier),
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
return Err(MigrateError::From(e));
|
||||
}
|
||||
Err(migrate_error) => {
|
||||
failure_count += 1;
|
||||
|
||||
if failure_count > 10 {
|
||||
tracing::error!("Error migrating file, not retrying");
|
||||
return Err(migrate_error);
|
||||
} else {
|
||||
tracing::warn!("Failed moving file. Retrying +{failure_count}");
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum MigrateError {
|
||||
From(crate::store::StoreError),
|
||||
Details(crate::store::StoreError),
|
||||
To(crate::store::StoreError),
|
||||
}
|
||||
|
||||
async fn do_migrate_file<R, S1, S2>(
|
||||
repo: &R,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
identifier: &S1::Identifier,
|
||||
) -> Result<S2::Identifier, MigrateError>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
S1: Store,
|
||||
S2: Store,
|
||||
{
|
||||
let stream = from
|
||||
.to_stream(identifier, None, None)
|
||||
.await
|
||||
.map_err(MigrateError::From)?;
|
||||
|
||||
let details_opt = repo
|
||||
.details(identifier)
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
|
||||
let content_type = if let Some(details) = details_opt {
|
||||
details.content_type()
|
||||
} else {
|
||||
mime::APPLICATION_OCTET_STREAM
|
||||
};
|
||||
|
||||
let new_identifier = to
|
||||
.save_stream(stream, content_type)
|
||||
.await
|
||||
.map_err(MigrateError::To)?;
|
||||
|
||||
Ok(new_identifier)
|
||||
}
|
||||
|
||||
async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
|
||||
where
|
||||
R: IdentifierRepo,
|
||||
I1: Identifier,
|
||||
I2: Identifier,
|
||||
{
|
||||
if let Some(details) = repo.details(from).await? {
|
||||
repo.relate_details(to, &details).await?;
|
||||
repo.cleanup(from).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue