From 48557bc2ea382760388afcb04faeba1598e2c789 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Fri, 22 Oct 2021 23:48:56 -0500 Subject: [PATCH] Make pict-rs generic over file storage --- Cargo.lock | 1 + Cargo.toml | 1 + src/config.rs | 12 +- src/error.rs | 9 +- src/ffmpeg.rs | 73 ++- src/magick.rs | 80 +++- src/main.rs | 439 ++++++++---------- src/map_error.rs | 43 ++ src/process.rs | 10 +- src/processor.rs | 92 ++-- src/range.rs | 28 +- src/store.rs | 52 +++ src/store/file_store.rs | 303 ++++++++++++ src/{ => store/file_store}/file.rs | 105 +---- src/store/file_store/file_id.rs | 48 ++ .../file_store}/restructure.rs | 45 +- src/upload_manager/mod.rs | 386 +++++++-------- src/upload_manager/session.rs | 122 +++-- 18 files changed, 1088 insertions(+), 761 deletions(-) create mode 100644 src/map_error.rs create mode 100644 src/store.rs create mode 100644 src/store/file_store.rs rename src/{ => store/file_store}/file.rs (83%) create mode 100644 src/store/file_store/file_id.rs rename src/{upload_manager => store/file_store}/restructure.rs (69%) diff --git a/Cargo.lock b/Cargo.lock index 28a7fa1..fd24f7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,7 @@ dependencies = [ "actix-server", "actix-web", "anyhow", + "async-trait", "awc", "base64", "dashmap", diff --git a/Cargo.toml b/Cargo.toml index 4c31786..c036327 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ actix-rt = "2.2.0" actix-server = "2.0.0-beta.6" actix-web = { version = "4.0.0-beta.10", default-features = false } anyhow = "1.0" +async-trait = "0.1.51" awc = { version = "3.0.0-beta.9", default-features = false, features = ["rustls"] } base64 = "0.13.0" dashmap = "4.0.2" diff --git a/src/config.rs b/src/config.rs index 814620a..8f77ffa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,8 @@ use std::{collections::HashSet, net::SocketAddr, path::PathBuf}; use url::Url; +use crate::magick::ValidInputType; + #[derive(Clone, Debug, structopt::StructOpt)] pub(crate) struct Config { #[structopt( @@ -130,7 +132,7 @@ impl Config { #[error("Invalid format supplied, {0}")] pub(crate) struct FormatError(String); -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub(crate) enum Format { Jpeg, Png, @@ -153,6 +155,14 @@ impl Format { Format::Webp => "WEBP", } } + + pub(crate) fn to_hint(&self) -> Option { + match self { + Format::Jpeg => Some(ValidInputType::Jpeg), + Format::Png => Some(ValidInputType::Png), + Format::Webp => Some(ValidInputType::Webp), + } + } } impl std::str::FromStr for Format { diff --git a/src/error.rs b/src/error.rs index 2d6c4df..279b87d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -69,6 +69,9 @@ pub(crate) enum UploadError { #[error(transparent)] StripPrefix(#[from] std::path::StripPrefixError), + #[error(transparent)] + FileStore(#[from] crate::store::file_store::FileError), + #[error("Provided process path is invalid")] ParsePath, @@ -114,18 +117,12 @@ pub(crate) enum UploadError { #[error("Tried to save an image with an already-taken name")] DuplicateAlias, - #[error("Tried to create file, but file already exists")] - FileExists, - #[error("{0}")] Json(#[from] serde_json::Error), #[error("Range header not satisfiable")] Range, - #[error("Command failed")] - Status, - #[error(transparent)] Limit(#[from] super::LimitError), } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 0a704cd..72da431 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,11 +1,9 @@ -use crate::{ - error::{Error, UploadError}, - process::Process, -}; +use crate::{error::Error, process::Process, store::Store}; 
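// A minimal illustrative sketch (not part of the patch) of how the new
// `Format::to_hint` is meant to be consumed, assuming the
// `magick::details_bytes(Bytes, Option<ValidInputType>)` signature introduced
// later in this patch. The hint lets `magick identify` be pointed at e.g.
// `PNG:-` instead of a bare `-`, so the expected format is not re-sniffed.
// `details_for_output` is a hypothetical helper name, not something the patch adds.
async fn details_for_output(
    bytes: actix_web::web::Bytes,
    format: crate::config::Format,
) -> Result<crate::magick::Details, crate::error::Error> {
    crate::magick::details_bytes(bytes, format.to_hint()).await
}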
use actix_web::web::Bytes; -use tokio::{io::AsyncRead, process::Command}; +use tokio::io::AsyncRead; use tracing::instrument; +#[derive(Debug)] pub(crate) enum InputFormat { Gif, Mp4, @@ -71,48 +69,29 @@ pub(crate) fn to_mp4_bytes( Ok(process.bytes_read(input).unwrap()) } -#[instrument(name = "Create video thumbnail", skip(from, to))] -pub(crate) async fn thumbnail( - from: P1, - to: P2, +#[instrument(name = "Create video thumbnail")] +pub(crate) async fn thumbnail( + store: S, + from: S::Identifier, + input_format: InputFormat, format: ThumbnailFormat, -) -> Result<(), Error> -where - P1: AsRef, - P2: AsRef, -{ - let command = "ffmpeg"; - let first_arg = "-i"; - let args = [ - "-vframes", - "1", - "-codec", - format.as_codec(), - "-f", - format.as_format(), - ]; +) -> Result { + let process = Process::run( + "ffmpeg", + &[ + "-f", + input_format.as_format(), + "-i", + "pipe:", + "-vframes", + "1", + "-codec", + format.as_codec(), + "-f", + format.as_format(), + "pipe:", + ], + )?; - tracing::info!( - "Spawning command: {} {} {:?} {:?} {:?}", - command, - first_arg, - from.as_ref(), - args, - to.as_ref() - ); - - let mut child = Command::new(command) - .arg(first_arg) - .arg(from.as_ref()) - .args(args) - .arg(to.as_ref()) - .spawn()?; - - let status = child.wait().await?; - - if !status.success() { - return Err(UploadError::Status.into()); - } - - Ok(()) + Ok(process.store_read(store, from).unwrap()) } diff --git a/src/magick.rs b/src/magick.rs index 0d974aa..d804d8e 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -2,6 +2,7 @@ use crate::{ config::Format, error::{Error, UploadError}, process::Process, + store::Store, }; use actix_web::web::Bytes; use tokio::{ @@ -10,6 +11,15 @@ use tokio::{ }; use tracing::instrument; +pub(crate) fn details_hint(filename: &str) -> Option { + if filename.ends_with(".mp4") { + Some(ValidInputType::Mp4) + } else { + None + } +} + +#[derive(Debug)] pub(crate) enum ValidInputType { Mp4, Gif, @@ -18,6 +28,18 @@ pub(crate) enum ValidInputType { Webp, } +impl ValidInputType { + fn to_str(&self) -> &'static str { + match self { + Self::Mp4 => "MP4", + Self::Gif => "GIF", + Self::Png => "PNG", + Self::Jpeg => "JPEG", + Self::Webp => "WEBP", + } + } +} + #[derive(Debug)] pub(crate) struct Details { pub(crate) mime_type: mime::Mime, @@ -32,10 +54,19 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result Result { +pub(crate) async fn details_bytes( + input: Bytes, + hint: Option, +) -> Result { + let last_arg = if let Some(expected_format) = hint { + format!("{}:-", expected_format.to_str()) + } else { + "-".to_owned() + }; + let process = Process::run( "magick", - &["identify", "-ping", "-format", "%w %h | %m\n", "-"], + &["identify", "-ping", "-format", "%w %h | %m\n", &last_arg], )?; let mut reader = process.bytes_read(input).unwrap(); @@ -64,22 +95,28 @@ pub(crate) fn convert_bytes_read( Ok(process.bytes_read(input).unwrap()) } -pub(crate) async fn details
<P>
(file: P) -> Result -where - P: AsRef, -{ - let command = "magick"; - let args = ["identify", "-ping", "-format", "%w %h | %m\n"]; - let last_arg = file.as_ref(); +pub(crate) async fn details_store( + store: S, + identifier: S::Identifier, + expected_format: Option, +) -> Result { + let last_arg = if let Some(expected_format) = expected_format { + format!("{}:-", expected_format.to_str()) + } else { + "-".to_owned() + }; - tracing::info!("Spawning command: {} {:?} {:?}", command, args, last_arg); - let output = Command::new(command) - .args(args) - .arg(last_arg) - .output() - .await?; + let process = Process::run( + "magick", + &["identify", "-ping", "-format", "%w %h | %m\n", &last_arg], + )?; - let s = String::from_utf8_lossy(&output.stdout); + let mut reader = process.store_read(store, identifier).unwrap(); + + let mut output = Vec::new(); + reader.read_to_end(&mut output).await?; + + let s = String::from_utf8_lossy(&output); parse_details(s) } @@ -137,12 +174,13 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { #[instrument(name = "Getting input type from bytes", skip(input))] pub(crate) async fn input_type_bytes(input: Bytes) -> Result { - details_bytes(input).await?.validate_input() + details_bytes(input, None).await?.validate_input() } -#[instrument(name = "Spawning process command", skip(input))] -pub(crate) fn process_image_file_read( - input: crate::file::File, +#[instrument(name = "Spawning process command")] +pub(crate) fn process_image_store_read( + store: S, + identifier: S::Identifier, args: Vec, format: Format, ) -> std::io::Result { @@ -157,7 +195,7 @@ pub(crate) fn process_image_file_read( .arg(last_arg), )?; - Ok(process.file_read(input).unwrap()) + Ok(process.store_read(store, identifier).unwrap()) } impl Details { diff --git a/src/main.rs b/src/main.rs index 3ad8655..a335286 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,6 @@ use tracing::{debug, error, info, instrument, Span}; use tracing_actix_web::TracingLogger; use tracing_awc::Propagate; use tracing_futures::Instrument; -use uuid::Uuid; mod concurrent_processor; mod config; @@ -29,25 +28,29 @@ mod either; mod error; mod exiftool; mod ffmpeg; -mod file; mod init_tracing; mod magick; +mod map_error; mod middleware; mod migrate; mod process; mod processor; mod range; +mod store; mod upload_manager; mod validate; +use crate::{magick::details_hint, store::file_store::FileStore}; + use self::{ concurrent_processor::CancelSafeProcessor, config::{Config, Format}, either::Either, error::{Error, UploadError}, - file::CrateError, init_tracing::init_tracing, middleware::{Deadline, Internal}, + migrate::LatestDb, + store::Store, upload_manager::{Details, UploadManager, UploadManagerSession}, validate::{image_webp, video_mp4}, }; @@ -57,119 +60,19 @@ const MINUTES: u32 = 60; const HOURS: u32 = 60 * MINUTES; const DAYS: u32 = 24 * HOURS; -static TMP_DIR: Lazy = Lazy::new(|| { - let tmp_nonce = Uuid::new_v4(); - - let mut path = std::env::temp_dir(); - path.push(format!("pict-rs-{}", tmp_nonce)); - path -}); static CONFIG: Lazy = Lazy::new(Config::from_args); static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); -// try moving a file -#[instrument(name = "Moving file")] -async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), Error> { - if let Some(path) = to.parent() { - debug!("Creating directory {:?}", path); - tokio::fs::create_dir_all(path).await?; - } - - debug!("Checking if {:?} already exists", to); - if let Err(e) = 
tokio::fs::metadata(&to).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } - } else { - return Err(UploadError::FileExists.into()); - } - - debug!("Moving {:?} to {:?}", from, to); - tokio::fs::copy(&from, to).await?; - tokio::fs::remove_file(from).await?; - Ok(()) -} - -async fn safe_create_parent
<P>
(path: P) -> Result<(), Error> -where - P: AsRef, -{ - if let Some(path) = path.as_ref().parent() { - debug!("Creating directory {:?}", path); - tokio::fs::create_dir_all(path).await?; - } - - Ok(()) -} - -// Try writing to a file -#[instrument(name = "Saving file", skip(bytes))] -async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), Error> { - if let Some(path) = path.parent() { - // create the directory for the file - debug!("Creating directory {:?}", path); - tokio::fs::create_dir_all(path).await?; - } - - // Only write the file if it doesn't already exist - debug!("Checking if {:?} already exists", path); - if let Err(e) = tokio::fs::metadata(&path).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } - } else { - return Ok(()); - } - - // Open the file for writing - debug!("Creating {:?}", path); - let mut file = crate::file::File::create(&path).await?; - - // try writing - debug!("Writing to {:?}", path); - if let Err(e) = file.write_from_bytes(bytes).await { - error!("Error writing {:?}, {}", path, e); - // remove file if writing failed before completion - tokio::fs::remove_file(path).await?; - return Err(e.into()); - } - debug!("{:?} written", path); - - Ok(()) -} - -pub(crate) fn tmp_file() -> PathBuf { - let s: String = Uuid::new_v4().to_string(); - - let name = format!("{}.tmp", s); - - let mut path = TMP_DIR.clone(); - path.push(&name); - - path -} - -fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> { - if mime == mime::IMAGE_PNG { - Ok(".png") - } else if mime == mime::IMAGE_JPEG { - Ok(".jpg") - } else if mime == video_mp4() { - Ok(".mp4") - } else if mime == image_webp() { - Ok(".webp") - } else { - Err(UploadError::UnsupportedFormat.into()) - } -} - /// Handle responding to succesful uploads #[instrument(name = "Uploaded files", skip(value, manager))] -async fn upload( - value: Value, - manager: web::Data, -) -> Result { +async fn upload( + value: Value>, + manager: web::Data>, +) -> Result +where + Error: From, +{ let images = value .map() .and_then(|mut m| m.remove("images")) @@ -187,19 +90,23 @@ async fn upload( let delete_token = image.result.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let path = manager.path_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename(name.clone()).await?; - let details = manager.variant_details(path.clone(), name.clone()).await?; + let details = manager + .variant_details(identifier.clone(), name.clone()) + .await?; let details = if let Some(details) = details { debug!("details exist"); details } else { - debug!("generating new details from {:?}", path); - let new_details = Details::from_path(path.clone()).await?; - debug!("storing details for {:?} {}", path, name); + debug!("generating new details from {:?}", identifier); + let hint = details_hint(&name); + let new_details = + Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + debug!("storing details for {:?} {}", identifier, name); manager - .store_variant_details(path, name, &new_details) + .store_variant_details(identifier, name, &new_details) .await?; debug!("stored"); new_details @@ -282,11 +189,14 @@ where /// download an image from a URL #[instrument(name = "Downloading file", skip(client, manager))] -async fn download( +async fn download( client: web::Data, - manager: web::Data, + manager: web::Data>, query: web::Query, -) -> Result { +) -> Result +where + Error: From, +{ let res = 
client.get(&query.url).propagate().send().await?; if !res.status().is_success() { @@ -294,7 +204,7 @@ async fn download( } let mut stream = Limit::new( - CrateError::new(res), + map_error::map_crate_error(res), (CONFIG.max_file_size() * MEGABYTES) as u64, ); @@ -308,16 +218,20 @@ async fn download( let delete_token = session.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let path = manager.path_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename(name.clone()).await?; - let details = manager.variant_details(path.clone(), name.clone()).await?; + let details = manager + .variant_details(identifier.clone(), name.clone()) + .await?; let details = if let Some(details) = details { details } else { - let new_details = Details::from_path(path.clone()).await?; + let hint = details_hint(&name); + let new_details = + Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; manager - .store_variant_details(path, name, &new_details) + .store_variant_details(identifier, name, &new_details) .await?; new_details }; @@ -335,10 +249,13 @@ async fn download( /// Delete aliases and files #[instrument(name = "Deleting file", skip(manager))] -async fn delete( - manager: web::Data, +async fn delete( + manager: web::Data>, path_entries: web::Path<(String, String)>, -) -> Result { +) -> Result +where + Error: From, +{ let (alias, token) = path_entries.into_inner(); manager.delete(token, alias).await?; @@ -348,12 +265,15 @@ async fn delete( type ProcessQuery = Vec<(String, String)>; -async fn prepare_process( +async fn prepare_process( query: web::Query, ext: &str, - manager: &UploadManager, + manager: &UploadManager, filters: &Option>, -) -> Result<(Format, String, PathBuf, Vec), Error> { +) -> Result<(Format, String, PathBuf, Vec), Error> +where + Error: From, +{ let (alias, operations) = query .into_inner() @@ -394,21 +314,24 @@ async fn prepare_process( } #[instrument(name = "Fetching derived details", skip(manager, filters))] -async fn process_details( +async fn process_details( query: web::Query, ext: web::Path, - manager: web::Data, + manager: web::Data>, filters: web::Data>>, -) -> Result { +) -> Result +where + Error: From, +{ let (_, name, thumbnail_path, _) = prepare_process(query, ext.as_str(), &manager, &filters).await?; - let real_path = manager - .variant_path(&thumbnail_path, &name) + let identifier = manager + .variant_identifier(&thumbnail_path, &name) .await? 
.ok_or(UploadError::MissingAlias)?; - let details = manager.variant_details(real_path, name).await?; + let details = manager.variant_details(identifier, name).await?; let details = details.ok_or(UploadError::NoFiles)?; @@ -417,55 +340,42 @@ async fn process_details( /// Process files #[instrument(name = "Serving processed image", skip(manager, filters))] -async fn process( +async fn process( range: Option, query: web::Query, ext: web::Path, - manager: web::Data, + manager: web::Data>, filters: web::Data>>, -) -> Result { +) -> Result +where + Error: From, +{ let (format, name, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &manager, &filters).await?; - let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?; + let identifier_opt = manager.variant_identifier(&thumbnail_path, &name).await?; - // If the thumbnail doesn't exist, we need to create it - let real_path_opt = if let Some(real_path) = real_path_opt { - if let Err(e) = tokio::fs::metadata(&real_path) - .instrument(tracing::info_span!("Get thumbnail metadata")) - .await - { - if e.kind() != std::io::ErrorKind::NotFound { - error!("Error looking up processed image, {}", e); - return Err(e.into()); - } - None - } else { - Some(real_path) - } - } else { - None - }; - - if let Some(real_path) = real_path_opt { + if let Some(identifier) = identifier_opt { let details_opt = manager - .variant_details(real_path.clone(), name.clone()) + .variant_details(identifier.clone(), name.clone()) .await?; let details = if let Some(details) = details_opt { details } else { - let details = Details::from_path(real_path.clone()).await?; + let hint = details_hint(&name); + let details = + Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; manager - .store_variant_details(real_path.clone(), name, &details) + .store_variant_details(identifier.clone(), name, &details) .await?; details }; - return ranged_file_resp(real_path, range, details).await; + return ranged_file_resp(manager.store().clone(), identifier, range, details).await; } - let original_path = manager.still_path_from_filename(name.clone()).await?; + let identifier = manager.still_identifier_from_filename(name.clone()).await?; let thumbnail_path2 = thumbnail_path.clone(); let process_fut = async { @@ -473,10 +383,12 @@ async fn process( let permit = PROCESS_SEMAPHORE.acquire().await?; - let file = crate::file::File::open(original_path.clone()).await?; - - let mut processed_reader = - crate::magick::process_image_file_read(file, thumbnail_args, format)?; + let mut processed_reader = crate::magick::process_image_store_read( + manager.store().clone(), + identifier, + thumbnail_args, + format, + )?; let mut vec = Vec::new(); processed_reader.read_to_end(&mut vec).await?; @@ -484,7 +396,7 @@ async fn process( drop(permit); - let details = Details::from_bytes(bytes.clone()).await?; + let details = Details::from_bytes(bytes.clone(), format.to_hint()).await?; let save_span = tracing::info_span!( parent: None, @@ -497,27 +409,22 @@ async fn process( let bytes2 = bytes.clone(); actix_rt::spawn( async move { - let real_path = match manager.next_directory() { - Ok(real_path) => real_path.join(&name), + let identifier = match manager.store().save_bytes(bytes2).await { + Ok(identifier) => identifier, Err(e) => { tracing::warn!("Failed to generate directory path: {}", e); return; } }; - - if let Err(e) = safe_save_file(real_path.clone(), bytes2).await { - tracing::warn!("Error saving thumbnail: {}", e); - return; - } if let Err(e) = manager - 
.store_variant_details(real_path.clone(), name.clone(), &details2) + .store_variant_details(identifier.clone(), name.clone(), &details2) .await { tracing::warn!("Error saving variant details: {}", e); return; } if let Err(e) = manager - .store_variant(Some(&thumbnail_path), &real_path, &name) + .store_variant(Some(&thumbnail_path), &identifier, &name) .await { tracing::warn!("Error saving variant info: {}", e); @@ -569,21 +476,28 @@ async fn process( /// Fetch file details #[instrument(name = "Fetching details", skip(manager))] -async fn details( +async fn details( alias: web::Path, - manager: web::Data, -) -> Result { + manager: web::Data>, +) -> Result +where + Error: From, +{ let name = manager.from_alias(alias.into_inner()).await?; - let path = manager.path_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename(name.clone()).await?; - let details = manager.variant_details(path.clone(), name.clone()).await?; + let details = manager + .variant_details(identifier.clone(), name.clone()) + .await?; let details = if let Some(details) = details { details } else { - let new_details = Details::from_path(path.clone()).await?; + let hint = details_hint(&name); + let new_details = + Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; manager - .store_variant_details(path.clone(), name, &new_details) + .store_variant_details(identifier, name, &new_details) .await?; new_details }; @@ -593,34 +507,45 @@ async fn details( /// Serve files #[instrument(name = "Serving file", skip(manager))] -async fn serve( +async fn serve( range: Option, alias: web::Path, - manager: web::Data, -) -> Result { + manager: web::Data>, +) -> Result +where + Error: From, +{ let name = manager.from_alias(alias.into_inner()).await?; - let path = manager.path_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename(name.clone()).await?; - let details = manager.variant_details(path.clone(), name.clone()).await?; + let details = manager + .variant_details(identifier.clone(), name.clone()) + .await?; let details = if let Some(details) = details { details } else { - let details = Details::from_path(path.clone()).await?; + let hint = details_hint(&name); + let details = + Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; manager - .store_variant_details(path.clone(), name, &details) + .store_variant_details(identifier.clone(), name, &details) .await?; details }; - ranged_file_resp(path, range, details).await + ranged_file_resp(manager.store().clone(), identifier, range, details).await } -async fn ranged_file_resp( - path: PathBuf, +async fn ranged_file_resp( + store: S, + identifier: S::Identifier, range: Option, details: Details, -) -> Result { +) -> Result +where + Error: From, +{ let (builder, stream) = match range { //Range header exists - return as ranged Some(range_header) => { @@ -631,24 +556,27 @@ async fn ranged_file_resp( if range_header.is_empty() { return Err(UploadError::Range.into()); } else if range_header.len() == 1 { - let file = crate::file::File::open(path).await?; - - let meta = file.metadata().await?; + let len = store.len(&identifier).await?; let range = range_header.ranges().next().unwrap(); let mut builder = HttpResponse::PartialContent(); - builder.insert_header(range.to_content_range(meta.len())); + builder.insert_header(range.to_content_range(len)); - (builder, Either::left(range.chop_file(file).await?)) + ( + builder, + Either::left(map_error::map_crate_error( + range.chop_store(store, 
identifier).await?, + )), + ) } else { return Err(UploadError::Range.into()); } } //No Range header in the request - return the entire document None => { - let file = crate::file::File::open(path).await?; - let stream = file.read_to_stream(None, None).await?; + let stream = + map_error::map_crate_error(store.to_stream(&identifier, None, None).await?); (HttpResponse::Ok(), Either::right(stream)) } }; @@ -696,10 +624,13 @@ enum FileOrAlias { } #[instrument(name = "Purging file", skip(upload_manager))] -async fn purge( +async fn purge( query: web::Query, - upload_manager: web::Data, -) -> Result { + upload_manager: web::Data>, +) -> Result +where + Error: From, +{ let aliases = match query.into_inner() { FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, @@ -718,10 +649,13 @@ async fn purge( } #[instrument(name = "Fetching aliases", skip(upload_manager))] -async fn aliases( +async fn aliases( query: web::Query, - upload_manager: web::Data, -) -> Result { + upload_manager: web::Data>, +) -> Result +where + Error: From, +{ let aliases = match query.into_inner() { FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, @@ -739,10 +673,13 @@ struct ByAlias { } #[instrument(name = "Fetching filename", skip(upload_manager))] -async fn filename_by_alias( +async fn filename_by_alias( query: web::Query, - upload_manager: web::Data, -) -> Result { + upload_manager: web::Data>, +) -> Result +where + Error: From, +{ let filename = upload_manager.from_alias(query.into_inner().alias).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ @@ -751,12 +688,17 @@ async fn filename_by_alias( }))) } -#[actix_rt::main] -async fn main() -> anyhow::Result<()> { - let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?; - - init_tracing("pict-rs", CONFIG.opentelemetry_url())?; +fn transform_error(error: actix_form_data::Error) -> actix_web::Error { + let error: Error = error.into(); + let error: actix_web::Error = error.into(); + error +} +async fn launch(manager: UploadManager) -> anyhow::Result<()> +where + S::Error: Unpin, + Error: From, +{ // Create a new Multipart Form validator // // This form is expecting a single array field, 'images' with at most 10 files in it @@ -764,7 +706,7 @@ async fn main() -> anyhow::Result<()> { let form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) - .transform_error(|e| Error::from(e).into()) + .transform_error(transform_error) .field( "images", Field::array(Field::file(move |filename, _, stream| { @@ -775,7 +717,10 @@ async fn main() -> anyhow::Result<()> { async move { let permit = PROCESS_SEMAPHORE.acquire().await?; - let res = manager.session().upload(stream).await; + let res = manager + .session() + .upload(map_error::map_crate_error(stream)) + .await; drop(permit); res @@ -792,7 +737,7 @@ async fn main() -> anyhow::Result<()> { let import_form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) - .transform_error(|e| Error::from(e).into()) + .transform_error(transform_error) .field( "images", Field::array(Field::file(move |filename, content_type, stream| { @@ -805,7 +750,12 @@ async fn main() -> anyhow::Result<()> { let res = manager .session() - .import(filename, content_type, validate_imports, stream) + .import( + filename, + content_type, + validate_imports, + 
map_error::map_crate_error(stream), + ) .await; drop(permit); @@ -832,24 +782,25 @@ async fn main() -> anyhow::Result<()> { web::resource("") .guard(guard::Post()) .wrap(form.clone()) - .route(web::post().to(upload)), + .route(web::post().to(upload::)), ) - .service(web::resource("/download").route(web::get().to(download))) + .service(web::resource("/download").route(web::get().to(download::))) .service( web::resource("/delete/{delete_token}/{filename}") - .route(web::delete().to(delete)) - .route(web::get().to(delete)), + .route(web::delete().to(delete::)) + .route(web::get().to(delete::)), ) - .service(web::resource("/original/{filename}").route(web::get().to(serve))) - .service(web::resource("/process.{ext}").route(web::get().to(process))) + .service(web::resource("/original/{filename}").route(web::get().to(serve::))) + .service(web::resource("/process.{ext}").route(web::get().to(process::))) .service( web::scope("/details") .service( - web::resource("/original/{filename}").route(web::get().to(details)), + web::resource("/original/{filename}") + .route(web::get().to(details::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process_details)), + .route(web::get().to(process_details::)), ), ), ) @@ -859,20 +810,34 @@ async fn main() -> anyhow::Result<()> { .service( web::resource("/import") .wrap(import_form.clone()) - .route(web::post().to(upload)), + .route(web::post().to(upload::)), ) - .service(web::resource("/purge").route(web::post().to(purge))) - .service(web::resource("/aliases").route(web::get().to(aliases))) - .service(web::resource("/filename").route(web::get().to(filename_by_alias))), + .service(web::resource("/purge").route(web::post().to(purge::))) + .service(web::resource("/aliases").route(web::get().to(aliases::))) + .service( + web::resource("/filename").route(web::get().to(filename_by_alias::)), + ), ) }) .bind(CONFIG.bind_address())? .run() .await?; - if tokio::fs::metadata(&*TMP_DIR).await.is_ok() { - tokio::fs::remove_dir_all(&*TMP_DIR).await?; - } - Ok(()) } + +#[actix_rt::main] +async fn main() -> anyhow::Result<()> { + init_tracing("pict-rs", CONFIG.opentelemetry_url())?; + + let root_dir = CONFIG.data_dir(); + let db = LatestDb::exists(root_dir.clone()).migrate()?; + + let store = FileStore::build(root_dir, &db)?; + + let manager = UploadManager::new(store, db, CONFIG.format()).await?; + + // TODO: move restructure to FileStore + manager.restructure().await?; + launch(manager).await +} diff --git a/src/map_error.rs b/src/map_error.rs new file mode 100644 index 0000000..b63b9a8 --- /dev/null +++ b/src/map_error.rs @@ -0,0 +1,43 @@ +use crate::error::Error; +use futures_util::stream::Stream; +use std::{ + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project_lite::pin_project! 
{ + pub(super) struct MapError { + #[pin] + inner: S, + + _error: PhantomData, + } +} + +pub(super) fn map_crate_error(inner: S) -> MapError { + map_error(inner) +} + +pub(super) fn map_error(inner: S) -> MapError { + MapError { + inner, + _error: PhantomData, + } +} + +impl Stream for MapError +where + S: Stream>, + E: From, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.as_mut().project(); + + this.inner + .poll_next(cx) + .map(|opt| opt.map(|res| res.map_err(Into::into))) + } +} diff --git a/src/process.rs b/src/process.rs index cfd2994..e616f87 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,3 +1,4 @@ +use crate::store::Store; use actix_rt::task::JoinHandle; use actix_web::web::Bytes; use std::{ @@ -66,7 +67,7 @@ impl Process { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; - let (tx, rx) = channel(); + let (tx, rx) = channel::(); let span = self.spawn_span(); let mut child = self.child; @@ -102,9 +103,10 @@ impl Process { }) } - pub(crate) fn file_read( + pub(crate) fn store_read( mut self, - mut input_file: crate::file::File, + store: S, + identifier: S::Identifier, ) -> Option { let mut stdin = self.child.stdin.take()?; let stdout = self.child.stdout.take()?; @@ -115,7 +117,7 @@ impl Process { let mut child = self.child; let handle = actix_rt::spawn( async move { - if let Err(e) = input_file.read_to_async_write(&mut stdin).await { + if let Err(e) = store.read_into(&identifier, &mut stdin).await { let _ = tx.send(e); return; } diff --git a/src/processor.rs b/src/processor.rs index bac220e..6a67aa1 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -14,6 +14,50 @@ pub(crate) trait Processor { } pub(crate) struct Identity; +pub(crate) struct Thumbnail(usize); +pub(crate) struct Resize(usize); +pub(crate) struct Crop(usize, usize); +pub(crate) struct Blur(f64); + +#[instrument] +pub(crate) fn build_chain( + args: &[(String, String)], + filename: String, +) -> Result<(PathBuf, Vec), Error> { + fn parse(key: &str, value: &str) -> Result, UploadError> { + if key == P::NAME { + return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?)); + } + + Ok(None) + } + + macro_rules! parse { + ($inner:expr, $x:ident, $k:expr, $v:expr) => {{ + if let Some(processor) = parse::<$x>($k, $v)? 
{ + return Ok((processor.path($inner.0), processor.command($inner.1))); + }; + }}; + } + + let (path, args) = + args.into_iter() + .fold(Ok((PathBuf::default(), vec![])), |inner, (name, value)| { + if let Ok(inner) = inner { + parse!(inner, Identity, name, value); + parse!(inner, Thumbnail, name, value); + parse!(inner, Resize, name, value); + parse!(inner, Crop, name, value); + parse!(inner, Blur, name, value); + + Err(Error::from(UploadError::ParsePath)) + } else { + inner + } + })?; + + Ok((path.join(filename), args)) +} impl Processor for Identity { const NAME: &'static str = "identity"; @@ -34,8 +78,6 @@ impl Processor for Identity { } } -pub(crate) struct Thumbnail(usize); - impl Processor for Thumbnail { const NAME: &'static str = "thumbnail"; @@ -60,8 +102,6 @@ impl Processor for Thumbnail { } } -pub(crate) struct Resize(usize); - impl Processor for Resize { const NAME: &'static str = "resize"; @@ -91,8 +131,6 @@ impl Processor for Resize { } } -pub(crate) struct Crop(usize, usize); - impl Processor for Crop { const NAME: &'static str = "crop"; @@ -133,8 +171,6 @@ impl Processor for Crop { } } -pub(crate) struct Blur(f64); - impl Processor for Blur { const NAME: &'static str = "blur"; @@ -155,43 +191,3 @@ impl Processor for Blur { args } } - -#[instrument] -pub(crate) fn build_chain( - args: &[(String, String)], - filename: String, -) -> Result<(PathBuf, Vec), Error> { - fn parse(key: &str, value: &str) -> Result, UploadError> { - if key == P::NAME { - return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?)); - } - - Ok(None) - } - - macro_rules! parse { - ($inner:expr, $x:ident, $k:expr, $v:expr) => {{ - if let Some(processor) = parse::<$x>($k, $v)? { - return Ok((processor.path($inner.0), processor.command($inner.1))); - }; - }}; - } - - let (path, args) = - args.into_iter() - .fold(Ok((PathBuf::default(), vec![])), |inner, (name, value)| { - if let Ok(inner) = inner { - parse!(inner, Identity, name, value); - parse!(inner, Thumbnail, name, value); - parse!(inner, Resize, name, value); - parse!(inner, Crop, name, value); - parse!(inner, Blur, name, value); - - Err(Error::from(UploadError::ParsePath)) - } else { - inner - } - })?; - - Ok((path.join(filename), args)) -} diff --git a/src/range.rs b/src/range.rs index d11d47e..9f64d03 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,4 +1,7 @@ -use crate::error::{Error, UploadError}; +use crate::{ + error::{Error, UploadError}, + store::Store, +}; use actix_web::{ dev::Payload, http::{ @@ -52,17 +55,22 @@ impl Range { } } - pub(crate) async fn chop_file( + pub(crate) async fn chop_store( &self, - file: crate::file::File, - ) -> Result>, Error> { + store: S, + identifier: S::Identifier, + ) -> Result>, Error> + where + Error: From, + { match self { - Range::Start(start) => file.read_to_stream(Some(*start), None).await, - Range::SuffixLength(from_start) => file.read_to_stream(None, Some(*from_start)).await, - Range::Segment(start, end) => { - file.read_to_stream(Some(*start), Some(end.saturating_sub(*start))) - .await - } + Range::Start(start) => Ok(store.to_stream(&identifier, Some(*start), None).await?), + Range::SuffixLength(from_start) => Ok(store + .to_stream(&identifier, None, Some(*from_start)) + .await?), + Range::Segment(start, end) => Ok(store + .to_stream(&identifier, Some(*start), Some(end.saturating_sub(*start))) + .await?), } } } diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..2edb581 --- /dev/null +++ b/src/store.rs @@ -0,0 +1,52 @@ +use std::fmt::Debug; + +use 
actix_web::web::Bytes; +use futures_util::stream::Stream; +use tokio::io::{AsyncRead, AsyncWrite}; + +pub(crate) mod file_store; + +pub(crate) trait Identifier: Send + Sync + Clone + Debug { + type Error: std::error::Error; + + fn to_bytes(&self) -> Result, Self::Error>; + + fn from_bytes(bytes: Vec) -> Result + where + Self: Sized; +} + +#[async_trait::async_trait(?Send)] +pub(crate) trait Store: Send + Sync + Clone + Debug + 'static { + type Error: std::error::Error; + type Identifier: Identifier; + type Stream: Stream>; + + async fn save_async_read( + &self, + reader: &mut Reader, + ) -> Result + where + Reader: AsyncRead + Unpin; + + async fn save_bytes(&self, bytes: Bytes) -> Result; + + async fn to_stream( + &self, + identifier: &Self::Identifier, + from_start: Option, + len: Option, + ) -> Result; + + async fn read_into( + &self, + identifier: &Self::Identifier, + writer: &mut Writer, + ) -> Result<(), std::io::Error> + where + Writer: AsyncWrite + Unpin; + + async fn len(&self, identifier: &Self::Identifier) -> Result; + + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error>; +} diff --git a/src/store/file_store.rs b/src/store/file_store.rs new file mode 100644 index 0000000..e13740b --- /dev/null +++ b/src/store/file_store.rs @@ -0,0 +1,303 @@ +use crate::store::Store; +use actix_web::web::Bytes; +use futures_util::stream::Stream; +use std::{ + path::{Path, PathBuf}, + pin::Pin, +}; +use storage_path_generator::Generator; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, error, instrument}; +use uuid::Uuid; + +mod file; +mod file_id; +mod restructure; +use file::File; +pub(crate) use file_id::FileId; + +// - Settings Tree +// - last-path -> last generated path +// - fs-restructure-01-complete -> bool + +const GENERATOR_KEY: &'static [u8] = b"last-path"; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum FileError { + #[error(transparent)] + Sled(#[from] sled::Error), + + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error(transparent)] + PathGenerator(#[from] storage_path_generator::PathError), + + #[error("Error formatting file store identifier")] + IdError, + + #[error("Mailformed file store identifier")] + PrefixError, + + #[error("Tried to save over existing file")] + FileExists, +} + +#[derive(Clone)] +pub(crate) struct FileStore { + path_gen: Generator, + root_dir: PathBuf, + settings_tree: sled::Tree, +} + +#[async_trait::async_trait(?Send)] +impl Store for FileStore { + type Error = FileError; + type Identifier = FileId; + type Stream = Pin>>>; + + async fn save_async_read( + &self, + reader: &mut Reader, + ) -> Result + where + Reader: AsyncRead + Unpin, + { + let path = self.next_file()?; + + if let Err(e) = self.safe_save_reader(&path, reader).await { + self.safe_remove_file(&path).await?; + return Err(e); + } + + self.file_id_from_path(path) + } + + async fn save_bytes(&self, bytes: Bytes) -> Result { + let path = self.next_file()?; + + if let Err(e) = self.safe_save_bytes(&path, bytes).await { + self.safe_remove_file(&path).await?; + return Err(e); + } + + self.file_id_from_path(path) + } + + async fn to_stream( + &self, + identifier: &Self::Identifier, + from_start: Option, + len: Option, + ) -> Result { + let path = self.path_from_file_id(identifier); + + let stream = File::open(path) + .await? 
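// Stream the file contents, honoring the optional starting offset and length
// limit; the concrete stream is boxed below to satisfy the associated
// `Store::Stream` type.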
+ .read_to_stream(from_start, len) + .await?; + + Ok(Box::pin(stream)) + } + + async fn read_into( + &self, + identifier: &Self::Identifier, + writer: &mut Writer, + ) -> Result<(), std::io::Error> + where + Writer: AsyncWrite + Unpin, + { + let path = self.path_from_file_id(identifier); + + File::open(&path).await?.read_to_async_write(writer).await?; + + Ok(()) + } + + async fn len(&self, identifier: &Self::Identifier) -> Result { + let path = self.path_from_file_id(identifier); + + let len = tokio::fs::metadata(path).await?.len(); + + Ok(len) + } + + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { + let path = self.path_from_file_id(identifier); + + self.safe_remove_file(path).await?; + + Ok(()) + } +} + +impl FileStore { + pub fn build(root_dir: PathBuf, db: &sled::Db) -> Result { + let settings_tree = db.open_tree("settings")?; + + let path_gen = init_generator(&settings_tree)?; + + Ok(FileStore { + root_dir, + path_gen, + settings_tree, + }) + } + + fn next_directory(&self) -> Result { + let path = self.path_gen.next(); + + self.settings_tree + .insert(GENERATOR_KEY, path.to_be_bytes())?; + + let mut target_path = self.root_dir.join("files"); + for dir in path.to_strings() { + target_path.push(dir) + } + + Ok(target_path) + } + + fn next_file(&self) -> Result { + let target_path = self.next_directory()?; + + let filename = Uuid::new_v4().to_string(); + + Ok(target_path.join(filename)) + } + + async fn safe_remove_file>(&self, path: P) -> Result<(), FileError> { + tokio::fs::remove_file(&path).await?; + self.try_remove_parents(path.as_ref()).await; + Ok(()) + } + + async fn try_remove_parents(&self, mut path: &Path) { + while let Some(parent) = path.parent() { + if parent.ends_with(&self.root_dir) { + return; + } + + if tokio::fs::remove_dir(parent).await.is_err() { + return; + } + + path = parent; + } + } + + // Try writing to a file + #[instrument(name = "Saving file", skip(bytes), fields(path = tracing::field::debug(&path.as_ref())))] + async fn safe_save_bytes>( + &self, + path: P, + bytes: Bytes, + ) -> Result<(), FileError> { + safe_create_parent(&path).await?; + + // Only write the file if it doesn't already exist + debug!("Checking if {:?} already exists", path.as_ref()); + if let Err(e) = tokio::fs::metadata(&path).await { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e.into()); + } + } else { + return Ok(()); + } + + // Open the file for writing + debug!("Creating {:?}", path.as_ref()); + let mut file = File::create(&path).await?; + + // try writing + debug!("Writing to {:?}", path.as_ref()); + if let Err(e) = file.write_from_bytes(bytes).await { + error!("Error writing {:?}, {}", path.as_ref(), e); + // remove file if writing failed before completion + self.safe_remove_file(path).await?; + return Err(e.into()); + } + debug!("{:?} written", path.as_ref()); + + Ok(()) + } + + #[instrument(skip(input), fields(to = tracing::field::debug(&to.as_ref())))] + async fn safe_save_reader>( + &self, + to: P, + input: &mut (impl AsyncRead + Unpin + ?Sized), + ) -> Result<(), FileError> { + safe_create_parent(&to).await?; + + debug!("Checking if {:?} already exists", to.as_ref()); + if let Err(e) = tokio::fs::metadata(&to).await { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e.into()); + } + } else { + return Err(FileError::FileExists); + } + + debug!("Writing stream to {:?}", to.as_ref()); + + let mut file = File::create(to).await?; + + file.write_from_async_read(input).await?; + + Ok(()) + } + + // try moving a file + 
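// Moves are performed as a copy followed by a remove of the source, refusing
// to overwrite an existing destination (`FileError::FileExists`) and pruning
// any source parent directories that become empty afterwards.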
#[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))] + pub(crate) async fn safe_move_file, Q: AsRef>( + &self, + from: P, + to: Q, + ) -> Result<(), FileError> { + safe_create_parent(&to).await?; + + debug!("Checking if {:?} already exists", to.as_ref()); + if let Err(e) = tokio::fs::metadata(&to).await { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e.into()); + } + } else { + return Err(FileError::FileExists); + } + + debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref()); + tokio::fs::copy(&from, &to).await?; + self.safe_remove_file(from).await?; + Ok(()) + } +} + +pub(crate) async fn safe_create_parent>(path: P) -> Result<(), FileError> { + if let Some(path) = path.as_ref().parent() { + debug!("Creating directory {:?}", path); + tokio::fs::create_dir_all(path).await?; + } + + Ok(()) +} + +fn init_generator(settings: &sled::Tree) -> Result { + if let Some(ivec) = settings.get(GENERATOR_KEY)? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + )) + } else { + Ok(Generator::new()) + } +} + +impl std::fmt::Debug for FileStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileStore") + .field("path_gen", &self.path_gen) + .field("root_dir", &self.root_dir) + .finish() + } +} diff --git a/src/file.rs b/src/store/file_store/file.rs similarity index 83% rename from src/file.rs rename to src/store/file_store/file.rs index 703251f..cf5accf 100644 --- a/src/file.rs +++ b/src/store/file_store/file.rs @@ -1,50 +1,15 @@ -use futures_util::stream::Stream; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - #[cfg(feature = "io-uring")] pub(crate) use io_uring::File; #[cfg(not(feature = "io-uring"))] pub(crate) use tokio_file::File; -pin_project_lite::pin_project! 
{ - pub(super) struct CrateError { - #[pin] - inner: S - } -} - -impl CrateError { - pub(super) fn new(inner: S) -> Self { - CrateError { inner } - } -} - -impl Stream for CrateError -where - S: Stream>, - crate::error::Error: From, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().project(); - - this.inner - .poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(Into::into))) - } -} - #[cfg(not(feature = "io-uring"))] mod tokio_file { - use crate::Either; + use crate::{store::file_store::FileError, Either}; use actix_web::web::{Bytes, BytesMut}; use futures_util::stream::Stream; - use std::{fs::Metadata, io::SeekFrom, path::Path}; + use std::{io::SeekFrom, path::Path}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -65,20 +30,13 @@ mod tokio_file { }) } - pub(crate) async fn metadata(&self) -> std::io::Result { - self.inner.metadata().await - } - - pub(crate) async fn write_from_bytes<'a>( - &'a mut self, - mut bytes: Bytes, - ) -> std::io::Result<()> { + pub(crate) async fn write_from_bytes(&mut self, mut bytes: Bytes) -> std::io::Result<()> { self.inner.write_all_buf(&mut bytes).await?; Ok(()) } - pub(crate) async fn write_from_async_read<'a, R>( - &'a mut self, + pub(crate) async fn write_from_async_read( + &mut self, mut reader: R, ) -> std::io::Result<()> where @@ -88,12 +46,9 @@ mod tokio_file { Ok(()) } - pub(crate) async fn read_to_async_write<'a, W>( - &'a mut self, - writer: &'a mut W, - ) -> std::io::Result<()> + pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + ?Sized, { tokio::io::copy(&mut self.inner, writer).await?; Ok(()) @@ -103,8 +58,7 @@ mod tokio_file { mut self, from_start: Option, len: Option, - ) -> Result>, crate::error::Error> - { + ) -> Result>, FileError> { let obj = match (from_start, len) { (Some(lower), Some(upper)) => { self.inner.seek(SeekFrom::Start(lower)).await?; @@ -118,10 +72,7 @@ mod tokio_file { (None, None) => Either::right(self.inner), }; - Ok(super::CrateError::new(BytesFreezer::new(FramedRead::new( - obj, - BytesCodec::new(), - )))) + Ok(BytesFreezer::new(FramedRead::new(obj, BytesCodec::new()))) } } @@ -159,6 +110,7 @@ mod tokio_file { #[cfg(feature = "io-uring")] mod io_uring { + use crate::store::file_store::FileError; use actix_web::web::Bytes; use futures_util::stream::Stream; use std::{ @@ -182,7 +134,7 @@ mod io_uring { impl File { pub(crate) async fn open(path: impl AsRef) -> std::io::Result { - tracing::info!("Opening io-uring file"); + tracing::info!("Opening io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), inner: tokio_uring::fs::File::open(path).await?, @@ -190,21 +142,18 @@ mod io_uring { } pub(crate) async fn create(path: impl AsRef) -> std::io::Result { - tracing::info!("Creating io-uring file"); + tracing::info!("Creating io-uring file: {:?}", path.as_ref()); Ok(File { path: path.as_ref().to_owned(), inner: tokio_uring::fs::File::create(path).await?, }) } - pub(crate) async fn metadata(&self) -> std::io::Result { + async fn metadata(&self) -> std::io::Result { tokio::fs::metadata(&self.path).await } - pub(crate) async fn write_from_bytes<'a>( - &'a mut self, - bytes: Bytes, - ) -> std::io::Result<()> { + pub(crate) async fn write_from_bytes(&mut self, bytes: Bytes) -> std::io::Result<()> { let mut buf = bytes.to_vec(); let len: u64 = 
buf.len().try_into().unwrap(); @@ -233,8 +182,8 @@ mod io_uring { Ok(()) } - pub(crate) async fn write_from_async_read<'a, R>( - &'a mut self, + pub(crate) async fn write_from_async_read( + &mut self, mut reader: R, ) -> std::io::Result<()> where @@ -283,12 +232,9 @@ mod io_uring { Ok(()) } - pub(crate) async fn read_to_async_write<'a, W>( - &'a mut self, - writer: &mut W, - ) -> std::io::Result<()> + pub(crate) async fn read_to_async_write(&mut self, writer: &mut W) -> std::io::Result<()> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + ?Sized, { let metadata = self.metadata().await?; let size = metadata.len(); @@ -323,20 +269,17 @@ mod io_uring { self, from_start: Option, len: Option, - ) -> Result>, crate::error::Error> - { + ) -> Result>, FileError> { let size = self.metadata().await?.len(); let cursor = from_start.unwrap_or(0); let size = len.unwrap_or(size - cursor) + cursor; - Ok(super::CrateError { - inner: BytesStream { - state: ReadFileState::File { file: Some(self) }, - size, - cursor, - callback: read_file, - }, + Ok(BytesStream { + state: ReadFileState::File { file: Some(self) }, + size, + cursor, + callback: read_file, }) } diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs new file mode 100644 index 0000000..e811466 --- /dev/null +++ b/src/store/file_store/file_id.rs @@ -0,0 +1,48 @@ +use crate::store::{ + file_store::{FileError, FileStore}, + Identifier, +}; +use std::path::PathBuf; + +#[derive(Clone, Debug)] +pub(crate) struct FileId(PathBuf); + +impl Identifier for FileId { + type Error = FileError; + + fn to_bytes(&self) -> Result, Self::Error> { + let vec = self + .0 + .to_str() + .ok_or(FileError::IdError)? + .as_bytes() + .to_vec(); + + Ok(vec) + } + + fn from_bytes(bytes: Vec) -> Result + where + Self: Sized, + { + let string = String::from_utf8(bytes).map_err(|_| FileError::IdError)?; + + let id = FileId(PathBuf::from(string)); + + Ok(id) + } +} + +impl FileStore { + pub(super) fn file_id_from_path(&self, path: PathBuf) -> Result { + let stripped = path + .strip_prefix(&self.root_dir) + .map_err(|_| FileError::PrefixError)?; + + Ok(FileId(stripped.to_path_buf())) + } + + pub(super) fn path_from_file_id(&self, file_id: &FileId) -> PathBuf { + self.root_dir.join(&file_id.0) + } +} diff --git a/src/upload_manager/restructure.rs b/src/store/file_store/restructure.rs similarity index 69% rename from src/upload_manager/restructure.rs rename to src/store/file_store/restructure.rs index a49db18..1ff70f9 100644 --- a/src/upload_manager/restructure.rs +++ b/src/store/file_store/restructure.rs @@ -1,6 +1,6 @@ use crate::{ error::{Error, UploadError}, - safe_move_file, + store::file_store::FileStore, upload_manager::UploadManager, }; use std::path::{Path, PathBuf}; @@ -8,24 +8,22 @@ use std::path::{Path, PathBuf}; const RESTRUCTURE_COMPLETE: &'static [u8] = b"fs-restructure-01-complete"; const DETAILS: &'static [u8] = b"details"; -impl UploadManager { +impl UploadManager { #[tracing::instrument(skip(self))] - pub(super) async fn restructure(&self) -> Result<(), Error> { + pub(crate) async fn restructure(&self) -> Result<(), Error> { if self.restructure_complete()? 
{ return Ok(()); } - for res in self.inner.filename_tree.iter() { + for res in self.inner().filename_tree.iter() { let (filename, hash) = res?; let filename = String::from_utf8(filename.to_vec())?; tracing::info!("Migrating {}", filename); - let mut file_path = self.inner.root_dir.join("files"); - file_path.push(filename.clone()); + let file_path = self.store().root_dir.join("files").join(&filename); if tokio::fs::metadata(&file_path).await.is_ok() { - let mut target_path = self.next_directory()?; - target_path.push(filename.clone()); + let target_path = self.store().next_directory()?.join(&filename); let target_path_bytes = self .generalize_path(&target_path)? @@ -34,24 +32,23 @@ impl UploadManager { .as_bytes() .to_vec(); - self.inner - .path_tree + self.inner() + .identifier_tree .insert(filename.as_bytes(), target_path_bytes)?; - safe_move_file(file_path, target_path).await?; + self.store().safe_move_file(file_path, target_path).await?; } let (start, end) = variant_key_bounds(&hash); - for res in self.inner.main_tree.range(start..end) { + for res in self.inner().main_tree.range(start..end) { let (hash_variant_key, variant_path_or_details) = res?; if !hash_variant_key.ends_with(DETAILS) { let variant_path = PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?); if tokio::fs::metadata(&variant_path).await.is_ok() { - let mut target_path = self.next_directory()?; - target_path.push(filename.clone()); + let target_path = self.store().next_directory()?.join(&filename); let relative_target_path_bytes = self .generalize_path(&target_path)? @@ -62,16 +59,18 @@ impl UploadManager { let variant_key = self.migrate_variant_key(&variant_path, &filename)?; - self.inner - .path_tree + self.inner() + .identifier_tree .insert(variant_key, relative_target_path_bytes)?; - safe_move_file(variant_path.clone(), target_path).await?; - self.try_remove_parents(&variant_path).await?; + self.store() + .safe_move_file(variant_path.clone(), target_path) + .await?; + self.store().try_remove_parents(&variant_path).await; } } - self.inner.main_tree.remove(hash_variant_key)?; + self.inner().main_tree.remove(hash_variant_key)?; } } @@ -81,22 +80,22 @@ impl UploadManager { fn restructure_complete(&self) -> Result { Ok(self - .inner + .store() .settings_tree .get(RESTRUCTURE_COMPLETE)? .is_some()) } fn mark_restructure_complete(&self) -> Result<(), Error> { - self.inner + self.store() .settings_tree .insert(RESTRUCTURE_COMPLETE, b"true")?; Ok(()) } - pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> { - Ok(path.strip_prefix(&self.inner.root_dir)?) + fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> { + Ok(path.strip_prefix(&self.store().root_dir)?) 
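// The returned path is relative to the store's root directory, so the entries
// recorded in the identifier tree during the restructure are not tied to an
// absolute data-directory location.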
} fn migrate_variant_key( diff --git a/src/upload_manager/mod.rs b/src/upload_manager/mod.rs index b8bac4a..b541e3a 100644 --- a/src/upload_manager/mod.rs +++ b/src/upload_manager/mod.rs @@ -1,22 +1,22 @@ use crate::{ config::Format, error::{Error, UploadError}, - ffmpeg::ThumbnailFormat, - migrate::{alias_id_key, alias_key, alias_key_bounds, LatestDb}, + ffmpeg::{InputFormat, ThumbnailFormat}, + magick::{details_hint, ValidInputType}, + migrate::{alias_id_key, alias_key, alias_key_bounds}, + store::{Identifier, Store}, }; use actix_web::web; use sha2::Digest; use std::{ ops::{Deref, DerefMut}, - path::PathBuf, + string::FromUtf8Error, sync::Arc, }; -use storage_path_generator::{Generator, Path}; use tracing::{debug, error, info, instrument, warn, Span}; use tracing_futures::Instrument; mod hasher; -mod restructure; mod session; pub(super) use session::UploadManagerSession; @@ -35,33 +35,26 @@ pub(super) use session::UploadManagerSession; // - Filename Tree // - filename -> hash // - Details Tree -// - filename / relative path -> details +// - filename / S::Identifier -> details // - Path Tree -// - filename -> relative path -// - filename / relative variant path -> relative variant path -// - filename / motion -> relative motion path -// - Settings Tree -// - last-path -> last generated path -// - fs-restructure-01-complete -> bool - -const GENERATOR_KEY: &'static [u8] = b"last-path"; +// - filename -> S::Identifier +// - filename / variant path -> S::Identifier +// - filename / motion -> S::Identifier #[derive(Clone)] -pub struct UploadManager { +pub(crate) struct UploadManager { inner: Arc, + store: S, } -struct UploadManagerInner { +pub(crate) struct UploadManagerInner { format: Option, hasher: sha2::Sha256, - root_dir: PathBuf, - alias_tree: sled::Tree, - filename_tree: sled::Tree, - main_tree: sled::Tree, + pub(crate) alias_tree: sled::Tree, + pub(crate) filename_tree: sled::Tree, + pub(crate) main_tree: sled::Tree, details_tree: sled::Tree, - path_tree: sled::Tree, - settings_tree: sled::Tree, - path_gen: Generator, + pub(crate) identifier_tree: sled::Tree, db: sled::Db, } @@ -82,88 +75,85 @@ struct FilenameIVec { inner: sled::IVec, } -impl UploadManager { +impl UploadManager +where + S: Store + 'static, + Error: From, +{ /// Create a new UploadManager - pub(crate) async fn new(root_dir: PathBuf, format: Option) -> Result { - let root_clone = root_dir.clone(); - // sled automatically creates it's own directories - let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??; - - // Ensure file dir exists - tokio::fs::create_dir_all(&root_dir).await?; - - let settings_tree = db.open_tree("settings")?; - - let path_gen = init_generator(&settings_tree)?; - + pub(crate) async fn new(store: S, db: sled::Db, format: Option) -> Result { let manager = UploadManager { inner: Arc::new(UploadManagerInner { format, hasher: sha2::Sha256::new(), - root_dir, alias_tree: db.open_tree("alias")?, filename_tree: db.open_tree("filename")?, details_tree: db.open_tree("details")?, main_tree: db.open_tree("main")?, - path_tree: db.open_tree("path")?, - settings_tree, - path_gen, + identifier_tree: db.open_tree("path")?, db, }), + store, }; - manager.restructure().await?; - Ok(manager) } - pub(crate) async fn still_path_from_filename( - &self, - filename: String, - ) -> Result { - let path = self.path_from_filename(filename.clone()).await?; - let details = - if let Some(details) = self.variant_details(path.clone(), filename.clone()).await? 
{ - details - } else { - Details::from_path(&path).await? - }; - - if !details.is_motion() { - return Ok(path); - } - - if let Some(motion_path) = self.motion_path(&filename).await? { - return Ok(motion_path); - } - - let jpeg_path = self.next_directory()?.join(&filename); - crate::safe_create_parent(&jpeg_path).await?; - - let permit = crate::PROCESS_SEMAPHORE.acquire().await; - let res = crate::ffmpeg::thumbnail(&path, &jpeg_path, ThumbnailFormat::Jpeg).await; - drop(permit); - - if let Err(e) = res { - error!("transcode error: {:?}", e); - self.remove_path(&jpeg_path).await?; - return Err(e); - } - - self.store_motion_path(&filename, &jpeg_path).await?; - Ok(jpeg_path) + pub(crate) fn store(&self) -> &S { + &self.store } - async fn motion_path(&self, filename: &str) -> Result, Error> { - let path_tree = self.inner.path_tree.clone(); + pub(crate) fn inner(&self) -> &UploadManagerInner { + &self.inner + } + + pub(crate) async fn still_identifier_from_filename( + &self, + filename: String, + ) -> Result { + let identifier = self.identifier_from_filename(filename.clone()).await?; + let details = if let Some(details) = self + .variant_details(identifier.clone(), filename.clone()) + .await? + { + details + } else { + let hint = details_hint(&filename); + Details::from_store(self.store.clone(), identifier.clone(), hint).await? + }; + + if !details.is_motion() { + return Ok(identifier); + } + + if let Some(motion_identifier) = self.motion_identifier(&filename).await? { + return Ok(motion_identifier); + } + + let permit = crate::PROCESS_SEMAPHORE.acquire().await; + let mut reader = crate::ffmpeg::thumbnail( + self.store.clone(), + identifier, + InputFormat::Mp4, + ThumbnailFormat::Jpeg, + ) + .await?; + let motion_identifier = self.store.save_async_read(&mut reader).await?; + drop(permit); + + self.store_motion_path(&filename, &motion_identifier) + .await?; + Ok(motion_identifier) + } + + async fn motion_identifier(&self, filename: &str) -> Result, Error> { + let identifier_tree = self.inner.identifier_tree.clone(); let motion_key = format!("{}/motion", filename); - let opt = web::block(move || path_tree.get(motion_key.as_bytes())).await??; + let opt = web::block(move || identifier_tree.get(motion_key.as_bytes())).await??; if let Some(ivec) = opt { - return Ok(Some( - self.inner.root_dir.join(String::from_utf8(ivec.to_vec())?), - )); + return Ok(Some(S::Identifier::from_bytes(ivec.to_vec())?)); } Ok(None) @@ -172,59 +162,57 @@ impl UploadManager { async fn store_motion_path( &self, filename: &str, - path: impl AsRef, + identifier: &S::Identifier, ) -> Result<(), Error> { - let path_bytes = self - .generalize_path(path.as_ref())? - .to_str() - .ok_or(UploadError::Path)? 
- .as_bytes() - .to_vec(); + let identifier_bytes = identifier.to_bytes()?; let motion_key = format!("{}/motion", filename); - let path_tree = self.inner.path_tree.clone(); + let identifier_tree = self.inner.identifier_tree.clone(); - web::block(move || path_tree.insert(motion_key.as_bytes(), path_bytes)).await??; + web::block(move || identifier_tree.insert(motion_key.as_bytes(), identifier_bytes)) + .await??; Ok(()) } #[instrument(skip(self))] - pub(crate) async fn path_from_filename(&self, filename: String) -> Result { - let path_tree = self.inner.path_tree.clone(); - let path_ivec = web::block(move || path_tree.get(filename.as_bytes())) + pub(crate) async fn identifier_from_filename( + &self, + filename: String, + ) -> Result { + let identifier_tree = self.inner.identifier_tree.clone(); + let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes())) .await?? .ok_or(UploadError::MissingFile)?; - let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?); + let identifier = S::Identifier::from_bytes(path_ivec.to_vec())?; - Ok(self.inner.root_dir.join(relative)) + Ok(identifier) } #[instrument(skip(self))] - async fn store_path(&self, filename: String, path: &std::path::Path) -> Result<(), Error> { - let path_bytes = self - .generalize_path(path)? - .to_str() - .ok_or(UploadError::Path)? - .as_bytes() - .to_vec(); - let path_tree = self.inner.path_tree.clone(); - web::block(move || path_tree.insert(filename.as_bytes(), path_bytes)).await??; + async fn store_identifier( + &self, + filename: String, + identifier: &S::Identifier, + ) -> Result<(), Error> { + let identifier_bytes = identifier.to_bytes()?; + let identifier_tree = self.inner.identifier_tree.clone(); + web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??; Ok(()) } #[instrument(skip(self))] - pub(crate) async fn variant_path( + pub(crate) async fn variant_identifier( &self, process_path: &std::path::Path, filename: &str, - ) -> Result, Error> { + ) -> Result, Error> { let key = self.variant_key(process_path, filename)?; - let path_tree = self.inner.path_tree.clone(); - let path_opt = web::block(move || path_tree.get(key)).await??; + let identifier_tree = self.inner.identifier_tree.clone(); + let path_opt = web::block(move || identifier_tree.get(key)).await??; - if let Some(path_ivec) = path_opt { - let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?); - Ok(Some(self.inner.root_dir.join(relative))) + if let Some(ivec) = path_opt { + let identifier = S::Identifier::from_bytes(ivec.to_vec())?; + Ok(Some(identifier)) } else { Ok(None) } @@ -235,22 +223,22 @@ impl UploadManager { pub(crate) async fn store_variant( &self, variant_process_path: Option<&std::path::Path>, - real_path: &std::path::Path, + identifier: &S::Identifier, filename: &str, ) -> Result<(), Error> { - let path_bytes = self - .generalize_path(real_path)? - .to_str() - .ok_or(UploadError::Path)? - .as_bytes() - .to_vec(); - - let variant_path = variant_process_path.unwrap_or(real_path); - let key = self.variant_key(variant_path, filename)?; - let path_tree = self.inner.path_tree.clone(); + let key = if let Some(path) = variant_process_path { + self.variant_key(path, filename)? 
+ } else { + let mut vec = filename.as_bytes().to_vec(); + vec.extend(b"/"); + vec.extend(&identifier.to_bytes()?); + vec + }; + let identifier_tree = self.inner.identifier_tree.clone(); + let identifier_bytes = identifier.to_bytes()?; debug!("Storing variant"); - web::block(move || path_tree.insert(key, path_bytes)).await??; + web::block(move || identifier_tree.insert(key, identifier_bytes)).await??; debug!("Stored variant"); Ok(()) @@ -260,10 +248,10 @@ impl UploadManager { #[instrument(skip(self))] pub(crate) async fn variant_details( &self, - path: PathBuf, + identifier: S::Identifier, filename: String, ) -> Result, Error> { - let key = self.details_key(&path, &filename)?; + let key = self.details_key(identifier, &filename)?; let details_tree = self.inner.details_tree.clone(); debug!("Getting details"); @@ -282,11 +270,11 @@ impl UploadManager { #[instrument(skip(self))] pub(crate) async fn store_variant_details( &self, - path: PathBuf, + identifier: S::Identifier, filename: String, details: &Details, ) -> Result<(), Error> { - let key = self.details_key(&path, &filename)?; + let key = self.details_key(identifier, &filename)?; let details_tree = self.inner.details_tree.clone(); let details_value = serde_json::to_vec(details)?; @@ -317,21 +305,6 @@ impl UploadManager { self.aliases_by_hash(&hash).await } - pub(crate) fn next_directory(&self) -> Result { - let path = self.inner.path_gen.next(); - - self.inner - .settings_tree - .insert(GENERATOR_KEY, path.to_be_bytes())?; - - let mut target_path = self.inner.root_dir.join("files"); - for dir in path.to_strings() { - target_path.push(dir) - } - - Ok(target_path) - } - async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result, Error> { let (start, end) = alias_key_bounds(hash); let main_tree = self.inner.main_tree.clone(); @@ -386,26 +359,26 @@ impl UploadManager { debug!("Deleting alias -> delete-token mapping"); let existing_token = alias_tree .remove(delete_key(&alias2).as_bytes())? - .ok_or_else(|| trans_err(UploadError::MissingAlias))?; + .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; // Bail if invalid token if existing_token != token { warn!("Invalid delete token"); - return Err(trans_err(UploadError::InvalidToken)); + return Err(trans_upload_error(UploadError::InvalidToken)); } // -- GET ID FOR HASH TREE CLEANUP -- debug!("Deleting alias -> id mapping"); let id = alias_tree .remove(alias_id_key(&alias2).as_bytes())? - .ok_or_else(|| trans_err(UploadError::MissingAlias))?; - let id = String::from_utf8(id.to_vec()).map_err(trans_err)?; + .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; + let id = String::from_utf8(id.to_vec()).map_err(trans_utf8_error)?; // -- GET HASH FOR HASH TREE CLEANUP -- debug!("Deleting alias -> hash mapping"); let hash = alias_tree .remove(alias2.as_bytes())? 
- .ok_or_else(|| trans_err(UploadError::MissingAlias))?; + .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; // -- REMOVE HASH TREE ELEMENT -- debug!("Deleting hash -> alias mapping"); @@ -491,7 +464,7 @@ impl UploadManager { Ok(filename) } - pub(crate) fn session(&self) -> UploadManagerSession { + pub(crate) fn session(&self) -> UploadManagerSession { UploadManagerSession::new(self.clone()) } @@ -501,14 +474,14 @@ impl UploadManager { let filename = filename.inner; let filename2 = filename.clone(); - let path_tree = self.inner.path_tree.clone(); - let path = web::block(move || path_tree.remove(filename2)).await??; + let identifier_tree = self.inner.identifier_tree.clone(); + let identifier = web::block(move || identifier_tree.remove(filename2)).await??; let mut errors = Vec::new(); - if let Some(path) = path { - let path = self.inner.root_dir.join(String::from_utf8(path.to_vec())?); - debug!("Deleting {:?}", path); - if let Err(e) = self.remove_path(&path).await { + if let Some(identifier) = identifier { + let identifier = S::Identifier::from_bytes(identifier.to_vec())?; + debug!("Deleting {:?}", identifier); + if let Err(e) = self.store.remove(&identifier).await { errors.push(e.into()); } } @@ -519,36 +492,34 @@ impl UploadManager { web::block(move || fname_tree.remove(filename2)).await??; let path_prefix = filename.clone(); - let path_tree = self.inner.path_tree.clone(); + let identifier_tree = self.inner.identifier_tree.clone(); debug!("Fetching file variants"); - let paths = web::block(move || { - path_tree + let identifiers = web::block(move || { + identifier_tree .scan_prefix(path_prefix) .values() .collect::, sled::Error>>() }) .await??; - debug!("{} files prepared for deletion", paths.len()); + debug!("{} files prepared for deletion", identifiers.len()); - for path in paths { - let path = self - .inner - .root_dir - .join(String::from_utf8_lossy(&path).as_ref()); - debug!("Deleting {:?}", path); - if let Err(e) = self.remove_path(&path).await { + for id in identifiers { + let identifier = S::Identifier::from_bytes(id.to_vec())?; + + debug!("Deleting {:?}", identifier); + if let Err(e) = self.store.remove(&identifier).await { errors.push(e); } } let path_prefix = filename.clone(); - let path_tree = self.inner.path_tree.clone(); + let identifier_tree = self.inner.identifier_tree.clone(); debug!("Deleting path info"); web::block(move || { - for res in path_tree.scan_prefix(path_prefix).keys() { + for res in identifier_tree.scan_prefix(path_prefix).keys() { let key = res?; - path_tree.remove(key)?; + identifier_tree.remove(key)?; } Ok(()) as Result<(), Error> }) @@ -560,30 +531,7 @@ impl UploadManager { Ok(()) } - async fn try_remove_parents(&self, mut path: &std::path::Path) -> Result<(), Error> { - let root = self.inner.root_dir.join("files"); - - while let Some(parent) = path.parent() { - if parent.ends_with(&root) { - break; - } - - if tokio::fs::remove_dir(parent).await.is_err() { - break; - } - - path = parent; - } - - Ok(()) - } - - async fn remove_path(&self, path: &std::path::Path) -> Result<(), Error> { - tokio::fs::remove_file(path).await?; - self.try_remove_parents(path).await - } - - fn variant_key( + pub(crate) fn variant_key( &self, variant_process_path: &std::path::Path, filename: &str, @@ -597,15 +545,11 @@ impl UploadManager { Ok(vec) } - fn details_key( - &self, - variant_path: &std::path::Path, - filename: &str, - ) -> Result, Error> { - let path = self.generalize_path(variant_path)?; - let path_string = 
path.to_str().ok_or(UploadError::Path)?.to_string(); + fn details_key(&self, identifier: S::Identifier, filename: &str) -> Result, Error> { + let mut vec = filename.as_bytes().to_vec(); + vec.extend(b"/"); + vec.extend(&identifier.to_bytes()?); - let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); Ok(vec) } } @@ -623,8 +567,11 @@ impl Details { } #[tracing::instrument("Details from bytes", skip(input))] - pub(crate) async fn from_bytes(input: web::Bytes) -> Result { - let details = crate::magick::details_bytes(input).await?; + pub(crate) async fn from_bytes( + input: web::Bytes, + hint: Option, + ) -> Result { + let details = crate::magick::details_bytes(input, hint).await?; Ok(Details::now( details.width, @@ -633,12 +580,13 @@ impl Details { )) } - #[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))] - pub(crate) async fn from_path
<P>
(path: P) -> Result - where - P: AsRef, - { - let details = crate::magick::details(&path).await?; + #[tracing::instrument("Details from store")] + pub(crate) async fn from_store( + store: S, + identifier: S::Identifier, + expected_format: Option, + ) -> Result { + let details = crate::magick::details_store(store, identifier, expected_format).await?; Ok(Details::now( details.width, @@ -671,14 +619,14 @@ impl FilenameIVec { } } -fn init_generator(settings: &sled::Tree) -> Result { - if let Some(ivec) = settings.get(GENERATOR_KEY)? { - Ok(Generator::from_existing(Path::from_be_bytes( - ivec.to_vec(), - )?)) - } else { - Ok(Generator::new()) - } +fn trans_upload_error( + upload_error: UploadError, +) -> sled::transaction::ConflictableTransactionError { + trans_err(upload_error) +} + +fn trans_utf8_error(e: FromUtf8Error) -> sled::transaction::ConflictableTransactionError { + trans_err(e) } fn trans_err(e: E) -> sled::transaction::ConflictableTransactionError @@ -706,7 +654,7 @@ impl DerefMut for Serde { } } -impl std::fmt::Debug for UploadManager { +impl std::fmt::Debug for UploadManager { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("UploadManager").finish() } diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 718f194..69dd910 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -1,7 +1,7 @@ use crate::{ error::{Error, UploadError}, migrate::{alias_id_key, alias_key}, - to_ext, + store::Store, upload_manager::{ delete_key, hasher::{Hash, Hasher}, @@ -10,20 +10,24 @@ use crate::{ }; use actix_web::web; use futures_util::stream::{Stream, StreamExt}; -use std::path::PathBuf; -use tokio::io::AsyncRead; use tracing::{debug, instrument, warn, Span}; use tracing_futures::Instrument; use uuid::Uuid; -pub(crate) struct UploadManagerSession { - manager: UploadManager, +pub(crate) struct UploadManagerSession +where + Error: From, +{ + manager: UploadManager, alias: Option, finished: bool, } -impl UploadManagerSession { - pub(super) fn new(manager: UploadManager) -> Self { +impl UploadManagerSession +where + Error: From, +{ + pub(super) fn new(manager: UploadManager) -> Self { UploadManagerSession { manager, alias: None, @@ -51,7 +55,10 @@ impl Dup { } } -impl Drop for UploadManagerSession { +impl Drop for UploadManagerSession +where + Error: From, +{ fn drop(&mut self) { if self.finished { return; @@ -90,7 +97,10 @@ impl Drop for UploadManagerSession { } } -impl UploadManagerSession { +impl UploadManagerSession +where + Error: From, +{ /// Generate a delete token for an alias #[instrument(skip(self))] pub(crate) async fn delete_token(&self) -> Result { @@ -129,17 +139,13 @@ impl UploadManagerSession { /// Upload the file while preserving the filename, optionally validating the uploaded image #[instrument(skip(self, stream))] - pub(crate) async fn import( + pub(crate) async fn import( mut self, alias: String, content_type: mime::Mime, validate: bool, - mut stream: impl Stream> + Unpin, - ) -> Result - where - Error: From, - E: Unpin + 'static, - { + mut stream: impl Stream> + Unpin, + ) -> Result { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); @@ -158,8 +164,11 @@ impl UploadManagerSession { let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let identifier = self + .manager + .store + .save_async_read(&mut hasher_reader) + .await?; 
let hash = hasher_reader.finalize_reset().await?; debug!("Storing alias"); @@ -167,7 +176,7 @@ impl UploadManagerSession { self.add_existing_alias(&hash, &alias).await?; debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; + self.save_upload(&identifier, hash, content_type).await?; // Return alias to file Ok(self) @@ -175,13 +184,10 @@ impl UploadManagerSession { /// Upload the file, discarding bytes if it's already present, or saving if it's new #[instrument(skip(self, stream))] - pub(crate) async fn upload( + pub(crate) async fn upload( mut self, - mut stream: impl Stream> + Unpin, - ) -> Result - where - Error: From, - { + mut stream: impl Stream> + Unpin, + ) -> Result { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); @@ -200,15 +206,18 @@ impl UploadManagerSession { let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let identifier = self + .manager + .store + .save_async_read(&mut hasher_reader) + .await?; let hash = hasher_reader.finalize_reset().await?; debug!("Adding alias"); self.add_alias(&hash, content_type.clone()).await?; debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; + self.save_upload(&identifier, hash, content_type).await?; // Return alias to file Ok(self) @@ -217,7 +226,7 @@ impl UploadManagerSession { // check duplicates & store image if new async fn save_upload( &self, - tmpfile: PathBuf, + identifier: &S::Identifier, hash: Hash, content_type: mime::Mime, ) -> Result<(), Error> { @@ -225,15 +234,13 @@ impl UploadManagerSession { // bail early with alias to existing file if this is a duplicate if dup.exists() { - debug!("Duplicate exists, not saving file"); + debug!("Duplicate exists, removing file"); + + self.manager.store.remove(&identifier).await?; return Ok(()); } - // -- WRITE NEW FILE -- - let real_path = self.manager.next_directory()?.join(&name); - - self.manager.store_path(name, &real_path).await?; - crate::safe_move_file(tmpfile, real_path).await?; + self.manager.store_identifier(name, &identifier).await?; Ok(()) } @@ -248,6 +255,7 @@ impl UploadManagerSession { let main_tree = self.manager.inner.main_tree.clone(); let filename = self.next_file(content_type).await?; + let filename2 = filename.clone(); let hash2 = hash.as_slice().to_vec(); debug!("Inserting filename for hash"); @@ -283,13 +291,11 @@ impl UploadManagerSession { async fn next_file(&self, content_type: mime::Mime) -> Result { loop { debug!("Filename generation loop"); - let s: String = Uuid::new_v4().to_string(); + let filename = file_name(Uuid::new_v4(), content_type.clone())?; - let filename = file_name(s, content_type.clone())?; - - let path_tree = self.manager.inner.path_tree.clone(); + let identifier_tree = self.manager.inner.identifier_tree.clone(); let filename2 = filename.clone(); - let filename_exists = web::block(move || path_tree.get(filename2.as_bytes())) + let filename_exists = web::block(move || identifier_tree.get(filename2.as_bytes())) .await?? 
.is_some(); @@ -363,8 +369,7 @@ impl UploadManagerSession { async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result { loop { debug!("Alias gen loop"); - let s: String = Uuid::new_v4().to_string(); - let alias = file_name(s, content_type.clone())?; + let alias = file_name(Uuid::new_v4(), content_type.clone())?; self.alias = Some(alias.clone()); let res = self.save_alias_hash_mapping(hash, &alias).await?; @@ -402,31 +407,20 @@ impl UploadManagerSession { } } -fn file_name(name: String, content_type: mime::Mime) -> Result { +fn file_name(name: Uuid, content_type: mime::Mime) -> Result { Ok(format!("{}{}", name, to_ext(content_type)?)) } -#[instrument(skip(input))] -async fn safe_save_reader(to: PathBuf, input: &mut (impl AsyncRead + Unpin)) -> Result<(), Error> { - if let Some(path) = to.parent() { - debug!("Creating directory {:?}", path); - tokio::fs::create_dir_all(path.to_owned()).await?; - } - - debug!("Checking if {:?} already exists", to); - if let Err(e) = tokio::fs::metadata(to.clone()).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } +fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> { + if mime == mime::IMAGE_PNG { + Ok(".png") + } else if mime == mime::IMAGE_JPEG { + Ok(".jpg") + } else if mime == crate::video_mp4() { + Ok(".mp4") + } else if mime == crate::image_webp() { + Ok(".webp") } else { - return Err(UploadError::FileExists.into()); + Err(UploadError::UnsupportedFormat.into()) } - - debug!("Writing stream to {:?}", to); - - let mut file = crate::file::File::create(to).await?; - - file.write_from_async_read(input).await?; - - Ok(()) }
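
The hunks above lean on the new storage abstraction without showing its definition (src/store.rs is added elsewhere in this patch and is not reproduced in this excerpt). As an orientation aid only, the following is a minimal sketch of the shape that abstraction appears to take, inferred from the call sites above: save_async_read, remove, and the Identifier round-trip through to_bytes/from_bytes. The bounds, error types, and the use of the async-trait crate in this sketch are assumptions for illustration, not the patch's actual definitions.

use std::fmt::Debug;

use async_trait::async_trait;
use tokio::io::AsyncRead;

// Hypothetical sketch: an opaque handle to a stored object. A file-backed
// store's identifier would wrap a relative path; other backends can use
// whatever bytes round-trip through sled.
pub(crate) trait Identifier: Sized + Clone + Debug {
    type Error: std::error::Error;

    fn to_bytes(&self) -> Result<Vec<u8>, Self::Error>;
    fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>;
}

// Hypothetical sketch: the minimal surface the upload manager and the
// ffmpeg/magick helpers need from a storage backend.
#[async_trait(?Send)]
pub(crate) trait Store: Clone + Debug {
    type Error: std::error::Error;
    type Identifier: Identifier;

    // Persist everything read from `reader`, returning an identifier for it.
    async fn save_async_read<Reader>(
        &self,
        reader: &mut Reader,
    ) -> Result<Self::Identifier, Self::Error>
    where
        Reader: AsyncRead + Unpin;

    // Delete the object addressed by `identifier`.
    async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error>;
}

The calling code above bounds the manager as impl<S: Store> UploadManager<S> where Error: From<S::Error>, so backend failures convert into the crate-wide Error through the ? operator rather than through file-system-specific variants.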
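One detail worth keeping straight while reading the identifier_tree (formerly path_tree) hunks: the bare filename maps to the primary identifier, while variant, details, and motion entries extend the filename with a '/'-separated suffix. That shared prefix is what lets delete_files collect every related entry with a single scan_prefix. A small self-contained illustration follows; the filename and identifier bytes are invented for the example.

fn main() {
    // Invented values for illustration only.
    let filename = "0b1d2e3f.png";
    let identifier_bytes: Vec<u8> = b"files/3/a/0b1d2e3f.png".to_vec();

    // filename -> identifier (primary mapping)
    let primary_key = filename.as_bytes().to_vec();

    // filename / identifier-bytes -> serialized Details (details tree)
    let mut details_key = filename.as_bytes().to_vec();
    details_key.extend(b"/");
    details_key.extend(&identifier_bytes);

    // filename / motion -> identifier of the still frame for videos
    let motion_key = format!("{}/motion", filename).into_bytes();

    // Every related key shares the filename prefix, so one scan_prefix(filename)
    // during deletion reaches the primary entry, all variants, and the motion still.
    assert!(details_key.starts_with(primary_key.as_slice()));
    assert!(motion_key.starts_with(filename.as_bytes()));
}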