mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-11-09 22:14:59 +00:00
Add danger_dummy_mode
This commit is contained in:
parent
5fd59fc5b4
commit
210af5d7d9
10 changed files with 159 additions and 110 deletions
|
@ -1,6 +1,7 @@
|
|||
[server]
|
||||
address = "0.0.0.0:8080"
|
||||
read_only = false
|
||||
danger_dummy_mode = false
|
||||
max_file_count = 1
|
||||
|
||||
[client]
|
||||
|
|
|
@ -12,6 +12,14 @@ address = '0.0.0.0:8080'
|
|||
# This can be useful if you need to run a copy of pict-rs while performing maintenance.
|
||||
read_only = false
|
||||
|
||||
## Optional: whether to run pict-rs without dependencies.
|
||||
# environment variable: PICTRS__SERVER__DANGER_DUMMY_MODE
|
||||
# default: false
|
||||
#
|
||||
# This means pict-rs will not be able to inspect metadata of uploaded media, or perform processing
|
||||
# on it. This mode is provided for use in test environments. It should not be used in production.
|
||||
danger_dummy_mode = false
|
||||
|
||||
## Optional: shared secret for internal endpoints
|
||||
# environment variable: PICTRS__SERVER__API_KEY
|
||||
# default: empty
|
||||
|
|
|
@ -100,6 +100,7 @@ impl Args {
|
|||
media_video_quality_2160,
|
||||
media_filters,
|
||||
read_only,
|
||||
danger_dummy_mode,
|
||||
max_file_count,
|
||||
store,
|
||||
}) => {
|
||||
|
@ -107,6 +108,7 @@ impl Args {
|
|||
address,
|
||||
api_key,
|
||||
read_only,
|
||||
danger_dummy_mode,
|
||||
max_file_count,
|
||||
};
|
||||
|
||||
|
@ -509,6 +511,8 @@ struct Server {
|
|||
api_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
read_only: bool,
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
danger_dummy_mode: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_file_count: Option<u32>,
|
||||
}
|
||||
|
@ -1097,6 +1101,11 @@ struct Run {
|
|||
#[arg(long)]
|
||||
read_only: bool,
|
||||
|
||||
/// Allow running without ffmpeg, imagemagick, or exiftool. This will allow hosting arbitrary
|
||||
/// files and provide inaccurate metadata for uploaded media
|
||||
#[arg(long)]
|
||||
danger_dummy_mode: bool,
|
||||
|
||||
#[command(subcommand)]
|
||||
store: Option<RunStore>,
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ pub(crate) struct Defaults {
|
|||
struct ServerDefaults {
|
||||
address: SocketAddr,
|
||||
read_only: bool,
|
||||
danger_dummy_mode: bool,
|
||||
max_file_count: u32,
|
||||
}
|
||||
|
||||
|
@ -181,6 +182,7 @@ impl Default for ServerDefaults {
|
|||
ServerDefaults {
|
||||
address: "0.0.0.0:8080".parse().expect("Valid address string"),
|
||||
read_only: false,
|
||||
danger_dummy_mode: false,
|
||||
max_file_count: 1,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,6 +113,8 @@ pub(crate) struct Server {
|
|||
|
||||
pub(crate) read_only: bool,
|
||||
|
||||
pub(crate) danger_dummy_mode: bool,
|
||||
|
||||
pub(crate) max_file_count: u32,
|
||||
}
|
||||
|
||||
|
|
|
@ -119,6 +119,18 @@ impl Details {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn danger_dummy(format: InternalFormat) -> Self {
|
||||
Self::from_parts_full(
|
||||
format,
|
||||
0,
|
||||
0,
|
||||
None,
|
||||
HumanDate {
|
||||
timestamp: time::OffsetDateTime::now_utc(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn from_parts_full(
|
||||
format: InternalFormat,
|
||||
width: u16,
|
||||
|
|
|
@ -42,7 +42,7 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(repo, store, hash, process_map, media))]
|
||||
#[tracing::instrument(skip(repo, store, hash, process_map, config))]
|
||||
pub(crate) async fn generate<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
repo: &ArcRepo,
|
||||
|
@ -52,30 +52,41 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
thumbnail_path: PathBuf,
|
||||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
media: &crate::config::Media,
|
||||
config: &crate::config::Configuration,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
let process_fut = process(
|
||||
tmp_dir,
|
||||
repo,
|
||||
store,
|
||||
format,
|
||||
thumbnail_path.clone(),
|
||||
thumbnail_args,
|
||||
original_details,
|
||||
media,
|
||||
hash.clone(),
|
||||
);
|
||||
if config.server.danger_dummy_mode {
|
||||
let identifier = repo
|
||||
.identifier(hash)
|
||||
.await?
|
||||
.ok_or(UploadError::MissingIdentifier)?;
|
||||
|
||||
let (details, bytes) = process_map
|
||||
.process(hash, thumbnail_path, process_fut)
|
||||
.await?;
|
||||
let bytes = store.to_bytes(&identifier, None, None).await?.into_bytes();
|
||||
|
||||
Ok((details, bytes))
|
||||
Ok((original_details.clone(), bytes))
|
||||
} else {
|
||||
let process_fut = process(
|
||||
tmp_dir,
|
||||
repo,
|
||||
store,
|
||||
format,
|
||||
thumbnail_path.clone(),
|
||||
thumbnail_args,
|
||||
original_details,
|
||||
config,
|
||||
hash.clone(),
|
||||
);
|
||||
|
||||
let (details, bytes) = process_map
|
||||
.process(hash, thumbnail_path, process_fut)
|
||||
.await?;
|
||||
|
||||
Ok((details, bytes))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(repo, store, hash, media))]
|
||||
#[tracing::instrument(skip(repo, store, hash, config))]
|
||||
async fn process<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
repo: &ArcRepo,
|
||||
|
@ -84,7 +95,7 @@ async fn process<S: Store + 'static>(
|
|||
thumbnail_path: PathBuf,
|
||||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
media: &crate::config::Media,
|
||||
config: &crate::config::Configuration,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
let guard = MetricsGuard::guard();
|
||||
|
@ -97,22 +108,12 @@ async fn process<S: Store + 'static>(
|
|||
output_format,
|
||||
hash.clone(),
|
||||
original_details,
|
||||
media,
|
||||
&config.media,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let input_details = if let Some(details) = repo.details(&identifier).await? {
|
||||
details
|
||||
} else {
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
|
||||
let details =
|
||||
Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;
|
||||
|
||||
repo.relate_details(&identifier, &details).await?;
|
||||
|
||||
details
|
||||
};
|
||||
let input_details =
|
||||
crate::ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await?;
|
||||
|
||||
let input_format = input_details
|
||||
.internal_format()
|
||||
|
@ -122,8 +123,8 @@ async fn process<S: Store + 'static>(
|
|||
let format = input_format.process_to(output_format);
|
||||
|
||||
let quality = match format {
|
||||
ProcessableFormat::Image(format) => media.image.quality_for(format),
|
||||
ProcessableFormat::Animation(format) => media.animation.quality_for(format),
|
||||
ProcessableFormat::Image(format) => config.media.image.quality_for(format),
|
||||
ProcessableFormat::Animation(format) => config.media.animation.quality_for(format),
|
||||
};
|
||||
|
||||
let mut processed_reader = crate::magick::process_image_store_read(
|
||||
|
@ -134,7 +135,7 @@ async fn process<S: Store + 'static>(
|
|||
input_format,
|
||||
format,
|
||||
quality,
|
||||
media.process_timeout,
|
||||
config.media.process_timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -147,7 +148,7 @@ async fn process<S: Store + 'static>(
|
|||
|
||||
drop(permit);
|
||||
|
||||
let details = Details::from_bytes(tmp_dir, media.process_timeout, bytes.clone()).await?;
|
||||
let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes.clone()).await?;
|
||||
|
||||
let identifier = store
|
||||
.save_bytes(bytes.clone(), details.media_type())
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration};
|
||||
|
||||
use crate::{
|
||||
bytes_stream::BytesStream,
|
||||
|
@ -18,7 +18,7 @@ use streem::IntoStreamer;
|
|||
use tracing::{Instrument, Span};
|
||||
|
||||
mod hasher;
|
||||
use hasher::Hasher;
|
||||
use hasher::{Hasher, State};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Session {
|
||||
|
@ -46,16 +46,12 @@ where
|
|||
Ok(buf.into_bytes())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(repo, store, client, stream, media))]
|
||||
pub(crate) async fn ingest<S>(
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
declared_alias: Option<Alias>,
|
||||
media: &crate::config::Media,
|
||||
) -> Result<Session, Error>
|
||||
) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
|
@ -115,6 +111,56 @@ where
|
|||
|
||||
drop(permit);
|
||||
|
||||
Ok((input_type, identifier, details, state))
|
||||
}
|
||||
|
||||
async fn dummy_ingest<S>(
|
||||
store: &S,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
let stream = crate::stream::map(stream, |res| match res {
|
||||
Ok(bytes) => Ok(bytes),
|
||||
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|
||||
});
|
||||
|
||||
let reader = Box::pin(tokio_util::io::StreamReader::new(stream));
|
||||
|
||||
let hasher_reader = Hasher::new(reader);
|
||||
let state = hasher_reader.state();
|
||||
|
||||
let input_type = InternalFormat::Image(crate::formats::ImageFormat::Png);
|
||||
|
||||
let identifier = store
|
||||
.save_async_read(hasher_reader, input_type.media_type())
|
||||
.await?;
|
||||
|
||||
let details = Details::danger_dummy(input_type);
|
||||
|
||||
Ok((input_type, identifier, details, state))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(repo, store, client, stream, config))]
|
||||
pub(crate) async fn ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
declared_alias: Option<Alias>,
|
||||
config: &crate::config::Configuration,
|
||||
) -> Result<Session, Error>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
let (input_type, identifier, details, state) = if config.server.danger_dummy_mode {
|
||||
dummy_ingest(store, stream).await?
|
||||
} else {
|
||||
process_ingest(tmp_dir, store, stream, &config.media).await?
|
||||
};
|
||||
|
||||
let mut session = Session {
|
||||
repo: repo.clone(),
|
||||
delete_token: DeleteToken::generate(),
|
||||
|
@ -123,12 +169,14 @@ where
|
|||
identifier: Some(identifier.clone()),
|
||||
};
|
||||
|
||||
if let Some(endpoint) = &media.external_validation {
|
||||
if let Some(endpoint) = &config.media.external_validation {
|
||||
let stream = store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let response = client
|
||||
.post(endpoint.as_str())
|
||||
.timeout(Duration::from_secs(media.external_validation_timeout))
|
||||
.timeout(Duration::from_secs(
|
||||
config.media.external_validation_timeout,
|
||||
))
|
||||
.header("Content-Type", input_type.media_type().as_ref())
|
||||
.body(Body::wrap_stream(crate::stream::make_send(stream)))
|
||||
.send()
|
||||
|
|
82
src/lib.rs
82
src/lib.rs
|
@ -113,6 +113,16 @@ async fn ensure_details<S: Store + 'static>(
|
|||
return Err(UploadError::MissingAlias.into());
|
||||
};
|
||||
|
||||
ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await
|
||||
}
|
||||
|
||||
async fn ensure_details_identifier<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
identifier: &Arc<str>,
|
||||
) -> Result<Details, Error> {
|
||||
let details = repo.details(&identifier).await?;
|
||||
|
||||
if let Some(details) = details {
|
||||
|
@ -121,10 +131,14 @@ async fn ensure_details<S: Store + 'static>(
|
|||
} else {
|
||||
if config.server.read_only {
|
||||
return Err(UploadError::ReadOnly.into());
|
||||
} else if config.server.danger_dummy_mode {
|
||||
return Ok(Details::danger_dummy(formats::InternalFormat::Image(
|
||||
formats::ImageFormat::Png,
|
||||
)));
|
||||
}
|
||||
|
||||
tracing::debug!("generating new details from {:?}", identifier);
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
let bytes_stream = store.to_bytes(identifier, None, None).await?;
|
||||
let new_details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
config.media.process_timeout,
|
||||
|
@ -132,7 +146,7 @@ async fn ensure_details<S: Store + 'static>(
|
|||
)
|
||||
.await?;
|
||||
tracing::debug!("storing details for {:?}", identifier);
|
||||
repo.relate_details(&identifier, &new_details).await?;
|
||||
repo.relate_details(identifier, &new_details).await?;
|
||||
tracing::debug!("stored");
|
||||
Ok(new_details)
|
||||
}
|
||||
|
@ -195,13 +209,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
|
|||
let stream = crate::stream::from_err(stream);
|
||||
|
||||
ingest::ingest(
|
||||
&tmp_dir,
|
||||
&repo,
|
||||
&**store,
|
||||
&client,
|
||||
stream,
|
||||
None,
|
||||
&config.media,
|
||||
&tmp_dir, &repo, &**store, &client, stream, None, &config,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -279,7 +287,7 @@ impl<S: Store + 'static> FormData for Import<S> {
|
|||
&client,
|
||||
stream,
|
||||
Some(Alias::from_existing(&filename)),
|
||||
&config.media,
|
||||
&config,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -534,7 +542,7 @@ async fn ingest_inline<S: Store + 'static>(
|
|||
client: &ClientWithMiddleware,
|
||||
config: &Configuration,
|
||||
) -> Result<(Alias, DeleteToken, Details), Error> {
|
||||
let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config.media).await?;
|
||||
let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config).await?;
|
||||
|
||||
let alias = session.alias().expect("alias should exist").to_owned();
|
||||
|
||||
|
@ -922,29 +930,8 @@ async fn process<S: Store + 'static>(
|
|||
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
|
||||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let details = repo.details(&identifier).await?;
|
||||
|
||||
let details = if let Some(details) = details {
|
||||
tracing::debug!("details exist");
|
||||
details
|
||||
} else {
|
||||
if config.server.read_only {
|
||||
return Err(UploadError::ReadOnly.into());
|
||||
}
|
||||
|
||||
tracing::debug!("generating new details from {:?}", identifier);
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
let new_details = Details::from_bytes(
|
||||
&tmp_dir,
|
||||
config.media.process_timeout,
|
||||
bytes_stream.into_bytes(),
|
||||
)
|
||||
.await?;
|
||||
tracing::debug!("storing details for {:?}", identifier);
|
||||
repo.relate_details(&identifier, &new_details).await?;
|
||||
tracing::debug!("stored");
|
||||
new_details
|
||||
};
|
||||
let details =
|
||||
ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
@ -970,7 +957,7 @@ async fn process<S: Store + 'static>(
|
|||
thumbnail_path,
|
||||
thumbnail_args,
|
||||
&original_details,
|
||||
&config.media,
|
||||
&config,
|
||||
hash,
|
||||
)
|
||||
.await?;
|
||||
|
@ -1047,29 +1034,8 @@ async fn process_head<S: Store + 'static>(
|
|||
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
|
||||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let details = repo.details(&identifier).await?;
|
||||
|
||||
let details = if let Some(details) = details {
|
||||
tracing::debug!("details exist");
|
||||
details
|
||||
} else {
|
||||
if config.server.read_only {
|
||||
return Err(UploadError::ReadOnly.into());
|
||||
}
|
||||
|
||||
tracing::debug!("generating new details from {:?}", identifier);
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
let new_details = Details::from_bytes(
|
||||
&tmp_dir,
|
||||
config.media.process_timeout,
|
||||
bytes_stream.into_bytes(),
|
||||
)
|
||||
.await?;
|
||||
tracing::debug!("storing details for {:?}", identifier);
|
||||
repo.relate_details(&identifier, &new_details).await?;
|
||||
tracing::debug!("stored");
|
||||
new_details
|
||||
};
|
||||
let details =
|
||||
ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
|
||||
|
||||
if let Some(public_url) = store.public_url(&identifier) {
|
||||
return Ok(HttpResponse::SeeOther()
|
||||
|
|
|
@ -44,7 +44,7 @@ where
|
|||
Arc::from(identifier),
|
||||
Serde::into_inner(upload_id),
|
||||
declared_alias.map(Serde::into_inner),
|
||||
&config.media,
|
||||
config,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ impl Drop for UploadGuard {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, client, media))]
|
||||
#[tracing::instrument(skip(tmp_dir, repo, store, client, config))]
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
repo: &ArcRepo,
|
||||
|
@ -121,7 +121,7 @@ async fn process_ingest<S>(
|
|||
unprocessed_identifier: Arc<str>,
|
||||
upload_id: UploadId,
|
||||
declared_alias: Option<Alias>,
|
||||
media: &crate::config::Media,
|
||||
config: &Configuration,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store + 'static,
|
||||
|
@ -135,7 +135,7 @@ where
|
|||
let repo = repo.clone();
|
||||
let client = client.clone();
|
||||
|
||||
let media = media.clone();
|
||||
let config = config.clone();
|
||||
let error_boundary = crate::sync::spawn("ingest-media", async move {
|
||||
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
|
||||
|
||||
|
@ -146,7 +146,7 @@ where
|
|||
&client,
|
||||
stream,
|
||||
declared_alias,
|
||||
&media,
|
||||
&config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -218,7 +218,7 @@ async fn generate<S: Store + 'static>(
|
|||
process_path,
|
||||
process_args,
|
||||
&original_details,
|
||||
&config.media,
|
||||
config,
|
||||
hash,
|
||||
)
|
||||
.await?;
|
||||
|
|
Loading…
Reference in a new issue