diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 03b1a15..7a73223 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -148,13 +148,33 @@ impl Args { }, } } - Command::MigrateStore(migrate_store) => { + Command::MigrateStore(MigrateStore { + skip_missing_files, + store, + }) => { let server = Server::default(); let media = Media::default(); - match migrate_store { - MigrateStore::Filesystem(MigrateFilesystem { from, to }) => match to { - MigrateStoreInner::Filesystem(MigrateFilesystemInner { to, repo }) => { + match store { + MigrateStoreFrom::Filesystem(MigrateFilesystem { from, to }) => match to { + MigrateStoreTo::Filesystem(MigrateFilesystemInner { to, repo }) => Output { + config_format: ConfigFormat { + server, + old_db, + tracing, + media, + store: None, + repo, + }, + operation: Operation::MigrateStore { + skip_missing_files, + from: from.into(), + to: to.into(), + }, + config_file, + save_to, + }, + MigrateStoreTo::ObjectStorage(MigrateObjectStorageInner { to, repo }) => { Output { config_format: ConfigFormat { server, @@ -165,6 +185,7 @@ impl Args { repo, }, operation: Operation::MigrateStore { + skip_missing_files, from: from.into(), to: to.into(), }, @@ -172,29 +193,32 @@ impl Args { save_to, } } - MigrateStoreInner::ObjectStorage(MigrateObjectStorageInner { - to, - repo, - }) => Output { - config_format: ConfigFormat { - server, - old_db, - tracing, - media, - store: None, - repo, - }, - operation: Operation::MigrateStore { - from: from.into(), - to: to.into(), - }, - config_file, - save_to, - }, }, - MigrateStore::ObjectStorage(MigrateObjectStorage { from, to }) => match to { - MigrateStoreInner::Filesystem(MigrateFilesystemInner { to, repo }) => { - Output { + MigrateStoreFrom::ObjectStorage(MigrateObjectStorage { from, to }) => { + match to { + MigrateStoreTo::Filesystem(MigrateFilesystemInner { to, repo }) => { + Output { + config_format: ConfigFormat { + server, + old_db, + tracing, + media, + store: None, + repo, + }, + operation: Operation::MigrateStore { + skip_missing_files, + from: from.into(), + to: to.into(), + }, + config_file, + save_to, + } + } + MigrateStoreTo::ObjectStorage(MigrateObjectStorageInner { + to, + repo, + }) => Output { config_format: ConfigFormat { server, old_db, @@ -204,33 +228,15 @@ impl Args { repo, }, operation: Operation::MigrateStore { + skip_missing_files, from: from.into(), to: to.into(), }, config_file, save_to, - } + }, } - MigrateStoreInner::ObjectStorage(MigrateObjectStorageInner { - to, - repo, - }) => Output { - config_format: ConfigFormat { - server, - old_db, - tracing, - media, - store: None, - repo, - }, - operation: Operation::MigrateStore { - from: from.into(), - to: to.into(), - }, - config_file, - save_to, - }, - }, + } } } } @@ -249,6 +255,7 @@ pub(super) struct Output { pub(crate) enum Operation { Run, MigrateStore { + skip_missing_files: bool, from: crate::config::primitives::Store, to: crate::config::primitives::Store, }, @@ -418,7 +425,6 @@ enum Command { Run(Run), /// Migrates from one provided media store to another - #[command(flatten)] MigrateStore(MigrateStore), } @@ -527,9 +533,20 @@ enum RunStore { ObjectStorage(RunObjectStorage), } +#[derive(Debug, Parser)] +struct MigrateStore { + /// Normally, pict-rs will keep retrying when errors occur during migration. This flag tells + /// pict-rs to ignore errors that are caused by files not existing. + #[arg(long)] + skip_missing_files: bool, + + #[command(subcommand)] + store: MigrateStoreFrom, +} + /// Configure the pict-rs storage migration #[derive(Debug, Subcommand)] -enum MigrateStore { +enum MigrateStoreFrom { /// Migrate from the provided filesystem storage Filesystem(MigrateFilesystem), @@ -539,7 +556,7 @@ enum MigrateStore { /// Configure the destination storage for pict-rs storage migration #[derive(Debug, Subcommand)] -enum MigrateStoreInner { +enum MigrateStoreTo { /// Migrate to the provided filesystem storage Filesystem(MigrateFilesystemInner), @@ -554,7 +571,7 @@ struct MigrateFilesystem { from: Filesystem, #[command(subcommand)] - to: MigrateStoreInner, + to: MigrateStoreTo, } /// Migrate pict-rs' storage to the provided filesystem storage @@ -574,7 +591,7 @@ struct MigrateObjectStorage { from: crate::config::primitives::ObjectStorage, #[command(subcommand)] - to: MigrateStoreInner, + to: MigrateStoreTo, } /// Migrate pict-rs' storage to the provided object storage diff --git a/src/error.rs b/src/error.rs index 4c2fa1c..c8a930e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,7 +46,7 @@ pub(crate) enum UploadError { Upload(#[from] actix_form_data::Error), #[error("Error in DB")] - Sled(#[from] crate::repo::sled::SledError), + Repo(#[from] crate::repo::RepoError), #[error("Error in old sled DB")] OldSled(#[from] ::sled::Error), @@ -63,11 +63,8 @@ pub(crate) enum UploadError { #[error("Error stripping prefix")] StripPrefix(#[from] std::path::StripPrefixError), - #[error("Error storing file")] - FileStore(#[from] crate::store::file_store::FileError), - - #[error("Error storing object")] - ObjectStore(#[from] crate::store::object_store::ObjectError), + #[error("Error in store")] + Store(#[source] crate::store::StoreError), #[error("Provided process path is invalid")] ParsePath, @@ -81,9 +78,6 @@ pub(crate) enum UploadError { #[error("No files present in upload")] NoFiles, - #[error("Upload was already claimed")] - AlreadyClaimed, - #[error("Requested a file that doesn't exist")] MissingAlias, @@ -151,6 +145,15 @@ impl From for UploadError { } } +impl From for UploadError { + fn from(value: crate::store::StoreError) -> Self { + match value { + crate::store::StoreError::Repo(repo_error) => Self::Repo(repo_error), + e => Self::Store(e), + } + } +} + impl ResponseError for Error { fn status_code(&self) -> StatusCode { match self.kind() { @@ -160,11 +163,16 @@ impl ResponseError for Error { | UploadError::NoFiles | UploadError::Upload(_) | UploadError::UnsupportedFormat - | UploadError::AlreadyClaimed + | UploadError::Store(crate::store::StoreError::Repo( + crate::repo::RepoError::AlreadyClaimed, + )) + | UploadError::Repo(crate::repo::RepoError::AlreadyClaimed) | UploadError::SilentVideoDisabled, ) => StatusCode::BAD_REQUEST, Some( - UploadError::Sled(crate::repo::sled::SledError::Missing) + UploadError::Repo(crate::repo::RepoError::SledError( + crate::repo::sled::SledError::Missing, + )) | UploadError::MissingAlias, ) => StatusCode::NOT_FOUND, Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN, diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 68b54b2..5e3bf0e 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, UploadError}, magick::{Details, ValidInputType}, process::Process, - store::Store, + store::{Store, StoreError}, }; use actix_web::web::Bytes; use once_cell::sync::OnceCell; @@ -413,7 +413,9 @@ where { let input_file = crate::tmp_file::tmp_file(None); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&input_file).await?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(StoreError::from)?; let tmp_one = crate::file::File::create(&input_file).await?; let tmp_one = (f)(tmp_one).await?; @@ -523,11 +525,15 @@ pub(crate) async fn transcode_bytes( ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(transcode_options.input_file_extension())); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&input_file).await?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(StoreError::from)?; let output_file = crate::tmp_file::tmp_file(Some(transcode_options.output_file_extension())); let output_file_str = output_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&output_file).await?; + crate::store::file_store::safe_create_parent(&output_file) + .await + .map_err(StoreError::from)?; let mut tmp_one = crate::file::File::create(&input_file).await?; tmp_one.write_from_bytes(input).await?; @@ -557,7 +563,10 @@ pub(crate) async fn transcode_bytes( tokio::fs::remove_file(input_file).await?; let tmp_two = crate::file::File::open(&output_file).await?; - let stream = tmp_two.read_to_stream(None, None).await?; + let stream = tmp_two + .read_to_stream(None, None) + .await + .map_err(StoreError::from)?; let reader = tokio_util::io::StreamReader::new(stream); let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); @@ -573,11 +582,15 @@ pub(crate) async fn thumbnail( ) -> Result { let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension())); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&input_file).await?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(StoreError::from)?; let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension())); let output_file_str = output_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&output_file).await?; + crate::store::file_store::safe_create_parent(&output_file) + .await + .map_err(StoreError::from)?; let mut tmp_one = crate::file::File::create(&input_file).await?; tmp_one @@ -607,7 +620,10 @@ pub(crate) async fn thumbnail( tokio::fs::remove_file(input_file).await?; let tmp_two = crate::file::File::open(&output_file).await?; - let stream = tmp_two.read_to_stream(None, None).await?; + let stream = tmp_two + .read_to_stream(None, None) + .await + .map_err(StoreError::from)?; let reader = tokio_util::io::StreamReader::new(stream); let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); diff --git a/src/lib.rs b/src/lib.rs index 952648e..e3eb03f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1108,7 +1108,12 @@ async fn launch( Ok(()) } -async fn migrate_inner(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()> +async fn migrate_inner( + repo: &Repo, + from: S1, + to: config::Store, + skip_missing_files: bool, +) -> color_eyre::Result<()> where S1: Store, { @@ -1117,7 +1122,7 @@ where let to = FileStore::build(path.clone(), repo.clone()).await?.build(); match repo { - Repo::Sled(repo) => migrate_store(repo, from, to).await?, + Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, } } config::Store::ObjectStorage(config::ObjectStorage { @@ -1147,7 +1152,7 @@ where .build(); match repo { - Repo::Sled(repo) => migrate_store(repo, from, to).await?, + Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, } } } @@ -1219,11 +1224,15 @@ pub async fn run() -> color_eyre::Result<()> { match (*OPERATION).clone() { Operation::Run => (), - Operation::MigrateStore { from, to } => { + Operation::MigrateStore { + skip_missing_files, + from, + to, + } => { match from { config::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?.build(); - migrate_inner(&repo, from, to).await?; + migrate_inner(&repo, from, to, skip_missing_files).await?; } config::Store::ObjectStorage(config::ObjectStorage { endpoint, @@ -1251,7 +1260,7 @@ pub async fn run() -> color_eyre::Result<()> { .await? .build(); - migrate_inner(&repo, from, to).await?; + migrate_inner(&repo, from, to, skip_missing_files).await?; } } @@ -1304,15 +1313,22 @@ const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; const STORE_MIGRATION_MOTION: &str = "store-migration-motion"; const STORE_MIGRATION_VARIANT: &str = "store-migration-variant"; -async fn migrate_store(repo: &R, from: S1, to: S2) -> Result<(), Error> +async fn migrate_store( + repo: &R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> where S1: Store + Clone, S2: Store + Clone, R: IdentifierRepo + HashRepo + SettingsRepo, { + tracing::info!("Migrating store"); + let mut failure_count = 0; - while let Err(e) = do_migrate_store(repo, from.clone(), to.clone()).await { + while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await { tracing::error!("Failed with {}", e.to_string()); failure_count += 1; @@ -1326,7 +1342,12 @@ where Ok(()) } -async fn do_migrate_store(repo: &R, from: S1, to: S2) -> Result<(), Error> +async fn do_migrate_store( + repo: &R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> where S1: Store, S2: Store, @@ -1352,12 +1373,22 @@ where .await? { if repo.get(STORE_MIGRATION_MOTION).await?.is_none() { - let new_identifier = migrate_file(&from, &to, &identifier).await?; - migrate_details(repo, identifier, &new_identifier).await?; - repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier) - .await?; - repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into()) - .await?; + match migrate_file(&from, &to, &identifier, skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_motion_identifier( + hash.as_ref().to_vec().into(), + &new_identifier, + ) + .await?; + repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into()) + .await?; + } + Err(e) if e.is_not_found() && skip_missing_files => { + tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); + } + Err(e) => return Err(e.into()), + } } } @@ -1371,22 +1402,45 @@ where continue; } - let new_identifier = migrate_file(&from, &to, &identifier).await?; - migrate_details(repo, identifier, &new_identifier).await?; - repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone()) - .await?; - repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier) - .await?; + match migrate_file(&from, &to, &identifier, skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, identifier, &new_identifier).await?; + repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone()) + .await?; + repo.relate_variant_identifier( + hash.as_ref().to_vec().into(), + variant, + &new_identifier, + ) + .await?; - repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into()) - .await?; + repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into()) + .await?; + } + Err(e) if e.is_not_found() && skip_missing_files => { + tracing::warn!( + "Skipping variant {} for hash {}", + variant, + hex::encode(&hash) + ); + } + Err(e) => return Err(e.into()), + } } let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?; - let new_identifier = migrate_file(&from, &to, &identifier).await?; - migrate_details(repo, identifier, &new_identifier).await?; - repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) - .await?; + + match migrate_file(&from, &to, &identifier, skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, identifier, &new_identifier).await?; + repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) + .await?; + } + Err(e) if e.is_not_found() && skip_missing_files => { + tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); + } + Err(e) => return Err(e.into()), + } repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into()) .await?; @@ -1404,7 +1458,8 @@ async fn migrate_file( from: &S1, to: &S2, identifier: &S1::Identifier, -) -> Result + skip_missing_files: bool, +) -> Result where S1: Store, S2: Store, @@ -1414,6 +1469,7 @@ where loop { match do_migrate_file(from, to, identifier).await { Ok(identifier) => return Ok(identifier), + Err(e) if e.is_not_found() && skip_missing_files => return Err(e), Err(e) => { failure_count += 1; @@ -1432,7 +1488,7 @@ async fn do_migrate_file( from: &S1, to: &S2, identifier: &S1::Identifier, -) -> Result +) -> Result where S1: Store, S2: Store, diff --git a/src/magick.rs b/src/magick.rs index 520c7b2..8e75dbe 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, UploadError}, process::Process, repo::Alias, - store::Store, + store::{Store, StoreError}, }; use actix_web::web::Bytes; use tokio::{ @@ -140,7 +140,9 @@ pub(crate) async fn details_bytes( if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { let input_file = crate::tmp_file::tmp_file(Some(hint)); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&input_file).await?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(StoreError::from)?; let mut tmp_one = crate::file::File::create(&input_file).await?; tmp_one.write_from_bytes(input).await?; @@ -178,7 +180,9 @@ pub(crate) async fn details_store( if let Some(hint) = hint.and_then(|hint| hint.video_hint()) { let input_file = crate::tmp_file::tmp_file(Some(hint)); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; - crate::store::file_store::safe_create_parent(&input_file).await?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(StoreError::from)?; let mut tmp_one = crate::file::File::create(&input_file).await?; tmp_one diff --git a/src/range.rs b/src/range.rs index 51066e9..dfcfc35 100644 --- a/src/range.rs +++ b/src/range.rs @@ -34,7 +34,8 @@ pub(crate) async fn chop_store( let end = end + 1; return store .to_stream(identifier, Some(start), Some(end.saturating_sub(start))) - .await; + .await + .map_err(Error::from); } Err(UploadError::Range.into()) diff --git a/src/repo.rs b/src/repo.rs index 638b5c0..6e85133 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,8 +1,7 @@ use crate::{ config, details::Details, - error::Error, - store::{file_store::FileId, Identifier}, + store::{file_store::FileId, Identifier, StoreError}, }; use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::Stream; @@ -47,6 +46,18 @@ pub(crate) enum UploadResult { Failure { message: String }, } +#[derive(Debug, thiserror::Error)] +pub(crate) enum RepoError { + #[error("Error in sled")] + SledError(#[from] crate::repo::sled::SledError), + + #[error("Upload was already claimed")] + AlreadyClaimed, + + #[error("Panic in blocking operation")] + Canceled, +} + #[async_trait::async_trait(?Send)] pub(crate) trait FullRepo: UploadRepo @@ -60,19 +71,19 @@ pub(crate) trait FullRepo: + Clone + Debug { - async fn health_check(&self) -> Result<(), Error>; + async fn health_check(&self) -> Result<(), RepoError>; #[tracing::instrument(skip(self))] async fn identifier_from_alias( &self, alias: &Alias, - ) -> Result { + ) -> Result { let hash = self.hash(alias).await?; self.identifier(hash).await } #[tracing::instrument(skip(self))] - async fn aliases_from_alias(&self, alias: &Alias) -> Result, Error> { + async fn aliases_from_alias(&self, alias: &Alias) -> Result, RepoError> { let hash = self.hash(alias).await?; self.aliases(hash).await } @@ -81,7 +92,7 @@ pub(crate) trait FullRepo: async fn still_identifier_from_alias( &self, alias: &Alias, - ) -> Result, Error> { + ) -> Result, StoreError> { let hash = self.hash(alias).await?; let identifier = self.identifier::(hash.clone()).await?; @@ -98,7 +109,7 @@ impl FullRepo for actix_web::web::Data where T: FullRepo, { - async fn health_check(&self) -> Result<(), Error> { + async fn health_check(&self) -> Result<(), RepoError> { T::health_check(self).await } } @@ -116,13 +127,13 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait UploadRepo: BaseRepo { - async fn create(&self, upload_id: UploadId) -> Result<(), Error>; + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>; - async fn wait(&self, upload_id: UploadId) -> Result; + async fn wait(&self, upload_id: UploadId) -> Result; - async fn claim(&self, upload_id: UploadId) -> Result<(), Error>; + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>; - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error>; + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -130,30 +141,30 @@ impl UploadRepo for actix_web::web::Data where T: UploadRepo, { - async fn create(&self, upload_id: UploadId) -> Result<(), Error> { + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { T::create(self, upload_id).await } - async fn wait(&self, upload_id: UploadId) -> Result { + async fn wait(&self, upload_id: UploadId) -> Result { T::wait(self, upload_id).await } - async fn claim(&self, upload_id: UploadId) -> Result<(), Error> { + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { T::claim(self, upload_id).await } - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> { + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { T::complete(self, upload_id, result).await } } #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), Error>; + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError>; - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>; + async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>; - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result; + async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result; } #[async_trait::async_trait(?Send)] @@ -161,24 +172,24 @@ impl QueueRepo for actix_web::web::Data where T: QueueRepo, { - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), Error> { + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { T::requeue_in_progress(self, worker_prefix).await } - async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> { + async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> { T::push(self, queue, job).await } - async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result { + async fn pop(&self, queue: &'static str, worker_id: Vec) -> Result { T::pop(self, queue, worker_id).await } } #[async_trait::async_trait(?Send)] pub(crate) trait SettingsRepo: BaseRepo { - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error>; - async fn get(&self, key: &'static str) -> Result, Error>; - async fn remove(&self, key: &'static str) -> Result<(), Error>; + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>; + async fn get(&self, key: &'static str) -> Result, RepoError>; + async fn remove(&self, key: &'static str) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -186,15 +197,15 @@ impl SettingsRepo for actix_web::web::Data where T: SettingsRepo, { - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> { + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { T::set(self, key, value).await } - async fn get(&self, key: &'static str) -> Result, Error> { + async fn get(&self, key: &'static str) -> Result, RepoError> { T::get(self, key).await } - async fn remove(&self, key: &'static str) -> Result<(), Error> { + async fn remove(&self, key: &'static str) -> Result<(), RepoError> { T::remove(self, key).await } } @@ -205,10 +216,10 @@ pub(crate) trait IdentifierRepo: BaseRepo { &self, identifier: &I, details: &Details, - ) -> Result<(), Error>; - async fn details(&self, identifier: &I) -> Result, Error>; + ) -> Result<(), StoreError>; + async fn details(&self, identifier: &I) -> Result, StoreError>; - async fn cleanup(&self, identifier: &I) -> Result<(), Error>; + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] @@ -220,66 +231,67 @@ where &self, identifier: &I, details: &Details, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { T::relate_details(self, identifier, details).await } - async fn details(&self, identifier: &I) -> Result, Error> { + async fn details(&self, identifier: &I) -> Result, StoreError> { T::details(self, identifier).await } - async fn cleanup(&self, identifier: &I) -> Result<(), Error> { + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { T::cleanup(self, identifier).await } } #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo: BaseRepo { - type Stream: Stream>; + type Stream: Stream>; async fn hashes(&self) -> Self::Stream; - async fn create(&self, hash: Self::Bytes) -> Result, Error>; + async fn create(&self, hash: Self::Bytes) -> Result, RepoError>; - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>; - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>; - async fn aliases(&self, hash: Self::Bytes) -> Result, Error>; + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>; + async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError>; async fn relate_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error>; - async fn identifier(&self, hash: Self::Bytes) -> Result; + ) -> Result<(), StoreError>; + async fn identifier(&self, hash: Self::Bytes) + -> Result; async fn relate_variant_identifier( &self, hash: Self::Bytes, variant: String, identifier: &I, - ) -> Result<(), Error>; + ) -> Result<(), StoreError>; async fn variant_identifier( &self, hash: Self::Bytes, variant: String, - ) -> Result, Error>; + ) -> Result, StoreError>; async fn variants( &self, hash: Self::Bytes, - ) -> Result, Error>; - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error>; + ) -> Result, StoreError>; + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; async fn relate_motion_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error>; + ) -> Result<(), StoreError>; async fn motion_identifier( &self, hash: Self::Bytes, - ) -> Result, Error>; + ) -> Result, StoreError>; - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error>; + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -293,19 +305,19 @@ where T::hashes(self).await } - async fn create(&self, hash: Self::Bytes) -> Result, Error> { + async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { T::create(self, hash).await } - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { T::relate_alias(self, hash, alias).await } - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { T::remove_alias(self, hash, alias).await } - async fn aliases(&self, hash: Self::Bytes) -> Result, Error> { + async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError> { T::aliases(self, hash).await } @@ -313,11 +325,14 @@ where &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { T::relate_identifier(self, hash, identifier).await } - async fn identifier(&self, hash: Self::Bytes) -> Result { + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result { T::identifier(self, hash).await } @@ -326,7 +341,7 @@ where hash: Self::Bytes, variant: String, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { T::relate_variant_identifier(self, hash, variant, identifier).await } @@ -334,18 +349,18 @@ where &self, hash: Self::Bytes, variant: String, - ) -> Result, Error> { + ) -> Result, StoreError> { T::variant_identifier(self, hash, variant).await } async fn variants( &self, hash: Self::Bytes, - ) -> Result, Error> { + ) -> Result, StoreError> { T::variants(self, hash).await } - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> { + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { T::remove_variant(self, hash, variant).await } @@ -353,37 +368,37 @@ where &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { T::relate_motion_identifier(self, hash, identifier).await } async fn motion_identifier( &self, hash: Self::Bytes, - ) -> Result, Error> { + ) -> Result, StoreError> { T::motion_identifier(self, hash).await } - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> { + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { T::cleanup(self, hash).await } } #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo: BaseRepo { - async fn create(&self, alias: &Alias) -> Result, Error>; + async fn create(&self, alias: &Alias) -> Result, RepoError>; async fn relate_delete_token( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, Error>; - async fn delete_token(&self, alias: &Alias) -> Result; + ) -> Result, RepoError>; + async fn delete_token(&self, alias: &Alias) -> Result; - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error>; - async fn hash(&self, alias: &Alias) -> Result; + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>; + async fn hash(&self, alias: &Alias) -> Result; - async fn cleanup(&self, alias: &Alias) -> Result<(), Error>; + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -391,7 +406,7 @@ impl AliasRepo for actix_web::web::Data where T: AliasRepo, { - async fn create(&self, alias: &Alias) -> Result, Error> { + async fn create(&self, alias: &Alias) -> Result, RepoError> { T::create(self, alias).await } @@ -399,23 +414,23 @@ where &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, Error> { + ) -> Result, RepoError> { T::relate_delete_token(self, alias, delete_token).await } - async fn delete_token(&self, alias: &Alias) -> Result { + async fn delete_token(&self, alias: &Alias) -> Result { T::delete_token(self, alias).await } - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> { + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> { T::relate_hash(self, alias, hash).await } - async fn hash(&self, alias: &Alias) -> Result { + async fn hash(&self, alias: &Alias) -> Result { T::hash(self, alias).await } - async fn cleanup(&self, alias: &Alias) -> Result<(), Error> { + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { T::cleanup(self, alias).await } } @@ -836,14 +851,14 @@ impl std::fmt::Display for Alias { } impl Identifier for Vec { - fn from_bytes(bytes: Vec) -> Result + fn from_bytes(bytes: Vec) -> Result where Self: Sized, { Ok(bytes) } - fn to_bytes(&self) -> Result, Error> { + fn to_bytes(&self) -> Result, StoreError> { Ok(self.clone()) } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 347e348..ed47afa 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,10 +1,10 @@ use crate::{ - error::{Error, UploadError}, repo::{ Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, }, serde_str::Serde, + store::StoreError, stream::from_iterator, }; use futures_util::Stream; @@ -19,6 +19,8 @@ use std::{ }; use tokio::sync::Notify; +use super::RepoError; + macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); @@ -27,7 +29,10 @@ macro_rules! b { actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr)) .await - .map_err(SledError::from)?? + .map_err(SledError::from) + .map_err(RepoError::from)? + .map_err(SledError::from) + .map_err(RepoError::from)? }}; } @@ -97,12 +102,12 @@ impl BaseRepo for SledRepo { #[async_trait::async_trait(?Send)] impl FullRepo for SledRepo { - async fn health_check(&self) -> Result<(), Error> { + async fn health_check(&self) -> Result<(), RepoError> { let next = self.healthz_count.fetch_add(1, Ordering::Relaxed); b!(self.healthz, { healthz.insert("healthz", &next.to_be_bytes()[..]) }); - self.healthz.flush_async().await?; + self.healthz.flush_async().await.map_err(SledError::from)?; b!(self.healthz, healthz.get("healthz")); Ok(()) } @@ -146,13 +151,13 @@ impl From for UploadResult { #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create(&self, upload_id: UploadId) -> Result<(), Error> { + async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> { b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); Ok(()) } #[tracing::instrument(skip(self))] - async fn wait(&self, upload_id: UploadId) -> Result { + async fn wait(&self, upload_id: UploadId) -> Result { let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes()); let bytes = upload_id.as_bytes().to_vec(); @@ -160,40 +165,42 @@ impl UploadRepo for SledRepo { if let Some(bytes) = opt { if bytes != b"1" { - let result: InnerUploadResult = serde_json::from_slice(&bytes)?; + let result: InnerUploadResult = + serde_json::from_slice(&bytes).map_err(SledError::from)?; return Ok(result.into()); } } else { - return Err(UploadError::AlreadyClaimed.into()); + return Err(RepoError::AlreadyClaimed); } while let Some(event) = (&mut subscriber).await { match event { sled::Event::Remove { .. } => { - return Err(UploadError::AlreadyClaimed.into()); + return Err(RepoError::AlreadyClaimed); } sled::Event::Insert { value, .. } => { if value != b"1" { - let result: InnerUploadResult = serde_json::from_slice(&value)?; + let result: InnerUploadResult = + serde_json::from_slice(&value).map_err(SledError::from)?; return Ok(result.into()); } } } } - Err(UploadError::Canceled.into()) + Err(RepoError::Canceled) } #[tracing::instrument(level = "trace", skip(self))] - async fn claim(&self, upload_id: UploadId) -> Result<(), Error> { + async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> { b!(self.uploads, uploads.remove(upload_id.as_bytes())); Ok(()) } #[tracing::instrument(level = "trace", skip(self, result))] - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> { + async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { let result: InnerUploadResult = result.into(); - let result = serde_json::to_vec(&result)?; + let result = serde_json::to_vec(&result).map_err(SledError::from)?; b!(self.uploads, uploads.insert(upload_id.as_bytes(), result)); @@ -204,7 +211,7 @@ impl UploadRepo for SledRepo { #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))] - async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), Error> { + async fn requeue_in_progress(&self, worker_prefix: Vec) -> Result<(), RepoError> { let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, { let vec = in_progress_queue .scan_prefix(worker_prefix) @@ -229,7 +236,7 @@ impl QueueRepo for SledRepo { }) .collect::>(); - Ok(vec) as Result<_, Error> + Ok(vec) as Result<_, SledError> }); let db = self.db.clone(); @@ -242,15 +249,15 @@ impl QueueRepo for SledRepo { queue.insert(key, job)?; } - Ok(()) as Result<(), Error> + Ok(()) as Result<(), SledError> }); Ok(()) } #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] - async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> { - let id = self.db.generate_id()?; + async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> { + let id = self.db.generate_id().map_err(SledError::from)?; let mut key = queue_name.as_bytes().to_vec(); key.extend(id.to_be_bytes()); @@ -276,7 +283,7 @@ impl QueueRepo for SledRepo { &self, queue_name: &'static str, worker_id: Vec, - ) -> Result { + ) -> Result { loop { let in_progress_queue = self.in_progress_queue.clone(); @@ -333,21 +340,21 @@ impl QueueRepo for SledRepo { #[async_trait::async_trait(?Send)] impl SettingsRepo for SledRepo { #[tracing::instrument(level = "trace", skip(value))] - async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> { + async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> { b!(self.settings, settings.insert(key, value)); Ok(()) } #[tracing::instrument(level = "trace", skip(self))] - async fn get(&self, key: &'static str) -> Result, Error> { + async fn get(&self, key: &'static str) -> Result, RepoError> { let opt = b!(self.settings, settings.get(key)); Ok(opt) } #[tracing::instrument(level = "trace", skip(self))] - async fn remove(&self, key: &'static str) -> Result<(), Error> { + async fn remove(&self, key: &'static str) -> Result<(), RepoError> { b!(self.settings, settings.remove(key)); Ok(()) @@ -374,9 +381,11 @@ impl IdentifierRepo for SledRepo { &self, identifier: &I, details: &Details, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { let key = identifier.to_bytes()?; - let details = serde_json::to_vec(&details)?; + let details = serde_json::to_vec(&details) + .map_err(SledError::from) + .map_err(RepoError::from)?; b!( self.identifier_details, @@ -387,18 +396,20 @@ impl IdentifierRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn details(&self, identifier: &I) -> Result, Error> { + async fn details(&self, identifier: &I) -> Result, StoreError> { let key = identifier.to_bytes()?; let opt = b!(self.identifier_details, identifier_details.get(key)); opt.map(|ivec| serde_json::from_slice(&ivec)) .transpose() - .map_err(Error::from) + .map_err(SledError::from) + .map_err(RepoError::from) + .map_err(StoreError::from) } #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn cleanup(&self, identifier: &I) -> Result<(), Error> { + async fn cleanup(&self, identifier: &I) -> Result<(), StoreError> { let key = identifier.to_bytes()?; b!(self.identifier_details, identifier_details.remove(key)); @@ -407,7 +418,7 @@ impl IdentifierRepo for SledRepo { } } -type StreamItem = Result; +type StreamItem = Result; type LocalBoxStream<'a, T> = Pin + 'a>>; fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec { @@ -425,13 +436,13 @@ impl HashRepo for SledRepo { .hashes .iter() .keys() - .map(|res| res.map_err(Error::from)); + .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); Box::pin(from_iterator(iter, 8)) } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn create(&self, hash: Self::Bytes) -> Result, Error> { + async fn create(&self, hash: Self::Bytes) -> Result, RepoError> { let res = b!(self.hashes, { let hash2 = hash.clone(); hashes.compare_and_swap(hash, None as Option, Some(hash2)) @@ -441,7 +452,7 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { let key = hash_alias_key(&hash, alias); let value = alias.to_bytes(); @@ -451,7 +462,7 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> { let key = hash_alias_key(&hash, alias); b!(self.hash_aliases, hash_aliases.remove(key)); @@ -460,7 +471,7 @@ impl HashRepo for SledRepo { } #[tracing::instrument(skip_all)] - async fn aliases(&self, hash: Self::Bytes) -> Result, Error> { + async fn aliases(&self, hash: Self::Bytes) -> Result, RepoError> { let v = b!(self.hash_aliases, { Ok(hash_aliases .scan_prefix(hash) @@ -478,7 +489,7 @@ impl HashRepo for SledRepo { &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { let bytes = identifier.to_bytes()?; b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes)); @@ -487,11 +498,15 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn identifier(&self, hash: Self::Bytes) -> Result { + async fn identifier( + &self, + hash: Self::Bytes, + ) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); opt.ok_or(SledError::Missing) - .map_err(Error::from) + .map_err(RepoError::from) + .map_err(StoreError::from) .and_then(|ivec| I::from_bytes(ivec.to_vec())) } @@ -501,7 +516,7 @@ impl HashRepo for SledRepo { hash: Self::Bytes, variant: String, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { let key = variant_key(&hash, &variant); let value = identifier.to_bytes()?; @@ -518,7 +533,7 @@ impl HashRepo for SledRepo { &self, hash: Self::Bytes, variant: String, - ) -> Result, Error> { + ) -> Result, StoreError> { let key = variant_key(&hash, &variant); let opt = b!( @@ -526,16 +541,14 @@ impl HashRepo for SledRepo { hash_variant_identifiers.get(key) ); - opt.map(|ivec| I::from_bytes(ivec.to_vec())) - .transpose() - .map_err(Error::from) + opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() } #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn variants( &self, hash: Self::Bytes, - ) -> Result, Error> { + ) -> Result, StoreError> { let vec = b!( self.hash_variant_identifiers, Ok(hash_variant_identifiers @@ -557,14 +570,14 @@ impl HashRepo for SledRepo { Some((variant?, identifier?)) }) - .collect::>()) as Result, sled::Error> + .collect::>()) as Result, SledError> ); Ok(vec) } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> { + async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { let key = variant_key(&hash, &variant); b!( @@ -580,7 +593,7 @@ impl HashRepo for SledRepo { &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { let bytes = identifier.to_bytes()?; b!( @@ -595,19 +608,17 @@ impl HashRepo for SledRepo { async fn motion_identifier( &self, hash: Self::Bytes, - ) -> Result, Error> { + ) -> Result, StoreError> { let opt = b!( self.hash_motion_identifiers, hash_motion_identifiers.get(hash) ); - opt.map(|ivec| I::from_bytes(ivec.to_vec())) - .transpose() - .map_err(Error::from) + opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() } #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> { + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { let hash2 = hash.clone(); b!(self.hashes, hashes.remove(hash2)); @@ -628,7 +639,7 @@ impl HashRepo for SledRepo { let _ = hash_aliases.remove(key); } - Ok(()) as Result<(), sled::Error> + Ok(()) as Result<(), SledError> }); let variant_keys = b!(self.hash_variant_identifiers, { @@ -638,13 +649,13 @@ impl HashRepo for SledRepo { .filter_map(Result::ok) .collect::>(); - Ok(v) as Result, sled::Error> + Ok(v) as Result, SledError> }); b!(self.hash_variant_identifiers, { for key in variant_keys { let _ = hash_variant_identifiers.remove(key); } - Ok(()) as Result<(), sled::Error> + Ok(()) as Result<(), SledError> }); Ok(()) @@ -654,7 +665,7 @@ impl HashRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create(&self, alias: &Alias) -> Result, Error> { + async fn create(&self, alias: &Alias) -> Result, RepoError> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -671,7 +682,7 @@ impl AliasRepo for SledRepo { &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, Error> { + ) -> Result, RepoError> { let key = alias.to_bytes(); let token = delete_token.to_bytes(); @@ -684,18 +695,18 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn delete_token(&self, alias: &Alias) -> Result { + async fn delete_token(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); opt.and_then(|ivec| DeleteToken::from_slice(&ivec)) .ok_or(SledError::Missing) - .map_err(Error::from) + .map_err(RepoError::from) } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> { + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> { let key = alias.to_bytes(); b!(self.alias_hashes, alias_hashes.insert(key, hash)); @@ -704,16 +715,16 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn hash(&self, alias: &Alias) -> Result { + async fn hash(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_hashes, alias_hashes.get(key)); - opt.ok_or(SledError::Missing).map_err(Error::from) + opt.ok_or(SledError::Missing).map_err(RepoError::from) } #[tracing::instrument(skip(self))] - async fn cleanup(&self, alias: &Alias) -> Result<(), Error> { + async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> { let key = alias.to_bytes(); let key2 = key.clone(); diff --git a/src/store.rs b/src/store.rs index 98b22b0..a15a04a 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,4 +1,3 @@ -use crate::error::Error; use actix_web::web::Bytes; use futures_util::stream::Stream; use std::fmt::Debug; @@ -7,10 +6,56 @@ use tokio::io::{AsyncRead, AsyncWrite}; pub(crate) mod file_store; pub(crate) mod object_store; -pub(crate) trait Identifier: Send + Sync + Clone + Debug { - fn to_bytes(&self) -> Result, Error>; +#[derive(Debug, thiserror::Error)] +pub(crate) enum StoreError { + #[error("Error in file store")] + FileStore(#[source] crate::store::file_store::FileError), - fn from_bytes(bytes: Vec) -> Result + #[error("Error in object store")] + ObjectStore(#[source] crate::store::object_store::ObjectError), + + #[error("Error in DB")] + Repo(#[from] crate::repo::RepoError), + + #[error("Requested file is not found")] + NotFound, +} + +impl StoreError { + pub(crate) const fn is_not_found(&self) -> bool { + matches!(self, Self::NotFound) + } +} + +impl From for StoreError { + fn from(value: crate::store::file_store::FileError) -> Self { + match value { + crate::store::file_store::FileError::Io(e) + if e.kind() == std::io::ErrorKind::NotFound => + { + Self::NotFound + } + e => Self::FileStore(e), + } + } +} + +impl From for StoreError { + fn from(value: crate::store::object_store::ObjectError) -> Self { + match value { + crate::store::object_store::ObjectError::Status( + actix_web::http::StatusCode::NOT_FOUND, + _, + ) => Self::NotFound, + e => Self::ObjectStore(e), + } + } +} + +pub(crate) trait Identifier: Send + Sync + Clone + Debug { + fn to_bytes(&self) -> Result, StoreError>; + + fn from_bytes(bytes: Vec) -> Result where Self: Sized; @@ -28,22 +73,22 @@ pub(crate) trait Store: Clone + Debug { type Identifier: Identifier + 'static; type Stream: Stream> + Unpin + 'static; - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where Reader: AsyncRead + Unpin + 'static; - async fn save_stream(&self, stream: S) -> Result + async fn save_stream(&self, stream: S) -> Result where S: Stream> + Unpin + 'static; - async fn save_bytes(&self, bytes: Bytes) -> Result; + async fn save_bytes(&self, bytes: Bytes) -> Result; async fn to_stream( &self, identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result; + ) -> Result; async fn read_into( &self, @@ -53,9 +98,9 @@ pub(crate) trait Store: Clone + Debug { where Writer: AsyncWrite + Unpin; - async fn len(&self, identifier: &Self::Identifier) -> Result; + async fn len(&self, identifier: &Self::Identifier) -> Result; - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>; + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] @@ -66,21 +111,21 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream(&self, stream: S) -> Result where S: Stream> + Unpin + 'static, { T::save_stream(self, stream).await } - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -89,7 +134,7 @@ where identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { T::to_stream(self, identifier, from_start, len).await } @@ -104,11 +149,11 @@ where T::read_into(self, identifier, writer).await } - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { T::len(self, identifier).await } - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { T::remove(self, identifier).await } } @@ -121,21 +166,21 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream(&self, stream: S) -> Result where S: Stream> + Unpin + 'static, { T::save_stream(self, stream).await } - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -144,7 +189,7 @@ where identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { T::to_stream(self, identifier, from_start, len).await } @@ -159,11 +204,11 @@ where T::read_into(self, identifier, writer).await } - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { T::len(self, identifier).await } - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { T::remove(self, identifier).await } } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 917e6d4..b186fdf 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,5 +1,4 @@ use crate::{ - error::Error, file::File, repo::{Repo, SettingsRepo}, store::{Store, StoreConfig}, @@ -18,6 +17,8 @@ use tracing::Instrument; mod file_id; pub(crate) use file_id::FileId; +use super::StoreError; + // - Settings Tree // - last-path -> last generated path @@ -62,7 +63,10 @@ impl Store for FileStore { type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read(&self, mut reader: Reader) -> Result + async fn save_async_read( + &self, + mut reader: Reader, + ) -> Result where Reader: AsyncRead + Unpin + 'static, { @@ -76,7 +80,7 @@ impl Store for FileStore { Ok(self.file_id_from_path(path)?) } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream(&self, stream: S) -> Result where S: Stream> + Unpin + 'static, { @@ -84,7 +88,7 @@ impl Store for FileStore { } #[tracing::instrument(skip(bytes))] - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { @@ -101,14 +105,15 @@ impl Store for FileStore { identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { let path = self.path_from_file_id(identifier); let file_span = tracing::trace_span!(parent: None, "File Stream"); let file = file_span .in_scope(|| File::open(path)) .instrument(file_span.clone()) - .await?; + .await + .map_err(FileError::from)?; let stream = file_span .in_scope(|| file.read_to_stream(from_start, len)) @@ -135,16 +140,19 @@ impl Store for FileStore { } #[tracing::instrument] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { let path = self.path_from_file_id(identifier); - let len = tokio::fs::metadata(path).await?.len(); + let len = tokio::fs::metadata(path) + .await + .map_err(FileError::from)? + .len(); Ok(len) } #[tracing::instrument] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { let path = self.path_from_file_id(identifier); self.safe_remove_file(path).await?; @@ -154,7 +162,7 @@ impl Store for FileStore { } impl FileStore { - pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result { + pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result { let path_gen = init_generator(&repo).await?; Ok(FileStore { @@ -164,7 +172,7 @@ impl FileStore { }) } - async fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); match self.repo { @@ -183,7 +191,7 @@ impl FileStore { Ok(target_path) } - async fn next_file(&self) -> Result { + async fn next_file(&self) -> Result { let target_path = self.next_directory().await?; let filename = uuid::Uuid::new_v4().to_string(); @@ -271,12 +279,13 @@ pub(crate) async fn safe_create_parent>(path: P) -> Result<(), Fi Ok(()) } -async fn init_generator(repo: &Repo) -> Result { +async fn init_generator(repo: &Repo) -> Result { match repo { Repo::Sled(sled_repo) => { if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + storage_path_generator::Path::from_be_bytes(ivec.to_vec()) + .map_err(FileError::from)?, )) } else { Ok(Generator::new()) diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs index dfb776f..63d484b 100644 --- a/src/store/file_store/file_id.rs +++ b/src/store/file_store/file_id.rs @@ -1,9 +1,6 @@ -use crate::{ - error::Error, - store::{ - file_store::{FileError, FileStore}, - Identifier, - }, +use crate::store::{ + file_store::{FileError, FileStore}, + Identifier, StoreError, }; use std::path::PathBuf; @@ -11,7 +8,7 @@ use std::path::PathBuf; pub(crate) struct FileId(PathBuf); impl Identifier for FileId { - fn to_bytes(&self) -> Result, Error> { + fn to_bytes(&self) -> Result, StoreError> { let vec = self .0 .to_str() @@ -22,7 +19,7 @@ impl Identifier for FileId { Ok(vec) } - fn from_bytes(bytes: Vec) -> Result + fn from_bytes(bytes: Vec) -> Result where Self: Sized, { diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 7a483de..d687206 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,6 +1,5 @@ use crate::{ bytes_stream::BytesStream, - error::Error, repo::{Repo, SettingsRepo}, store::{Store, StoreConfig}, }; @@ -27,6 +26,8 @@ use url::Url; mod object_id; pub(crate) use object_id::ObjectId; +use super::StoreError; + const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); // - Settings Tree @@ -42,6 +43,9 @@ pub(crate) enum ObjectError { #[error("Failed to generate request")] S3(#[from] BucketError), + #[error("IO Error")] + IO(#[from] std::io::Error), + #[error("Error making request")] SendRequest(String), @@ -62,6 +66,9 @@ pub(crate) enum ObjectError { #[error("Invalid status: {0}\n{1}")] Status(StatusCode, String), + + #[error("Unable to upload image")] + Upload(awc::error::PayloadError), } impl From for ObjectError { @@ -131,7 +138,7 @@ fn payload_to_io_error(e: PayloadError) -> std::io::Error { } #[tracing::instrument(skip(stream))] -async fn read_chunk(stream: &mut S) -> std::io::Result +async fn read_chunk(stream: &mut S) -> Result where S: Stream> + Unpin + 'static, { @@ -148,9 +155,9 @@ where Ok(buf) } -async fn status_error(mut response: ClientResponse) -> Error { +async fn status_error(mut response: ClientResponse) -> StoreError { let body = match response.body().await { - Err(e) => return e.into(), + Err(e) => return ObjectError::Upload(e).into(), Ok(body) => body, }; @@ -164,7 +171,7 @@ impl Store for ObjectStore { type Identifier = ObjectId; type Stream = Pin>>>; - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where Reader: AsyncRead + Unpin + 'static, { @@ -172,7 +179,7 @@ impl Store for ObjectStore { } #[tracing::instrument(skip_all)] - async fn save_stream(&self, mut stream: S) -> Result + async fn save_stream(&self, mut stream: S) -> Result where S: Stream> + Unpin + 'static, { @@ -181,7 +188,10 @@ impl Store for ObjectStore { if first_chunk.len() < CHUNK_SIZE { drop(stream); let (req, object_id) = self.put_object_request().await?; - let response = req.send_body(first_chunk).await?; + let response = req + .send_body(first_chunk) + .await + .map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); @@ -199,7 +209,7 @@ impl Store for ObjectStore { return Err(status_error(response).await); } - let body = response.body().await?; + let body = response.body().await.map_err(ObjectError::Upload)?; let body: InitiateMultipartUploadResponse = quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?; let upload_id = &body.upload_id; @@ -236,7 +246,8 @@ impl Store for ObjectStore { ) .await? .send_body(buf) - .await?; + .await + .map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); @@ -253,7 +264,7 @@ impl Store for ObjectStore { // early-drop response to close its tracing spans drop(response); - Ok(etag) as Result + Ok(etag) as Result } .instrument(tracing::Span::current()), ); @@ -276,20 +287,22 @@ impl Store for ObjectStore { upload_id, etags.iter().map(|s| s.as_ref()), ) - .await?; + .await + .map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); } - Ok(()) as Result<(), Error> + Ok(()) as Result<(), StoreError> } .await; if let Err(e) = res { self.create_abort_multipart_request(&object_id, upload_id) .send() - .await?; + .await + .map_err(ObjectError::from)?; return Err(e); } @@ -297,7 +310,7 @@ impl Store for ObjectStore { } #[tracing::instrument(skip_all)] - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { let (req, object_id) = self.put_object_request().await?; let response = req.send_body(bytes).await.map_err(ObjectError::from)?; @@ -315,7 +328,7 @@ impl Store for ObjectStore { identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { let response = self .get_object_request(identifier, from_start, len) .send() @@ -361,7 +374,7 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(self))] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { let response = self .head_object_request(identifier) .send() @@ -385,8 +398,12 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(self))] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { - let response = self.delete_object_request(identifier).send().await?; + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { + let response = self + .delete_object_request(identifier) + .send() + .await + .map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); @@ -407,7 +424,7 @@ impl ObjectStore { secret_key: String, session_token: Option, repo: Repo, - ) -> Result { + ) -> Result { let path_gen = init_generator(&repo).await?; Ok(ObjectStoreConfig { @@ -423,7 +440,7 @@ impl ObjectStore { }) } - async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> { + async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self.bucket.put_object(Some(&self.credentials), &path); @@ -435,7 +452,7 @@ impl ObjectStore { Ok((self.build_request(action), ObjectId::from_string(path))) } - async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> { + async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self @@ -455,7 +472,7 @@ impl ObjectStore { object_id: &ObjectId, part_number: u16, upload_id: &str, - ) -> Result { + ) -> Result { use md5::Digest; let mut action = self.bucket.upload_part( @@ -593,7 +610,7 @@ impl ObjectStore { self.build_request(action) } - async fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); match self.repo { @@ -607,7 +624,7 @@ impl ObjectStore { Ok(path) } - async fn next_file(&self) -> Result { + async fn next_file(&self) -> Result { let path = self.next_directory().await?.to_strings().join("/"); let filename = uuid::Uuid::new_v4().to_string(); @@ -615,12 +632,13 @@ impl ObjectStore { } } -async fn init_generator(repo: &Repo) -> Result { +async fn init_generator(repo: &Repo) -> Result { match repo { Repo::Sled(sled_repo) => { if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { Ok(Generator::from_existing( - storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + storage_path_generator::Path::from_be_bytes(ivec.to_vec()) + .map_err(ObjectError::from)?, )) } else { Ok(Generator::new()) diff --git a/src/store/object_store/object_id.rs b/src/store/object_store/object_id.rs index 129d41b..512f884 100644 --- a/src/store/object_store/object_id.rs +++ b/src/store/object_store/object_id.rs @@ -1,17 +1,14 @@ -use crate::{ - error::Error, - store::{object_store::ObjectError, Identifier}, -}; +use crate::store::{object_store::ObjectError, Identifier, StoreError}; #[derive(Debug, Clone)] pub(crate) struct ObjectId(String); impl Identifier for ObjectId { - fn to_bytes(&self) -> Result, Error> { + fn to_bytes(&self) -> Result, StoreError> { Ok(self.0.as_bytes().to_vec()) } - fn from_bytes(bytes: Vec) -> Result { + fn from_bytes(bytes: Vec) -> Result { Ok(ObjectId( String::from_utf8(bytes).map_err(ObjectError::from)?, ))