use tokio::task::JoinSet;

use crate::{
    config::Configuration,
    details::Details,
    error::Error,
    repo::{ArcRepo, DeleteToken, Hash},
    repo_04::{
        AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
        SledRepo as OldSledRepo,
    },
    store::Store,
    stream::IntoStreamer,
};
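
// `MIGRATE_CONCURRENCY` caps how many per-hash migration tasks run at once;
// `GENERATOR_KEY` is the settings key for the path-generator state carried over
// from the old repo.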
const MIGRATE_CONCURRENCY: usize = 32;
const GENERATOR_KEY: &str = "last-path";
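
/// Migrates every hash from `old_repo` to `new_repo`, along with its details,
/// motion identifier, aliases, variants, and access timestamps, then carries the
/// path-generator state across. Progress is logged at roughly one-percent steps.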
#[tracing::instrument(skip_all)]
pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result<(), Error> {
    tracing::warn!("Running checks");
    if let Err(e) = old_repo.health_check().await {
        tracing::warn!("Old repo is not configured correctly");
        return Err(e.into());
    }
    if let Err(e) = new_repo.health_check().await {
        tracing::warn!("New repo is not configured correctly");
        return Err(e.into());
    }

    let total_size = old_repo.size().await?;
    let pct = (total_size / 100).max(1);
    tracing::warn!("Checks complete, migrating repo");
    tracing::warn!("{total_size} hashes will be migrated");

    let mut hash_stream = old_repo.hashes().await.into_streamer();

    let mut index = 0;
    while let Some(res) = hash_stream.next().await {
        if let Ok(hash) = res {
            let _ = migrate_hash(old_repo.clone(), new_repo.clone(), hash).await;
        } else {
            tracing::warn!("Failed to read hash, skipping");
        }

        index += 1;

        if index % pct == 0 {
            let percent = index / pct;

            tracing::warn!("Migration {percent}% complete - {index}/{total_size}");
        }
    }

    if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? {
        new_repo
            .set(GENERATOR_KEY, generator_state.to_vec().into())
            .await?;
    }

    tracing::warn!("Migration complete");

    Ok(())
}
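
/// Migrates a 0.4 sled repo into the new repo format, spawning up to
/// `MIGRATE_CONCURRENCY` per-hash tasks at a time. Because it uses
/// `JoinSet::spawn_local`, the caller must be running on a `tokio::task::LocalSet`.
///
/// A minimal sketch of driving it, assuming `old_repo`, `new_repo`, `store`, and
/// `config` are already constructed:
///
/// ```ignore
/// tokio::task::LocalSet::new()
///     .run_until(migrate_04(old_repo, new_repo, store, config))
///     .await?;
/// ```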
#[tracing::instrument(skip_all)]
pub(crate) async fn migrate_04<S: Store + 'static>(
    old_repo: OldSledRepo,
    new_repo: ArcRepo,
    store: S,
    config: Configuration,
) -> Result<(), Error> {
    tracing::warn!("Running checks");
    if let Err(e) = old_repo.health_check().await {
        tracing::warn!("Old repo is not configured correctly");
        return Err(e.into());
    }
    if let Err(e) = new_repo.health_check().await {
        tracing::warn!("New repo is not configured correctly");
        return Err(e.into());
    }
    if let Err(e) = store.health_check().await {
        tracing::warn!("Store is not configured correctly");
        return Err(e.into());
    }

    let total_size = old_repo.size().await?;
    let pct = (total_size / 100).max(1);
    tracing::warn!("Checks complete, migrating repo");
    tracing::warn!("{total_size} hashes will be migrated");

    let mut hash_stream = old_repo.hashes().await.into_streamer();

    let mut set = JoinSet::new();

    let mut index = 0;
    while let Some(res) = hash_stream.next().await {
        if let Ok(hash) = res {
            set.spawn_local(migrate_hash_04(
                old_repo.clone(),
                new_repo.clone(),
                store.clone(),
                config.clone(),
                hash.clone(),
            ));
        } else {
            tracing::warn!("Failed to read hash, skipping");
        }

        // Apply backpressure: wait for running tasks to finish before spawning
        // more once the set reaches the concurrency limit.
        while set.len() >= MIGRATE_CONCURRENCY {
            if set.join_next().await.is_some() {
                index += 1;

                if index % pct == 0 {
                    let percent = index / pct;

                    tracing::warn!("Migration {percent}% complete - {index}/{total_size}");
                }
            }
        }
    }

    // Drain the remaining in-flight migrations.
    while set.join_next().await.is_some() {
        index += 1;

        if index % pct == 0 {
            let percent = index / pct;

            tracing::warn!("Migration {percent}% complete - {index}/{total_size}");
        }
    }

    if let Some(generator_state) = old_repo.get(GENERATOR_KEY).await? {
        new_repo
            .set(GENERATOR_KEY, generator_state.to_vec().into())
            .await?;
    }

    if let Some(generator_state) = old_repo.get(crate::NOT_FOUND_KEY).await? {
        new_repo
            .set(crate::NOT_FOUND_KEY, generator_state.to_vec().into())
            .await?;
    }

    tracing::warn!("Migration complete");

    Ok(())
}
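
/// Runs `do_migrate_hash` for one hash, retrying on failure and skipping the
/// hash entirely after ten failed attempts.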
async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
    let mut hash_failures = 0;

    while let Err(e) = do_migrate_hash(&old_repo, &new_repo, hash.clone()).await {
        hash_failures += 1;

        if hash_failures > 10 {
            tracing::error!("Failed to migrate hash {hash:?}, skipping\n{e:?}");

            break;
        } else {
            tracing::warn!("Failed to migrate hash {hash:?}, retrying +{hash_failures}");
        }
    }
}
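
/// Runs `do_migrate_hash_04` for one 0.4-era hash, with the same
/// retry-then-skip policy as `migrate_hash`.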
async fn migrate_hash_04<S: Store>(
    old_repo: OldSledRepo,
    new_repo: ArcRepo,
    store: S,
    config: Configuration,
    old_hash: sled::IVec,
) {
    let mut hash_failures = 0;

    while let Err(e) =
        do_migrate_hash_04(&old_repo, &new_repo, &store, &config, old_hash.clone()).await
    {
        hash_failures += 1;

        if hash_failures > 10 {
            tracing::error!(
                "Failed to migrate hash {}, skipping\n{e:?}",
                hex::encode(&old_hash[..])
            );

            break;
        } else {
            tracing::warn!(
                "Failed to migrate hash {}, retrying +{hash_failures}",
                hex::encode(&old_hash[..])
            );
        }
    }
}
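
/// Copies a single hash between repos of the current format: the hash record
/// itself, its details, the motion identifier, each alias with its delete token
/// and access time, and each variant with its identifier and access time.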
#[tracing::instrument(skip_all)]
async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
    let Some(identifier) = old_repo.identifier(hash.clone()).await? else {
        tracing::warn!("Skipping hash {hash:?}, no identifier");
        return Ok(());
    };

    let _ = new_repo.create_hash(hash.clone(), &identifier).await?;

    if let Some(details) = old_repo.details(&identifier).await? {
        new_repo.relate_details(&identifier, &details).await?;
    }

    if let Some(identifier) = old_repo.motion_identifier(hash.clone()).await? {
        new_repo
            .relate_motion_identifier(hash.clone(), &identifier)
            .await?;

        if let Some(details) = old_repo.details(&identifier).await? {
            new_repo.relate_details(&identifier, &details).await?;
        }
    }

    for alias in old_repo.for_hash(hash.clone()).await? {
        let delete_token = old_repo
            .delete_token(&alias)
            .await?
            .unwrap_or_else(DeleteToken::generate);

        let _ = new_repo
            .create_alias(&alias, &delete_token, hash.clone())
            .await?;

        if let Some(timestamp) = old_repo.alias_accessed_at(alias.clone()).await? {
            new_repo.set_accessed_alias(alias, timestamp).await?;
        }
    }

    for (variant, identifier) in old_repo.variants(hash.clone()).await? {
        let _ = new_repo
            .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
            .await?;

        // Read the access time from the repo being migrated from, not the one
        // being populated.
        if let Some(timestamp) = old_repo
            .variant_accessed_at(hash.clone(), variant.clone())
            .await?
        {
            new_repo
                .set_accessed_variant(hash.clone(), variant, timestamp)
                .await?;
        } else {
            new_repo.accessed_variant(hash.clone(), variant).await?;
        }

        if let Some(details) = old_repo.details(&identifier).await? {
            new_repo.relate_details(&identifier, &details).await?;
        }
    }

    Ok(())
}
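
/// Copies a single hash out of a 0.4 sled repo: rebuilds the `Hash` key from the
/// raw hash bytes, the stored file's size, and its internal format, then migrates
/// the aliases, motion identifier, and variants, generating details where the old
/// repo has none.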
#[tracing::instrument(skip_all)]
async fn do_migrate_hash_04<S: Store>(
    old_repo: &OldSledRepo,
    new_repo: &ArcRepo,
    store: &S,
    config: &Configuration,
    old_hash: sled::IVec,
) -> Result<(), Error> {
    let Some(identifier) = old_repo.identifier::<S::Identifier>(old_hash.clone()).await? else {
        tracing::warn!("Skipping hash {}, no identifier", hex::encode(&old_hash));
        return Ok(());
    };

    let size = store.len(&identifier).await?;

    let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?;

    let aliases = old_repo.for_hash(old_hash.clone()).await?;
    let variants = old_repo.variants::<S::Identifier>(old_hash.clone()).await?;
    let motion_identifier = old_repo
        .motion_identifier::<S::Identifier>(old_hash.clone())
        .await?;

    // The 0.4 repo stored raw hash bytes; rebuild the new Hash key from those
    // bytes plus the file size and internal format.
    let hash = old_hash[..].try_into().expect("Invalid hash size");

    let hash = Hash::new(hash, size, hash_details.internal_format());

    let _ = new_repo
        .create_hash_with_timestamp(hash.clone(), &identifier, hash_details.created_at())
        .await?;

    for alias in aliases {
        let delete_token = old_repo
            .delete_token(&alias)
            .await?
            .unwrap_or_else(DeleteToken::generate);

        let _ = new_repo
            .create_alias(&alias, &delete_token, hash.clone())
            .await?;
    }

    if let Some(identifier) = motion_identifier {
        new_repo
            .relate_motion_identifier(hash.clone(), &identifier)
            .await?;

        set_details(old_repo, new_repo, store, config, &identifier).await?;
    }

    for (variant, identifier) in variants {
        let _ = new_repo
            .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
            .await?;

        set_details(old_repo, new_repo, store, config, &identifier).await?;

        new_repo.accessed_variant(hash.clone(), variant).await?;
    }

    Ok(())
}
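
/// Returns the details for `identifier`, preferring whatever the new repo
/// already holds; otherwise fetches (or generates) them and records the relation
/// in the new repo.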
async fn set_details<S: Store>(
    old_repo: &OldSledRepo,
    new_repo: &ArcRepo,
    store: &S,
    config: &Configuration,
    identifier: &S::Identifier,
) -> Result<Details, Error> {
    if let Some(details) = new_repo.details(identifier).await? {
        Ok(details)
    } else {
        let details = fetch_or_generate_details(old_repo, store, config, identifier).await?;
        new_repo.relate_details(identifier, &details).await?;
        Ok(details)
    }
}
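
/// Looks the details up in the old repo, falling back to regenerating them from
/// the stored file when the old repo has none.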
#[tracing::instrument(skip_all)]
async fn fetch_or_generate_details<S: Store>(
    old_repo: &OldSledRepo,
    store: &S,
    config: &Configuration,
    identifier: &S::Identifier,
) -> Result<Details, Error> {
    let details_opt = old_repo.details(identifier).await?;

    if let Some(details) = details_opt {
        Ok(details)
    } else {
        Details::from_store(store, identifier, config.media.process_timeout)
            .await
            .map_err(From::from)
    }
}