From 3b4c5b646afd9bb82d9a0aa4244ec85b603a943b Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 14 Jun 2020 10:07:31 -0500 Subject: [PATCH] Do better logging --- Cargo.lock | 2 + Cargo.toml | 2 + src/main.rs | 65 ++++++++-------- src/processor.rs | 45 +++++------ src/upload_manager.rs | 169 +++++++++++++++++++++++++++++------------- src/validate.rs | 18 ++++- 6 files changed, 188 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a8657d..ffb44b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,11 +1378,13 @@ dependencies = [ "actix-rt", "actix-web", "anyhow", + "base64 0.12.1", "bytes", "futures", "gif", "image", "mime", + "once_cell", "rand", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 283e10b..2d654bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,13 @@ actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs" } actix-rt = "1.1.1" actix-web = { version = "3.0.0-alpha.2", features = ["rustls"] } anyhow = "1.0" +base64 = "0.12.1" bytes = "0.5" futures = "0.3.4" gif = "0.10.3" image = "0.23.4" mime = "0.3.1" +once_cell = "1.4.0" rand = "0.7.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/main.rs b/src/main.rs index 48a35f5..1aaaca2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,10 @@ use actix_web::{ web, App, HttpResponse, HttpServer, }; use futures::stream::{Stream, TryStreamExt}; +use once_cell::sync::Lazy; use std::{collections::HashSet, path::PathBuf}; use structopt::StructOpt; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; mod config; @@ -20,25 +21,25 @@ mod upload_manager; mod validate; use self::{ - config::Config, - error::UploadError, - middleware::Tracing, - upload_manager::{UploadManager, UploadStream}, + config::Config, error::UploadError, middleware::Tracing, upload_manager::UploadManager, }; const MEGABYTES: usize = 1024 * 1024; const HOURS: u32 = 60 * 60; +static CONFIG: Lazy = Lazy::new(|| Config::from_args()); + // Try writing to a file +#[instrument(skip(bytes))] async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> { if let Some(path) = path.parent() { // create the directory for the file - debug!("Creating directory"); + debug!("Creating directory {:?}", path); actix_fs::create_dir_all(path.to_owned()).await?; } // Only write the file if it doesn't already exist - debug!("Checking if file already exists"); + debug!("Checking if {:?} already exists", path); if let Err(e) = actix_fs::metadata(path.clone()).await { if e.kind() != Some(std::io::ErrorKind::NotFound) { return Err(e.into()); @@ -48,18 +49,18 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload } // Open the file for writing - debug!("Creating file"); + debug!("Creating {:?}", path); let file = actix_fs::file::create(path.clone()).await?; // try writing - debug!("Writing to file"); + debug!("Writing to {:?}", path); if let Err(e) = actix_fs::file::write(file, bytes).await { - error!("Error writing file, {}", e); + error!("Error writing {:?}, {}", path, e); // remove file if writing failed before completion actix_fs::remove_file(path).await?; return Err(e.into()); } - debug!("File written"); + debug!("{:?} written", path); Ok(()) } @@ -86,7 +87,7 @@ fn from_ext(ext: std::ffi::OsString) -> mime::Mime { } /// Handle responding to succesful uploads -#[instrument] +#[instrument(skip(manager))] async fn upload( value: Value, manager: web::Data, @@ -121,6 +122,7 @@ async fn upload( } /// download an image from a URL +#[instrument(skip(client, manager))] async fn download( client: web::Data, manager: web::Data, @@ -132,11 +134,11 @@ async fn download( return Err(UploadError::Download(res.status())); } - let fut = res.body().limit(40 * MEGABYTES); + let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES); let stream = Box::pin(futures::stream::once(fut)); - let alias = manager.upload(UploadStream::new(stream)).await?; + let alias = manager.upload(stream).await?; let delete_token = manager.delete_token(alias.clone()).await?; Ok(HttpResponse::Created().json(serde_json::json!({ @@ -148,6 +150,7 @@ async fn download( }))) } +#[instrument(skip(manager))] async fn delete( manager: web::Data, path_entries: web::Path<(String, String)>, @@ -160,7 +163,7 @@ async fn delete( } /// Serve files -#[instrument] +#[instrument(skip(manager))] async fn serve( segments: web::Path, manager: web::Data, @@ -209,13 +212,9 @@ async fn serve( (img, format) }; - debug!("Image read"); debug!("Processing image"); - let img = self::processor::process_image(chain, img.into()) - .await? - .inner; - debug!("Image processed"); + let img = self::processor::process_image(chain, img).await?; // perform thumbnail operation in a blocking thread debug!("Exporting image"); @@ -225,13 +224,15 @@ async fn serve( Ok(bytes::Bytes::from(bytes.into_inner())) as Result<_, image::error::ImageError> }) .await?; - debug!("Image exported"); let path2 = path.clone(); let img_bytes2 = img_bytes.clone(); // Save the file in another task, we want to return the thumbnail now + debug!("Spawning storage task"); + let span = Span::current(); actix_rt::spawn(async move { + let entered = span.enter(); if let Err(e) = manager.store_variant(path2.clone()).await { error!("Error storing variant, {}", e); return; @@ -240,6 +241,7 @@ async fn serve( if let Err(e) = safe_save_file(path2, img_bytes2).await { error!("Error saving file, {}", e); } + drop(entered); }); return Ok(srv_response( @@ -278,8 +280,6 @@ struct UrlQuery { #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { - let config = Config::from_args(); - if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "info"); } @@ -288,7 +288,7 @@ async fn main() -> Result<(), anyhow::Error> { .with_env_filter(EnvFilter::from_default_env()) .init(); - let manager = UploadManager::new(config.data_dir(), config.format()).await?; + let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?; // Create a new Multipart Form validator // @@ -296,7 +296,7 @@ async fn main() -> Result<(), anyhow::Error> { let manager2 = manager.clone(); let form = Form::new() .max_files(10) - .max_file_size(config.max_file_size() * MEGABYTES) + .max_file_size(CONFIG.max_file_size() * MEGABYTES) .transform_error(|e| UploadError::from(e).into()) .field( "images", @@ -304,7 +304,7 @@ async fn main() -> Result<(), anyhow::Error> { let manager = manager2.clone(); async move { - manager.upload(stream.into()).await.map(|alias| { + manager.upload(stream).await.map(|alias| { let mut path = PathBuf::new(); path.push(alias); Some(path) @@ -316,11 +316,11 @@ async fn main() -> Result<(), anyhow::Error> { // Create a new Multipart Form validator for internal imports // // This form is expecting a single array field, 'images' with at most 10 files in it - let validate_imports = config.validate_imports(); + let validate_imports = CONFIG.validate_imports(); let manager2 = manager.clone(); let import_form = Form::new() .max_files(10) - .max_file_size(config.max_file_size() * MEGABYTES) + .max_file_size(CONFIG.max_file_size() * MEGABYTES) .transform_error(|e| UploadError::from(e).into()) .field( "images", @@ -329,7 +329,7 @@ async fn main() -> Result<(), anyhow::Error> { async move { manager - .import(filename, content_type, validate_imports, stream.into()) + .import(filename, content_type, validate_imports, stream) .await .map(|alias| { let mut path = PathBuf::new(); @@ -340,21 +340,18 @@ async fn main() -> Result<(), anyhow::Error> { })), ); - let config2 = config.clone(); HttpServer::new(move || { let client = Client::build() .header("User-Agent", "pict-rs v0.1.0-master") .finish(); - let config = config2.clone(); - App::new() .wrap(Compress::default()) .wrap(Logger::default()) .wrap(Tracing) .data(manager.clone()) .data(client) - .data(config.filter_whitelist()) + .data(CONFIG.filter_whitelist()) .service( web::scope("/image") .service( @@ -377,7 +374,7 @@ async fn main() -> Result<(), anyhow::Error> { .route(web::post().to(upload)), ) }) - .bind(config.bind_address())? + .bind(CONFIG.bind_address())? .run() .await?; diff --git a/src/processor.rs b/src/processor.rs index d93b2a1..503ae5e 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -2,7 +2,7 @@ use crate::error::UploadError; use actix_web::web; use image::{DynamicImage, GenericImageView}; use std::{collections::HashSet, path::PathBuf}; -use tracing::{debug, instrument}; +use tracing::{debug, instrument, Span}; pub(crate) trait Processor { fn name() -> &'static str @@ -51,6 +51,7 @@ impl Processor for Identity { where Self: Sized, { + debug!("Identity"); Some(Box::new(Identity)) } @@ -95,6 +96,7 @@ impl Processor for Thumbnail { } fn process(&self, img: DynamicImage) -> Result { + debug!("Thumbnail"); if img.in_bounds(self.0, self.0) { Ok(img.thumbnail(self.0, self.0)) } else { @@ -129,6 +131,7 @@ impl Processor for Blur { } fn process(&self, img: DynamicImage) -> Result { + debug!("Blur"); Ok(img.blur(self.0)) } } @@ -148,25 +151,7 @@ pub(crate) struct ProcessChain { impl std::fmt::Debug for ProcessChain { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("ProcessChain") - .field("inner", &format!("{} operations", self.inner.len())) - .finish() - } -} - -pub(crate) struct ImageWrapper { - pub(crate) inner: DynamicImage, -} - -impl From for ImageWrapper { - fn from(inner: DynamicImage) -> Self { - ImageWrapper { inner } - } -} - -impl std::fmt::Debug for ImageWrapper { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("ImageWrapper") - .field("inner", &"DynamicImage".to_string()) + .field("steps", &self.inner.len()) .finish() } } @@ -199,15 +184,23 @@ pub(crate) fn build_path(base: PathBuf, chain: &ProcessChain, filename: String) path } -#[instrument] +#[instrument(skip(img))] pub(crate) async fn process_image( chain: ProcessChain, - img: ImageWrapper, -) -> Result { - let mut inner = img.inner; + mut img: DynamicImage, +) -> Result { for processor in chain.inner.into_iter() { - inner = web::block(move || processor.process(inner)).await?; + debug!("Step"); + let span = Span::current(); + img = web::block(move || { + let entered = span.enter(); + let res = processor.process(img); + drop(entered); + res + }) + .await?; + debug!("Step complete"); } - Ok(inner.into()) + Ok(img) } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 0bc7c30..29df023 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -3,7 +3,7 @@ use actix_web::web; use futures::stream::{Stream, StreamExt}; use sha2::Digest; use std::{path::PathBuf, pin::Pin, sync::Arc}; -use tracing::{debug, error, instrument, warn}; +use tracing::{debug, error, info, instrument, warn, Span}; #[derive(Clone)] pub struct UploadManager { @@ -21,35 +21,41 @@ struct UploadManagerInner { impl std::fmt::Debug for UploadManager { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("UploadManager") - .field("inner", &format!("Arc")) - .finish() + f.debug_struct("UploadManager").finish() } } -type StreamAlias = Pin>>>; +type UploadStream = Pin>>>; -pub(crate) struct UploadStream { - inner: StreamAlias, +struct FilenameIVec { + inner: sled::IVec, } -impl UploadStream { - pub(crate) fn new(s: StreamAlias) -> Self { - UploadStream { inner: s } +impl FilenameIVec { + fn new(inner: sled::IVec) -> Self { + FilenameIVec { inner } } } -impl From> for UploadStream { - fn from(s: StreamAlias) -> Self { - UploadStream { inner: s } - } -} - -impl std::fmt::Debug for UploadStream { +impl std::fmt::Debug for FilenameIVec { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("UploadStream") - .field("inner", &"stream".to_string()) - .finish() + write!(f, "{:?}", String::from_utf8(self.inner.to_vec())) + } +} + +struct Hash { + inner: Vec, +} + +impl Hash { + fn new(inner: Vec) -> Self { + Hash { inner } + } +} + +impl std::fmt::Debug for Hash { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", base64::encode(&self.inner)) } } @@ -101,6 +107,7 @@ impl UploadManager { } /// Store the path to a generated image variant so we can easily clean it up later + #[instrument(skip(self))] pub(crate) async fn store_variant(&self, path: PathBuf) -> Result<(), UploadError> { let filename = path .file_name() @@ -110,30 +117,37 @@ impl UploadManager { let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); let fname_tree = self.inner.filename_tree.clone(); + debug!("Getting hash"); let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes())) .await? .ok_or(UploadError::MissingFilename)?; let key = variant_key(&hash, &path_string); let db = self.inner.db.clone(); + debug!("Storing variant"); web::block(move || db.insert(key, path_string.as_bytes())).await?; + debug!("Stored variant"); Ok(()) } /// Delete the alias, and the file & variants if no more aliases exist + #[instrument(skip(self))] pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> { use sled::Transactional; let db = self.inner.db.clone(); let alias_tree = self.inner.alias_tree.clone(); + let span = Span::current(); let alias2 = alias.clone(); let hash = web::block(move || { [&*db, &alias_tree].transaction(|v| { + let entered = span.enter(); let db = &v[0]; let alias_tree = &v[1]; // -- GET TOKEN -- + debug!("Deleting alias -> delete-token mapping"); let existing_token = alias_tree .remove(delete_key(&alias2).as_bytes())? .ok_or(trans_err(UploadError::MissingAlias))?; @@ -145,18 +159,22 @@ impl UploadManager { } // -- GET ID FOR HASH TREE CLEANUP -- + debug!("Deleting alias -> id mapping"); let id = alias_tree .remove(alias_id_key(&alias2).as_bytes())? .ok_or(trans_err(UploadError::MissingAlias))?; let id = String::from_utf8(id.to_vec()).map_err(|e| trans_err(e.into()))?; // -- GET HASH FOR HASH TREE CLEANUP -- + debug!("Deleting alias -> hash mapping"); let hash = alias_tree .remove(alias2.as_bytes())? .ok_or(trans_err(UploadError::MissingAlias))?; // -- REMOVE HASH TREE ELEMENT -- + debug!("Deleting hash -> alias mapping"); db.remove(alias_key(&hash, &id))?; + drop(entered); Ok(hash) }) }) @@ -165,6 +183,7 @@ impl UploadManager { // -- CHECK IF ANY OTHER ALIASES EXIST -- let db = self.inner.db.clone(); let (start, end) = alias_key_bounds(&hash); + debug!("Checking for additional aliases referencing hash"); let any_aliases = web::block(move || { Ok(db.range(start..end).next().is_some()) as Result }) @@ -172,36 +191,48 @@ impl UploadManager { // Bail if there are existing aliases if any_aliases { + debug!("Other aliases reference file, not removing from disk"); return Ok(()); } // -- DELETE HASH ENTRY -- let db = self.inner.db.clone(); let hash2 = hash.clone(); + debug!("Deleting hash -> filename mapping"); let filename = web::block(move || db.remove(&hash2)) .await? .ok_or(UploadError::MissingFile)?; // -- DELETE FILES -- let this = self.clone(); + debug!("Spawning cleanup task"); + let span = Span::current(); actix_rt::spawn(async move { - if let Err(e) = this.cleanup_files(filename).await { + let entered = span.enter(); + if let Err(e) = this + .cleanup_files(FilenameIVec::new(filename.clone())) + .await + { error!("Error removing files from fs, {}", e); } + info!( + "Files deleted for {:?}", + String::from_utf8(filename.to_vec()) + ); + drop(entered); }); Ok(()) } /// Generate a delete token for an alias - #[instrument] + #[instrument(skip(self))] pub(crate) async fn delete_token(&self, alias: String) -> Result { debug!("Generating delete token"); use rand::distributions::{Alphanumeric, Distribution}; let rng = rand::thread_rng(); let s: String = Alphanumeric.sample_iter(rng).take(10).collect(); let delete_token = s.clone(); - debug!("Generated delete token"); debug!("Saving delete token"); let alias_tree = self.inner.alias_tree.clone(); @@ -214,7 +245,6 @@ impl UploadManager { ) }) .await?; - debug!("Delete token saved"); if let Err(sled::CompareAndSwapError { current: Some(ivec), @@ -223,13 +253,16 @@ impl UploadManager { { let s = String::from_utf8(ivec.to_vec())?; + debug!("Returning existing delete token, {}", s); return Ok(s); } + debug!("Returning new delete token, {}", delete_token); Ok(delete_token) } /// Upload the file while preserving the filename, optionally validating the uploaded image + #[instrument(skip(self, stream))] pub(crate) async fn import( &self, alias: String, @@ -240,9 +273,11 @@ impl UploadManager { where UploadError: From, { + debug!("Reading stream"); let bytes = read_stream(stream).await?; let (bytes, content_type) = if validate { + debug!("Validating image"); let format = self.inner.format.clone(); validate_image(bytes, format).await? } else { @@ -252,10 +287,13 @@ impl UploadManager { // -- DUPLICATE CHECKS -- // Cloning bytes is fine because it's actually a pointer + debug!("Hashing bytes"); let hash = self.hash(bytes.clone()).await?; + debug!("Storing alias"); self.add_existing_alias(&hash, &alias).await?; + debug!("Saving file"); self.save_upload(bytes, hash, content_type).await?; // Return alias to file @@ -263,7 +301,7 @@ impl UploadManager { } /// Upload the file, discarding bytes if it's already present, or saving if it's new - #[instrument] + #[instrument(skip(self, stream))] pub(crate) async fn upload(&self, stream: UploadStream) -> Result where UploadError: From, @@ -271,41 +309,39 @@ impl UploadManager { // -- READ IN BYTES FROM CLIENT -- debug!("Reading stream"); let bytes = read_stream(stream).await?; - debug!("Read stream"); // -- VALIDATE IMAGE -- debug!("Validating image"); let format = self.inner.format.clone(); let (bytes, content_type) = validate_image(bytes, format).await?; - debug!("Image validated"); // -- DUPLICATE CHECKS -- // Cloning bytes is fine because it's actually a pointer debug!("Hashing bytes"); let hash = self.hash(bytes.clone()).await?; - debug!("Bytes hashed"); debug!("Adding alias"); let alias = self.add_alias(&hash, content_type.clone()).await?; - debug!("Alias added"); debug!("Saving file"); self.save_upload(bytes, hash, content_type).await?; - debug!("File saved"); // Return alias to file Ok(alias) } /// Fetch the real on-disk filename given an alias + #[instrument(skip(self))] pub(crate) async fn from_alias(&self, alias: String) -> Result { let tree = self.inner.alias_tree.clone(); + debug!("Getting hash from alias"); let hash = web::block(move || tree.get(alias.as_bytes())) .await? .ok_or(UploadError::MissingAlias)?; let db = self.inner.db.clone(); + debug!("Getting filename from hash"); let filename = web::block(move || db.get(hash)) .await? .ok_or(UploadError::MissingFile)?; @@ -316,23 +352,28 @@ impl UploadManager { } // Find image variants and remove them from the DB and the disk - async fn cleanup_files(&self, filename: sled::IVec) -> Result<(), UploadError> { + #[instrument(skip(self))] + async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> { + let filename = filename.inner; let mut path = self.image_dir(); let fname = String::from_utf8(filename.to_vec())?; path.push(fname); let mut errors = Vec::new(); + debug!("Deleting {:?}", path); if let Err(e) = actix_fs::remove_file(path).await { errors.push(e.into()); } let fname_tree = self.inner.filename_tree.clone(); + debug!("Deleting filename -> hash mapping"); let hash = web::block(move || fname_tree.remove(filename)) .await? .ok_or(UploadError::MissingFile)?; let (start, end) = variant_key_bounds(&hash); let db = self.inner.db.clone(); + debug!("Fetching file variants"); let keys = web::block(move || { let mut keys = Vec::new(); for key in db.range(start..end).keys() { @@ -343,9 +384,12 @@ impl UploadManager { }) .await?; + debug!("{} files prepared for deletion", keys.len()); + for key in keys { let db = self.inner.db.clone(); if let Some(path) = web::block(move || db.remove(key)).await? { + debug!("Deleting {:?}", String::from_utf8(path.to_vec())); if let Err(e) = remove_path(path).await { errors.push(e); } @@ -362,13 +406,14 @@ impl UploadManager { async fn save_upload( &self, bytes: bytes::Bytes, - hash: Vec, + hash: Hash, content_type: mime::Mime, ) -> Result<(), UploadError> { let (dup, name) = self.check_duplicate(hash, content_type).await?; // bail early with alias to existing file if this is a duplicate if dup.exists() { + debug!("Duplicate exists, not saving file"); return Ok(()); } @@ -382,7 +427,7 @@ impl UploadManager { } // produce a sh256sum of the uploaded file - async fn hash(&self, bytes: bytes::Bytes) -> Result, UploadError> { + async fn hash(&self, bytes: bytes::Bytes) -> Result { let mut hasher = self.inner.hasher.clone(); let hash = web::block(move || { hasher.update(&bytes); @@ -390,21 +435,22 @@ impl UploadManager { }) .await?; - Ok(hash) + Ok(Hash::new(hash)) } // check for an already-uploaded image with this hash, returning the path to the target file - #[instrument] + #[instrument(skip(self))] async fn check_duplicate( &self, - hash: Vec, + hash: Hash, content_type: mime::Mime, ) -> Result<(Dup, String), UploadError> { let db = self.inner.db.clone(); let filename = self.next_file(content_type).await?; let filename2 = filename.clone(); - let hash2 = hash.clone(); + let hash2 = hash.inner.clone(); + debug!("Inserting filename for hash"); let res = web::block(move || { db.compare_and_swap( hash2, @@ -420,24 +466,27 @@ impl UploadManager { }) = res { let name = String::from_utf8(ivec.to_vec())?; + debug!("Filename exists for hash, {}", name); return Ok((Dup::Exists, name)); } let fname_tree = self.inner.filename_tree.clone(); let filename2 = filename.clone(); - web::block(move || fname_tree.insert(filename2, hash)).await?; + debug!("Saving filename -> hash relation"); + web::block(move || fname_tree.insert(filename2, hash.inner)).await?; Ok((Dup::New, filename)) } // generate a short filename that isn't already in-use - #[instrument] + #[instrument(skip(self))] async fn next_file(&self, content_type: mime::Mime) -> Result { let image_dir = self.image_dir(); use rand::distributions::{Alphanumeric, Distribution}; let mut limit: usize = 10; let rng = rand::thread_rng(); loop { + debug!("Filename generation loop"); let mut path = image_dir.clone(); let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); @@ -447,16 +496,20 @@ impl UploadManager { if let Err(e) = actix_fs::metadata(path).await { if e.kind() == Some(std::io::ErrorKind::NotFound) { + debug!("Generated unused filename {}", filename); return Ok(filename); } return Err(e.into()); } + debug!("Filename exists, trying again"); + limit += 1; } } - async fn add_existing_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> { + #[instrument(skip(self))] + async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { self.save_alias(hash, alias).await??; self.store_alias(hash, alias).await?; @@ -467,10 +520,10 @@ impl UploadManager { // Add an alias to an existing file // // This will help if multiple 'users' upload the same file, and one of them wants to delete it - #[instrument] + #[instrument(skip(self))] async fn add_alias( &self, - hash: &[u8], + hash: &Hash, content_type: mime::Mime, ) -> Result { let alias = self.next_alias(hash, content_type).await?; @@ -483,16 +536,18 @@ impl UploadManager { // Add a pre-defined alias to an existin file // // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files - #[instrument] - async fn store_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> { + #[instrument(skip(self))] + async fn store_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { let alias = alias.to_string(); loop { + debug!("hash -> alias save loop"); let db = self.inner.db.clone(); let id = web::block(move || db.generate_id()).await?.to_string(); - let key = alias_key(hash, &id); + let key = alias_key(&hash.inner, &id); let db = self.inner.db.clone(); let alias2 = alias.clone(); + debug!("Saving hash/id -> alias mapping"); let res = web::block(move || { db.compare_and_swap(key, None as Option, Some(alias2.as_bytes())) }) @@ -501,25 +556,30 @@ impl UploadManager { if res.is_ok() { let alias_tree = self.inner.alias_tree.clone(); let key = alias_id_key(&alias); + debug!("Saving alias -> id mapping"); web::block(move || alias_tree.insert(key.as_bytes(), id.as_bytes())).await?; break; } + + debug!("Id exists, trying again"); } Ok(()) } // Generate an alias to the file + #[instrument(skip(self))] async fn next_alias( &self, - hash: &[u8], + hash: &Hash, content_type: mime::Mime, ) -> Result { use rand::distributions::{Alphanumeric, Distribution}; let mut limit: usize = 10; let rng = rand::thread_rng(); loop { + debug!("Alias gen loop"); let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); let alias = file_name(s, content_type.clone()); @@ -528,27 +588,31 @@ impl UploadManager { if res.is_ok() { return Ok(alias); } + debug!("Alias exists, regenning"); limit += 1; } } // Save an alias to the database + #[instrument(skip(self))] async fn save_alias( &self, - hash: &[u8], + hash: &Hash, alias: &str, ) -> Result, UploadError> { let tree = self.inner.alias_tree.clone(); - let vec = hash.to_vec(); + let vec = hash.inner.clone(); let alias = alias.to_string(); + debug!("Saving alias"); let res = web::block(move || { tree.compare_and_swap(alias.as_bytes(), None as Option, Some(vec)) }) .await?; if res.is_err() { + warn!("Duplicate alias"); return Ok(Err(UploadError::DuplicateAlias)); } @@ -556,16 +620,17 @@ impl UploadManager { } } -#[instrument] -async fn read_stream(stream: UploadStream) -> Result +#[instrument(skip(stream))] +async fn read_stream(mut stream: UploadStream) -> Result where UploadError: From, { - let mut stream = stream.inner; let mut bytes = bytes::BytesMut::new(); while let Some(res) = stream.next().await { - bytes.extend(res?); + let new = res?; + debug!("Extending with {} bytes", new.len()); + bytes.extend(new); } Ok(bytes.freeze()) diff --git a/src/validate.rs b/src/validate.rs index efa6810..de5a63a 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -3,7 +3,7 @@ use actix_web::web; use bytes::Bytes; use image::{ImageDecoder, ImageEncoder, ImageFormat}; use std::io::Cursor; -use tracing::debug; +use tracing::{debug, instrument, Span}; #[derive(Debug, thiserror::Error)] pub(crate) enum GifError { @@ -15,18 +15,26 @@ pub(crate) enum GifError { } // import & export image using the image crate +#[instrument(skip(bytes))] pub(crate) async fn validate_image( bytes: Bytes, prescribed_format: Option, ) -> Result<(Bytes, mime::Mime), UploadError> { + let span = Span::current(); + let tup = web::block(move || { + let entered = span.enter(); if let Some(prescribed) = prescribed_format { + debug!("Load from memory"); let img = image::load_from_memory(&bytes).map_err(UploadError::InvalidImage)?; + debug!("Loaded"); let mime = prescribed.to_mime(); + debug!("Writing"); let mut bytes = Cursor::new(vec![]); img.write_to(&mut bytes, prescribed.to_image_format())?; + debug!("Written"); return Ok((Bytes::from(bytes.into_inner()), mime)); } @@ -41,6 +49,7 @@ pub(crate) async fn validate_image( _ => Err(UploadError::UnsupportedFormat), }; debug!("Validated"); + drop(entered); res }) .await?; @@ -48,6 +57,7 @@ pub(crate) async fn validate_image( Ok(tup) } +#[instrument(skip(bytes))] fn validate_png(bytes: Bytes) -> Result { let decoder = image::png::PngDecoder::new(Cursor::new(&bytes))?; @@ -58,6 +68,7 @@ fn validate_png(bytes: Bytes) -> Result { Ok(Bytes::from(bytes.into_inner())) } +#[instrument(skip(bytes))] fn validate_jpg(bytes: Bytes) -> Result { let decoder = image::jpeg::JpegDecoder::new(Cursor::new(&bytes))?; @@ -68,6 +79,7 @@ fn validate_jpg(bytes: Bytes) -> Result { Ok(Bytes::from(bytes.into_inner())) } +#[instrument(skip(bytes))] fn validate_bmp(bytes: Bytes) -> Result { let decoder = image::bmp::BmpDecoder::new(Cursor::new(&bytes))?; @@ -78,6 +90,7 @@ fn validate_bmp(bytes: Bytes) -> Result { Ok(Bytes::from(bytes.into_inner())) } +#[instrument(skip(bytes))] fn validate_gif(bytes: Bytes) -> Result { use gif::{Parameter, SetParameter}; @@ -98,6 +111,7 @@ fn validate_gif(bytes: Bytes) -> Result { gif::Repeat::Infinite.set_param(&mut encoder)?; while let Some(frame) = reader.read_next_frame()? { + debug!("Writing frame"); encoder.write_frame(frame)?; } } @@ -113,9 +127,11 @@ where let (width, height) = decoder.dimensions(); let color_type = decoder.color_type(); let total_bytes = decoder.total_bytes(); + debug!("Reading image"); let mut decoded_bytes = vec![0u8; total_bytes as usize]; decoder.read_image(&mut decoded_bytes)?; + debug!("Writing image"); encoder.write_image(&decoded_bytes, width, height, color_type)?; Ok(())