Mirror of https://git.asonix.dog/asonix/pict-rs, synced 2024-12-22 19:31:35 +00:00
Port migration changes from 0.4.1
This commit is contained in:
parent a751d92436
commit 2961aae6e3
6 changed files with 529 additions and 376 deletions

@@ -42,7 +42,7 @@ impl Details {
         Ok(Details::from_parts(format, width, height, frames))
     }

-    pub(crate) async fn from_store<S: Store + 'static>(
+    pub(crate) async fn from_store<S: Store>(
         store: &S,
         identifier: &S::Identifier,
     ) -> Result<Self, Error> {
@@ -54,7 +54,7 @@ pub(crate) async fn discover_store_lite<S>(
     identifier: &S::Identifier,
 ) -> Result<DiscoveryLite, crate::error::Error>
 where
-    S: Store + 'static,
+    S: Store,
 {
     if let Some(discovery) =
         ffmpeg::discover_stream_lite(store.to_stream(identifier, None, None).await?).await?

378 src/lib.rs

@@ -15,6 +15,7 @@ mod ingest;
 mod init_tracing;
 mod magick;
 mod middleware;
+mod migrate_store;
 mod process;
 mod processor;
 mod queue;
@@ -46,7 +47,7 @@ use std::{
     path::Path,
     path::PathBuf,
     sync::atomic::{AtomicU64, Ordering},
-    time::{Duration, Instant, SystemTime},
+    time::{Duration, SystemTime},
 };
 use tokio::sync::Semaphore;
 use tracing_actix_web::TracingLogger;
@@ -62,6 +63,7 @@ use self::{
     ingest::Session,
     init_tracing::init_tracing,
     middleware::{Deadline, Internal},
+    migrate_store::migrate_store,
     queue::queue_generate,
     repo::{
         Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo,
@@ -1297,7 +1299,7 @@ async fn launch_object_store<
 }

 async fn migrate_inner<S1>(
-    repo: &Repo,
+    repo: Repo,
     client: Client,
     from: S1,
     to: config::primitives::Store,
@@ -1441,7 +1443,7 @@ pub async fn run() -> color_eyre::Result<()> {
         match from {
             config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                 let from = FileStore::build(path.clone(), repo.clone()).await?;
-                migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                migrate_inner(repo, client, from, to, skip_missing_files).await?;
             }
             config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
                 endpoint,
@@ -1475,7 +1477,7 @@ pub async fn run() -> color_eyre::Result<()> {
                 .await?
                 .build(client.clone());

-                migrate_inner(&repo, client, from, to, skip_missing_files).await?;
+                migrate_inner(repo, client, from, to, skip_missing_files).await?;
             }
         }

@@ -1545,371 +1547,3 @@ pub async fn run() -> color_eyre::Result<()> {

     Ok(())
 }
-
-const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
-const STORE_MIGRATION_MOTION: &str = "store-migration-motion";
-const STORE_MIGRATION_VARIANT: &str = "store-migration-variant";
-
-async fn migrate_store<R, S1, S2>(
-    repo: &R,
-    from: S1,
-    to: S2,
-    skip_missing_files: bool,
-) -> Result<(), Error>
-where
-    S1: Store + Clone + 'static,
-    S2: Store + Clone,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
-{
-    tracing::warn!("Running checks");
-
-    if let Err(e) = from.health_check().await {
-        tracing::warn!("Old store is not configured correctly");
-        return Err(e.into());
-    }
-    if let Err(e) = to.health_check().await {
-        tracing::warn!("New store is not configured correctly");
-        return Err(e.into());
-    }
-
-    tracing::warn!("Checks complete, migrating store");
-
-    let mut failure_count = 0;
-
-    while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await {
-        tracing::error!("Migration failed with {}", format!("{e:?}"));
-
-        failure_count += 1;
-
-        if failure_count >= 50 {
-            tracing::error!("Exceeded 50 errors");
-            return Err(e);
-        } else {
-            tracing::warn!("Retrying migration +{failure_count}");
-        }
-
-        tokio::time::sleep(Duration::from_secs(3)).await;
-    }
-
-    Ok(())
-}
-async fn do_migrate_store<R, S1, S2>(
-    repo: &R,
-    from: S1,
-    to: S2,
-    skip_missing_files: bool,
-) -> Result<(), Error>
-where
-    S1: Store + 'static,
-    S2: Store,
-    R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo,
-{
-    let mut repo_size = repo.size().await?;
-
-    let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?;
-
-    if progress_opt.is_some() {
-        tracing::warn!("Continuing previous migration of {repo_size} total hashes");
-    } else {
-        tracing::warn!("{repo_size} hashes will be migrated");
-    }
-
-    if repo_size == 0 {
-        return Ok(());
-    }
-
-    let mut pct = repo_size / 100;
-
-    // Hashes are read in a consistent order
-    let stream = repo.hashes().await;
-    let mut stream = Box::pin(stream);
-
-    let now = Instant::now();
-    let mut index = 0;
-    while let Some(hash) = stream.next().await {
-        index += 1;
-
-        let hash = hash?;
-
-        if let Some(progress) = &progress_opt {
-            // we've reached the most recently migrated hash.
-            if progress.as_ref() == hash.as_ref() {
-                progress_opt.take();
-
-                // update repo size to remaining size
-                repo_size = repo_size.saturating_sub(index);
-                // update pct to be proportional to remainging size
-                pct = repo_size / 100;
-
-                // reset index to 0 for proper percent scaling
-                index = 0;
-
-                tracing::warn!(
-                    "Caught up to previous migration's end. {repo_size} hashes will be migrated"
-                );
-            }
-            continue;
-        }
-
-        let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await {
-            Ok(Some(identifier)) => identifier,
-            Ok(None) => {
-                tracing::warn!(
-                    "Original File identifier for hash {} is missing, queue cleanup task",
-                    hex::encode(&hash)
-                );
-                crate::queue::cleanup_hash(repo, hash).await?;
-                continue;
-            }
-            Err(e) => return Err(e.into()),
-        };
-
-        if let Some(identifier) = repo
-            .motion_identifier(hash.as_ref().to_vec().into())
-            .await?
-        {
-            if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
-                match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
-                    Ok(new_identifier) => {
-                        migrate_details(repo, identifier, &new_identifier).await?;
-                        repo.relate_motion_identifier(
-                            hash.as_ref().to_vec().into(),
-                            &new_identifier,
-                        )
-                        .await?;
-                        repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
-                            .await?;
-                    }
-                    Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                        tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
-                    }
-                    Err(MigrateError::Details(e)) => {
-                        tracing::warn!(
-                            "Error generating details for motion file for hash {}",
-                            hex::encode(&hash)
-                        );
-                        return Err(e);
-                    }
-                    Err(MigrateError::From(e)) => {
-                        tracing::warn!("Error migrating motion file from old store");
-                        return Err(e.into());
-                    }
-                    Err(MigrateError::To(e)) => {
-                        tracing::warn!("Error migrating motion file to new store");
-                        return Err(e.into());
-                    }
-                }
-            }
-        }
-
-        let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?;
-
-        for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
-            if let Some(variant_progress) = &variant_progress_opt {
-                if variant.as_bytes() == variant_progress.as_ref() {
-                    variant_progress_opt.take();
-                }
-                continue;
-            }
-
-            match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await {
-                Ok(new_identifier) => {
-                    migrate_details(repo, identifier, &new_identifier).await?;
-                    repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
-                        .await?;
-                    repo.relate_variant_identifier(
-                        hash.as_ref().to_vec().into(),
-                        variant,
-                        &new_identifier,
-                    )
-                    .await?;
-
-                    repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
-                        .await?;
-                }
-                Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                    tracing::warn!(
-                        "Skipping variant {} for hash {}",
-                        variant,
-                        hex::encode(&hash)
-                    );
-                }
-                Err(MigrateError::Details(e)) => {
-                    tracing::warn!(
-                        "Error generating details for variant file for hash {}",
-                        hex::encode(&hash)
-                    );
-                    return Err(e);
-                }
-                Err(MigrateError::From(e)) => {
-                    tracing::warn!("Error migrating variant file from old store");
-                    return Err(e.into());
-                }
-                Err(MigrateError::To(e)) => {
-                    tracing::warn!("Error migrating variant file to new store");
-                    return Err(e.into());
-                }
-            }
-        }
-
-        match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await {
-            Ok(new_identifier) => {
-                migrate_details(repo, original_identifier, &new_identifier).await?;
-                repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
-                    .await?;
-            }
-            Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
-            }
-            Err(MigrateError::Details(e)) => {
-                tracing::warn!(
-                    "Error generating details for original file for hash {}",
-                    hex::encode(&hash)
-                );
-                return Err(e);
-            }
-            Err(MigrateError::From(e)) => {
-                tracing::warn!("Error migrating original file from old store");
-                return Err(e.into());
-            }
-            Err(MigrateError::To(e)) => {
-                tracing::warn!("Error migrating original file to new store");
-                return Err(e.into());
-            }
-        }
-
-        repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
-            .await?;
-        repo.remove(STORE_MIGRATION_VARIANT).await?;
-        repo.remove(STORE_MIGRATION_MOTION).await?;
-
-        if pct > 0 && index % pct == 0 {
-            let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range");
-            if percent == 0 {
-                continue;
-            }
-
-            let elapsed = now.elapsed();
-            let estimated_duration_percent = elapsed / percent;
-            let estimated_duration_remaining =
-                (100u32.saturating_sub(percent)) * estimated_duration_percent;
-
-            tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)");
-            tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
-        }
-    }
-
-    // clean up the migration key to avoid interfering with future migrations
-    repo.remove(STORE_MIGRATION_PROGRESS).await?;
-
-    tracing::warn!("Migration completed successfully");
-
-    Ok(())
-}
-
-async fn migrate_file<R, S1, S2>(
-    repo: &R,
-    from: &S1,
-    to: &S2,
-    identifier: &S1::Identifier,
-    skip_missing_files: bool,
-) -> Result<S2::Identifier, MigrateError>
-where
-    R: IdentifierRepo,
-    S1: Store + 'static,
-    S2: Store,
-{
-    let mut failure_count = 0;
-
-    loop {
-        match do_migrate_file(repo, from, to, identifier).await {
-            Ok(identifier) => return Ok(identifier),
-            Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
-                return Err(MigrateError::From(e));
-            }
-            Err(migrate_error) => {
-                failure_count += 1;
-
-                if failure_count > 10 {
-                    tracing::error!("Error migrating file, not retrying");
-                    return Err(migrate_error);
-                } else {
-                    tracing::warn!("Failed moving file. Retrying +{failure_count}");
-                }
-
-                tokio::time::sleep(Duration::from_secs(3)).await;
-            }
-        }
-    }
-}
-
-#[derive(Debug)]
-enum MigrateError {
-    From(crate::store::StoreError),
-    Details(crate::error::Error),
-    To(crate::store::StoreError),
-}
-
-async fn do_migrate_file<R, S1, S2>(
-    repo: &R,
-    from: &S1,
-    to: &S2,
-    identifier: &S1::Identifier,
-) -> Result<S2::Identifier, MigrateError>
-where
-    R: IdentifierRepo,
-    S1: Store + 'static,
-    S2: Store,
-{
-    let stream = from
-        .to_stream(identifier, None, None)
-        .await
-        .map_err(MigrateError::From)?;
-
-    let details_opt = repo
-        .details(identifier)
-        .await
-        .map_err(Error::from)
-        .map_err(MigrateError::Details)?
-        .and_then(|details| {
-            if details.internal_format().is_some() {
-                Some(details)
-            } else {
-                None
-            }
-        });
-
-    let details = if let Some(details) = details_opt {
-        details
-    } else {
-        let new_details = Details::from_store(from, identifier)
-            .await
-            .map_err(MigrateError::Details)?;
-        repo.relate_details(identifier, &new_details)
-            .await
-            .map_err(Error::from)
-            .map_err(MigrateError::Details)?;
-        new_details
-    };
-
-    let new_identifier = to
-        .save_stream(stream, details.media_type())
-        .await
-        .map_err(MigrateError::To)?;
-
-    Ok(new_identifier)
-}
-
-async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
-where
-    R: IdentifierRepo,
-    I1: Identifier,
-    I2: Identifier,
-{
-    if let Some(details) = repo.details(&from).await? {
-        repo.relate_details(to, &details).await?;
-        repo.cleanup(&from).await?;
-    }
-
-    Ok(())
-}
-
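
Both the old in-lib.rs implementation removed above and the new src/migrate_store.rs module below keep the same failure policy: the whole-store pass is retried up to 50 times and a single file copy up to 10 times, with a fixed 3-second sleep between attempts. A minimal sketch of that retry shape, using hypothetical names rather than pict-rs APIs:

// Sketch only: retry an async operation a bounded number of times with a fixed
// delay between attempts, mirroring the loops in the diff above. The helper name
// and signature are illustrative, not part of pict-rs.
use std::time::Duration;

async fn retry_fixed_delay<T, E, F, Fut>(mut op: F, max_attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    let mut failure_count = 0;

    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(e) => {
                failure_count += 1;

                // give up and surface the last error once the cap is reached,
                // like the 50-error / 10-error caps in the diff
                if failure_count >= max_attempts {
                    return Err(e);
                }

                eprintln!("attempt {failure_count} failed: {e:?}, retrying");

                // fixed 3-second pause between attempts, matching the diff
                tokio::time::sleep(Duration::from_secs(3)).await;
            }
        }
    }
}

The attempt caps and the 3-second pause are fixed values in the diff; the sketch only parameterizes the attempt count.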

440 src/migrate_store.rs (Normal file)

@@ -0,0 +1,440 @@
+use futures_util::StreamExt;
+use std::{
+    rc::Rc,
+    sync::atomic::{AtomicU64, Ordering},
+    time::{Duration, Instant},
+};
+
+use crate::{
+    details::Details,
+    error::{Error, UploadError},
+    repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo},
+    store::{Identifier, Store},
+};
+
+pub(super) async fn migrate_store<R, S1, S2>(
+    repo: R,
+    from: S1,
+    to: S2,
+    skip_missing_files: bool,
+) -> Result<(), Error>
+where
+    S1: Store + Clone + 'static,
+    S2: Store + Clone + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
+{
+    tracing::warn!("Running checks");
+
+    if let Err(e) = from.health_check().await {
+        tracing::warn!("Old store is not configured correctly");
+        return Err(e.into());
+    }
+    if let Err(e) = to.health_check().await {
+        tracing::warn!("New store is not configured correctly");
+        return Err(e.into());
+    }
+
+    tracing::warn!("Checks complete, migrating store");
+
+    let mut failure_count = 0;
+
+    while let Err(e) =
+        do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
+    {
+        tracing::error!("Migration failed with {}", format!("{e:?}"));
+
+        failure_count += 1;
+
+        if failure_count >= 50 {
+            tracing::error!("Exceeded 50 errors");
+            return Err(e);
+        } else {
+            tracing::warn!("Retrying migration +{failure_count}");
+        }
+
+        tokio::time::sleep(Duration::from_secs(3)).await;
+    }
+
+    Ok(())
+}
+
+struct MigrateState<R, S1, S2> {
+    repo: R,
+    from: S1,
+    to: S2,
+    continuing_migration: bool,
+    skip_missing_files: bool,
+    initial_repo_size: u64,
+    repo_size: AtomicU64,
+    pct: AtomicU64,
+    index: AtomicU64,
+    started_at: Instant,
+}
+
+async fn do_migrate_store<R, S1, S2>(
+    repo: R,
+    from: S1,
+    to: S2,
+    skip_missing_files: bool,
+) -> Result<(), Error>
+where
+    S1: Store + 'static,
+    S2: Store + 'static,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static,
+{
+    let continuing_migration = repo.is_continuing_migration().await?;
+    let initial_repo_size = repo.size().await?;
+
+    if continuing_migration {
+        tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes");
+    } else {
+        tracing::warn!("{initial_repo_size} hashes will be migrated");
+    }
+
+    if initial_repo_size == 0 {
+        return Ok(());
+    }
+
+    // Hashes are read in a consistent order
+    let stream = repo.hashes().await;
+    let mut stream = Box::pin(stream);
+
+    let state = Rc::new(MigrateState {
+        repo: repo.clone(),
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size: AtomicU64::new(initial_repo_size),
+        pct: AtomicU64::new(initial_repo_size / 100),
+        index: AtomicU64::new(0),
+        started_at: Instant::now(),
+    });
+
+    let mut joinset = tokio::task::JoinSet::new();
+
+    while let Some(hash) = stream.next().await {
+        let hash = hash?.as_ref().to_vec();
+
+        if joinset.len() >= 32 {
+            if let Some(res) = joinset.join_next().await {
+                res.map_err(|_| UploadError::Canceled)??;
+            }
+        }
+
+        let state = Rc::clone(&state);
+        joinset.spawn_local(async move { migrate_hash(&state, hash).await });
+    }
+
+    while let Some(res) = joinset.join_next().await {
+        res.map_err(|_| UploadError::Canceled)??;
+    }
+
+    // clean up the migration table to avoid interfering with future migrations
+    repo.clear().await?;
+
+    tracing::warn!("Migration completed successfully");
+
+    Ok(())
+}
+
+#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))]
+async fn migrate_hash<R, S1, S2>(
+    state: &MigrateState<R, S1, S2>,
+    hash: Vec<u8>,
+) -> Result<(), Error>
+where
+    S1: Store,
+    S2: Store,
+    R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo,
+{
+    let MigrateState {
+        repo,
+        from,
+        to,
+        continuing_migration,
+        skip_missing_files,
+        initial_repo_size,
+        repo_size,
+        pct,
+        index,
+        started_at,
+    } = state;
+
+    let current_index = index.fetch_add(1, Ordering::Relaxed);
+
+    let original_identifier = match repo.identifier(hash.clone().into()).await {
+        Ok(Some(identifier)) => identifier,
+        Ok(None) => {
+            tracing::warn!(
+                "Original File identifier for hash {} is missing, queue cleanup task",
+                hex::encode(&hash)
+            );
+            crate::queue::cleanup_hash(repo, hash.clone().into()).await?;
+            return Ok(());
+        }
+        Err(e) => return Err(e.into()),
+    };
+
+    if repo.is_migrated(&original_identifier).await? {
+        // migrated original for hash - this means we can skip
+        return Ok(());
+    }
+
+    let current_repo_size = repo_size.load(Ordering::Acquire);
+
+    if *continuing_migration && current_repo_size == *initial_repo_size {
+        // first time reaching unmigrated hash
+
+        let new_repo_size = initial_repo_size.saturating_sub(current_index);
+
+        if repo_size
+            .compare_exchange(
+                current_repo_size,
+                new_repo_size,
+                Ordering::AcqRel,
+                Ordering::Relaxed,
+            )
+            .is_ok()
+        {
+            // we successfully updated the count, we're now in charge of setting up pct and
+            // index and printing migration message
+
+            pct.store(new_repo_size / 100, Ordering::Release);
+            index.store(0, Ordering::Release);
+
+            tracing::warn!(
+                "Caught up to previous migration's end. {new_repo_size} hashes will be migrated"
+            );
+        }
+    }
+
+    if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await? {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
+                Ok(new_identifier) => {
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
+                        .await?;
+
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
+                }
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
+                    tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
+                }
+                Err(MigrateError::Details(e)) => {
+                    tracing::warn!(
+                        "Error generating details for motion file for hash {}",
+                        hex::encode(&hash)
+                    );
+                    return Err(e);
+                }
+                Err(MigrateError::From(e)) => {
+                    tracing::warn!("Error migrating motion file from old store");
+                    return Err(e.into());
+                }
+                Err(MigrateError::To(e)) => {
+                    tracing::warn!("Error migrating motion file to new store");
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+
+    for (variant, identifier) in repo.variants(hash.clone().into()).await? {
+        if !repo.is_migrated(&identifier).await? {
+            match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
+                Ok(new_identifier) => {
+                    migrate_details(repo, &identifier, &new_identifier).await?;
+                    repo.remove_variant(hash.clone().into(), variant.clone())
+                        .await?;
+                    repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier)
+                        .await?;
+
+                    repo.mark_migrated(&identifier, &new_identifier).await?;
+                }
+                Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
+                    tracing::warn!(
+                        "Skipping variant {} for hash {}",
+                        variant,
+                        hex::encode(&hash)
+                    );
+                }
+                Err(MigrateError::Details(e)) => {
+                    tracing::warn!(
+                        "Error generating details for motion file for hash {}",
+                        hex::encode(&hash)
+                    );
+                    return Err(e);
+                }
+                Err(MigrateError::From(e)) => {
+                    tracing::warn!("Error migrating variant file from old store");
+                    return Err(e.into());
+                }
+                Err(MigrateError::To(e)) => {
+                    tracing::warn!("Error migrating variant file to new store");
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+
+    match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
+        Ok(new_identifier) => {
+            migrate_details(repo, &original_identifier, &new_identifier).await?;
+            repo.relate_identifier(hash.clone().into(), &new_identifier)
+                .await?;
+            repo.mark_migrated(&original_identifier, &new_identifier)
+                .await?;
+        }
+        Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => {
+            tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
+        }
+        Err(MigrateError::Details(e)) => {
+            tracing::warn!(
+                "Error generating details for motion file for hash {}",
+                hex::encode(&hash)
+            );
+            return Err(e);
+        }
+        Err(MigrateError::From(e)) => {
+            tracing::warn!("Error migrating original file from old store");
+            return Err(e.into());
+        }
+        Err(MigrateError::To(e)) => {
+            tracing::warn!("Error migrating original file to new store");
+            return Err(e.into());
+        }
+    }
+
+    let current_pct = pct.load(Ordering::Relaxed);
+    if current_pct > 0 && current_index % current_pct == 0 {
+        let percent = u32::try_from(current_index / current_pct)
+            .expect("values 0-100 are always in u32 range");
+        if percent == 0 {
+            return Ok(());
+        }
+
+        let elapsed = started_at.elapsed();
+        let estimated_duration_percent = elapsed / percent;
+        let estimated_duration_remaining =
+            (100u32.saturating_sub(percent)) * estimated_duration_percent;
+
+        let current_repo_size = repo_size.load(Ordering::Relaxed);
+
+        tracing::warn!(
+            "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)"
+        );
+        tracing::warn!("ETA: {estimated_duration_remaining:?} from now");
+    }
+
+    Ok(())
+}
+
+async fn migrate_file<R, S1, S2>(
+    repo: &R,
+    from: &S1,
+    to: &S2,
+    identifier: &S1::Identifier,
+    skip_missing_files: bool,
+) -> Result<S2::Identifier, MigrateError>
+where
+    R: IdentifierRepo,
+    S1: Store,
+    S2: Store,
+{
+    let mut failure_count = 0;
+
+    loop {
+        match do_migrate_file(repo, from, to, identifier).await {
+            Ok(identifier) => return Ok(identifier),
+            Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
+                return Err(MigrateError::From(e));
+            }
+            Err(migrate_error) => {
+                failure_count += 1;
+
+                if failure_count > 10 {
+                    tracing::error!("Error migrating file, not retrying");
+                    return Err(migrate_error);
+                } else {
+                    tracing::warn!("Failed moving file. Retrying +{failure_count}");
+                }
+
+                tokio::time::sleep(Duration::from_secs(3)).await;
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+enum MigrateError {
+    From(crate::store::StoreError),
+    Details(crate::error::Error),
+    To(crate::store::StoreError),
+}
+
+async fn do_migrate_file<R, S1, S2>(
+    repo: &R,
+    from: &S1,
+    to: &S2,
+    identifier: &S1::Identifier,
+) -> Result<S2::Identifier, MigrateError>
+where
+    R: IdentifierRepo,
+    S1: Store,
+    S2: Store,
+{
+    let stream = from
+        .to_stream(identifier, None, None)
+        .await
+        .map_err(MigrateError::From)?;
+
+    let details_opt = repo
+        .details(identifier)
+        .await
+        .map_err(Error::from)
+        .map_err(MigrateError::Details)?
+        .and_then(|details| {
+            if details.internal_format().is_some() {
+                Some(details)
+            } else {
+                None
+            }
+        });
+
+    let details = if let Some(details) = details_opt {
+        details
+    } else {
+        let new_details = Details::from_store(from, identifier)
+            .await
+            .map_err(MigrateError::Details)?;
+        repo.relate_details(identifier, &new_details)
+            .await
+            .map_err(Error::from)
+            .map_err(MigrateError::Details)?;
+        new_details
+    };
+
+    let new_identifier = to
+        .save_stream(stream, details.media_type())
+        .await
+        .map_err(MigrateError::To)?;
+
+    Ok(new_identifier)
+}
+
+async fn migrate_details<R, I1, I2>(repo: &R, from: &I1, to: &I2) -> Result<(), Error>
+where
+    R: IdentifierRepo,
+    I1: Identifier,
+    I2: Identifier,
+{
+    if let Some(details) = repo.details(from).await? {
+        repo.relate_details(to, &details).await?;
+        repo.cleanup(from).await?;
+    }
+
+    Ok(())
+}
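
The structural change in this new module, compared with the code removed from src/lib.rs, is that hashes are migrated concurrently: do_migrate_store spawns one task per hash onto a tokio::task::JoinSet via spawn_local (the shared MigrateState lives in an Rc, so the futures are not Send) and waits on join_next once 32 tasks are in flight. A self-contained sketch of that bounded-concurrency pattern, with hypothetical item and work names standing in for the hashes and migrate_hash:

// Sketch only: keep at most LIMIT tasks in flight, draining one before spawning
// the next. spawn_local is used because the shared state is an Rc (not Send),
// so this has to run inside a tokio LocalSet.
use std::rc::Rc;

const LIMIT: usize = 32;

async fn process_all(items: Vec<u64>) -> Result<(), Box<dyn std::error::Error>> {
    let shared = Rc::new(String::from("shared, non-Send state"));
    let mut joinset = tokio::task::JoinSet::new();

    for item in items {
        // once the limit is reached, wait for one task to finish before spawning more
        if joinset.len() >= LIMIT {
            if let Some(res) = joinset.join_next().await {
                res?; // a JoinError here means the task panicked or was cancelled
            }
        }

        let shared = Rc::clone(&shared);
        joinset.spawn_local(async move {
            // stand-in for migrate_hash(&state, hash)
            println!("processing {item} using {shared}");
        });
    }

    // drain whatever is still running
    while let Some(res) = joinset.join_next().await {
        res?;
    }

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    let local = tokio::task::LocalSet::new();

    // JoinSet::spawn_local only works from within a LocalSet context
    runtime.block_on(local.run_until(process_all((0..100u64).collect())))
}

In pict-rs that local context is presumably provided by the actix runtime it already runs on; the sketch builds a LocalSet explicitly so it stands alone.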

42 src/repo.rs

@@ -72,6 +72,7 @@ pub(crate) trait FullRepo:
     + AliasRepo
     + QueueRepo
     + HashRepo
+    + MigrationRepo
     + Send
     + Sync
     + Clone
@@ -261,6 +262,47 @@ where
     }
 }
+
+#[async_trait::async_trait(?Send)]
+pub(crate) trait MigrationRepo: BaseRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError>;
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError>;
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError>;
+
+    async fn clear(&self) -> Result<(), RepoError>;
+}
+
+#[async_trait::async_trait(?Send)]
+impl<T> MigrationRepo for actix_web::web::Data<T>
+where
+    T: MigrationRepo,
+{
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        T::is_continuing_migration(self).await
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        T::mark_migrated(self, old_identifier, new_identifier).await
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        T::is_migrated(self, identifier).await
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        T::clear(self).await
+    }
+}

 #[async_trait::async_trait(?Send)]
 pub(crate) trait HashRepo: BaseRepo {
     type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;
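
The new MigrationRepo trait is the bookkeeping interface the concurrent migration relies on: mark_migrated records that a new identifier was produced from an old one, is_migrated answers whether an identifier was produced by the current migration (the repo hands back the new identifier once relate_identifier has updated it), is_continuing_migration reports whether any such records exist from an interrupted run, and clear drops them when the migration completes. A toy, non-async illustration of that bookkeeping over plain byte keys; the method names mirror the trait, but none of these are pict-rs types:

// Toy illustration only, not pict-rs code. The real trait is async and uses
// pict-rs' Identifier / RepoError / StoreError types.
use std::collections::HashMap;

#[derive(Default)]
struct InMemoryMigration {
    // maps new identifier bytes -> old identifier bytes, like the sled tree further down
    migrated: HashMap<Vec<u8>, Vec<u8>>,
}

impl InMemoryMigration {
    // any recorded entry means a previous migration run was interrupted
    fn is_continuing_migration(&self) -> bool {
        !self.migrated.is_empty()
    }

    fn mark_migrated(&mut self, old_identifier: &[u8], new_identifier: &[u8]) {
        self.migrated
            .insert(new_identifier.to_vec(), old_identifier.to_vec());
    }

    // an identifier counts as migrated if it was recorded as a *new* identifier
    fn is_migrated(&self, identifier: &[u8]) -> bool {
        self.migrated.contains_key(identifier)
    }

    // called once the whole store has been migrated
    fn clear(&mut self) {
        self.migrated.clear();
    }
}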

src/repo/sled.rs

@@ -2,8 +2,8 @@ use crate::{
     details::MaybeHumanDate,
     repo::{
         Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
-        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo,
-        UploadId, UploadRepo, UploadResult,
+        FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo,
+        QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
     },
     serde_str::Serde,
     store::StoreError,
@@ -69,6 +69,7 @@ pub(crate) struct SledRepo {
     in_progress_queue: Tree,
     queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     uploads: Tree,
+    migration_identifiers: Tree,
     cache_capacity: u64,
     export_path: PathBuf,
     db: Db,
@@ -100,6 +101,7 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
             cache_capacity,
             export_path,
             db,
@@ -461,6 +463,41 @@ impl IdentifierRepo for SledRepo {
     }
 }
+
+#[async_trait::async_trait(?Send)]
+impl MigrationRepo for SledRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        Ok(!self.migration_identifiers.is_empty())
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        let key = new_identifier.to_bytes()?;
+        let value = old_identifier.to_bytes()?;
+
+        b!(
+            self.migration_identifiers,
+            migration_identifiers.insert(key, value)
+        );
+
+        Ok(())
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        let key = identifier.to_bytes()?;
+
+        Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        b!(self.migration_identifiers, migration_identifiers.clear());
+
+        Ok(())
+    }
+}

 type StreamItem = Result<IVec, RepoError>;
 type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;

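
The sled implementation above keeps that bookkeeping in a dedicated migration_identifiers tree, keyed by the new identifier's bytes with the old identifier's bytes as the value; the b! macro is pict-rs' helper for running the blocking sled call off the async threads. Stripped of that macro, the tree operations come down to roughly this (the identifier contents here are made up):

// Sketch of the raw sled calls behind the impl above; key contents are illustrative.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::Config::new().temporary(true).open()?;
    let migration_identifiers = db.open_tree("pict-rs-migration-identifiers-tree")?;

    // mark_migrated: key on the new identifier bytes, store the old identifier bytes
    let old_identifier = b"files/abc".to_vec();
    let new_identifier = b"object-storage/xyz".to_vec();
    migration_identifiers.insert(new_identifier.clone(), old_identifier)?;

    // is_migrated: a present key means this identifier was produced by the migration
    assert!(migration_identifiers.get(&new_identifier)?.is_some());

    // is_continuing_migration: any entries at all mean a previous run was interrupted
    assert!(!migration_identifiers.is_empty());

    // clear: wipe the tree once the migration finishes
    migration_identifiers.clear()?;
    assert!(migration_identifiers.is_empty());

    Ok(())
}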