2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-11-01 10:09:57 +00:00
pict-rs/src/store/file_store.rs

330 lines
8.8 KiB
Rust
Raw Normal View History

use crate::{
error_code::ErrorCode, file::File, repo::ArcRepo, store::Store, stream::LocalBoxStream,
};
2021-10-23 04:48:56 +00:00
use actix_web::web::Bytes;
2023-08-23 16:59:42 +00:00
use futures_core::Stream;
2021-10-23 04:48:56 +00:00
use std::{
path::{Path, PathBuf},
sync::Arc,
2021-10-23 04:48:56 +00:00
};
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite};
2022-09-24 22:18:53 +00:00
use tokio_util::io::StreamReader;
use tracing::Instrument;
2021-10-23 04:48:56 +00:00
use super::StoreError;
2021-10-23 04:48:56 +00:00
// - Settings Tree
//   - last-path -> last generated path
//
// Key under which the most recently generated storage path is persisted in the repo.
// It is written each time a new directory is generated and read back when the
// generator is initialised, so generation resumes from the stored path.
const GENERATOR_KEY: &str = "last-path";
2021-10-23 04:48:56 +00:00
#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
2022-03-26 21:49:23 +00:00
#[error("Failed to read or write file")]
2021-10-23 04:48:56 +00:00
Io(#[from] std::io::Error),
2022-03-26 21:49:23 +00:00
#[error("Failed to generate path")]
2021-10-23 04:48:56 +00:00
PathGenerator(#[from] storage_path_generator::PathError),
2023-09-03 23:21:46 +00:00
#[error("Couldn't strip root dir")]
PrefixError,
#[error("Couldn't convert Path to String")]
StringError,
2021-10-23 04:48:56 +00:00
#[error("Tried to save over existing file")]
FileExists,
}
2023-09-02 01:50:10 +00:00
impl FileError {
    /// Maps each error variant to the `ErrorCode` reported for it.
    ///
    /// `StringError` and `PrefixError` share a code because both mean a path
    /// could not be turned into a file identifier.
    pub(super) const fn error_code(&self) -> ErrorCode {
        match self {
            Self::Io(_) => ErrorCode::FILE_IO_ERROR,
            Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR,
            Self::FileExists => ErrorCode::FILE_EXISTS,
            Self::StringError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR,
        }
    }
}
2021-10-23 04:48:56 +00:00
#[derive(Clone)]
pub(crate) struct FileStore {
path_gen: Generator,
root_dir: PathBuf,
2023-09-02 16:52:55 +00:00
repo: ArcRepo,
2021-10-23 04:48:56 +00:00
}
#[async_trait::async_trait(?Send)]
impl Store for FileStore {
2023-09-03 23:21:46 +00:00
#[tracing::instrument(level = "DEBUG", skip(self))]
2023-07-07 17:05:42 +00:00
async fn health_check(&self) -> Result<(), StoreError> {
tokio::fs::metadata(&self.root_dir)
.await
.map_err(FileError::from)?;
Ok(())
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self, reader))]
async fn save_async_read<Reader>(
&self,
mut reader: Reader,
_content_type: mime::Mime,
) -> Result<Arc<str>, StoreError>
2021-10-23 04:48:56 +00:00
where
2022-09-24 22:18:53 +00:00
Reader: AsyncRead + Unpin + 'static,
2021-10-23 04:48:56 +00:00
{
2022-03-26 21:49:23 +00:00
let path = self.next_file().await?;
2021-10-23 04:48:56 +00:00
2022-09-24 22:18:53 +00:00
if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
2021-10-23 04:48:56 +00:00
self.safe_remove_file(&path).await?;
return Err(e.into());
2021-10-23 04:48:56 +00:00
}
Ok(self.file_id_from_path(path)?)
2021-10-23 04:48:56 +00:00
}
async fn save_stream<S>(
&self,
stream: S,
content_type: mime::Mime,
) -> Result<Arc<str>, StoreError>
2022-09-24 22:18:53 +00:00
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
self.save_async_read(StreamReader::new(stream), content_type)
.await
2022-09-24 22:18:53 +00:00
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self, bytes))]
async fn save_bytes(
&self,
bytes: Bytes,
_content_type: mime::Mime,
) -> Result<Arc<str>, StoreError> {
2022-03-26 21:49:23 +00:00
let path = self.next_file().await?;
2021-10-23 04:48:56 +00:00
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
self.safe_remove_file(&path).await?;
return Err(e.into());
2021-10-23 04:48:56 +00:00
}
Ok(self.file_id_from_path(path)?)
2021-10-23 04:48:56 +00:00
}
fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> {
2023-07-14 20:23:07 +00:00
None
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self))]
2021-10-23 04:48:56 +00:00
async fn to_stream(
&self,
identifier: &Arc<str>,
2021-10-23 04:48:56 +00:00
from_start: Option<u64>,
len: Option<u64>,
) -> Result<LocalBoxStream<'static, std::io::Result<Bytes>>, StoreError> {
2021-10-23 04:48:56 +00:00
let path = self.path_from_file_id(identifier);
2022-04-07 02:40:49 +00:00
let file_span = tracing::trace_span!(parent: None, "File Stream");
let file = file_span
.in_scope(|| File::open(path))
.instrument(file_span.clone())
.await
.map_err(FileError::from)?;
2022-04-07 02:40:49 +00:00
let stream = file_span
.in_scope(|| file.read_to_stream(from_start, len))
.instrument(file_span)
2021-10-23 04:48:56 +00:00
.await?;
Ok(Box::pin(stream))
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self, writer))]
2021-10-23 04:48:56 +00:00
async fn read_into<Writer>(
&self,
identifier: &Arc<str>,
2021-10-23 04:48:56 +00:00
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
2022-09-24 22:18:53 +00:00
Writer: AsyncWrite + Unpin,
2021-10-23 04:48:56 +00:00
{
let path = self.path_from_file_id(identifier);
File::open(&path).await?.read_to_async_write(writer).await?;
Ok(())
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self))]
async fn len(&self, identifier: &Arc<str>) -> Result<u64, StoreError> {
2021-10-23 04:48:56 +00:00
let path = self.path_from_file_id(identifier);
let len = tokio::fs::metadata(path)
.await
.map_err(FileError::from)?
.len();
2021-10-23 04:48:56 +00:00
Ok(len)
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(self))]
async fn remove(&self, identifier: &Arc<str>) -> Result<(), StoreError> {
2021-10-23 04:48:56 +00:00
let path = self.path_from_file_id(identifier);
self.safe_remove_file(path).await?;
Ok(())
}
}
impl FileStore {
#[tracing::instrument(skip(repo))]
2023-09-02 16:52:55 +00:00
pub(crate) async fn build(root_dir: PathBuf, repo: ArcRepo) -> color_eyre::Result<Self> {
2022-03-26 21:49:23 +00:00
let path_gen = init_generator(&repo).await?;
2021-10-23 04:48:56 +00:00
tokio::fs::create_dir_all(&root_dir).await?;
2023-07-07 17:05:42 +00:00
2021-10-23 04:48:56 +00:00
Ok(FileStore {
root_dir,
path_gen,
2022-03-26 21:49:23 +00:00
repo,
2021-10-23 04:48:56 +00:00
})
}
fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, FileError> {
2023-09-03 23:21:46 +00:00
path.strip_prefix(&self.root_dir)
.map_err(|_| FileError::PrefixError)?
.to_str()
.ok_or(FileError::StringError)
.map(Into::into)
}
fn path_from_file_id(&self, file_id: &Arc<str>) -> PathBuf {
self.root_dir.join(file_id.as_ref())
}
async fn next_directory(&self) -> Result<PathBuf, StoreError> {
2021-10-23 04:48:56 +00:00
let path = self.path_gen.next();
2023-09-02 16:52:55 +00:00
self.repo
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
2021-10-23 04:48:56 +00:00
2022-03-25 23:47:50 +00:00
let mut target_path = self.root_dir.clone();
2021-10-23 04:48:56 +00:00
for dir in path.to_strings() {
target_path.push(dir)
}
Ok(target_path)
}
async fn next_file(&self) -> Result<PathBuf, StoreError> {
2022-03-26 21:49:23 +00:00
let target_path = self.next_directory().await?;
let filename = uuid::Uuid::new_v4().to_string();
2021-10-23 04:48:56 +00:00
Ok(target_path.join(filename))
}
2023-09-03 23:21:46 +00:00
#[tracing::instrument(level = "DEBUG", skip(self, path), fields(path = ?path.as_ref()))]
2021-10-23 04:48:56 +00:00
async fn safe_remove_file<P: AsRef<Path>>(&self, path: P) -> Result<(), FileError> {
tokio::fs::remove_file(&path).await?;
self.try_remove_parents(path.as_ref()).await;
Ok(())
}
async fn try_remove_parents(&self, mut path: &Path) {
while let Some(parent) = path.parent() {
if parent.ends_with(&self.root_dir) {
return;
}
if tokio::fs::remove_dir(parent).await.is_err() {
return;
}
path = parent;
}
}
// Try writing to a file
async fn safe_save_bytes<P: AsRef<Path>>(
&self,
path: P,
bytes: Bytes,
) -> Result<(), FileError> {
safe_create_parent(&path).await?;
// Only write the file if it doesn't already exist
if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Ok(());
}
// Open the file for writing
let mut file = File::create(&path).await?;
// try writing
if let Err(e) = file.write_from_bytes(bytes).await {
// remove file if writing failed before completion
self.safe_remove_file(path).await?;
return Err(e.into());
}
Ok(())
}
async fn safe_save_reader<P: AsRef<Path>>(
&self,
to: P,
input: &mut (impl AsyncRead + Unpin + ?Sized),
) -> Result<(), FileError> {
safe_create_parent(&to).await?;
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(FileError::FileExists);
}
let mut file = File::create(to).await?;
file.write_from_async_read(input).await?;
Ok(())
}
}
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
if let Some(path) = path.as_ref().parent() {
tokio::fs::create_dir_all(path).await?;
}
Ok(())
}
2023-09-02 16:52:55 +00:00
/// Creates the path generator for the store.
///
/// If the repo holds a previously persisted path under `GENERATOR_KEY`, the
/// generator resumes from it; otherwise a fresh generator is returned. Fails
/// if the repo lookup fails or the stored bytes cannot be parsed as a path.
async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> {
    match repo.get(GENERATOR_KEY).await? {
        Some(stored) => {
            let last_path = storage_path_generator::Path::from_be_bytes(stored.to_vec())
                .map_err(FileError::from)?;

            Ok(Generator::from_existing(last_path))
        }
        None => Ok(Generator::new()),
    }
}
impl std::fmt::Debug for FileStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileStore")
2022-03-22 02:43:38 +00:00
.field("path_gen", &"generator")
2021-10-23 04:48:56 +00:00
.field("root_dir", &self.root_dir)
.finish()
}
}