2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-31 15:01:25 +00:00

Do better logging

This commit is contained in:
asonix 2020-06-14 10:07:31 -05:00
parent a2fea150b4
commit 3b4c5b646a
6 changed files with 188 additions and 113 deletions

2
Cargo.lock generated
View file

@ -1378,11 +1378,13 @@ dependencies = [
"actix-rt", "actix-rt",
"actix-web", "actix-web",
"anyhow", "anyhow",
"base64 0.12.1",
"bytes", "bytes",
"futures", "futures",
"gif", "gif",
"image", "image",
"mime", "mime",
"once_cell",
"rand", "rand",
"serde", "serde",
"serde_json", "serde_json",

View file

@ -16,11 +16,13 @@ actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs" }
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-web = { version = "3.0.0-alpha.2", features = ["rustls"] } actix-web = { version = "3.0.0-alpha.2", features = ["rustls"] }
anyhow = "1.0" anyhow = "1.0"
base64 = "0.12.1"
bytes = "0.5" bytes = "0.5"
futures = "0.3.4" futures = "0.3.4"
gif = "0.10.3" gif = "0.10.3"
image = "0.23.4" image = "0.23.4"
mime = "0.3.1" mime = "0.3.1"
once_cell = "1.4.0"
rand = "0.7.3" rand = "0.7.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View file

@ -7,9 +7,10 @@ use actix_web::{
web, App, HttpResponse, HttpServer, web, App, HttpResponse, HttpServer,
}; };
use futures::stream::{Stream, TryStreamExt}; use futures::stream::{Stream, TryStreamExt};
use once_cell::sync::Lazy;
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use structopt::StructOpt; use structopt::StructOpt;
use tracing::{debug, error, info, instrument}; use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
mod config; mod config;
@ -20,25 +21,25 @@ mod upload_manager;
mod validate; mod validate;
use self::{ use self::{
config::Config, config::Config, error::UploadError, middleware::Tracing, upload_manager::UploadManager,
error::UploadError,
middleware::Tracing,
upload_manager::{UploadManager, UploadStream},
}; };
const MEGABYTES: usize = 1024 * 1024; const MEGABYTES: usize = 1024 * 1024;
const HOURS: u32 = 60 * 60; const HOURS: u32 = 60 * 60;
static CONFIG: Lazy<Config> = Lazy::new(|| Config::from_args());
// Try writing to a file // Try writing to a file
#[instrument(skip(bytes))]
async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> { async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> {
if let Some(path) = path.parent() { if let Some(path) = path.parent() {
// create the directory for the file // create the directory for the file
debug!("Creating directory"); debug!("Creating directory {:?}", path);
actix_fs::create_dir_all(path.to_owned()).await?; actix_fs::create_dir_all(path.to_owned()).await?;
} }
// Only write the file if it doesn't already exist // Only write the file if it doesn't already exist
debug!("Checking if file already exists"); debug!("Checking if {:?} already exists", path);
if let Err(e) = actix_fs::metadata(path.clone()).await { if let Err(e) = actix_fs::metadata(path.clone()).await {
if e.kind() != Some(std::io::ErrorKind::NotFound) { if e.kind() != Some(std::io::ErrorKind::NotFound) {
return Err(e.into()); return Err(e.into());
@ -48,18 +49,18 @@ async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), Upload
} }
// Open the file for writing // Open the file for writing
debug!("Creating file"); debug!("Creating {:?}", path);
let file = actix_fs::file::create(path.clone()).await?; let file = actix_fs::file::create(path.clone()).await?;
// try writing // try writing
debug!("Writing to file"); debug!("Writing to {:?}", path);
if let Err(e) = actix_fs::file::write(file, bytes).await { if let Err(e) = actix_fs::file::write(file, bytes).await {
error!("Error writing file, {}", e); error!("Error writing {:?}, {}", path, e);
// remove file if writing failed before completion // remove file if writing failed before completion
actix_fs::remove_file(path).await?; actix_fs::remove_file(path).await?;
return Err(e.into()); return Err(e.into());
} }
debug!("File written"); debug!("{:?} written", path);
Ok(()) Ok(())
} }
@ -86,7 +87,7 @@ fn from_ext(ext: std::ffi::OsString) -> mime::Mime {
} }
/// Handle responding to succesful uploads /// Handle responding to succesful uploads
#[instrument] #[instrument(skip(manager))]
async fn upload( async fn upload(
value: Value, value: Value,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
@ -121,6 +122,7 @@ async fn upload(
} }
/// download an image from a URL /// download an image from a URL
#[instrument(skip(client, manager))]
async fn download( async fn download(
client: web::Data<Client>, client: web::Data<Client>,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
@ -132,11 +134,11 @@ async fn download(
return Err(UploadError::Download(res.status())); return Err(UploadError::Download(res.status()));
} }
let fut = res.body().limit(40 * MEGABYTES); let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES);
let stream = Box::pin(futures::stream::once(fut)); let stream = Box::pin(futures::stream::once(fut));
let alias = manager.upload(UploadStream::new(stream)).await?; let alias = manager.upload(stream).await?;
let delete_token = manager.delete_token(alias.clone()).await?; let delete_token = manager.delete_token(alias.clone()).await?;
Ok(HttpResponse::Created().json(serde_json::json!({ Ok(HttpResponse::Created().json(serde_json::json!({
@ -148,6 +150,7 @@ async fn download(
}))) })))
} }
#[instrument(skip(manager))]
async fn delete( async fn delete(
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
path_entries: web::Path<(String, String)>, path_entries: web::Path<(String, String)>,
@ -160,7 +163,7 @@ async fn delete(
} }
/// Serve files /// Serve files
#[instrument] #[instrument(skip(manager))]
async fn serve( async fn serve(
segments: web::Path<String>, segments: web::Path<String>,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
@ -209,13 +212,9 @@ async fn serve(
(img, format) (img, format)
}; };
debug!("Image read");
debug!("Processing image"); debug!("Processing image");
let img = self::processor::process_image(chain, img.into()) let img = self::processor::process_image(chain, img).await?;
.await?
.inner;
debug!("Image processed");
// perform thumbnail operation in a blocking thread // perform thumbnail operation in a blocking thread
debug!("Exporting image"); debug!("Exporting image");
@ -225,13 +224,15 @@ async fn serve(
Ok(bytes::Bytes::from(bytes.into_inner())) as Result<_, image::error::ImageError> Ok(bytes::Bytes::from(bytes.into_inner())) as Result<_, image::error::ImageError>
}) })
.await?; .await?;
debug!("Image exported");
let path2 = path.clone(); let path2 = path.clone();
let img_bytes2 = img_bytes.clone(); let img_bytes2 = img_bytes.clone();
// Save the file in another task, we want to return the thumbnail now // Save the file in another task, we want to return the thumbnail now
debug!("Spawning storage task");
let span = Span::current();
actix_rt::spawn(async move { actix_rt::spawn(async move {
let entered = span.enter();
if let Err(e) = manager.store_variant(path2.clone()).await { if let Err(e) = manager.store_variant(path2.clone()).await {
error!("Error storing variant, {}", e); error!("Error storing variant, {}", e);
return; return;
@ -240,6 +241,7 @@ async fn serve(
if let Err(e) = safe_save_file(path2, img_bytes2).await { if let Err(e) = safe_save_file(path2, img_bytes2).await {
error!("Error saving file, {}", e); error!("Error saving file, {}", e);
} }
drop(entered);
}); });
return Ok(srv_response( return Ok(srv_response(
@ -278,8 +280,6 @@ struct UrlQuery {
#[actix_rt::main] #[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> Result<(), anyhow::Error> {
let config = Config::from_args();
if std::env::var("RUST_LOG").is_err() { if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info"); std::env::set_var("RUST_LOG", "info");
} }
@ -288,7 +288,7 @@ async fn main() -> Result<(), anyhow::Error> {
.with_env_filter(EnvFilter::from_default_env()) .with_env_filter(EnvFilter::from_default_env())
.init(); .init();
let manager = UploadManager::new(config.data_dir(), config.format()).await?; let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?;
// Create a new Multipart Form validator // Create a new Multipart Form validator
// //
@ -296,7 +296,7 @@ async fn main() -> Result<(), anyhow::Error> {
let manager2 = manager.clone(); let manager2 = manager.clone();
let form = Form::new() let form = Form::new()
.max_files(10) .max_files(10)
.max_file_size(config.max_file_size() * MEGABYTES) .max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| UploadError::from(e).into()) .transform_error(|e| UploadError::from(e).into())
.field( .field(
"images", "images",
@ -304,7 +304,7 @@ async fn main() -> Result<(), anyhow::Error> {
let manager = manager2.clone(); let manager = manager2.clone();
async move { async move {
manager.upload(stream.into()).await.map(|alias| { manager.upload(stream).await.map(|alias| {
let mut path = PathBuf::new(); let mut path = PathBuf::new();
path.push(alias); path.push(alias);
Some(path) Some(path)
@ -316,11 +316,11 @@ async fn main() -> Result<(), anyhow::Error> {
// Create a new Multipart Form validator for internal imports // Create a new Multipart Form validator for internal imports
// //
// This form is expecting a single array field, 'images' with at most 10 files in it // This form is expecting a single array field, 'images' with at most 10 files in it
let validate_imports = config.validate_imports(); let validate_imports = CONFIG.validate_imports();
let manager2 = manager.clone(); let manager2 = manager.clone();
let import_form = Form::new() let import_form = Form::new()
.max_files(10) .max_files(10)
.max_file_size(config.max_file_size() * MEGABYTES) .max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| UploadError::from(e).into()) .transform_error(|e| UploadError::from(e).into())
.field( .field(
"images", "images",
@ -329,7 +329,7 @@ async fn main() -> Result<(), anyhow::Error> {
async move { async move {
manager manager
.import(filename, content_type, validate_imports, stream.into()) .import(filename, content_type, validate_imports, stream)
.await .await
.map(|alias| { .map(|alias| {
let mut path = PathBuf::new(); let mut path = PathBuf::new();
@ -340,21 +340,18 @@ async fn main() -> Result<(), anyhow::Error> {
})), })),
); );
let config2 = config.clone();
HttpServer::new(move || { HttpServer::new(move || {
let client = Client::build() let client = Client::build()
.header("User-Agent", "pict-rs v0.1.0-master") .header("User-Agent", "pict-rs v0.1.0-master")
.finish(); .finish();
let config = config2.clone();
App::new() App::new()
.wrap(Compress::default()) .wrap(Compress::default())
.wrap(Logger::default()) .wrap(Logger::default())
.wrap(Tracing) .wrap(Tracing)
.data(manager.clone()) .data(manager.clone())
.data(client) .data(client)
.data(config.filter_whitelist()) .data(CONFIG.filter_whitelist())
.service( .service(
web::scope("/image") web::scope("/image")
.service( .service(
@ -377,7 +374,7 @@ async fn main() -> Result<(), anyhow::Error> {
.route(web::post().to(upload)), .route(web::post().to(upload)),
) )
}) })
.bind(config.bind_address())? .bind(CONFIG.bind_address())?
.run() .run()
.await?; .await?;

View file

@ -2,7 +2,7 @@ use crate::error::UploadError;
use actix_web::web; use actix_web::web;
use image::{DynamicImage, GenericImageView}; use image::{DynamicImage, GenericImageView};
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use tracing::{debug, instrument}; use tracing::{debug, instrument, Span};
pub(crate) trait Processor { pub(crate) trait Processor {
fn name() -> &'static str fn name() -> &'static str
@ -51,6 +51,7 @@ impl Processor for Identity {
where where
Self: Sized, Self: Sized,
{ {
debug!("Identity");
Some(Box::new(Identity)) Some(Box::new(Identity))
} }
@ -95,6 +96,7 @@ impl Processor for Thumbnail {
} }
fn process(&self, img: DynamicImage) -> Result<DynamicImage, UploadError> { fn process(&self, img: DynamicImage) -> Result<DynamicImage, UploadError> {
debug!("Thumbnail");
if img.in_bounds(self.0, self.0) { if img.in_bounds(self.0, self.0) {
Ok(img.thumbnail(self.0, self.0)) Ok(img.thumbnail(self.0, self.0))
} else { } else {
@ -129,6 +131,7 @@ impl Processor for Blur {
} }
fn process(&self, img: DynamicImage) -> Result<DynamicImage, UploadError> { fn process(&self, img: DynamicImage) -> Result<DynamicImage, UploadError> {
debug!("Blur");
Ok(img.blur(self.0)) Ok(img.blur(self.0))
} }
} }
@ -148,25 +151,7 @@ pub(crate) struct ProcessChain {
impl std::fmt::Debug for ProcessChain { impl std::fmt::Debug for ProcessChain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ProcessChain") f.debug_struct("ProcessChain")
.field("inner", &format!("{} operations", self.inner.len())) .field("steps", &self.inner.len())
.finish()
}
}
pub(crate) struct ImageWrapper {
pub(crate) inner: DynamicImage,
}
impl From<DynamicImage> for ImageWrapper {
fn from(inner: DynamicImage) -> Self {
ImageWrapper { inner }
}
}
impl std::fmt::Debug for ImageWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ImageWrapper")
.field("inner", &"DynamicImage".to_string())
.finish() .finish()
} }
} }
@ -199,15 +184,23 @@ pub(crate) fn build_path(base: PathBuf, chain: &ProcessChain, filename: String)
path path
} }
#[instrument] #[instrument(skip(img))]
pub(crate) async fn process_image( pub(crate) async fn process_image(
chain: ProcessChain, chain: ProcessChain,
img: ImageWrapper, mut img: DynamicImage,
) -> Result<ImageWrapper, UploadError> { ) -> Result<DynamicImage, UploadError> {
let mut inner = img.inner;
for processor in chain.inner.into_iter() { for processor in chain.inner.into_iter() {
inner = web::block(move || processor.process(inner)).await?; debug!("Step");
let span = Span::current();
img = web::block(move || {
let entered = span.enter();
let res = processor.process(img);
drop(entered);
res
})
.await?;
debug!("Step complete");
} }
Ok(inner.into()) Ok(img)
} }

View file

@ -3,7 +3,7 @@ use actix_web::web;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use sha2::Digest; use sha2::Digest;
use std::{path::PathBuf, pin::Pin, sync::Arc}; use std::{path::PathBuf, pin::Pin, sync::Arc};
use tracing::{debug, error, instrument, warn}; use tracing::{debug, error, info, instrument, warn, Span};
#[derive(Clone)] #[derive(Clone)]
pub struct UploadManager { pub struct UploadManager {
@ -21,35 +21,41 @@ struct UploadManagerInner {
impl std::fmt::Debug for UploadManager { impl std::fmt::Debug for UploadManager {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager") f.debug_struct("UploadManager").finish()
.field("inner", &format!("Arc<UploadManagerInner>"))
.finish()
} }
} }
type StreamAlias<E> = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, E>>>>; type UploadStream<E> = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, E>>>>;
pub(crate) struct UploadStream<E> { struct FilenameIVec {
inner: StreamAlias<E>, inner: sled::IVec,
} }
impl<E> UploadStream<E> { impl FilenameIVec {
pub(crate) fn new(s: StreamAlias<E>) -> Self { fn new(inner: sled::IVec) -> Self {
UploadStream { inner: s } FilenameIVec { inner }
} }
} }
impl<E> From<StreamAlias<E>> for UploadStream<E> { impl std::fmt::Debug for FilenameIVec {
fn from(s: StreamAlias<E>) -> Self {
UploadStream { inner: s }
}
}
impl<E> std::fmt::Debug for UploadStream<E> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadStream") write!(f, "{:?}", String::from_utf8(self.inner.to_vec()))
.field("inner", &"stream".to_string()) }
.finish() }
struct Hash {
inner: Vec<u8>,
}
impl Hash {
fn new(inner: Vec<u8>) -> Self {
Hash { inner }
}
}
impl std::fmt::Debug for Hash {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", base64::encode(&self.inner))
} }
} }
@ -101,6 +107,7 @@ impl UploadManager {
} }
/// Store the path to a generated image variant so we can easily clean it up later /// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant(&self, path: PathBuf) -> Result<(), UploadError> { pub(crate) async fn store_variant(&self, path: PathBuf) -> Result<(), UploadError> {
let filename = path let filename = path
.file_name() .file_name()
@ -110,30 +117,37 @@ impl UploadManager {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let fname_tree = self.inner.filename_tree.clone(); let fname_tree = self.inner.filename_tree.clone();
debug!("Getting hash");
let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes())) let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes()))
.await? .await?
.ok_or(UploadError::MissingFilename)?; .ok_or(UploadError::MissingFilename)?;
let key = variant_key(&hash, &path_string); let key = variant_key(&hash, &path_string);
let db = self.inner.db.clone(); let db = self.inner.db.clone();
debug!("Storing variant");
web::block(move || db.insert(key, path_string.as_bytes())).await?; web::block(move || db.insert(key, path_string.as_bytes())).await?;
debug!("Stored variant");
Ok(()) Ok(())
} }
/// Delete the alias, and the file & variants if no more aliases exist /// Delete the alias, and the file & variants if no more aliases exist
#[instrument(skip(self))]
pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> { pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> {
use sled::Transactional; use sled::Transactional;
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let alias_tree = self.inner.alias_tree.clone(); let alias_tree = self.inner.alias_tree.clone();
let span = Span::current();
let alias2 = alias.clone(); let alias2 = alias.clone();
let hash = web::block(move || { let hash = web::block(move || {
[&*db, &alias_tree].transaction(|v| { [&*db, &alias_tree].transaction(|v| {
let entered = span.enter();
let db = &v[0]; let db = &v[0];
let alias_tree = &v[1]; let alias_tree = &v[1];
// -- GET TOKEN -- // -- GET TOKEN --
debug!("Deleting alias -> delete-token mapping");
let existing_token = alias_tree let existing_token = alias_tree
.remove(delete_key(&alias2).as_bytes())? .remove(delete_key(&alias2).as_bytes())?
.ok_or(trans_err(UploadError::MissingAlias))?; .ok_or(trans_err(UploadError::MissingAlias))?;
@ -145,18 +159,22 @@ impl UploadManager {
} }
// -- GET ID FOR HASH TREE CLEANUP -- // -- GET ID FOR HASH TREE CLEANUP --
debug!("Deleting alias -> id mapping");
let id = alias_tree let id = alias_tree
.remove(alias_id_key(&alias2).as_bytes())? .remove(alias_id_key(&alias2).as_bytes())?
.ok_or(trans_err(UploadError::MissingAlias))?; .ok_or(trans_err(UploadError::MissingAlias))?;
let id = String::from_utf8(id.to_vec()).map_err(|e| trans_err(e.into()))?; let id = String::from_utf8(id.to_vec()).map_err(|e| trans_err(e.into()))?;
// -- GET HASH FOR HASH TREE CLEANUP -- // -- GET HASH FOR HASH TREE CLEANUP --
debug!("Deleting alias -> hash mapping");
let hash = alias_tree let hash = alias_tree
.remove(alias2.as_bytes())? .remove(alias2.as_bytes())?
.ok_or(trans_err(UploadError::MissingAlias))?; .ok_or(trans_err(UploadError::MissingAlias))?;
// -- REMOVE HASH TREE ELEMENT -- // -- REMOVE HASH TREE ELEMENT --
debug!("Deleting hash -> alias mapping");
db.remove(alias_key(&hash, &id))?; db.remove(alias_key(&hash, &id))?;
drop(entered);
Ok(hash) Ok(hash)
}) })
}) })
@ -165,6 +183,7 @@ impl UploadManager {
// -- CHECK IF ANY OTHER ALIASES EXIST -- // -- CHECK IF ANY OTHER ALIASES EXIST --
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let (start, end) = alias_key_bounds(&hash); let (start, end) = alias_key_bounds(&hash);
debug!("Checking for additional aliases referencing hash");
let any_aliases = web::block(move || { let any_aliases = web::block(move || {
Ok(db.range(start..end).next().is_some()) as Result<bool, UploadError> Ok(db.range(start..end).next().is_some()) as Result<bool, UploadError>
}) })
@ -172,36 +191,48 @@ impl UploadManager {
// Bail if there are existing aliases // Bail if there are existing aliases
if any_aliases { if any_aliases {
debug!("Other aliases reference file, not removing from disk");
return Ok(()); return Ok(());
} }
// -- DELETE HASH ENTRY -- // -- DELETE HASH ENTRY --
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let hash2 = hash.clone(); let hash2 = hash.clone();
debug!("Deleting hash -> filename mapping");
let filename = web::block(move || db.remove(&hash2)) let filename = web::block(move || db.remove(&hash2))
.await? .await?
.ok_or(UploadError::MissingFile)?; .ok_or(UploadError::MissingFile)?;
// -- DELETE FILES -- // -- DELETE FILES --
let this = self.clone(); let this = self.clone();
debug!("Spawning cleanup task");
let span = Span::current();
actix_rt::spawn(async move { actix_rt::spawn(async move {
if let Err(e) = this.cleanup_files(filename).await { let entered = span.enter();
if let Err(e) = this
.cleanup_files(FilenameIVec::new(filename.clone()))
.await
{
error!("Error removing files from fs, {}", e); error!("Error removing files from fs, {}", e);
} }
info!(
"Files deleted for {:?}",
String::from_utf8(filename.to_vec())
);
drop(entered);
}); });
Ok(()) Ok(())
} }
/// Generate a delete token for an alias /// Generate a delete token for an alias
#[instrument] #[instrument(skip(self))]
pub(crate) async fn delete_token(&self, alias: String) -> Result<String, UploadError> { pub(crate) async fn delete_token(&self, alias: String) -> Result<String, UploadError> {
debug!("Generating delete token"); debug!("Generating delete token");
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
let rng = rand::thread_rng(); let rng = rand::thread_rng();
let s: String = Alphanumeric.sample_iter(rng).take(10).collect(); let s: String = Alphanumeric.sample_iter(rng).take(10).collect();
let delete_token = s.clone(); let delete_token = s.clone();
debug!("Generated delete token");
debug!("Saving delete token"); debug!("Saving delete token");
let alias_tree = self.inner.alias_tree.clone(); let alias_tree = self.inner.alias_tree.clone();
@ -214,7 +245,6 @@ impl UploadManager {
) )
}) })
.await?; .await?;
debug!("Delete token saved");
if let Err(sled::CompareAndSwapError { if let Err(sled::CompareAndSwapError {
current: Some(ivec), current: Some(ivec),
@ -223,13 +253,16 @@ impl UploadManager {
{ {
let s = String::from_utf8(ivec.to_vec())?; let s = String::from_utf8(ivec.to_vec())?;
debug!("Returning existing delete token, {}", s);
return Ok(s); return Ok(s);
} }
debug!("Returning new delete token, {}", delete_token);
Ok(delete_token) Ok(delete_token)
} }
/// Upload the file while preserving the filename, optionally validating the uploaded image /// Upload the file while preserving the filename, optionally validating the uploaded image
#[instrument(skip(self, stream))]
pub(crate) async fn import<E>( pub(crate) async fn import<E>(
&self, &self,
alias: String, alias: String,
@ -240,9 +273,11 @@ impl UploadManager {
where where
UploadError: From<E>, UploadError: From<E>,
{ {
debug!("Reading stream");
let bytes = read_stream(stream).await?; let bytes = read_stream(stream).await?;
let (bytes, content_type) = if validate { let (bytes, content_type) = if validate {
debug!("Validating image");
let format = self.inner.format.clone(); let format = self.inner.format.clone();
validate_image(bytes, format).await? validate_image(bytes, format).await?
} else { } else {
@ -252,10 +287,13 @@ impl UploadManager {
// -- DUPLICATE CHECKS -- // -- DUPLICATE CHECKS --
// Cloning bytes is fine because it's actually a pointer // Cloning bytes is fine because it's actually a pointer
debug!("Hashing bytes");
let hash = self.hash(bytes.clone()).await?; let hash = self.hash(bytes.clone()).await?;
debug!("Storing alias");
self.add_existing_alias(&hash, &alias).await?; self.add_existing_alias(&hash, &alias).await?;
debug!("Saving file");
self.save_upload(bytes, hash, content_type).await?; self.save_upload(bytes, hash, content_type).await?;
// Return alias to file // Return alias to file
@ -263,7 +301,7 @@ impl UploadManager {
} }
/// Upload the file, discarding bytes if it's already present, or saving if it's new /// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument] #[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(&self, stream: UploadStream<E>) -> Result<String, UploadError> pub(crate) async fn upload<E>(&self, stream: UploadStream<E>) -> Result<String, UploadError>
where where
UploadError: From<E>, UploadError: From<E>,
@ -271,41 +309,39 @@ impl UploadManager {
// -- READ IN BYTES FROM CLIENT -- // -- READ IN BYTES FROM CLIENT --
debug!("Reading stream"); debug!("Reading stream");
let bytes = read_stream(stream).await?; let bytes = read_stream(stream).await?;
debug!("Read stream");
// -- VALIDATE IMAGE -- // -- VALIDATE IMAGE --
debug!("Validating image"); debug!("Validating image");
let format = self.inner.format.clone(); let format = self.inner.format.clone();
let (bytes, content_type) = validate_image(bytes, format).await?; let (bytes, content_type) = validate_image(bytes, format).await?;
debug!("Image validated");
// -- DUPLICATE CHECKS -- // -- DUPLICATE CHECKS --
// Cloning bytes is fine because it's actually a pointer // Cloning bytes is fine because it's actually a pointer
debug!("Hashing bytes"); debug!("Hashing bytes");
let hash = self.hash(bytes.clone()).await?; let hash = self.hash(bytes.clone()).await?;
debug!("Bytes hashed");
debug!("Adding alias"); debug!("Adding alias");
let alias = self.add_alias(&hash, content_type.clone()).await?; let alias = self.add_alias(&hash, content_type.clone()).await?;
debug!("Alias added");
debug!("Saving file"); debug!("Saving file");
self.save_upload(bytes, hash, content_type).await?; self.save_upload(bytes, hash, content_type).await?;
debug!("File saved");
// Return alias to file // Return alias to file
Ok(alias) Ok(alias)
} }
/// Fetch the real on-disk filename given an alias /// Fetch the real on-disk filename given an alias
#[instrument(skip(self))]
pub(crate) async fn from_alias(&self, alias: String) -> Result<String, UploadError> { pub(crate) async fn from_alias(&self, alias: String) -> Result<String, UploadError> {
let tree = self.inner.alias_tree.clone(); let tree = self.inner.alias_tree.clone();
debug!("Getting hash from alias");
let hash = web::block(move || tree.get(alias.as_bytes())) let hash = web::block(move || tree.get(alias.as_bytes()))
.await? .await?
.ok_or(UploadError::MissingAlias)?; .ok_or(UploadError::MissingAlias)?;
let db = self.inner.db.clone(); let db = self.inner.db.clone();
debug!("Getting filename from hash");
let filename = web::block(move || db.get(hash)) let filename = web::block(move || db.get(hash))
.await? .await?
.ok_or(UploadError::MissingFile)?; .ok_or(UploadError::MissingFile)?;
@ -316,23 +352,28 @@ impl UploadManager {
} }
// Find image variants and remove them from the DB and the disk // Find image variants and remove them from the DB and the disk
async fn cleanup_files(&self, filename: sled::IVec) -> Result<(), UploadError> { #[instrument(skip(self))]
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> {
let filename = filename.inner;
let mut path = self.image_dir(); let mut path = self.image_dir();
let fname = String::from_utf8(filename.to_vec())?; let fname = String::from_utf8(filename.to_vec())?;
path.push(fname); path.push(fname);
let mut errors = Vec::new(); let mut errors = Vec::new();
debug!("Deleting {:?}", path);
if let Err(e) = actix_fs::remove_file(path).await { if let Err(e) = actix_fs::remove_file(path).await {
errors.push(e.into()); errors.push(e.into());
} }
let fname_tree = self.inner.filename_tree.clone(); let fname_tree = self.inner.filename_tree.clone();
debug!("Deleting filename -> hash mapping");
let hash = web::block(move || fname_tree.remove(filename)) let hash = web::block(move || fname_tree.remove(filename))
.await? .await?
.ok_or(UploadError::MissingFile)?; .ok_or(UploadError::MissingFile)?;
let (start, end) = variant_key_bounds(&hash); let (start, end) = variant_key_bounds(&hash);
let db = self.inner.db.clone(); let db = self.inner.db.clone();
debug!("Fetching file variants");
let keys = web::block(move || { let keys = web::block(move || {
let mut keys = Vec::new(); let mut keys = Vec::new();
for key in db.range(start..end).keys() { for key in db.range(start..end).keys() {
@ -343,9 +384,12 @@ impl UploadManager {
}) })
.await?; .await?;
debug!("{} files prepared for deletion", keys.len());
for key in keys { for key in keys {
let db = self.inner.db.clone(); let db = self.inner.db.clone();
if let Some(path) = web::block(move || db.remove(key)).await? { if let Some(path) = web::block(move || db.remove(key)).await? {
debug!("Deleting {:?}", String::from_utf8(path.to_vec()));
if let Err(e) = remove_path(path).await { if let Err(e) = remove_path(path).await {
errors.push(e); errors.push(e);
} }
@ -362,13 +406,14 @@ impl UploadManager {
async fn save_upload( async fn save_upload(
&self, &self,
bytes: bytes::Bytes, bytes: bytes::Bytes,
hash: Vec<u8>, hash: Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<(), UploadError> { ) -> Result<(), UploadError> {
let (dup, name) = self.check_duplicate(hash, content_type).await?; let (dup, name) = self.check_duplicate(hash, content_type).await?;
// bail early with alias to existing file if this is a duplicate // bail early with alias to existing file if this is a duplicate
if dup.exists() { if dup.exists() {
debug!("Duplicate exists, not saving file");
return Ok(()); return Ok(());
} }
@ -382,7 +427,7 @@ impl UploadManager {
} }
// produce a sh256sum of the uploaded file // produce a sh256sum of the uploaded file
async fn hash(&self, bytes: bytes::Bytes) -> Result<Vec<u8>, UploadError> { async fn hash(&self, bytes: bytes::Bytes) -> Result<Hash, UploadError> {
let mut hasher = self.inner.hasher.clone(); let mut hasher = self.inner.hasher.clone();
let hash = web::block(move || { let hash = web::block(move || {
hasher.update(&bytes); hasher.update(&bytes);
@ -390,21 +435,22 @@ impl UploadManager {
}) })
.await?; .await?;
Ok(hash) Ok(Hash::new(hash))
} }
// check for an already-uploaded image with this hash, returning the path to the target file // check for an already-uploaded image with this hash, returning the path to the target file
#[instrument] #[instrument(skip(self))]
async fn check_duplicate( async fn check_duplicate(
&self, &self,
hash: Vec<u8>, hash: Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<(Dup, String), UploadError> { ) -> Result<(Dup, String), UploadError> {
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let filename = self.next_file(content_type).await?; let filename = self.next_file(content_type).await?;
let filename2 = filename.clone(); let filename2 = filename.clone();
let hash2 = hash.clone(); let hash2 = hash.inner.clone();
debug!("Inserting filename for hash");
let res = web::block(move || { let res = web::block(move || {
db.compare_and_swap( db.compare_and_swap(
hash2, hash2,
@ -420,24 +466,27 @@ impl UploadManager {
}) = res }) = res
{ {
let name = String::from_utf8(ivec.to_vec())?; let name = String::from_utf8(ivec.to_vec())?;
debug!("Filename exists for hash, {}", name);
return Ok((Dup::Exists, name)); return Ok((Dup::Exists, name));
} }
let fname_tree = self.inner.filename_tree.clone(); let fname_tree = self.inner.filename_tree.clone();
let filename2 = filename.clone(); let filename2 = filename.clone();
web::block(move || fname_tree.insert(filename2, hash)).await?; debug!("Saving filename -> hash relation");
web::block(move || fname_tree.insert(filename2, hash.inner)).await?;
Ok((Dup::New, filename)) Ok((Dup::New, filename))
} }
// generate a short filename that isn't already in-use // generate a short filename that isn't already in-use
#[instrument] #[instrument(skip(self))]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> { async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> {
let image_dir = self.image_dir(); let image_dir = self.image_dir();
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10; let mut limit: usize = 10;
let rng = rand::thread_rng(); let rng = rand::thread_rng();
loop { loop {
debug!("Filename generation loop");
let mut path = image_dir.clone(); let mut path = image_dir.clone();
let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); let s: String = Alphanumeric.sample_iter(rng).take(limit).collect();
@ -447,16 +496,20 @@ impl UploadManager {
if let Err(e) = actix_fs::metadata(path).await { if let Err(e) = actix_fs::metadata(path).await {
if e.kind() == Some(std::io::ErrorKind::NotFound) { if e.kind() == Some(std::io::ErrorKind::NotFound) {
debug!("Generated unused filename {}", filename);
return Ok(filename); return Ok(filename);
} }
return Err(e.into()); return Err(e.into());
} }
debug!("Filename exists, trying again");
limit += 1; limit += 1;
} }
} }
async fn add_existing_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> { #[instrument(skip(self))]
async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> {
self.save_alias(hash, alias).await??; self.save_alias(hash, alias).await??;
self.store_alias(hash, alias).await?; self.store_alias(hash, alias).await?;
@ -467,10 +520,10 @@ impl UploadManager {
// Add an alias to an existing file // Add an alias to an existing file
// //
// This will help if multiple 'users' upload the same file, and one of them wants to delete it // This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument] #[instrument(skip(self))]
async fn add_alias( async fn add_alias(
&self, &self,
hash: &[u8], hash: &Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<String, UploadError> { ) -> Result<String, UploadError> {
let alias = self.next_alias(hash, content_type).await?; let alias = self.next_alias(hash, content_type).await?;
@ -483,16 +536,18 @@ impl UploadManager {
// Add a pre-defined alias to an existin file // Add a pre-defined alias to an existin file
// //
// DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
#[instrument] #[instrument(skip(self))]
async fn store_alias(&self, hash: &[u8], alias: &str) -> Result<(), UploadError> { async fn store_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> {
let alias = alias.to_string(); let alias = alias.to_string();
loop { loop {
debug!("hash -> alias save loop");
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let id = web::block(move || db.generate_id()).await?.to_string(); let id = web::block(move || db.generate_id()).await?.to_string();
let key = alias_key(hash, &id); let key = alias_key(&hash.inner, &id);
let db = self.inner.db.clone(); let db = self.inner.db.clone();
let alias2 = alias.clone(); let alias2 = alias.clone();
debug!("Saving hash/id -> alias mapping");
let res = web::block(move || { let res = web::block(move || {
db.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes())) db.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes()))
}) })
@ -501,25 +556,30 @@ impl UploadManager {
if res.is_ok() { if res.is_ok() {
let alias_tree = self.inner.alias_tree.clone(); let alias_tree = self.inner.alias_tree.clone();
let key = alias_id_key(&alias); let key = alias_id_key(&alias);
debug!("Saving alias -> id mapping");
web::block(move || alias_tree.insert(key.as_bytes(), id.as_bytes())).await?; web::block(move || alias_tree.insert(key.as_bytes(), id.as_bytes())).await?;
break; break;
} }
debug!("Id exists, trying again");
} }
Ok(()) Ok(())
} }
// Generate an alias to the file // Generate an alias to the file
#[instrument(skip(self))]
async fn next_alias( async fn next_alias(
&self, &self,
hash: &[u8], hash: &Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<String, UploadError> { ) -> Result<String, UploadError> {
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10; let mut limit: usize = 10;
let rng = rand::thread_rng(); let rng = rand::thread_rng();
loop { loop {
debug!("Alias gen loop");
let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); let s: String = Alphanumeric.sample_iter(rng).take(limit).collect();
let alias = file_name(s, content_type.clone()); let alias = file_name(s, content_type.clone());
@ -528,27 +588,31 @@ impl UploadManager {
if res.is_ok() { if res.is_ok() {
return Ok(alias); return Ok(alias);
} }
debug!("Alias exists, regenning");
limit += 1; limit += 1;
} }
} }
// Save an alias to the database // Save an alias to the database
#[instrument(skip(self))]
async fn save_alias( async fn save_alias(
&self, &self,
hash: &[u8], hash: &Hash,
alias: &str, alias: &str,
) -> Result<Result<(), UploadError>, UploadError> { ) -> Result<Result<(), UploadError>, UploadError> {
let tree = self.inner.alias_tree.clone(); let tree = self.inner.alias_tree.clone();
let vec = hash.to_vec(); let vec = hash.inner.clone();
let alias = alias.to_string(); let alias = alias.to_string();
debug!("Saving alias");
let res = web::block(move || { let res = web::block(move || {
tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec)) tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec))
}) })
.await?; .await?;
if res.is_err() { if res.is_err() {
warn!("Duplicate alias");
return Ok(Err(UploadError::DuplicateAlias)); return Ok(Err(UploadError::DuplicateAlias));
} }
@ -556,16 +620,17 @@ impl UploadManager {
} }
} }
#[instrument] #[instrument(skip(stream))]
async fn read_stream<E>(stream: UploadStream<E>) -> Result<bytes::Bytes, UploadError> async fn read_stream<E>(mut stream: UploadStream<E>) -> Result<bytes::Bytes, UploadError>
where where
UploadError: From<E>, UploadError: From<E>,
{ {
let mut stream = stream.inner;
let mut bytes = bytes::BytesMut::new(); let mut bytes = bytes::BytesMut::new();
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {
bytes.extend(res?); let new = res?;
debug!("Extending with {} bytes", new.len());
bytes.extend(new);
} }
Ok(bytes.freeze()) Ok(bytes.freeze())

View file

@ -3,7 +3,7 @@ use actix_web::web;
use bytes::Bytes; use bytes::Bytes;
use image::{ImageDecoder, ImageEncoder, ImageFormat}; use image::{ImageDecoder, ImageEncoder, ImageFormat};
use std::io::Cursor; use std::io::Cursor;
use tracing::debug; use tracing::{debug, instrument, Span};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum GifError { pub(crate) enum GifError {
@ -15,18 +15,26 @@ pub(crate) enum GifError {
} }
// import & export image using the image crate // import & export image using the image crate
#[instrument(skip(bytes))]
pub(crate) async fn validate_image( pub(crate) async fn validate_image(
bytes: Bytes, bytes: Bytes,
prescribed_format: Option<Format>, prescribed_format: Option<Format>,
) -> Result<(Bytes, mime::Mime), UploadError> { ) -> Result<(Bytes, mime::Mime), UploadError> {
let span = Span::current();
let tup = web::block(move || { let tup = web::block(move || {
let entered = span.enter();
if let Some(prescribed) = prescribed_format { if let Some(prescribed) = prescribed_format {
debug!("Load from memory");
let img = image::load_from_memory(&bytes).map_err(UploadError::InvalidImage)?; let img = image::load_from_memory(&bytes).map_err(UploadError::InvalidImage)?;
debug!("Loaded");
let mime = prescribed.to_mime(); let mime = prescribed.to_mime();
debug!("Writing");
let mut bytes = Cursor::new(vec![]); let mut bytes = Cursor::new(vec![]);
img.write_to(&mut bytes, prescribed.to_image_format())?; img.write_to(&mut bytes, prescribed.to_image_format())?;
debug!("Written");
return Ok((Bytes::from(bytes.into_inner()), mime)); return Ok((Bytes::from(bytes.into_inner()), mime));
} }
@ -41,6 +49,7 @@ pub(crate) async fn validate_image(
_ => Err(UploadError::UnsupportedFormat), _ => Err(UploadError::UnsupportedFormat),
}; };
debug!("Validated"); debug!("Validated");
drop(entered);
res res
}) })
.await?; .await?;
@ -48,6 +57,7 @@ pub(crate) async fn validate_image(
Ok(tup) Ok(tup)
} }
#[instrument(skip(bytes))]
fn validate_png(bytes: Bytes) -> Result<Bytes, UploadError> { fn validate_png(bytes: Bytes) -> Result<Bytes, UploadError> {
let decoder = image::png::PngDecoder::new(Cursor::new(&bytes))?; let decoder = image::png::PngDecoder::new(Cursor::new(&bytes))?;
@ -58,6 +68,7 @@ fn validate_png(bytes: Bytes) -> Result<Bytes, UploadError> {
Ok(Bytes::from(bytes.into_inner())) Ok(Bytes::from(bytes.into_inner()))
} }
#[instrument(skip(bytes))]
fn validate_jpg(bytes: Bytes) -> Result<Bytes, UploadError> { fn validate_jpg(bytes: Bytes) -> Result<Bytes, UploadError> {
let decoder = image::jpeg::JpegDecoder::new(Cursor::new(&bytes))?; let decoder = image::jpeg::JpegDecoder::new(Cursor::new(&bytes))?;
@ -68,6 +79,7 @@ fn validate_jpg(bytes: Bytes) -> Result<Bytes, UploadError> {
Ok(Bytes::from(bytes.into_inner())) Ok(Bytes::from(bytes.into_inner()))
} }
#[instrument(skip(bytes))]
fn validate_bmp(bytes: Bytes) -> Result<Bytes, UploadError> { fn validate_bmp(bytes: Bytes) -> Result<Bytes, UploadError> {
let decoder = image::bmp::BmpDecoder::new(Cursor::new(&bytes))?; let decoder = image::bmp::BmpDecoder::new(Cursor::new(&bytes))?;
@ -78,6 +90,7 @@ fn validate_bmp(bytes: Bytes) -> Result<Bytes, UploadError> {
Ok(Bytes::from(bytes.into_inner())) Ok(Bytes::from(bytes.into_inner()))
} }
#[instrument(skip(bytes))]
fn validate_gif(bytes: Bytes) -> Result<Bytes, GifError> { fn validate_gif(bytes: Bytes) -> Result<Bytes, GifError> {
use gif::{Parameter, SetParameter}; use gif::{Parameter, SetParameter};
@ -98,6 +111,7 @@ fn validate_gif(bytes: Bytes) -> Result<Bytes, GifError> {
gif::Repeat::Infinite.set_param(&mut encoder)?; gif::Repeat::Infinite.set_param(&mut encoder)?;
while let Some(frame) = reader.read_next_frame()? { while let Some(frame) = reader.read_next_frame()? {
debug!("Writing frame");
encoder.write_frame(frame)?; encoder.write_frame(frame)?;
} }
} }
@ -113,9 +127,11 @@ where
let (width, height) = decoder.dimensions(); let (width, height) = decoder.dimensions();
let color_type = decoder.color_type(); let color_type = decoder.color_type();
let total_bytes = decoder.total_bytes(); let total_bytes = decoder.total_bytes();
debug!("Reading image");
let mut decoded_bytes = vec![0u8; total_bytes as usize]; let mut decoded_bytes = vec![0u8; total_bytes as usize];
decoder.read_image(&mut decoded_bytes)?; decoder.read_image(&mut decoded_bytes)?;
debug!("Writing image");
encoder.write_image(&decoded_bytes, width, height, color_type)?; encoder.write_image(&decoded_bytes, width, height, color_type)?;
Ok(()) Ok(())