From 34038a0f1675bbef6fb1543659af0cd362fa0339 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Mon, 18 Oct 2021 23:37:11 -0500 Subject: [PATCH] Fix restructure, upload --- src/main.rs | 319 ++++++++++++++++-------------- src/processor.rs | 4 +- src/upload_manager/mod.rs | 147 +++++++++++--- src/upload_manager/restructure.rs | 48 ++--- src/upload_manager/session.rs | 20 +- 5 files changed, 307 insertions(+), 231 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6e6b4ad..96f3286 100644 --- a/src/main.rs +++ b/src/main.rs @@ -300,8 +300,7 @@ async fn upload( let delete_token = image.result.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let mut path = manager.image_dir(); - path.push(name.clone()); + let path = manager.path_from_filename(name.clone()).await?; let details = manager.variant_details(path.clone(), name.clone()).await?; @@ -365,8 +364,7 @@ async fn download( let delete_token = session.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let mut path = manager.image_dir(); - path.push(name.clone()); + let path = manager.path_from_filename(name.clone()).await?; let details = manager.variant_details(path.clone(), name.clone()).await?; @@ -446,8 +444,7 @@ async fn prepare_process( .parse::() .map_err(|_| UploadError::UnsupportedFormat)?; let processed_name = format!("{}.{}", name, ext); - let base = manager.image_dir(); - let thumbnail_path = self::processor::build_path(base, &chain, processed_name); + let thumbnail_path = self::processor::build_path(&chain, processed_name); let thumbnail_args = self::processor::build_args(&chain); Ok((format, name, thumbnail_path, thumbnail_args)) @@ -463,7 +460,12 @@ async fn process_details( let (_, name, thumbnail_path, _) = prepare_process(query, ext.as_str(), &manager, &whitelist).await?; - let details = manager.variant_details(thumbnail_path, name).await?; + let real_path = manager + .variant_path(&thumbnail_path, &name) + .await? + .ok_or(UploadError::MissingAlias)?; + + let details = manager.variant_details(real_path, name).await?; let details = details.ok_or(UploadError::NoFiles)?; @@ -482,162 +484,175 @@ async fn process( let (format, name, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &manager, &whitelist).await?; + let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?; + // If the thumbnail doesn't exist, we need to create it - let thumbnail_exists = if let Err(e) = tokio::fs::metadata(&thumbnail_path) - .instrument(tracing::info_span!("Get thumbnail metadata")) - .await - { - if e.kind() != std::io::ErrorKind::NotFound { - error!("Error looking up processed image, {}", e); - return Err(e.into()); + let thumbnail_exists = if let Some(real_path) = &real_path_opt { + if let Err(e) = tokio::fs::metadata(real_path) + .instrument(tracing::info_span!("Get thumbnail metadata")) + .await + { + if e.kind() != std::io::ErrorKind::NotFound { + error!("Error looking up processed image, {}", e); + return Err(e.into()); + } + false + } else { + true } - false } else { - true + false }; - let details = manager - .variant_details(thumbnail_path.clone(), name.clone()) - .await?; + if thumbnail_exists { + if let Some(real_path) = real_path_opt { + let details_opt = manager + .variant_details(real_path.clone(), name.clone()) + .await?; - if !thumbnail_exists || details.is_none() { - let mut original_path = manager.image_dir(); - original_path.push(name.clone()); - - let thumbnail_path2 = thumbnail_path.clone(); - let process_fut = async { - let thumbnail_path = thumbnail_path2; - // Create and save a JPG for motion images (gif, mp4) - if let Some((updated_path, exists)) = - self::processor::prepare_image(original_path.clone()).await? - { - original_path = updated_path.clone(); - - if exists.is_new() { - // Save the transcoded file in another task - debug!("Spawning storage task"); - let manager2 = manager.clone(); - let name = name.clone(); - let span = tracing::info_span!( - parent: None, - "Storing variant info", - path = &tracing::field::debug(&updated_path), - name = &tracing::field::display(&name), - ); - span.follows_from(Span::current()); - actix_rt::spawn( - async move { - if let Err(e) = manager2.store_variant(updated_path, name).await { - error!("Error storing variant, {}", e); - return; - } - } - .instrument(span), - ); - } - } - - let permit = PROCESS_SEMAPHORE.acquire().await?; - - let file = crate::file::File::open(original_path.clone()).await?; - - let mut processed_reader = - crate::magick::process_image_file_read(file, thumbnail_args, format)?; - - let mut vec = Vec::new(); - processed_reader.read_to_end(&mut vec).await?; - let bytes = web::Bytes::from(vec); - - drop(permit); - - let details = if let Some(details) = details { + let details = if let Some(details) = details_opt { details } else { - Details::from_bytes(bytes.clone()).await? + let details = Details::from_path(real_path.clone()).await?; + manager + .store_variant_details(real_path.clone(), name, &details) + .await?; + details }; - let save_span = tracing::info_span!( - parent: None, - "Saving variant information", - path = tracing::field::debug(&thumbnail_path), - name = tracing::field::display(&name), - ); - save_span.follows_from(Span::current()); - let details2 = details.clone(); - let bytes2 = bytes.clone(); - actix_rt::spawn( - async move { - if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await { - tracing::warn!("Error saving thumbnail: {}", e); - return; - } - if let Err(e) = manager - .store_variant_details(thumbnail_path.clone(), name.clone(), &details2) - .await - { - tracing::warn!("Error saving variant details: {}", e); - return; - } - if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await { - tracing::warn!("Error saving variant info: {}", e); - } - } - .instrument(save_span), - ); - - Ok((details, bytes)) as Result<(Details, web::Bytes), Error> - }; - - let (details, bytes) = - CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?; - - return match range { - Some(range_header) => { - if !range_header.is_bytes() { - return Err(UploadError::Range.into()); - } - - if range_header.is_empty() { - Err(UploadError::Range.into()) - } else if range_header.len() == 1 { - let range = range_header.ranges().next().unwrap(); - let content_range = range.to_content_range(bytes.len() as u64); - let stream = range.chop_bytes(bytes); - let mut builder = HttpResponse::PartialContent(); - builder.insert_header(content_range); - - Ok(srv_response( - builder, - stream, - details.content_type(), - 7 * DAYS, - details.system_time(), - )) - } else { - Err(UploadError::Range.into()) - } - } - None => Ok(srv_response( - HttpResponse::Ok(), - once(ready(Ok(bytes) as Result<_, Error>)), - details.content_type(), - 7 * DAYS, - details.system_time(), - )), - }; + return ranged_file_resp(real_path, range, details).await; + } } - let details = if let Some(details) = details { - details - } else { - let details = Details::from_path(thumbnail_path.clone()).await?; - manager - .store_variant_details(thumbnail_path.clone(), name, &details) - .await?; - details + let mut original_path = manager.path_from_filename(name.clone()).await?; + + let thumbnail_path2 = thumbnail_path.clone(); + let process_fut = async { + let thumbnail_path = thumbnail_path2; + // Create and save a JPG for motion images (gif, mp4) + if let Some((updated_path, exists)) = + self::processor::prepare_image(original_path.clone()).await? + { + original_path = updated_path.clone(); + + if exists.is_new() { + // Save the transcoded file in another task + debug!("Spawning storage task"); + let manager2 = manager.clone(); + let name = name.clone(); + let span = tracing::info_span!( + parent: None, + "Storing variant info", + path = &tracing::field::debug(&updated_path), + name = &tracing::field::display(&name), + ); + span.follows_from(Span::current()); + actix_rt::spawn( + async move { + if let Err(e) = manager2.store_variant(None, &updated_path, &name).await { + error!("Error storing variant, {}", e); + return; + } + } + .instrument(span), + ); + } + } + + let permit = PROCESS_SEMAPHORE.acquire().await?; + + let file = crate::file::File::open(original_path.clone()).await?; + + let mut processed_reader = + crate::magick::process_image_file_read(file, thumbnail_args, format)?; + + let mut vec = Vec::new(); + processed_reader.read_to_end(&mut vec).await?; + let bytes = web::Bytes::from(vec); + + drop(permit); + + let details = Details::from_bytes(bytes.clone()).await?; + + let save_span = tracing::info_span!( + parent: None, + "Saving variant information", + path = tracing::field::debug(&thumbnail_path), + name = tracing::field::display(&name), + ); + save_span.follows_from(Span::current()); + let details2 = details.clone(); + let bytes2 = bytes.clone(); + actix_rt::spawn( + async move { + let real_path = match manager.next_directory() { + Ok(real_path) => real_path.join(&name), + Err(e) => { + tracing::warn!("Failed to generate directory path: {}", e); + return; + } + }; + + if let Err(e) = safe_save_file(real_path.clone(), bytes2).await { + tracing::warn!("Error saving thumbnail: {}", e); + return; + } + if let Err(e) = manager + .store_variant_details(real_path.clone(), name.clone(), &details2) + .await + { + tracing::warn!("Error saving variant details: {}", e); + return; + } + if let Err(e) = manager + .store_variant(Some(&thumbnail_path), &real_path, &name) + .await + { + tracing::warn!("Error saving variant info: {}", e); + } + } + .instrument(save_span), + ); + + Ok((details, bytes)) as Result<(Details, web::Bytes), Error> }; - ranged_file_resp(thumbnail_path, range, details).await + let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?; + + match range { + Some(range_header) => { + if !range_header.is_bytes() { + return Err(UploadError::Range.into()); + } + + if range_header.is_empty() { + Err(UploadError::Range.into()) + } else if range_header.len() == 1 { + let range = range_header.ranges().next().unwrap(); + let content_range = range.to_content_range(bytes.len() as u64); + let stream = range.chop_bytes(bytes); + let mut builder = HttpResponse::PartialContent(); + builder.insert_header(content_range); + + Ok(srv_response( + builder, + stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + )) + } else { + Err(UploadError::Range.into()) + } + } + None => Ok(srv_response( + HttpResponse::Ok(), + once(ready(Ok(bytes) as Result<_, Error>)), + details.content_type(), + 7 * DAYS, + details.system_time(), + )), + } } /// Fetch file details @@ -647,8 +662,7 @@ async fn details( manager: web::Data, ) -> Result { let name = manager.from_alias(alias.into_inner()).await?; - let mut path = manager.image_dir(); - path.push(name.clone()); + let path = manager.path_from_filename(name.clone()).await?; let details = manager.variant_details(path.clone(), name.clone()).await?; @@ -673,8 +687,7 @@ async fn serve( manager: web::Data, ) -> Result { let name = manager.from_alias(alias.into_inner()).await?; - let mut path = manager.image_dir(); - path.push(name.clone()); + let path = manager.path_from_filename(name.clone()).await?; let details = manager.variant_details(path.clone(), name.clone()).await?; diff --git a/src/processor.rs b/src/processor.rs index bf78911..ccb9e8a 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -266,11 +266,11 @@ pub(crate) fn build_chain(args: &[(String, String)]) -> ProcessChain { ProcessChain { inner } } -pub(crate) fn build_path(base: PathBuf, chain: &ProcessChain, filename: String) -> PathBuf { +pub(crate) fn build_path(chain: &ProcessChain, filename: String) -> PathBuf { let mut path = chain .inner .iter() - .fold(base, |acc, processor| processor.path(acc)); + .fold(PathBuf::default(), |acc, processor| processor.path(acc)); path.push(filename); path diff --git a/src/upload_manager/mod.rs b/src/upload_manager/mod.rs index 83756ba..e1d34a8 100644 --- a/src/upload_manager/mod.rs +++ b/src/upload_manager/mod.rs @@ -48,7 +48,7 @@ pub struct UploadManager { struct UploadManagerInner { format: Option, hasher: sha2::Sha256, - image_dir: PathBuf, + root_dir: PathBuf, alias_tree: sled::Tree, filename_tree: sled::Tree, main_tree: sled::Tree, @@ -77,19 +77,12 @@ struct FilenameIVec { } impl UploadManager { - /// Get the image directory - pub(crate) fn image_dir(&self) -> PathBuf { - self.inner.image_dir.clone() - } - /// Create a new UploadManager - pub(crate) async fn new(mut root_dir: PathBuf, format: Option) -> Result { + pub(crate) async fn new(root_dir: PathBuf, format: Option) -> Result { let root_clone = root_dir.clone(); // sled automatically creates it's own directories let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??; - root_dir.push("files"); - // Ensure file dir exists tokio::fs::create_dir_all(&root_dir).await?; @@ -101,7 +94,7 @@ impl UploadManager { inner: Arc::new(UploadManagerInner { format, hasher: sha2::Sha256::new(), - image_dir: root_dir, + root_dir, alias_tree: db.open_tree("alias")?, filename_tree: db.open_tree("filename")?, details_tree: db.open_tree("details")?, @@ -118,17 +111,61 @@ impl UploadManager { Ok(manager) } + #[instrument(skip(self))] + pub(crate) async fn path_from_filename(&self, filename: String) -> Result { + let path_tree = self.inner.path_tree.clone(); + let path_ivec = web::block(move || path_tree.get(filename.as_bytes())) + .await?? + .ok_or(UploadError::MissingFile)?; + + let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?); + + Ok(self.inner.root_dir.join(relative)) + } + + #[instrument(skip(self))] + async fn store_path(&self, filename: String, path: &std::path::Path) -> Result<(), Error> { + let path_bytes = path.to_str().ok_or(UploadError::Path)?.as_bytes().to_vec(); + let path_tree = self.inner.path_tree.clone(); + web::block(move || path_tree.insert(filename.as_bytes(), path_bytes)).await??; + Ok(()) + } + + #[instrument(skip(self))] + pub(crate) async fn variant_path( + &self, + process_path: &std::path::Path, + filename: &str, + ) -> Result, Error> { + let key = self.variant_key(process_path, filename)?; + let path_tree = self.inner.path_tree.clone(); + let path_opt = web::block(move || path_tree.get(key)).await??; + + if let Some(path_ivec) = path_opt { + let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?); + Ok(Some(self.inner.root_dir.join(relative))) + } else { + Ok(None) + } + } + /// Store the path to a generated image variant so we can easily clean it up later #[instrument(skip(self))] - pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> { + pub(crate) async fn store_variant( + &self, + variant_process_path: Option<&std::path::Path>, + real_path: &std::path::Path, + filename: &str, + ) -> Result<(), Error> { let path_bytes = self - .generalize_path(&path)? + .generalize_path(real_path)? .to_str() .ok_or(UploadError::Path)? .as_bytes() .to_vec(); - let key = self.variant_key(&path, &filename)?; + let variant_path = variant_process_path.unwrap_or(real_path); + let key = self.variant_key(variant_path, filename)?; let path_tree = self.inner.path_tree.clone(); debug!("Storing variant"); @@ -199,14 +236,14 @@ impl UploadManager { self.aliases_by_hash(&hash).await } - fn next_directory(&self) -> Result { + pub(crate) fn next_directory(&self) -> Result { let path = self.inner.path_gen.next(); self.inner .settings_tree .insert(GENERATOR_KEY, path.to_be_bytes())?; - let mut target_path = self.image_dir(); + let mut target_path = self.inner.root_dir.join("files"); for dir in path.to_strings() { target_path.push(dir) } @@ -381,14 +418,18 @@ impl UploadManager { #[instrument(skip(self))] async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> { let filename = filename.inner; - let mut path = self.image_dir(); - let fname = String::from_utf8(filename.to_vec())?; - path.push(fname); + + let filename2 = filename.clone(); + let path_tree = self.inner.path_tree.clone(); + let path = web::block(move || path_tree.remove(filename2)).await??; let mut errors = Vec::new(); - debug!("Deleting {:?}", path); - if let Err(e) = tokio::fs::remove_file(path).await { - errors.push(e.into()); + if let Some(path) = path { + let path = self.inner.root_dir.join(String::from_utf8(path.to_vec())?); + debug!("Deleting {:?}", path); + if let Err(e) = self.remove_path(&path).await { + errors.push(e.into()); + } } let filename2 = filename.clone(); @@ -410,9 +451,12 @@ impl UploadManager { debug!("{} files prepared for deletion", paths.len()); for path in paths { - let s = String::from_utf8_lossy(&path); - debug!("Deleting {}", s); - if let Err(e) = remove_path(path).await { + let path = self + .inner + .root_dir + .join(String::from_utf8_lossy(&path).as_ref()); + debug!("Deleting {:?}", path); + if let Err(e) = self.remove_path(&path).await { errors.push(e); } } @@ -434,6 +478,55 @@ impl UploadManager { } Ok(()) } + + async fn try_remove_parents(&self, mut path: &std::path::Path) -> Result<(), Error> { + let root = self.inner.root_dir.join("files"); + + while let Some(parent) = path.parent() { + if parent.ends_with(&root) { + break; + } + + if tokio::fs::remove_dir(parent).await.is_err() { + break; + } + + path = parent; + } + + Ok(()) + } + + async fn remove_path(&self, path: &std::path::Path) -> Result<(), Error> { + tokio::fs::remove_file(path).await?; + self.try_remove_parents(path).await + } + + fn variant_key( + &self, + variant_process_path: &std::path::Path, + filename: &str, + ) -> Result, Error> { + let path_string = variant_process_path + .to_str() + .ok_or(UploadError::Path)? + .to_string(); + + let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); + Ok(vec) + } + + fn details_key( + &self, + variant_path: &std::path::Path, + filename: &str, + ) -> Result, Error> { + let path = self.generalize_path(variant_path)?; + let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + + let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); + Ok(vec) + } } impl Serde { @@ -502,12 +595,6 @@ fn init_generator(settings: &sled::Tree) -> Result { } } -async fn remove_path(path: sled::IVec) -> Result<(), Error> { - let path_string = String::from_utf8(path.to_vec())?; - tokio::fs::remove_file(path_string).await?; - Ok(()) -} - fn trans_err(e: E) -> sled::transaction::ConflictableTransactionError where Error: From, diff --git a/src/upload_manager/restructure.rs b/src/upload_manager/restructure.rs index e01c881..a49db18 100644 --- a/src/upload_manager/restructure.rs +++ b/src/upload_manager/restructure.rs @@ -20,7 +20,7 @@ impl UploadManager { let filename = String::from_utf8(filename.to_vec())?; tracing::info!("Migrating {}", filename); - let mut file_path = self.image_dir(); + let mut file_path = self.inner.root_dir.join("files"); file_path.push(filename.clone()); if tokio::fs::metadata(&file_path).await.is_ok() { @@ -46,18 +46,7 @@ impl UploadManager { for res in self.inner.main_tree.range(start..end) { let (hash_variant_key, variant_path_or_details) = res?; - if hash_variant_key.ends_with(DETAILS) { - let details = variant_path_or_details; - - let start_index = hash.len() + 1; - let end_index = hash_variant_key.len() - DETAILS.len(); - let path_bytes = &hash_variant_key[start_index..end_index]; - - let variant_path = PathBuf::from(String::from_utf8(path_bytes.to_vec())?); - let key = self.details_key(&variant_path, &filename)?; - - self.inner.details_tree.insert(key, details)?; - } else { + if !hash_variant_key.ends_with(DETAILS) { let variant_path = PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?); if tokio::fs::metadata(&variant_path).await.is_ok() { @@ -71,13 +60,14 @@ impl UploadManager { .as_bytes() .to_vec(); - let variant_key = self.variant_key(&target_path, &filename)?; + let variant_key = self.migrate_variant_key(&variant_path, &filename)?; self.inner .path_tree .insert(variant_key, relative_target_path_bytes)?; - safe_move_file(variant_path, target_path).await?; + safe_move_file(variant_path.clone(), target_path).await?; + self.try_remove_parents(&variant_path).await?; } } @@ -90,7 +80,7 @@ impl UploadManager { } fn restructure_complete(&self) -> Result { - Ok(!self + Ok(self .inner .settings_tree .get(RESTRUCTURE_COMPLETE)? @@ -106,31 +96,19 @@ impl UploadManager { } pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> { - Ok(path.strip_prefix(&self.inner.image_dir)?) + Ok(path.strip_prefix(&self.inner.root_dir)?) } - pub(super) fn details_key( + fn migrate_variant_key( &self, - variant_path: &Path, + variant_process_path: &Path, filename: &str, ) -> Result, Error> { - let path = self.generalize_path(variant_path)?; - let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + let path = self + .generalize_path(variant_process_path)? + .strip_prefix("files")?; - let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); - Ok(vec) - } - - pub(super) fn variant_key( - &self, - variant_path: &Path, - filename: &str, - ) -> Result, Error> { - let path = self.generalize_path(variant_path)?; - let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); - - let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); - Ok(vec) + self.variant_key(path, filename) } } diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 21e5cf4..3bc0c47 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -229,9 +229,9 @@ impl UploadManagerSession { } // -- WRITE NEW FILE -- - let mut real_path = self.manager.image_dir(); - real_path.push(name); + let real_path = self.manager.next_directory()?.join(&name); + self.manager.store_path(name, &real_path).await?; crate::safe_move_file(tmpfile, real_path).await?; Ok(()) @@ -280,22 +280,20 @@ impl UploadManagerSession { // generate a short filename that isn't already in-use #[instrument(skip(self, content_type))] async fn next_file(&self, content_type: mime::Mime) -> Result { - let image_dir = self.manager.image_dir(); loop { debug!("Filename generation loop"); - let mut path = image_dir.clone(); let s: String = Uuid::new_v4().to_string(); let filename = file_name(s, content_type.clone())?; - path.push(filename.clone()); + let path_tree = self.manager.inner.path_tree.clone(); + let filename2 = filename.clone(); + let filename_exists = web::block(move || path_tree.get(filename2.as_bytes())) + .await?? + .is_some(); - if let Err(e) = tokio::fs::metadata(path).await { - if e.kind() == std::io::ErrorKind::NotFound { - debug!("Generated unused filename {}", filename); - return Ok(filename); - } - return Err(e.into()); + if !filename_exists { + return Ok(filename); } debug!("Filename exists, trying again");