diff --git a/Cargo.lock b/Cargo.lock index b6bbcc1..3532546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,9 +20,9 @@ dependencies = [ [[package]] name = "actix-form-data" -version = "0.6.0-beta.6" +version = "0.6.0-beta.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6552c90f3283caa08a8114d49f82cb3eacd6038168bc0ffb199b9304f615be2" +checksum = "304d237617d707993b9210dfaa2c5243ac8bcda5ed1b7a5b6f3b404a4f31a2f3" dependencies = [ "actix-multipart", "actix-rt", @@ -278,9 +278,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" +checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" [[package]] name = "atty" @@ -401,6 +401,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "time 0.1.43", "winapi", ] @@ -557,12 +558,54 @@ dependencies = [ "winapi", ] +[[package]] +name = "futures" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" +[[package]] +name = "futures-executor" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + [[package]] name = "futures-macro" version = "0.3.17" @@ -595,10 +638,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" dependencies = [ "autocfg", + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "proc-macro-hack", @@ -625,6 +671,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "getrandom" version = "0.2.3" @@ -951,7 +1007,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.3.0-alpha.30" +version = "0.3.0-alpha.31" dependencies = [ "actix-form-data", "actix-rt", @@ -975,9 +1031,12 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "tracing-actix-web", + "tracing-bunyan-formatter", + "tracing-error", "tracing-futures", + "tracing-log", "tracing-subscriber", - "uuid", ] [[package]] @@ -1522,6 +1581,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "time" version = "0.2.27" @@ -1630,21 +1699,35 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +checksum = "c2ba9ab62b7d6497a8638dfda5e5c4fb3b2d5a7fca4118f2b96151c8ef1a437e" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", ] [[package]] -name = "tracing-attributes" -version = "0.1.15" +name = "tracing-actix-web" +version = "0.4.0-beta.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" +checksum = "aef43d92080b0429626deba48d01dad848ad515777b373d7a18eac3f129be359" +dependencies = [ + "actix-web", + "futures", + "tracing", + "tracing-futures", + "uuid", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77" dependencies = [ "proc-macro2", "quote", @@ -1652,14 +1735,41 @@ dependencies = [ ] [[package]] -name = "tracing-core" -version = "0.1.19" +name = "tracing-bunyan-formatter" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ca517f43f0fb96e0c3072ed5c275fe5eece87e8cb52f4a77b69226d3b1c9df8" +checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1" +dependencies = [ + "chrono", + "gethostname", + "log", + "serde", + "serde_json", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + +[[package]] +name = "tracing-core" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf" dependencies = [ "lazy_static", ] +[[package]] +name = "tracing-error" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4d7c0b83d4a500748fa5879461652b361edf5c9d51ede2a2ac03875ca185e24" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-futures" version = "0.2.5" @@ -1693,9 +1803,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.20" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe" +checksum = "62af966210b88ad5776ee3ba12d5f35b8d6a2b2a12168f3080cf02b814d7376b" dependencies = [ "ansi_term 0.12.1", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 286b2af..111ece7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.3.0-alpha.30" +version = "0.3.0-alpha.31" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -33,6 +33,10 @@ time = { version = "0.3.0", features = ["serde"] } tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" +tracing-actix-web = { version = "0.4.0-beta.8" } +tracing-bunyan-formatter = "0.2.6" +tracing-error = "0.1.2" tracing-futures = "0.2.4" +tracing-log = "0.1.2" tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] } -uuid = { version = "0.8", features = ["v4"] } +# uuid = { version = "0.8", features = ["v4"] } diff --git a/src/config.rs b/src/config.rs index 71dc896..5fe2f2b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -73,6 +73,9 @@ pub(crate) struct Config { help = "An optional string to be checked on requests to privileged endpoints" )] api_key: Option, + + #[structopt(short, long, help = "Enable json logging for the pict-rs server")] + json_logging: bool, } impl Config { @@ -113,6 +116,10 @@ impl Config { pub(crate) fn api_key(&self) -> Option<&str> { self.api_key.as_deref() } + + pub(crate) fn json_logging(&self) -> bool { + self.json_logging + } } #[derive(Debug, thiserror::Error)] diff --git a/src/error.rs b/src/error.rs index e40c9f4..6fd4451 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,56 @@ -use crate::{ffmpeg::VideoError, magick::MagickError}; use actix_web::{http::StatusCode, HttpResponse, ResponseError}; +use tracing_error::SpanTrace; + +pub(crate) struct Error { + context: SpanTrace, + kind: UploadError, +} + +impl Error { + pub(crate) fn kind(&self) -> &UploadError { + &self.kind + } +} + +impl std::fmt::Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}\n", self.kind) + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}\n", self.kind)?; + std::fmt::Display::fmt(&self.context, f) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.kind.source() + } +} + +impl From for Error +where + UploadError: From, +{ + fn from(error: T) -> Self { + Error { + kind: UploadError::from(error), + context: SpanTrace::capture(), + } + } +} + +impl From> for Error { + fn from(e: sled::transaction::TransactionError) -> Self { + match e { + sled::transaction::TransactionError::Abort(t) => t, + sled::transaction::TransactionError::Storage(e) => e.into(), + } + } +} #[derive(Debug, thiserror::Error)] pub(crate) enum UploadError { @@ -39,6 +90,9 @@ pub(crate) enum UploadError { #[error("Unsupported image format")] UnsupportedFormat, + #[error("Invalid media dimensions")] + Dimensions, + #[error("Unable to download image, bad response {0}")] Download(actix_web::http::StatusCode), @@ -66,11 +120,8 @@ pub(crate) enum UploadError { #[error("Range header not satisfiable")] Range, - #[error("{0}")] - VideoError(#[from] VideoError), - - #[error("{0}")] - MagickError(#[from] MagickError), + #[error("Command failed")] + Status, } impl From for UploadError { @@ -79,18 +130,13 @@ impl From for UploadError { } } -impl From> for UploadError { - fn from(e: sled::transaction::TransactionError) -> Self { - match e { - sled::transaction::TransactionError::Abort(t) => t, - sled::transaction::TransactionError::Storage(e) => e.into(), +impl From> for Error { + fn from(e: actix_form_data::Error) -> Self { + if let actix_form_data::Error::FileFn(e) = e { + return e; } - } -} -impl From for UploadError { - fn from(e: actix_form_data::Error) -> Self { - UploadError::Upload(e.to_string()) + UploadError::Upload(e.to_string()).into() } } @@ -106,12 +152,10 @@ impl From for UploadError { } } -impl ResponseError for UploadError { +impl ResponseError for Error { fn status_code(&self) -> StatusCode { - match self { - UploadError::VideoError(_) - | UploadError::MagickError(_) - | UploadError::DuplicateAlias + match self.kind { + UploadError::DuplicateAlias | UploadError::NoFiles | UploadError::Upload(_) | UploadError::ParseReq(_) => StatusCode::BAD_REQUEST, @@ -126,8 +170,8 @@ impl ResponseError for UploadError { HttpResponse::build(self.status_code()) .content_type("application/json") .body( - serde_json::to_string(&serde_json::json!({ "msg": self.to_string() })) - .unwrap_or_else(|_| r#"{"msg":"Internal Server Error"}"#.to_string()), + serde_json::to_string(&serde_json::json!({ "msg": self.kind.to_string() })) + .unwrap_or_else(|_| r#"{"msg":"Request failed"}"#.to_string()), ) } } diff --git a/src/exiftool.rs b/src/exiftool.rs index 8d784e3..98ff95c 100644 --- a/src/exiftool.rs +++ b/src/exiftool.rs @@ -1,9 +1,9 @@ use crate::stream::Process; use actix_web::web::Bytes; -use tokio::{io::AsyncRead, process::Command}; +use tokio::io::AsyncRead; pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { - let process = Process::spawn(Command::new("exiftool").args(["-all=", "-", "-out", "-"]))?; + let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?; Ok(process.bytes_read(input).unwrap()) } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index f1f873b..8c8a967 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,21 +1,17 @@ -use crate::stream::Process; +use crate::{ + error::{Error, UploadError}, + stream::Process, +}; use actix_web::web::Bytes; use tokio::{io::AsyncRead, process::Command}; - -#[derive(Debug, thiserror::Error)] -pub(crate) enum VideoError { - #[error("Failed to interface with transcode process")] - IO(#[from] std::io::Error), - - #[error("Failed to convert file")] - Status, -} +use tracing::instrument; pub(crate) enum InputFormat { Gif, Mp4, } +#[derive(Debug)] pub(crate) enum ThumbnailFormat { Jpeg, // Webp, @@ -50,55 +46,72 @@ pub(crate) fn to_mp4_bytes( input: Bytes, input_format: InputFormat, ) -> std::io::Result { - let process = Process::spawn(Command::new("ffmpeg").args([ - "-f", - input_format.as_format(), - "-i", - "pipe:", - "-movflags", - "faststart+frag_keyframe+empty_moov", - "-pix_fmt", - "yuv420p", - "-vf", - "scale=trunc(iw/2)*2:trunc(ih/2)*2", - "-an", - "-codec", - "h264", - "-f", - "mp4", - "pipe:", - ]))?; + let process = Process::run( + "ffmpeg", + &[ + "-f", + input_format.as_format(), + "-i", + "pipe:", + "-movflags", + "faststart+frag_keyframe+empty_moov", + "-pix_fmt", + "yuv420p", + "-vf", + "scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-an", + "-codec", + "h264", + "-f", + "mp4", + "pipe:", + ], + )?; Ok(process.bytes_read(input).unwrap()) } +#[instrument(name = "Create video thumbnail", skip(from, to))] pub(crate) async fn thumbnail( from: P1, to: P2, format: ThumbnailFormat, -) -> Result<(), VideoError> +) -> Result<(), Error> where P1: AsRef, P2: AsRef, { - let mut child = Command::new("ffmpeg") - .arg(&"-i") - .arg(&from.as_ref()) - .args([ - "-vframes", - "1", - "-codec", - format.as_codec(), - "-f", - format.as_format(), - ]) - .arg(&to.as_ref()) + let command = "ffmpeg"; + let first_arg = "-i"; + let args = [ + "-vframes", + "1", + "-codec", + format.as_codec(), + "-f", + format.as_format(), + ]; + + tracing::info!( + "Spawning command: {} {} {:?} {:?} {:?}", + command, + first_arg, + from.as_ref(), + args, + to.as_ref() + ); + + let mut child = Command::new(command) + .arg(first_arg) + .arg(from.as_ref()) + .args(args) + .arg(to.as_ref()) .spawn()?; let status = child.wait().await?; if !status.success() { - return Err(VideoError::Status); + return Err(UploadError::Status.into()); } Ok(()) diff --git a/src/magick.rs b/src/magick.rs index 56d9077..7cf5db6 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -1,21 +1,14 @@ -use crate::{config::Format, stream::Process}; +use crate::{ + config::Format, + error::{Error, UploadError}, + stream::Process, +}; use actix_web::web::Bytes; use tokio::{ io::{AsyncRead, AsyncReadExt}, process::Command, }; - -#[derive(Debug, thiserror::Error)] -pub(crate) enum MagickError { - #[error("{0}")] - IO(#[from] std::io::Error), - - #[error("Invalid format")] - Format, - - #[error("Image too large")] - Dimensions, -} +use tracing::instrument; pub(crate) enum ValidInputType { Mp4, @@ -32,19 +25,16 @@ pub(crate) struct Details { } pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { - let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; + let process = Process::run("magick", &["convert", "-", "-strip", "-"])?; Ok(process.bytes_read(input).unwrap()) } -pub(crate) async fn details_bytes(input: Bytes) -> Result { - let process = Process::spawn(Command::new("magick").args([ - "identify", - "-ping", - "-format", - "%w %h | %m\n", - "-", - ]))?; +pub(crate) async fn details_bytes(input: Bytes) -> Result { + let process = Process::run( + "magick", + &["identify", "-ping", "-format", "%w %h | %m\n", "-"], + )?; let mut reader = process.bytes_read(input).unwrap(); @@ -59,22 +49,30 @@ pub(crate) fn convert_bytes_read( input: Bytes, format: Format, ) -> std::io::Result { - let process = Process::spawn(Command::new("magick").args([ - "convert", - "-", - format!("{}:-", format.to_magick_format()).as_str(), - ]))?; + let process = Process::run( + "magick", + &[ + "convert", + "-", + format!("{}:-", format.to_magick_format()).as_str(), + ], + )?; Ok(process.bytes_read(input).unwrap()) } -pub(crate) async fn details

(file: P) -> Result +pub(crate) async fn details

(file: P) -> Result where P: AsRef, { - let output = Command::new("magick") - .args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"]) - .arg(&file.as_ref()) + let command = "magick"; + let args = ["identify", "-ping", "-format", "%w %h | %m\n"]; + let last_arg = file.as_ref(); + + tracing::info!("Spawning command: {} {:?} {:?}", command, args, last_arg); + let output = Command::new(command) + .args(args) + .arg(last_arg) .output() .await?; @@ -83,23 +81,39 @@ where parse_details(s) } -fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { +fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { let mut lines = s.lines(); - let first = lines.next().ok_or(MagickError::Format)?; + let first = lines.next().ok_or(UploadError::UnsupportedFormat)?; let mut segments = first.split('|'); - let dimensions = segments.next().ok_or(MagickError::Format)?.trim(); + let dimensions = segments + .next() + .ok_or(UploadError::UnsupportedFormat)? + .trim(); tracing::debug!("dimensions: {}", dimensions); let mut dims = dimensions.split(' '); - let width = dims.next().ok_or(MagickError::Format)?.trim().parse()?; - let height = dims.next().ok_or(MagickError::Format)?.trim().parse()?; + let width = dims + .next() + .ok_or(UploadError::UnsupportedFormat)? + .trim() + .parse() + .map_err(|_| UploadError::UnsupportedFormat)?; + let height = dims + .next() + .ok_or(UploadError::UnsupportedFormat)? + .trim() + .parse() + .map_err(|_| UploadError::UnsupportedFormat)?; - let format = segments.next().ok_or(MagickError::Format)?.trim(); + let format = segments + .next() + .ok_or(UploadError::UnsupportedFormat)? + .trim(); tracing::debug!("format: {}", format); if !lines.all(|item| item.ends_with(format)) { - return Err(MagickError::Format); + return Err(UploadError::UnsupportedFormat.into()); } let mime_type = match format { @@ -108,7 +122,7 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { "PNG" => mime::IMAGE_PNG, "JPEG" => mime::IMAGE_JPEG, "WEBP" => crate::validate::image_webp(), - _ => return Err(MagickError::Format), + _ => return Err(UploadError::UnsupportedFormat.into()), }; Ok(Details { @@ -118,29 +132,41 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { }) } -pub(crate) async fn input_type_bytes(input: Bytes) -> Result { +pub(crate) async fn input_type_bytes(input: Bytes) -> Result { details_bytes(input).await?.validate_input() } +#[instrument(name = "Spawning process command", skip(input))] pub(crate) fn process_image_write_read( input: impl AsyncRead + Unpin + 'static, args: Vec, format: Format, ) -> std::io::Result { + let command = "magick"; + let convert_args = ["convert", "-"]; + let last_arg = format!("{}:-", format.to_magick_format()); + + tracing::info!( + "Spawning command: {} {:?} {:?} {}", + command, + convert_args, + args, + last_arg + ); let process = Process::spawn( - Command::new("magick") - .args([&"convert", &"-"]) + Command::new(command) + .args(convert_args) .args(args) - .arg(format!("{}:-", format.to_magick_format())), + .arg(last_arg), )?; Ok(process.write_read(input).unwrap()) } impl Details { - fn validate_input(&self) -> Result { + fn validate_input(&self) -> Result { if self.width > crate::CONFIG.max_width() || self.height > crate::CONFIG.max_height() { - return Err(MagickError::Dimensions); + return Err(UploadError::Dimensions.into()); } let input_type = match (self.mime_type.type_(), self.mime_type.subtype()) { @@ -149,15 +175,9 @@ impl Details { (mime::IMAGE, mime::PNG) => ValidInputType::Png, (mime::IMAGE, mime::JPEG) => ValidInputType::Jpeg, (mime::IMAGE, subtype) if subtype.as_str() == "webp" => ValidInputType::Webp, - _ => return Err(MagickError::Format), + _ => return Err(UploadError::UnsupportedFormat.into()), }; Ok(input_type) } } - -impl From for MagickError { - fn from(_: std::num::ParseIntError) -> MagickError { - MagickError::Format - } -} diff --git a/src/main.rs b/src/main.rs index d316606..3e3ae7d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use actix_form_data::{Field, Form, Value}; use actix_web::{ guard, http::header::{CacheControl, CacheDirective, LastModified, ACCEPT_RANGES}, - middleware::Logger, web, App, HttpResponse, HttpResponseBuilder, HttpServer, }; use awc::Client; @@ -28,8 +27,12 @@ use tokio::{ Semaphore, }, }; -use tracing::{debug, error, info, instrument, Span}; -use tracing_subscriber::EnvFilter; +use tracing::{debug, error, info, instrument, subscriber::set_global_default, Span}; +use tracing_actix_web::TracingLogger; +use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; +use tracing_error::ErrorLayer; +use tracing_log::LogTracer; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; mod config; mod error; @@ -40,14 +43,16 @@ mod middleware; mod migrate; mod processor; mod range; +mod root_span_builder; mod stream; mod upload_manager; mod validate; use self::{ config::{Config, Format}, - error::UploadError, - middleware::{Deadline, Internal, Tracing}, + error::{Error, UploadError}, + middleware::{Deadline, Internal}, + root_span_builder::RootSpanBuilder, upload_manager::{Details, UploadManager, UploadManagerSession}, validate::{image_webp, video_mp4}, }; @@ -90,7 +95,7 @@ struct CancelSafeProcessor { impl CancelSafeProcessor where - F: Future> + Unpin, + F: Future> + Unpin, { pub(crate) fn new(path: PathBuf, fut: F) -> Self { let entry = PROCESS_MAP.entry(path.clone()); @@ -117,15 +122,15 @@ where impl Future for CancelSafeProcessor where - F: Future> + Unpin, + F: Future> + Unpin, { - type Output = Result<(Details, web::Bytes), UploadError>; + type Output = Result<(Details, web::Bytes), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if let Some(ref mut rx) = self.receiver { Pin::new(rx) .poll(cx) - .map(|res| res.map_err(|_| UploadError::Canceled)) + .map(|res| res.map_err(|_| UploadError::Canceled.into())) } else { Pin::new(&mut self.fut).poll(cx).map(|res| { let opt = PROCESS_MAP.remove(&self.path); @@ -151,8 +156,8 @@ impl Drop for CancelSafeProcessor { } // try moving a file -#[instrument] -async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { +#[instrument(name = "Moving file")] +async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), Error> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); tokio::fs::create_dir_all(path).await?; @@ -164,7 +169,7 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { return Err(e.into()); } } else { - return Err(UploadError::FileExists); + return Err(UploadError::FileExists.into()); } debug!("Moving {:?} to {:?}", from, to); @@ -173,7 +178,7 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { Ok(()) } -async fn safe_create_parent

(path: P) -> Result<(), UploadError> +async fn safe_create_parent

(path: P) -> Result<(), Error> where P: AsRef, { @@ -186,8 +191,8 @@ where } // Try writing to a file -#[instrument(skip(bytes))] -async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), UploadError> { +#[instrument(name = "Saving file", skip(bytes))] +async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), Error> { if let Some(path) = path.parent() { // create the directory for the file debug!("Creating directory {:?}", path); @@ -240,7 +245,7 @@ pub(crate) fn tmp_file() -> PathBuf { path } -fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> { +fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> { if mime == mime::IMAGE_PNG { Ok(".png") } else if mime == mime::IMAGE_JPEG { @@ -250,16 +255,16 @@ fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> { } else if mime == image_webp() { Ok(".webp") } else { - Err(UploadError::UnsupportedFormat) + Err(UploadError::UnsupportedFormat.into()) } } /// Handle responding to succesful uploads -#[instrument(skip(value, manager))] +#[instrument(name = "Uploaded files", skip(value, manager))] async fn upload( value: Value, manager: web::Data, -) -> Result { +) -> Result { let images = value .map() .and_then(|mut m| m.remove("images")) @@ -319,16 +324,16 @@ struct UrlQuery { } /// download an image from a URL -#[instrument(skip(client, manager))] +#[instrument(name = "Downloading file", skip(client, manager))] async fn download( client: web::Data, manager: web::Data, query: web::Query, -) -> Result { +) -> Result { let mut res = client.get(&query.url).send().await?; if !res.status().is_success() { - return Err(UploadError::Download(res.status())); + return Err(UploadError::Download(res.status()).into()); } let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES); @@ -369,11 +374,11 @@ async fn download( } /// Delete aliases and files -#[instrument(skip(manager))] +#[instrument(name = "Deleting file", skip(manager))] async fn delete( manager: web::Data, path_entries: web::Path<(String, String)>, -) -> Result { +) -> Result { let (alias, token) = path_entries.into_inner(); manager.delete(token, alias).await?; @@ -388,7 +393,7 @@ async fn prepare_process( ext: &str, manager: &UploadManager, whitelist: &Option>, -) -> Result<(Format, String, PathBuf, Vec), UploadError> { +) -> Result<(Format, String, PathBuf, Vec), Error> { let (alias, operations) = query .into_inner() @@ -403,7 +408,7 @@ async fn prepare_process( }); if alias.is_empty() { - return Err(UploadError::MissingFilename); + return Err(UploadError::MissingFilename.into()); } let name = manager.from_alias(alias).await?; @@ -430,12 +435,13 @@ async fn prepare_process( Ok((format, name, thumbnail_path, thumbnail_args)) } +#[instrument(name = "Fetching derived details", skip(manager, whitelist))] async fn process_details( query: web::Query, ext: web::Path, manager: web::Data, whitelist: web::Data>>, -) -> Result { +) -> Result { let (_, name, thumbnail_path, _) = prepare_process(query, ext.as_str(), &manager, &whitelist).await?; @@ -447,14 +453,14 @@ async fn process_details( } /// Process files -#[instrument(skip(manager, whitelist))] +#[instrument(name = "Processing image", skip(manager, whitelist))] async fn process( range: Option, query: web::Query, ext: web::Path, manager: web::Data, whitelist: web::Data>>, -) -> Result { +) -> Result { let (format, name, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &manager, &whitelist).await?; @@ -544,7 +550,7 @@ async fn process( drop(entered); }); - Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError> + Ok((details, bytes)) as Result<(Details, web::Bytes), Error> }; let (details, bytes) = @@ -553,11 +559,11 @@ async fn process( return match range { Some(range_header) => { if !range_header.is_bytes() { - return Err(UploadError::Range); + return Err(UploadError::Range.into()); } if range_header.is_empty() { - Err(UploadError::Range) + Err(UploadError::Range.into()) } else if range_header.len() == 1 { let range = range_header.ranges().next().unwrap(); let content_range = range.to_content_range(bytes.len() as u64); @@ -573,12 +579,12 @@ async fn process( details.system_time(), )) } else { - Err(UploadError::Range) + Err(UploadError::Range.into()) } } None => Ok(srv_response( HttpResponse::Ok(), - once(ready(Ok(bytes) as Result<_, UploadError>)), + once(ready(Ok(bytes) as Result<_, Error>)), details.content_type(), 7 * DAYS, details.system_time(), @@ -600,10 +606,11 @@ async fn process( } /// Fetch file details +#[instrument(name = "Fetching details", skip(manager))] async fn details( alias: web::Path, manager: web::Data, -) -> Result { +) -> Result { let name = manager.from_alias(alias.into_inner()).await?; let mut path = manager.image_dir(); path.push(name.clone()); @@ -624,12 +631,12 @@ async fn details( } /// Serve files -#[instrument(skip(manager))] +#[instrument(name = "Serving file", skip(manager))] async fn serve( range: Option, alias: web::Path, manager: web::Data, -) -> Result { +) -> Result { let name = manager.from_alias(alias.into_inner()).await?; let mut path = manager.image_dir(); path.push(name.clone()); @@ -653,16 +660,16 @@ async fn ranged_file_resp( path: PathBuf, range: Option, details: Details, -) -> Result { +) -> Result { let (builder, stream) = match range { //Range header exists - return as ranged Some(range_header) => { if !range_header.is_bytes() { - return Err(UploadError::Range); + return Err(UploadError::Range.into()); } if range_header.is_empty() { - return Err(UploadError::Range); + return Err(UploadError::Range.into()); } else if range_header.len() == 1 { let file = tokio::fs::File::open(path).await?; @@ -675,7 +682,7 @@ async fn ranged_file_resp( (builder, range.chop_file(file).await?) } else { - return Err(UploadError::Range); + return Err(UploadError::Range.into()); } } //No Range header in the request - return the entire document @@ -727,10 +734,11 @@ enum FileOrAlias { Alias { alias: String }, } +#[instrument(name = "Purging file", skip(upload_manager))] async fn purge( query: web::Query, upload_manager: web::Data, -) -> Result { +) -> Result { let aliases = match query.into_inner() { FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, @@ -748,10 +756,11 @@ async fn purge( }))) } +#[instrument(name = "Fetching aliases", skip(upload_manager))] async fn aliases( query: web::Query, upload_manager: web::Data, -) -> Result { +) -> Result { let aliases = match query.into_inner() { FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, @@ -768,10 +777,11 @@ struct ByAlias { alias: String, } +#[instrument(name = "Fetching filename", skip(upload_manager))] async fn filename_by_alias( query: web::Query, upload_manager: web::Data, -) -> Result { +) -> Result { let filename = upload_manager.from_alias(query.into_inner().alias).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ @@ -782,13 +792,23 @@ async fn filename_by_alias( #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info"); - } + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + LogTracer::init()?; - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .init(); + let subscriber = Registry::default() + .with(env_filter) + .with(ErrorLayer::default()); + + if CONFIG.json_logging() { + let formatting_layer = BunyanFormattingLayer::new("pict-rs".into(), std::io::stdout); + + let subscriber = subscriber.with(JsonStorageLayer).with(formatting_layer); + + set_global_default(subscriber)?; + } else { + let subscriber = subscriber.with(tracing_subscriber::fmt::layer()); + set_global_default(subscriber)?; + }; let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?; @@ -799,7 +819,7 @@ async fn main() -> Result<(), anyhow::Error> { let form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) - .transform_error(|e| UploadError::from(e).into()) + .transform_error(|e| Error::from(e).into()) .field( "images", Field::array(Field::file(move |filename, _, stream| { @@ -828,7 +848,7 @@ async fn main() -> Result<(), anyhow::Error> { let import_form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) - .transform_error(|e| UploadError::from(e).into()) + .transform_error(|e| Error::from(e).into()) .field( "images", Field::array(Field::file(move |filename, content_type, stream| { @@ -858,8 +878,7 @@ async fn main() -> Result<(), anyhow::Error> { .finish(); App::new() - .wrap(Logger::default()) - .wrap(Tracing) + .wrap(TracingLogger::::new()) .wrap(Deadline) .app_data(web::Data::new(manager.clone())) .app_data(web::Data::new(client)) diff --git a/src/middleware.rs b/src/middleware.rs index a346ac4..232e9bd 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -10,8 +10,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tracing_futures::{Instrument, Instrumented}; -use uuid::Uuid; pub(crate) struct Deadline; pub(crate) struct DeadlineMiddleware { @@ -29,12 +27,6 @@ pub(crate) struct DeadlineFuture { inner: DeadlineFutureInner, } -pub(crate) struct Tracing; - -pub(crate) struct TracingMiddleware { - inner: S, -} - pub(crate) struct Internal(pub(crate) Option); pub(crate) struct InternalMiddleware(Option, S); #[derive(Clone, Debug, thiserror::Error)] @@ -171,44 +163,6 @@ where } } -impl Transform for Tracing -where - S: Service, - S::Future: 'static, -{ - type Response = S::Response; - type Error = S::Error; - type InitError = (); - type Transform = TracingMiddleware; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(TracingMiddleware { inner: service })) - } -} - -impl Service for TracingMiddleware -where - S: Service, - S::Future: 'static, -{ - type Response = S::Response; - type Error = S::Error; - type Future = Instrumented; - - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&self, req: Request) -> Self::Future { - let uuid = Uuid::new_v4(); - - self.inner - .call(req) - .instrument(tracing::info_span!("request", ?uuid)) - } -} - impl Transform for Internal where S: Service, diff --git a/src/processor.rs b/src/processor.rs index 4a7c8f1..bf78911 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,8 +1,11 @@ -use crate::{error::UploadError, ffmpeg::ThumbnailFormat}; +use crate::{ + error::{Error, UploadError}, + ffmpeg::ThumbnailFormat, +}; use std::path::{Path, PathBuf}; use tracing::{debug, error, instrument}; -fn ptos(path: &Path) -> Result { +fn ptos(path: &Path) -> Result { Ok(path.to_str().ok_or(UploadError::Path)?.to_owned()) } @@ -297,7 +300,7 @@ impl Exists { pub(crate) async fn prepare_image( original_file: PathBuf, -) -> Result, UploadError> { +) -> Result, Error> { let original_path_str = ptos(&original_file)?; let jpg_path = format!("{}.jpg", original_path_str); let jpg_path = PathBuf::from(jpg_path); @@ -317,11 +320,13 @@ pub(crate) async fn prepare_image( if let Err(e) = res { error!("transcode error: {:?}", e); tokio::fs::remove_file(&tmpfile).await?; - return Err(e.into()); + return Err(e); } return match crate::safe_move_file(tmpfile, jpg_path.clone()).await { - Err(UploadError::FileExists) => Ok(Some((jpg_path, Exists::Exists))), + Err(e) if matches!(e.kind(), UploadError::FileExists) => { + Ok(Some((jpg_path, Exists::Exists))) + } Err(e) => Err(e), _ => Ok(Some((jpg_path, Exists::New))), }; diff --git a/src/range.rs b/src/range.rs index e808e28..8828067 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,4 +1,7 @@ -use crate::{stream::bytes_stream, UploadError}; +use crate::{ + error::{Error, UploadError}, + stream::bytes_stream, +}; use actix_web::{ dev::Payload, http::{ @@ -46,7 +49,7 @@ impl Range { pub(crate) fn chop_bytes( &self, bytes: Bytes, - ) -> impl Stream> + Unpin { + ) -> impl Stream> + Unpin { match self { Range::Start(start) => once(ready(Ok(bytes.slice(*start as usize..)))), Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))), @@ -59,7 +62,7 @@ impl Range { pub(crate) async fn chop_file( &self, mut file: tokio::fs::File, - ) -> Result>, UploadError> { + ) -> Result>, Error> { match self { Range::Start(start) => { file.seek(io::SeekFrom::Start(*start)).await?; @@ -102,14 +105,14 @@ impl RangeHeader { impl FromRequest for RangeHeader { type Config = (); - type Error = actix_web::Error; + type Error = Error; type Future = std::future::Ready>; fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { if let Some(range_head) = req.headers().get("Range") { ready(parse_range_header(range_head).map_err(|e| { tracing::warn!("Failed to parse range header: {}", e); - e.into() + e })) } else { ready(Err(UploadError::ParseReq( @@ -120,7 +123,7 @@ impl FromRequest for RangeHeader { } } -fn parse_range_header(range_head: &HeaderValue) -> Result { +fn parse_range_header(range_head: &HeaderValue) -> Result { let range_head_str = range_head.to_str().map_err(|_| { UploadError::ParseReq("Range header contains non-utf8 characters".to_string()) })?; @@ -135,7 +138,7 @@ fn parse_range_header(range_head: &HeaderValue) -> Result, UploadError>>()?; + .collect::, Error>>()?; Ok(RangeHeader { unit: unit.to_owned(), @@ -143,7 +146,7 @@ fn parse_range_header(range_head: &HeaderValue) -> Result Result { +fn parse_range(s: &str) -> Result { let dash_pos = s .find('-') .ok_or_else(|| UploadError::ParseReq("Mailformed Range Bound".to_string()))?; @@ -153,7 +156,7 @@ fn parse_range(s: &str) -> Result { let end = end.trim_start_matches('-').trim(); if start.is_empty() && end.is_empty() { - Err(UploadError::ParseReq("Malformed content range".to_string())) + Err(UploadError::ParseReq("Malformed content range".to_string()).into()) } else if start.is_empty() { let suffix_length = end.parse().map_err(|_| { UploadError::ParseReq("Cannot parse suffix length for range header".to_string()) @@ -175,7 +178,7 @@ fn parse_range(s: &str) -> Result { })?; if range_start > range_end { - return Err(UploadError::Range); + return Err(UploadError::Range.into()); } Ok(Range::Segment(range_start, range_end)) diff --git a/src/root_span_builder.rs b/src/root_span_builder.rs new file mode 100644 index 0000000..f403d9e --- /dev/null +++ b/src/root_span_builder.rs @@ -0,0 +1,46 @@ +use actix_web::{ + dev::{ServiceRequest, ServiceResponse}, + Error, +}; +use tracing::Span; +use tracing_actix_web::root_span; + +pub struct RootSpanBuilder; + +impl tracing_actix_web::RootSpanBuilder for RootSpanBuilder { + fn on_request_start(request: &ServiceRequest) -> Span { + root_span!(request) + } + + fn on_request_end(span: Span, outcome: &Result, Error>) { + match &outcome { + Ok(response) => { + if let Some(error) = response.response().error() { + handle_error(span, error) + } else { + span.record("http.status_code", &response.response().status().as_u16()); + span.record("otel.status_code", &"OK"); + } + } + Err(error) => handle_error(span, error), + } + } +} + +fn handle_error(span: Span, error: &Error) { + let response_error = error.as_response_error(); + + let display = format!("{}", response_error); + let debug = format!("{:?}", response_error); + span.record("exception.message", &tracing::field::display(display)); + span.record("exception.details", &tracing::field::display(debug)); + + let status_code = response_error.status_code(); + span.record("http.status_code", &status_code.as_u16()); + + if status_code.is_client_error() { + span.record("otel.status_code", &"OK"); + } else { + span.record("otel.status_code", &"ERROR"); + } +} diff --git a/src/stream.rs b/src/stream.rs index f3316f8..b6e95ba 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,37 +1,51 @@ -use crate::error::UploadError; +use crate::error::Error; +use actix_rt::task::JoinHandle; use actix_web::web::{Bytes, BytesMut}; use futures_util::Stream; use std::{ future::Future, pin::Pin, + process::Stdio, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::{ + io::{AsyncRead, AsyncWriteExt, ReadBuf}, + process::{Child, Command}, + sync::oneshot::{channel, Receiver}, +}; +use tokio_util::codec::{BytesCodec, FramedRead}; +use tracing::instrument; #[derive(Debug)] struct StatusError; pub(crate) struct Process { - child: tokio::process::Child, + child: Child, } pub(crate) struct ProcessRead { inner: I, - err_recv: tokio::sync::oneshot::Receiver, + err_recv: Receiver, err_closed: bool, - handle: actix_rt::task::JoinHandle<()>, + handle: JoinHandle<()>, } struct BytesFreezer(S); impl Process { - fn new(child: tokio::process::Child) -> Self { + fn new(child: Child) -> Self { Process { child } } - pub(crate) fn spawn(cmd: &mut tokio::process::Command) -> std::io::Result { - cmd.stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) + #[instrument(name = "Spawning command")] + pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result { + tracing::info!("Spawning"); + Self::spawn(Command::new(command).args(args)) + } + + pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result { + cmd.stdin(Stdio::piped()) + .stdout(Stdio::piped()) .spawn() .map(Process::new) } @@ -40,7 +54,7 @@ impl Process { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = channel(); let mut child = self.child; @@ -79,7 +93,7 @@ impl Process { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = channel(); let mut child = self.child; @@ -114,11 +128,8 @@ impl Process { pub(crate) fn bytes_stream( input: impl AsyncRead + Unpin, -) -> impl Stream> + Unpin { - BytesFreezer(tokio_util::codec::FramedRead::new( - input, - tokio_util::codec::BytesCodec::new(), - )) +) -> impl Stream> + Unpin { + BytesFreezer(FramedRead::new(input, BytesCodec::new())) } impl AsyncRead for ProcessRead @@ -157,13 +168,13 @@ impl Stream for BytesFreezer where S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0) .poll_next(cx) .map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze()))) - .map_err(UploadError::from) + .map_err(Error::from) } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 806ad50..3ff6672 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -1,6 +1,6 @@ use crate::{ config::Format, - error::UploadError, + error::{Error, UploadError}, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, to_ext, }; @@ -98,7 +98,7 @@ where } } - async fn finalize_reset(self) -> Result { + async fn finalize_reset(self) -> Result { let mut hasher = self.hasher; let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?; Ok(hash) @@ -194,7 +194,7 @@ pub(crate) struct Details { } impl Details { - pub(crate) async fn from_bytes(input: web::Bytes) -> Result { + pub(crate) async fn from_bytes(input: web::Bytes) -> Result { let details = crate::magick::details_bytes(input).await?; Ok(Details::now( @@ -204,7 +204,7 @@ impl Details { )) } - pub(crate) async fn from_path

(path: P) -> Result + pub(crate) async fn from_path

(path: P) -> Result where P: AsRef, { @@ -285,10 +285,7 @@ impl UploadManager { } /// Create a new UploadManager - pub(crate) async fn new( - mut root_dir: PathBuf, - format: Option, - ) -> Result { + pub(crate) async fn new(mut root_dir: PathBuf, format: Option) -> Result { let root_clone = root_dir.clone(); // sled automatically creates it's own directories let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??; @@ -313,11 +310,7 @@ impl UploadManager { /// Store the path to a generated image variant so we can easily clean it up later #[instrument(skip(self))] - pub(crate) async fn store_variant( - &self, - path: PathBuf, - filename: String, - ) -> Result<(), UploadError> { + pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> { let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); let fname_tree = self.inner.filename_tree.clone(); @@ -340,7 +333,7 @@ impl UploadManager { &self, path: PathBuf, filename: String, - ) -> Result, UploadError> { + ) -> Result, Error> { let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); let fname_tree = self.inner.filename_tree.clone(); @@ -369,7 +362,7 @@ impl UploadManager { path: PathBuf, filename: String, details: &Details, - ) -> Result<(), UploadError> { + ) -> Result<(), Error> { let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); let fname_tree = self.inner.filename_tree.clone(); @@ -389,10 +382,7 @@ impl UploadManager { } /// Get a list of aliases for a given file - pub(crate) async fn aliases_by_filename( - &self, - filename: String, - ) -> Result, UploadError> { + pub(crate) async fn aliases_by_filename(&self, filename: String) -> Result, Error> { let fname_tree = self.inner.filename_tree.clone(); let hash = web::block(move || fname_tree.get(filename.as_bytes())) .await?? @@ -402,7 +392,7 @@ impl UploadManager { } /// Get a list of aliases for a given alias - pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result, UploadError> { + pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result, Error> { let alias_tree = self.inner.alias_tree.clone(); let hash = web::block(move || alias_tree.get(alias.as_bytes())) .await?? @@ -411,7 +401,7 @@ impl UploadManager { self.aliases_by_hash(&hash).await } - async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result, UploadError> { + async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result, Error> { let (start, end) = alias_key_bounds(hash); let main_tree = self.inner.main_tree.clone(); let aliases = web::block(move || { @@ -436,7 +426,7 @@ impl UploadManager { } /// Delete an alias without a delete token - pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), UploadError> { + pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), Error> { let token_key = delete_key(&alias); let alias_tree = self.inner.alias_tree.clone(); let token = web::block(move || alias_tree.get(token_key.as_bytes())) @@ -448,7 +438,7 @@ impl UploadManager { /// Delete the alias, and the file & variants if no more aliases exist #[instrument(skip(self, alias, token))] - pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> { + pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), Error> { use sled::Transactional; let main_tree = self.inner.main_tree.clone(); let alias_tree = self.inner.alias_tree.clone(); @@ -478,7 +468,7 @@ impl UploadManager { let id = alias_tree .remove(alias_id_key(&alias2).as_bytes())? .ok_or_else(|| trans_err(UploadError::MissingAlias))?; - let id = String::from_utf8(id.to_vec()).map_err(|e| trans_err(e.into()))?; + let id = String::from_utf8(id.to_vec()).map_err(trans_err)?; // -- GET HASH FOR HASH TREE CLEANUP -- debug!("Deleting alias -> hash mapping"); @@ -498,13 +488,13 @@ impl UploadManager { self.check_delete_files(hash).await } - async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), UploadError> { + async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), Error> { // -- CHECK IF ANY OTHER ALIASES EXIST -- let main_tree = self.inner.main_tree.clone(); let (start, end) = alias_key_bounds(&hash); debug!("Checking for additional aliases referencing hash"); let any_aliases = web::block(move || { - Ok(main_tree.range(start..end).next().is_some()) as Result + Ok(main_tree.range(start..end).next().is_some()) as Result }) .await??; @@ -546,7 +536,7 @@ impl UploadManager { /// Fetch the real on-disk filename given an alias #[instrument(skip(self))] - pub(crate) async fn from_alias(&self, alias: String) -> Result { + pub(crate) async fn from_alias(&self, alias: String) -> Result { let tree = self.inner.alias_tree.clone(); debug!("Getting hash from alias"); let hash = web::block(move || tree.get(alias.as_bytes())) @@ -574,7 +564,7 @@ impl UploadManager { // Find image variants and remove them from the DB and the disk #[instrument(skip(self))] - async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> { + async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> { let filename = filename.inner; let mut path = self.image_dir(); let fname = String::from_utf8(filename.to_vec())?; @@ -601,7 +591,7 @@ impl UploadManager { keys.push(key?.to_owned()); } - Ok(keys) as Result, UploadError> + Ok(keys) as Result, Error> }) .await??; @@ -631,7 +621,7 @@ impl UploadManager { impl UploadManagerSession { /// Generate a delete token for an alias #[instrument(skip(self))] - pub(crate) async fn delete_token(&self) -> Result { + pub(crate) async fn delete_token(&self) -> Result { let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; debug!("Generating delete token"); @@ -679,9 +669,9 @@ impl UploadManagerSession { content_type: mime::Mime, validate: bool, mut stream: UploadStream, - ) -> Result + ) -> Result where - UploadError: From, + Error: From, E: Unpin + 'static, { let mut bytes_mut = actix_web::web::BytesMut::new(); @@ -718,12 +708,9 @@ impl UploadManagerSession { /// Upload the file, discarding bytes if it's already present, or saving if it's new #[instrument(skip(self, stream))] - pub(crate) async fn upload( - mut self, - mut stream: UploadStream, - ) -> Result + pub(crate) async fn upload(mut self, mut stream: UploadStream) -> Result where - UploadError: From, + Error: From, { let mut bytes_mut = actix_web::web::BytesMut::new(); @@ -762,7 +749,7 @@ impl UploadManagerSession { tmpfile: PathBuf, hash: Hash, content_type: mime::Mime, - ) -> Result<(), UploadError> { + ) -> Result<(), Error> { let (dup, name) = self.check_duplicate(hash, content_type).await?; // bail early with alias to existing file if this is a duplicate @@ -786,7 +773,7 @@ impl UploadManagerSession { &self, hash: Hash, content_type: mime::Mime, - ) -> Result<(Dup, String), UploadError> { + ) -> Result<(Dup, String), Error> { let main_tree = self.manager.inner.main_tree.clone(); let filename = self.next_file(content_type).await?; @@ -822,7 +809,7 @@ impl UploadManagerSession { // generate a short filename that isn't already in-use #[instrument(skip(self, content_type))] - async fn next_file(&self, content_type: mime::Mime) -> Result { + async fn next_file(&self, content_type: mime::Mime) -> Result { let image_dir = self.manager.image_dir(); use rand::distributions::{Alphanumeric, Distribution}; let mut limit: usize = 10; @@ -855,7 +842,7 @@ impl UploadManagerSession { } #[instrument(skip(self, hash, alias))] - async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { + async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> { self.save_alias_hash_mapping(hash, alias).await??; self.store_hash_id_alias_mapping(hash, alias).await?; @@ -867,11 +854,7 @@ impl UploadManagerSession { // // This will help if multiple 'users' upload the same file, and one of them wants to delete it #[instrument(skip(self, hash, content_type))] - async fn add_alias( - &mut self, - hash: &Hash, - content_type: mime::Mime, - ) -> Result<(), UploadError> { + async fn add_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<(), Error> { let alias = self.next_alias(hash, content_type).await?; self.store_hash_id_alias_mapping(hash, &alias).await?; @@ -883,11 +866,7 @@ impl UploadManagerSession { // // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files #[instrument(skip(self, hash))] - async fn store_hash_id_alias_mapping( - &self, - hash: &Hash, - alias: &str, - ) -> Result<(), UploadError> { + async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> { let alias = alias.to_string(); loop { debug!("hash -> alias save loop"); @@ -921,11 +900,7 @@ impl UploadManagerSession { // Generate an alias to the file #[instrument(skip(self, hash, content_type))] - async fn next_alias( - &mut self, - hash: &Hash, - content_type: mime::Mime, - ) -> Result { + async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result { use rand::distributions::{Alphanumeric, Distribution}; let mut limit: usize = 10; let mut rng = rand::thread_rng(); @@ -956,7 +931,7 @@ impl UploadManagerSession { &self, hash: &Hash, alias: &str, - ) -> Result, UploadError> { + ) -> Result, Error> { let tree = self.manager.inner.alias_tree.clone(); let vec = hash.inner.clone(); let alias = alias.to_string(); @@ -969,7 +944,7 @@ impl UploadManagerSession { if res.is_err() { warn!("Duplicate alias"); - return Ok(Err(UploadError::DuplicateAlias)); + return Ok(Err(UploadError::DuplicateAlias.into())); } Ok(Ok(())) @@ -980,7 +955,7 @@ impl UploadManagerSession { pub(crate) async fn safe_save_reader( to: PathBuf, input: &mut (impl AsyncRead + Unpin), -) -> Result<(), UploadError> { +) -> Result<(), Error> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); tokio::fs::create_dir_all(path.to_owned()).await?; @@ -992,7 +967,7 @@ pub(crate) async fn safe_save_reader( return Err(e.into()); } } else { - return Err(UploadError::FileExists); + return Err(UploadError::FileExists.into()); } debug!("Writing stream to {:?}", to); @@ -1008,9 +983,9 @@ pub(crate) async fn safe_save_reader( pub(crate) async fn safe_save_stream( to: PathBuf, mut stream: UploadStream, -) -> Result<(), UploadError> +) -> Result<(), Error> where - UploadError: From, + Error: From, E: Unpin, { if let Some(path) = to.parent() { @@ -1024,7 +999,7 @@ where return Err(e.into()); } } else { - return Err(UploadError::FileExists); + return Err(UploadError::FileExists.into()); } debug!("Writing stream to {:?}", to); @@ -1050,17 +1025,20 @@ where Ok(()) } -async fn remove_path(path: sled::IVec) -> Result<(), UploadError> { +async fn remove_path(path: sled::IVec) -> Result<(), Error> { let path_string = String::from_utf8(path.to_vec())?; tokio::fs::remove_file(path_string).await?; Ok(()) } -fn trans_err(e: UploadError) -> sled::transaction::ConflictableTransactionError { - sled::transaction::ConflictableTransactionError::Abort(e) +fn trans_err(e: E) -> sled::transaction::ConflictableTransactionError +where + Error: From, +{ + sled::transaction::ConflictableTransactionError::Abort(e.into()) } -fn file_name(name: String, content_type: mime::Mime) -> Result { +fn file_name(name: String, content_type: mime::Mime) -> Result { Ok(format!("{}{}", name, to_ext(content_type)?)) } diff --git a/src/validate.rs b/src/validate.rs index 9c0896b..d8c6d24 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -1,6 +1,7 @@ -use crate::{config::Format, error::UploadError, ffmpeg::InputFormat, magick::ValidInputType}; +use crate::{config::Format, error::Error, ffmpeg::InputFormat, magick::ValidInputType}; use actix_web::web::Bytes; use tokio::io::AsyncRead; +use tracing::instrument; pub(crate) fn image_webp() -> mime::Mime { "image/webp".parse().unwrap() @@ -10,36 +11,43 @@ pub(crate) fn video_mp4() -> mime::Mime { "video/mp4".parse().unwrap() } +#[instrument(name = "Validate image", skip(bytes))] pub(crate) async fn validate_image_bytes( bytes: Bytes, prescribed_format: Option, -) -> Result<(mime::Mime, Box), UploadError> { +) -> Result<(mime::Mime, Box), Error> { let input_type = crate::magick::input_type_bytes(bytes.clone()).await?; match (prescribed_format, input_type) { (_, ValidInputType::Gif) => Ok(( video_mp4(), - Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?), + Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif)?) + as Box, )), (_, ValidInputType::Mp4) => Ok(( video_mp4(), - Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?), + Box::new(crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4)?) + as Box, )), (Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok(( mime::IMAGE_JPEG, - Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?), + Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?) + as Box, )), (Some(Format::Png) | None, ValidInputType::Png) => Ok(( mime::IMAGE_PNG, - Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?), + Box::new(crate::exiftool::clear_metadata_bytes_read(bytes)?) + as Box, )), (Some(Format::Webp) | None, ValidInputType::Webp) => Ok(( image_webp(), - Box::new(crate::magick::clear_metadata_bytes_read(bytes)?), + Box::new(crate::magick::clear_metadata_bytes_read(bytes)?) + as Box, )), (Some(format), _) => Ok(( format.to_mime(), - Box::new(crate::magick::convert_bytes_read(bytes, format)?), + Box::new(crate::magick::convert_bytes_read(bytes, format)?) + as Box, )), } }