mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 03:11:24 +00:00
Enable configuring imagemagick security policy from pictrs
This commit is contained in:
parent
50fd3e6182
commit
5805eb0aed
19 changed files with 589 additions and 81 deletions
|
@ -43,6 +43,11 @@ filters = [
|
|||
variants = "7d"
|
||||
proxy = "7d"
|
||||
|
||||
[media.magick]
|
||||
max_width = 10000
|
||||
max_height = 10000
|
||||
max_area = 40000000
|
||||
|
||||
[media.image]
|
||||
max_width = 10000
|
||||
max_height = 10000
|
||||
|
|
35
pict-rs.toml
35
pict-rs.toml
|
@ -244,6 +244,41 @@ variants = "7d"
|
|||
proxy = "7d"
|
||||
|
||||
|
||||
[media.magick]
|
||||
## Optional: maximum width, in pixels, of media that imagemagick will attempt to process
|
||||
# environment variable: PICTRS__MEDIA__MAGICK__MAX_WIDTH
|
||||
# default: 10_000
|
||||
#
|
||||
# This value should be at least as large as the greatest max_width set for images, animations, and
|
||||
# videos. Any image that exceeds this limit will cause imagemagick to abort processing, which could
|
||||
# lead to less clear errors, especially on image upload. In order for pict-rs to return helpful
|
||||
# validation errors for images that don't meet other requirements, imagemagick must be allowed to
|
||||
# process the image.
|
||||
max_width = 10000
|
||||
|
||||
## Optional: maximum height, in pixels, of media that imagemagick will attempt to process
|
||||
# environment variable: PICTRS__MEDIA__MAGICK__MAX_HEIGHT
|
||||
# default: 10_000
|
||||
#
|
||||
# This value should be at least as large as the greatest max_height set for images, animations, and
|
||||
# videos. Any image that exceeds this limit will cause imagemagick to abort processing, which could
|
||||
# lead to less clear errors, especially on image upload. In order for pict-rs to return helpful
|
||||
# validation errors for images that don't meet other requirements, imagemagick must be allowed to
|
||||
# process the image.
|
||||
max_height = 10000
|
||||
|
||||
## Optional: maximum area, in pixels, of media that imagemagick will attempt to process
|
||||
# environment variable: PICTRS__MEDIA__MAGICK__MAX_HEIGHT
|
||||
# default: 40_000_000
|
||||
#
|
||||
# This value should be at least as large as the greatest max_area set for images, animations, and
|
||||
# videos. Any image that exceeds this limit will cause imagemagick to abort processing, which could
|
||||
# lead to less clear errors, especially on image upload. In order for pict-rs to return helpful
|
||||
# validation errors for images that don't meet other requirements, imagemagick must be allowed to
|
||||
# process the image.
|
||||
max_area = 40000000
|
||||
|
||||
|
||||
[media.image]
|
||||
## Optional: max media width (in pixels)
|
||||
# environment variable: PICTRS__MEDIA__IMAGE__MAX_WIDTH
|
||||
|
|
|
@ -67,6 +67,9 @@ impl Args {
|
|||
media_process_timeout,
|
||||
media_retention_variants,
|
||||
media_retention_proxy,
|
||||
media_magick_max_width,
|
||||
media_magick_max_height,
|
||||
media_magick_max_area,
|
||||
media_image_max_width,
|
||||
media_image_max_height,
|
||||
media_image_max_area,
|
||||
|
@ -137,6 +140,12 @@ impl Args {
|
|||
proxy: media_retention_proxy,
|
||||
};
|
||||
|
||||
let magick = Magick {
|
||||
max_width: media_magick_max_width,
|
||||
max_height: media_magick_max_height,
|
||||
max_area: media_magick_max_area,
|
||||
};
|
||||
|
||||
let image_quality = ImageQuality {
|
||||
avif: media_image_quality_avif,
|
||||
jpeg: media_image_quality_jpeg,
|
||||
|
@ -202,6 +211,7 @@ impl Args {
|
|||
external_validation_timeout: media_external_validation_timeout,
|
||||
filters: media_filters,
|
||||
retention: retention.set(),
|
||||
magick: magick.set(),
|
||||
image: image.set(),
|
||||
animation: animation.set(),
|
||||
video: video.set(),
|
||||
|
@ -609,6 +619,8 @@ struct Media {
|
|||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
retention: Option<Retention>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
magick: Option<Magick>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
image: Option<Image>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
animation: Option<Animation>,
|
||||
|
@ -637,6 +649,30 @@ impl Retention {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct Magick {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_width: Option<u16>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_height: Option<u16>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_area: Option<u32>,
|
||||
}
|
||||
|
||||
impl Magick {
|
||||
fn set(self) -> Option<Self> {
|
||||
let any_set =
|
||||
self.max_width.is_some() || self.max_height.is_some() || self.max_area.is_some();
|
||||
|
||||
if any_set {
|
||||
Some(self)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct Image {
|
||||
|
@ -991,6 +1027,16 @@ struct Run {
|
|||
#[arg(long)]
|
||||
media_retention_proxy: Option<RetentionValue>,
|
||||
|
||||
/// The maximum width, in pixels, for uploaded media that imagemagick will attempt to process
|
||||
#[arg(long)]
|
||||
media_magick_max_width: Option<u16>,
|
||||
/// The maximum height, in pixels, for uploaded media that imagemagick will attempt to process
|
||||
#[arg(long)]
|
||||
media_magick_max_height: Option<u16>,
|
||||
/// The maximum area, in pixels, for uploaded media that imagemagick will attempt to process
|
||||
#[arg(long)]
|
||||
media_magick_max_area: Option<u32>,
|
||||
|
||||
/// The maximum width, in pixels, for uploaded images
|
||||
#[arg(long)]
|
||||
media_image_max_width: Option<u16>,
|
||||
|
|
|
@ -77,6 +77,7 @@ struct MediaDefaults {
|
|||
process_timeout: u64,
|
||||
filters: Vec<String>,
|
||||
retention: RetentionDefaults,
|
||||
magick: MagickDefaults,
|
||||
image: ImageDefaults,
|
||||
animation: AnimationDefaults,
|
||||
video: VideoDefaults,
|
||||
|
@ -89,6 +90,14 @@ struct RetentionDefaults {
|
|||
proxy: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct MagickDefaults {
|
||||
max_width: u16,
|
||||
max_height: u16,
|
||||
max_area: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct ImageDefaults {
|
||||
|
@ -256,6 +265,7 @@ impl Default for MediaDefaults {
|
|||
"thumbnail".into(),
|
||||
],
|
||||
retention: Default::default(),
|
||||
magick: Default::default(),
|
||||
image: Default::default(),
|
||||
animation: Default::default(),
|
||||
video: Default::default(),
|
||||
|
@ -272,6 +282,16 @@ impl Default for RetentionDefaults {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for MagickDefaults {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_width: 10_000,
|
||||
max_height: 10_000,
|
||||
max_area: 40_000_000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ImageDefaults {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
|
|
@ -207,6 +207,8 @@ pub(crate) struct Media {
|
|||
|
||||
pub(crate) retention: Retention,
|
||||
|
||||
pub(crate) magick: Magick,
|
||||
|
||||
pub(crate) image: Image,
|
||||
|
||||
pub(crate) animation: Animation,
|
||||
|
@ -221,6 +223,15 @@ pub(crate) struct Retention {
|
|||
pub(crate) proxy: RetentionValue,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Magick {
|
||||
pub(crate) max_width: u16,
|
||||
|
||||
pub(crate) max_height: u16,
|
||||
|
||||
pub(crate) max_area: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Image {
|
||||
pub(crate) max_width: u16,
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::{
|
|||
discover::Discovery,
|
||||
error::Error,
|
||||
formats::{InternalFormat, InternalVideoFormat},
|
||||
magick::PolicyDir,
|
||||
serde_str::Serde,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
@ -82,6 +83,7 @@ impl Details {
|
|||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub(crate) async fn from_bytes(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
timeout: u64,
|
||||
input: web::Bytes,
|
||||
) -> Result<Self, Error> {
|
||||
|
@ -90,7 +92,7 @@ impl Details {
|
|||
width,
|
||||
height,
|
||||
frames,
|
||||
} = crate::discover::discover_bytes(tmp_dir, timeout, input).await?;
|
||||
} = crate::discover::discover_bytes(tmp_dir, policy_dir, timeout, input).await?;
|
||||
|
||||
Ok(Details::from_parts(
|
||||
input.internal_format(),
|
||||
|
|
|
@ -4,7 +4,7 @@ mod magick;
|
|||
|
||||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{formats::InputFile, tmp_file::TmpDir};
|
||||
use crate::{formats::InputFile, magick::PolicyDir, tmp_file::TmpDir};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub(crate) struct Discovery {
|
||||
|
@ -29,12 +29,14 @@ pub(crate) enum DiscoverError {
|
|||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn discover_bytes(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
) -> Result<Discovery, crate::error::Error> {
|
||||
let discovery = ffmpeg::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;
|
||||
|
||||
let discovery = magick::confirm_bytes(tmp_dir, discovery, timeout, bytes.clone()).await?;
|
||||
let discovery =
|
||||
magick::confirm_bytes(tmp_dir, policy_dir, discovery, timeout, bytes.clone()).await?;
|
||||
|
||||
let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?;
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ use actix_web::web::Bytes;
|
|||
use crate::{
|
||||
discover::DiscoverError,
|
||||
formats::{AnimationFormat, ImageFormat, ImageInput, InputFile},
|
||||
magick::{MagickError, MAGICK_TEMPORARY_PATH},
|
||||
magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
process::Process,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
@ -33,6 +33,7 @@ struct Geometry {
|
|||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn confirm_bytes(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
discovery: Option<Discovery>,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
|
@ -50,7 +51,7 @@ pub(super) async fn confirm_bytes(
|
|||
}
|
||||
}
|
||||
|
||||
discover_file(tmp_dir, timeout, move |mut file| async move {
|
||||
discover_file(tmp_dir, policy_dir, timeout, move |mut file| async move {
|
||||
file.write_from_bytes(bytes)
|
||||
.await
|
||||
.map_err(MagickError::Write)?;
|
||||
|
@ -63,6 +64,7 @@ pub(super) async fn confirm_bytes(
|
|||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn discover_file<F, Fut>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
timeout: u64,
|
||||
f: F,
|
||||
) -> Result<Discovery, MagickError>
|
||||
|
@ -86,7 +88,10 @@ where
|
|||
let tmp_one = (f)(tmp_one).await?;
|
||||
tmp_one.close().await.map_err(MagickError::CloseFile)?;
|
||||
|
||||
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let res = Process::run(
|
||||
"magick",
|
||||
|
|
|
@ -7,6 +7,7 @@ use crate::{
|
|||
error::{Error, UploadError},
|
||||
formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
|
||||
future::{WithMetrics, WithTimeout},
|
||||
magick::PolicyDir,
|
||||
repo::{ArcRepo, Hash, VariantAlreadyExists},
|
||||
store::Store,
|
||||
tmp_file::TmpDir,
|
||||
|
@ -49,9 +50,10 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, hash, process_map, config))]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, process_map, config))]
|
||||
pub(crate) async fn generate<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
process_map: &ProcessMap,
|
||||
|
@ -74,6 +76,7 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
} else {
|
||||
let process_fut = process(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
format,
|
||||
|
@ -96,9 +99,10 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, hash, config))]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, config))]
|
||||
async fn process<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
output_format: InputProcessableFormat,
|
||||
|
@ -113,6 +117,7 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
let identifier = input_identifier(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
output_format,
|
||||
|
@ -123,7 +128,8 @@ async fn process<S: Store + 'static>(
|
|||
.await?;
|
||||
|
||||
let input_details =
|
||||
crate::ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await?;
|
||||
crate::ensure_details_identifier(tmp_dir, policy_dir, repo, store, config, &identifier)
|
||||
.await?;
|
||||
|
||||
let input_format = input_details
|
||||
.internal_format()
|
||||
|
@ -141,6 +147,7 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
let vec = crate::magick::process_image_stream_read(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
stream,
|
||||
thumbnail_args,
|
||||
input_format,
|
||||
|
@ -157,7 +164,13 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
drop(permit);
|
||||
|
||||
let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes.clone()).await?;
|
||||
let details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
config.media.process_timeout,
|
||||
bytes.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let identifier = store
|
||||
.save_bytes(bytes.clone(), details.media_type())
|
||||
|
@ -181,9 +194,11 @@ async fn process<S: Store + 'static>(
|
|||
Ok((details, bytes)) as Result<(Details, Bytes), Error>
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn input_identifier<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
output_format: InputProcessableFormat,
|
||||
|
@ -223,6 +238,7 @@ where
|
|||
|
||||
let reader = magick::thumbnail(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
stream,
|
||||
processable_format,
|
||||
ProcessableFormat::Image(thumbnail_format),
|
||||
|
|
|
@ -4,7 +4,7 @@ use actix_web::web::Bytes;
|
|||
|
||||
use crate::{
|
||||
formats::ProcessableFormat,
|
||||
magick::{MagickError, MAGICK_TEMPORARY_PATH},
|
||||
magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
process::{Process, ProcessRead},
|
||||
stream::LocalBoxStream,
|
||||
tmp_file::TmpDir,
|
||||
|
@ -12,6 +12,7 @@ use crate::{
|
|||
|
||||
async fn thumbnail_animation<F, Fut>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
|
@ -59,7 +60,10 @@ where
|
|||
}
|
||||
args.push(output_arg.as_ref());
|
||||
|
||||
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?
|
||||
.read()
|
||||
|
@ -71,6 +75,7 @@ where
|
|||
|
||||
pub(super) async fn thumbnail(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
|
@ -79,6 +84,7 @@ pub(super) async fn thumbnail(
|
|||
) -> Result<ProcessRead, MagickError> {
|
||||
thumbnail_animation(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
|||
error::{Error, UploadError},
|
||||
formats::{InternalFormat, Validations},
|
||||
future::WithMetrics,
|
||||
magick::PolicyDir,
|
||||
repo::{Alias, ArcRepo, DeleteToken, Hash},
|
||||
store::Store,
|
||||
tmp_file::TmpDir,
|
||||
|
@ -50,6 +51,7 @@ where
|
|||
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
store: &S,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
media: &crate::config::Media,
|
||||
|
@ -70,8 +72,14 @@ where
|
|||
};
|
||||
|
||||
tracing::trace!("Validating bytes");
|
||||
let (input_type, process_read) =
|
||||
crate::validate::validate_bytes(tmp_dir, bytes, prescribed, media.process_timeout).await?;
|
||||
let (input_type, process_read) = crate::validate::validate_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
prescribed,
|
||||
media.process_timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let process_read = if let Some(operations) = media.preprocess_steps() {
|
||||
if let Some(format) = input_type.processable_format() {
|
||||
|
@ -87,6 +95,7 @@ where
|
|||
|
||||
crate::magick::process_image_process_read(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
process_read,
|
||||
magick_args,
|
||||
format,
|
||||
|
@ -115,8 +124,13 @@ where
|
|||
.await??;
|
||||
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
let details =
|
||||
Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;
|
||||
let details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
media.process_timeout,
|
||||
bytes_stream.into_bytes(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(permit);
|
||||
|
||||
|
@ -151,9 +165,11 @@ where
|
|||
Ok((input_type, identifier, details, state))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, client, stream, config))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, stream, config))]
|
||||
pub(crate) async fn ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
|
@ -167,7 +183,7 @@ where
|
|||
let (input_type, identifier, details, state) = if config.server.danger_dummy_mode {
|
||||
dummy_ingest(store, stream).await?
|
||||
} else {
|
||||
process_ingest(tmp_dir, store, stream, &config.media).await?
|
||||
process_ingest(tmp_dir, policy_dir, store, stream, &config.media).await?
|
||||
};
|
||||
|
||||
let mut session = Session {
|
||||
|
|
213
src/lib.rs
213
src/lib.rs
|
@ -42,6 +42,7 @@ use actix_web::{
|
|||
use details::{ApiDetails, HumanDate};
|
||||
use future::WithTimeout;
|
||||
use futures_core::Stream;
|
||||
use magick::{ArcPolicyDir, PolicyDir};
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
use middleware::{Metrics, Payload};
|
||||
use repo::ArcRepo;
|
||||
|
@ -106,6 +107,7 @@ fn process_semaphore() -> &'static Semaphore {
|
|||
|
||||
async fn ensure_details<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
|
@ -115,11 +117,12 @@ async fn ensure_details<S: Store + 'static>(
|
|||
return Err(UploadError::MissingAlias.into());
|
||||
};
|
||||
|
||||
ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await
|
||||
ensure_details_identifier(tmp_dir, policy_dir, repo, store, config, &identifier).await
|
||||
}
|
||||
|
||||
async fn ensure_details_identifier<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
|
@ -143,6 +146,7 @@ async fn ensure_details_identifier<S: Store + 'static>(
|
|||
let bytes_stream = store.to_bytes(identifier, None, None).await?;
|
||||
let new_details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
config.media.process_timeout,
|
||||
bytes_stream.into_bytes(),
|
||||
)
|
||||
|
@ -165,6 +169,10 @@ impl<S: Store + 'static> FormData for Upload<S> {
|
|||
.app_data::<web::Data<ArcTmpDir>>()
|
||||
.expect("No TmpDir in request")
|
||||
.clone();
|
||||
let policy_dir = req
|
||||
.app_data::<web::Data<ArcPolicyDir>>()
|
||||
.expect("No TmpDir in request")
|
||||
.clone();
|
||||
let repo = req
|
||||
.app_data::<web::Data<ArcRepo>>()
|
||||
.expect("No repo in request")
|
||||
|
@ -193,6 +201,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
|
|||
"images",
|
||||
Field::array(Field::file(move |filename, _, stream| {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let repo = repo.clone();
|
||||
let store = store.clone();
|
||||
let client = client.clone();
|
||||
|
@ -211,7 +220,14 @@ impl<S: Store + 'static> FormData for Upload<S> {
|
|||
let stream = crate::stream::from_err(stream);
|
||||
|
||||
ingest::ingest(
|
||||
&tmp_dir, &repo, &**store, &client, stream, None, &config,
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&**store,
|
||||
&client,
|
||||
stream,
|
||||
None,
|
||||
&config,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -237,6 +253,10 @@ impl<S: Store + 'static> FormData for Import<S> {
|
|||
.app_data::<web::Data<ArcTmpDir>>()
|
||||
.expect("No TmpDir in request")
|
||||
.clone();
|
||||
let policy_dir = req
|
||||
.app_data::<web::Data<ArcPolicyDir>>()
|
||||
.expect("No TmpDir in request")
|
||||
.clone();
|
||||
let repo = req
|
||||
.app_data::<web::Data<ArcRepo>>()
|
||||
.expect("No repo in request")
|
||||
|
@ -265,6 +285,7 @@ impl<S: Store + 'static> FormData for Import<S> {
|
|||
"images",
|
||||
Field::array(Field::file(move |filename, _, stream| {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let repo = repo.clone();
|
||||
let store = store.clone();
|
||||
let client = client.clone();
|
||||
|
@ -284,6 +305,7 @@ impl<S: Store + 'static> FormData for Import<S> {
|
|||
|
||||
ingest::ingest(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&**store,
|
||||
&client,
|
||||
|
@ -308,34 +330,46 @@ impl<S: Store + 'static> FormData for Import<S> {
|
|||
}
|
||||
|
||||
/// Handle responding to successful uploads
|
||||
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Uploaded files",
|
||||
skip(value, tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn upload<S: Store + 'static>(
|
||||
Multipart(Upload(value, _)): Multipart<Upload<S>>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
handle_upload(value, tmp_dir, repo, store, config).await
|
||||
handle_upload(value, tmp_dir, policy_dir, repo, store, config).await
|
||||
}
|
||||
|
||||
/// Handle responding to successful uploads
|
||||
#[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Imported files",
|
||||
skip(value, tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn import<S: Store + 'static>(
|
||||
Multipart(Import(value, _)): Multipart<Import<S>>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
handle_upload(value, tmp_dir, repo, store, config).await
|
||||
handle_upload(value, tmp_dir, policy_dir, repo, store, config).await
|
||||
}
|
||||
|
||||
/// Handle responding to successful uploads
|
||||
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Uploaded files",
|
||||
skip(value, tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn handle_upload<S: Store + 'static>(
|
||||
value: Value<Session>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -357,7 +391,8 @@ async fn handle_upload<S: Store + 'static>(
|
|||
tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
|
||||
let delete_token = image.result.delete_token();
|
||||
|
||||
let details = ensure_details(&tmp_dir, &repo, &store, &config, alias).await?;
|
||||
let details =
|
||||
ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, alias).await?;
|
||||
|
||||
files.push(serde_json::json!({
|
||||
"file": alias.to_string(),
|
||||
|
@ -487,6 +522,7 @@ struct ClaimQuery {
|
|||
#[tracing::instrument(name = "Waiting on upload", skip_all)]
|
||||
async fn claim_upload<S: Store + 'static>(
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -506,7 +542,9 @@ async fn claim_upload<S: Store + 'static>(
|
|||
|
||||
match upload_result {
|
||||
UploadResult::Success { alias, token } => {
|
||||
let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
|
||||
let details =
|
||||
ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias)
|
||||
.await?;
|
||||
|
||||
Ok(HttpResponse::Ok().json(&serde_json::json!({
|
||||
"msg": "ok",
|
||||
|
@ -539,16 +577,20 @@ struct UrlQuery {
|
|||
async fn ingest_inline<S: Store + 'static>(
|
||||
stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static,
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
config: &Configuration,
|
||||
) -> Result<(Alias, DeleteToken, Details), Error> {
|
||||
let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, config).await?;
|
||||
let session = ingest::ingest(
|
||||
tmp_dir, policy_dir, repo, store, client, stream, None, config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let alias = session.alias().expect("alias should exist").to_owned();
|
||||
|
||||
let details = ensure_details(tmp_dir, repo, store, config, &alias).await?;
|
||||
let details = ensure_details(tmp_dir, policy_dir, repo, store, config, &alias).await?;
|
||||
|
||||
let delete_token = session.disarm();
|
||||
|
||||
|
@ -556,10 +598,14 @@ async fn ingest_inline<S: Store + 'static>(
|
|||
}
|
||||
|
||||
/// download an image from a URL
|
||||
#[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Downloading file",
|
||||
skip(client, tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn download<S: Store + 'static>(
|
||||
client: web::Data<ClientWithMiddleware>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -570,7 +616,7 @@ async fn download<S: Store + 'static>(
|
|||
if query.backgrounded {
|
||||
do_download_backgrounded(stream, repo, store).await
|
||||
} else {
|
||||
do_download_inline(stream, &tmp_dir, repo, store, &client, config).await
|
||||
do_download_inline(stream, &tmp_dir, &policy_dir, repo, store, &client, config).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -599,11 +645,12 @@ async fn download_stream(
|
|||
|
||||
#[tracing::instrument(
|
||||
name = "Downloading file inline",
|
||||
skip(stream, repo, store, client, config)
|
||||
skip(stream, tmp_dir, policy_dir, repo, store, client, config)
|
||||
)]
|
||||
async fn do_download_inline<S: Store + 'static>(
|
||||
stream: impl Stream<Item = Result<web::Bytes, Error>> + 'static,
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
client: &ClientWithMiddleware,
|
||||
|
@ -612,7 +659,7 @@ async fn do_download_inline<S: Store + 'static>(
|
|||
metrics::counter!("pict-rs.files", "download" => "inline").increment(1);
|
||||
|
||||
let (alias, delete_token, details) =
|
||||
ingest_inline(stream, tmp_dir, &repo, &store, client, &config).await?;
|
||||
ingest_inline(stream, tmp_dir, policy_dir, &repo, &store, client, &config).await?;
|
||||
|
||||
Ok(HttpResponse::Created().json(&serde_json::json!({
|
||||
"msg": "ok",
|
||||
|
@ -868,13 +915,14 @@ async fn not_found_hash(repo: &ArcRepo) -> Result<Option<(Alias, Hash)>, Error>
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "Serving processed image",
|
||||
skip(tmp_dir, repo, store, client, config, process_map)
|
||||
skip(tmp_dir, policy_dir, repo, store, client, config, process_map)
|
||||
)]
|
||||
async fn process<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
|
||||
ext: web::Path<String>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
client: web::Data<ClientWithMiddleware>,
|
||||
|
@ -891,8 +939,16 @@ async fn process<S: Store + 'static>(
|
|||
} else if !config.server.read_only {
|
||||
let stream = download_stream(&client, proxy.as_str(), &config).await?;
|
||||
|
||||
let (alias, _, _) =
|
||||
ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?;
|
||||
let (alias, _, _) = ingest_inline(
|
||||
stream,
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store,
|
||||
&client,
|
||||
&config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
repo.relate_url(proxy, alias.clone()).await?;
|
||||
|
||||
|
@ -933,7 +989,8 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let details =
|
||||
ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
|
||||
ensure_details_identifier(&tmp_dir, &policy_dir, &repo, &store, &config, &identifier)
|
||||
.await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
@ -948,10 +1005,12 @@ async fn process<S: Store + 'static>(
|
|||
return Err(UploadError::ReadOnly.into());
|
||||
}
|
||||
|
||||
let original_details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
|
||||
let original_details =
|
||||
ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?;
|
||||
|
||||
let (details, bytes) = generate::generate(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store,
|
||||
&process_map,
|
||||
|
@ -998,15 +1057,17 @@ async fn process<S: Store + 'static>(
|
|||
))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "Serving processed image headers",
|
||||
skip(tmp_dir, repo, store, config)
|
||||
skip(tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn process_head<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
|
||||
ext: web::Path<String>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1040,7 +1101,8 @@ async fn process_head<S: Store + 'static>(
|
|||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let details =
|
||||
ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
|
||||
ensure_details_identifier(&tmp_dir, &policy_dir, &repo, &store, &config, &identifier)
|
||||
.await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
@ -1099,10 +1161,14 @@ async fn process_backgrounded<S: Store>(
|
|||
}
|
||||
|
||||
/// Fetch file details
|
||||
#[tracing::instrument(name = "Fetching query details", skip(repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Fetching query details",
|
||||
skip(tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn details_query<S: Store + 'static>(
|
||||
web::Query(alias_query): web::Query<AliasQuery>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1119,14 +1185,18 @@ async fn details_query<S: Store + 'static>(
|
|||
}
|
||||
};
|
||||
|
||||
do_details(alias, tmp_dir, repo, store, config).await
|
||||
do_details(alias, tmp_dir, policy_dir, repo, store, config).await
|
||||
}
|
||||
|
||||
/// Fetch file details
|
||||
#[tracing::instrument(name = "Fetching details", skip(tmp_dir, repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Fetching details",
|
||||
skip(tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn details<S: Store + 'static>(
|
||||
alias: web::Path<Serde<Alias>>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1134,6 +1204,7 @@ async fn details<S: Store + 'static>(
|
|||
do_details(
|
||||
Serde::into_inner(alias.into_inner()),
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
config,
|
||||
|
@ -1144,21 +1215,27 @@ async fn details<S: Store + 'static>(
|
|||
async fn do_details<S: Store + 'static>(
|
||||
alias: Alias,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
|
||||
let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?;
|
||||
|
||||
Ok(HttpResponse::Ok().json(&details.into_api_details()))
|
||||
}
|
||||
|
||||
/// Serve files based on alias query
|
||||
#[tracing::instrument(name = "Serving file query", skip(repo, store, client, config))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
name = "Serving file query",
|
||||
skip(tmp_dir, policy_dir, repo, store, client, config)
|
||||
)]
|
||||
async fn serve_query<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
web::Query(alias_query): web::Query<AliasQuery>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
client: web::Data<ClientWithMiddleware>,
|
||||
|
@ -1172,8 +1249,16 @@ async fn serve_query<S: Store + 'static>(
|
|||
} else if !config.server.read_only {
|
||||
let stream = download_stream(&client, proxy.as_str(), &config).await?;
|
||||
|
||||
let (alias, _, _) =
|
||||
ingest_inline(stream, &tmp_dir, &repo, &store, &client, &config).await?;
|
||||
let (alias, _, _) = ingest_inline(
|
||||
stream,
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store,
|
||||
&client,
|
||||
&config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
repo.relate_url(proxy, alias.clone()).await?;
|
||||
|
||||
|
@ -1190,15 +1275,16 @@ async fn serve_query<S: Store + 'static>(
|
|||
}
|
||||
};
|
||||
|
||||
do_serve(range, alias, tmp_dir, repo, store, config).await
|
||||
do_serve(range, alias, tmp_dir, policy_dir, repo, store, config).await
|
||||
}
|
||||
|
||||
/// Serve files
|
||||
#[tracing::instrument(name = "Serving file", skip(repo, store, config))]
|
||||
#[tracing::instrument(name = "Serving file", skip(tmp_dir, policy_dir, repo, store, config))]
|
||||
async fn serve<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
alias: web::Path<Serde<Alias>>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1207,6 +1293,7 @@ async fn serve<S: Store + 'static>(
|
|||
range,
|
||||
Serde::into_inner(alias.into_inner()),
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
config,
|
||||
|
@ -1218,6 +1305,7 @@ async fn do_serve<S: Store + 'static>(
|
|||
range: Option<web::Header<Range>>,
|
||||
alias: Alias,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1238,7 +1326,7 @@ async fn do_serve<S: Store + 'static>(
|
|||
return Ok(HttpResponse::NotFound().finish());
|
||||
};
|
||||
|
||||
let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
|
||||
let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
@ -1249,11 +1337,15 @@ async fn do_serve<S: Store + 'static>(
|
|||
ranged_file_resp(&store, identifier, range, details, not_found).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Serving query file headers", skip(repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Serving query file headers",
|
||||
skip(repo, tmp_dir, policy_dir, store, config)
|
||||
)]
|
||||
async fn serve_query_head<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
web::Query(alias_query): web::Query<AliasQuery>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1268,14 +1360,18 @@ async fn serve_query_head<S: Store + 'static>(
|
|||
}
|
||||
};
|
||||
|
||||
do_serve_head(range, alias, tmp_dir, repo, store, config).await
|
||||
do_serve_head(range, alias, tmp_dir, policy_dir, repo, store, config).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
|
||||
#[tracing::instrument(
|
||||
name = "Serving file headers",
|
||||
skip(tmp_dir, policy_dir, repo, store, config)
|
||||
)]
|
||||
async fn serve_head<S: Store + 'static>(
|
||||
range: Option<web::Header<Range>>,
|
||||
alias: web::Path<Serde<Alias>>,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1284,6 +1380,7 @@ async fn serve_head<S: Store + 'static>(
|
|||
range,
|
||||
Serde::into_inner(alias.into_inner()),
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
config,
|
||||
|
@ -1295,6 +1392,7 @@ async fn do_serve_head<S: Store + 'static>(
|
|||
range: Option<web::Header<Range>>,
|
||||
alias: Alias,
|
||||
tmp_dir: web::Data<ArcTmpDir>,
|
||||
policy_dir: web::Data<ArcPolicyDir>,
|
||||
repo: web::Data<ArcRepo>,
|
||||
store: web::Data<S>,
|
||||
config: web::Data<Configuration>,
|
||||
|
@ -1304,7 +1402,7 @@ async fn do_serve_head<S: Store + 'static>(
|
|||
return Ok(HttpResponse::NotFound().finish());
|
||||
};
|
||||
|
||||
let details = ensure_details(&tmp_dir, &repo, &store, &config, &alias).await?;
|
||||
let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
@ -1817,6 +1915,7 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
|
|||
|
||||
fn spawn_workers<S>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
store: S,
|
||||
client: ClientWithMiddleware,
|
||||
|
@ -1831,12 +1930,21 @@ fn spawn_workers<S>(
|
|||
);
|
||||
crate::sync::spawn(
|
||||
"process-worker",
|
||||
queue::process_images(tmp_dir, repo, store, client, process_map, config),
|
||||
queue::process_images(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
process_map,
|
||||
config,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
store: FileStore,
|
||||
client: ClientWithMiddleware,
|
||||
|
@ -1853,6 +1961,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
|
|||
|
||||
let server = HttpServer::new(move || {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let client = client.clone();
|
||||
let store = store.clone();
|
||||
let repo = repo.clone();
|
||||
|
@ -1861,6 +1970,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
|
|||
|
||||
spawn_workers(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
repo.clone(),
|
||||
store.clone(),
|
||||
client.clone(),
|
||||
|
@ -1875,6 +1985,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
|
|||
.wrap(Payload::new())
|
||||
.app_data(web::Data::new(process_map.clone()))
|
||||
.app_data(web::Data::new(tmp_dir))
|
||||
.app_data(web::Data::new(policy_dir))
|
||||
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
|
||||
});
|
||||
|
||||
|
@ -1915,6 +2026,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
|
|||
|
||||
async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
store: ObjectStore,
|
||||
client: ClientWithMiddleware,
|
||||
|
@ -1931,6 +2043,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
|
|||
|
||||
let server = HttpServer::new(move || {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let client = client.clone();
|
||||
let store = store.clone();
|
||||
let repo = repo.clone();
|
||||
|
@ -1939,6 +2052,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
|
|||
|
||||
spawn_workers(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
repo.clone(),
|
||||
store.clone(),
|
||||
client.clone(),
|
||||
|
@ -1953,6 +2067,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
|
|||
.wrap(Payload::new())
|
||||
.app_data(web::Data::new(process_map.clone()))
|
||||
.app_data(web::Data::new(tmp_dir))
|
||||
.app_data(web::Data::new(policy_dir))
|
||||
.configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
|
||||
});
|
||||
|
||||
|
@ -1994,6 +2109,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
async fn migrate_inner<S1>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
client: ClientWithMiddleware,
|
||||
from: S1,
|
||||
|
@ -2011,6 +2127,7 @@ where
|
|||
|
||||
migrate_store(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -2054,6 +2171,7 @@ where
|
|||
|
||||
migrate_store(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -2213,6 +2331,7 @@ impl PictRsConfiguration {
|
|||
let PictRsConfiguration { config, operation } = self;
|
||||
|
||||
let tmp_dir = TmpDir::init(&config.server.temporary_directory).await?;
|
||||
let policy_dir = magick::write_magick_policy(&config.media, &tmp_dir).await?;
|
||||
|
||||
let client = build_client()?;
|
||||
|
||||
|
@ -2231,6 +2350,7 @@ impl PictRsConfiguration {
|
|||
let from = FileStore::build(path.clone(), repo.clone()).await?;
|
||||
migrate_inner(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
client,
|
||||
from,
|
||||
|
@ -2277,6 +2397,7 @@ impl PictRsConfiguration {
|
|||
|
||||
migrate_inner(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
client,
|
||||
from,
|
||||
|
@ -2317,6 +2438,7 @@ impl PictRsConfiguration {
|
|||
if let Some(old_repo) = repo_04::open(path)? {
|
||||
repo::migrate_04(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
old_repo,
|
||||
arc_repo.clone(),
|
||||
store.clone(),
|
||||
|
@ -2334,6 +2456,7 @@ impl PictRsConfiguration {
|
|||
Repo::Sled(sled_repo) => {
|
||||
launch_file_store(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
arc_repo,
|
||||
store,
|
||||
client,
|
||||
|
@ -2343,8 +2466,16 @@ impl PictRsConfiguration {
|
|||
.await?;
|
||||
}
|
||||
Repo::Postgres(_) => {
|
||||
launch_file_store(tmp_dir.clone(), arc_repo, store, client, config, |_| {})
|
||||
.await?;
|
||||
launch_file_store(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
arc_repo,
|
||||
store,
|
||||
client,
|
||||
config,
|
||||
|_| {},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2387,6 +2518,7 @@ impl PictRsConfiguration {
|
|||
if let Some(old_repo) = repo_04::open(path)? {
|
||||
repo::migrate_04(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
old_repo,
|
||||
arc_repo.clone(),
|
||||
store.clone(),
|
||||
|
@ -2404,6 +2536,7 @@ impl PictRsConfiguration {
|
|||
Repo::Sled(sled_repo) => {
|
||||
launch_object_store(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
arc_repo,
|
||||
store,
|
||||
client,
|
||||
|
@ -2415,6 +2548,7 @@ impl PictRsConfiguration {
|
|||
Repo::Postgres(_) => {
|
||||
launch_object_store(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
arc_repo,
|
||||
store,
|
||||
client,
|
||||
|
@ -2427,6 +2561,7 @@ impl PictRsConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
policy_dir.cleanup().await?;
|
||||
tmp_dir.cleanup().await?;
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
use std::ffi::OsStr;
|
||||
use std::{ffi::OsStr, ops::Deref, path::Path, sync::Arc};
|
||||
|
||||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::{
|
||||
config::Media,
|
||||
error_code::ErrorCode,
|
||||
formats::ProcessableFormat,
|
||||
process::{Process, ProcessError, ProcessRead},
|
||||
stream::LocalBoxStream,
|
||||
tmp_file::TmpDir,
|
||||
tmp_file::{TmpDir, TmpFolder},
|
||||
};
|
||||
|
||||
pub(crate) const MAGICK_TEMPORARY_PATH: &str = "MAGICK_TEMPORARY_PATH";
|
||||
pub(crate) const MAGICK_CONFIGURE_PATH: &str = "MAGICK_CONFIGURE_PATH";
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum MagickError {
|
||||
|
@ -83,8 +85,10 @@ impl MagickError {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn process_image<F, Fut>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
process_args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
|
@ -137,7 +141,10 @@ where
|
|||
}
|
||||
args.push(output_arg.as_ref());
|
||||
|
||||
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?
|
||||
.read()
|
||||
|
@ -150,6 +157,7 @@ where
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn process_image_stream_read(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
|
@ -159,6 +167,7 @@ pub(crate) async fn process_image_stream_read(
|
|||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
|
@ -175,8 +184,10 @@ pub(crate) async fn process_image_stream_read(
|
|||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn process_image_process_read(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
process_read: ProcessRead,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
|
@ -186,6 +197,7 @@ pub(crate) async fn process_image_process_read(
|
|||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
|
@ -206,3 +218,81 @@ pub(crate) async fn process_image_process_read(
|
|||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) type ArcPolicyDir = Arc<PolicyDir>;
|
||||
|
||||
pub(crate) struct PolicyDir {
|
||||
folder: TmpFolder,
|
||||
}
|
||||
|
||||
impl PolicyDir {
|
||||
pub(crate) async fn cleanup(self: Arc<Self>) -> std::io::Result<()> {
|
||||
if let Some(this) = Arc::into_inner(self) {
|
||||
this.folder.cleanup().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Path> for PolicyDir {
|
||||
fn as_ref(&self) -> &Path {
|
||||
&self.folder
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for PolicyDir {
|
||||
type Target = Path;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.folder
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn write_magick_policy(
|
||||
media: &Media,
|
||||
tmp_dir: &TmpDir,
|
||||
) -> std::io::Result<ArcPolicyDir> {
|
||||
let folder = tmp_dir.tmp_folder().await?;
|
||||
let file = folder.join("policy.xml");
|
||||
|
||||
let res = tokio::fs::write(&file, generate_policy(media)).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
folder.cleanup().await?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(Arc::new(PolicyDir { folder }))
|
||||
}
|
||||
|
||||
fn generate_policy(media: &Media) -> String {
|
||||
let width = media.magick.max_width;
|
||||
let height = media.magick.max_height;
|
||||
let area = media.magick.max_area;
|
||||
let timeout = media.process_timeout;
|
||||
|
||||
format!(
|
||||
r#"<policymap>
|
||||
<policy domain="resource" name="width" value="{width}" />
|
||||
<policy domain="resource" name="height" value="{height}" />
|
||||
<policy domain="resource" name="area" value="{area}" />
|
||||
<policy domain="resource" name="time" value="{timeout}" />
|
||||
<policy domain="resource" name="memory" value="256MiB" />
|
||||
<policy domain="resource" name="list-length" value="128" />
|
||||
<policy domain="resource" name="map" value="512MiB" />
|
||||
<policy domain="resource" name="disk" value="1GiB" />
|
||||
<policy domain="resource" name="file" value="768" />
|
||||
<policy domain="resource" name="thread" value="2" />
|
||||
<policy domain="path" rights="none" pattern="@*" />
|
||||
<policy domain="coder" rights="none" pattern="*" />
|
||||
<policy domain="coder" rights="read | write" pattern="{{APNG,AVIF,GIF,HEIC,JPEG,JSON,JXL,PNG,WEBP,MP4,WEBM,TMP,PAM}}" />
|
||||
<policy domain="delegate" rights="none" pattern="*" />
|
||||
<policy domain="delegate" rights="execute" pattern="ffmpeg" />
|
||||
<policy domain="filter" rights="none" pattern="*" />
|
||||
<policy domain="module" rights="none" pattern="*" />
|
||||
<policy domain="module" rights="read | write" pattern="{{APNG,AVIF,GIF,HEIC,JPEG,JSON,JXL,PNG,WEBP,TMP,PAM,PNM,VIDEO}}" />
|
||||
<!-- indirect reads not permitted -->
|
||||
<policy domain="system" name="precision" value="6" />
|
||||
</policymap>"#
|
||||
)
|
||||
}
|
||||
|
|
|
@ -12,13 +12,16 @@ use streem::IntoStreamer;
|
|||
use crate::{
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
magick::{ArcPolicyDir, PolicyDir},
|
||||
repo::{ArcRepo, Hash},
|
||||
store::Store,
|
||||
tmp_file::{ArcTmpDir, TmpDir},
|
||||
};
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn migrate_store<S1, S2>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
from: S1,
|
||||
to: S2,
|
||||
|
@ -47,6 +50,7 @@ where
|
|||
|
||||
while let Err(e) = do_migrate_store(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
repo.clone(),
|
||||
from.clone(),
|
||||
to.clone(),
|
||||
|
@ -75,6 +79,7 @@ where
|
|||
|
||||
struct MigrateState<S1, S2> {
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
from: S1,
|
||||
to: S2,
|
||||
|
@ -88,8 +93,10 @@ struct MigrateState<S1, S2> {
|
|||
timeout: u64,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn do_migrate_store<S1, S2>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
from: S1,
|
||||
to: S2,
|
||||
|
@ -120,6 +127,7 @@ where
|
|||
|
||||
let state = Rc::new(MigrateState {
|
||||
tmp_dir: tmp_dir.clone(),
|
||||
policy_dir: policy_dir.clone(),
|
||||
repo: repo.clone(),
|
||||
from,
|
||||
to,
|
||||
|
@ -172,6 +180,7 @@ where
|
|||
{
|
||||
let MigrateState {
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -236,6 +245,7 @@ where
|
|||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -275,6 +285,7 @@ where
|
|||
if !repo.is_migrated(&identifier).await? {
|
||||
match migrate_file(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -314,6 +325,7 @@ where
|
|||
|
||||
match migrate_file(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
from,
|
||||
to,
|
||||
|
@ -371,8 +383,10 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn migrate_file<S1, S2>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
|
@ -389,7 +403,7 @@ where
|
|||
loop {
|
||||
tracing::trace!("migrate_file: looping");
|
||||
|
||||
match do_migrate_file(tmp_dir, repo, from, to, identifier, timeout).await {
|
||||
match do_migrate_file(tmp_dir, policy_dir, repo, from, to, identifier, timeout).await {
|
||||
Ok(identifier) => return Ok(identifier),
|
||||
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
|
||||
return Err(MigrateError::From(e));
|
||||
|
@ -419,6 +433,7 @@ enum MigrateError {
|
|||
|
||||
async fn do_migrate_file<S1, S2>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
from: &S1,
|
||||
to: &S2,
|
||||
|
@ -448,9 +463,10 @@ where
|
|||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(MigrateError::Details)?;
|
||||
let new_details = Details::from_bytes(tmp_dir, timeout, bytes_stream.into_bytes())
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
let new_details =
|
||||
Details::from_bytes(tmp_dir, policy_dir, timeout, bytes_stream.into_bytes())
|
||||
.await
|
||||
.map_err(MigrateError::Details)?;
|
||||
repo.relate_details(identifier, &new_details)
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
|
|
19
src/queue.rs
19
src/queue.rs
|
@ -4,6 +4,7 @@ use crate::{
|
|||
error::{Error, UploadError},
|
||||
formats::InputProcessableFormat,
|
||||
future::LocalBoxFuture,
|
||||
magick::ArcPolicyDir,
|
||||
repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
|
||||
serde_str::Serde,
|
||||
store::Store,
|
||||
|
@ -197,6 +198,7 @@ pub(crate) async fn process_cleanup<S: Store + 'static>(
|
|||
|
||||
pub(crate) async fn process_images<S: Store + 'static>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
store: S,
|
||||
client: ClientWithMiddleware,
|
||||
|
@ -205,6 +207,7 @@ pub(crate) async fn process_images<S: Store + 'static>(
|
|||
) {
|
||||
process_image_jobs(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store,
|
||||
&client,
|
||||
|
@ -340,6 +343,7 @@ where
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
async fn process_image_jobs<S, F>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
|
@ -351,6 +355,7 @@ async fn process_image_jobs<S, F>(
|
|||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcTmpDir,
|
||||
&'a ArcPolicyDir,
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a ClientWithMiddleware,
|
||||
|
@ -369,6 +374,7 @@ async fn process_image_jobs<S, F>(
|
|||
|
||||
let res = image_job_loop(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
|
@ -398,6 +404,7 @@ async fn process_image_jobs<S, F>(
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
async fn image_job_loop<S, F>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
|
@ -411,6 +418,7 @@ where
|
|||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcTmpDir,
|
||||
&'a ArcPolicyDir,
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a ClientWithMiddleware,
|
||||
|
@ -435,7 +443,16 @@ where
|
|||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(tmp_dir, repo, store, client, process_map, config, job),
|
||||
(callback)(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
process_map,
|
||||
config,
|
||||
job,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ use crate::{
|
|||
formats::InputProcessableFormat,
|
||||
future::LocalBoxFuture,
|
||||
ingest::Session,
|
||||
magick::{ArcPolicyDir, PolicyDir},
|
||||
queue::Process,
|
||||
repo::{Alias, ArcRepo, UploadId, UploadResult},
|
||||
serde_str::Serde,
|
||||
|
@ -17,8 +18,10 @@ use crate::{
|
|||
};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn perform<'a, S>(
|
||||
tmp_dir: &'a ArcTmpDir,
|
||||
policy_dir: &'a ArcPolicyDir,
|
||||
repo: &'a ArcRepo,
|
||||
store: &'a S,
|
||||
client: &'a ClientWithMiddleware,
|
||||
|
@ -39,6 +42,7 @@ where
|
|||
} => {
|
||||
process_ingest(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
|
@ -57,6 +61,7 @@ where
|
|||
} => {
|
||||
generate(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
process_map,
|
||||
|
@ -113,9 +118,10 @@ impl Drop for UploadGuard {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, client, config))]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, config))]
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
|
@ -131,6 +137,7 @@ where
|
|||
|
||||
let fut = async {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let ident = unprocessed_identifier.clone();
|
||||
let store2 = store.clone();
|
||||
let repo = repo.clone();
|
||||
|
@ -147,6 +154,7 @@ where
|
|||
|
||||
let session = crate::ingest::ingest(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store2,
|
||||
&client,
|
||||
|
@ -191,9 +199,19 @@ where
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
|
||||
#[tracing::instrument(skip(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
process_map,
|
||||
process_path,
|
||||
process_args,
|
||||
config
|
||||
))]
|
||||
async fn generate<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
process_map: &ProcessMap,
|
||||
|
@ -215,10 +233,12 @@ async fn generate<S: Store + 'static>(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let original_details = crate::ensure_details(tmp_dir, repo, store, config, &source).await?;
|
||||
let original_details =
|
||||
crate::ensure_details(tmp_dir, policy_dir, repo, store, config, &source).await?;
|
||||
|
||||
crate::generate::generate(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
process_map,
|
||||
|
|
|
@ -10,6 +10,7 @@ use crate::{
|
|||
config::Configuration,
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
magick::{ArcPolicyDir, PolicyDir},
|
||||
repo::{ArcRepo, DeleteToken, Hash},
|
||||
repo_04::{
|
||||
AliasRepo as _, HashRepo as _, IdentifierRepo as _, SettingsRepo as _,
|
||||
|
@ -80,6 +81,7 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
|
|||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn migrate_04<S: Store + 'static>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
old_repo: OldSledRepo,
|
||||
new_repo: ArcRepo,
|
||||
store: S,
|
||||
|
@ -115,6 +117,7 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
|
|||
if let Ok(hash) = res {
|
||||
set.spawn_local(migrate_hash_04(
|
||||
tmp_dir.clone(),
|
||||
policy_dir.clone(),
|
||||
old_repo.clone(),
|
||||
new_repo.clone(),
|
||||
store.clone(),
|
||||
|
@ -192,6 +195,7 @@ async fn migrate_hash(old_repo: ArcRepo, new_repo: ArcRepo, hash: Hash) {
|
|||
|
||||
async fn migrate_hash_04<S: Store>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
old_repo: OldSledRepo,
|
||||
new_repo: ArcRepo,
|
||||
store: S,
|
||||
|
@ -202,6 +206,7 @@ async fn migrate_hash_04<S: Store>(
|
|||
|
||||
while let Err(e) = timed_migrate_hash_04(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&old_repo,
|
||||
&new_repo,
|
||||
&store,
|
||||
|
@ -296,6 +301,7 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) ->
|
|||
|
||||
async fn timed_migrate_hash_04<S: Store>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
old_repo: &OldSledRepo,
|
||||
new_repo: &ArcRepo,
|
||||
store: &S,
|
||||
|
@ -304,7 +310,9 @@ async fn timed_migrate_hash_04<S: Store>(
|
|||
) -> Result<(), Error> {
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(config.media.external_validation_timeout * 6),
|
||||
do_migrate_hash_04(tmp_dir, old_repo, new_repo, store, config, old_hash),
|
||||
do_migrate_hash_04(
|
||||
tmp_dir, policy_dir, old_repo, new_repo, store, config, old_hash,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| UploadError::ProcessTimeout)?
|
||||
|
@ -313,6 +321,7 @@ async fn timed_migrate_hash_04<S: Store>(
|
|||
#[tracing::instrument(skip_all)]
|
||||
async fn do_migrate_hash_04<S: Store>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
old_repo: &OldSledRepo,
|
||||
new_repo: &ArcRepo,
|
||||
store: &S,
|
||||
|
@ -326,7 +335,16 @@ async fn do_migrate_hash_04<S: Store>(
|
|||
|
||||
let size = store.len(&identifier).await?;
|
||||
|
||||
let hash_details = set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
|
||||
let hash_details = set_details(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
old_repo,
|
||||
new_repo,
|
||||
store,
|
||||
config,
|
||||
&identifier,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?;
|
||||
let variants = old_repo.variants(old_hash.clone()).await?;
|
||||
|
@ -356,7 +374,16 @@ async fn do_migrate_hash_04<S: Store>(
|
|||
.relate_motion_identifier(hash.clone(), &identifier)
|
||||
.await?;
|
||||
|
||||
set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
|
||||
set_details(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
old_repo,
|
||||
new_repo,
|
||||
store,
|
||||
config,
|
||||
&identifier,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for (variant, identifier) in variants {
|
||||
|
@ -364,7 +391,16 @@ async fn do_migrate_hash_04<S: Store>(
|
|||
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
|
||||
.await?;
|
||||
|
||||
set_details(tmp_dir, old_repo, new_repo, store, config, &identifier).await?;
|
||||
set_details(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
old_repo,
|
||||
new_repo,
|
||||
store,
|
||||
config,
|
||||
&identifier,
|
||||
)
|
||||
.await?;
|
||||
|
||||
new_repo.accessed_variant(hash.clone(), variant).await?;
|
||||
}
|
||||
|
@ -374,6 +410,7 @@ async fn do_migrate_hash_04<S: Store>(
|
|||
|
||||
async fn set_details<S: Store>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
old_repo: &OldSledRepo,
|
||||
new_repo: &ArcRepo,
|
||||
store: &S,
|
||||
|
@ -384,7 +421,8 @@ async fn set_details<S: Store>(
|
|||
Ok(details)
|
||||
} else {
|
||||
let details =
|
||||
fetch_or_generate_details(tmp_dir, old_repo, store, config, identifier).await?;
|
||||
fetch_or_generate_details(tmp_dir, policy_dir, old_repo, store, config, identifier)
|
||||
.await?;
|
||||
new_repo.relate_details(identifier, &details).await?;
|
||||
Ok(details)
|
||||
}
|
||||
|
@ -405,6 +443,7 @@ fn details_semaphore() -> &'static Semaphore {
|
|||
#[tracing::instrument(skip_all)]
|
||||
async fn fetch_or_generate_details<S: Store>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
old_repo: &OldSledRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
|
@ -419,7 +458,8 @@ async fn fetch_or_generate_details<S: Store>(
|
|||
let bytes = bytes_stream.into_bytes();
|
||||
|
||||
let guard = details_semaphore().acquire().await?;
|
||||
let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes).await?;
|
||||
let details =
|
||||
Details::from_bytes(tmp_dir, policy_dir, config.media.process_timeout, bytes).await?;
|
||||
drop(guard);
|
||||
|
||||
Ok(details)
|
||||
|
|
|
@ -10,6 +10,7 @@ use crate::{
|
|||
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
|
||||
InternalFormat, Validations,
|
||||
},
|
||||
magick::PolicyDir,
|
||||
process::ProcessRead,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
@ -58,6 +59,7 @@ const MEGABYTES: usize = 1024 * 1024;
|
|||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn validate_bytes(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
bytes: Bytes,
|
||||
validations: Validations<'_>,
|
||||
timeout: u64,
|
||||
|
@ -71,12 +73,13 @@ pub(crate) async fn validate_bytes(
|
|||
width,
|
||||
height,
|
||||
frames,
|
||||
} = crate::discover::discover_bytes(tmp_dir, timeout, bytes.clone()).await?;
|
||||
} = crate::discover::discover_bytes(tmp_dir, policy_dir, timeout, bytes.clone()).await?;
|
||||
|
||||
match &input {
|
||||
InputFile::Image(input) => {
|
||||
let (format, process_read) = process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
*input,
|
||||
width,
|
||||
|
@ -91,6 +94,7 @@ pub(crate) async fn validate_bytes(
|
|||
InputFile::Animation(input) => {
|
||||
let (format, process_read) = process_animation(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
*input,
|
||||
width,
|
||||
|
@ -121,9 +125,11 @@ pub(crate) async fn validate_bytes(
|
|||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(tmp_dir, bytes, validations))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
|
||||
async fn process_image(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
bytes: Bytes,
|
||||
input: ImageInput,
|
||||
width: u16,
|
||||
|
@ -152,7 +158,16 @@ async fn process_image(
|
|||
let process_read = if needs_transcode {
|
||||
let quality = validations.quality_for(format);
|
||||
|
||||
magick::convert_image(tmp_dir, input.format, format, quality, timeout, bytes).await?
|
||||
magick::convert_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
input.format,
|
||||
format,
|
||||
quality,
|
||||
timeout,
|
||||
bytes,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
exiftool::clear_metadata_bytes_read(bytes, timeout)?
|
||||
};
|
||||
|
@ -187,9 +202,10 @@ fn validate_animation(
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, bytes, validations))]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
|
||||
async fn process_animation(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
bytes: Bytes,
|
||||
input: AnimationFormat,
|
||||
width: u16,
|
||||
|
@ -208,7 +224,8 @@ async fn process_animation(
|
|||
let process_read = if needs_transcode {
|
||||
let quality = validations.quality_for(format);
|
||||
|
||||
magick::convert_animation(tmp_dir, input, format, quality, timeout, bytes).await?
|
||||
magick::convert_animation(tmp_dir, policy_dir, input, format, quality, timeout, bytes)
|
||||
.await?
|
||||
} else {
|
||||
exiftool::clear_metadata_bytes_read(bytes, timeout)?
|
||||
};
|
||||
|
|
|
@ -4,13 +4,14 @@ use actix_web::web::Bytes;
|
|||
|
||||
use crate::{
|
||||
formats::{AnimationFormat, ImageFormat},
|
||||
magick::{MagickError, MAGICK_TEMPORARY_PATH},
|
||||
magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
process::{Process, ProcessRead},
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
||||
pub(super) async fn convert_image(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
input: ImageFormat,
|
||||
output: ImageFormat,
|
||||
quality: Option<u8>,
|
||||
|
@ -19,6 +20,7 @@ pub(super) async fn convert_image(
|
|||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
input.magick_format(),
|
||||
output.magick_format(),
|
||||
false,
|
||||
|
@ -31,6 +33,7 @@ pub(super) async fn convert_image(
|
|||
|
||||
pub(super) async fn convert_animation(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
input: AnimationFormat,
|
||||
output: AnimationFormat,
|
||||
quality: Option<u8>,
|
||||
|
@ -39,6 +42,7 @@ pub(super) async fn convert_animation(
|
|||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
input.magick_format(),
|
||||
output.magick_format(),
|
||||
true,
|
||||
|
@ -49,8 +53,10 @@ pub(super) async fn convert_animation(
|
|||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn convert(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
input: &'static str,
|
||||
output: &'static str,
|
||||
coalesce: bool,
|
||||
|
@ -96,7 +102,10 @@ async fn convert(
|
|||
|
||||
args.push(output_arg.as_ref());
|
||||
|
||||
let envs = [(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str())];
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?.read();
|
||||
|
||||
|
|
Loading…
Reference in a new issue