diff --git a/Cargo.lock b/Cargo.lock index 76fc925..33fed7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1610,7 +1610,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.3.0-rc.7" +version = "0.4.0-alpha.1" dependencies = [ "actix-form-data", "actix-rt", diff --git a/Cargo.toml b/Cargo.toml index 478f4d1..4857e63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.3.0-rc.7" +version = "0.4.0-alpha.1" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" diff --git a/src/error.rs b/src/error.rs index 7e02205..faca216 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,39 +37,33 @@ where } } -impl From> for Error { - fn from(e: sled::transaction::TransactionError) -> Self { - match e { - sled::transaction::TransactionError::Abort(t) => t, - sled::transaction::TransactionError::Storage(e) => e.into(), - } - } -} - #[derive(Debug, thiserror::Error)] pub(crate) enum UploadError { - #[error("Couln't upload file, {0}")] + #[error("Couln't upload file")] Upload(#[from] actix_form_data::Error), - #[error("Error in DB, {0}")] - Db(#[from] sled::Error), + #[error("Error in DB")] + Sled(#[from] crate::repo::sled::Error), - #[error("Error parsing string, {0}")] + #[error("Error in old sled DB")] + OldSled(#[from] ::sled::Error), + + #[error("Error parsing string")] ParseString(#[from] std::string::FromUtf8Error), - #[error("Error interacting with filesystem, {0}")] + #[error("Error interacting with filesystem")] Io(#[from] std::io::Error), - #[error(transparent)] + #[error("Error generating path")] PathGenerator(#[from] storage_path_generator::PathError), - #[error(transparent)] + #[error("Error stripping prefix")] StripPrefix(#[from] std::path::StripPrefixError), - #[error(transparent)] + #[error("Error storing file")] FileStore(#[from] crate::store::file_store::FileError), - #[error(transparent)] + #[error("Error storing object")] ObjectStore(#[from] crate::store::object_store::ObjectError), #[error("Provided process path is invalid")] @@ -87,9 +81,6 @@ pub(crate) enum UploadError { #[error("Requested a file that doesn't exist")] MissingAlias, - #[error("Alias directed to missing file")] - MissingFile, - #[error("Provided token did not match expected token")] InvalidToken, @@ -102,7 +93,7 @@ pub(crate) enum UploadError { #[error("Unable to download image, bad response {0}")] Download(actix_web::http::StatusCode), - #[error("Unable to download image, {0}")] + #[error("Unable to download image")] Payload(#[from] awc::error::PayloadError), #[error("Unable to send request, {0}")] @@ -117,13 +108,13 @@ pub(crate) enum UploadError { #[error("Tried to save an image with an already-taken name")] DuplicateAlias, - #[error("{0}")] + #[error("Error in json")] Json(#[from] serde_json::Error), #[error("Range header not satisfiable")] Range, - #[error(transparent)] + #[error("Hit limit")] Limit(#[from] super::LimitError), } diff --git a/src/magick.rs b/src/magick.rs index 327052e..772b333 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -2,6 +2,7 @@ use crate::{ config::Format, error::{Error, UploadError}, process::Process, + repo::Alias, store::Store, }; use actix_web::web::Bytes; @@ -11,8 +12,9 @@ use tokio::{ }; use tracing::instrument; -pub(crate) fn details_hint(filename: &str) -> Option { - if filename.ends_with(".mp4") { +pub(crate) fn details_hint(alias: &Alias) -> Option { + let ext = alias.extension()?; + if ext.ends_with(".mp4") { Some(ValidInputType::Mp4) } else { None diff --git a/src/main.rs b/src/main.rs index 305df02..d7acc11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,6 +57,7 @@ use self::{ magick::details_hint, middleware::{Deadline, Internal}, migrate::LatestDb, + repo::{Alias, DeleteToken, Repo}, store::{file_store::FileStore, object_store::ObjectStore, Store}, upload_manager::{UploadManager, UploadManagerSession}, }; @@ -96,30 +97,26 @@ where info!("Uploaded {} as {:?}", image.filename, alias); let delete_token = image.result.delete_token().await?; - let name = manager.from_alias(alias.to_owned()).await?; - let identifier = manager.identifier_from_filename::(name.clone()).await?; - - let details = manager.variant_details(&identifier, name.clone()).await?; + let identifier = manager.identifier_from_alias::(alias).await?; + let details = manager.details(&identifier).await?; let details = if let Some(details) = details { debug!("details exist"); details } else { debug!("generating new details from {:?}", identifier); - let hint = details_hint(&name); + let hint = details_hint(alias); let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; - debug!("storing details for {:?} {}", identifier, name); - manager - .store_variant_details(&identifier, name, &new_details) - .await?; + debug!("storing details for {:?}", identifier); + manager.store_details(&identifier, &new_details).await?; debug!("stored"); new_details }; files.push(serde_json::json!({ - "file": alias, - "delete_token": delete_token, + "file": alias.to_string(), + "delete_token": delete_token.to_string(), "details": details, })); } @@ -222,19 +219,16 @@ where drop(permit); let delete_token = session.delete_token().await?; - let name = manager.from_alias(alias.to_owned()).await?; - let identifier = manager.identifier_from_filename::(name.clone()).await?; + let identifier = manager.identifier_from_alias::(&alias).await?; - let details = manager.variant_details(&identifier, name.clone()).await?; + let details = manager.details(&identifier).await?; let details = if let Some(details) = details { details } else { - let hint = details_hint(&name); + let hint = details_hint(&alias); let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; - manager - .store_variant_details(&identifier, name, &new_details) - .await?; + manager.store_details(&identifier, &new_details).await?; new_details }; @@ -242,8 +236,8 @@ where Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", "files": [{ - "file": alias, - "delete_token": delete_token, + "file": alias.to_string(), + "delete_token": delete_token.to_string(), "details": details, }] }))) @@ -261,19 +255,21 @@ where { let (alias, token) = path_entries.into_inner(); - manager.delete((**store).clone(), token, alias).await?; + let alias = Alias::from_existing(&alias); + let token = DeleteToken::from_existing(&token); + + manager.delete((**store).clone(), alias, token).await?; Ok(HttpResponse::NoContent().finish()) } type ProcessQuery = Vec<(String, String)>; -async fn prepare_process( +fn prepare_process( query: web::Query, ext: &str, - manager: &UploadManager, filters: &Option>, -) -> Result<(Format, String, PathBuf, Vec), Error> { +) -> Result<(Format, Alias, PathBuf, Vec), Error> { let (alias, operations) = query .into_inner() @@ -291,7 +287,7 @@ async fn prepare_process( return Err(UploadError::MissingFilename.into()); } - let name = manager.from_alias(alias).await?; + let alias = Alias::from_existing(&alias); let operations = if let Some(filters) = filters.as_ref() { operations @@ -305,12 +301,10 @@ async fn prepare_process( let format = ext .parse::() .map_err(|_| UploadError::UnsupportedFormat)?; - let processed_name = format!("{}.{}", name, ext); - let (thumbnail_path, thumbnail_args) = - self::processor::build_chain(&operations, processed_name)?; + let (thumbnail_path, thumbnail_args) = self::processor::build_chain(&operations)?; - Ok((format, name, thumbnail_path, thumbnail_args)) + Ok((format, alias, thumbnail_path, thumbnail_args)) } #[instrument(name = "Fetching derived details", skip(manager, filters))] @@ -324,15 +318,14 @@ async fn process_details( where Error: From, { - let (_, name, thumbnail_path, _) = - prepare_process(query, ext.as_str(), &manager, &filters).await?; + let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?; let identifier = manager - .variant_identifier::(&thumbnail_path, &name) + .variant_identifier::(&alias, &thumbnail_path) .await? .ok_or(UploadError::MissingAlias)?; - let details = manager.variant_details(&identifier, name).await?; + let details = manager.details(&identifier).await?; let details = details.ok_or(UploadError::NoFiles)?; @@ -352,24 +345,22 @@ async fn process( where Error: From, { - let (format, name, thumbnail_path, thumbnail_args) = - prepare_process(query, ext.as_str(), &manager, &filters).await?; + let (format, alias, thumbnail_path, thumbnail_args) = + prepare_process(query, ext.as_str(), &filters)?; let identifier_opt = manager - .variant_identifier::(&thumbnail_path, &name) + .variant_identifier::(&alias, &thumbnail_path) .await?; if let Some(identifier) = identifier_opt { - let details_opt = manager.variant_details(&identifier, name.clone()).await?; + let details_opt = manager.details(&identifier).await?; let details = if let Some(details) = details_opt { details } else { - let hint = details_hint(&name); + let hint = details_hint(&alias); let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; - manager - .store_variant_details(&identifier, name, &details) - .await?; + manager.store_details(&identifier, &details).await?; details }; @@ -377,7 +368,7 @@ where } let identifier = manager - .still_identifier_from_filename((**store).clone(), name.clone()) + .still_identifier_from_alias((**store).clone(), &alias) .await?; let thumbnail_path2 = thumbnail_path.clone(); @@ -405,29 +396,27 @@ where parent: None, "Saving variant information", path = tracing::field::debug(&thumbnail_path), - name = tracing::field::display(&name), + name = tracing::field::display(&alias), ); save_span.follows_from(Span::current()); let details2 = details.clone(); let bytes2 = bytes.clone(); + let alias2 = alias.clone(); actix_rt::spawn( async move { - let identifier = match store.save_bytes(bytes2, &name).await { + let identifier = match store.save_bytes(bytes2).await { Ok(identifier) => identifier, Err(e) => { tracing::warn!("Failed to generate directory path: {}", e); return; } }; - if let Err(e) = manager - .store_variant_details(&identifier, name.clone(), &details2) - .await - { + if let Err(e) = manager.store_details(&identifier, &details2).await { tracing::warn!("Error saving variant details: {}", e); return; } if let Err(e) = manager - .store_variant(Some(&thumbnail_path), &identifier, &name) + .store_variant(&alias2, &thumbnail_path, &identifier) .await { tracing::warn!("Error saving variant info: {}", e); @@ -483,19 +472,19 @@ async fn details( where Error: From, { - let name = manager.from_alias(alias.into_inner()).await?; - let identifier = manager.identifier_from_filename::(name.clone()).await?; + let alias = alias.into_inner(); + let alias = Alias::from_existing(&alias); - let details = manager.variant_details(&identifier, name.clone()).await?; + let identifier = manager.identifier_from_alias::(&alias).await?; + + let details = manager.details(&identifier).await?; let details = if let Some(details) = details { details } else { - let hint = details_hint(&name); + let hint = details_hint(&alias); let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; - manager - .store_variant_details(&identifier, name, &new_details) - .await?; + manager.store_details(&identifier, &new_details).await?; new_details }; @@ -513,19 +502,18 @@ async fn serve( where Error: From, { - let name = manager.from_alias(alias.into_inner()).await?; - let identifier = manager.identifier_from_filename::(name.clone()).await?; + let alias = alias.into_inner(); + let alias = Alias::from_existing(&alias); + let identifier = manager.identifier_from_alias::(&alias).await?; - let details = manager.variant_details(&identifier, name.clone()).await?; + let details = manager.details(&identifier).await?; let details = if let Some(details) = details { details } else { - let hint = details_hint(&name); + let hint = details_hint(&alias); let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; - manager - .store_variant_details(&identifier, name, &details) - .await?; + manager.store_details(&identifier, &details).await?; details }; @@ -605,25 +593,21 @@ where } #[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum FileOrAlias { - File { file: String }, - Alias { alias: String }, +struct AliasQuery { + alias: String, } #[instrument(name = "Purging file", skip(upload_manager))] async fn purge( - query: web::Query, + query: web::Query, upload_manager: web::Data, store: web::Data, ) -> Result where Error: From, { - let aliases = match query.into_inner() { - FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, - FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, - }; + let alias = Alias::from_existing(&query.alias); + let aliases = upload_manager.aliases_by_alias(&alias).await?; for alias in aliases.iter() { upload_manager @@ -633,49 +617,25 @@ where Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", - "aliases": aliases + "aliases": aliases.iter().map(|a| a.to_string()).collect::>() }))) } #[instrument(name = "Fetching aliases", skip(upload_manager))] async fn aliases( - query: web::Query, + query: web::Query, upload_manager: web::Data, store: web::Data, ) -> Result where Error: From, { - let aliases = match query.into_inner() { - FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?, - FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?, - }; + let alias = Alias::from_existing(&query.alias); + let aliases = upload_manager.aliases_by_alias(&alias).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", - "aliases": aliases, - }))) -} - -#[derive(Debug, serde::Deserialize)] -struct ByAlias { - alias: String, -} - -#[instrument(name = "Fetching filename", skip(upload_manager))] -async fn filename_by_alias( - query: web::Query, - upload_manager: web::Data, - store: web::Data, -) -> Result -where - Error: From, -{ - let filename = upload_manager.from_alias(query.into_inner().alias).await?; - - Ok(HttpResponse::Ok().json(&serde_json::json!({ - "msg": "ok", - "filename": filename, + "aliases": aliases.iter().map(|a| a.to_string()).collect::>() }))) } @@ -817,10 +777,7 @@ where .route(web::post().to(upload::)), ) .service(web::resource("/purge").route(web::post().to(purge::))) - .service(web::resource("/aliases").route(web::get().to(aliases::))) - .service( - web::resource("/filename").route(web::get().to(filename_by_alias::)), - ), + .service(web::resource("/aliases").route(web::get().to(aliases::))), ) }) .bind(CONFIG.bind_address())? @@ -834,7 +791,7 @@ where async fn migrate_inner( manager: &UploadManager, - db: &sled::Db, + repo: &Repo, from: S1, to: &config::Storage, ) -> anyhow::Result<()> @@ -844,9 +801,7 @@ where { match to { config::Storage::Filesystem(RequiredFilesystemStorage { path }) => { - let to = FileStore::build(path.clone(), db)?; - manager.restructure(&to).await?; - + let to = FileStore::build(path.clone(), repo.clone()).await?; manager.migrate_store::(from, to).await?; } config::Storage::ObjectStorage(RequiredObjectStorage { @@ -864,9 +819,10 @@ where secret_key.clone(), security_token.clone(), session_token.clone(), - db, + repo.clone(), build_reqwest_client()?, - )?; + ) + .await?; manager.migrate_store::(from, to).await?; } @@ -883,13 +839,12 @@ async fn main() -> anyhow::Result<()> { CONFIG.console_buffer_capacity(), )?; + let repo = Repo::open(CONFIG.repo())?; + let db = LatestDb::exists(CONFIG.data_dir()).migrate()?; - - let repo = self::repo::Repo::open(CONFIG.repo())?; - repo.from_db(db).await?; - let manager = UploadManager::new(db.clone(), CONFIG.format()).await?; + let manager = UploadManager::new(repo.clone(), CONFIG.format()).await?; match CONFIG.command()? { CommandConfig::Run => (), @@ -901,10 +856,8 @@ async fn main() -> anyhow::Result<()> { match from { config::Storage::Filesystem(RequiredFilesystemStorage { path }) => { - let from = FileStore::build(path.clone(), &db)?; - manager.restructure(&from).await?; - - migrate_inner(&manager, &db, from, &to).await?; + let from = FileStore::build(path.clone(), repo.clone()).await?; + migrate_inner(&manager, &repo, from, &to).await?; } config::Storage::ObjectStorage(RequiredObjectStorage { bucket_name, @@ -921,11 +874,12 @@ async fn main() -> anyhow::Result<()> { secret_key, security_token, session_token, - &db, + repo.clone(), build_reqwest_client()?, - )?; + ) + .await?; - migrate_inner(&manager, &db, from, &to).await?; + migrate_inner(&manager, &repo, from, &to).await?; } } @@ -935,9 +889,7 @@ async fn main() -> anyhow::Result<()> { match CONFIG.store()? { config::Storage::Filesystem(RequiredFilesystemStorage { path }) => { - let store = FileStore::build(path, &db)?; - manager.restructure(&store).await?; - + let store = FileStore::build(path, repo).await?; launch(manager, store).await } config::Storage::ObjectStorage(RequiredObjectStorage { @@ -955,9 +907,10 @@ async fn main() -> anyhow::Result<()> { secret_key, security_token, session_token, - &db, + repo, build_reqwest_client()?, - )?; + ) + .await?; launch(manager, store).await } diff --git a/src/migrate.rs b/src/migrate.rs index 8ac65d6..f33ffb5 100644 --- a/src/migrate.rs +++ b/src/migrate.rs @@ -95,26 +95,3 @@ impl DbVersion { } } } - -pub(crate) fn alias_key_bounds(hash: &[u8]) -> (Vec, Vec) { - let mut start = hash.to_vec(); - start.extend(&[0]); - - let mut end = hash.to_vec(); - end.extend(&[1]); - - (start, end) -} - -pub(crate) fn alias_id_key(alias: &str) -> String { - format!("{}/id", alias) -} - -pub(crate) fn alias_key(hash: &[u8], id: &str) -> Vec { - let mut key = hash.to_vec(); - // add a separator to the key between the hash and the ID - key.extend(&[0]); - key.extend(id.as_bytes()); - - key -} diff --git a/src/processor.rs b/src/processor.rs index feb3c09..0e01a5d 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -20,10 +20,7 @@ pub(crate) struct Crop(usize, usize); pub(crate) struct Blur(f64); #[instrument] -pub(crate) fn build_chain( - args: &[(String, String)], - filename: String, -) -> Result<(PathBuf, Vec), Error> { +pub(crate) fn build_chain(args: &[(String, String)]) -> Result<(PathBuf, Vec), Error> { fn parse(key: &str, value: &str) -> Result, UploadError> { if key == P::NAME { return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?)); @@ -56,7 +53,7 @@ pub(crate) fn build_chain( } })?; - Ok((path.join(filename), args)) + Ok((path, args)) } impl Processor for Identity { diff --git a/src/repo.rs b/src/repo.rs index 13d1b78..b7fcf37 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,105 +1,131 @@ use crate::config::RequiredSledRepo; use crate::{config::Repository, details::Details, store::Identifier}; use futures_util::Stream; +use tracing::debug; use uuid::Uuid; mod old; pub(crate) mod sled; -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum Repo { Sled(self::sled::SledRepo), } -#[derive(Clone, Debug)] -pub(crate) struct Alias { - id: Uuid, - extension: String, +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum MaybeUuid { + Uuid(Uuid), + Name(String), } +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct Alias { + id: MaybeUuid, + extension: Option, +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct DeleteToken { - id: Uuid, + id: MaybeUuid, } pub(crate) struct AlreadyExists; -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] pub(crate) trait SettingsRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync + 'static; async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>; async fn get(&self, key: &'static [u8]) -> Result, Self::Error>; async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>; } -#[async_trait::async_trait] -pub(crate) trait IdentifierRepo { - type Error: std::error::Error; +#[async_trait::async_trait(?Send)] +pub(crate) trait IdentifierRepo { + type Error: std::error::Error + Send + Sync + 'static; - async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>; - async fn details(&self, identifier: I) -> Result, Self::Error>; + async fn relate_details( + &self, + identifier: &I, + details: &Details, + ) -> Result<(), Self::Error>; + async fn details(&self, identifier: &I) -> Result, Self::Error>; - async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>; + async fn cleanup(&self, identifier: &I) -> Result<(), Self::Error>; } -#[async_trait::async_trait] -pub(crate) trait HashRepo { +#[async_trait::async_trait(?Send)] +pub(crate) trait HashRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync + 'static; type Stream: Stream>; async fn hashes(&self) -> Self::Stream; async fn create(&self, hash: Self::Bytes) -> Result, Self::Error>; - async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>; - async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>; + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>; + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>; async fn aliases(&self, hash: Self::Bytes) -> Result, Self::Error>; - async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>; - async fn identifier(&self, hash: Self::Bytes) -> Result; + async fn relate_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), Self::Error>; + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result; - async fn relate_variant_identifier( + async fn relate_variant_identifier( &self, hash: Self::Bytes, variant: String, - identifier: I, + identifier: &I, ) -> Result<(), Self::Error>; - async fn variant_identifier( + async fn variant_identifier( &self, hash: Self::Bytes, variant: String, ) -> Result, Self::Error>; - - async fn relate_motion_identifier( + async fn variants( &self, hash: Self::Bytes, - identifier: I, + ) -> Result, Self::Error>; + + async fn relate_motion_identifier( + &self, + hash: Self::Bytes, + identifier: &I, ) -> Result<(), Self::Error>; - async fn motion_identifier(&self, hash: Self::Bytes) -> Result, Self::Error>; + async fn motion_identifier( + &self, + hash: Self::Bytes, + ) -> Result, Self::Error>; async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>; } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync + 'static; - async fn create(&self, alias: Alias) -> Result, Self::Error>; + async fn create(&self, alias: &Alias) -> Result, Self::Error>; async fn relate_delete_token( &self, - alias: Alias, - delete_token: DeleteToken, + alias: &Alias, + delete_token: &DeleteToken, ) -> Result, Self::Error>; - async fn delete_token(&self, alias: Alias) -> Result; + async fn delete_token(&self, alias: &Alias) -> Result; - async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>; - async fn hash(&self, alias: Alias) -> Result; + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error>; + async fn hash(&self, alias: &Alias) -> Result; - async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>; + async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error>; } impl Repo { @@ -167,72 +193,134 @@ const GENERATOR_KEY: &[u8] = b"last-path"; async fn migrate_hash(repo: &T, old: &old::Old, hash: ::sled::IVec) -> anyhow::Result<()> where - T: IdentifierRepo<::sled::IVec>, - >::Error: Send + Sync + 'static, - T: HashRepo<::sled::IVec>, - >::Error: Send + Sync + 'static, - T: AliasRepo, - ::Error: Send + Sync + 'static, - T: SettingsRepo, - ::Error: Send + Sync + 'static, + T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo, { - HashRepo::create(repo, hash.to_vec().into()).await?; + if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() { + debug!("Duplicate hash detected"); + return Ok(()); + } - let main_ident = old.main_identifier(&hash)?; + let main_ident = old.main_identifier(&hash)?.to_vec(); - HashRepo::relate_identifier(repo, hash.to_vec().into(), main_ident.clone()).await?; + repo.relate_identifier(hash.to_vec().into(), &main_ident) + .await?; for alias in old.aliases(&hash) { - if let Ok(Ok(())) = AliasRepo::create(repo, alias.clone()).await { - let _ = HashRepo::relate_alias(repo, hash.to_vec().into(), alias.clone()).await; - let _ = AliasRepo::relate_hash(repo, alias.clone(), hash.to_vec().into()).await; + if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await { + let _ = repo.relate_alias(hash.to_vec().into(), &alias).await; + let _ = repo.relate_hash(&alias, hash.to_vec().into()).await; if let Ok(Some(delete_token)) = old.delete_token(&alias) { - let _ = AliasRepo::relate_delete_token(repo, alias, delete_token).await; + let _ = repo.relate_delete_token(&alias, &delete_token).await; } } } if let Ok(Some(identifier)) = old.motion_identifier(&hash) { - HashRepo::relate_motion_identifier(repo, hash.to_vec().into(), identifier).await; + let _ = repo + .relate_motion_identifier(hash.to_vec().into(), &identifier.to_vec()) + .await; } - for (variant, identifier) in old.variants(&hash)? { - let _ = - HashRepo::relate_variant_identifier(repo, hash.to_vec().into(), variant, identifier) - .await; + for (variant_path, identifier) in old.variants(&hash)? { + let variant = variant_path.to_string_lossy().to_string(); + + let _ = repo + .relate_variant_identifier(hash.to_vec().into(), variant, &identifier.to_vec()) + .await; } for (identifier, details) in old.details(&hash)? { - let _ = IdentifierRepo::relate_details(repo, identifier, details).await; + let _ = repo.relate_details(&identifier.to_vec(), &details).await; } if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) { - SettingsRepo::set(repo, STORE_MIGRATION_PROGRESS, value.to_vec().into()).await?; + repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into()) + .await?; } if let Ok(Some(value)) = old.setting(GENERATOR_KEY) { - SettingsRepo::set(repo, GENERATOR_KEY, value.to_vec().into()).await?; + repo.set(GENERATOR_KEY, value.to_vec().into()).await?; } Ok(()) } +impl MaybeUuid { + fn from_str(s: &str) -> Self { + if let Ok(uuid) = Uuid::parse_str(s) { + MaybeUuid::Uuid(uuid) + } else { + MaybeUuid::Name(s.into()) + } + } + + fn as_bytes(&self) -> &[u8] { + match self { + Self::Uuid(uuid) => &uuid.as_bytes()[..], + Self::Name(name) => name.as_bytes(), + } + } +} + +fn split_at_dot(s: &str) -> Option<(&str, &str)> { + let index = s.find('.')?; + + Some(s.split_at(index)) +} + impl Alias { + pub(crate) fn generate(extension: String) -> Self { + Alias { + id: MaybeUuid::Uuid(Uuid::new_v4()), + extension: Some(extension), + } + } + + pub(crate) fn from_existing(alias: &str) -> Self { + if let Some((start, end)) = split_at_dot(alias) { + Alias { + id: MaybeUuid::from_str(start), + extension: Some(end.into()), + } + } else { + Alias { + id: MaybeUuid::from_str(alias), + extension: None, + } + } + } + + pub(crate) fn extension(&self) -> Option<&str> { + self.extension.as_deref() + } + fn to_bytes(&self) -> Vec { let mut v = self.id.as_bytes().to_vec(); - v.extend_from_slice(self.extension.as_bytes()); + if let Some(ext) = self.extension() { + v.extend_from_slice(ext.as_bytes()); + } v } fn from_slice(bytes: &[u8]) -> Option { - if bytes.len() > 16 { + if let Ok(s) = std::str::from_utf8(bytes) { + Some(Self::from_existing(s)) + } else if bytes.len() >= 16 { let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length"); - let extension = String::from_utf8_lossy(&bytes[16..]).to_string(); - Some(Self { id, extension }) + let extension = if bytes.len() > 16 { + Some(String::from_utf8_lossy(&bytes[16..]).to_string()) + } else { + None + }; + + Some(Self { + id: MaybeUuid::Uuid(id), + extension, + }) } else { None } @@ -240,20 +328,63 @@ impl Alias { } impl DeleteToken { + pub(crate) fn from_existing(existing: &str) -> Self { + if let Ok(uuid) = Uuid::parse_str(existing) { + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + } else { + DeleteToken { + id: MaybeUuid::Name(existing.into()), + } + } + } + + pub(crate) fn generate() -> Self { + Self { + id: MaybeUuid::Uuid(Uuid::new_v4()), + } + } + fn to_bytes(&self) -> Vec { self.id.as_bytes().to_vec() } fn from_slice(bytes: &[u8]) -> Option { - Some(DeleteToken { - id: Uuid::from_slice(bytes).ok()?, - }) + if let Ok(s) = std::str::from_utf8(bytes) { + Some(DeleteToken::from_existing(s)) + } else if bytes.len() == 16 { + Some(DeleteToken { + id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?), + }) + } else { + None + } + } +} + +impl std::fmt::Display for MaybeUuid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Uuid(id) => write!(f, "{}", id), + Self::Name(name) => write!(f, "{}", name), + } + } +} + +impl std::fmt::Display for DeleteToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) } } impl std::fmt::Display for Alias { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}{}", self.id, self.extension) + if let Some(ext) = self.extension() { + write!(f, "{}{}", self.id, ext) + } else { + write!(f, "{}", self.id) + } } } @@ -272,17 +403,220 @@ impl Identifier for Vec { } } -impl Identifier for ::sled::IVec { - type Error = std::convert::Infallible; +#[cfg(test)] +mod tests { + use super::{Alias, DeleteToken, MaybeUuid, Uuid}; - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized, - { - Ok(bytes.into()) + #[test] + fn string_delete_token() { + let delete_token = DeleteToken::from_existing("blah"); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) } - fn to_bytes(&self) -> Result, Self::Error> { - Ok(self.to_vec()) + #[test] + fn uuid_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_existing(&uuid.to_string()); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn bytes_delete_token() { + let delete_token = DeleteToken::from_slice(b"blah").unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Name(String::from("blah")) + } + ) + } + + #[test] + fn uuid_bytes_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn uuid_bytes_string_delete_token() { + let uuid = Uuid::new_v4(); + + let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + delete_token, + DeleteToken { + id: MaybeUuid::Uuid(uuid), + } + ) + } + + #[test] + fn string_alias() { + let alias = Alias::from_existing("blah"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn string_alias_ext() { + let alias = Alias::from_existing("blah.mp4"); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_existing(&uuid.to_string()); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{}.mp4", uuid); + let alias = Alias::from_existing(&alias_str); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn bytes_alias() { + let alias = Alias::from_slice(b"blah").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: None + } + ); + } + + #[test] + fn bytes_alias_ext() { + let alias = Alias::from_slice(b"blah.mp4").unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Name(String::from("blah")), + extension: Some(String::from(".mp4")), + } + ); + } + + #[test] + fn uuid_bytes_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_string_alias() { + let uuid = Uuid::new_v4(); + + let alias = Alias::from_slice(uuid.to_string().as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: None, + } + ) + } + + #[test] + fn uuid_bytes_alias_ext() { + let uuid = Uuid::new_v4(); + + let mut alias_bytes = uuid.as_bytes().to_vec(); + alias_bytes.extend_from_slice(b".mp4"); + + let alias = Alias::from_slice(&alias_bytes).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) + } + + #[test] + fn uuid_bytes_string_alias_ext() { + let uuid = Uuid::new_v4(); + + let alias_str = format!("{}.mp4", uuid); + let alias = Alias::from_slice(alias_str.as_bytes()).unwrap(); + + assert_eq!( + alias, + Alias { + id: MaybeUuid::Uuid(uuid), + extension: Some(String::from(".mp4")), + } + ) } } diff --git a/src/repo/old.rs b/src/repo/old.rs index a61b402..bf4ae65 100644 --- a/src/repo/old.rs +++ b/src/repo/old.rs @@ -17,8 +17,9 @@ // - Settings Tree // - store-migration-progress -> Path Tree Key +use std::path::PathBuf; + use super::{Alias, DeleteToken, Details}; -use uuid::Uuid; pub(super) struct Old { alias_tree: ::sled::Tree, @@ -91,7 +92,7 @@ impl Old { .ok_or_else(|| anyhow::anyhow!("Missing identifier")) } - pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result> { + pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result> { let filename = self .main_tree .get(hash)? @@ -106,13 +107,16 @@ impl Old { .scan_prefix(&variant_prefix) .filter_map(|res| res.ok()) .filter_map(|(key, value)| { - let key_str = String::from_utf8_lossy(&key); - let variant_path = key_str.trim_start_matches(&variant_prefix); - if variant_path == "motion" { + let variant_path_bytes = &key[variant_prefix.as_bytes().len()..]; + if variant_path_bytes == b"motion" { return None; } - Some((variant_path.to_string(), value)) + let path = String::from_utf8(variant_path_bytes.to_vec()).ok()?; + let mut path = PathBuf::from(path); + path.pop(); + + Some((path, value)) }) .collect()) } @@ -141,29 +145,15 @@ impl Old { .scan_prefix(key) .values() .filter_map(|res| res.ok()) - .filter_map(|alias| { - let alias_str = String::from_utf8_lossy(&alias); - - let (uuid, ext) = alias_str.split_once('.')?; - - let uuid = uuid.parse::().ok()?; - - Some(Alias { - id: uuid, - extension: ext.to_string(), - }) - }) + .filter_map(|alias| Alias::from_slice(&alias)) .collect() } pub(super) fn delete_token(&self, alias: &Alias) -> anyhow::Result> { - let key = format!("{}{}/delete", alias.id, alias.extension); + let key = format!("{}/delete", alias); if let Some(ivec) = self.alias_tree.get(key)? { - let token_str = String::from_utf8_lossy(&ivec); - if let Ok(uuid) = token_str.parse::() { - return Ok(Some(DeleteToken { id: uuid })); - } + return Ok(DeleteToken::from_slice(&ivec)); } Ok(None) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index b35500b..d7a3f2f 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -18,7 +18,7 @@ pub(crate) enum Error { Sled(#[from] sled::Error), #[error("Invalid identifier")] - Identifier(#[source] Box), + Identifier(#[source] Box), #[error("Invalid details json")] Details(#[from] serde_json::Error), @@ -30,6 +30,7 @@ pub(crate) enum Error { Panic, } +#[derive(Clone)] pub(crate) struct SledRepo { settings: Tree, identifier_details: Tree, @@ -62,7 +63,7 @@ impl SledRepo { } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl SettingsRepo for SledRepo { type Bytes = IVec; type Error = Error; @@ -89,31 +90,35 @@ impl SettingsRepo for SledRepo { fn identifier_bytes(identifier: &I) -> Result, Error> where I: Identifier, - I::Error: Send + Sync + 'static, { identifier .to_bytes() .map_err(|e| Error::Identifier(Box::new(e))) } -fn variant_key(hash: &[u8], variant: &str) -> Result, Error> { +fn variant_key(hash: &[u8], variant: &str) -> Vec { let mut bytes = hash.to_vec(); bytes.push(b'/'); bytes.extend_from_slice(variant.as_bytes()); - Ok(bytes) + bytes } -#[async_trait::async_trait] -impl IdentifierRepo for SledRepo -where - I: Identifier + 'static, - I::Error: Send + Sync + 'static, -{ +fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { + let prefix_len = hash.len() + 1; + let variant_bytes = key.get(prefix_len..)?.to_vec(); + String::from_utf8(variant_bytes).ok() +} + +#[async_trait::async_trait(?Send)] +impl IdentifierRepo for SledRepo { type Error = Error; - async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> { - let key = identifier_bytes(&identifier)?; - + async fn relate_details( + &self, + identifier: &I, + details: &Details, + ) -> Result<(), Self::Error> { + let key = identifier_bytes(identifier)?; let details = serde_json::to_vec(&details)?; b!( @@ -124,8 +129,8 @@ where Ok(()) } - async fn details(&self, identifier: I) -> Result, Self::Error> { - let key = identifier_bytes(&identifier)?; + async fn details(&self, identifier: &I) -> Result, Self::Error> { + let key = identifier_bytes(identifier)?; let opt = b!(self.identifier_details, identifier_details.get(key)); @@ -136,8 +141,8 @@ where } } - async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> { - let key = identifier_bytes(&identifier)?; + async fn cleanup(&self, identifier: &I) -> Result<(), Self::Error> { + let key = identifier_bytes(identifier)?; b!(self.identifier_details, identifier_details.remove(key)); @@ -182,11 +187,12 @@ impl futures_util::Stream for HashStream { } else if let Some(mut iter) = this.hashes.take() { let fut = Box::pin(async move { actix_rt::task::spawn_blocking(move || { - let opt = iter.next().map(|res| res.map_err(Error::from)); + let opt = iter.next(); (iter, opt) }) .await + .map(|(iter, opt)| (iter, opt.map(|res| res.map_err(Error::from)))) .map_err(Error::from) }); @@ -204,12 +210,8 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec { v } -#[async_trait::async_trait] -impl HashRepo for SledRepo -where - I: Identifier + 'static, - I::Error: Send + Sync + 'static, -{ +#[async_trait::async_trait(?Send)] +impl HashRepo for SledRepo { type Bytes = IVec; type Error = Error; type Stream = HashStream; @@ -232,19 +234,17 @@ where Ok(res.map_err(|_| AlreadyExists)) } - async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> { - let key = hash_alias_key(&hash, &alias); + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> { + let key = hash_alias_key(&hash, alias); + let value = alias.to_bytes(); - b!( - self.hash_aliases, - hash_aliases.insert(key, alias.to_bytes()) - ); + b!(self.hash_aliases, hash_aliases.insert(key, value)); Ok(()) } - async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> { - let key = hash_alias_key(&hash, &alias); + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> { + let key = hash_alias_key(&hash, alias); b!(self.hash_aliases, hash_aliases.remove(key)); @@ -258,21 +258,28 @@ where .values() .filter_map(Result::ok) .filter_map(|ivec| Alias::from_slice(&ivec)) - .collect::>()) as Result<_, Error> + .collect::>()) as Result<_, sled::Error> }); Ok(v) } - async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error> { - let bytes = identifier_bytes(&identifier)?; + async fn relate_identifier( + &self, + hash: Self::Bytes, + identifier: &I, + ) -> Result<(), Self::Error> { + let bytes = identifier_bytes(identifier)?; b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes)); Ok(()) } - async fn identifier(&self, hash: Self::Bytes) -> Result { + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); opt.ok_or(Error::Missing).and_then(|ivec| { @@ -280,14 +287,14 @@ where }) } - async fn relate_variant_identifier( + async fn relate_variant_identifier( &self, hash: Self::Bytes, variant: String, - identifier: I, + identifier: &I, ) -> Result<(), Self::Error> { - let key = variant_key(&hash, &variant)?; - let value = identifier_bytes(&identifier)?; + let key = variant_key(&hash, &variant); + let value = identifier_bytes(identifier)?; b!( self.hash_variant_identifiers, @@ -297,12 +304,12 @@ where Ok(()) } - async fn variant_identifier( + async fn variant_identifier( &self, hash: Self::Bytes, variant: String, ) -> Result, Self::Error> { - let key = variant_key(&hash, &variant)?; + let key = variant_key(&hash, &variant); let opt = b!( self.hash_variant_identifiers, @@ -318,12 +325,33 @@ where } } - async fn relate_motion_identifier( + async fn variants( &self, hash: Self::Bytes, - identifier: I, + ) -> Result, Self::Error> { + let vec = b!( + self.hash_variant_identifiers, + Ok(hash_variant_identifiers + .scan_prefix(&hash) + .filter_map(|res| res.ok()) + .filter_map(|(key, ivec)| { + let identifier = I::from_bytes(ivec.to_vec()).ok()?; + let variant = variant_from_key(&hash, &key)?; + + Some((variant, identifier)) + }) + .collect::>()) as Result, sled::Error> + ); + + Ok(vec) + } + + async fn relate_motion_identifier( + &self, + hash: Self::Bytes, + identifier: &I, ) -> Result<(), Self::Error> { - let bytes = identifier_bytes(&identifier)?; + let bytes = identifier_bytes(identifier)?; b!( self.hash_motion_identifiers, @@ -333,7 +361,10 @@ where Ok(()) } - async fn motion_identifier(&self, hash: Self::Bytes) -> Result, Self::Error> { + async fn motion_identifier( + &self, + hash: Self::Bytes, + ) -> Result, Self::Error> { let opt = b!( self.hash_motion_identifiers, hash_motion_identifiers.get(hash) @@ -361,7 +392,7 @@ where hash_motion_identifiers.remove(hash2) ); - let aliases = HashRepo::::aliases(self, hash.clone()).await?; + let aliases = self.aliases(hash.clone()).await?; let hash2 = hash.clone(); b!(self.hash_aliases, { for alias in aliases { @@ -369,7 +400,7 @@ where let _ = hash_aliases.remove(key); } - Ok(()) as Result<(), Error> + Ok(()) as Result<(), sled::Error> }); let variant_keys = b!(self.hash_variant_identifiers, { @@ -379,25 +410,25 @@ where .filter_map(Result::ok) .collect::>(); - Ok(v) as Result, Error> + Ok(v) as Result, sled::Error> }); b!(self.hash_variant_identifiers, { for key in variant_keys { let _ = hash_variant_identifiers.remove(key); } - Ok(()) as Result<(), Error> + Ok(()) as Result<(), sled::Error> }); Ok(()) } } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { type Bytes = sled::IVec; type Error = Error; - async fn create(&self, alias: Alias) -> Result, Self::Error> { + async fn create(&self, alias: &Alias) -> Result, Self::Error> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -411,8 +442,8 @@ impl AliasRepo for SledRepo { async fn relate_delete_token( &self, - alias: Alias, - delete_token: DeleteToken, + alias: &Alias, + delete_token: &DeleteToken, ) -> Result, Self::Error> { let key = alias.to_bytes(); let token = delete_token.to_bytes(); @@ -425,7 +456,7 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - async fn delete_token(&self, alias: Alias) -> Result { + async fn delete_token(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); @@ -434,7 +465,7 @@ impl AliasRepo for SledRepo { .ok_or(Error::Missing) } - async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error> { + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error> { let key = alias.to_bytes(); b!(self.alias_hashes, alias_hashes.insert(key, hash)); @@ -442,7 +473,7 @@ impl AliasRepo for SledRepo { Ok(()) } - async fn hash(&self, alias: Alias) -> Result { + async fn hash(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_hashes, alias_hashes.get(key)); @@ -450,7 +481,7 @@ impl AliasRepo for SledRepo { opt.ok_or(Error::Missing) } - async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error> { + async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error> { let key = alias.to_bytes(); let key2 = key.clone(); diff --git a/src/store.rs b/src/store.rs index 46fad1e..668950b 100644 --- a/src/store.rs +++ b/src/store.rs @@ -8,7 +8,7 @@ pub(crate) mod file_store; pub(crate) mod object_store; pub(crate) trait Identifier: Send + Sync + Clone + Debug { - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync + 'static; fn to_bytes(&self) -> Result, Self::Error>; @@ -19,23 +19,18 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug { #[async_trait::async_trait(?Send)] pub(crate) trait Store: Send + Sync + Clone + Debug + 'static { - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync + 'static; type Identifier: Identifier; type Stream: Stream>; async fn save_async_read( &self, reader: &mut Reader, - filename: &str, ) -> Result where Reader: AsyncRead + Unpin; - async fn save_bytes( - &self, - bytes: Bytes, - filename: &str, - ) -> Result; + async fn save_bytes(&self, bytes: Bytes) -> Result; async fn to_stream( &self, diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 84b2f0e..c923731 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,4 +1,8 @@ -use crate::{file::File, store::Store}; +use crate::{ + file::File, + repo::{Repo, SettingsRepo}, + store::Store, +}; use actix_web::web::Bytes; use futures_util::stream::Stream; use std::{ @@ -10,24 +14,22 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, error, instrument}; mod file_id; -mod restructure; pub(crate) use file_id::FileId; // - Settings Tree // - last-path -> last generated path -// - fs-restructure-01-complete -> bool const GENERATOR_KEY: &[u8] = b"last-path"; #[derive(Debug, thiserror::Error)] pub(crate) enum FileError { - #[error(transparent)] - Sled(#[from] sled::Error), + #[error("Failed to interact with sled db")] + Sled(#[from] crate::repo::sled::Error), - #[error(transparent)] + #[error("Failed to read or write file")] Io(#[from] std::io::Error), - #[error(transparent)] + #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), #[error("Error formatting file store identifier")] @@ -44,7 +46,7 @@ pub(crate) enum FileError { pub(crate) struct FileStore { path_gen: Generator, root_dir: PathBuf, - settings_tree: sled::Tree, + repo: Repo, } #[async_trait::async_trait(?Send)] @@ -57,12 +59,11 @@ impl Store for FileStore { async fn save_async_read( &self, reader: &mut Reader, - filename: &str, ) -> Result where Reader: AsyncRead + Unpin, { - let path = self.next_file(filename)?; + let path = self.next_file().await?; if let Err(e) = self.safe_save_reader(&path, reader).await { self.safe_remove_file(&path).await?; @@ -73,12 +74,8 @@ impl Store for FileStore { } #[tracing::instrument(skip(bytes))] - async fn save_bytes( - &self, - bytes: Bytes, - filename: &str, - ) -> Result { - let path = self.next_file(filename)?; + async fn save_bytes(&self, bytes: Bytes) -> Result { + let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { self.safe_remove_file(&path).await?; @@ -141,23 +138,26 @@ impl Store for FileStore { } impl FileStore { - pub fn build(root_dir: PathBuf, db: &sled::Db) -> Result { - let settings_tree = db.open_tree("settings")?; - - let path_gen = init_generator(&settings_tree)?; + pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result { + let path_gen = init_generator(&repo).await?; Ok(FileStore { root_dir, path_gen, - settings_tree, + repo, }) } - fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); - self.settings_tree - .insert(GENERATOR_KEY, path.to_be_bytes())?; + match self.repo { + Repo::Sled(ref sled_repo) => { + sled_repo + .set(GENERATOR_KEY, path.to_be_bytes().into()) + .await?; + } + } let mut target_path = self.root_dir.clone(); for dir in path.to_strings() { @@ -167,8 +167,9 @@ impl FileStore { Ok(target_path) } - fn next_file(&self, filename: &str) -> Result { - let target_path = self.next_directory()?; + async fn next_file(&self) -> Result { + let target_path = self.next_directory().await?; + let filename = uuid::Uuid::new_v4().to_string(); Ok(target_path.join(filename)) } @@ -289,13 +290,17 @@ pub(crate) async fn safe_create_parent>(path: P) -> Result<(), Fi Ok(()) } -fn init_generator(settings: &sled::Tree) -> Result { - if let Some(ivec) = settings.get(GENERATOR_KEY)? { - Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, - )) - } else { - Ok(Generator::new()) +async fn init_generator(repo: &Repo) -> Result { + match repo { + Repo::Sled(sled_repo) => { + if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + )) + } else { + Ok(Generator::new()) + } + } } } diff --git a/src/store/file_store/restructure.rs b/src/store/file_store/restructure.rs deleted file mode 100644 index 81e990c..0000000 --- a/src/store/file_store/restructure.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::{ - error::{Error, UploadError}, - store::file_store::FileStore, - upload_manager::UploadManager, -}; -use std::path::{Path, PathBuf}; - -const RESTRUCTURE_COMPLETE: &[u8] = b"fs-restructure-01-complete"; -const DETAILS: &[u8] = b"details"; - -impl UploadManager { - #[tracing::instrument(skip(self))] - pub(crate) async fn restructure(&self, store: &FileStore) -> Result<(), Error> { - if self.restructure_complete(store)? { - return Ok(()); - } - - for res in self.inner().filename_tree.iter() { - let (filename, hash) = res?; - let filename = String::from_utf8(filename.to_vec())?; - tracing::info!("Migrating {}", filename); - - let file_path = store.root_dir.join("files").join(&filename); - - if tokio::fs::metadata(&file_path).await.is_ok() { - let target_path = store.next_directory()?.join(&filename); - - let target_path_bytes = self - .generalize_path(store, &target_path)? - .to_str() - .ok_or(UploadError::Path)? - .as_bytes() - .to_vec(); - - self.inner() - .identifier_tree - .insert(filename.as_bytes(), target_path_bytes)?; - - store.safe_move_file(file_path, target_path).await?; - } - - let (start, end) = variant_key_bounds(&hash); - - for res in self.inner().main_tree.range(start..end) { - let (hash_variant_key, variant_path_or_details) = res?; - - if !hash_variant_key.ends_with(DETAILS) { - let variant_path = - PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?); - if tokio::fs::metadata(&variant_path).await.is_ok() { - let target_path = store.next_directory()?.join(&filename); - - let relative_target_path_bytes = self - .generalize_path(store, &target_path)? - .to_str() - .ok_or(UploadError::Path)? - .as_bytes() - .to_vec(); - - let variant_key = - self.migrate_variant_key(store, &variant_path, &filename)?; - - self.inner() - .identifier_tree - .insert(variant_key, relative_target_path_bytes)?; - - store - .safe_move_file(variant_path.clone(), target_path) - .await?; - store.try_remove_parents(&variant_path).await; - } - } - - self.inner().main_tree.remove(hash_variant_key)?; - } - } - - self.mark_restructure_complete(store)?; - Ok(()) - } - - fn restructure_complete(&self, store: &FileStore) -> Result { - Ok(store.settings_tree.get(RESTRUCTURE_COMPLETE)?.is_some()) - } - - fn mark_restructure_complete(&self, store: &FileStore) -> Result<(), Error> { - store.settings_tree.insert(RESTRUCTURE_COMPLETE, b"true")?; - - Ok(()) - } - - fn generalize_path<'a>(&self, store: &FileStore, path: &'a Path) -> Result<&'a Path, Error> { - Ok(path.strip_prefix(&store.root_dir)?) - } - - fn migrate_variant_key( - &self, - store: &FileStore, - variant_process_path: &Path, - filename: &str, - ) -> Result, Error> { - let path = self - .generalize_path(store, variant_process_path)? - .strip_prefix("files")?; - - self.variant_key(path, filename) - } -} - -pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec, Vec) { - let mut start = hash.to_vec(); - start.extend(&[2]); - - let mut end = hash.to_vec(); - end.extend(&[3]); - - (start, end) -} diff --git a/src/store/object_store.rs b/src/store/object_store.rs index d07f29a..8fea267 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,4 +1,7 @@ -use crate::store::Store; +use crate::{ + repo::{Repo, SettingsRepo}, + store::Store, +}; use actix_web::web::Bytes; use futures_util::stream::Stream; use s3::{ @@ -22,26 +25,26 @@ const GENERATOR_KEY: &[u8] = b"last-path"; #[derive(Debug, thiserror::Error)] pub(crate) enum ObjectError { - #[error(transparent)] + #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), - #[error(transparent)] - Sled(#[from] sled::Error), + #[error("Failed to interact with sled repo")] + Sled(#[from] crate::repo::sled::Error), - #[error(transparent)] + #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), #[error("Invalid length")] Length, - #[error("Storage error: {0}")] + #[error("Storage error")] Anyhow(#[from] anyhow::Error), } #[derive(Clone)] pub(crate) struct ObjectStore { path_gen: Generator, - settings_tree: sled::Tree, + repo: Repo, bucket: Bucket, client: reqwest::Client, } @@ -63,12 +66,11 @@ impl Store for ObjectStore { async fn save_async_read( &self, reader: &mut Reader, - filename: &str, ) -> Result where Reader: AsyncRead + Unpin, { - let path = self.next_file(filename)?; + let path = self.next_file().await?; self.bucket .put_object_stream(&self.client, reader, &path) @@ -78,12 +80,8 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(bytes))] - async fn save_bytes( - &self, - bytes: Bytes, - filename: &str, - ) -> Result { - let path = self.next_file(filename)?; + async fn save_bytes(&self, bytes: Bytes) -> Result { + let path = self.next_file().await?; self.bucket.put_object(&self.client, &path, &bytes).await?; @@ -154,23 +152,21 @@ impl Store for ObjectStore { impl ObjectStore { #[allow(clippy::too_many_arguments)] - pub(crate) fn build( + pub(crate) async fn build( bucket_name: &str, region: Region, access_key: Option, secret_key: Option, security_token: Option, session_token: Option, - db: &sled::Db, + repo: Repo, client: reqwest::Client, ) -> Result { - let settings_tree = db.open_tree("settings")?; - - let path_gen = init_generator(&settings_tree)?; + let path_gen = init_generator(&repo).await?; Ok(ObjectStore { path_gen, - settings_tree, + repo, bucket: Bucket::new_with_path_style( bucket_name, match region { @@ -191,29 +187,39 @@ impl ObjectStore { }) } - fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); - self.settings_tree - .insert(GENERATOR_KEY, path.to_be_bytes())?; + match self.repo { + Repo::Sled(ref sled_repo) => { + sled_repo + .set(GENERATOR_KEY, path.to_be_bytes().into()) + .await?; + } + } Ok(path) } - fn next_file(&self, filename: &str) -> Result { - let path = self.next_directory()?.to_strings().join("/"); + async fn next_file(&self) -> Result { + let path = self.next_directory().await?.to_strings().join("/"); + let filename = uuid::Uuid::new_v4().to_string(); Ok(format!("{}/{}", path, filename)) } } -fn init_generator(settings: &sled::Tree) -> Result { - if let Some(ivec) = settings.get(GENERATOR_KEY)? { - Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, - )) - } else { - Ok(Generator::new()) +async fn init_generator(repo: &Repo) -> Result { + match repo { + Repo::Sled(sled_repo) => { + if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + )) + } else { + Ok(Generator::new()) + } + } } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 7e1b7cf..3052414 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -4,13 +4,15 @@ use crate::{ error::{Error, UploadError}, ffmpeg::{InputFormat, ThumbnailFormat}, magick::details_hint, - migrate::{alias_id_key, alias_key, alias_key_bounds}, + repo::{ + sled::SledRepo, Alias, AliasRepo, DeleteToken, HashRepo, IdentifierRepo, Repo, SettingsRepo, + }, store::{Identifier, Store}, }; -use actix_web::web; +use futures_util::StreamExt; use sha2::Digest; -use std::{string::FromUtf8Error, sync::Arc}; -use tracing::{debug, error, info, instrument, warn, Span}; +use std::sync::Arc; +use tracing::{debug, error, instrument, Span}; use tracing_futures::Instrument; mod hasher; @@ -18,28 +20,6 @@ mod session; pub(super) use session::UploadManagerSession; -// TREE STRUCTURE -// - Alias Tree -// - alias -> hash -// - alias / id -> u64(id) -// - alias / delete -> delete token -// - Main Tree -// - hash -> filename -// - hash 0 u64(id) -> alias -// - DEPRECATED: -// - hash 2 variant path -> variant path -// - hash 2 vairant path details -> details -// - Filename Tree -// - filename -> hash -// - Details Tree -// - filename / S::Identifier -> details -// - Identifier Tree -// - filename -> S::Identifier -// - filename / variant path -> S::Identifier -// - filename / motion -> S::Identifier -// - Settings Tree -// - store-migration-progress -> Path Tree Key - const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress"; #[derive(Clone)] @@ -50,33 +30,17 @@ pub(crate) struct UploadManager { pub(crate) struct UploadManagerInner { format: Option, hasher: sha2::Sha256, - pub(crate) alias_tree: sled::Tree, - pub(crate) filename_tree: sled::Tree, - pub(crate) main_tree: sled::Tree, - details_tree: sled::Tree, - settings_tree: sled::Tree, - pub(crate) identifier_tree: sled::Tree, - db: sled::Db, -} - -struct FilenameIVec { - inner: sled::IVec, + repo: Repo, } impl UploadManager { /// Create a new UploadManager - pub(crate) async fn new(db: sled::Db, format: Option) -> Result { + pub(crate) async fn new(repo: Repo, format: Option) -> Result { let manager = UploadManager { inner: Arc::new(UploadManagerInner { format, hasher: sha2::Sha256::new(), - alias_tree: db.open_tree("alias")?, - filename_tree: db.open_tree("filename")?, - main_tree: db.open_tree("main")?, - details_tree: db.open_tree("details")?, - settings_tree: db.open_tree("settings")?, - identifier_tree: db.open_tree("path")?, - db, + repo, }), }; @@ -89,92 +53,32 @@ impl UploadManager { S2: Store, Error: From + From, { - let iter = - if let Some(starting_line) = self.inner.settings_tree.get(STORE_MIGRATION_PROGRESS)? { - self.inner.identifier_tree.range(starting_line..) - } else { - self.inner.identifier_tree.iter() - }; - - for res in iter { - let (key, identifier) = res?; - - let identifier = S1::Identifier::from_bytes(identifier.to_vec())?; - - let filename = - if let Some((filename, _)) = String::from_utf8_lossy(&key).split_once('/') { - filename.to_string() - } else { - String::from_utf8_lossy(&key).to_string() - }; - - let stream = from.to_stream(&identifier, None, None).await?; - futures_util::pin_mut!(stream); - let mut reader = tokio_util::io::StreamReader::new(stream); - - let new_identifier = to.save_async_read(&mut reader, &filename).await?; - - let details_key = self.details_key(&identifier, &filename)?; - - if let Some(details) = self.inner.details_tree.get(details_key.clone())? { - let new_details_key = self.details_key(&new_identifier, &filename)?; - - self.inner.details_tree.insert(new_details_key, details)?; - } - - self.inner - .identifier_tree - .insert(key.clone(), new_identifier.to_bytes()?)?; - self.inner.details_tree.remove(details_key)?; - self.inner - .settings_tree - .insert(STORE_MIGRATION_PROGRESS, key)?; - - let (ident, detail, settings) = futures_util::future::join3( - self.inner.identifier_tree.flush_async(), - self.inner.details_tree.flush_async(), - self.inner.settings_tree.flush_async(), - ) - .await; - - ident?; - detail?; - settings?; + match self.inner.repo { + Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await, } - - // clean up the migration key to avoid interfering with future migrations - self.inner.settings_tree.remove(STORE_MIGRATION_PROGRESS)?; - self.inner.settings_tree.flush_async().await?; - - Ok(()) } - pub(crate) fn inner(&self) -> &UploadManagerInner { - &self.inner - } - - pub(crate) async fn still_identifier_from_filename( + pub(crate) async fn still_identifier_from_alias( &self, store: S, - filename: String, + alias: &Alias, ) -> Result where Error: From, { - let identifier = self.identifier_from_filename::(filename.clone()).await?; - let details = - if let Some(details) = self.variant_details(&identifier, filename.clone()).await? { - details - } else { - let hint = details_hint(&filename); - Details::from_store(store.clone(), identifier.clone(), hint).await? - }; + let identifier = self.identifier_from_alias::(alias).await?; + let details = if let Some(details) = self.details(&identifier).await? { + details + } else { + let hint = details_hint(alias); + Details::from_store(store.clone(), identifier.clone(), hint).await? + }; if !details.is_motion() { return Ok(identifier); } - if let Some(motion_identifier) = self.motion_identifier::(&filename).await? { + if let Some(motion_identifier) = self.motion_identifier::(alias).await? { return Ok(motion_identifier); } @@ -186,101 +90,93 @@ impl UploadManager { ThumbnailFormat::Jpeg, ) .await?; - let motion_identifier = store.save_async_read(&mut reader, &filename).await?; + let motion_identifier = store.save_async_read(&mut reader).await?; drop(permit); - self.store_motion_path(&filename, &motion_identifier) + self.store_motion_identifier(alias, &motion_identifier) .await?; Ok(motion_identifier) } async fn motion_identifier( &self, - filename: &str, - ) -> Result, Error> - where - Error: From, - { - let identifier_tree = self.inner.identifier_tree.clone(); - let motion_key = format!("{}/motion", filename); - - let opt = web::block(move || identifier_tree.get(motion_key.as_bytes())).await??; - - if let Some(ivec) = opt { - return Ok(Some(S::Identifier::from_bytes(ivec.to_vec())?)); + alias: &Alias, + ) -> Result, Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.motion_identifier(hash).await?) + } } - - Ok(None) } - async fn store_motion_path( + async fn store_motion_identifier( &self, - filename: &str, + alias: &Alias, identifier: &I, - ) -> Result<(), Error> - where - Error: From, - { - let identifier_bytes = identifier.to_bytes()?; - let motion_key = format!("{}/motion", filename); - let identifier_tree = self.inner.identifier_tree.clone(); - - web::block(move || identifier_tree.insert(motion_key.as_bytes(), identifier_bytes)) - .await??; - Ok(()) + ) -> Result<(), Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.relate_motion_identifier(hash, identifier).await?) + } + } } #[instrument(skip(self))] - pub(crate) async fn identifier_from_filename( + pub(crate) async fn identifier_from_alias( &self, - filename: String, - ) -> Result - where - Error: From, - { - let identifier_tree = self.inner.identifier_tree.clone(); - let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes())) - .await?? - .ok_or(UploadError::MissingFile)?; - - let identifier = S::Identifier::from_bytes(path_ivec.to_vec())?; - - Ok(identifier) + alias: &Alias, + ) -> Result { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.identifier(hash).await?) + } + } } #[instrument(skip(self))] async fn store_identifier( &self, - filename: String, + hash: Vec, identifier: &I, - ) -> Result<(), Error> - where - Error: From, - { - let identifier_bytes = identifier.to_bytes()?; - let identifier_tree = self.inner.identifier_tree.clone(); - web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??; - Ok(()) + ) -> Result<(), Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + Ok(sled_repo.relate_identifier(hash.into(), identifier).await?) + } + } } #[instrument(skip(self))] pub(crate) async fn variant_identifier( &self, + alias: &Alias, process_path: &std::path::Path, - filename: &str, - ) -> Result, Error> - where - Error: From, - { - let key = self.variant_key(process_path, filename)?; - let identifier_tree = self.inner.identifier_tree.clone(); - let path_opt = web::block(move || identifier_tree.get(key)).await??; + ) -> Result, Error> { + let variant = process_path.to_string_lossy().to_string(); - if let Some(ivec) = path_opt { - let identifier = S::Identifier::from_bytes(ivec.to_vec())?; - Ok(Some(identifier)) - } else { - Ok(None) + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.variant_identifier(hash, variant).await?) + } + } + } + + /// Store the path to a generated image variant so we can easily clean it up later + #[instrument(skip(self))] + pub(crate) async fn store_full_res( + &self, + alias: &Alias, + identifier: &I, + ) -> Result<(), Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.relate_identifier(hash, identifier).await?) + } } } @@ -288,139 +184,71 @@ impl UploadManager { #[instrument(skip(self))] pub(crate) async fn store_variant( &self, - variant_process_path: Option<&std::path::Path>, + alias: &Alias, + variant_process_path: &std::path::Path, identifier: &I, - filename: &str, - ) -> Result<(), Error> - where - Error: From, - { - let key = if let Some(path) = variant_process_path { - self.variant_key(path, filename)? - } else { - let mut vec = filename.as_bytes().to_vec(); - vec.extend(b"/"); - vec.extend(&identifier.to_bytes()?); - vec - }; - let identifier_tree = self.inner.identifier_tree.clone(); - let identifier_bytes = identifier.to_bytes()?; + ) -> Result<(), Error> { + let variant = variant_process_path.to_string_lossy().to_string(); - debug!("Storing variant"); - web::block(move || identifier_tree.insert(key, identifier_bytes)).await??; - debug!("Stored variant"); - - Ok(()) + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo + .relate_variant_identifier(hash, variant, identifier) + .await?) + } + } } /// Get the image details for a given variant #[instrument(skip(self))] - pub(crate) async fn variant_details( + pub(crate) async fn details( &self, identifier: &I, - filename: String, ) -> Result, Error> where Error: From, { - let key = self.details_key(identifier, &filename)?; - let details_tree = self.inner.details_tree.clone(); - - debug!("Getting details"); - let opt = match web::block(move || details_tree.get(key)).await?? { - Some(ivec) => match serde_json::from_slice(&ivec) { - Ok(details) => Some(details), - Err(_) => None, - }, - None => None, - }; - debug!("Got details"); - - Ok(opt) + match self.inner.repo { + Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?), + } } #[instrument(skip(self))] - pub(crate) async fn store_variant_details( + pub(crate) async fn store_details( &self, identifier: &I, - filename: String, details: &Details, - ) -> Result<(), Error> - where - Error: From, - { - let key = self.details_key(identifier, &filename)?; - let details_tree = self.inner.details_tree.clone(); - let details_value = serde_json::to_vec(details)?; - - debug!("Storing details"); - web::block(move || details_tree.insert(key, details_value)).await??; - debug!("Stored details"); - - Ok(()) - } - - /// Get a list of aliases for a given file - pub(crate) async fn aliases_by_filename(&self, filename: String) -> Result, Error> { - let fname_tree = self.inner.filename_tree.clone(); - let hash = web::block(move || fname_tree.get(filename.as_bytes())) - .await?? - .ok_or(UploadError::MissingAlias)?; - - self.aliases_by_hash(&hash).await + ) -> Result<(), Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => Ok(sled_repo.relate_details(identifier, details).await?), + } } /// Get a list of aliases for a given alias - pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result, Error> { - let alias_tree = self.inner.alias_tree.clone(); - let hash = web::block(move || alias_tree.get(alias.as_bytes())) - .await?? - .ok_or(UploadError::MissingFilename)?; - - self.aliases_by_hash(&hash).await - } - - async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result, Error> { - let (start, end) = alias_key_bounds(hash); - let main_tree = self.inner.main_tree.clone(); - let aliases = web::block(move || { - main_tree - .range(start..end) - .values() - .collect::, _>>() - }) - .await??; - - debug!("Got {} aliases for hash", aliases.len()); - let aliases = aliases - .into_iter() - .filter_map(|s| String::from_utf8(s.to_vec()).ok()) - .collect::>(); - - for alias in aliases.iter() { - debug!("{}", alias); + pub(crate) async fn aliases_by_alias(&self, alias: &Alias) -> Result, Error> { + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash = sled_repo.hash(alias).await?; + Ok(sled_repo.aliases(hash).await?) + } } - - Ok(aliases) } /// Delete an alias without a delete token pub(crate) async fn delete_without_token( &self, store: S, - alias: String, + alias: Alias, ) -> Result<(), Error> where Error: From, { - let token_key = delete_key(&alias); - let alias_tree = self.inner.alias_tree.clone(); - let token = web::block(move || alias_tree.get(token_key.as_bytes())) - .await?? - .ok_or(UploadError::MissingAlias)?; + let token = match self.inner.repo { + Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?, + }; - self.delete(store, alias, String::from_utf8(token.to_vec())?) - .await + self.delete(store, alias, token).await } /// Delete the alias, and the file & variants if no more aliases exist @@ -428,57 +256,24 @@ impl UploadManager { pub(crate) async fn delete( &self, store: S, - alias: String, - token: String, + alias: Alias, + token: DeleteToken, ) -> Result<(), Error> where Error: From, { - use sled::Transactional; - let main_tree = self.inner.main_tree.clone(); - let alias_tree = self.inner.alias_tree.clone(); - - let span = Span::current(); - let alias2 = alias.clone(); - let hash = web::block(move || { - [&main_tree, &alias_tree].transaction(|v| { - let entered = span.enter(); - let main_tree = &v[0]; - let alias_tree = &v[1]; - - // -- GET TOKEN -- - debug!("Deleting alias -> delete-token mapping"); - let existing_token = alias_tree - .remove(delete_key(&alias2).as_bytes())? - .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; - - // Bail if invalid token - if existing_token != token { - warn!("Invalid delete token"); - return Err(trans_upload_error(UploadError::InvalidToken)); + let hash = match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let saved_delete_token = sled_repo.delete_token(&alias).await?; + if saved_delete_token != token { + return Err(UploadError::InvalidToken.into()); } - - // -- GET ID FOR HASH TREE CLEANUP -- - debug!("Deleting alias -> id mapping"); - let id = alias_tree - .remove(alias_id_key(&alias2).as_bytes())? - .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; - let id = String::from_utf8(id.to_vec()).map_err(trans_utf8_error)?; - - // -- GET HASH FOR HASH TREE CLEANUP -- - debug!("Deleting alias -> hash mapping"); - let hash = alias_tree - .remove(alias2.as_bytes())? - .ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?; - - // -- REMOVE HASH TREE ELEMENT -- - debug!("Deleting hash -> alias mapping"); - main_tree.remove(alias_key(&hash, &id))?; - drop(entered); - Ok(hash) - }) - }) - .await??; + let hash = sled_repo.hash(&alias).await?; + AliasRepo::cleanup(sled_repo, &alias).await?; + sled_repo.remove_alias(hash.clone(), &alias).await?; + hash.to_vec() + } + }; self.check_delete_files(store, hash).await } @@ -486,206 +281,163 @@ impl UploadManager { async fn check_delete_files( &self, store: S, - hash: sled::IVec, + hash: Vec, ) -> Result<(), Error> where Error: From, { - // -- CHECK IF ANY OTHER ALIASES EXIST -- - let main_tree = self.inner.main_tree.clone(); - let (start, end) = alias_key_bounds(&hash); - debug!("Checking for additional aliases referencing hash"); - let any_aliases = web::block(move || { - Ok(main_tree.range(start..end).next().is_some()) as Result - }) - .await??; + match self.inner.repo { + Repo::Sled(ref sled_repo) => { + let hash: ::Bytes = hash.into(); - // Bail if there are existing aliases - if any_aliases { - debug!("Other aliases reference file, not removing from disk"); - return Ok(()); - } + let aliases = sled_repo.aliases(hash.clone()).await?; - // -- DELETE HASH ENTRY -- - let main_tree = self.inner.main_tree.clone(); - let hash2 = hash.clone(); - debug!("Deleting hash -> filename mapping"); - let filename = web::block(move || main_tree.remove(&hash2)) - .await?? - .ok_or(UploadError::MissingFile)?; - - // -- DELETE FILES -- - let this = self.clone(); - let cleanup_span = tracing::info_span!( - parent: None, - "Cleanup", - filename = &tracing::field::display(String::from_utf8_lossy(&filename)), - ); - cleanup_span.follows_from(Span::current()); - debug!("Spawning cleanup task"); - actix_rt::spawn( - async move { - if let Err(e) = this - .cleanup_files(store, FilenameIVec::new(filename.clone())) - .await - { - error!("Error removing files from fs, {}", e); + if !aliases.is_empty() { + return Ok(()); } - info!( - "Files deleted for {:?}", - String::from_utf8(filename.to_vec()) + + let variant_idents = sled_repo + .variants::(hash.clone()) + .await? + .into_iter() + .map(|(_, v)| v) + .collect::>(); + let main_ident = sled_repo.identifier(hash.clone()).await?; + let motion_ident = sled_repo.motion_identifier(hash.clone()).await?; + + let repo = sled_repo.clone(); + + HashRepo::cleanup(sled_repo, hash).await?; + + let cleanup_span = tracing::info_span!("Cleaning files"); + cleanup_span.follows_from(Span::current()); + + actix_rt::spawn( + async move { + let mut errors = Vec::new(); + + for identifier in variant_idents + .iter() + .chain(&[main_ident]) + .chain(motion_ident.iter()) + { + debug!("Deleting {:?}", identifier); + if let Err(e) = store.remove(identifier).await { + let e: Error = e.into(); + errors.push(e); + } + + if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await { + let e: Error = e.into(); + errors.push(e); + } + } + + if !errors.is_empty() { + let span = tracing::error_span!("Error deleting files"); + span.in_scope(|| { + for error in errors { + error!("{}", error); + } + }); + } + } + .instrument(cleanup_span), ); } - .instrument(cleanup_span), - ); + } Ok(()) } - /// Fetch the real on-disk filename given an alias - #[instrument(skip(self))] - pub(crate) async fn from_alias(&self, alias: String) -> Result { - let tree = self.inner.alias_tree.clone(); - debug!("Getting hash from alias"); - let hash = web::block(move || tree.get(alias.as_bytes())) - .await?? - .ok_or(UploadError::MissingAlias)?; - - let main_tree = self.inner.main_tree.clone(); - debug!("Getting filename from hash"); - let filename = web::block(move || main_tree.get(hash)) - .await?? - .ok_or(UploadError::MissingFile)?; - - let filename = String::from_utf8(filename.to_vec())?; - - Ok(filename) - } - pub(crate) fn session(&self, store: S) -> UploadManagerSession where Error: From, { UploadManagerSession::new(self.clone(), store) } - - // Find image variants and remove them from the DB and the disk - #[instrument(skip(self))] - async fn cleanup_files(&self, store: S, filename: FilenameIVec) -> Result<(), Error> - where - Error: From, - { - let filename = filename.inner; - - let filename2 = filename.clone(); - let identifier_tree = self.inner.identifier_tree.clone(); - let identifier = web::block(move || identifier_tree.remove(filename2)).await??; - - let mut errors = Vec::new(); - if let Some(identifier) = identifier { - let identifier = S::Identifier::from_bytes(identifier.to_vec())?; - debug!("Deleting {:?}", identifier); - if let Err(e) = store.remove(&identifier).await { - errors.push(e); - } - } - - let filename2 = filename.clone(); - let fname_tree = self.inner.filename_tree.clone(); - debug!("Deleting filename -> hash mapping"); - web::block(move || fname_tree.remove(filename2)).await??; - - let path_prefix = filename.clone(); - let identifier_tree = self.inner.identifier_tree.clone(); - debug!("Fetching file variants"); - let identifiers = web::block(move || { - identifier_tree - .scan_prefix(path_prefix) - .values() - .collect::, sled::Error>>() - }) - .await??; - - debug!("{} files prepared for deletion", identifiers.len()); - - for id in identifiers { - let identifier = S::Identifier::from_bytes(id.to_vec())?; - - debug!("Deleting {:?}", identifier); - if let Err(e) = store.remove(&identifier).await { - errors.push(e); - } - } - - let path_prefix = filename.clone(); - let identifier_tree = self.inner.identifier_tree.clone(); - debug!("Deleting path info"); - web::block(move || { - for res in identifier_tree.scan_prefix(path_prefix).keys() { - let key = res?; - identifier_tree.remove(key)?; - } - Ok(()) as Result<(), Error> - }) - .await??; - - for error in errors { - error!("Error deleting files, {}", error); - } - Ok(()) - } - - pub(crate) fn variant_key( - &self, - variant_process_path: &std::path::Path, - filename: &str, - ) -> Result, Error> { - let path_string = variant_process_path - .to_str() - .ok_or(UploadError::Path)? - .to_string(); - - let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec(); - Ok(vec) - } - - fn details_key(&self, identifier: &I, filename: &str) -> Result, Error> - where - Error: From, - { - let mut vec = filename.as_bytes().to_vec(); - vec.extend(b"/"); - vec.extend(&identifier.to_bytes()?); - - Ok(vec) - } } -impl FilenameIVec { - fn new(inner: sled::IVec) -> Self { - FilenameIVec { inner } - } -} - -fn trans_upload_error( - upload_error: UploadError, -) -> sled::transaction::ConflictableTransactionError { - trans_err(upload_error) -} - -fn trans_utf8_error(e: FromUtf8Error) -> sled::transaction::ConflictableTransactionError { - trans_err(e) -} - -fn trans_err(e: E) -> sled::transaction::ConflictableTransactionError +async fn migrate_file( + from: &S1, + to: &S2, + identifier: &S1::Identifier, +) -> Result where - Error: From, + S1: Store, + S2: Store, + Error: From + From, { - sled::transaction::ConflictableTransactionError::Abort(e.into()) + let stream = from.to_stream(identifier, None, None).await?; + futures_util::pin_mut!(stream); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let new_identifier = to.save_async_read(&mut reader).await?; + + Ok(new_identifier) } -fn delete_key(alias: &str) -> String { - format!("{}/delete", alias) +async fn migrate_details(repo: &R, from: I1, to: &I2) -> Result<(), Error> +where + R: IdentifierRepo, + I1: Identifier, + I2: Identifier, + Error: From<::Error>, +{ + if let Some(details) = repo.details(&from).await? { + repo.relate_details(to, &details).await?; + repo.cleanup(&from).await?; + } + + Ok(()) +} + +async fn do_migrate_store(repo: &R, from: S1, to: S2) -> Result<(), Error> +where + S1: Store, + S2: Store, + Error: From + From, + R: IdentifierRepo + HashRepo + SettingsRepo, + Error: From<::Error>, + Error: From<::Error>, + Error: From<::Error>, +{ + let stream = repo.hashes().await; + let mut stream = Box::pin(stream); + + while let Some(hash) = stream.next().await { + let hash = hash?; + if let Some(identifier) = repo + .motion_identifier(hash.as_ref().to_vec().into()) + .await? + { + let new_identifier = migrate_file(&from, &to, &identifier).await?; + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier) + .await?; + } + + for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? { + let new_identifier = migrate_file(&from, &to, &identifier).await?; + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier) + .await?; + } + + let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; + let new_identifier = migrate_file(&from, &to, &identifier).await?; + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) + .await?; + + repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into()) + .await?; + } + + // clean up the migration key to avoid interfering with future migrations + repo.remove(STORE_MIGRATION_PROGRESS).await?; + + Ok(()) } impl std::fmt::Debug for UploadManager { @@ -693,9 +445,3 @@ impl std::fmt::Debug for UploadManager { f.debug_struct("UploadManager").finish() } } - -impl std::fmt::Debug for FilenameIVec { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", String::from_utf8(self.inner.to_vec())) - } -} diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 9274697..dca15a1 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -1,19 +1,17 @@ use crate::{ error::{Error, UploadError}, magick::ValidInputType, - migrate::{alias_id_key, alias_key}, + repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, HashRepo, IdentifierRepo, Repo}, store::Store, upload_manager::{ - delete_key, hasher::{Hash, Hasher}, UploadManager, }, }; use actix_web::web; use futures_util::stream::{Stream, StreamExt}; -use tracing::{debug, instrument, warn, Span}; +use tracing::{debug, instrument, Span}; use tracing_futures::Instrument; -use uuid::Uuid; pub(crate) struct UploadManagerSession where @@ -21,7 +19,7 @@ where { store: S, manager: UploadManager, - alias: Option, + alias: Option, finished: bool, } @@ -42,19 +40,8 @@ where self.finished = true; } - pub(crate) fn alias(&self) -> Option<&str> { - self.alias.as_deref() - } -} - -enum Dup { - Exists, - New, -} - -impl Dup { - fn exists(&self) -> bool { - matches!(self, Dup::Exists) + pub(crate) fn alias(&self) -> Option<&Alias> { + self.alias.as_ref() } } @@ -79,20 +66,23 @@ where actix_rt::spawn( async move { // undo alias -> hash mapping - debug!("Remove alias -> hash mapping"); - if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) { - // undo alias -> id mapping - debug!("Remove alias -> id mapping"); - let key = alias_id_key(&alias); - if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) { - // undo hash/id -> alias mapping - debug!("Remove hash/id -> alias mapping"); - let id = String::from_utf8_lossy(&id); - let key = alias_key(&hash, &id); - let _ = manager.inner.main_tree.remove(&key); - } + match manager.inner.repo { + Repo::Sled(ref sled_repo) => { + if let Ok(hash) = sled_repo.hash(&alias).await { + debug!("Clean alias repo"); + let _ = AliasRepo::cleanup(sled_repo, &alias).await; - let _ = manager.check_delete_files(store, hash).await; + if let Ok(identifier) = sled_repo.identifier(hash.clone()).await { + debug!("Clean identifier repo"); + let _ = IdentifierRepo::cleanup(sled_repo, &identifier).await; + + debug!("Remove stored files"); + let _ = store.remove(&identifier).await; + } + debug!("Clean hash repo"); + let _ = HashRepo::cleanup(sled_repo, hash).await; + } + } } } .instrument(cleanup_span), @@ -107,42 +97,30 @@ where { /// Generate a delete token for an alias #[instrument(skip(self))] - pub(crate) async fn delete_token(&self) -> Result { + pub(crate) async fn delete_token(&self) -> Result { let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?; debug!("Generating delete token"); - let s: String = Uuid::new_v4().to_string(); - let delete_token = s.clone(); + let delete_token = DeleteToken::generate(); debug!("Saving delete token"); - let alias_tree = self.manager.inner.alias_tree.clone(); - let key = delete_key(&alias); - let res = web::block(move || { - alias_tree.compare_and_swap( - key.as_bytes(), - None as Option, - Some(s.as_bytes()), - ) - }) - .await??; + match self.manager.inner.repo { + Repo::Sled(ref sled_repo) => { + let res = sled_repo.relate_delete_token(&alias, &delete_token).await?; - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. - }) = res - { - let s = String::from_utf8(ivec.to_vec())?; - - debug!("Returning existing delete token, {}", s); - return Ok(s); + Ok(if res.is_err() { + let delete_token = sled_repo.delete_token(&alias).await?; + debug!("Returning existing delete token, {:?}", delete_token); + delete_token + } else { + debug!("Returning new delete token, {:?}", delete_token); + delete_token + }) + } } - - debug!("Returning new delete token, {}", delete_token); - Ok(delete_token) } - /// Upload the file while preserving the filename, optionally validating the uploaded image - #[instrument(skip(self, stream))] + /// Import the file, discarding bytes if it's already present, or saving if it's new pub(crate) async fn import( mut self, alias: String, @@ -158,7 +136,7 @@ where } debug!("Validating bytes"); - let (content_type, validated_reader) = crate::validate::validate_image_bytes( + let (_, validated_reader) = crate::validate::validate_image_bytes( bytes_mut.freeze(), self.manager.inner.format, validate, @@ -167,20 +145,14 @@ where let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let filename = self.next_file(content_type).await?; - - let identifier = self - .store - .save_async_read(&mut hasher_reader, &filename) - .await?; + let identifier = self.store.save_async_read(&mut hasher_reader).await?; let hash = hasher_reader.finalize_reset().await?; - debug!("Storing alias"); - self.alias = Some(alias.clone()); - self.add_existing_alias(&hash, &alias).await?; + debug!("Adding alias"); + self.add_existing_alias(&hash, alias).await?; debug!("Saving file"); - self.save_upload(&identifier, hash, filename).await?; + self.save_upload(&identifier, hash).await?; // Return alias to file Ok(self) @@ -210,106 +182,65 @@ where let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let filename = self.next_file(input_type).await?; - - let identifier = self - .store - .save_async_read(&mut hasher_reader, &filename) - .await?; + let identifier = self.store.save_async_read(&mut hasher_reader).await?; let hash = hasher_reader.finalize_reset().await?; debug!("Adding alias"); self.add_alias(&hash, input_type).await?; debug!("Saving file"); - self.save_upload(&identifier, hash, filename).await?; + self.save_upload(&identifier, hash).await?; // Return alias to file Ok(self) } // check duplicates & store image if new - async fn save_upload( - &self, - identifier: &S::Identifier, - hash: Hash, - filename: String, - ) -> Result<(), Error> { - let dup = self.check_duplicate(hash, filename.clone()).await?; + #[instrument(skip(self, hash))] + async fn save_upload(&self, identifier: &S::Identifier, hash: Hash) -> Result<(), Error> { + let res = self.check_duplicate(&hash).await?; // bail early with alias to existing file if this is a duplicate - if dup.exists() { + if res.is_err() { debug!("Duplicate exists, removing file"); self.store.remove(identifier).await?; return Ok(()); } - self.manager.store_identifier(filename, identifier).await?; + self.manager + .store_identifier(hash.into_inner(), identifier) + .await?; Ok(()) } // check for an already-uploaded image with this hash, returning the path to the target file #[instrument(skip(self, hash))] - async fn check_duplicate(&self, hash: Hash, filename: String) -> Result { - let main_tree = self.manager.inner.main_tree.clone(); + async fn check_duplicate(&self, hash: &Hash) -> Result, Error> { + let hash = hash.as_slice().to_vec(); - let filename2 = filename.clone(); - let hash2 = hash.as_slice().to_vec(); - debug!("Inserting filename for hash"); - let res = web::block(move || { - main_tree.compare_and_swap( - hash2, - None as Option, - Some(filename2.as_bytes()), - ) - }) - .await??; - - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. - }) = res - { - let name = String::from_utf8(ivec.to_vec())?; - debug!("Filename exists for hash, {}", name); - return Ok(Dup::Exists); + match self.manager.inner.repo { + Repo::Sled(ref sled_repo) => Ok(HashRepo::create(sled_repo, hash.into()).await?), } - - let fname_tree = self.manager.inner.filename_tree.clone(); - debug!("Saving filename -> hash relation"); - web::block(move || fname_tree.insert(filename, hash.into_inner())).await??; - - Ok(Dup::New) } - // generate a short filename that isn't already in-use - #[instrument(skip(self, input_type))] - async fn next_file(&self, input_type: ValidInputType) -> Result { - loop { - debug!("Filename generation loop"); - let filename = file_name(Uuid::new_v4(), input_type); + // Add an alias from an existing filename + async fn add_existing_alias(&mut self, hash: &Hash, filename: String) -> Result<(), Error> { + let alias = Alias::from_existing(&filename); - let identifier_tree = self.manager.inner.identifier_tree.clone(); - let filename2 = filename.clone(); - let filename_exists = web::block(move || identifier_tree.get(filename2.as_bytes())) - .await?? - .is_some(); + match self.manager.inner.repo { + Repo::Sled(ref sled_repo) => { + AliasRepo::create(sled_repo, &alias) + .await? + .map_err(|_| UploadError::DuplicateAlias)?; + self.alias = Some(alias.clone()); - if !filename_exists { - return Ok(filename); + let hash = hash.as_slice().to_vec(); + sled_repo.relate_hash(&alias, hash.clone().into()).await?; + sled_repo.relate_alias(hash.into(), &alias).await?; } - - debug!("Filename exists, trying again"); } - } - - #[instrument(skip(self, hash, alias))] - async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> { - self.save_alias_hash_mapping(hash, alias).await??; - - self.store_hash_id_alias_mapping(hash, alias).await?; Ok(()) } @@ -319,96 +250,25 @@ where // This will help if multiple 'users' upload the same file, and one of them wants to delete it #[instrument(skip(self, hash, input_type))] async fn add_alias(&mut self, hash: &Hash, input_type: ValidInputType) -> Result<(), Error> { - let alias = self.next_alias(hash, input_type).await?; - - self.store_hash_id_alias_mapping(hash, &alias).await?; - - Ok(()) - } - - // Add a pre-defined alias to an existin file - // - // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files - #[instrument(skip(self, hash))] - async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> { - let alias = alias.to_string(); - loop { - debug!("hash -> alias save loop"); - let db = self.manager.inner.db.clone(); - let id = web::block(move || db.generate_id()).await??.to_string(); - - let alias_tree = self.manager.inner.alias_tree.clone(); - let key = alias_id_key(&alias); - let id2 = id.clone(); - debug!("Saving alias -> id mapping"); - web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??; - - let key = alias_key(hash.as_slice(), &id); - let main_tree = self.manager.inner.main_tree.clone(); - let alias2 = alias.clone(); - debug!("Saving hash/id -> alias mapping"); - let res = web::block(move || { - main_tree.compare_and_swap(key, None as Option, Some(alias2.as_bytes())) - }) - .await??; - - if res.is_ok() { - break; - } - - debug!("Id exists, trying again"); - } - - Ok(()) - } - - // Generate an alias to the file - #[instrument(skip(self, hash, input_type))] - async fn next_alias( - &mut self, - hash: &Hash, - input_type: ValidInputType, - ) -> Result { loop { debug!("Alias gen loop"); - let alias = file_name(Uuid::new_v4(), input_type); - self.alias = Some(alias.clone()); + let alias = Alias::generate(input_type.as_ext().to_string()); - let res = self.save_alias_hash_mapping(hash, &alias).await?; + match self.manager.inner.repo { + Repo::Sled(ref sled_repo) => { + let res = AliasRepo::create(sled_repo, &alias).await?; + + if res.is_ok() { + self.alias = Some(alias.clone()); + let hash = hash.as_slice().to_vec(); + sled_repo.relate_hash(&alias, hash.clone().into()).await?; + sled_repo.relate_alias(hash.into(), &alias).await?; + return Ok(()); + } + } + }; - if res.is_ok() { - return Ok(alias); - } debug!("Alias exists, regenning"); } } - - // Save an alias to the database - #[instrument(skip(self, hash))] - async fn save_alias_hash_mapping( - &self, - hash: &Hash, - alias: &str, - ) -> Result, Error> { - let tree = self.manager.inner.alias_tree.clone(); - let vec = hash.as_slice().to_vec(); - let alias = alias.to_string(); - - debug!("Saving alias -> hash mapping"); - let res = web::block(move || { - tree.compare_and_swap(alias.as_bytes(), None as Option, Some(vec)) - }) - .await??; - - if res.is_err() { - warn!("Duplicate alias"); - return Ok(Err(UploadError::DuplicateAlias.into())); - } - - Ok(Ok(())) - } -} - -fn file_name(name: Uuid, input_type: ValidInputType) -> String { - format!("{}{}", name, input_type.as_ext()) }