diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs
index abaeaca..52eb27e 100644
--- a/src/concurrent_processor.rs
+++ b/src/concurrent_processor.rs
@@ -1,6 +1,6 @@
 use crate::{
+    details::Details,
     error::{Error, UploadError},
-    upload_manager::Details,
 };
 use actix_web::web;
 use dashmap::{mapref::entry::Entry, DashMap};
diff --git a/src/details.rs b/src/details.rs
new file mode 100644
index 0000000..0730689
--- /dev/null
+++ b/src/details.rs
@@ -0,0 +1,66 @@
+use crate::{error::Error, magick::ValidInputType, serde_str::Serde, store::Store};
+use actix_web::web;
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub(crate) struct Details {
+    width: usize,
+    height: usize,
+    content_type: Serde<mime::Mime>,
+    created_at: time::OffsetDateTime,
+}
+
+impl Details {
+    pub(crate) fn is_motion(&self) -> bool {
+        self.content_type.type_() == "video"
+            || self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
+    }
+
+    #[tracing::instrument("Details from bytes", skip(input))]
+    pub(crate) async fn from_bytes(
+        input: web::Bytes,
+        hint: Option<ValidInputType>,
+    ) -> Result<Self, Error> {
+        let details = crate::magick::details_bytes(input, hint).await?;
+
+        Ok(Details::now(
+            details.width,
+            details.height,
+            details.mime_type,
+        ))
+    }
+
+    #[tracing::instrument("Details from store")]
+    pub(crate) async fn from_store<S: Store + 'static>(
+        store: S,
+        identifier: S::Identifier,
+        expected_format: Option<ValidInputType>,
+    ) -> Result<Self, Error>
+    where
+        Error: From<S::Error>,
+    {
+        let details = crate::magick::details_store(store, identifier, expected_format).await?;
+
+        Ok(Details::now(
+            details.width,
+            details.height,
+            details.mime_type,
+        ))
+    }
+
+    pub(crate) fn now(width: usize, height: usize, content_type: mime::Mime) -> Self {
+        Details {
+            width,
+            height,
+            content_type: Serde::new(content_type),
+            created_at: time::OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub(crate) fn content_type(&self) -> mime::Mime {
+        (*self.content_type).clone()
+    }
+
+    pub(crate) fn system_time(&self) -> std::time::SystemTime {
+        self.created_at.into()
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 6ed8cc1..6013799 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,6 +26,7 @@ use tracing_futures::Instrument;
 
 mod concurrent_processor;
 mod config;
+mod details;
 mod either;
 mod error;
 mod exiftool;
@@ -51,13 +52,14 @@ use crate::{magick::details_hint, store::file_store::FileStore};
 use self::{
     concurrent_processor::CancelSafeProcessor,
     config::{Config, Format, Migrate},
+    details::Details,
     either::Either,
     error::{Error, UploadError},
     init_tracing::init_tracing,
     middleware::{Deadline, Internal},
     migrate::LatestDb,
     store::Store,
-    upload_manager::{Details, UploadManager, UploadManagerSession},
+    upload_manager::{UploadManager, UploadManagerSession},
 };
 
 const MEGABYTES: usize = 1024 * 1024;
diff --git a/src/repo.rs b/src/repo.rs
index 8da18f6..25e9be3 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -1,7 +1,9 @@
-use crate::{store::Identifier, upload_manager::Details};
+use crate::{details::Details, store::Identifier};
 use futures_util::Stream;
 use uuid::Uuid;
 
+pub(crate) mod sled;
+
 pub(crate) struct Alias {
     id: Uuid,
     extension: String,
@@ -11,64 +13,99 @@ pub(crate) struct DeleteToken {
     id: Uuid,
 }
 
 pub(crate) struct AlreadyExists;
 
+impl Alias {
+    fn to_bytes(&self) -> Vec<u8> {
+        let mut v = self.id.as_bytes().to_vec();
+
+        v.extend_from_slice(self.extension.as_bytes());
+
+        v
+    }
+
+    fn from_slice(bytes: &[u8]) -> Option<Self> {
+        if bytes.len() > 16 {
+            let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length");
+            let extension = String::from_utf8_lossy(&bytes[16..]).to_string();
+
+            Some(Self { id, extension })
+        } else {
+            None
+        }
+    }
+}
+
+impl DeleteToken {
+    fn to_bytes(&self) -> Vec<u8> {
+        self.id.as_bytes().to_vec()
+    }
+
+    fn from_slice(bytes: &[u8]) -> Option<Self> {
+        Some(DeleteToken {
+            id: Uuid::from_slice(bytes).ok()?,
+        })
+    }
+}
+
 #[async_trait::async_trait]
 pub(crate) trait SettingsRepo {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
     type Error: std::error::Error;
 
-    async fn set(&self, key: &'static [u8], value: Vec<u8>) -> Result<(), Self::Error>;
-    async fn get(&self, key: &'static [u8]) -> Result<Option<Vec<u8>>, Self::Error>;
+    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
+    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
     async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
 }
 
 #[async_trait::async_trait]
 pub(crate) trait IdentifierRepo<I: Identifier> {
-    type Hash: AsRef<[u8]>;
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
     type Error: std::error::Error;
 
     async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
     async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
 
-    async fn relate_hash(&self, identifier: I, hash: Self::Hash) -> Result<(), Self::Error>;
-    async fn hash(&self, identifier: I) -> Result<Self::Hash, Self::Error>;
+    async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error>;
+    async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error>;
 
     async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
 }
 
 #[async_trait::async_trait]
-pub(crate) trait HashRepo {
-    type Hash: AsRef<[u8]>;
+pub(crate) trait HashRepo<I: Identifier> {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
     type Error: std::error::Error;
-    type Stream: Stream<Item = Result<Self::Hash, Self::Error>>;
+    type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
 
     async fn hashes(&self) -> Self::Stream;
 
-    async fn create(&self, hash: Self::Hash) -> Result<Result<(), AlreadyExists>, Self::Error>;
+    async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
 
-    async fn relate_alias(&self, hash: Self::Hash, alias: Alias) -> Result<(), Self::Error>;
-    async fn remove_alias(&self, hash: Self::Hash, alias: Alias) -> Result<(), Self::Error>;
-    async fn aliases(&self, hash: Self::Hash) -> Result<Vec<Alias>, Self::Error>;
+    async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
+    async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
+    async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
 
-    async fn cleanup(&self, hash: Self::Hash) -> Result<(), Self::Error>;
+    async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
+    async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
+
+    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
 }
 
 #[async_trait::async_trait]
-pub(crate) trait AliasRepo<I: Identifier> {
-    type Hash: AsRef<[u8]>;
+pub(crate) trait AliasRepo {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
     type Error: std::error::Error;
 
     async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
 
-    async fn create_delete_token(
+    async fn relate_delete_token(
         &self,
         alias: Alias,
-    ) -> Result<Result<DeleteToken, AlreadyExists>, Self::Error>;
+        delete_token: DeleteToken,
+    ) -> Result<Result<(), AlreadyExists>, Self::Error>;
     async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
 
-    async fn relate_hash(&self, alias: Alias, hash: Self::Hash) -> Result<(), Self::Error>;
-    async fn hash(&self, alias: Alias) -> Result<Self::Hash, Self::Error>;
-
-    async fn relate_identifier(&self, alias: Alias, identifier: I) -> Result<(), Self::Error>;
-    async fn identifier(&self, alias: Alias) -> Result<I, Self::Error>;
+    async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
+    async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
 
     async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
 }
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
new file mode 100644
index 0000000..b543202
--- /dev/null
+++ b/src/repo/sled.rs
@@ -0,0 +1,398 @@
+use super::{
+    Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo,
+    SettingsRepo,
+};
+use sled::{Db, IVec, Tree};
+
+macro_rules! b {
+    ($self:ident.$ident:ident, $expr:expr) => {{
+        let $ident = $self.$ident.clone();
+
+        actix_rt::task::spawn_blocking(move || $expr).await??
+    }};
+}
+
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum Error {
+    #[error("Error in database")]
+    Sled(#[from] sled::Error),
+
+    #[error("Invalid identifier")]
+    Identifier(#[source] Box<dyn std::error::Error + Send>),
+
+    #[error("Invalid details json")]
+    Details(#[from] serde_json::Error),
+
+    #[error("Required field was not present")]
+    Missing,
+
+    #[error("Operation panicked")]
+    Panic,
+}
+
+pub(crate) struct SledRepo {
+    settings: Tree,
+    identifier_hashes: Tree,
+    identifier_details: Tree,
+    hashes: Tree,
+    hash_aliases: Tree,
+    hash_identifiers: Tree,
+    aliases: Tree,
+    alias_hashes: Tree,
+    alias_delete_tokens: Tree,
+    _db: Db,
+}
+
+impl SledRepo {
+    pub(crate) fn new(db: Db) -> Result<Self, Error> {
+        Ok(SledRepo {
+            settings: db.open_tree("pict-rs-settings-tree")?,
+            identifier_hashes: db.open_tree("pict-rs-identifier-hashes-tree")?,
+            identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
+            hashes: db.open_tree("pict-rs-hashes-tree")?,
+            hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
+            hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
+            aliases: db.open_tree("pict-rs-aliases-tree")?,
+            alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
+            alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
+            _db: db,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl SettingsRepo for SledRepo {
+    type Bytes = IVec;
+    type Error = Error;
+
+    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error> {
+        b!(self.settings, settings.insert(key, value));
+
+        Ok(())
+    }
+
+    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error> {
+        let opt = b!(self.settings, settings.get(key));
+
+        Ok(opt)
+    }
+
+    async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error> {
+        b!(self.settings, settings.remove(key));
+
+        Ok(())
+    }
+}
+
+fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
+where
+    I: Identifier,
+    I::Error: Send + 'static,
+{
+    identifier
+        .to_bytes()
+        .map_err(|e| Error::Identifier(Box::new(e)))
+}
+
+#[async_trait::async_trait]
+impl<I> IdentifierRepo<I> for SledRepo
+where
+    I: Identifier + 'static,
+    I::Error: Send + 'static,
+{
+    type Bytes = IVec;
+    type Error = Error;
+
+    async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> {
+        let key = identifier_bytes(&identifier)?;
+
+        let details = serde_json::to_vec(&details)?;
+
+        b!(
+            self.identifier_details,
+            identifier_details.insert(key, details)
+        );
+
+        Ok(())
+    }
+
+    async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error> {
+        let key = identifier_bytes(&identifier)?;
+
+        let opt = b!(self.identifier_details, identifier_details.get(key));
+
+        if let Some(ivec) = opt {
+            Ok(Some(serde_json::from_slice(&ivec)?))
+        } else {
+            Ok(None)
+        }
+    }
+
+    async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error> {
+        let key = identifier_bytes(&identifier)?;
+
+        b!(self.identifier_hashes, identifier_hashes.insert(key, hash));
+
+        Ok(())
+    }
+
+    async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error> {
+        let key = identifier_bytes(&identifier)?;
+
+        let opt = b!(self.identifier_hashes, identifier_hashes.get(key));
+
+        opt.ok_or(Error::Missing)
+    }
+
+    async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> {
+        let key = identifier_bytes(&identifier)?;
+
+        let key2 = key.clone();
+        b!(self.identifier_hashes, identifier_hashes.remove(key2));
+        b!(self.identifier_details, identifier_details.remove(key));
+
+        Ok(())
+    }
+}
+
+type BoxIterator<'a, T> = Box<dyn Iterator<Item = T> + Send + 'a>;
+
+type HashIterator = BoxIterator<'static, Result<IVec, sled::Error>>;
+
+type StreamItem = Result<IVec, Error>;
+
+type NextFutResult = Result<(HashIterator, Option<StreamItem>), Error>;
+
+pub(crate) struct HashStream {
+    hashes: Option<HashIterator>,
+    next_fut: Option<futures_util::future::LocalBoxFuture<'static, NextFutResult>>,
+}
+
+impl futures_util::Stream for HashStream {
+    type Item = StreamItem;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if let Some(mut fut) = this.next_fut.take() {
+            match fut.as_mut().poll(cx) {
+                std::task::Poll::Ready(Ok((iter, opt))) => {
+                    this.hashes = Some(iter);
+                    std::task::Poll::Ready(opt)
+                }
+                std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
+                std::task::Poll::Pending => {
+                    this.next_fut = Some(fut);
+                    std::task::Poll::Pending
+                }
+            }
+        } else if let Some(mut iter) = this.hashes.take() {
+            let fut = Box::pin(async move {
+                actix_rt::task::spawn_blocking(move || {
+                    let opt = iter.next().map(|res| res.map_err(Error::from));
+
+                    (iter, opt)
+                })
+                .await
+                .map_err(Error::from)
+            });
+
+            this.next_fut = Some(fut);
+            std::pin::Pin::new(this).poll_next(cx)
+        } else {
+            std::task::Poll::Ready(None)
+        }
+    }
+}
+
+fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
+    let mut v = hash.to_vec();
+    v.append(&mut alias.to_bytes());
+    v
+}
+
+#[async_trait::async_trait]
+impl<I> HashRepo<I> for SledRepo
+where
+    I: Identifier + 'static,
+    I::Error: Send + 'static,
+{
+    type Bytes = IVec;
+    type Error = Error;
+    type Stream = HashStream;
+
+    async fn hashes(&self) -> Self::Stream {
+        let iter = self.hashes.iter().keys();
+
+        HashStream {
+            hashes: Some(Box::new(iter)),
+            next_fut: None,
+        }
+    }
+
+    async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error> {
+        let res = b!(self.hashes, {
+            let hash2 = hash.clone();
+            hashes.compare_and_swap(hash, None as Option<IVec>, Some(hash2))
+        });
+
+        Ok(res.map_err(|_| AlreadyExists))
+    }
+
+    async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
+        let key = hash_alias_key(&hash, &alias);
+
+        b!(
+            self.hash_aliases,
+            hash_aliases.insert(key, alias.to_bytes())
+        );
+
+        Ok(())
+    }
+
+    async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
+        let key = hash_alias_key(&hash, &alias);
+
+        b!(self.hash_aliases, hash_aliases.remove(key));
+
+        Ok(())
+    }
+
+    async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error> {
+        let v = b!(self.hash_aliases, {
+            Ok(hash_aliases
+                .scan_prefix(hash)
+                .values()
+                .filter_map(Result::ok)
+                .filter_map(|ivec| Alias::from_slice(&ivec))
+                .collect::<Vec<_>>()) as Result<_, Error>
+        });
+
+        Ok(v)
+    }
+
+    async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error> {
+        let bytes = identifier_bytes(&identifier)?;
+
+        b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
+
+        Ok(())
+    }
+
+    async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error> {
+        let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
+
+        opt.ok_or(Error::Missing).and_then(|ivec| {
+            I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))
+        })
+    }
+
+    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> {
+        let hash2 = hash.clone();
+        b!(self.hashes, hashes.remove(hash2));
+
+        let hash2 = hash.clone();
+        b!(self.hash_identifiers, hash_identifiers.remove(hash2));
+
+        let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
+
+        b!(self.hash_aliases, {
+            for alias in aliases {
+                let key = hash_alias_key(&hash, &alias);
+
+                let _ = hash_aliases.remove(key);
+            }
+            Ok(()) as Result<(), Error>
+        });
+
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+impl AliasRepo for SledRepo {
+    type Bytes = sled::IVec;
+    type Error = Error;
+
+    async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error> {
+        let bytes = alias.to_bytes();
+        let bytes2 = bytes.clone();
+
+        let res = b!(
+            self.aliases,
+            aliases.compare_and_swap(bytes, None as Option<IVec>, Some(bytes2))
+        );
+
+        Ok(res.map_err(|_| AlreadyExists))
+    }
+
+    async fn relate_delete_token(
+        &self,
+        alias: Alias,
+        delete_token: DeleteToken,
+    ) -> Result<Result<(), AlreadyExists>, Self::Error> {
+        let key = alias.to_bytes();
+        let token = delete_token.to_bytes();
+
+        let res = b!(
+            self.alias_delete_tokens,
+            alias_delete_tokens.compare_and_swap(key, None as Option<IVec>, Some(token))
+        );
+
+        Ok(res.map_err(|_| AlreadyExists))
+    }
+
+    async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error> {
+        let key = alias.to_bytes();
+
+        let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
+
+        opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
+            .ok_or(Error::Missing)
+    }
+
+    async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error> {
+        let key = alias.to_bytes();
+
+        b!(self.alias_hashes, alias_hashes.insert(key, hash));
+
+        Ok(())
+    }
+
+    async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error> {
+        let key = alias.to_bytes();
+
+        let opt = b!(self.alias_hashes, alias_hashes.get(key));
+
+        opt.ok_or(Error::Missing)
+    }
+
+    async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error> {
+        let key = alias.to_bytes();
+
+        let key2 = key.clone();
+        b!(self.aliases, aliases.remove(key2));
+
+        let key2 = key.clone();
+        b!(self.alias_delete_tokens, alias_delete_tokens.remove(key2));
+
+        b!(self.alias_hashes, alias_hashes.remove(key));
+
+        Ok(())
+    }
+}
+
+impl std::fmt::Debug for SledRepo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SledRepo").finish()
+    }
+}
+
+impl From<actix_rt::task::JoinError> for Error {
+    fn from(_: actix_rt::task::JoinError) -> Self {
+        Error::Panic
+    }
+}
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index e934d31..7e1b7cf 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -1,10 +1,10 @@
 use crate::{
     config::Format,
+    details::Details,
     error::{Error, UploadError},
     ffmpeg::{InputFormat, ThumbnailFormat},
-    magick::{details_hint, ValidInputType},
+    magick::details_hint,
     migrate::{alias_id_key, alias_key, alias_key_bounds},
-    serde_str::Serde,
     store::{Identifier, Store},
 };
 use actix_web::web;
@@ -59,14 +59,6 @@ pub(crate) struct UploadManagerInner {
     db: sled::Db,
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
-pub(crate) struct Details {
-    width: usize,
-    height: usize,
-    content_type: Serde<mime::Mime>,
-    created_at: time::OffsetDateTime,
-}
-
 struct FilenameIVec {
     inner: sled::IVec,
 }
@@ -669,62 +661,6 @@ impl UploadManager {
     }
 }
 
-impl Details {
-    fn is_motion(&self) -> bool {
-        self.content_type.type_() == "video"
-            || self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
-    }
-
-    #[tracing::instrument("Details from bytes", skip(input))]
-    pub(crate) async fn from_bytes(
-        input: web::Bytes,
-        hint: Option<ValidInputType>,
-    ) -> Result<Self, Error> {
-        let details = crate::magick::details_bytes(input, hint).await?;
-
-        Ok(Details::now(
-            details.width,
-            details.height,
-            details.mime_type,
-        ))
-    }
-
-    #[tracing::instrument("Details from store")]
-    pub(crate) async fn from_store<S: Store + 'static>(
-        store: S,
-        identifier: S::Identifier,
-        expected_format: Option<ValidInputType>,
-    ) -> Result<Self, Error>
-    where
-        Error: From<S::Error>,
-    {
-        let details = crate::magick::details_store(store, identifier, expected_format).await?;
-
-        Ok(Details::now(
-            details.width,
-            details.height,
-            details.mime_type,
-        ))
-    }
-
-    fn now(width: usize, height: usize, content_type: mime::Mime) -> Self {
-        Details {
-            width,
-            height,
-            content_type: Serde::new(content_type),
-            created_at: time::OffsetDateTime::now_utc(),
-        }
-    }
-
-    pub(crate) fn content_type(&self) -> mime::Mime {
-        (*self.content_type).clone()
-    }
-
-    pub(crate) fn system_time(&self) -> std::time::SystemTime {
-        self.created_at.into()
-    }
-}
-
 impl FilenameIVec {
     fn new(inner: sled::IVec) -> Self {
         FilenameIVec { inner }