use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; pub(crate) mod file_store; pub(crate) mod object_store; #[derive(Debug, thiserror::Error)] pub(crate) enum StoreError { #[error("Error in file store")] FileStore(#[source] crate::store::file_store::FileError), #[error("Error in object store")] ObjectStore(#[source] crate::store::object_store::ObjectError), #[error("Error in DB")] Repo(#[from] crate::repo::RepoError), #[error("Error in 0.4 DB")] Repo04(#[from] crate::repo_04::RepoError), #[error("Error reading bytes stream")] ReadStream(#[source] std::io::Error), #[error("Requested file is not found")] FileNotFound(#[source] std::io::Error), #[error("Requested object is not found")] ObjectNotFound(#[source] crate::store::object_store::ObjectError), } impl StoreError { pub(crate) const fn error_code(&self) -> ErrorCode { match self { Self::FileStore(e) => e.error_code(), Self::ObjectStore(e) => e.error_code(), Self::Repo(e) => e.error_code(), Self::ReadStream(_) => ErrorCode::IO_ERROR, Self::Repo04(_) => ErrorCode::OLD_REPO_ERROR, Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND, } } pub(crate) const fn is_not_found(&self) -> bool { matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) } pub(crate) const fn is_disconnected(&self) -> bool { match self { Self::Repo(e) => e.is_disconnected(), _ => false, } } } impl From for StoreError { fn from(value: crate::store::file_store::FileError) -> Self { match value { crate::store::file_store::FileError::Io(e) if e.kind() == std::io::ErrorKind::NotFound => { Self::FileNotFound(e) } e => Self::FileStore(e), } } } impl From for StoreError { fn from(value: crate::store::object_store::ObjectError) -> Self { match value { e @ crate::store::object_store::ObjectError::Status( reqwest::StatusCode::NOT_FOUND, _, _, ) => Self::ObjectNotFound(e), e => Self::ObjectStore(e), } } } pub(crate) trait Store: Clone + Debug { async fn health_check(&self) -> Result<(), StoreError>; async fn save_stream( &self, stream: S, content_type: mime::Mime, extension: Option<&str>, ) -> Result, StoreError> where S: Stream>; fn public_url(&self, _: &Arc) -> Option; async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError>; async fn to_bytes( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result { let stream = self.to_stream(identifier, from_start, len).await?; BytesStream::try_from_stream(stream) .await .map_err(StoreError::ReadStream) } async fn len(&self, identifier: &Arc) -> Result; async fn remove(&self, identifier: &Arc) -> Result<(), StoreError>; } impl Store for actix_web::web::Data where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, extension: Option<&str>, ) -> Result, StoreError> where S: Stream>, { T::save_stream(self, stream, content_type, extension).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } impl Store for Arc where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, extension: Option<&str>, ) -> Result, StoreError> where S: Stream>, { T::save_stream(self, stream, content_type, extension).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } impl<'a, T> Store for &'a T where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, extension: Option<&str>, ) -> Result, StoreError> where S: Stream>, { T::save_stream(self, stream, content_type, extension).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } }