diff --git a/Cargo.lock b/Cargo.lock index 0610c2e..872d7c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1185,11 +1185,11 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "pin-project-lite", - "rand", "serde", "serde_json", "sha2", "sled", + "storage-path-generator", "structopt", "thiserror", "time 0.3.3", @@ -1747,6 +1747,14 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "storage-path-generator" +version = "0.1.0" +source = "git+https://git.asonix.dog/asonix/storage-path-generator?branch=main#68f70707b0d04e5429f0c16d71ca329a9d4a9557" +dependencies = [ + "parking_lot", +] + [[package]] name = "strsim" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 3f5ab98..412a284 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,11 +29,11 @@ once_cell = "1.4.0" opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" pin-project-lite = "0.2.7" -rand = "0.8.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9.0" sled = { version = "0.34.6" } +storage-path-generator = { version = "0.1.0", git = "https://git.asonix.dog/asonix/storage-path-generator", branch = "main" } structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } diff --git a/src/error.rs b/src/error.rs index d5c73a8..bbe3c85 100644 --- a/src/error.rs +++ b/src/error.rs @@ -69,6 +69,12 @@ pub(crate) enum UploadError { #[error("Error interacting with filesystem, {0}")] Io(#[from] std::io::Error), + #[error(transparent)] + PathGenerator(#[from] storage_path_generator::PathError), + + #[error(transparent)] + StripPrefix(#[from] std::path::StripPrefixError), + #[error("Failed to acquire the semaphore")] Semaphore, diff --git a/src/main.rs b/src/main.rs index 20124cf..6e6b4ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ use tracing_error::ErrorLayer; use tracing_futures::Instrument; use tracing_log::LogTracer; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry}; +use uuid::Uuid; mod config; mod either; @@ -67,17 +68,7 @@ const HOURS: u32 = 60 * MINUTES; const DAYS: u32 = 24 * HOURS; static TMP_DIR: Lazy = Lazy::new(|| { - use rand::{ - distributions::{Alphanumeric, Distribution}, - thread_rng, - }; - - let mut rng = thread_rng(); - let tmp_nonce = Alphanumeric - .sample_iter(&mut rng) - .take(7) - .map(char::from) - .collect::(); + let tmp_nonce = Uuid::new_v4(); let mut path = std::env::temp_dir(); path.push(format!("pict-rs-{}", tmp_nonce)); @@ -262,15 +253,7 @@ async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), Error> { } pub(crate) fn tmp_file() -> PathBuf { - use rand::distributions::{Alphanumeric, Distribution}; - let limit: usize = 10; - let rng = rand::thread_rng(); - - let s: String = Alphanumeric - .sample_iter(rng) - .take(limit) - .map(char::from) - .collect(); + let s: String = Uuid::new_v4().to_string(); let name = format!("{}.tmp", s); diff --git a/src/migrate/mod.rs b/src/migrate/mod.rs index 6e466e0..8ac65d6 100644 --- a/src/migrate/mod.rs +++ b/src/migrate/mod.rs @@ -106,16 +106,6 @@ pub(crate) fn alias_key_bounds(hash: &[u8]) -> (Vec, Vec) { (start, end) } -pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec, Vec) { - let mut start = hash.to_vec(); - start.extend(&[2]); - - let mut end = hash.to_vec(); - end.extend(&[3]); - - (start, end) -} - pub(crate) fn alias_id_key(alias: &str) -> String { format!("{}/id", alias) } diff --git a/src/upload_manager/mod.rs b/src/upload_manager/mod.rs index 26a6097..83756ba 100644 --- a/src/upload_manager/mod.rs +++ b/src/upload_manager/mod.rs @@ -1,15 +1,17 @@ use crate::{ config::Format, error::{Error, UploadError}, - migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, + migrate::{alias_id_key, alias_key, alias_key_bounds, LatestDb}, }; use actix_web::web; use sha2::Digest; use std::{path::PathBuf, sync::Arc}; +use storage_path_generator::{Generator, Path}; use tracing::{debug, error, info, instrument, warn, Span}; use tracing_futures::Instrument; mod hasher; +mod restructure; mod session; pub(super) use session::UploadManagerSession; @@ -22,17 +24,22 @@ pub(super) use session::UploadManagerSession; // - Main Tree // - hash -> filename // - hash 0 u64(id) -> alias -// - hash 2 variant path -> variant path +// - DEPRECATED: +// - hash 2 variant path -> variant path +// - hash 2 vairant path details -> details // - Filename Tree // - filename -> hash +// - Details Tree +// - filename / relative path -> details // - Path Tree // - filename -> relative path -// - filename / variant operation path -> relative path +// - filename / relative variant path -> relative variant path // - Settings Tree // - last-path -> last generated path -// - fs-restructure-01-started -> bool // - fs-restructure-01-complete -> bool +const GENERATOR_KEY: &'static [u8] = b"last-path"; + #[derive(Clone)] pub struct UploadManager { inner: Arc, @@ -45,8 +52,10 @@ struct UploadManagerInner { alias_tree: sled::Tree, filename_tree: sled::Tree, main_tree: sled::Tree, + details_tree: sled::Tree, path_tree: sled::Tree, settings_tree: sled::Tree, + path_gen: Generator, db: sled::Db, } @@ -86,6 +95,8 @@ impl UploadManager { let settings_tree = db.open_tree("settings")?; + let path_gen = init_generator(&settings_tree)?; + let manager = UploadManager { inner: Arc::new(UploadManagerInner { format, @@ -93,31 +104,35 @@ impl UploadManager { image_dir: root_dir, alias_tree: db.open_tree("alias")?, filename_tree: db.open_tree("filename")?, + details_tree: db.open_tree("details")?, main_tree: db.open_tree("main")?, path_tree: db.open_tree("path")?, settings_tree, + path_gen, db, }), }; + manager.restructure().await?; + Ok(manager) } /// Store the path to a generated image variant so we can easily clean it up later #[instrument(skip(self))] pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> { - let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + let path_bytes = self + .generalize_path(&path)? + .to_str() + .ok_or(UploadError::Path)? + .as_bytes() + .to_vec(); - let fname_tree = self.inner.filename_tree.clone(); - debug!("Getting hash"); - let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes())) - .await?? - .ok_or(UploadError::MissingFilename)?; + let key = self.variant_key(&path, &filename)?; + let path_tree = self.inner.path_tree.clone(); - let key = variant_key(&hash, &path_string); - let main_tree = self.inner.main_tree.clone(); debug!("Storing variant"); - web::block(move || main_tree.insert(key, path_string.as_bytes())).await??; + web::block(move || path_tree.insert(key, path_bytes)).await??; debug!("Stored variant"); Ok(()) @@ -130,18 +145,11 @@ impl UploadManager { path: PathBuf, filename: String, ) -> Result, Error> { - let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + let key = self.details_key(&path, &filename)?; + let details_tree = self.inner.details_tree.clone(); - let fname_tree = self.inner.filename_tree.clone(); - debug!("Getting hash"); - let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes())) - .await?? - .ok_or(UploadError::MissingFilename)?; - - let key = variant_details_key(&hash, &path_string); - let main_tree = self.inner.main_tree.clone(); debug!("Getting details"); - let opt = match web::block(move || main_tree.get(key)).await?? { + let opt = match web::block(move || details_tree.get(key)).await?? { Some(ivec) => match serde_json::from_slice(&ivec) { Ok(details) => Some(details), Err(_) => None, @@ -160,19 +168,12 @@ impl UploadManager { filename: String, details: &Details, ) -> Result<(), Error> { - let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + let key = self.details_key(&path, &filename)?; + let details_tree = self.inner.details_tree.clone(); + let details_value = serde_json::to_vec(details)?; - let fname_tree = self.inner.filename_tree.clone(); - debug!("Getting hash"); - let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes())) - .await?? - .ok_or(UploadError::MissingFilename)?; - - let key = variant_details_key(&hash, &path_string); - let main_tree = self.inner.main_tree.clone(); - let details_value = serde_json::to_string(details)?; debug!("Storing details"); - web::block(move || main_tree.insert(key, details_value.as_bytes())).await??; + web::block(move || details_tree.insert(key, details_value)).await??; debug!("Stored details"); Ok(()) @@ -198,6 +199,21 @@ impl UploadManager { self.aliases_by_hash(&hash).await } + fn next_directory(&self) -> Result { + let path = self.inner.path_gen.next(); + + self.inner + .settings_tree + .insert(GENERATOR_KEY, path.to_be_bytes())?; + + let mut target_path = self.image_dir(); + for dir in path.to_strings() { + target_path.push(dir) + } + + Ok(target_path) + } + async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result, Error> { let (start, end) = alias_key_bounds(hash); let main_tree = self.inner.main_tree.clone(); @@ -375,41 +391,44 @@ impl UploadManager { errors.push(e.into()); } + let filename2 = filename.clone(); let fname_tree = self.inner.filename_tree.clone(); debug!("Deleting filename -> hash mapping"); - let hash = web::block(move || fname_tree.remove(filename)) - .await?? - .ok_or(UploadError::MissingFile)?; + web::block(move || fname_tree.remove(filename2)).await??; - let (start, end) = variant_key_bounds(&hash); - let main_tree = self.inner.main_tree.clone(); + let path_prefix = filename.clone(); + let path_tree = self.inner.path_tree.clone(); debug!("Fetching file variants"); - let keys = web::block(move || { - let mut keys = Vec::new(); - for key in main_tree.range(start..end).keys() { - keys.push(key?.to_owned()); - } - - Ok(keys) as Result, Error> + let paths = web::block(move || { + path_tree + .scan_prefix(path_prefix) + .values() + .collect::, sled::Error>>() }) .await??; - debug!("{} files prepared for deletion", keys.len()); + debug!("{} files prepared for deletion", paths.len()); - for key in keys { - let main_tree = self.inner.main_tree.clone(); - if let Some(path) = web::block(move || main_tree.remove(key)).await?? { - let s = String::from_utf8_lossy(&path); - debug!("Deleting {}", s); - // ignore json objects - if !s.starts_with('{') { - if let Err(e) = remove_path(path).await { - errors.push(e); - } - } + for path in paths { + let s = String::from_utf8_lossy(&path); + debug!("Deleting {}", s); + if let Err(e) = remove_path(path).await { + errors.push(e); } } + let path_prefix = filename.clone(); + let path_tree = self.inner.path_tree.clone(); + debug!("Deleting path info"); + web::block(move || { + for res in path_tree.scan_prefix(path_prefix).keys() { + let key = res?; + path_tree.remove(key)?; + } + Ok(()) as Result<(), Error> + }) + .await??; + for error in errors { error!("Error deleting files, {}", error); } @@ -473,6 +492,16 @@ impl FilenameIVec { } } +fn init_generator(settings: &sled::Tree) -> Result { + if let Some(ivec) = settings.get(GENERATOR_KEY)? { + Ok(Generator::from_existing(Path::from_be_bytes( + ivec.to_vec(), + )?)) + } else { + Ok(Generator::new()) + } +} + async fn remove_path(path: sled::IVec) -> Result<(), Error> { let path_string = String::from_utf8(path.to_vec())?; tokio::fs::remove_file(path_string).await?; @@ -490,21 +519,6 @@ fn delete_key(alias: &str) -> String { format!("{}/delete", alias) } -fn variant_key(hash: &[u8], path: &str) -> Vec { - let mut key = hash.to_vec(); - key.extend(&[2]); - key.extend(path.as_bytes()); - key -} - -fn variant_details_key(hash: &[u8], path: &str) -> Vec { - let mut key = hash.to_vec(); - key.extend(&[2]); - key.extend(path.as_bytes()); - key.extend(b"details"); - key -} - impl std::fmt::Debug for UploadManager { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("UploadManager").finish() diff --git a/src/upload_manager/restructure.rs b/src/upload_manager/restructure.rs new file mode 100644 index 0000000..e01c881 --- /dev/null +++ b/src/upload_manager/restructure.rs @@ -0,0 +1,145 @@ +use crate::{ + error::{Error, UploadError}, + safe_move_file, + upload_manager::UploadManager, +}; +use std::path::{Path, PathBuf}; + +const RESTRUCTURE_COMPLETE: &'static [u8] = b"fs-restructure-01-complete"; +const DETAILS: &'static [u8] = b"details"; + +impl UploadManager { + #[tracing::instrument(skip(self))] + pub(super) async fn restructure(&self) -> Result<(), Error> { + if self.restructure_complete()? { + return Ok(()); + } + + for res in self.inner.filename_tree.iter() { + let (filename, hash) = res?; + let filename = String::from_utf8(filename.to_vec())?; + tracing::info!("Migrating {}", filename); + + let mut file_path = self.image_dir(); + file_path.push(filename.clone()); + + if tokio::fs::metadata(&file_path).await.is_ok() { + let mut target_path = self.next_directory()?; + target_path.push(filename.clone()); + + let target_path_bytes = self + .generalize_path(&target_path)? + .to_str() + .ok_or(UploadError::Path)? + .as_bytes() + .to_vec(); + + self.inner + .path_tree + .insert(filename.as_bytes(), target_path_bytes)?; + + safe_move_file(file_path, target_path).await?; + } + + let (start, end) = variant_key_bounds(&hash); + + for res in self.inner.main_tree.range(start..end) { + let (hash_variant_key, variant_path_or_details) = res?; + + if hash_variant_key.ends_with(DETAILS) { + let details = variant_path_or_details; + + let start_index = hash.len() + 1; + let end_index = hash_variant_key.len() - DETAILS.len(); + let path_bytes = &hash_variant_key[start_index..end_index]; + + let variant_path = PathBuf::from(String::from_utf8(path_bytes.to_vec())?); + let key = self.details_key(&variant_path, &filename)?; + + self.inner.details_tree.insert(key, details)?; + } else { + let variant_path = + PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?); + if tokio::fs::metadata(&variant_path).await.is_ok() { + let mut target_path = self.next_directory()?; + target_path.push(filename.clone()); + + let relative_target_path_bytes = self + .generalize_path(&target_path)? + .to_str() + .ok_or(UploadError::Path)? + .as_bytes() + .to_vec(); + + let variant_key = self.variant_key(&target_path, &filename)?; + + self.inner + .path_tree + .insert(variant_key, relative_target_path_bytes)?; + + safe_move_file(variant_path, target_path).await?; + } + } + + self.inner.main_tree.remove(hash_variant_key)?; + } + } + + self.mark_restructure_complete()?; + Ok(()) + } + + fn restructure_complete(&self) -> Result { + Ok(!self + .inner + .settings_tree + .get(RESTRUCTURE_COMPLETE)? + .is_some()) + } + + fn mark_restructure_complete(&self) -> Result<(), Error> { + self.inner + .settings_tree + .insert(RESTRUCTURE_COMPLETE, b"true")?; + + Ok(()) + } + + pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> { + Ok(path.strip_prefix(&self.inner.image_dir)?) + } + + pub(super) fn details_key( + &self, + variant_path: &Path, + filename: &str, + ) -> Result, Error> { + let path = self.generalize_path(variant_path)?; + let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + + let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); + Ok(vec) + } + + pub(super) fn variant_key( + &self, + variant_path: &Path, + filename: &str, + ) -> Result, Error> { + let path = self.generalize_path(variant_path)?; + let path_string = path.to_str().ok_or(UploadError::Path)?.to_string(); + + let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); + Ok(vec) + } +} + +pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec, Vec) { + let mut start = hash.to_vec(); + start.extend(&[2]); + + let mut end = hash.to_vec(); + end.extend(&[3]); + + (start, end) +} diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 9b5b9b9..21e5cf4 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -14,6 +14,7 @@ use std::path::PathBuf; use tokio::io::AsyncRead; use tracing::{debug, instrument, warn, Span}; use tracing_futures::Instrument; +use uuid::Uuid; type UploadStream = LocalBoxStream<'static, Result>; @@ -98,13 +99,7 @@ impl UploadManagerSession { let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; debug!("Generating delete token"); - use rand::distributions::{Alphanumeric, Distribution}; - let rng = rand::thread_rng(); - let s: String = Alphanumeric - .sample_iter(rng) - .take(10) - .map(char::from) - .collect(); + let s: String = Uuid::new_v4().to_string(); let delete_token = s.clone(); debug!("Saving delete token"); @@ -286,17 +281,10 @@ impl UploadManagerSession { #[instrument(skip(self, content_type))] async fn next_file(&self, content_type: mime::Mime) -> Result { let image_dir = self.manager.image_dir(); - use rand::distributions::{Alphanumeric, Distribution}; - let mut limit: usize = 10; - let mut rng = rand::thread_rng(); loop { debug!("Filename generation loop"); let mut path = image_dir.clone(); - let s: String = Alphanumeric - .sample_iter(&mut rng) - .take(limit) - .map(char::from) - .collect(); + let s: String = Uuid::new_v4().to_string(); let filename = file_name(s, content_type.clone())?; @@ -311,8 +299,6 @@ impl UploadManagerSession { } debug!("Filename exists, trying again"); - - limit += 1; } } @@ -376,16 +362,9 @@ impl UploadManagerSession { // Generate an alias to the file #[instrument(skip(self, hash, content_type))] async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result { - use rand::distributions::{Alphanumeric, Distribution}; - let mut limit: usize = 10; - let mut rng = rand::thread_rng(); loop { debug!("Alias gen loop"); - let s: String = Alphanumeric - .sample_iter(&mut rng) - .take(limit) - .map(char::from) - .collect(); + let s: String = Uuid::new_v4().to_string(); let alias = file_name(s, content_type.clone())?; self.alias = Some(alias.clone()); @@ -395,8 +374,6 @@ impl UploadManagerSession { return Ok(alias); } debug!("Alias exists, regenning"); - - limit += 1; } }