Mirror of https://git.asonix.dog/asonix/pict-rs

Commit 7b5a3020fa: Don't rely on global state for tmp_dir
Parent: fff4afe105
16 changed files with 235 additions and 119 deletions
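What the commit does: pict-rs used to mint scratch paths through a process-global temporary directory (a static TMP_DIR: OnceLock<PathBuf> in src/tmp_file.rs, created lazily and torn down by an explicit remove_tmp_dir() call at the end of run()). This commit deletes that global state and instead creates a TmpDir once at startup with TmpDir::init().await?, then passes it explicitly: as &TmpDir through the media pipeline, and as ArcTmpDir through the actix-web handlers and background jobs. A minimal sketch of the shape change, assuming a stand-in TmpDir with only the method the hunks below call (a fuller assumed reconstruction follows the src/tmp_file.rs hunks at the bottom of this page):

    use std::path::PathBuf;

    // Stand-in for the real type in src/tmp_file.rs; naming details elided.
    pub struct TmpDir {
        path: PathBuf,
    }

    impl TmpDir {
        pub fn tmp_file(&self, ext: Option<&str>) -> PathBuf {
            self.path.join(ext.unwrap_or("scratch"))
        }
    }

    // Before: helpers reached a hidden global, crate::tmp_file::tmp_file(None).
    // After: the directory is an explicit parameter, so each instance (and
    // each test) can own its directory and clean it up deterministically.
    pub fn scratch_path(tmp_dir: &TmpDir) -> PathBuf {
        tmp_dir.tmp_file(None)
    }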
@@ -3,6 +3,7 @@ use crate::{
     error::Error,
     formats::{InternalFormat, InternalVideoFormat},
     serde_str::Serde,
+    tmp_file::TmpDir,
 };
 use actix_web::web;
 use time::{format_description::well_known::Rfc3339, OffsetDateTime};
@@ -79,13 +80,17 @@ impl Details {
     }

     #[tracing::instrument(level = "DEBUG")]
-    pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result<Self, Error> {
+    pub(crate) async fn from_bytes(
+        tmp_dir: &TmpDir,
+        timeout: u64,
+        input: web::Bytes,
+    ) -> Result<Self, Error> {
         let Discovery {
             input,
             width,
             height,
             frames,
-        } = crate::discover::discover_bytes(timeout, input).await?;
+        } = crate::discover::discover_bytes(tmp_dir, timeout, input).await?;

         Ok(Details::from_parts(
             input.internal_format(),
@@ -4,7 +4,7 @@ mod magick;

 use actix_web::web::Bytes;

-use crate::formats::InputFile;
+use crate::{formats::InputFile, tmp_file::TmpDir};

 #[derive(Debug, PartialEq, Eq)]
 pub(crate) struct Discovery {
@@ -27,12 +27,13 @@ pub(crate) enum DiscoverError {
 }

 pub(crate) async fn discover_bytes(
+    tmp_dir: &TmpDir,
     timeout: u64,
     bytes: Bytes,
 ) -> Result<Discovery, crate::error::Error> {
-    let discovery = ffmpeg::discover_bytes(timeout, bytes.clone()).await?;
+    let discovery = ffmpeg::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;

-    let discovery = magick::confirm_bytes(discovery, timeout, bytes.clone()).await?;
+    let discovery = magick::confirm_bytes(tmp_dir, discovery, timeout, bytes.clone()).await?;

     let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?;
@@ -10,6 +10,7 @@ use crate::{
         Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmAudioCodec, WebmCodec,
     },
     process::Process,
+    tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
 use tokio::io::AsyncReadExt;
@@ -158,11 +159,11 @@ struct Flags {
 }

 pub(super) async fn discover_bytes(
+    tmp_dir: &TmpDir,
     timeout: u64,
     bytes: Bytes,
 ) -> Result<Option<Discovery>, FfMpegError> {
-    discover_file(
-        move |mut file| {
+    discover_file(tmp_dir, timeout, move |mut file| {
         let bytes = bytes.clone();

         async move {
@@ -171,9 +172,7 @@ pub(super) async fn discover_bytes(
                 .map_err(FfMpegError::Write)?;
             Ok(file)
         }
-        },
-        timeout,
-    )
+    })
     .await
 }

@@ -192,12 +191,16 @@ async fn allows_alpha(pixel_format: &str, timeout: u64) -> Result<bool, FfMpegError>
 }

 #[tracing::instrument(skip(f))]
-async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Option<Discovery>, FfMpegError>
+async fn discover_file<F, Fut>(
+    tmp_dir: &TmpDir,
+    timeout: u64,
+    f: F,
+) -> Result<Option<Discovery>, FfMpegError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
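One detail in the discover_bytes/discover_file hunks above, beyond the new tmp_dir parameter: the callback moved from first argument to last, so the call site collapses into a single discover_file(tmp_dir, timeout, move |mut file| { ... }) expression with no trailing timeout argument after the closure body. A toy model of that closure-last shape (illustrative names, not pict-rs code):

    use std::path::{Path, PathBuf};

    // Plain data arguments first, closure last: the call reads top to bottom
    // and nothing follows the closure body at the call site.
    pub async fn with_scratch_file<F, Fut, T>(dir: &Path, f: F) -> std::io::Result<T>
    where
        F: FnOnce(PathBuf) -> Fut,
        Fut: std::future::Future<Output = std::io::Result<T>>,
    {
        let path = dir.join("scratch");
        let out = f(path.clone()).await;
        let _ = tokio::fs::remove_file(&path).await; // best-effort cleanup
        out
    }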
@@ -9,6 +9,7 @@ use crate::{
     formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
     magick::MagickError,
     process::Process,
+    tmp_file::TmpDir,
 };

 use super::Discovery;
@@ -31,6 +32,7 @@ struct Geometry {
 }

 pub(super) async fn confirm_bytes(
+    tmp_dir: &TmpDir,
     discovery: Option<Discovery>,
     timeout: u64,
     bytes: Bytes,
@@ -42,15 +44,12 @@ pub(super) async fn confirm_bytes(
             height,
             ..
         }) => {
-            let frames = count_avif_frames(
-                move |mut file| async move {
+            let frames = count_avif_frames(tmp_dir, timeout, move |mut file| async move {
                 file.write_from_bytes(bytes)
                     .await
                     .map_err(MagickError::Write)?;
                 Ok(file)
-                },
-                timeout,
-            )
+            })
             .await?;

             if frames == 1 {
@@ -84,26 +83,23 @@ pub(super) async fn confirm_bytes(
         }
     }

-    discover_file(
-        move |mut file| async move {
+    discover_file(tmp_dir, timeout, move |mut file| async move {
         file.write_from_bytes(bytes)
             .await
             .map_err(MagickError::Write)?;

         Ok(file)
-        },
-        timeout,
-    )
+    })
     .await
 }

 #[tracing::instrument(level = "DEBUG", skip(f))]
-async fn count_avif_frames<F, Fut>(f: F, timeout: u64) -> Result<u32, MagickError>
+async fn count_avif_frames<F, Fut>(tmp_dir: &TmpDir, timeout: u64, f: F) -> Result<u32, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -149,12 +145,16 @@ where
 }

 #[tracing::instrument(level = "DEBUG", skip(f))]
-async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Discovery, MagickError>
+async fn discover_file<F, Fut>(
+    tmp_dir: &TmpDir,
+    timeout: u64,
+    f: F,
+) -> Result<Discovery, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -8,6 +8,7 @@ use crate::{
     formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
     repo::{ArcRepo, Hash, VariantAlreadyExists},
     store::Store,
+    tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
 use std::{path::PathBuf, sync::Arc, time::Instant};
@@ -43,6 +44,7 @@ impl Drop for MetricsGuard {
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, hash, process_map, media))]
 pub(crate) async fn generate<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     process_map: &ProcessMap,
@@ -54,6 +56,7 @@ pub(crate) async fn generate<S: Store + 'static>(
     hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
     let process_fut = process(
+        tmp_dir,
         repo,
         store,
         format,
@@ -74,6 +77,7 @@ pub(crate) async fn generate<S: Store + 'static>(
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, hash, media))]
 async fn process<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     output_format: InputProcessableFormat,
@@ -87,6 +91,7 @@ async fn process<S: Store + 'static>(
     let permit = crate::process_semaphore().acquire().await?;

     let identifier = input_identifier(
+        tmp_dir,
         repo,
         store,
         output_format,
@@ -101,7 +106,8 @@ async fn process<S: Store + 'static>(
     } else {
         let bytes_stream = store.to_bytes(&identifier, None, None).await?;

-        let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
+        let details =
+            Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;

         repo.relate_details(&identifier, &details).await?;

@@ -121,6 +127,7 @@ async fn process<S: Store + 'static>(
     };

     let mut processed_reader = crate::magick::process_image_store_read(
+        tmp_dir,
         store,
         &identifier,
         thumbnail_args,
@@ -140,7 +147,7 @@ async fn process<S: Store + 'static>(

     drop(permit);

-    let details = Details::from_bytes(media.process_timeout, bytes.clone()).await?;
+    let details = Details::from_bytes(tmp_dir, media.process_timeout, bytes.clone()).await?;

     let identifier = store
         .save_bytes(bytes.clone(), details.media_type())
@@ -166,6 +173,7 @@ async fn process<S: Store + 'static>(

 #[tracing::instrument(skip_all)]
 async fn input_identifier<S>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     output_format: InputProcessableFormat,
@@ -202,6 +210,7 @@ where
     let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp);

     let reader = magick::thumbnail(
+        tmp_dir,
         store,
         &identifier,
         processable_format,
@@ -222,6 +231,7 @@ where
     };

     let reader = ffmpeg::thumbnail(
+        tmp_dir,
         store.clone(),
         identifier,
         original_details
@@ -2,7 +2,7 @@ use std::sync::Arc;

 use crate::{
     ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead,
-    store::Store,
+    store::Store, tmp_file::TmpDir,
 };

 #[derive(Clone, Copy, Debug)]
@@ -47,19 +47,20 @@ impl ThumbnailFormat {

 #[tracing::instrument(skip(store))]
 pub(super) async fn thumbnail<S: Store>(
+    tmp_dir: &TmpDir,
     store: S,
     from: Arc<str>,
     input_format: InternalVideoFormat,
     format: ThumbnailFormat,
     timeout: u64,
 ) -> Result<BoxRead<'static>, FfMpegError> {
-    let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension()));
+    let input_file = tmp_dir.tmp_file(Some(input_format.file_extension()));
     let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(FfMpegError::CreateDir)?;

-    let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension()));
+    let output_file = tmp_dir.tmp_file(Some(format.to_file_extension()));
     let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?;
     crate::store::file_store::safe_create_parent(&output_file)
         .await
@@ -2,9 +2,11 @@ use std::sync::Arc;

 use crate::{
     formats::ProcessableFormat, magick::MagickError, process::Process, read::BoxRead, store::Store,
+    tmp_file::TmpDir,
 };

 async fn thumbnail_animation<F, Fut>(
+    tmp_dir: &TmpDir,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option<u8>,
@@ -15,7 +17,7 @@ where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -52,6 +54,7 @@ where
 }

 pub(super) async fn thumbnail<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     store: &S,
     identifier: &Arc<str>,
     input_format: ProcessableFormat,
@@ -65,6 +68,7 @@ pub(super) async fn thumbnail<S: Store + 'static>(
         .map_err(MagickError::Store)?;

     thumbnail_animation(
+        tmp_dir,
         input_format,
         format,
         quality,
@@ -86,6 +86,7 @@ where
     };

     crate::magick::process_image_async_read(
+        tmp_dir,
         validated_reader,
         magick_args,
         format,
@@ -109,7 +110,8 @@ where
     .await?;

     let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-    let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
+    let details =
+        Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;

     drop(permit);

src/lib.rs (106 changed lines)
@@ -103,6 +103,7 @@ fn process_semaphore() -> &'static Semaphore {
 }

 async fn ensure_details<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     config: &Configuration,
@@ -124,8 +125,12 @@ async fn ensure_details<S: Store + 'static>(

     tracing::debug!("generating new details from {:?}", identifier);
     let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-    let new_details =
-        Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()).await?;
+    let new_details = Details::from_bytes(
+        tmp_dir,
+        config.media.process_timeout,
+        bytes_stream.into_bytes(),
+    )
+    .await?;
     tracing::debug!("storing details for {:?}", identifier);
     repo.relate_details(&identifier, &new_details).await?;
     tracing::debug!("stored");
@@ -296,28 +301,31 @@ impl<S: Store + 'static> FormData for Import<S> {
 #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn upload<S: Store + 'static>(
     Multipart(Upload(value, _)): Multipart<Upload<S>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store, config).await
+    handle_upload(value, tmp_dir, repo, store, config).await
 }

 /// Handle responding to successful uploads
 #[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
 async fn import<S: Store + 'static>(
     Multipart(Import(value, _)): Multipart<Import<S>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    handle_upload(value, repo, store, config).await
+    handle_upload(value, tmp_dir, repo, store, config).await
 }

 /// Handle responding to successful uploads
 #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
 async fn handle_upload<S: Store + 'static>(
     value: Value<Session>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -339,7 +347,7 @@ async fn handle_upload<S: Store + 'static>(
         tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
         let delete_token = image.result.delete_token();

-        let details = ensure_details(&repo, &store, &config, alias).await?;
+        let details = ensure_details(&tmp_dir, &repo, &store, &config, alias).await?;

         files.push(serde_json::json!({
             "file": alias.to_string(),
@@ -468,6 +476,7 @@ struct ClaimQuery {
 /// Claim a backgrounded upload
 #[tracing::instrument(name = "Waiting on upload", skip_all)]
 async fn claim_upload<S: Store + 'static>(
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -487,7 +496,7 @@ async fn claim_upload<S: Store + 'static>(

     match upload_result {
         UploadResult::Success { alias, token } => {
-            let details = ensure_details(&repo, &store, &config, &alias).await?;
+            let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;

             Ok(HttpResponse::Ok().json(&serde_json::json!({
                 "msg": "ok",
@@ -529,7 +538,7 @@ async fn ingest_inline<S: Store + 'static>(

     let alias = session.alias().expect("alias should exist").to_owned();

-    let details = ensure_details(repo, store, config, &alias).await?;
+    let details = ensure_details(tmp_dir, repo, store, config, &alias).await?;

     let delete_token = session.disarm();

@@ -925,8 +934,11 @@ async fn process<S: Store + 'static>(

         tracing::debug!("generating new details from {:?}", identifier);
         let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-        let new_details =
-            Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
+        let new_details = Details::from_bytes(
+            &tmp_dir,
+            config.media.process_timeout,
+            bytes_stream.into_bytes(),
+        )
         .await?;
         tracing::debug!("storing details for {:?}", identifier);
         repo.relate_details(&identifier, &new_details).await?;
@@ -947,9 +959,10 @@ async fn process<S: Store + 'static>(
         return Err(UploadError::ReadOnly.into());
     }

-    let original_details = ensure_details(&repo, &store, &config, &alias).await?;
+    let original_details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;

     let (details, bytes) = generate::generate(
+        &tmp_dir,
         &repo,
         &store,
         &process_map,
@@ -1001,6 +1014,7 @@ async fn process_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
     ext: web::Path<String>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1045,8 +1059,11 @@ async fn process_head<S: Store + 'static>(

     tracing::debug!("generating new details from {:?}", identifier);
     let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-    let new_details =
-        Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
+    let new_details = Details::from_bytes(
+        &tmp_dir,
+        config.media.process_timeout,
+        bytes_stream.into_bytes(),
+    )
     .await?;
     tracing::debug!("storing details for {:?}", identifier);
     repo.relate_details(&identifier, &new_details).await?;
@@ -1114,6 +1131,7 @@ async fn process_backgrounded<S: Store>(
 #[tracing::instrument(name = "Fetching query details", skip(repo, store, config))]
 async fn details_query<S: Store + 'static>(
     web::Query(alias_query): web::Query<AliasQuery>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1130,27 +1148,36 @@ async fn details_query<S: Store + 'static>(
         }
     };

-    do_details(alias, repo, store, config).await
+    do_details(alias, tmp_dir, repo, store, config).await
 }

 /// Fetch file details
 #[tracing::instrument(name = "Fetching details", skip(repo, store, config))]
 async fn details<S: Store + 'static>(
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    do_details(Serde::into_inner(alias.into_inner()), repo, store, config).await
+    do_details(
+        Serde::into_inner(alias.into_inner()),
+        tmp_dir,
+        repo,
+        store,
+        config,
+    )
+    .await
 }

 async fn do_details<S: Store + 'static>(
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;

     Ok(HttpResponse::Ok().json(&details.into_api_details()))
 }
@@ -1192,7 +1219,7 @@ async fn serve_query<S: Store + 'static>(
         }
     };

-    do_serve(range, alias, repo, store, config).await
+    do_serve(range, alias, tmp_dir, repo, store, config).await
 }

 /// Serve files
@@ -1200,6 +1227,7 @@ async fn serve_query<S: Store + 'static>(
 async fn serve<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1207,6 +1235,7 @@ async fn serve<S: Store + 'static>(
     do_serve(
         range,
         Serde::into_inner(alias.into_inner()),
+        tmp_dir,
         repo,
         store,
         config,
@@ -1217,6 +1246,7 @@ async fn serve<S: Store + 'static>(
 async fn do_serve<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1237,7 +1267,7 @@ async fn do_serve<S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };

-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;

     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -1252,6 +1282,7 @@ async fn do_serve<S: Store + 'static>(
 async fn serve_query_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     web::Query(alias_query): web::Query<AliasQuery>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1266,13 +1297,14 @@ async fn serve_query_head<S: Store + 'static>(
         }
     };

-    do_serve_head(range, alias, repo, store, config).await
+    do_serve_head(range, alias, tmp_dir, repo, store, config).await
 }

 #[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
 async fn serve_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1280,6 +1312,7 @@ async fn serve_head<S: Store + 'static>(
     do_serve_head(
         range,
         Serde::into_inner(alias.into_inner()),
+        tmp_dir,
         repo,
         store,
         config,
@@ -1290,6 +1323,7 @@ async fn serve_head<S: Store + 'static>(
 async fn do_serve_head<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: Alias,
+    tmp_dir: web::Data<ArcTmpDir>,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
     config: web::Data<Configuration>,
@@ -1299,7 +1333,7 @@ async fn do_serve_head<S: Store + 'static>(
         return Ok(HttpResponse::NotFound().finish());
     };

-    let details = ensure_details(&repo, &store, &config, &alias).await?;
+    let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;

     if let Some(public_url) = store.public_url(&identifier) {
         return Ok(HttpResponse::SeeOther()
@@ -1876,7 +1910,9 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
         .await
 }

+#[allow(clippy::too_many_arguments)]
 async fn migrate_inner<S1>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     client: ClientWithMiddleware,
     from: S1,
@@ -1892,7 +1928,16 @@ where
         config::primitives::Store::Filesystem(config::Filesystem { path }) => {
             let to = FileStore::build(path.clone(), repo.clone()).await?;

-            migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
+            migrate_store(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                skip_missing_files,
+                timeout,
+                concurrency,
+            )
+            .await?
         }
         config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
             endpoint,
@@ -1926,7 +1971,16 @@ where
             .await?
             .build(client);

-            migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await?
+            migrate_store(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                skip_missing_files,
+                timeout,
+                concurrency,
+            )
+            .await?
         }
     }

@@ -2022,6 +2076,8 @@ impl PictRsConfiguration {
     pub async fn run(self) -> color_eyre::Result<()> {
         let PictRsConfiguration { config, operation } = self;

+        let tmp_dir = TmpDir::init().await?;
+
         let client = build_client()?;

         match operation {
@@ -2038,6 +2094,7 @@ impl PictRsConfiguration {
                 config::primitives::Store::Filesystem(config::Filesystem { path }) => {
                     let from = FileStore::build(path.clone(), repo.clone()).await?;
                     migrate_inner(
+                        tmp_dir,
                         repo,
                         client,
                         from,
@@ -2083,6 +2140,7 @@ impl PictRsConfiguration {
                     .build(client.clone());

                     migrate_inner(
+                        tmp_dir,
                         repo,
                         client,
                         from,
@@ -2112,8 +2170,6 @@ impl PictRsConfiguration {
             tracing::warn!("Launching in READ ONLY mode");
         }

-        let tmp_dir = TmpDir::init().await?;
-
         match config.store.clone() {
             config::Store::Filesystem(config::Filesystem { path }) => {
                 let arc_repo = repo.to_arc();
@@ -2124,6 +2180,7 @@ impl PictRsConfiguration {
                 if let Some(path) = config.old_repo_path() {
                     if let Some(old_repo) = repo_04::open(path)? {
                         repo::migrate_04(
+                            tmp_dir.clone(),
                             old_repo,
                             arc_repo.clone(),
                             store.clone(),
@@ -2187,6 +2244,7 @@ impl PictRsConfiguration {
                 if let Some(path) = config.old_repo_path() {
                     if let Some(old_repo) = repo_04::open(path)? {
                         repo::migrate_04(
+                            tmp_dir.clone(),
                             old_repo,
                             arc_repo.clone(),
                             store.clone(),
@@ -2215,8 +2273,6 @@ impl PictRsConfiguration {
             }
         }

-        self::tmp_file::remove_tmp_dir().await?;
-
         Ok(())
     }
 }
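For the tmp_dir: web::Data<ArcTmpDir> extractors in the handlers above to resolve, the Arc<TmpDir> produced by TmpDir::init() must be registered as application data. That registration is not visible on this page; presumably it sits wherever the other shared values (repo, store, config) are wired up, roughly like this hypothetical sketch:

    use actix_web::web;

    // Hypothetical wiring: the exact registration site is not shown in this
    // diff; register_tmp_dir is an illustrative name, not a pict-rs function.
    fn register_tmp_dir(cfg: &mut web::ServiceConfig, tmp_dir: ArcTmpDir) {
        cfg.app_data(web::Data::new(tmp_dir));
    }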
@@ -6,6 +6,7 @@ use crate::{
     process::{Process, ProcessError},
     read::BoxRead,
     store::Store,
+    tmp_file::TmpDir,
 };

 use tokio::io::AsyncRead;
@@ -90,6 +91,7 @@ impl MagickError {
 }

 async fn process_image<F, Fut>(
+    tmp_dir: &TmpDir,
     process_args: Vec<String>,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
@@ -101,7 +103,7 @@ where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let input_file = crate::tmp_file::tmp_file(None);
+    let input_file = tmp_dir.tmp_file(None);
     let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -141,7 +143,9 @@ where
     Ok(Box::pin(clean_reader))
 }

+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn process_image_store_read<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     store: &S,
     identifier: &Arc<str>,
     args: Vec<String>,
@@ -156,6 +160,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
     .map_err(MagickError::Store)?;

     process_image(
+        tmp_dir,
         args,
         input_format,
         format,
@@ -173,6 +178,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
 }

 pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
+    tmp_dir: &TmpDir,
     async_read: A,
     args: Vec<String>,
     input_format: ProcessableFormat,
@@ -181,6 +187,7 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
     timeout: u64,
 ) -> Result<BoxRead<'static>, MagickError> {
     process_image(
+        tmp_dir,
         args,
         input_format,
         format,
@@ -14,9 +14,11 @@ use crate::{
     error::{Error, UploadError},
     repo::{ArcRepo, Hash},
     store::Store,
+    tmp_file::{ArcTmpDir, TmpDir},
 };

 pub(super) async fn migrate_store<S1, S2>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -44,6 +46,7 @@ where
     let mut failure_count = 0;

     while let Err(e) = do_migrate_store(
+        tmp_dir.clone(),
         repo.clone(),
         from.clone(),
         to.clone(),
@@ -71,6 +74,7 @@ where
 }

 struct MigrateState<S1, S2> {
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -85,6 +89,7 @@ struct MigrateState<S1, S2> {
 }

 async fn do_migrate_store<S1, S2>(
+    tmp_dir: ArcTmpDir,
     repo: ArcRepo,
     from: S1,
     to: S2,
@@ -114,6 +119,7 @@ where
     let mut stream = stream.into_streamer();

     let state = Rc::new(MigrateState {
+        tmp_dir: tmp_dir.clone(),
         repo: repo.clone(),
         from,
         to,
@@ -161,6 +167,7 @@ where
     S2: Store,
 {
     let MigrateState {
+        tmp_dir,
         repo,
         from,
         to,
@@ -223,7 +230,17 @@ where

     if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
         if !repo.is_migrated(&identifier).await? {
-            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
+            match migrate_file(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                &identifier,
+                *skip_missing_files,
+                *timeout,
+            )
+            .await
+            {
                 Ok(new_identifier) => {
                     migrate_details(repo, &identifier, &new_identifier).await?;
                     repo.relate_motion_identifier(hash.clone(), &new_identifier)
@@ -252,7 +269,17 @@ where

     for (variant, identifier) in repo.variants(hash.clone()).await? {
         if !repo.is_migrated(&identifier).await? {
-            match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
+            match migrate_file(
+                tmp_dir,
+                repo,
+                from,
+                to,
+                &identifier,
+                *skip_missing_files,
+                *timeout,
+            )
+            .await
+            {
                 Ok(new_identifier) => {
                     migrate_details(repo, &identifier, &new_identifier).await?;
                     repo.remove_variant(hash.clone(), variant.clone()).await?;
@@ -282,6 +309,7 @@ where
     }

     match migrate_file(
+        tmp_dir,
         repo,
         from,
         to,
@@ -340,6 +368,7 @@ where
 }

 async fn migrate_file<S1, S2>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     from: &S1,
     to: &S2,
@@ -354,7 +383,7 @@ where
     let mut failure_count = 0;

     loop {
-        match do_migrate_file(repo, from, to, identifier, timeout).await {
+        match do_migrate_file(tmp_dir, repo, from, to, identifier, timeout).await {
             Ok(identifier) => return Ok(identifier),
             Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
                 return Err(MigrateError::From(e));
@@ -383,6 +412,7 @@ enum MigrateError {
 }

 async fn do_migrate_file<S1, S2>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     from: &S1,
     to: &S2,
@@ -412,7 +442,7 @@ where
         .await
         .map_err(From::from)
         .map_err(MigrateError::Details)?;
-    let new_details = Details::from_bytes(timeout, bytes_stream.into_bytes())
+    let new_details = Details::from_bytes(tmp_dir, timeout, bytes_stream.into_bytes())
         .await
         .map_err(MigrateError::Details)?;
     repo.relate_details(identifier, &new_details)
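Note the dual import tmp_file::{ArcTmpDir, TmpDir} above: the long-running migration takes the directory by value as ArcTmpDir so it can be cloned into retry loops and per-hash state, while leaf helpers such as do_migrate_file borrow it as &TmpDir (an &Arc<TmpDir> deref-coerces to &TmpDir at those call sites). The alias definition is not shown in this diff, but given that usage it is presumably just:

    // Assumed definition: cloning bumps a reference count, so every clone
    // shares the same underlying temporary directory.
    pub(crate) type ArcTmpDir = std::sync::Arc<TmpDir>;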
@@ -297,6 +297,7 @@ where
     }
 }

+#[allow(clippy::too_many_arguments)]
 async fn process_image_jobs<S, F>(
     tmp_dir: &ArcTmpDir,
     repo: &ArcRepo,
@@ -12,7 +12,7 @@ use crate::{
     repo::{Alias, ArcRepo, UploadId, UploadResult},
     serde_str::Serde,
     store::Store,
-    tmp_file::ArcTmpDir,
+    tmp_file::{ArcTmpDir, TmpDir},
 };
 use std::{path::PathBuf, sync::Arc};

@@ -55,6 +55,7 @@ where
             process_args,
         } => {
             generate(
+                tmp_dir,
                 repo,
                 store,
                 process_map,
@@ -110,7 +111,8 @@ impl Drop for UploadGuard {
     }
 }

-#[tracing::instrument(skip(repo, store, client, media))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, repo, store, client, media))]
 async fn process_ingest<S>(
     tmp_dir: &ArcTmpDir,
     repo: &ArcRepo,
@@ -183,6 +185,7 @@ where
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
 async fn generate<S: Store + 'static>(
+    tmp_dir: &TmpDir,
     repo: &ArcRepo,
     store: &S,
     process_map: &ProcessMap,
@@ -204,9 +207,10 @@ async fn generate<S: Store + 'static>(
         return Ok(());
     }

-    let original_details = crate::ensure_details(repo, store, config, &source).await?;
+    let original_details = crate::ensure_details(tmp_dir, repo, store, config, &source).await?;

     crate::generate::generate(
+        tmp_dir,
         repo,
         store,
         process_map,
@@ -13,6 +13,7 @@ use crate::{
         SledRepo as OldSledRepo,
     },
     store::Store,
+    tmp_file::{ArcTmpDir, TmpDir},
 };

 const GENERATOR_KEY: &str = "last-path";
@@ -67,6 +68,7 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result

 #[tracing::instrument(skip_all)]
 pub(crate) async fn migrate_04<S: Store + 'static>(
+    tmp_dir: ArcTmpDir,
     old_repo: OldSledRepo,
     new_repo: ArcRepo,
     store: S,
@@ -99,6 +101,7 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
     while let Some(res) = hash_stream.next().await {
         if let Ok(hash) = res {
             set.spawn_local(migrate_hash_04(
+                tmp_dir.clone(),
                 old_repo.clone(),
                 new_repo.clone(),
                 store.clone(),
@@ -169,6 +172,7 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
 }

 async fn migrate_hash_04<S: Store>(
+    tmp_dir: ArcTmpDir,
     old_repo: OldSledRepo,
     new_repo: ArcRepo,
     store: S,
@@ -177,8 +181,15 @@ async fn migrate_hash_04<S: Store>(
 ) {
     let mut hash_failures = 0;

-    while let Err(e) =
-        do_migrate_hash_04(&old_repo, &new_repo, &store, &config, old_hash.clone()).await
+    while let Err(e) = do_migrate_hash_04(
+        &tmp_dir,
+        &old_repo,
+        &new_repo,
+        &store,
+        &config,
+        old_hash.clone(),
+    )
+    .await
     {
         hash_failures += 1;

@@ -266,6 +277,7 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) ->

 #[tracing::instrument(skip_all)]
 async fn do_migrate_hash_04<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     new_repo: &ArcRepo,
     store: &S,
@@ -279,7 +291,7 @@ async fn do_migrate_hash_04<S: Store>(

     let size = store.len(&identifier).await?;

-    let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?;
+    let hash_details = set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;

     let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?;
     let variants = old_repo.variants(old_hash.clone()).await?;
@@ -309,7 +321,7 @@ async fn do_migrate_hash_04<S: Store>(
             .relate_motion_identifier(hash.clone(), &identifier)
             .await?;

-        set_details(old_repo, new_repo, store, config, &identifier).await?;
+        set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
     }

     for (variant, identifier) in variants {
@@ -317,7 +329,7 @@ async fn do_migrate_hash_04<S: Store>(
             .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
             .await?;

-        set_details(old_repo, new_repo, store, config, &identifier).await?;
+        set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;

         new_repo.accessed_variant(hash.clone(), variant).await?;
     }
@@ -326,6 +338,7 @@ async fn do_migrate_hash_04<S: Store>(
 }

 async fn set_details<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     new_repo: &ArcRepo,
     store: &S,
@@ -335,7 +348,8 @@ async fn set_details<S: Store>(
     if let Some(details) = new_repo.details(identifier).await? {
         Ok(details)
     } else {
-        let details = fetch_or_generate_details(old_repo, store, config, identifier).await?;
+        let details =
+            fetch_or_generate_details(tmp_dir, old_repo, store, config, identifier).await?;
         new_repo.relate_details(identifier, &details).await?;
         Ok(details)
     }
@@ -355,6 +369,7 @@ fn details_semaphore() -> &'static Semaphore {

 #[tracing::instrument(skip_all)]
 async fn fetch_or_generate_details<S: Store>(
+    tmp_dir: &TmpDir,
     old_repo: &OldSledRepo,
     store: &S,
     config: &Configuration,
@@ -369,7 +384,7 @@ async fn fetch_or_generate_details<S: Store>(
     let bytes = bytes_stream.into_bytes();

     let guard = details_semaphore().acquire().await?;
-    let details = Details::from_bytes(config.media.process_timeout, bytes).await?;
+    let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes).await?;
     drop(guard);

     Ok(details)
@@ -1,7 +1,4 @@
-use std::{
-    path::PathBuf,
-    sync::{Arc, OnceLock},
-};
+use std::{path::PathBuf, sync::Arc};
 use tokio::io::AsyncRead;
 use uuid::Uuid;

@@ -34,16 +31,6 @@ impl Drop for TmpDir {
     }
 }

-static TMP_DIR: OnceLock<PathBuf> = OnceLock::new();
-
-fn tmp_dir() -> &'static PathBuf {
-    TMP_DIR.get_or_init(|| {
-        let dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
-        std::fs::create_dir(&dir).unwrap();
-        dir
-    })
-}
-
 struct TmpFile(PathBuf);

 impl Drop for TmpFile {
@@ -61,18 +48,6 @@ pin_project_lite::pin_project! {
     }
 }

-pub(crate) fn tmp_file(ext: Option<&str>) -> PathBuf {
-    if let Some(ext) = ext {
-        tmp_dir().join(format!("{}{}", Uuid::new_v4(), ext))
-    } else {
-        tmp_dir().join(Uuid::new_v4().to_string())
-    }
-}
-
-pub(crate) async fn remove_tmp_dir() -> std::io::Result<()> {
-    tokio::fs::remove_dir_all(tmp_dir()).await
-}
-
 pub(crate) fn cleanup_tmpfile<R: AsyncRead>(reader: R, file: PathBuf) -> TmpFileCleanup<R> {
     TmpFileCleanup {
         inner: reader,
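The removals above are the entire global mechanism: the TMP_DIR OnceLock, the free tmp_file() constructor, and the remove_tmp_dir() teardown that run() used to call. The new body of TmpDir is not shown on this page, but from the call sites (TmpDir::init().await?, tmp_dir.tmp_file(Some(ext)), and the pre-existing impl Drop for TmpDir) it plausibly looks something like the following; treat every detail here as an assumption rather than the shipped code:

    use std::{path::PathBuf, sync::Arc};
    use uuid::Uuid;

    pub(crate) struct TmpDir {
        path: PathBuf,
    }

    impl TmpDir {
        // One unique directory per process instance, created at startup.
        pub(crate) async fn init() -> std::io::Result<Arc<Self>> {
            let path = std::env::temp_dir().join(Uuid::new_v4().to_string());
            tokio::fs::create_dir(&path).await?;
            Ok(Arc::new(TmpDir { path }))
        }

        // Same naming scheme as the deleted free function, scoped to self.
        pub(crate) fn tmp_file(&self, ext: Option<&str>) -> PathBuf {
            match ext {
                Some(ext) => self.path.join(format!("{}{}", Uuid::new_v4(), ext)),
                None => self.path.join(Uuid::new_v4().to_string()),
            }
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // Replaces the explicit remove_tmp_dir() call deleted from run().
            // Drop cannot await, so teardown is sketched synchronously.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }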
@@ -71,7 +71,7 @@ pub(crate) async fn validate_bytes(
         width,
         height,
         frames,
-    } = crate::discover::discover_bytes(timeout, bytes.clone()).await?;
+    } = crate::discover::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;

     match &input {
         InputFile::Image(input) => {
@@ -186,7 +186,8 @@ fn validate_animation(
     Ok(())
 }

-#[tracing::instrument(skip(bytes, validations))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, bytes, validations))]
 async fn process_animation(
     tmp_dir: &TmpDir,
     bytes: Bytes,
@@ -244,7 +245,8 @@ fn validate_video(
     Ok(())
 }

-#[tracing::instrument(skip(bytes, validations))]
+#[allow(clippy::too_many_arguments)]
+#[tracing::instrument(skip(tmp_dir, bytes, validations))]
 async fn process_video(
     tmp_dir: &TmpDir,
     bytes: Bytes,