2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-31 15:01:25 +00:00

Instrument better with Tracing

This commit is contained in:
Aode (Lion) 2021-09-13 20:22:42 -05:00
parent e31cbb581b
commit 5d3e6f50b3
15 changed files with 567 additions and 345 deletions

142
Cargo.lock generated
View file

@ -20,9 +20,9 @@ dependencies = [
[[package]]
name = "actix-form-data"
version = "0.6.0-beta.6"
version = "0.6.0-beta.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6552c90f3283caa08a8114d49f82cb3eacd6038168bc0ffb199b9304f615be2"
checksum = "304d237617d707993b9210dfaa2c5243ac8bcda5ed1b7a5b6f3b404a4f31a2f3"
dependencies = [
"actix-multipart",
"actix-rt",
@ -278,9 +278,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.43"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
[[package]]
name = "atty"
@ -401,6 +401,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time 0.1.43",
"winapi",
]
@ -557,12 +558,54 @@ dependencies = [
"winapi",
]
[[package]]
name = "futures"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-executor"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-macro"
version = "0.3.17"
@ -595,10 +638,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [
"autocfg",
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
@ -625,6 +671,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "gethostname"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "getrandom"
version = "0.2.3"
@ -951,7 +1007,7 @@ dependencies = [
[[package]]
name = "pict-rs"
version = "0.3.0-alpha.30"
version = "0.3.0-alpha.31"
dependencies = [
"actix-form-data",
"actix-rt",
@ -975,9 +1031,12 @@ dependencies = [
"tokio",
"tokio-util",
"tracing",
"tracing-actix-web",
"tracing-bunyan-formatter",
"tracing-error",
"tracing-futures",
"tracing-log",
"tracing-subscriber",
"uuid",
]
[[package]]
@ -1522,6 +1581,16 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "time"
version = "0.2.27"
@ -1630,21 +1699,35 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.26"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
checksum = "c2ba9ab62b7d6497a8638dfda5e5c4fb3b2d5a7fca4118f2b96151c8ef1a437e"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.15"
name = "tracing-actix-web"
version = "0.4.0-beta.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
checksum = "aef43d92080b0429626deba48d01dad848ad515777b373d7a18eac3f129be359"
dependencies = [
"actix-web",
"futures",
"tracing",
"tracing-futures",
"uuid",
]
[[package]]
name = "tracing-attributes"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77"
dependencies = [
"proc-macro2",
"quote",
@ -1652,14 +1735,41 @@ dependencies = [
]
[[package]]
name = "tracing-core"
version = "0.1.19"
name = "tracing-bunyan-formatter"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ca517f43f0fb96e0c3072ed5c275fe5eece87e8cb52f4a77b69226d3b1c9df8"
checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1"
dependencies = [
"chrono",
"gethostname",
"log",
"serde",
"serde_json",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf"
dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-error"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4d7c0b83d4a500748fa5879461652b361edf5c9d51ede2a2ac03875ca185e24"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@ -1693,9 +1803,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.20"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe"
checksum = "62af966210b88ad5776ee3ba12d5f35b8d6a2b2a12168f3080cf02b814d7376b"
dependencies = [
"ansi_term 0.12.1",
"chrono",

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.3.0-alpha.30"
version = "0.3.0-alpha.31"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -33,6 +33,10 @@ time = { version = "0.3.0", features = ["serde"] }
tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] }
tokio-util = { version = "0.6", default-features = false, features = ["codec"] }
tracing = "0.1.15"
tracing-actix-web = { version = "0.4.0-beta.8" }
tracing-bunyan-formatter = "0.2.6"
tracing-error = "0.1.2"
tracing-futures = "0.2.4"
tracing-log = "0.1.2"
tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }
uuid = { version = "0.8", features = ["v4"] }
# uuid = { version = "0.8", features = ["v4"] }

View file

@ -73,6 +73,9 @@ pub(crate) struct Config {
help = "An optional string to be checked on requests to privileged endpoints"
)]
api_key: Option<String>,
#[structopt(short, long, help = "Enable json logging for the pict-rs server")]
json_logging: bool,
}
impl Config {
@ -113,6 +116,10 @@ impl Config {
pub(crate) fn api_key(&self) -> Option<&str> {
self.api_key.as_deref()
}
pub(crate) fn json_logging(&self) -> bool {
self.json_logging
}
}
#[derive(Debug, thiserror::Error)]

View file

@ -1,5 +1,56 @@
use crate::{ffmpeg::VideoError, magick::MagickError};
use actix_web::{http::StatusCode, HttpResponse, ResponseError};
use tracing_error::SpanTrace;
pub(crate) struct Error {
context: SpanTrace,
kind: UploadError,
}
impl Error {
pub(crate) fn kind(&self) -> &UploadError {
&self.kind
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}\n", self.kind)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}\n", self.kind)?;
std::fmt::Display::fmt(&self.context, f)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
impl<T> From<T> for Error
where
UploadError: From<T>,
{
fn from(error: T) -> Self {
Error {
kind: UploadError::from(error),
context: SpanTrace::capture(),
}
}
}
impl From<sled::transaction::TransactionError<Error>> for Error {
fn from(e: sled::transaction::TransactionError<Error>) -> Self {
match e {
sled::transaction::TransactionError::Abort(t) => t,
sled::transaction::TransactionError::Storage(e) => e.into(),
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum UploadError {
@ -39,6 +90,9 @@ pub(crate) enum UploadError {
#[error("Unsupported image format")]
UnsupportedFormat,
#[error("Invalid media dimensions")]
Dimensions,
#[error("Unable to download image, bad response {0}")]
Download(actix_web::http::StatusCode),
@ -66,11 +120,8 @@ pub(crate) enum UploadError {
#[error("Range header not satisfiable")]
Range,
#[error("{0}")]
VideoError(#[from] VideoError),
#[error("{0}")]
MagickError(#[from] MagickError),
#[error("Command failed")]
Status,
}
impl From<awc::error::SendRequestError> for UploadError {
@ -79,18 +130,13 @@ impl From<awc::error::SendRequestError> for UploadError {
}
}
impl From<sled::transaction::TransactionError<UploadError>> for UploadError {
fn from(e: sled::transaction::TransactionError<UploadError>) -> Self {
match e {
sled::transaction::TransactionError::Abort(t) => t,
sled::transaction::TransactionError::Storage(e) => e.into(),
impl From<actix_form_data::Error<Error>> for Error {
fn from(e: actix_form_data::Error<Error>) -> Self {
if let actix_form_data::Error::FileFn(e) = e {
return e;
}
}
}
impl From<actix_form_data::Error> for UploadError {
fn from(e: actix_form_data::Error) -> Self {
UploadError::Upload(e.to_string())
UploadError::Upload(e.to_string()).into()
}
}
@ -106,12 +152,10 @@ impl From<tokio::sync::AcquireError> for UploadError {
}
}
impl ResponseError for UploadError {
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self {
UploadError::VideoError(_)
| UploadError::MagickError(_)
| UploadError::DuplicateAlias
match self.kind {
UploadError::DuplicateAlias
| UploadError::NoFiles
| UploadError::Upload(_)
| UploadError::ParseReq(_) => StatusCode::BAD_REQUEST,
@ -126,8 +170,8 @@ impl ResponseError for UploadError {
HttpResponse::build(self.status_code())
.content_type("application/json")
.body(
serde_json::to_string(&serde_json::json!({ "msg": self.to_string() }))
.unwrap_or_else(|_| r#"{"msg":"Internal Server Error"}"#.to_string()),
serde_json::to_string(&serde_json::json!({ "msg": self.kind.to_string() }))
.unwrap_or_else(|_| r#"{"msg":"Request failed"}"#.to_string()),
)
}
}

View file

@ -1,9 +1,9 @@
use crate::stream::Process;
use actix_web::web::Bytes;
use tokio::{io::AsyncRead, process::Command};
use tokio::io::AsyncRead;
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?;
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?;
Ok(process.bytes_read(input).unwrap())
}

View file

@ -1,21 +1,17 @@
use crate::stream::Process;
use crate::{
error::{Error, UploadError},
stream::Process,
};
use actix_web::web::Bytes;
use tokio::{io::AsyncRead, process::Command};
#[derive(Debug, thiserror::Error)]
pub(crate) enum VideoError {
#[error("Failed to interface with transcode process")]
IO(#[from] std::io::Error),
#[error("Failed to convert file")]
Status,
}
use tracing::instrument;
pub(crate) enum InputFormat {
Gif,
Mp4,
}
#[derive(Debug)]
pub(crate) enum ThumbnailFormat {
Jpeg,
// Webp,
@ -50,55 +46,72 @@ pub(crate) fn to_mp4_bytes(
input: Bytes,
input_format: InputFormat,
) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::spawn(Command::new("ffmpeg").args([
"-f",
input_format.as_format(),
"-i",
"pipe:",
"-movflags",
"faststart+frag_keyframe+empty_moov",
"-pix_fmt",
"yuv420p",
"-vf",
"scale=trunc(iw/2)*2:trunc(ih/2)*2",
"-an",
"-codec",
"h264",
"-f",
"mp4",
"pipe:",
]))?;
let process = Process::run(
"ffmpeg",
&[
"-f",
input_format.as_format(),
"-i",
"pipe:",
"-movflags",
"faststart+frag_keyframe+empty_moov",
"-pix_fmt",
"yuv420p",
"-vf",
"scale=trunc(iw/2)*2:trunc(ih/2)*2",
"-an",
"-codec",
"h264",
"-f",
"mp4",
"pipe:",
],
)?;
Ok(process.bytes_read(input).unwrap())
}
#[instrument(name = "Create video thumbnail", skip(from, to))]
pub(crate) async fn thumbnail<P1, P2>(
from: P1,
to: P2,
format: ThumbnailFormat,
) -> Result<(), VideoError>
) -> Result<(), Error>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
let mut child = Command::new("ffmpeg")
.arg(&"-i")
.arg(&from.as_ref())
.args([
"-vframes",
"1",
"-codec",
format.as_codec(),
"-f",
format.as_format(),
])
.arg(&to.as_ref())
let command = "ffmpeg";
let first_arg = "-i";
let args = [
"-vframes",
"1",
"-codec",
format.as_codec(),
"-f",
format.as_format(),
];
tracing::info!(
"Spawning command: {} {} {:?} {:?} {:?}",
command,
first_arg,
from.as_ref(),
args,
to.as_ref()
);
let mut child = Command::new(command)
.arg(first_arg)
.arg(from.as_ref())
.args(args)
.arg(to.as_ref())
.spawn()?;
let status = child.wait().await?;
if !status.success() {
return Err(VideoError::Status);
return Err(UploadError::Status.into());
}
Ok(())

View file

@ -1,21 +1,14 @@
use crate::{config::Format, stream::Process};
use crate::{
config::Format,
error::{Error, UploadError},
stream::Process,
};
use actix_web::web::Bytes;
use tokio::{
io::{AsyncRead, AsyncReadExt},
process::Command,
};
#[derive(Debug, thiserror::Error)]
pub(crate) enum MagickError {
#[error("{0}")]
IO(#[from] std::io::Error),
#[error("Invalid format")]
Format,
#[error("Image too large")]
Dimensions,
}
use tracing::instrument;
pub(crate) enum ValidInputType {
Mp4,
@ -32,19 +25,16 @@ pub(crate) struct Details {
}
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?;
let process = Process::run("magick", &["convert", "-", "-strip", "-"])?;
Ok(process.bytes_read(input).unwrap())
}
pub(crate) async fn details_bytes(input: Bytes) -> Result<Details, MagickError> {
let process = Process::spawn(Command::new("magick").args([
"identify",
"-ping",
"-format",
"%w %h | %m\n",
"-",
]))?;
pub(crate) async fn details_bytes(input: Bytes) -> Result<Details, Error> {
let process = Process::run(
"magick",
&["identify", "-ping", "-format", "%w %h | %m\n", "-"],
)?;
let mut reader = process.bytes_read(input).unwrap();
@ -59,22 +49,30 @@ pub(crate) fn convert_bytes_read(
input: Bytes,
format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::spawn(Command::new("magick").args([
"convert",
"-",
format!("{}:-", format.to_magick_format()).as_str(),
]))?;
let process = Process::run(
"magick",
&[
"convert",
"-",
format!("{}:-", format.to_magick_format()).as_str(),
],
)?;
Ok(process.bytes_read(input).unwrap())
}
pub(crate) async fn details<P>(file: P) -> Result<Details, MagickError>
pub(crate) async fn details<P>(file: P) -> Result<Details, Error>
where
P: AsRef<std::path::Path>,
{
let output = Command::new("magick")
.args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"])
.arg(&file.as_ref())
let command = "magick";
let args = ["identify", "-ping", "-format", "%w %h | %m\n"];
let last_arg = file.as_ref();
tracing::info!("Spawning command: {} {:?} {:?}", command, args, last_arg);
let output = Command::new(command)
.args(args)
.arg(last_arg)
.output()
.await?;
@ -83,23 +81,39 @@ where
parse_details(s)
}
fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, MagickError> {
fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, Error> {
let mut lines = s.lines();
let first = lines.next().ok_or(MagickError::Format)?;
let first = lines.next().ok_or(UploadError::UnsupportedFormat)?;
let mut segments = first.split('|');
let dimensions = segments.next().ok_or(MagickError::Format)?.trim();
let dimensions = segments
.next()
.ok_or(UploadError::UnsupportedFormat)?
.trim();
tracing::debug!("dimensions: {}", dimensions);
let mut dims = dimensions.split(' ');
let width = dims.next().ok_or(MagickError::Format)?.trim().parse()?;
let height = dims.next().ok_or(MagickError::Format)?.trim().parse()?;
let width = dims
.next()
.ok_or(UploadError::UnsupportedFormat)?
.trim()
.parse()
.map_err(|_| UploadError::UnsupportedFormat)?;
let height = dims
.next()
.ok_or(UploadError::UnsupportedFormat)?
.trim()
.parse()
.map_err(|_| UploadError::UnsupportedFormat)?;
let format = segments.next().ok_or(MagickError::Format)?.trim();
let format = segments
.next()
.ok_or(UploadError::UnsupportedFormat)?
.trim();
tracing::debug!("format: {}", format);
if !lines.all(|item| item.ends_with(format)) {
return Err(MagickError::Format);
return Err(UploadError::UnsupportedFormat.into());
}
let mime_type = match format {
@ -108,7 +122,7 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, MagickError> {
"PNG" => mime::IMAGE_PNG,
"JPEG" => mime::IMAGE_JPEG,
"WEBP" => crate::validate::image_webp(),
_ => return Err(MagickError::Format),
_ => return Err(UploadError::UnsupportedFormat.into()),
};
Ok(Details {
@ -118,29 +132,41 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, MagickError> {
})
}
pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, MagickError> {
pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Error> {
details_bytes(input).await?.validate_input()
}
#[instrument(name = "Spawning process command", skip(input))]
pub(crate) fn process_image_write_read(
input: impl AsyncRead + Unpin + 'static,
args: Vec<String>,
format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> {
let command = "magick";
let convert_args = ["convert", "-"];
let last_arg = format!("{}:-", format.to_magick_format());
tracing::info!(
"Spawning command: {} {:?} {:?} {}",
command,
convert_args,
args,
last_arg
);
let process = Process::spawn(
Command::new("magick")
.args([&"convert", &"-"])
Command::new(command)
.args(convert_args)
.args(args)
.arg(format!("{}:-", format.to_magick_format())),
.arg(last_arg),
)?;
Ok(process.write_read(input).unwrap())
}
impl Details {
fn validate_input(&self) -> Result<ValidInputType, MagickError> {
fn validate_input(&self) -> Result<ValidInputType, Error> {
if self.width > crate::CONFIG.max_width() || self.height > crate::CONFIG.max_height() {
return Err(MagickError::Dimensions);
return Err(UploadError::Dimensions.into());
}
let input_type = match (self.mime_type.type_(), self.mime_type.subtype()) {
@ -149,15 +175,9 @@ impl Details {
(mime::IMAGE, mime::PNG) => ValidInputType::Png,
(mime::IMAGE, mime::JPEG) => ValidInputType::Jpeg,
(mime::IMAGE, subtype) if subtype.as_str() == "webp" => ValidInputType::Webp,
_ => return Err(MagickError::Format),
_ => return Err(UploadError::UnsupportedFormat.into()),
};
Ok(input_type)
}
}
impl From<std::num::ParseIntError> for MagickError {
fn from(_: std::num::ParseIntError) -> MagickError {
MagickError::Format
}
}

View file

@ -2,7 +2,6 @@ use actix_form_data::{Field, Form, Value};
use actix_web::{
guard,
http::header::{CacheControl, CacheDirective, LastModified, ACCEPT_RANGES},
middleware::Logger,
web, App, HttpResponse, HttpResponseBuilder, HttpServer,
};
use awc::Client;
@ -28,8 +27,12 @@ use tokio::{
Semaphore,
},
};
use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter;
use tracing::{debug, error, info, instrument, subscriber::set_global_default, Span};
use tracing_actix_web::TracingLogger;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_error::ErrorLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
mod config;
mod error;
@ -40,14 +43,16 @@ mod middleware;
mod migrate;
mod processor;
mod range;
mod root_span_builder;
mod stream;
mod upload_manager;
mod validate;
use self::{
config::{Config, Format},
error::UploadError,
middleware::{Deadline, Internal, Tracing},
error::{Error, UploadError},
middleware::{Deadline, Internal},
root_span_builder::RootSpanBuilder,
upload_manager::{Details, UploadManager, UploadManagerSession},
validate::{image_webp, video_mp4},
};
@ -90,7 +95,7 @@ struct CancelSafeProcessor<F> {
impl<F> CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin,
F: Future<Output = Result<(Details, web::Bytes), Error>> + Unpin,
{
pub(crate) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
@ -117,15 +122,15 @@ where
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin,
F: Future<Output = Result<(Details, web::Bytes), Error>> + Unpin,
{
type Output = Result<(Details, web::Bytes), UploadError>;
type Output = Result<(Details, web::Bytes), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut rx) = self.receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled))
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
Pin::new(&mut self.fut).poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(&self.path);
@ -151,8 +156,8 @@ impl<F> Drop for CancelSafeProcessor<F> {
}
// try moving a file
#[instrument]
async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
#[instrument(name = "Moving file")]
async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), Error> {
if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
@ -164,7 +169,7 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
return Err(e.into());
}
} else {
return Err(UploadError::FileExists);
return Err(UploadError::FileExists.into());
}
debug!("Moving {:?} to {:?}", from, to);
@ -173,7 +178,7 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
Ok(())
}
async fn safe_create_parent<P>(path: P) -> Result<(), UploadError>
async fn safe_create_parent<P>(path: P) -> Result<(), Error>
where
P: AsRef<std::path::Path>,
{
@ -186,8 +191,8 @@ where
}
// Try writing to a file
#[instrument(skip(bytes))]
async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), UploadError> {
#[instrument(name = "Saving file", skip(bytes))]
async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), Error> {
if let Some(path) = path.parent() {
// create the directory for the file
debug!("Creating directory {:?}", path);
@ -240,7 +245,7 @@ pub(crate) fn tmp_file() -> PathBuf {
path
}
fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> {
fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> {
if mime == mime::IMAGE_PNG {
Ok(".png")
} else if mime == mime::IMAGE_JPEG {
@ -250,16 +255,16 @@ fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> {
} else if mime == image_webp() {
Ok(".webp")
} else {
Err(UploadError::UnsupportedFormat)
Err(UploadError::UnsupportedFormat.into())
}
}
/// Handle responding to succesful uploads
#[instrument(skip(value, manager))]
#[instrument(name = "Uploaded files", skip(value, manager))]
async fn upload(
value: Value<UploadManagerSession>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let images = value
.map()
.and_then(|mut m| m.remove("images"))
@ -319,16 +324,16 @@ struct UrlQuery {
}
/// download an image from a URL
#[instrument(skip(client, manager))]
#[instrument(name = "Downloading file", skip(client, manager))]
async fn download(
client: web::Data<Client>,
manager: web::Data<UploadManager>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let mut res = client.get(&query.url).send().await?;
if !res.status().is_success() {
return Err(UploadError::Download(res.status()));
return Err(UploadError::Download(res.status()).into());
}
let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES);
@ -369,11 +374,11 @@ async fn download(
}
/// Delete aliases and files
#[instrument(skip(manager))]
#[instrument(name = "Deleting file", skip(manager))]
async fn delete(
manager: web::Data<UploadManager>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let (alias, token) = path_entries.into_inner();
manager.delete(token, alias).await?;
@ -388,7 +393,7 @@ async fn prepare_process(
ext: &str,
manager: &UploadManager,
whitelist: &Option<HashSet<String>>,
) -> Result<(Format, String, PathBuf, Vec<String>), UploadError> {
) -> Result<(Format, String, PathBuf, Vec<String>), Error> {
let (alias, operations) =
query
.into_inner()
@ -403,7 +408,7 @@ async fn prepare_process(
});
if alias.is_empty() {
return Err(UploadError::MissingFilename);
return Err(UploadError::MissingFilename.into());
}
let name = manager.from_alias(alias).await?;
@ -430,12 +435,13 @@ async fn prepare_process(
Ok((format, name, thumbnail_path, thumbnail_args))
}
#[instrument(name = "Fetching derived details", skip(manager, whitelist))]
async fn process_details(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
whitelist: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let (_, name, thumbnail_path, _) =
prepare_process(query, ext.as_str(), &manager, &whitelist).await?;
@ -447,14 +453,14 @@ async fn process_details(
}
/// Process files
#[instrument(skip(manager, whitelist))]
#[instrument(name = "Processing image", skip(manager, whitelist))]
async fn process(
range: Option<range::RangeHeader>,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
whitelist: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let (format, name, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &manager, &whitelist).await?;
@ -544,7 +550,7 @@ async fn process(
drop(entered);
});
Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError>
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
};
let (details, bytes) =
@ -553,11 +559,11 @@ async fn process(
return match range {
Some(range_header) => {
if !range_header.is_bytes() {
return Err(UploadError::Range);
return Err(UploadError::Range.into());
}
if range_header.is_empty() {
Err(UploadError::Range)
Err(UploadError::Range.into())
} else if range_header.len() == 1 {
let range = range_header.ranges().next().unwrap();
let content_range = range.to_content_range(bytes.len() as u64);
@ -573,12 +579,12 @@ async fn process(
details.system_time(),
))
} else {
Err(UploadError::Range)
Err(UploadError::Range.into())
}
}
None => Ok(srv_response(
HttpResponse::Ok(),
once(ready(Ok(bytes) as Result<_, UploadError>)),
once(ready(Ok(bytes) as Result<_, Error>)),
details.content_type(),
7 * DAYS,
details.system_time(),
@ -600,10 +606,11 @@ async fn process(
}
/// Fetch file details
#[instrument(name = "Fetching details", skip(manager))]
async fn details(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let name = manager.from_alias(alias.into_inner()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
@ -624,12 +631,12 @@ async fn details(
}
/// Serve files
#[instrument(skip(manager))]
#[instrument(name = "Serving file", skip(manager))]
async fn serve(
range: Option<range::RangeHeader>,
alias: web::Path<String>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let name = manager.from_alias(alias.into_inner()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
@ -653,16 +660,16 @@ async fn ranged_file_resp(
path: PathBuf,
range: Option<range::RangeHeader>,
details: Details,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let (builder, stream) = match range {
//Range header exists - return as ranged
Some(range_header) => {
if !range_header.is_bytes() {
return Err(UploadError::Range);
return Err(UploadError::Range.into());
}
if range_header.is_empty() {
return Err(UploadError::Range);
return Err(UploadError::Range.into());
} else if range_header.len() == 1 {
let file = tokio::fs::File::open(path).await?;
@ -675,7 +682,7 @@ async fn ranged_file_resp(
(builder, range.chop_file(file).await?)
} else {
return Err(UploadError::Range);
return Err(UploadError::Range.into());
}
}
//No Range header in the request - return the entire document
@ -727,10 +734,11 @@ enum FileOrAlias {
Alias { alias: String },
}
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
@ -748,10 +756,11 @@ async fn purge(
})))
}
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
@ -768,10 +777,11 @@ struct ByAlias {
alias: String,
}
#[instrument(name = "Fetching filename", skip(upload_manager))]
async fn filename_by_alias(
query: web::Query<ByAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
) -> Result<HttpResponse, Error> {
let filename = upload_manager.from_alias(query.into_inner().alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
@ -782,13 +792,23 @@ async fn filename_by_alias(
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
LogTracer::init()?;
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let subscriber = Registry::default()
.with(env_filter)
.with(ErrorLayer::default());
if CONFIG.json_logging() {
let formatting_layer = BunyanFormattingLayer::new("pict-rs".into(), std::io::stdout);
let subscriber = subscriber.with(JsonStorageLayer).with(formatting_layer);
set_global_default(subscriber)?;
} else {
let subscriber = subscriber.with(tracing_subscriber::fmt::layer());
set_global_default(subscriber)?;
};
let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?;
@ -799,7 +819,7 @@ async fn main() -> Result<(), anyhow::Error> {
let form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| UploadError::from(e).into())
.transform_error(|e| Error::from(e).into())
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
@ -828,7 +848,7 @@ async fn main() -> Result<(), anyhow::Error> {
let import_form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| UploadError::from(e).into())
.transform_error(|e| Error::from(e).into())
.field(
"images",
Field::array(Field::file(move |filename, content_type, stream| {
@ -858,8 +878,7 @@ async fn main() -> Result<(), anyhow::Error> {
.finish();
App::new()
.wrap(Logger::default())
.wrap(Tracing)
.wrap(TracingLogger::<RootSpanBuilder>::new())
.wrap(Deadline)
.app_data(web::Data::new(manager.clone()))
.app_data(web::Data::new(client))

View file

@ -10,8 +10,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tracing_futures::{Instrument, Instrumented};
use uuid::Uuid;
pub(crate) struct Deadline;
pub(crate) struct DeadlineMiddleware<S> {
@ -29,12 +27,6 @@ pub(crate) struct DeadlineFuture<F> {
inner: DeadlineFutureInner<F>,
}
pub(crate) struct Tracing;
pub(crate) struct TracingMiddleware<S> {
inner: S,
}
pub(crate) struct Internal(pub(crate) Option<String>);
pub(crate) struct InternalMiddleware<S>(Option<String>, S);
#[derive(Clone, Debug, thiserror::Error)]
@ -171,44 +163,6 @@ where
}
}
impl<S, Request> Transform<S, Request> for Tracing
where
S: Service<Request>,
S::Future: 'static,
{
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = TracingMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(TracingMiddleware { inner: service }))
}
}
impl<S, Request> Service<Request> for TracingMiddleware<S>
where
S: Service<Request>,
S::Future: 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&self, req: Request) -> Self::Future {
let uuid = Uuid::new_v4();
self.inner
.call(req)
.instrument(tracing::info_span!("request", ?uuid))
}
}
impl<S> Transform<S, ServiceRequest> for Internal
where
S: Service<ServiceRequest, Error = actix_web::Error>,

View file

@ -1,8 +1,11 @@
use crate::{error::UploadError, ffmpeg::ThumbnailFormat};
use crate::{
error::{Error, UploadError},
ffmpeg::ThumbnailFormat,
};
use std::path::{Path, PathBuf};
use tracing::{debug, error, instrument};
fn ptos(path: &Path) -> Result<String, UploadError> {
fn ptos(path: &Path) -> Result<String, Error> {
Ok(path.to_str().ok_or(UploadError::Path)?.to_owned())
}
@ -297,7 +300,7 @@ impl Exists {
pub(crate) async fn prepare_image(
original_file: PathBuf,
) -> Result<Option<(PathBuf, Exists)>, UploadError> {
) -> Result<Option<(PathBuf, Exists)>, Error> {
let original_path_str = ptos(&original_file)?;
let jpg_path = format!("{}.jpg", original_path_str);
let jpg_path = PathBuf::from(jpg_path);
@ -317,11 +320,13 @@ pub(crate) async fn prepare_image(
if let Err(e) = res {
error!("transcode error: {:?}", e);
tokio::fs::remove_file(&tmpfile).await?;
return Err(e.into());
return Err(e);
}
return match crate::safe_move_file(tmpfile, jpg_path.clone()).await {
Err(UploadError::FileExists) => Ok(Some((jpg_path, Exists::Exists))),
Err(e) if matches!(e.kind(), UploadError::FileExists) => {
Ok(Some((jpg_path, Exists::Exists)))
}
Err(e) => Err(e),
_ => Ok(Some((jpg_path, Exists::New))),
};

View file

@ -1,4 +1,7 @@
use crate::{stream::bytes_stream, UploadError};
use crate::{
error::{Error, UploadError},
stream::bytes_stream,
};
use actix_web::{
dev::Payload,
http::{
@ -46,7 +49,7 @@ impl Range {
pub(crate) fn chop_bytes(
&self,
bytes: Bytes,
) -> impl Stream<Item = Result<Bytes, UploadError>> + Unpin {
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
match self {
Range::Start(start) => once(ready(Ok(bytes.slice(*start as usize..)))),
Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))),
@ -59,7 +62,7 @@ impl Range {
pub(crate) async fn chop_file(
&self,
mut file: tokio::fs::File,
) -> Result<LocalBoxStream<'static, Result<Bytes, UploadError>>, UploadError> {
) -> Result<LocalBoxStream<'static, Result<Bytes, Error>>, Error> {
match self {
Range::Start(start) => {
file.seek(io::SeekFrom::Start(*start)).await?;
@ -102,14 +105,14 @@ impl RangeHeader {
impl FromRequest for RangeHeader {
type Config = ();
type Error = actix_web::Error;
type Error = Error;
type Future = std::future::Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
if let Some(range_head) = req.headers().get("Range") {
ready(parse_range_header(range_head).map_err(|e| {
tracing::warn!("Failed to parse range header: {}", e);
e.into()
e
}))
} else {
ready(Err(UploadError::ParseReq(
@ -120,7 +123,7 @@ impl FromRequest for RangeHeader {
}
}
fn parse_range_header(range_head: &HeaderValue) -> Result<RangeHeader, UploadError> {
fn parse_range_header(range_head: &HeaderValue) -> Result<RangeHeader, Error> {
let range_head_str = range_head.to_str().map_err(|_| {
UploadError::ParseReq("Range header contains non-utf8 characters".to_string())
})?;
@ -135,7 +138,7 @@ fn parse_range_header(range_head: &HeaderValue) -> Result<RangeHeader, UploadErr
let ranges = ranges
.split(',')
.map(parse_range)
.collect::<Result<Vec<Range>, UploadError>>()?;
.collect::<Result<Vec<Range>, Error>>()?;
Ok(RangeHeader {
unit: unit.to_owned(),
@ -143,7 +146,7 @@ fn parse_range_header(range_head: &HeaderValue) -> Result<RangeHeader, UploadErr
})
}
fn parse_range(s: &str) -> Result<Range, UploadError> {
fn parse_range(s: &str) -> Result<Range, Error> {
let dash_pos = s
.find('-')
.ok_or_else(|| UploadError::ParseReq("Mailformed Range Bound".to_string()))?;
@ -153,7 +156,7 @@ fn parse_range(s: &str) -> Result<Range, UploadError> {
let end = end.trim_start_matches('-').trim();
if start.is_empty() && end.is_empty() {
Err(UploadError::ParseReq("Malformed content range".to_string()))
Err(UploadError::ParseReq("Malformed content range".to_string()).into())
} else if start.is_empty() {
let suffix_length = end.parse().map_err(|_| {
UploadError::ParseReq("Cannot parse suffix length for range header".to_string())
@ -175,7 +178,7 @@ fn parse_range(s: &str) -> Result<Range, UploadError> {
})?;
if range_start > range_end {
return Err(UploadError::Range);
return Err(UploadError::Range.into());
}
Ok(Range::Segment(range_start, range_end))

46
src/root_span_builder.rs Normal file
View file

@ -0,0 +1,46 @@
use actix_web::{
dev::{ServiceRequest, ServiceResponse},
Error,
};
use tracing::Span;
use tracing_actix_web::root_span;
pub struct RootSpanBuilder;
impl tracing_actix_web::RootSpanBuilder for RootSpanBuilder {
fn on_request_start(request: &ServiceRequest) -> Span {
root_span!(request)
}
fn on_request_end<B>(span: Span, outcome: &Result<ServiceResponse<B>, Error>) {
match &outcome {
Ok(response) => {
if let Some(error) = response.response().error() {
handle_error(span, error)
} else {
span.record("http.status_code", &response.response().status().as_u16());
span.record("otel.status_code", &"OK");
}
}
Err(error) => handle_error(span, error),
}
}
}
fn handle_error(span: Span, error: &Error) {
let response_error = error.as_response_error();
let display = format!("{}", response_error);
let debug = format!("{:?}", response_error);
span.record("exception.message", &tracing::field::display(display));
span.record("exception.details", &tracing::field::display(debug));
let status_code = response_error.status_code();
span.record("http.status_code", &status_code.as_u16());
if status_code.is_client_error() {
span.record("otel.status_code", &"OK");
} else {
span.record("otel.status_code", &"ERROR");
}
}

View file

@ -1,37 +1,51 @@
use crate::error::UploadError;
use crate::error::Error;
use actix_rt::task::JoinHandle;
use actix_web::web::{Bytes, BytesMut};
use futures_util::Stream;
use std::{
future::Future,
pin::Pin,
process::Stdio,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, Command},
sync::oneshot::{channel, Receiver},
};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::instrument;
#[derive(Debug)]
struct StatusError;
pub(crate) struct Process {
child: tokio::process::Child,
child: Child,
}
pub(crate) struct ProcessRead<I> {
inner: I,
err_recv: tokio::sync::oneshot::Receiver<std::io::Error>,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
handle: actix_rt::task::JoinHandle<()>,
handle: JoinHandle<()>,
}
struct BytesFreezer<S>(S);
impl Process {
fn new(child: tokio::process::Child) -> Self {
fn new(child: Child) -> Self {
Process { child }
}
pub(crate) fn spawn(cmd: &mut tokio::process::Command) -> std::io::Result<Self> {
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
#[instrument(name = "Spawning command")]
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
tracing::info!("Spawning");
Self::spawn(Command::new(command).args(args))
}
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map(Process::new)
}
@ -40,7 +54,7 @@ impl Process {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = channel();
let mut child = self.child;
@ -79,7 +93,7 @@ impl Process {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = channel();
let mut child = self.child;
@ -114,11 +128,8 @@ impl Process {
pub(crate) fn bytes_stream(
input: impl AsyncRead + Unpin,
) -> impl Stream<Item = Result<Bytes, UploadError>> + Unpin {
BytesFreezer(tokio_util::codec::FramedRead::new(
input,
tokio_util::codec::BytesCodec::new(),
))
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
BytesFreezer(FramedRead::new(input, BytesCodec::new()))
}
impl<I> AsyncRead for ProcessRead<I>
@ -157,13 +168,13 @@ impl<S> Stream for BytesFreezer<S>
where
S: Stream<Item = std::io::Result<BytesMut>> + Unpin,
{
type Item = Result<Bytes, UploadError>;
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze())))
.map_err(UploadError::from)
.map_err(Error::from)
}
}

View file

@ -1,6 +1,6 @@
use crate::{
config::Format,
error::UploadError,
error::{Error, UploadError},
migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb},
to_ext,
};
@ -98,7 +98,7 @@ where
}
}
async fn finalize_reset(self) -> Result<Hash, UploadError> {
async fn finalize_reset(self) -> Result<Hash, Error> {
let mut hasher = self.hasher;
let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
Ok(hash)
@ -194,7 +194,7 @@ pub(crate) struct Details {
}
impl Details {
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, UploadError> {
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, Error> {
let details = crate::magick::details_bytes(input).await?;
Ok(Details::now(
@ -204,7 +204,7 @@ impl Details {
))
}
pub(crate) async fn from_path<P>(path: P) -> Result<Self, UploadError>
pub(crate) async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<std::path::Path>,
{
@ -285,10 +285,7 @@ impl UploadManager {
}
/// Create a new UploadManager
pub(crate) async fn new(
mut root_dir: PathBuf,
format: Option<Format>,
) -> Result<Self, UploadError> {
pub(crate) async fn new(mut root_dir: PathBuf, format: Option<Format>) -> Result<Self, Error> {
let root_clone = root_dir.clone();
// sled automatically creates it's own directories
let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??;
@ -313,11 +310,7 @@ impl UploadManager {
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant(
&self,
path: PathBuf,
filename: String,
) -> Result<(), UploadError> {
pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let fname_tree = self.inner.filename_tree.clone();
@ -340,7 +333,7 @@ impl UploadManager {
&self,
path: PathBuf,
filename: String,
) -> Result<Option<Details>, UploadError> {
) -> Result<Option<Details>, Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let fname_tree = self.inner.filename_tree.clone();
@ -369,7 +362,7 @@ impl UploadManager {
path: PathBuf,
filename: String,
details: &Details,
) -> Result<(), UploadError> {
) -> Result<(), Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let fname_tree = self.inner.filename_tree.clone();
@ -389,10 +382,7 @@ impl UploadManager {
}
/// Get a list of aliases for a given file
pub(crate) async fn aliases_by_filename(
&self,
filename: String,
) -> Result<Vec<String>, UploadError> {
pub(crate) async fn aliases_by_filename(&self, filename: String) -> Result<Vec<String>, Error> {
let fname_tree = self.inner.filename_tree.clone();
let hash = web::block(move || fname_tree.get(filename.as_bytes()))
.await??
@ -402,7 +392,7 @@ impl UploadManager {
}
/// Get a list of aliases for a given alias
pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result<Vec<String>, UploadError> {
pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result<Vec<String>, Error> {
let alias_tree = self.inner.alias_tree.clone();
let hash = web::block(move || alias_tree.get(alias.as_bytes()))
.await??
@ -411,7 +401,7 @@ impl UploadManager {
self.aliases_by_hash(&hash).await
}
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, UploadError> {
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, Error> {
let (start, end) = alias_key_bounds(hash);
let main_tree = self.inner.main_tree.clone();
let aliases = web::block(move || {
@ -436,7 +426,7 @@ impl UploadManager {
}
/// Delete an alias without a delete token
pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), UploadError> {
pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), Error> {
let token_key = delete_key(&alias);
let alias_tree = self.inner.alias_tree.clone();
let token = web::block(move || alias_tree.get(token_key.as_bytes()))
@ -448,7 +438,7 @@ impl UploadManager {
/// Delete the alias, and the file & variants if no more aliases exist
#[instrument(skip(self, alias, token))]
pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> {
pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), Error> {
use sled::Transactional;
let main_tree = self.inner.main_tree.clone();
let alias_tree = self.inner.alias_tree.clone();
@ -478,7 +468,7 @@ impl UploadManager {
let id = alias_tree
.remove(alias_id_key(&alias2).as_bytes())?
.ok_or_else(|| trans_err(UploadError::MissingAlias))?;
let id = String::from_utf8(id.to_vec()).map_err(|e| trans_err(e.into()))?;
let id = String::from_utf8(id.to_vec()).map_err(trans_err)?;
// -- GET HASH FOR HASH TREE CLEANUP --
debug!("Deleting alias -> hash mapping");
@ -498,13 +488,13 @@ impl UploadManager {
self.check_delete_files(hash).await
}
async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), UploadError> {
async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), Error> {
// -- CHECK IF ANY OTHER ALIASES EXIST --
let main_tree = self.inner.main_tree.clone();
let (start, end) = alias_key_bounds(&hash);
debug!("Checking for additional aliases referencing hash");
let any_aliases = web::block(move || {
Ok(main_tree.range(start..end).next().is_some()) as Result<bool, UploadError>
Ok(main_tree.range(start..end).next().is_some()) as Result<bool, Error>
})
.await??;
@ -546,7 +536,7 @@ impl UploadManager {
/// Fetch the real on-disk filename given an alias
#[instrument(skip(self))]
pub(crate) async fn from_alias(&self, alias: String) -> Result<String, UploadError> {
pub(crate) async fn from_alias(&self, alias: String) -> Result<String, Error> {
let tree = self.inner.alias_tree.clone();
debug!("Getting hash from alias");
let hash = web::block(move || tree.get(alias.as_bytes()))
@ -574,7 +564,7 @@ impl UploadManager {
// Find image variants and remove them from the DB and the disk
#[instrument(skip(self))]
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> {
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> {
let filename = filename.inner;
let mut path = self.image_dir();
let fname = String::from_utf8(filename.to_vec())?;
@ -601,7 +591,7 @@ impl UploadManager {
keys.push(key?.to_owned());
}
Ok(keys) as Result<Vec<sled::IVec>, UploadError>
Ok(keys) as Result<Vec<sled::IVec>, Error>
})
.await??;
@ -631,7 +621,7 @@ impl UploadManager {
impl UploadManagerSession {
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<String, UploadError> {
pub(crate) async fn delete_token(&self) -> Result<String, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
@ -679,9 +669,9 @@ impl UploadManagerSession {
content_type: mime::Mime,
validate: bool,
mut stream: UploadStream<E>,
) -> Result<Self, UploadError>
) -> Result<Self, Error>
where
UploadError: From<E>,
Error: From<E>,
E: Unpin + 'static,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
@ -718,12 +708,9 @@ impl UploadManagerSession {
/// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(
mut self,
mut stream: UploadStream<E>,
) -> Result<Self, UploadError>
pub(crate) async fn upload<E>(mut self, mut stream: UploadStream<E>) -> Result<Self, Error>
where
UploadError: From<E>,
Error: From<E>,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
@ -762,7 +749,7 @@ impl UploadManagerSession {
tmpfile: PathBuf,
hash: Hash,
content_type: mime::Mime,
) -> Result<(), UploadError> {
) -> Result<(), Error> {
let (dup, name) = self.check_duplicate(hash, content_type).await?;
// bail early with alias to existing file if this is a duplicate
@ -786,7 +773,7 @@ impl UploadManagerSession {
&self,
hash: Hash,
content_type: mime::Mime,
) -> Result<(Dup, String), UploadError> {
) -> Result<(Dup, String), Error> {
let main_tree = self.manager.inner.main_tree.clone();
let filename = self.next_file(content_type).await?;
@ -822,7 +809,7 @@ impl UploadManagerSession {
// generate a short filename that isn't already in-use
#[instrument(skip(self, content_type))]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> {
async fn next_file(&self, content_type: mime::Mime) -> Result<String, Error> {
let image_dir = self.manager.image_dir();
use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10;
@ -855,7 +842,7 @@ impl UploadManagerSession {
}
#[instrument(skip(self, hash, alias))]
async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> {
async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
self.save_alias_hash_mapping(hash, alias).await??;
self.store_hash_id_alias_mapping(hash, alias).await?;
@ -867,11 +854,7 @@ impl UploadManagerSession {
//
// This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument(skip(self, hash, content_type))]
async fn add_alias(
&mut self,
hash: &Hash,
content_type: mime::Mime,
) -> Result<(), UploadError> {
async fn add_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<(), Error> {
let alias = self.next_alias(hash, content_type).await?;
self.store_hash_id_alias_mapping(hash, &alias).await?;
@ -883,11 +866,7 @@ impl UploadManagerSession {
//
// DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
#[instrument(skip(self, hash))]
async fn store_hash_id_alias_mapping(
&self,
hash: &Hash,
alias: &str,
) -> Result<(), UploadError> {
async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
let alias = alias.to_string();
loop {
debug!("hash -> alias save loop");
@ -921,11 +900,7 @@ impl UploadManagerSession {
// Generate an alias to the file
#[instrument(skip(self, hash, content_type))]
async fn next_alias(
&mut self,
hash: &Hash,
content_type: mime::Mime,
) -> Result<String, UploadError> {
async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<String, Error> {
use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10;
let mut rng = rand::thread_rng();
@ -956,7 +931,7 @@ impl UploadManagerSession {
&self,
hash: &Hash,
alias: &str,
) -> Result<Result<(), UploadError>, UploadError> {
) -> Result<Result<(), Error>, Error> {
let tree = self.manager.inner.alias_tree.clone();
let vec = hash.inner.clone();
let alias = alias.to_string();
@ -969,7 +944,7 @@ impl UploadManagerSession {
if res.is_err() {
warn!("Duplicate alias");
return Ok(Err(UploadError::DuplicateAlias));
return Ok(Err(UploadError::DuplicateAlias.into()));
}
Ok(Ok(()))
@ -980,7 +955,7 @@ impl UploadManagerSession {
pub(crate) async fn safe_save_reader(
to: PathBuf,
input: &mut (impl AsyncRead + Unpin),
) -> Result<(), UploadError> {
) -> Result<(), Error> {
if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path.to_owned()).await?;
@ -992,7 +967,7 @@ pub(crate) async fn safe_save_reader(
return Err(e.into());
}
} else {
return Err(UploadError::FileExists);
return Err(UploadError::FileExists.into());
}
debug!("Writing stream to {:?}", to);
@ -1008,9 +983,9 @@ pub(crate) async fn safe_save_reader(
pub(crate) async fn safe_save_stream<E>(
to: PathBuf,
mut stream: UploadStream<E>,
) -> Result<(), UploadError>
) -> Result<(), Error>
where
UploadError: From<E>,
Error: From<E>,
E: Unpin,
{
if let Some(path) = to.parent() {
@ -1024,7 +999,7 @@ where
return Err(e.into());
}
} else {
return Err(UploadError::FileExists);
return Err(UploadError::FileExists.into());
}
debug!("Writing stream to {:?}", to);
@ -1050,17 +1025,20 @@ where
Ok(())
}
async fn remove_path(path: sled::IVec) -> Result<(), UploadError> {
async fn remove_path(path: sled::IVec) -> Result<(), Error> {
let path_string = String::from_utf8(path.to_vec())?;
tokio::fs::remove_file(path_string).await?;
Ok(())
}
fn trans_err(e: UploadError) -> sled::transaction::ConflictableTransactionError<UploadError> {
sled::transaction::ConflictableTransactionError::Abort(e)
fn trans_err<E>(e: E) -> sled::transaction::ConflictableTransactionError<Error>
where
Error: From<E>,
{
sled::transaction::ConflictableTransactionError::Abort(e.into())
}
fn file_name(name: String, content_type: mime::Mime) -> Result<String, UploadError> {
fn file_name(name: String, content_type: mime::Mime) -> Result<String, Error> {
Ok(format!("{}{}", name, to_ext(content_type)?))
}

View file

@ -1,6 +1,7 @@
use crate::{config::Format, error::UploadError, ffmpeg::InputFormat, magick::ValidInputType};
use crate::{config::Format, error::Error, ffmpeg::InputFormat, magick::ValidInputType};
use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use tracing::instrument;
pub(crate) fn image_webp() -> mime::Mime {
"image/webp".parse().unwrap()
@ -10,36 +11,43 @@ pub(crate) fn video_mp4() -> mime::Mime {
"video/mp4".parse().unwrap()
}
#[instrument(name = "Validate image", skip(bytes))]
pub(crate) async fn validate_image_bytes(
bytes: Bytes,
prescribed_format: Option<Format>,
) -> Result<(mime::Mime, Box<dyn AsyncRead + Unpin>), UploadError> {
) -> Result<(mime::Mime, Box<dyn AsyncRead + Unpin>), Error> {
let input_type = crate::magick::input_type_bytes(bytes.clone()).await?;
match (prescribed_format, input_type) {
(_, ValidInputType::Gif) => Ok((
video_mp4(),
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?),
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?)
as Box<dyn AsyncRead + Unpin>,
)),
(_, ValidInputType::Mp4) => Ok((
video_mp4(),
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?),
Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?)
as Box<dyn AsyncRead + Unpin>,
)),
(Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok((
mime::IMAGE_JPEG,
Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?),
Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?)
as Box<dyn AsyncRead + Unpin>,
)),
(Some(Format::Png) | None, ValidInputType::Png) => Ok((
mime::IMAGE_PNG,
Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?),
Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?)
as Box<dyn AsyncRead + Unpin>,
)),
(Some(Format::Webp) | None, ValidInputType::Webp) => Ok((
image_webp(),
Box::new(crate::magick::clear_metadata_bytes_read(bytes)?),
Box::new(crate::magick::clear_metadata_bytes_read(bytes)?)
as Box<dyn AsyncRead + Unpin>,
)),
(Some(format), _) => Ok((
format.to_mime(),
Box::new(crate::magick::convert_bytes_read(bytes, format)?),
Box::new(crate::magick::convert_bytes_read(bytes, format)?)
as Box<dyn AsyncRead + Unpin>,
)),
}
}