mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 11:21:24 +00:00
Implement sled repo
This commit is contained in:
parent
a0c99d05eb
commit
d6567fbbbd
6 changed files with 530 additions and 91 deletions
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
upload_manager::Details,
|
||||
};
|
||||
use actix_web::web;
|
||||
use dashmap::{mapref::entry::Entry, DashMap};
|
||||
|
|
66
src/details.rs
Normal file
66
src/details.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
use crate::{error::Error, magick::ValidInputType, serde_str::Serde, store::Store};
|
||||
use actix_web::web;
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Details {
|
||||
width: usize,
|
||||
height: usize,
|
||||
content_type: Serde<mime::Mime>,
|
||||
created_at: time::OffsetDateTime,
|
||||
}
|
||||
|
||||
impl Details {
|
||||
pub(crate) fn is_motion(&self) -> bool {
|
||||
self.content_type.type_() == "video"
|
||||
|| self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
|
||||
}
|
||||
|
||||
#[tracing::instrument("Details from bytes", skip(input))]
|
||||
pub(crate) async fn from_bytes(
|
||||
input: web::Bytes,
|
||||
hint: Option<ValidInputType>,
|
||||
) -> Result<Self, Error> {
|
||||
let details = crate::magick::details_bytes(input, hint).await?;
|
||||
|
||||
Ok(Details::now(
|
||||
details.width,
|
||||
details.height,
|
||||
details.mime_type,
|
||||
))
|
||||
}
|
||||
|
||||
#[tracing::instrument("Details from store")]
|
||||
pub(crate) async fn from_store<S: Store>(
|
||||
store: S,
|
||||
identifier: S::Identifier,
|
||||
expected_format: Option<ValidInputType>,
|
||||
) -> Result<Self, Error>
|
||||
where
|
||||
Error: From<S::Error>,
|
||||
{
|
||||
let details = crate::magick::details_store(store, identifier, expected_format).await?;
|
||||
|
||||
Ok(Details::now(
|
||||
details.width,
|
||||
details.height,
|
||||
details.mime_type,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn now(width: usize, height: usize, content_type: mime::Mime) -> Self {
|
||||
Details {
|
||||
width,
|
||||
height,
|
||||
content_type: Serde::new(content_type),
|
||||
created_at: time::OffsetDateTime::now_utc(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn content_type(&self) -> mime::Mime {
|
||||
(*self.content_type).clone()
|
||||
}
|
||||
|
||||
pub(crate) fn system_time(&self) -> std::time::SystemTime {
|
||||
self.created_at.into()
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ use tracing_futures::Instrument;
|
|||
|
||||
mod concurrent_processor;
|
||||
mod config;
|
||||
mod details;
|
||||
mod either;
|
||||
mod error;
|
||||
mod exiftool;
|
||||
|
@ -51,13 +52,14 @@ use crate::{magick::details_hint, store::file_store::FileStore};
|
|||
use self::{
|
||||
concurrent_processor::CancelSafeProcessor,
|
||||
config::{Config, Format, Migrate},
|
||||
details::Details,
|
||||
either::Either,
|
||||
error::{Error, UploadError},
|
||||
init_tracing::init_tracing,
|
||||
middleware::{Deadline, Internal},
|
||||
migrate::LatestDb,
|
||||
store::Store,
|
||||
upload_manager::{Details, UploadManager, UploadManagerSession},
|
||||
upload_manager::{UploadManager, UploadManagerSession},
|
||||
};
|
||||
|
||||
const MEGABYTES: usize = 1024 * 1024;
|
||||
|
|
83
src/repo.rs
83
src/repo.rs
|
@ -1,7 +1,9 @@
|
|||
use crate::{store::Identifier, upload_manager::Details};
|
||||
use crate::{details::Details, store::Identifier};
|
||||
use futures_util::Stream;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub(crate) mod sled;
|
||||
|
||||
pub(crate) struct Alias {
|
||||
id: Uuid,
|
||||
extension: String,
|
||||
|
@ -11,64 +13,99 @@ pub(crate) struct DeleteToken {
|
|||
}
|
||||
pub(crate) struct AlreadyExists;
|
||||
|
||||
impl Alias {
|
||||
fn to_bytes(&self) -> Vec<u8> {
|
||||
let mut v = self.id.as_bytes().to_vec();
|
||||
|
||||
v.extend_from_slice(self.extension.as_bytes());
|
||||
|
||||
v
|
||||
}
|
||||
|
||||
fn from_slice(bytes: &[u8]) -> Option<Self> {
|
||||
if bytes.len() > 16 {
|
||||
let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length");
|
||||
let extension = String::from_utf8_lossy(&bytes[16..]).to_string();
|
||||
|
||||
Some(Self { id, extension })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DeleteToken {
|
||||
fn to_bytes(&self) -> Vec<u8> {
|
||||
self.id.as_bytes().to_vec()
|
||||
}
|
||||
|
||||
fn from_slice(bytes: &[u8]) -> Option<Self> {
|
||||
Some(DeleteToken {
|
||||
id: Uuid::from_slice(bytes).ok()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait SettingsRepo {
|
||||
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
|
||||
type Error: std::error::Error;
|
||||
|
||||
async fn set(&self, key: &'static [u8], value: Vec<u8>) -> Result<(), Self::Error>;
|
||||
async fn get(&self, key: &'static [u8]) -> Result<Option<Vec<u8>>, Self::Error>;
|
||||
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
|
||||
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
|
||||
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait IdentifierRepo<I: Identifier> {
|
||||
type Hash: AsRef<[u8]>;
|
||||
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
|
||||
type Error: std::error::Error;
|
||||
|
||||
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
|
||||
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
|
||||
|
||||
async fn relate_hash(&self, identifier: I, hash: Self::Hash) -> Result<(), Self::Error>;
|
||||
async fn hash(&self, identifier: I) -> Result<Self::Hash, Self::Error>;
|
||||
async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error>;
|
||||
async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error>;
|
||||
|
||||
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait HashRepo {
|
||||
type Hash: AsRef<[u8]>;
|
||||
pub(crate) trait HashRepo<I: Identifier> {
|
||||
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
|
||||
type Error: std::error::Error;
|
||||
type Stream: Stream<Item = Result<Self::Hash, Self::Error>>;
|
||||
type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
|
||||
|
||||
async fn hashes(&self) -> Self::Stream;
|
||||
|
||||
async fn create(&self, hash: Self::Hash) -> Result<Result<(), AlreadyExists>, Self::Error>;
|
||||
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
|
||||
|
||||
async fn relate_alias(&self, hash: Self::Hash, alias: Alias) -> Result<(), Self::Error>;
|
||||
async fn remove_alias(&self, hash: Self::Hash, alias: Alias) -> Result<(), Self::Error>;
|
||||
async fn aliases(&self, hash: Self::Hash) -> Result<Vec<Alias>, Self::Error>;
|
||||
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
|
||||
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
|
||||
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
|
||||
|
||||
async fn cleanup(&self, hash: Self::Hash) -> Result<(), Self::Error>;
|
||||
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
|
||||
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
|
||||
|
||||
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait AliasRepo<I: Identifier> {
|
||||
type Hash: AsRef<[u8]>;
|
||||
pub(crate) trait AliasRepo {
|
||||
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
|
||||
type Error: std::error::Error;
|
||||
|
||||
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
|
||||
|
||||
async fn create_delete_token(
|
||||
async fn relate_delete_token(
|
||||
&self,
|
||||
alias: Alias,
|
||||
) -> Result<Result<DeleteToken, AlreadyExists>, Self::Error>;
|
||||
delete_token: DeleteToken,
|
||||
) -> Result<Result<(), AlreadyExists>, Self::Error>;
|
||||
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
|
||||
|
||||
async fn relate_hash(&self, alias: Alias, hash: Self::Hash) -> Result<(), Self::Error>;
|
||||
async fn hash(&self, alias: Alias) -> Result<Self::Hash, Self::Error>;
|
||||
|
||||
async fn relate_identifier(&self, alias: Alias, identifier: I) -> Result<(), Self::Error>;
|
||||
async fn identifier(&self, alias: Alias) -> Result<I, Self::Error>;
|
||||
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
|
||||
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
|
||||
|
||||
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
|
398
src/repo/sled.rs
Normal file
398
src/repo/sled.rs
Normal file
|
@ -0,0 +1,398 @@
|
|||
use super::{
|
||||
Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo,
|
||||
SettingsRepo,
|
||||
};
|
||||
use sled::{Db, IVec, Tree};
|
||||
|
||||
macro_rules! b {
|
||||
($self:ident.$ident:ident, $expr:expr) => {{
|
||||
let $ident = $self.$ident.clone();
|
||||
|
||||
actix_rt::task::spawn_blocking(move || $expr).await??
|
||||
}};
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum Error {
|
||||
#[error("Error in database")]
|
||||
Sled(#[from] sled::Error),
|
||||
|
||||
#[error("Invalid identifier")]
|
||||
Identifier(#[source] Box<dyn std::error::Error + Send>),
|
||||
|
||||
#[error("Invalid details json")]
|
||||
Details(#[from] serde_json::Error),
|
||||
|
||||
#[error("Required field was not present")]
|
||||
Missing,
|
||||
|
||||
#[error("Operation panicked")]
|
||||
Panic,
|
||||
}
|
||||
|
||||
pub(crate) struct SledRepo {
|
||||
settings: Tree,
|
||||
identifier_hashes: Tree,
|
||||
identifier_details: Tree,
|
||||
hashes: Tree,
|
||||
hash_aliases: Tree,
|
||||
hash_identifiers: Tree,
|
||||
aliases: Tree,
|
||||
alias_hashes: Tree,
|
||||
alias_delete_tokens: Tree,
|
||||
_db: Db,
|
||||
}
|
||||
|
||||
impl SledRepo {
|
||||
pub(crate) fn new(db: Db) -> Result<Self, Error> {
|
||||
Ok(SledRepo {
|
||||
settings: db.open_tree("pict-rs-settings-tree")?,
|
||||
identifier_hashes: db.open_tree("pict-rs-identifier-hashes-tree")?,
|
||||
identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
|
||||
hashes: db.open_tree("pict-rs-hashes-tree")?,
|
||||
hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
|
||||
hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
|
||||
aliases: db.open_tree("pict-rs-aliases-tree")?,
|
||||
alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
|
||||
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
|
||||
_db: db,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SettingsRepo for SledRepo {
|
||||
type Bytes = IVec;
|
||||
type Error = Error;
|
||||
|
||||
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error> {
|
||||
b!(self.settings, settings.insert(key, value));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error> {
|
||||
let opt = b!(self.settings, settings.get(key));
|
||||
|
||||
Ok(opt)
|
||||
}
|
||||
|
||||
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error> {
|
||||
b!(self.settings, settings.remove(key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
|
||||
where
|
||||
I: Identifier,
|
||||
I::Error: Send + 'static,
|
||||
{
|
||||
identifier
|
||||
.to_bytes()
|
||||
.map_err(|e| Error::Identifier(Box::new(e)))
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<I> IdentifierRepo<I> for SledRepo
|
||||
where
|
||||
I: Identifier + 'static,
|
||||
I::Error: Send + 'static,
|
||||
{
|
||||
type Bytes = IVec;
|
||||
type Error = Error;
|
||||
|
||||
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> {
|
||||
let key = identifier_bytes(&identifier)?;
|
||||
|
||||
let details = serde_json::to_vec(&details)?;
|
||||
|
||||
b!(
|
||||
self.identifier_details,
|
||||
identifier_details.insert(key, details)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error> {
|
||||
let key = identifier_bytes(&identifier)?;
|
||||
|
||||
let opt = b!(self.identifier_details, identifier_details.get(key));
|
||||
|
||||
if let Some(ivec) = opt {
|
||||
Ok(Some(serde_json::from_slice(&ivec)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error> {
|
||||
let key = identifier_bytes(&identifier)?;
|
||||
|
||||
b!(self.identifier_hashes, identifier_hashes.insert(key, hash));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error> {
|
||||
let key = identifier_bytes(&identifier)?;
|
||||
|
||||
let opt = b!(self.identifier_hashes, identifier_hashes.get(key));
|
||||
|
||||
opt.ok_or(Error::Missing)
|
||||
}
|
||||
|
||||
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> {
|
||||
let key = identifier_bytes(&identifier)?;
|
||||
|
||||
let key2 = key.clone();
|
||||
b!(self.identifier_hashes, identifier_hashes.remove(key2));
|
||||
b!(self.identifier_details, identifier_details.remove(key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type BoxIterator<'a, T> = Box<dyn std::iter::Iterator<Item = T> + Send + 'a>;
|
||||
|
||||
type HashIterator = BoxIterator<'static, Result<IVec, sled::Error>>;
|
||||
|
||||
type StreamItem = Result<IVec, Error>;
|
||||
|
||||
type NextFutResult = Result<(HashIterator, Option<StreamItem>), Error>;
|
||||
|
||||
pub(crate) struct HashStream {
|
||||
hashes: Option<HashIterator>,
|
||||
next_fut: Option<futures_util::future::LocalBoxFuture<'static, NextFutResult>>,
|
||||
}
|
||||
|
||||
impl futures_util::Stream for HashStream {
|
||||
type Item = StreamItem;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if let Some(mut fut) = this.next_fut.take() {
|
||||
match fut.as_mut().poll(cx) {
|
||||
std::task::Poll::Ready(Ok((iter, opt))) => {
|
||||
this.hashes = Some(iter);
|
||||
std::task::Poll::Ready(opt)
|
||||
}
|
||||
std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
|
||||
std::task::Poll::Pending => {
|
||||
this.next_fut = Some(fut);
|
||||
std::task::Poll::Pending
|
||||
}
|
||||
}
|
||||
} else if let Some(mut iter) = this.hashes.take() {
|
||||
let fut = Box::pin(async move {
|
||||
actix_rt::task::spawn_blocking(move || {
|
||||
let opt = iter.next().map(|res| res.map_err(Error::from));
|
||||
|
||||
(iter, opt)
|
||||
})
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
});
|
||||
|
||||
this.next_fut = Some(fut);
|
||||
std::pin::Pin::new(this).poll_next(cx)
|
||||
} else {
|
||||
std::task::Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
|
||||
let mut v = hash.to_vec();
|
||||
v.append(&mut alias.to_bytes());
|
||||
v
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<I> HashRepo<I> for SledRepo
|
||||
where
|
||||
I: Identifier + 'static,
|
||||
I::Error: Send + 'static,
|
||||
{
|
||||
type Bytes = IVec;
|
||||
type Error = Error;
|
||||
type Stream = HashStream;
|
||||
|
||||
async fn hashes(&self) -> Self::Stream {
|
||||
let iter = self.hashes.iter().keys();
|
||||
|
||||
HashStream {
|
||||
hashes: Some(Box::new(iter)),
|
||||
next_fut: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error> {
|
||||
let res = b!(self.hashes, {
|
||||
let hash2 = hash.clone();
|
||||
hashes.compare_and_swap(hash, None as Option<Self::Bytes>, Some(hash2))
|
||||
});
|
||||
|
||||
Ok(res.map_err(|_| AlreadyExists))
|
||||
}
|
||||
|
||||
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
|
||||
let key = hash_alias_key(&hash, &alias);
|
||||
|
||||
b!(
|
||||
self.hash_aliases,
|
||||
hash_aliases.insert(key, alias.to_bytes())
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
|
||||
let key = hash_alias_key(&hash, &alias);
|
||||
|
||||
b!(self.hash_aliases, hash_aliases.remove(key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error> {
|
||||
let v = b!(self.hash_aliases, {
|
||||
Ok(hash_aliases
|
||||
.scan_prefix(hash)
|
||||
.values()
|
||||
.filter_map(Result::ok)
|
||||
.filter_map(|ivec| Alias::from_slice(&ivec))
|
||||
.collect::<Vec<_>>()) as Result<_, Error>
|
||||
});
|
||||
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error> {
|
||||
let bytes = identifier_bytes(&identifier)?;
|
||||
|
||||
b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error> {
|
||||
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
|
||||
|
||||
opt.ok_or(Error::Missing).and_then(|ivec| {
|
||||
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))
|
||||
})
|
||||
}
|
||||
|
||||
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> {
|
||||
let hash2 = hash.clone();
|
||||
b!(self.hashes, hashes.remove(hash2));
|
||||
|
||||
let hash2 = hash.clone();
|
||||
b!(self.hash_identifiers, hash_identifiers.remove(hash2));
|
||||
|
||||
let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
|
||||
|
||||
b!(self.hash_aliases, {
|
||||
for alias in aliases {
|
||||
let key = hash_alias_key(&hash, &alias);
|
||||
|
||||
let _ = hash_aliases.remove(key);
|
||||
}
|
||||
Ok(()) as Result<(), Error>
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl AliasRepo for SledRepo {
|
||||
type Bytes = sled::IVec;
|
||||
type Error = Error;
|
||||
|
||||
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error> {
|
||||
let bytes = alias.to_bytes();
|
||||
let bytes2 = bytes.clone();
|
||||
|
||||
let res = b!(
|
||||
self.aliases,
|
||||
aliases.compare_and_swap(bytes, None as Option<Self::Bytes>, Some(bytes2))
|
||||
);
|
||||
|
||||
Ok(res.map_err(|_| AlreadyExists))
|
||||
}
|
||||
|
||||
async fn relate_delete_token(
|
||||
&self,
|
||||
alias: Alias,
|
||||
delete_token: DeleteToken,
|
||||
) -> Result<Result<(), AlreadyExists>, Self::Error> {
|
||||
let key = alias.to_bytes();
|
||||
let token = delete_token.to_bytes();
|
||||
|
||||
let res = b!(
|
||||
self.alias_delete_tokens,
|
||||
alias_delete_tokens.compare_and_swap(key, None as Option<Self::Bytes>, Some(token))
|
||||
);
|
||||
|
||||
Ok(res.map_err(|_| AlreadyExists))
|
||||
}
|
||||
|
||||
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error> {
|
||||
let key = alias.to_bytes();
|
||||
|
||||
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
|
||||
|
||||
opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
|
||||
.ok_or(Error::Missing)
|
||||
}
|
||||
|
||||
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error> {
|
||||
let key = alias.to_bytes();
|
||||
|
||||
b!(self.alias_hashes, alias_hashes.insert(key, hash));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error> {
|
||||
let key = alias.to_bytes();
|
||||
|
||||
let opt = b!(self.alias_hashes, alias_hashes.get(key));
|
||||
|
||||
opt.ok_or(Error::Missing)
|
||||
}
|
||||
|
||||
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error> {
|
||||
let key = alias.to_bytes();
|
||||
|
||||
let key2 = key.clone();
|
||||
b!(self.aliases, aliases.remove(key2));
|
||||
|
||||
let key2 = key.clone();
|
||||
b!(self.alias_delete_tokens, alias_delete_tokens.remove(key2));
|
||||
|
||||
b!(self.alias_hashes, alias_hashes.remove(key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SledRepo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SledRepo").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<actix_rt::task::JoinError> for Error {
|
||||
fn from(_: actix_rt::task::JoinError) -> Self {
|
||||
Error::Panic
|
||||
}
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
use crate::{
|
||||
config::Format,
|
||||
details::Details,
|
||||
error::{Error, UploadError},
|
||||
ffmpeg::{InputFormat, ThumbnailFormat},
|
||||
magick::{details_hint, ValidInputType},
|
||||
magick::details_hint,
|
||||
migrate::{alias_id_key, alias_key, alias_key_bounds},
|
||||
serde_str::Serde,
|
||||
store::{Identifier, Store},
|
||||
};
|
||||
use actix_web::web;
|
||||
|
@ -59,14 +59,6 @@ pub(crate) struct UploadManagerInner {
|
|||
db: sled::Db,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub(crate) struct Details {
|
||||
width: usize,
|
||||
height: usize,
|
||||
content_type: Serde<mime::Mime>,
|
||||
created_at: time::OffsetDateTime,
|
||||
}
|
||||
|
||||
struct FilenameIVec {
|
||||
inner: sled::IVec,
|
||||
}
|
||||
|
@ -669,62 +661,6 @@ impl UploadManager {
|
|||
}
|
||||
}
|
||||
|
||||
impl Details {
|
||||
fn is_motion(&self) -> bool {
|
||||
self.content_type.type_() == "video"
|
||||
|| self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
|
||||
}
|
||||
|
||||
#[tracing::instrument("Details from bytes", skip(input))]
|
||||
pub(crate) async fn from_bytes(
|
||||
input: web::Bytes,
|
||||
hint: Option<ValidInputType>,
|
||||
) -> Result<Self, Error> {
|
||||
let details = crate::magick::details_bytes(input, hint).await?;
|
||||
|
||||
Ok(Details::now(
|
||||
details.width,
|
||||
details.height,
|
||||
details.mime_type,
|
||||
))
|
||||
}
|
||||
|
||||
#[tracing::instrument("Details from store")]
|
||||
pub(crate) async fn from_store<S: Store>(
|
||||
store: S,
|
||||
identifier: S::Identifier,
|
||||
expected_format: Option<ValidInputType>,
|
||||
) -> Result<Self, Error>
|
||||
where
|
||||
Error: From<S::Error>,
|
||||
{
|
||||
let details = crate::magick::details_store(store, identifier, expected_format).await?;
|
||||
|
||||
Ok(Details::now(
|
||||
details.width,
|
||||
details.height,
|
||||
details.mime_type,
|
||||
))
|
||||
}
|
||||
|
||||
fn now(width: usize, height: usize, content_type: mime::Mime) -> Self {
|
||||
Details {
|
||||
width,
|
||||
height,
|
||||
content_type: Serde::new(content_type),
|
||||
created_at: time::OffsetDateTime::now_utc(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn content_type(&self) -> mime::Mime {
|
||||
(*self.content_type).clone()
|
||||
}
|
||||
|
||||
pub(crate) fn system_time(&self) -> std::time::SystemTime {
|
||||
self.created_at.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl FilenameIVec {
|
||||
fn new(inner: sled::IVec) -> Self {
|
||||
FilenameIVec { inner }
|
||||
|
|
Loading…
Reference in a new issue