use crate::{ error_code::ErrorCode, file::File, repo::{Repo, SettingsRepo}, store::Store, }; use actix_web::web::Bytes; use futures_core::Stream; use std::{ path::{Path, PathBuf}, pin::Pin, }; use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; use tracing::Instrument; mod file_id; pub(crate) use file_id::FileId; use super::StoreError; // - Settings Tree // - last-path -> last generated path const GENERATOR_KEY: &str = "last-path"; #[derive(Debug, thiserror::Error)] pub(crate) enum FileError { #[error("Failed to read or write file")] Io(#[from] std::io::Error), #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), #[error("Error formatting file store ID")] IdError, #[error("Malformed file store ID")] PrefixError, #[error("Tried to save over existing file")] FileExists, } impl FileError { pub(super) const fn error_code(&self) -> ErrorCode { match self { Self::Io(_) => ErrorCode::FILE_IO_ERROR, Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR, Self::FileExists => ErrorCode::FILE_EXISTS, Self::IdError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR, } } } #[derive(Clone)] pub(crate) struct FileStore { path_gen: Generator, root_dir: PathBuf, repo: Repo, } #[async_trait::async_trait(?Send)] impl Store for FileStore { type Identifier = FileId; type Stream = Pin>>>; async fn health_check(&self) -> Result<(), StoreError> { tokio::fs::metadata(&self.root_dir) .await .map_err(FileError::from)?; Ok(()) } #[tracing::instrument(skip(reader))] async fn save_async_read( &self, mut reader: Reader, _content_type: mime::Mime, ) -> Result where Reader: AsyncRead + Unpin + 'static, { let path = self.next_file().await?; if let Err(e) = self.safe_save_reader(&path, &mut reader).await { self.safe_remove_file(&path).await?; return Err(e.into()); } Ok(self.file_id_from_path(path)?) } async fn save_stream( &self, stream: S, content_type: mime::Mime, ) -> Result where S: Stream> + Unpin + 'static, { self.save_async_read(StreamReader::new(stream), content_type) .await } #[tracing::instrument(skip(bytes))] async fn save_bytes( &self, bytes: Bytes, _content_type: mime::Mime, ) -> Result { let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { self.safe_remove_file(&path).await?; return Err(e.into()); } Ok(self.file_id_from_path(path)?) } fn public_url(&self, _identifier: &Self::Identifier) -> Option { None } #[tracing::instrument] async fn to_stream( &self, identifier: &Self::Identifier, from_start: Option, len: Option, ) -> Result { let path = self.path_from_file_id(identifier); let file_span = tracing::trace_span!(parent: None, "File Stream"); let file = file_span .in_scope(|| File::open(path)) .instrument(file_span.clone()) .await .map_err(FileError::from)?; let stream = file_span .in_scope(|| file.read_to_stream(from_start, len)) .instrument(file_span) .await?; Ok(Box::pin(stream)) } #[tracing::instrument(skip(writer))] async fn read_into( &self, identifier: &Self::Identifier, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin, { let path = self.path_from_file_id(identifier); File::open(&path).await?.read_to_async_write(writer).await?; Ok(()) } #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { let path = self.path_from_file_id(identifier); let len = tokio::fs::metadata(path) .await .map_err(FileError::from)? .len(); Ok(len) } #[tracing::instrument] async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { let path = self.path_from_file_id(identifier); self.safe_remove_file(path).await?; Ok(()) } } impl FileStore { #[tracing::instrument(skip(repo))] pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> color_eyre::Result { let path_gen = init_generator(&repo).await?; tokio::fs::create_dir_all(&root_dir).await?; Ok(FileStore { root_dir, path_gen, repo, }) } async fn next_directory(&self) -> Result { let path = self.path_gen.next(); match self.repo { Repo::Sled(ref sled_repo) => { sled_repo .set(GENERATOR_KEY, path.to_be_bytes().into()) .await?; } } let mut target_path = self.root_dir.clone(); for dir in path.to_strings() { target_path.push(dir) } Ok(target_path) } async fn next_file(&self) -> Result { let target_path = self.next_directory().await?; let filename = uuid::Uuid::new_v4().to_string(); Ok(target_path.join(filename)) } async fn safe_remove_file>(&self, path: P) -> Result<(), FileError> { tokio::fs::remove_file(&path).await?; self.try_remove_parents(path.as_ref()).await; Ok(()) } async fn try_remove_parents(&self, mut path: &Path) { while let Some(parent) = path.parent() { if parent.ends_with(&self.root_dir) { return; } if tokio::fs::remove_dir(parent).await.is_err() { return; } path = parent; } } // Try writing to a file async fn safe_save_bytes>( &self, path: P, bytes: Bytes, ) -> Result<(), FileError> { safe_create_parent(&path).await?; // Only write the file if it doesn't already exist if let Err(e) = tokio::fs::metadata(&path).await { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { return Ok(()); } // Open the file for writing let mut file = File::create(&path).await?; // try writing if let Err(e) = file.write_from_bytes(bytes).await { // remove file if writing failed before completion self.safe_remove_file(path).await?; return Err(e.into()); } Ok(()) } async fn safe_save_reader>( &self, to: P, input: &mut (impl AsyncRead + Unpin + ?Sized), ) -> Result<(), FileError> { safe_create_parent(&to).await?; if let Err(e) = tokio::fs::metadata(&to).await { if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { return Err(FileError::FileExists); } let mut file = File::create(to).await?; file.write_from_async_read(input).await?; Ok(()) } } pub(crate) async fn safe_create_parent>(path: P) -> Result<(), FileError> { if let Some(path) = path.as_ref().parent() { tokio::fs::create_dir_all(path).await?; } Ok(()) } async fn init_generator(repo: &Repo) -> Result { match repo { Repo::Sled(sled_repo) => { if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { Ok(Generator::from_existing( storage_path_generator::Path::from_be_bytes(ivec.to_vec()) .map_err(FileError::from)?, )) } else { Ok(Generator::new()) } } } } impl std::fmt::Debug for FileStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FileStore") .field("path_gen", &"generator") .field("root_dir", &self.root_dir) .finish() } }