diff --git a/Cargo.lock b/Cargo.lock index afc7ef6..0610c2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1205,6 +1205,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -2303,6 +2304,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 69841ed..3f5ab98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ tracing-log = "0.1.2" tracing-opentelemetry = "0.15" tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] } url = "2.2" +uuid = { version = "0.8.2", features = ["v4", "serde"]} [dependencies.tracing-actix-web] version = "0.4.0-beta.14" diff --git a/src/upload_manager/hasher.rs b/src/upload_manager/hasher.rs new file mode 100644 index 0000000..b92fcd6 --- /dev/null +++ b/src/upload_manager/hasher.rs @@ -0,0 +1,132 @@ +use crate::error::Error; +use actix_web::web; +use sha2::Digest; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, ReadBuf}; + +pin_project_lite::pin_project! { + pub(crate) struct Hasher { + #[pin] + inner: I, + + hasher: D, + } +} + +pub(super) struct Hash { + inner: Vec, +} + +impl Hasher +where + D: Digest + Send + 'static, +{ + pub(super) fn new(reader: I, digest: D) -> Self { + Hasher { + inner: reader, + hasher: digest, + } + } + + pub(super) async fn finalize_reset(self) -> Result { + let mut hasher = self.hasher; + let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?; + Ok(hash) + } +} + +impl Hash { + fn new(inner: Vec) -> Self { + Hash { inner } + } + + pub(super) fn as_slice(&self) -> &[u8] { + &self.inner + } + + pub(super) fn into_inner(self) -> Vec { + self.inner + } +} + +impl AsyncRead for Hasher +where + I: AsyncRead, + D: Digest, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.as_mut().project(); + + let reader = this.inner; + let hasher = this.hasher; + + let before_len = buf.filled().len(); + let poll_res = reader.poll_read(cx, buf); + let after_len = buf.filled().len(); + if after_len > before_len { + hasher.update(&buf.filled()[before_len..after_len]); + } + poll_res + } +} + +impl std::fmt::Debug for Hash { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", base64::encode(&self.inner)) + } +} + +#[cfg(test)] +mod test { + use super::Hasher; + use sha2::{Digest, Sha256}; + use std::io::Read; + + macro_rules! 
test_on_arbiter { + ($fut:expr) => { + actix_rt::System::new().block_on(async move { + let arbiter = actix_rt::Arbiter::new(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + + arbiter.spawn(async move { + let handle = actix_rt::spawn($fut); + + let _ = tx.send(handle.await.unwrap()); + }); + + rx.await.unwrap() + }) + }; + } + + #[test] + fn hasher_works() { + let hash = test_on_arbiter!(async move { + let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; + + let mut hasher = Hasher::new(file1, Sha256::new()); + + tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; + + hasher.finalize_reset().await + }) + .unwrap(); + + let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap(); + let mut vec = Vec::new(); + file.read_to_end(&mut vec).unwrap(); + let mut hasher = Sha256::new(); + hasher.update(vec); + let correct_hash = hasher.finalize_reset().to_vec(); + + assert_eq!(hash.inner, correct_hash); + } +} diff --git a/src/upload_manager.rs b/src/upload_manager/mod.rs similarity index 50% rename from src/upload_manager.rs rename to src/upload_manager/mod.rs index 1a98333..26a6097 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager/mod.rs @@ -2,21 +2,18 @@ use crate::{ config::Format, error::{Error, UploadError}, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, - to_ext, }; use actix_web::web; -use futures_util::stream::{LocalBoxStream, StreamExt}; use sha2::Digest; -use std::{ - path::PathBuf, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::io::{AsyncRead, ReadBuf}; +use std::{path::PathBuf, sync::Arc}; use tracing::{debug, error, info, instrument, warn, Span}; use tracing_futures::Instrument; +mod hasher; +mod session; + +pub(super) use session::UploadManagerSession; + // TREE STRUCTURE // - Alias Tree // - alias -> hash @@ -28,110 +25,19 @@ use tracing_futures::Instrument; // - hash 2 variant path -> variant path // - Filename Tree // - filename -> hash +// - Path Tree +// - filename -> relative path +// - filename / variant operation path -> relative path +// - Settings Tree +// - last-path -> last generated path +// - fs-restructure-01-started -> bool +// - fs-restructure-01-complete -> bool #[derive(Clone)] pub struct UploadManager { inner: Arc, } -pub struct UploadManagerSession { - manager: UploadManager, - alias: Option, - finished: bool, -} - -impl UploadManagerSession { - pub(crate) fn succeed(mut self) { - self.finished = true; - } - - pub(crate) fn alias(&self) -> Option<&str> { - self.alias.as_deref() - } -} - -impl Drop for UploadManagerSession { - fn drop(&mut self) { - if self.finished { - return; - } - - if let Some(alias) = self.alias.take() { - let manager = self.manager.clone(); - let cleanup_span = tracing::info_span!( - parent: None, - "Upload cleanup", - alias = &tracing::field::display(&alias), - ); - cleanup_span.follows_from(Span::current()); - actix_rt::spawn( - async move { - // undo alias -> hash mapping - debug!("Remove alias -> hash mapping"); - if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) { - // undo alias -> id mapping - debug!("Remove alias -> id mapping"); - let key = alias_id_key(&alias); - if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) { - // undo hash/id -> alias mapping - debug!("Remove hash/id -> alias mapping"); - let id = String::from_utf8_lossy(&id); - let key = alias_key(&hash, &id); - let _ = manager.inner.main_tree.remove(&key); - } - - let _ = manager.check_delete_files(hash).await; - } - } - 
.instrument(cleanup_span), - ); - } - } -} - -pub struct Hasher { - inner: I, - hasher: D, -} - -impl Hasher -where - D: Digest + Send + 'static, -{ - fn new(reader: I, digest: D) -> Self { - Hasher { - inner: reader, - hasher: digest, - } - } - - async fn finalize_reset(self) -> Result { - let mut hasher = self.hasher; - let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?; - Ok(hash) - } -} - -impl AsyncRead for Hasher -where - I: AsyncRead + Unpin, - D: Digest + Unpin, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let before_len = buf.filled().len(); - let poll_res = Pin::new(&mut self.inner).poll_read(cx, buf); - let after_len = buf.filled().len(); - if after_len > before_len { - self.hasher.update(&buf.filled()[before_len..after_len]); - } - poll_res - } -} - struct UploadManagerInner { format: Option, hasher: sha2::Sha256, @@ -139,59 +45,16 @@ struct UploadManagerInner { alias_tree: sled::Tree, filename_tree: sled::Tree, main_tree: sled::Tree, + path_tree: sled::Tree, + settings_tree: sled::Tree, db: sled::Db, } -impl std::fmt::Debug for UploadManager { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("UploadManager").finish() - } -} - -type UploadStream = LocalBoxStream<'static, Result>; - #[derive(Clone, Debug)] pub(crate) struct Serde { inner: T, } -impl Serde { - pub(crate) fn new(inner: T) -> Self { - Serde { inner } - } -} - -impl serde::Serialize for Serde -where - T: std::fmt::Display, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let s = self.inner.to_string(); - serde::Serialize::serialize(s.as_str(), serializer) - } -} - -impl<'de, T> serde::Deserialize<'de> for Serde -where - T: std::str::FromStr, - ::Err: std::fmt::Display, -{ - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let s: String = serde::Deserialize::deserialize(deserializer)?; - let inner = s - .parse::() - .map_err(|e| serde::de::Error::custom(e.to_string()))?; - - Ok(Serde { inner }) - } -} - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Details { width: usize, @@ -200,93 +63,10 @@ pub(crate) struct Details { created_at: time::OffsetDateTime, } -impl Details { - #[tracing::instrument("Details from bytes", skip(input))] - pub(crate) async fn from_bytes(input: web::Bytes) -> Result { - let details = crate::magick::details_bytes(input).await?; - - Ok(Details::now( - details.width, - details.height, - details.mime_type, - )) - } - - #[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))] - pub(crate) async fn from_path
<P>
(path: P) -> Result - where - P: AsRef, - { - let details = crate::magick::details(&path).await?; - - Ok(Details::now( - details.width, - details.height, - details.mime_type, - )) - } - - fn now(width: usize, height: usize, content_type: mime::Mime) -> Self { - Details { - width, - height, - content_type: Serde::new(content_type), - created_at: time::OffsetDateTime::now_utc(), - } - } - - pub(crate) fn content_type(&self) -> mime::Mime { - self.content_type.inner.clone() - } - - pub(crate) fn system_time(&self) -> std::time::SystemTime { - self.created_at.into() - } -} - struct FilenameIVec { inner: sled::IVec, } -impl FilenameIVec { - fn new(inner: sled::IVec) -> Self { - FilenameIVec { inner } - } -} - -impl std::fmt::Debug for FilenameIVec { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", String::from_utf8(self.inner.to_vec())) - } -} - -struct Hash { - inner: Vec, -} - -impl Hash { - fn new(inner: Vec) -> Self { - Hash { inner } - } -} - -impl std::fmt::Debug for Hash { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", base64::encode(&self.inner)) - } -} - -enum Dup { - Exists, - New, -} - -impl Dup { - fn exists(&self) -> bool { - matches!(self, Dup::Exists) - } -} - impl UploadManager { /// Get the image directory pub(crate) fn image_dir(&self) -> PathBuf { @@ -304,7 +84,9 @@ impl UploadManager { // Ensure file dir exists tokio::fs::create_dir_all(&root_dir).await?; - Ok(UploadManager { + let settings_tree = db.open_tree("settings")?; + + let manager = UploadManager { inner: Arc::new(UploadManagerInner { format, hasher: sha2::Sha256::new(), @@ -312,9 +94,13 @@ impl UploadManager { alias_tree: db.open_tree("alias")?, filename_tree: db.open_tree("filename")?, main_tree: db.open_tree("main")?, + path_tree: db.open_tree("path")?, + settings_tree, db, }), - }) + }; + + Ok(manager) } /// Store the path to a generated image variant so we can easily clean it up later @@ -572,11 +358,7 @@ impl UploadManager { } pub(crate) fn session(&self) -> UploadManagerSession { - UploadManagerSession { - manager: self.clone(), - alias: None, - finished: false, - } + UploadManagerSession::new(self.clone()) } // Find image variants and remove them from the DB and the disk @@ -635,367 +417,60 @@ impl UploadManager { } } -impl UploadManagerSession { - /// Generate a delete token for an alias - #[instrument(skip(self))] - pub(crate) async fn delete_token(&self) -> Result { - let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; - - debug!("Generating delete token"); - use rand::distributions::{Alphanumeric, Distribution}; - let rng = rand::thread_rng(); - let s: String = Alphanumeric - .sample_iter(rng) - .take(10) - .map(char::from) - .collect(); - let delete_token = s.clone(); - - debug!("Saving delete token"); - let alias_tree = self.manager.inner.alias_tree.clone(); - let key = delete_key(&alias); - let res = web::block(move || { - alias_tree.compare_and_swap( - key.as_bytes(), - None as Option, - Some(s.as_bytes()), - ) - }) - .await??; - - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. 
- }) = res - { - let s = String::from_utf8(ivec.to_vec())?; - - debug!("Returning existing delete token, {}", s); - return Ok(s); - } - - debug!("Returning new delete token, {}", delete_token); - Ok(delete_token) - } - - /// Upload the file while preserving the filename, optionally validating the uploaded image - #[instrument(skip(self, stream))] - pub(crate) async fn import( - mut self, - alias: String, - content_type: mime::Mime, - validate: bool, - mut stream: UploadStream, - ) -> Result - where - Error: From, - E: Unpin + 'static, - { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (content_type, validated_reader) = crate::validate::validate_image_bytes( - bytes_mut.freeze(), - self.manager.inner.format.clone(), - validate, - ) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Storing alias"); - self.alias = Some(alias.clone()); - self.add_existing_alias(&hash, &alias).await?; - - debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; - - // Return alias to file - Ok(self) - } - - /// Upload the file, discarding bytes if it's already present, or saving if it's new - #[instrument(skip(self, stream))] - pub(crate) async fn upload(mut self, mut stream: UploadStream) -> Result - where - Error: From, - { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (content_type, validated_reader) = crate::validate::validate_image_bytes( - bytes_mut.freeze(), - self.manager.inner.format.clone(), - true, - ) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Adding alias"); - self.add_alias(&hash, content_type.clone()).await?; - - debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; - - // Return alias to file - Ok(self) - } - - // check duplicates & store image if new - async fn save_upload( - &self, - tmpfile: PathBuf, - hash: Hash, - content_type: mime::Mime, - ) -> Result<(), Error> { - let (dup, name) = self.check_duplicate(hash, content_type).await?; - - // bail early with alias to existing file if this is a duplicate - if dup.exists() { - debug!("Duplicate exists, not saving file"); - return Ok(()); - } - - // -- WRITE NEW FILE -- - let mut real_path = self.manager.image_dir(); - real_path.push(name); - - crate::safe_move_file(tmpfile, real_path).await?; - - Ok(()) - } - - // check for an already-uploaded image with this hash, returning the path to the target file - #[instrument(skip(self, hash, content_type))] - async fn check_duplicate( - &self, - hash: Hash, - content_type: mime::Mime, - ) -> Result<(Dup, String), Error> { - let main_tree = self.manager.inner.main_tree.clone(); - - let filename = self.next_file(content_type).await?; - let filename2 = filename.clone(); - let hash2 = hash.inner.clone(); - debug!("Inserting filename for 
hash"); - let res = web::block(move || { - main_tree.compare_and_swap( - hash2, - None as Option, - Some(filename2.as_bytes()), - ) - }) - .await??; - - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. - }) = res - { - let name = String::from_utf8(ivec.to_vec())?; - debug!("Filename exists for hash, {}", name); - return Ok((Dup::Exists, name)); - } - - let fname_tree = self.manager.inner.filename_tree.clone(); - let filename2 = filename.clone(); - debug!("Saving filename -> hash relation"); - web::block(move || fname_tree.insert(filename2, hash.inner)).await??; - - Ok((Dup::New, filename)) - } - - // generate a short filename that isn't already in-use - #[instrument(skip(self, content_type))] - async fn next_file(&self, content_type: mime::Mime) -> Result { - let image_dir = self.manager.image_dir(); - use rand::distributions::{Alphanumeric, Distribution}; - let mut limit: usize = 10; - let mut rng = rand::thread_rng(); - loop { - debug!("Filename generation loop"); - let mut path = image_dir.clone(); - let s: String = Alphanumeric - .sample_iter(&mut rng) - .take(limit) - .map(char::from) - .collect(); - - let filename = file_name(s, content_type.clone())?; - - path.push(filename.clone()); - - if let Err(e) = tokio::fs::metadata(path).await { - if e.kind() == std::io::ErrorKind::NotFound { - debug!("Generated unused filename {}", filename); - return Ok(filename); - } - return Err(e.into()); - } - - debug!("Filename exists, trying again"); - - limit += 1; - } - } - - #[instrument(skip(self, hash, alias))] - async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> { - self.save_alias_hash_mapping(hash, alias).await??; - - self.store_hash_id_alias_mapping(hash, alias).await?; - - Ok(()) - } - - // Add an alias to an existing file - // - // This will help if multiple 'users' upload the same file, and one of them wants to delete it - #[instrument(skip(self, hash, content_type))] - async fn add_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<(), Error> { - let alias = self.next_alias(hash, content_type).await?; - - self.store_hash_id_alias_mapping(hash, &alias).await?; - - Ok(()) - } - - // Add a pre-defined alias to an existin file - // - // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files - #[instrument(skip(self, hash))] - async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> { - let alias = alias.to_string(); - loop { - debug!("hash -> alias save loop"); - let db = self.manager.inner.db.clone(); - let id = web::block(move || db.generate_id()).await??.to_string(); - - let alias_tree = self.manager.inner.alias_tree.clone(); - let key = alias_id_key(&alias); - let id2 = id.clone(); - debug!("Saving alias -> id mapping"); - web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??; - - let key = alias_key(&hash.inner, &id); - let main_tree = self.manager.inner.main_tree.clone(); - let alias2 = alias.clone(); - debug!("Saving hash/id -> alias mapping"); - let res = web::block(move || { - main_tree.compare_and_swap(key, None as Option, Some(alias2.as_bytes())) - }) - .await??; - - if res.is_ok() { - break; - } - - debug!("Id exists, trying again"); - } - - Ok(()) - } - - // Generate an alias to the file - #[instrument(skip(self, hash, content_type))] - async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result { - use rand::distributions::{Alphanumeric, Distribution}; - let mut limit: usize = 10; - let mut rng = 
rand::thread_rng(); - loop { - debug!("Alias gen loop"); - let s: String = Alphanumeric - .sample_iter(&mut rng) - .take(limit) - .map(char::from) - .collect(); - let alias = file_name(s, content_type.clone())?; - self.alias = Some(alias.clone()); - - let res = self.save_alias_hash_mapping(hash, &alias).await?; - - if res.is_ok() { - return Ok(alias); - } - debug!("Alias exists, regenning"); - - limit += 1; - } - } - - // Save an alias to the database - #[instrument(skip(self, hash))] - async fn save_alias_hash_mapping( - &self, - hash: &Hash, - alias: &str, - ) -> Result, Error> { - let tree = self.manager.inner.alias_tree.clone(); - let vec = hash.inner.clone(); - let alias = alias.to_string(); - - debug!("Saving alias -> hash mapping"); - let res = web::block(move || { - tree.compare_and_swap(alias.as_bytes(), None as Option, Some(vec)) - }) - .await??; - - if res.is_err() { - warn!("Duplicate alias"); - return Ok(Err(UploadError::DuplicateAlias.into())); - } - - Ok(Ok(())) +impl Serde { + pub(crate) fn new(inner: T) -> Self { + Serde { inner } } } -#[instrument(skip(input))] -pub(crate) async fn safe_save_reader( - to: PathBuf, - input: &mut (impl AsyncRead + Unpin), -) -> Result<(), Error> { - if let Some(path) = to.parent() { - debug!("Creating directory {:?}", path); - tokio::fs::create_dir_all(path.to_owned()).await?; +impl Details { + #[tracing::instrument("Details from bytes", skip(input))] + pub(crate) async fn from_bytes(input: web::Bytes) -> Result { + let details = crate::magick::details_bytes(input).await?; + + Ok(Details::now( + details.width, + details.height, + details.mime_type, + )) } - debug!("Checking if {:?} already exists", to); - if let Err(e) = tokio::fs::metadata(to.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); + #[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))] + pub(crate) async fn from_path
<P>
(path: P) -> Result + where + P: AsRef, + { + let details = crate::magick::details(&path).await?; + + Ok(Details::now( + details.width, + details.height, + details.mime_type, + )) + } + + fn now(width: usize, height: usize, content_type: mime::Mime) -> Self { + Details { + width, + height, + content_type: Serde::new(content_type), + created_at: time::OffsetDateTime::now_utc(), } - } else { - return Err(UploadError::FileExists.into()); } - debug!("Writing stream to {:?}", to); + pub(crate) fn content_type(&self) -> mime::Mime { + self.content_type.inner.clone() + } - let mut file = crate::file::File::create(to).await?; + pub(crate) fn system_time(&self) -> std::time::SystemTime { + self.created_at.into() + } +} - file.write_from_async_read(input).await?; - - Ok(()) +impl FilenameIVec { + fn new(inner: sled::IVec) -> Self { + FilenameIVec { inner } + } } async fn remove_path(path: sled::IVec) -> Result<(), Error> { @@ -1011,10 +486,6 @@ where sled::transaction::ConflictableTransactionError::Abort(e.into()) } -fn file_name(name: String, content_type: mime::Mime) -> Result { - Ok(format!("{}{}", name, to_ext(content_type)?)) -} - fn delete_key(alias: &str) -> String { format!("{}/delete", alias) } @@ -1034,50 +505,45 @@ fn variant_details_key(hash: &[u8], path: &str) -> Vec { key } -#[cfg(test)] -mod test { - use super::Hasher; - use sha2::{Digest, Sha256}; - use std::io::Read; - - macro_rules! test_on_arbiter { - ($fut:expr) => { - actix_rt::System::new().block_on(async move { - let arbiter = actix_rt::Arbiter::new(); - - let (tx, rx) = tokio::sync::oneshot::channel(); - - arbiter.spawn(async move { - let handle = actix_rt::spawn($fut); - - let _ = tx.send(handle.await.unwrap()); - }); - - rx.await.unwrap() - }) - }; - } - - #[test] - fn hasher_works() { - let hash = test_on_arbiter!(async move { - let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; - - let mut hasher = Hasher::new(file1, Sha256::new()); - - tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; - - hasher.finalize_reset().await - }) - .unwrap(); - - let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap(); - let mut vec = Vec::new(); - file.read_to_end(&mut vec).unwrap(); - let mut hasher = Sha256::new(); - hasher.update(vec); - let correct_hash = hasher.finalize_reset().to_vec(); - - assert_eq!(hash.inner, correct_hash); +impl std::fmt::Debug for UploadManager { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("UploadManager").finish() + } +} + +impl serde::Serialize for Serde +where + T: std::fmt::Display, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let s = self.inner.to_string(); + serde::Serialize::serialize(s.as_str(), serializer) + } +} + +impl<'de, T> serde::Deserialize<'de> for Serde +where + T: std::str::FromStr, + ::Err: std::fmt::Display, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s: String = serde::Deserialize::deserialize(deserializer)?; + let inner = s + .parse::() + .map_err(|e| serde::de::Error::custom(e.to_string()))?; + + Ok(Serde { inner }) + } +} + +impl std::fmt::Debug for FilenameIVec { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", String::from_utf8(self.inner.to_vec())) } } diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs new file mode 100644 index 0000000..9b5b9b9 --- /dev/null +++ b/src/upload_manager/session.rs @@ -0,0 +1,456 @@ +use crate::{ + error::{Error, 
UploadError}, + migrate::{alias_id_key, alias_key}, + to_ext, + upload_manager::{ + delete_key, + hasher::{Hash, Hasher}, + UploadManager, + }, +}; +use actix_web::web; +use futures_util::stream::{LocalBoxStream, StreamExt}; +use std::path::PathBuf; +use tokio::io::AsyncRead; +use tracing::{debug, instrument, warn, Span}; +use tracing_futures::Instrument; + +type UploadStream = LocalBoxStream<'static, Result>; + +pub(crate) struct UploadManagerSession { + manager: UploadManager, + alias: Option, + finished: bool, +} + +impl UploadManagerSession { + pub(super) fn new(manager: UploadManager) -> Self { + UploadManagerSession { + manager, + alias: None, + finished: false, + } + } + + pub(crate) fn succeed(mut self) { + self.finished = true; + } + + pub(crate) fn alias(&self) -> Option<&str> { + self.alias.as_deref() + } +} + +enum Dup { + Exists, + New, +} + +impl Dup { + fn exists(&self) -> bool { + matches!(self, Dup::Exists) + } +} + +impl Drop for UploadManagerSession { + fn drop(&mut self) { + if self.finished { + return; + } + + if let Some(alias) = self.alias.take() { + let manager = self.manager.clone(); + let cleanup_span = tracing::info_span!( + parent: None, + "Upload cleanup", + alias = &tracing::field::display(&alias), + ); + cleanup_span.follows_from(Span::current()); + actix_rt::spawn( + async move { + // undo alias -> hash mapping + debug!("Remove alias -> hash mapping"); + if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) { + // undo alias -> id mapping + debug!("Remove alias -> id mapping"); + let key = alias_id_key(&alias); + if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) { + // undo hash/id -> alias mapping + debug!("Remove hash/id -> alias mapping"); + let id = String::from_utf8_lossy(&id); + let key = alias_key(&hash, &id); + let _ = manager.inner.main_tree.remove(&key); + } + + let _ = manager.check_delete_files(hash).await; + } + } + .instrument(cleanup_span), + ); + } + } +} + +impl UploadManagerSession { + /// Generate a delete token for an alias + #[instrument(skip(self))] + pub(crate) async fn delete_token(&self) -> Result { + let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; + + debug!("Generating delete token"); + use rand::distributions::{Alphanumeric, Distribution}; + let rng = rand::thread_rng(); + let s: String = Alphanumeric + .sample_iter(rng) + .take(10) + .map(char::from) + .collect(); + let delete_token = s.clone(); + + debug!("Saving delete token"); + let alias_tree = self.manager.inner.alias_tree.clone(); + let key = delete_key(&alias); + let res = web::block(move || { + alias_tree.compare_and_swap( + key.as_bytes(), + None as Option, + Some(s.as_bytes()), + ) + }) + .await??; + + if let Err(sled::CompareAndSwapError { + current: Some(ivec), + .. 
+ }) = res + { + let s = String::from_utf8(ivec.to_vec())?; + + debug!("Returning existing delete token, {}", s); + return Ok(s); + } + + debug!("Returning new delete token, {}", delete_token); + Ok(delete_token) + } + + /// Upload the file while preserving the filename, optionally validating the uploaded image + #[instrument(skip(self, stream))] + pub(crate) async fn import( + mut self, + alias: String, + content_type: mime::Mime, + validate: bool, + mut stream: UploadStream, + ) -> Result + where + Error: From, + E: Unpin + 'static, + { + let mut bytes_mut = actix_web::web::BytesMut::new(); + + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } + + debug!("Validating bytes"); + let (content_type, validated_reader) = crate::validate::validate_image_bytes( + bytes_mut.freeze(), + self.manager.inner.format.clone(), + validate, + ) + .await?; + + let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); + + let tmpfile = crate::tmp_file(); + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; + + debug!("Storing alias"); + self.alias = Some(alias.clone()); + self.add_existing_alias(&hash, &alias).await?; + + debug!("Saving file"); + self.save_upload(tmpfile, hash, content_type).await?; + + // Return alias to file + Ok(self) + } + + /// Upload the file, discarding bytes if it's already present, or saving if it's new + #[instrument(skip(self, stream))] + pub(crate) async fn upload(mut self, mut stream: UploadStream) -> Result + where + Error: From, + { + let mut bytes_mut = actix_web::web::BytesMut::new(); + + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } + + debug!("Validating bytes"); + let (content_type, validated_reader) = crate::validate::validate_image_bytes( + bytes_mut.freeze(), + self.manager.inner.format.clone(), + true, + ) + .await?; + + let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); + + let tmpfile = crate::tmp_file(); + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; + + debug!("Adding alias"); + self.add_alias(&hash, content_type.clone()).await?; + + debug!("Saving file"); + self.save_upload(tmpfile, hash, content_type).await?; + + // Return alias to file + Ok(self) + } + + // check duplicates & store image if new + async fn save_upload( + &self, + tmpfile: PathBuf, + hash: Hash, + content_type: mime::Mime, + ) -> Result<(), Error> { + let (dup, name) = self.check_duplicate(hash, content_type).await?; + + // bail early with alias to existing file if this is a duplicate + if dup.exists() { + debug!("Duplicate exists, not saving file"); + return Ok(()); + } + + // -- WRITE NEW FILE -- + let mut real_path = self.manager.image_dir(); + real_path.push(name); + + crate::safe_move_file(tmpfile, real_path).await?; + + Ok(()) + } + + // check for an already-uploaded image with this hash, returning the path to the target file + #[instrument(skip(self, hash, content_type))] + async fn check_duplicate( + &self, + hash: Hash, + content_type: mime::Mime, + ) -> Result<(Dup, String), Error> { + let main_tree = self.manager.inner.main_tree.clone(); + + let filename = self.next_file(content_type).await?; + let filename2 = filename.clone(); + let hash2 = hash.as_slice().to_vec(); + debug!("Inserting filename 
for hash"); + let res = web::block(move || { + main_tree.compare_and_swap( + hash2, + None as Option, + Some(filename2.as_bytes()), + ) + }) + .await??; + + if let Err(sled::CompareAndSwapError { + current: Some(ivec), + .. + }) = res + { + let name = String::from_utf8(ivec.to_vec())?; + debug!("Filename exists for hash, {}", name); + return Ok((Dup::Exists, name)); + } + + let fname_tree = self.manager.inner.filename_tree.clone(); + let filename2 = filename.clone(); + debug!("Saving filename -> hash relation"); + web::block(move || fname_tree.insert(filename2, hash.into_inner())).await??; + + Ok((Dup::New, filename)) + } + + // generate a short filename that isn't already in-use + #[instrument(skip(self, content_type))] + async fn next_file(&self, content_type: mime::Mime) -> Result { + let image_dir = self.manager.image_dir(); + use rand::distributions::{Alphanumeric, Distribution}; + let mut limit: usize = 10; + let mut rng = rand::thread_rng(); + loop { + debug!("Filename generation loop"); + let mut path = image_dir.clone(); + let s: String = Alphanumeric + .sample_iter(&mut rng) + .take(limit) + .map(char::from) + .collect(); + + let filename = file_name(s, content_type.clone())?; + + path.push(filename.clone()); + + if let Err(e) = tokio::fs::metadata(path).await { + if e.kind() == std::io::ErrorKind::NotFound { + debug!("Generated unused filename {}", filename); + return Ok(filename); + } + return Err(e.into()); + } + + debug!("Filename exists, trying again"); + + limit += 1; + } + } + + #[instrument(skip(self, hash, alias))] + async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> { + self.save_alias_hash_mapping(hash, alias).await??; + + self.store_hash_id_alias_mapping(hash, alias).await?; + + Ok(()) + } + + // Add an alias to an existing file + // + // This will help if multiple 'users' upload the same file, and one of them wants to delete it + #[instrument(skip(self, hash, content_type))] + async fn add_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<(), Error> { + let alias = self.next_alias(hash, content_type).await?; + + self.store_hash_id_alias_mapping(hash, &alias).await?; + + Ok(()) + } + + // Add a pre-defined alias to an existin file + // + // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files + #[instrument(skip(self, hash))] + async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> { + let alias = alias.to_string(); + loop { + debug!("hash -> alias save loop"); + let db = self.manager.inner.db.clone(); + let id = web::block(move || db.generate_id()).await??.to_string(); + + let alias_tree = self.manager.inner.alias_tree.clone(); + let key = alias_id_key(&alias); + let id2 = id.clone(); + debug!("Saving alias -> id mapping"); + web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??; + + let key = alias_key(hash.as_slice(), &id); + let main_tree = self.manager.inner.main_tree.clone(); + let alias2 = alias.clone(); + debug!("Saving hash/id -> alias mapping"); + let res = web::block(move || { + main_tree.compare_and_swap(key, None as Option, Some(alias2.as_bytes())) + }) + .await??; + + if res.is_ok() { + break; + } + + debug!("Id exists, trying again"); + } + + Ok(()) + } + + // Generate an alias to the file + #[instrument(skip(self, hash, content_type))] + async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result { + use rand::distributions::{Alphanumeric, Distribution}; + let mut limit: usize = 10; + let 
mut rng = rand::thread_rng(); + loop { + debug!("Alias gen loop"); + let s: String = Alphanumeric + .sample_iter(&mut rng) + .take(limit) + .map(char::from) + .collect(); + let alias = file_name(s, content_type.clone())?; + self.alias = Some(alias.clone()); + + let res = self.save_alias_hash_mapping(hash, &alias).await?; + + if res.is_ok() { + return Ok(alias); + } + debug!("Alias exists, regenning"); + + limit += 1; + } + } + + // Save an alias to the database + #[instrument(skip(self, hash))] + async fn save_alias_hash_mapping( + &self, + hash: &Hash, + alias: &str, + ) -> Result, Error> { + let tree = self.manager.inner.alias_tree.clone(); + let vec = hash.as_slice().to_vec(); + let alias = alias.to_string(); + + debug!("Saving alias -> hash mapping"); + let res = web::block(move || { + tree.compare_and_swap(alias.as_bytes(), None as Option, Some(vec)) + }) + .await??; + + if res.is_err() { + warn!("Duplicate alias"); + return Ok(Err(UploadError::DuplicateAlias.into())); + } + + Ok(Ok(())) + } +} + +fn file_name(name: String, content_type: mime::Mime) -> Result { + Ok(format!("{}{}", name, to_ext(content_type)?)) +} + +#[instrument(skip(input))] +async fn safe_save_reader(to: PathBuf, input: &mut (impl AsyncRead + Unpin)) -> Result<(), Error> { + if let Some(path) = to.parent() { + debug!("Creating directory {:?}", path); + tokio::fs::create_dir_all(path.to_owned()).await?; + } + + debug!("Checking if {:?} already exists", to); + if let Err(e) = tokio::fs::metadata(to.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e.into()); + } + } else { + return Err(UploadError::FileExists.into()); + } + + debug!("Writing stream to {:?}", to); + + let mut file = crate::file::File::create(to).await?; + + file.write_from_async_read(input).await?; + + Ok(()) +}
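
The uuid dependency is added with the v4 and serde features, but nothing in this diff references it yet. A minimal sketch of what those two features provide, using a hypothetical Token type that is not part of this change:

use uuid::Uuid;

// Hypothetical type, for illustration only: the "serde" feature lets Uuid serialize
// directly, and the "v4" feature provides random generation.
#[derive(serde::Serialize, serde::Deserialize)]
struct Token {
    id: Uuid,
}

fn new_token() -> Token {
    Token { id: Uuid::new_v4() }
}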
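
A minimal usage sketch of the extracted Hasher, mirroring the moved test: wrap any AsyncRead, drive it to completion, then finalize on a blocking thread. The standalone function below is illustrative and would have to live inside the upload_manager module tree, since Hasher and Hash are only visible there:

use sha2::{Digest, Sha256};

use crate::{error::Error, upload_manager::hasher::Hasher};

// Illustrative only: hash a file's bytes while streaming them through the wrapper.
async fn hash_file(path: &str) -> Result<Vec<u8>, Error> {
    let file = tokio::fs::File::open(path).await?;

    // Hasher implements AsyncRead and feeds every newly filled byte range to the digest.
    let mut hasher_reader = Hasher::new(file, Sha256::new());
    tokio::io::copy(&mut hasher_reader, &mut tokio::io::sink()).await?;

    // finalize_reset() hands the digest to web::block so finalization stays off the executor.
    let hash = hasher_reader.finalize_reset().await?;
    Ok(hash.into_inner())
}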
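
The new path and settings trees are opened when the manager is constructed and documented in the tree-structure comment, but not written to yet in this diff. A hypothetical helper (not part of this change) showing the kind of flag the settings tree is meant to carry:

// Hypothetical: settings keys are plain strings such as "last-path" and the
// fs-restructure markers listed in the tree-structure comment.
fn mark_restructure_started(settings: &sled::Tree) -> Result<(), sled::Error> {
    settings.insert("fs-restructure-01-started", "true".as_bytes())?;
    Ok(())
}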
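
The session split leaves the lifecycle unchanged; roughly how callers are expected to drive it (the handler code around this is assumed, not shown in the diff). If succeed() is never reached, the Drop impl spawns the cleanup task that unwinds the alias and hash mappings:

// Sketch, assuming `manager: UploadManager` and `stream: UploadStream` are in scope.
let session = manager.session();             // UploadManagerSession::new(manager.clone())
let session = session.upload(stream).await?; // read, validate, hash, dedupe, store
let token = session.delete_token().await?;   // fetch or create the per-alias delete token
let alias = session.alias().map(str::to_owned);
session.succeed();                           // mark finished so Drop skips the cleanup spawn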