2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

Replace log with tracing, only hardcode 'info' if RUST_LOG is empty

This commit is contained in:
asonix 2020-06-13 21:28:06 -05:00
parent 0292e3d74b
commit 68721fb269
7 changed files with 344 additions and 80 deletions

180
Cargo.lock generated
View file

@ -108,7 +108,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sha-1", "sha-1",
"slab", "slab",
"time", "time 0.2.16",
] ]
[[package]] [[package]]
@ -299,7 +299,7 @@ dependencies = [
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"socket2", "socket2",
"time", "time 0.2.16",
"tinyvec", "tinyvec",
"url", "url",
] ]
@ -553,6 +553,17 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "chrono"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
dependencies = [
"num-integer",
"num-traits",
"time 0.1.43",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.33.1" version = "2.33.1"
@ -724,19 +735,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "fake-simd" name = "fake-simd"
version = "0.1.2" version = "0.1.2"
@ -1002,15 +1000,6 @@ version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.2.0" version = "0.2.0"
@ -1177,6 +1166,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "matches" name = "matches"
version = "0.1.8" version = "0.1.8"
@ -1381,11 +1379,9 @@ dependencies = [
"actix-web", "actix-web",
"anyhow", "anyhow",
"bytes", "bytes",
"env_logger",
"futures", "futures",
"gif", "gif",
"image", "image",
"log",
"mime", "mime",
"rand", "rand",
"serde", "serde",
@ -1394,6 +1390,9 @@ dependencies = [
"sled", "sled",
"structopt", "structopt",
"thiserror", "thiserror",
"tracing",
"tracing-futures",
"tracing-subscriber",
] ]
[[package]] [[package]]
@ -1591,6 +1590,16 @@ dependencies = [
"thread_local", "thread_local",
] ]
[[package]]
name = "regex-automata"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
"regex-syntax",
]
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.6.18" version = "0.6.18"
@ -1766,6 +1775,15 @@ dependencies = [
"opaque-debug", "opaque-debug",
] ]
[[package]]
name = "sharded-slab"
version = "0.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.2.0" version = "1.2.0"
@ -1933,15 +1951,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "termcolor"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "textwrap" name = "textwrap"
version = "0.11.0" version = "0.11.0"
@ -2000,6 +2009,16 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi 0.3.8",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.2.16" version = "0.2.16"
@ -2105,6 +2124,88 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tracing"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41f40ed0e162c911ac6fcb53ecdc8134c46905fdbbae8c50add462a538b495f"
dependencies = [
"cfg-if",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715"
dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-futures"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]] [[package]]
name = "trust-dns-proto" name = "trust-dns-proto"
version = "0.19.5" version = "0.19.5"
@ -2355,15 +2456,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi 0.3.8",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View file

@ -17,11 +17,9 @@ actix-rt = "1.1.1"
actix-web = { version = "3.0.0-alpha.2", features = ["rustls"] } actix-web = { version = "3.0.0-alpha.2", features = ["rustls"] }
anyhow = "1.0" anyhow = "1.0"
bytes = "0.5" bytes = "0.5"
env_logger = "0.7"
futures = "0.3.4" futures = "0.3.4"
gif = "0.10.3" gif = "0.10.3"
image = "0.23.4" image = "0.23.4"
log = "0.4"
mime = "0.3.1" mime = "0.3.1"
rand = "0.7.3" rand = "0.7.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
@ -30,3 +28,6 @@ sha2 = "0.9.0"
sled = "0.32.0-rc1" sled = "0.32.0-rc1"
structopt = "0.3.14" structopt = "0.3.14"
thiserror = "1.0" thiserror = "1.0"
tracing = "0.1.15"
tracing-futures = "0.2.4"
tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }

View file

@ -7,17 +7,24 @@ use actix_web::{
web, App, HttpResponse, HttpServer, web, App, HttpResponse, HttpServer,
}; };
use futures::stream::{Stream, TryStreamExt}; use futures::stream::{Stream, TryStreamExt};
use log::{error, info};
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use structopt::StructOpt; use structopt::StructOpt;
use tracing::{debug, error, info, instrument};
use tracing_subscriber::EnvFilter;
mod config; mod config;
mod error; mod error;
mod middleware;
mod processor; mod processor;
mod upload_manager; mod upload_manager;
mod validate; mod validate;
use self::{config::Config, error::UploadError, upload_manager::UploadManager}; use self::{
config::Config,
error::UploadError,
middleware::Tracing,
upload_manager::{UploadManager, UploadStream},
};
const MEGABYTES: usize = 1024 * 1024; const MEGABYTES: usize = 1024 * 1024;
const HOURS: u32 = 60 * 60; const HOURS: u32 = 60 * 60;
@ -26,10 +33,12 @@ const HOURS: u32 = 60 * 60;
async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> { async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> {
if let Some(path) = path.parent() { if let Some(path) = path.parent() {
// create the directory for the file // create the directory for the file
debug!("Creating directory");
actix_fs::create_dir_all(path.to_owned()).await?; actix_fs::create_dir_all(path.to_owned()).await?;
} }
// Only write the file if it doesn't already exist // Only write the file if it doesn't already exist
debug!("Checking if file already exists");
if let Err(e) = actix_fs::metadata(path.clone()).await { if let Err(e) = actix_fs::metadata(path.clone()).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != Some(std::io::ErrorKind::NotFound) {
return Err(e.into()); return Err(e.into());
@ -39,15 +48,18 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload
} }
// Open the file for writing // Open the file for writing
debug!("Creating file");
let file = actix_fs::file::create(path.clone()).await?; let file = actix_fs::file::create(path.clone()).await?;
// try writing // try writing
debug!("Writing to file");
if let Err(e) = actix_fs::file::write(file, bytes).await { if let Err(e) = actix_fs::file::write(file, bytes).await {
error!("Error writing file, {}", e); error!("Error writing file, {}", e);
// remove file if writing failed before completion // remove file if writing failed before completion
actix_fs::remove_file(path).await?; actix_fs::remove_file(path).await?;
return Err(e.into()); return Err(e.into());
} }
debug!("File written");
Ok(()) Ok(())
} }
@ -74,6 +86,7 @@ fn from_ext(ext: std::ffi::OsString) -> mime::Mime {
} }
/// Handle responding to succesful uploads /// Handle responding to succesful uploads
#[instrument]
async fn upload( async fn upload(
value: Value, value: Value,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
@ -123,7 +136,7 @@ async fn download(
let stream = Box::pin(futures::stream::once(fut)); let stream = Box::pin(futures::stream::once(fut));
let alias = manager.upload(stream).await?; let alias = manager.upload(UploadStream::new(stream)).await?;
let delete_token = manager.delete_token(alias.clone()).await?; let delete_token = manager.delete_token(alias.clone()).await?;
Ok(HttpResponse::Created().json(serde_json::json!({ Ok(HttpResponse::Created().json(serde_json::json!({
@ -147,6 +160,7 @@ async fn delete(
} }
/// Serve files /// Serve files
#[instrument]
async fn serve( async fn serve(
segments: web::Path<String>, segments: web::Path<String>,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
@ -159,7 +173,9 @@ async fn serve(
.collect(); .collect();
let alias = segments.pop().ok_or(UploadError::MissingFilename)?; let alias = segments.pop().ok_or(UploadError::MissingFilename)?;
debug!("Building chain");
let chain = self::processor::build_chain(&segments, whitelist.as_ref().as_ref()); let chain = self::processor::build_chain(&segments, whitelist.as_ref().as_ref());
debug!("Chain built");
let name = manager.from_alias(alias).await?; let name = manager.from_alias(alias).await?;
let base = manager.image_dir(); let base = manager.image_dir();
@ -184,23 +200,32 @@ async fn serve(
// Read the image file & produce a DynamicImage // Read the image file & produce a DynamicImage
// //
// Drop bytes so we don't keep it around in memory longer than we need to // Drop bytes so we don't keep it around in memory longer than we need to
debug!("Reading image");
let (img, format) = { let (img, format) = {
let bytes = actix_fs::read(original_path.clone()).await?; let bytes = actix_fs::read(original_path.clone()).await?;
let format = image::guess_format(&bytes)?; let bytes2 = bytes.clone();
let format = web::block(move || image::guess_format(&bytes2)).await?;
let img = web::block(move || image::load_from_memory(&bytes)).await?; let img = web::block(move || image::load_from_memory(&bytes)).await?;
(img, format) (img, format)
}; };
debug!("Image read");
let img = self::processor::process_image(chain, img).await?; debug!("Processing image");
let img = self::processor::process_image(chain, img.into())
.await?
.inner;
debug!("Image processed");
// perform thumbnail operation in a blocking thread // perform thumbnail operation in a blocking thread
debug!("Exporting image");
let img_bytes: bytes::Bytes = web::block(move || { let img_bytes: bytes::Bytes = web::block(move || {
let mut bytes = std::io::Cursor::new(vec![]); let mut bytes = std::io::Cursor::new(vec![]);
img.write_to(&mut bytes, format)?; img.write_to(&mut bytes, format)?;
Ok(bytes::Bytes::from(bytes.into_inner())) as Result<_, image::error::ImageError> Ok(bytes::Bytes::from(bytes.into_inner())) as Result<_, image::error::ImageError>
}) })
.await?; .await?;
debug!("Image exported");
let path2 = path.clone(); let path2 = path.clone();
let img_bytes2 = img_bytes.clone(); let img_bytes2 = img_bytes.clone();
@ -254,8 +279,15 @@ struct UrlQuery {
#[actix_rt::main] #[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> Result<(), anyhow::Error> {
let config = Config::from_args(); let config = Config::from_args();
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info"); std::env::set_var("RUST_LOG", "info");
env_logger::init(); }
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let manager = UploadManager::new(config.data_dir(), config.format()).await?; let manager = UploadManager::new(config.data_dir(), config.format()).await?;
// Create a new Multipart Form validator // Create a new Multipart Form validator
@ -272,7 +304,7 @@ async fn main() -> Result<(), anyhow::Error> {
let manager = manager2.clone(); let manager = manager2.clone();
async move { async move {
manager.upload(stream).await.map(|alias| { manager.upload(stream.into()).await.map(|alias| {
let mut path = PathBuf::new(); let mut path = PathBuf::new();
path.push(alias); path.push(alias);
Some(path) Some(path)
@ -297,7 +329,7 @@ async fn main() -> Result<(), anyhow::Error> {
async move { async move {
manager manager
.import(filename, content_type, validate_imports, stream) .import(filename, content_type, validate_imports, stream.into())
.await .await
.map(|alias| { .map(|alias| {
let mut path = PathBuf::new(); let mut path = PathBuf::new();
@ -317,8 +349,9 @@ async fn main() -> Result<(), anyhow::Error> {
let config = config2.clone(); let config = config2.clone();
App::new() App::new()
.wrap(Logger::default())
.wrap(Compress::default()) .wrap(Compress::default())
.wrap(Logger::default())
.wrap(Tracing)
.data(manager.clone()) .data(manager.clone())
.data(client) .data(client)
.data(config.filter_whitelist()) .data(config.filter_whitelist())

48
src/middleware.rs Normal file
View file

@ -0,0 +1,48 @@
use actix_web::dev::{Service, Transform};
use futures::future::{ok, Ready};
use std::task::{Context, Poll};
use tracing_futures::{Instrument, Instrumented};
pub(crate) struct Tracing;
pub(crate) struct TracingMiddleware<S> {
inner: S,
}
impl<S> Transform<S> for Tracing
where
S: Service,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = TracingMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(TracingMiddleware { inner: service })
}
}
impl<S> Service for TracingMiddleware<S>
where
S: Service,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: S::Request) -> Self::Future {
self.inner
.call(req)
.instrument(tracing::info_span!("request"))
}
}

View file

@ -1,8 +1,8 @@
use crate::error::UploadError; use crate::error::UploadError;
use actix_web::web; use actix_web::web;
use image::{DynamicImage, GenericImageView}; use image::{DynamicImage, GenericImageView};
use log::debug;
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use tracing::{debug, instrument};
pub(crate) trait Processor { pub(crate) trait Processor {
fn name() -> &'static str fn name() -> &'static str
@ -141,11 +141,40 @@ macro_rules! parse {
}}; }};
} }
pub(crate) fn build_chain( pub(crate) struct ProcessChain {
args: &[String], inner: Vec<Box<dyn Processor + Send>>,
whitelist: Option<&HashSet<String>>, }
) -> Vec<Box<dyn Processor + Send>> {
args.into_iter() impl std::fmt::Debug for ProcessChain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ProcessChain")
.field("inner", &format!("{} operations", self.inner.len()))
.finish()
}
}
pub(crate) struct ImageWrapper {
pub(crate) inner: DynamicImage,
}
impl From<DynamicImage> for ImageWrapper {
fn from(inner: DynamicImage) -> Self {
ImageWrapper { inner }
}
}
impl std::fmt::Debug for ImageWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ImageWrapper")
.field("inner", &"DynamicImage".to_string())
.finish()
}
}
#[instrument]
pub(crate) fn build_chain(args: &[String], whitelist: Option<&HashSet<String>>) -> ProcessChain {
let inner = args
.into_iter()
.filter_map(|arg| { .filter_map(|arg| {
parse!(Identity, arg.as_str(), whitelist); parse!(Identity, arg.as_str(), whitelist);
parse!(Thumbnail, arg.as_str(), whitelist); parse!(Thumbnail, arg.as_str(), whitelist);
@ -155,27 +184,30 @@ pub(crate) fn build_chain(
None None
}) })
.collect() .collect();
ProcessChain { inner }
} }
pub(crate) fn build_path( pub(crate) fn build_path(base: PathBuf, chain: &ProcessChain, filename: String) -> PathBuf {
base: PathBuf, let mut path = chain
args: &[Box<dyn Processor + Send>], .inner
filename: String, .iter()
) -> PathBuf { .fold(base, |acc, processor| processor.path(acc));
let mut path = args.iter().fold(base, |acc, processor| processor.path(acc));
path.push(filename); path.push(filename);
path path
} }
#[instrument]
pub(crate) async fn process_image( pub(crate) async fn process_image(
args: Vec<Box<dyn Processor + Send>>, chain: ProcessChain,
mut img: DynamicImage, img: ImageWrapper,
) -> Result<DynamicImage, UploadError> { ) -> Result<ImageWrapper, UploadError> {
for processor in args.into_iter() { let mut inner = img.inner;
img = web::block(move || processor.process(img)).await?; for processor in chain.inner.into_iter() {
inner = web::block(move || processor.process(inner)).await?;
} }
Ok(img) Ok(inner.into())
} }

View file

@ -1,9 +1,9 @@
use crate::{config::Format, error::UploadError, safe_save_file, to_ext, validate::validate_image}; use crate::{config::Format, error::UploadError, safe_save_file, to_ext, validate::validate_image};
use actix_web::web; use actix_web::web;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use log::{error, warn};
use sha2::Digest; use sha2::Digest;
use std::{path::PathBuf, pin::Pin, sync::Arc}; use std::{path::PathBuf, pin::Pin, sync::Arc};
use tracing::{debug, error, instrument, warn};
#[derive(Clone)] #[derive(Clone)]
pub struct UploadManager { pub struct UploadManager {
@ -19,7 +19,39 @@ struct UploadManagerInner {
db: sled::Db, db: sled::Db,
} }
type UploadStream<E> = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, E>>>>; impl std::fmt::Debug for UploadManager {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager")
.field("inner", &format!("Arc<UploadManagerInner>"))
.finish()
}
}
type StreamAlias<E> = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, E>>>>;
pub(crate) struct UploadStream<E> {
inner: StreamAlias<E>,
}
impl<E> UploadStream<E> {
pub(crate) fn new(s: StreamAlias<E>) -> Self {
UploadStream { inner: s }
}
}
impl<E> From<StreamAlias<E>> for UploadStream<E> {
fn from(s: StreamAlias<E>) -> Self {
UploadStream { inner: s }
}
}
impl<E> std::fmt::Debug for UploadStream<E> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadStream")
.field("inner", &"stream".to_string())
.finish()
}
}
enum Dup { enum Dup {
Exists, Exists,
@ -162,12 +194,16 @@ impl UploadManager {
} }
/// Generate a delete token for an alias /// Generate a delete token for an alias
#[instrument]
pub(crate) async fn delete_token(&self, alias: String) -> Result<String, UploadError> { pub(crate) async fn delete_token(&self, alias: String) -> Result<String, UploadError> {
debug!("Generating delete token");
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
let rng = rand::thread_rng(); let rng = rand::thread_rng();
let s: String = Alphanumeric.sample_iter(rng).take(10).collect(); let s: String = Alphanumeric.sample_iter(rng).take(10).collect();
let delete_token = s.clone(); let delete_token = s.clone();
debug!("Generated delete token");
debug!("Saving delete token");
let alias_tree = self.inner.alias_tree.clone(); let alias_tree = self.inner.alias_tree.clone();
let key = delete_key(&alias); let key = delete_key(&alias);
let res = web::block(move || { let res = web::block(move || {
@ -178,6 +214,7 @@ impl UploadManager {
) )
}) })
.await?; .await?;
debug!("Delete token saved");
if let Err(sled::CompareAndSwapError { if let Err(sled::CompareAndSwapError {
current: Some(ivec), current: Some(ivec),
@ -226,25 +263,36 @@ impl UploadManager {
} }
/// Upload the file, discarding bytes if it's already present, or saving if it's new /// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument]
pub(crate) async fn upload<E>(&self, stream: UploadStream<E>) -> Result<String, UploadError> pub(crate) async fn upload<E>(&self, stream: UploadStream<E>) -> Result<String, UploadError>
where where
UploadError: From<E>, UploadError: From<E>,
{ {
// -- READ IN BYTES FROM CLIENT -- // -- READ IN BYTES FROM CLIENT --
debug!("Reading stream");
let bytes = read_stream(stream).await?; let bytes = read_stream(stream).await?;
debug!("Read stream");
// -- VALIDATE IMAGE -- // -- VALIDATE IMAGE --
debug!("Validating image");
let format = self.inner.format.clone(); let format = self.inner.format.clone();
let (bytes, content_type) = validate_image(bytes, format).await?; let (bytes, content_type) = validate_image(bytes, format).await?;
debug!("Image validated");
// -- DUPLICATE CHECKS -- // -- DUPLICATE CHECKS --
// Cloning bytes is fine because it's actually a pointer // Cloning bytes is fine because it's actually a pointer
debug!("Hashing bytes");
let hash = self.hash(bytes.clone()).await?; let hash = self.hash(bytes.clone()).await?;
debug!("Bytes hashed");
debug!("Adding alias");
let alias = self.add_alias(&hash, content_type.clone()).await?; let alias = self.add_alias(&hash, content_type.clone()).await?;
debug!("Alias added");
debug!("Saving file");
self.save_upload(bytes, hash, content_type).await?; self.save_upload(bytes, hash, content_type).await?;
debug!("File saved");
// Return alias to file // Return alias to file
Ok(alias) Ok(alias)
@ -346,6 +394,7 @@ impl UploadManager {
} }
// check for an already-uploaded image with this hash, returning the path to the target file // check for an already-uploaded image with this hash, returning the path to the target file
#[instrument]
async fn check_duplicate( async fn check_duplicate(
&self, &self,
hash: Vec<u8>, hash: Vec<u8>,
@ -382,6 +431,7 @@ impl UploadManager {
} }
// generate a short filename that isn't already in-use // generate a short filename that isn't already in-use
#[instrument]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> { async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> {
let image_dir = self.image_dir(); let image_dir = self.image_dir();
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
@ -417,6 +467,7 @@ impl UploadManager {
// Add an alias to an existing file // Add an alias to an existing file
// //
// This will help if multiple 'users' upload the same file, and one of them wants to delete it // This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument]
async fn add_alias( async fn add_alias(
&self, &self,
hash: &[u8], hash: &[u8],
@ -432,6 +483,7 @@ impl UploadManager {
// Add a pre-defined alias to an existin file // Add a pre-defined alias to an existin file
// //
// DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
#[instrument]
async fn store_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> { async fn store_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> {
let alias = alias.to_string(); let alias = alias.to_string();
loop { loop {
@ -504,10 +556,12 @@ impl UploadManager {
} }
} }
async fn read_stream<E>(mut stream: UploadStream<E>) -> Result<bytes::Bytes, UploadError> #[instrument]
async fn read_stream<E>(stream: UploadStream<E>) -> Result<bytes::Bytes, UploadError>
where where
UploadError: From<E>, UploadError: From<E>,
{ {
let mut stream = stream.inner;
let mut bytes = bytes::BytesMut::new(); let mut bytes = bytes::BytesMut::new();
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {

View file

@ -3,6 +3,7 @@ use actix_web::web;
use bytes::Bytes; use bytes::Bytes;
use image::{ImageDecoder, ImageEncoder, ImageFormat}; use image::{ImageDecoder, ImageEncoder, ImageFormat};
use std::io::Cursor; use std::io::Cursor;
use tracing::debug;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum GifError { pub(crate) enum GifError {
@ -31,13 +32,16 @@ pub(crate) async fn validate_image(
let format = image::guess_format(&bytes).map_err(UploadError::InvalidImage)?; let format = image::guess_format(&bytes).map_err(UploadError::InvalidImage)?;
match format { debug!("Validating {:?}", format);
let res = match format {
ImageFormat::Png => Ok((validate_png(bytes)?, mime::IMAGE_PNG)), ImageFormat::Png => Ok((validate_png(bytes)?, mime::IMAGE_PNG)),
ImageFormat::Jpeg => Ok((validate_jpg(bytes)?, mime::IMAGE_JPEG)), ImageFormat::Jpeg => Ok((validate_jpg(bytes)?, mime::IMAGE_JPEG)),
ImageFormat::Bmp => Ok((validate_bmp(bytes)?, mime::IMAGE_BMP)), ImageFormat::Bmp => Ok((validate_bmp(bytes)?, mime::IMAGE_BMP)),
ImageFormat::Gif => Ok((validate_gif(bytes)?, mime::IMAGE_GIF)), ImageFormat::Gif => Ok((validate_gif(bytes)?, mime::IMAGE_GIF)),
_ => Err(UploadError::UnsupportedFormat), _ => Err(UploadError::UnsupportedFormat),
} };
debug!("Validated");
res
}) })
.await?; .await?;