From 3129f7844e3e9503d19caf425c2750ad29a4d0e1 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 13 Aug 2023 22:06:42 -0500 Subject: [PATCH] BROKEN: start work on hash discriminant --- src/formats.rs | 37 ++++++++++++++- src/ingest/hasher.rs | 26 ++++++---- src/repo.rs | 74 ++++++++++++++--------------- src/repo/hash.rs | 65 +++++++++++++++++++++++++ src/repo/sled.rs | 110 ++++++++++++++++++++++++++----------------- 5 files changed, 220 insertions(+), 92 deletions(-) create mode 100644 src/repo/hash.rs diff --git a/src/formats.rs b/src/formats.rs index 7826e20..3247c31 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -7,7 +7,9 @@ use std::str::FromStr; pub(crate) use animation::{AnimationFormat, AnimationOutput}; pub(crate) use image::{ImageFormat, ImageInput, ImageOutput}; -pub(crate) use video::{InternalVideoFormat, OutputVideoFormat, VideoFormat, VideoCodec, AudioCodec}; +pub(crate) use video::{ + AudioCodec, InternalVideoFormat, OutputVideoFormat, VideoCodec, VideoFormat, +}; #[derive(Clone, Debug)] pub(crate) struct Validations<'a> { @@ -75,6 +77,39 @@ impl InternalFormat { } } + pub(crate) const fn to_bytes(self) -> &'static [u8] { + match self { + Self::Animation(AnimationFormat::Apng) => b"a-apng", + Self::Animation(AnimationFormat::Avif) => b"a-avif", + Self::Animation(AnimationFormat::Gif) => b"a-gif", + Self::Animation(AnimationFormat::Webp) => b"a-webp", + Self::Image(ImageFormat::Avif) => b"i-avif", + Self::Image(ImageFormat::Jpeg) => b"i-jpeg", + Self::Image(ImageFormat::Jxl) => b"i-jxl", + Self::Image(ImageFormat::Png) => b"i-png", + Self::Image(ImageFormat::Webp) => b"i-webp", + Self::Video(InternalVideoFormat::Mp4) => b"v-mp4", + Self::Video(InternalVideoFormat::Webm) => b"v-webm", + } + } + + pub(crate) const fn from_bytes(bytes: &[u8]) -> Option { + match bytes { + b"a-apng" => Some(Self::Animation(AnimationFormat::Apng)), + b"a-avif" => Some(Self::Animation(AnimationFormat::Avif)), + b"a-gif" => Some(Self::Animation(AnimationFormat::Gif)), + b"a-webp" => Some(Self::Animation(AnimationFormat::Webp)), + b"i-avif" => Some(Self::Image(ImageFormat::Avif)), + b"i-jpeg" => Some(Self::Image(ImageFormat::Jpeg)), + b"i-jxl" => Some(Self::Image(ImageFormat::Jxl)), + b"i-png" => Some(Self::Image(ImageFormat::Png)), + b"i-webp" => Some(Self::Image(ImageFormat::Webp)), + b"v-mp4" => Some(Self::Video(InternalVideoFormat::Mp4)), + b"v-webm" => Some(Self::Video(InternalVideoFormat::Webm)), + _ => None, + } + } + pub(crate) fn maybe_from_media_type(mime: &mime::Mime, has_frames: bool) -> Option { match (mime.type_(), mime.subtype().as_str(), has_frames) { (mime::IMAGE, "apng", _) => Some(Self::Animation(AnimationFormat::Apng)), diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index 811600b..6123e57 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -7,12 +7,17 @@ use std::{ }; use tokio::io::{AsyncRead, ReadBuf}; +struct State { + hasher: D, + size: u64, +} + pin_project_lite::pin_project! { pub(crate) struct Hasher { #[pin] inner: I, - hasher: Rc>, + state: Rc>>, } } @@ -23,12 +28,15 @@ where pub(super) fn new(reader: I, digest: D) -> Self { Hasher { inner: reader, - hasher: Rc::new(RefCell::new(digest)), + state: Rc::new(RefCell::new(State { + hasher: digest, + size: 0, + })), } } - pub(super) fn hasher(&self) -> Rc> { - Rc::clone(&self.hasher) + pub(super) fn state(&self) -> Rc>> { + Rc::clone(&self.state) } } @@ -45,15 +53,15 @@ where let this = self.as_mut().project(); let reader = this.inner; - let hasher = this.hasher; + let state = this.state; let before_len = buf.filled().len(); let poll_res = reader.poll_read(cx, buf); let after_len = buf.filled().len(); if after_len > before_len { - hasher - .borrow_mut() - .update(&buf.filled()[before_len..after_len]); + let mut guard = state.borrow_mut(); + guard.hasher.update(&buf.filled()[before_len..after_len]); + guard.size += u64::try_from(after_len - before_len).expect("Size is reasonable"); } poll_res } @@ -93,7 +101,7 @@ mod test { tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; - Ok(reader.hasher().borrow_mut().finalize_reset().to_vec()) as std::io::Result<_> + Ok(reader.state().borrow_mut().hasher.finalize_reset().to_vec()) as std::io::Result<_> }) .unwrap(); diff --git a/src/repo.rs b/src/repo.rs index 69d446c..3640f57 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -8,8 +8,11 @@ use std::fmt::Debug; use url::Url; use uuid::Uuid; +mod hash; pub(crate) mod sled; +pub(crate) use hash::Hash; + #[derive(Clone, Debug)] pub(crate) enum Repo { Sled(self::sled::SledRepo), @@ -207,17 +210,16 @@ where pub(crate) trait VariantAccessRepo: BaseRepo { type VariantAccessStream: Stream>; - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>; - async fn contains_variant(&self, hash: Self::Bytes, variant: String) - -> Result; + async fn contains_variant(&self, hash: Hash, variant: String) -> Result; async fn older_variants( &self, timestamp: time::OffsetDateTime, ) -> Result; - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -227,15 +229,11 @@ where { type VariantAccessStream = T::VariantAccessStream; - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::accessed(self, hash, variant).await } - async fn contains_variant( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result { + async fn contains_variant(&self, hash: Hash, variant: String) -> Result { T::contains_variant(self, hash, variant).await } @@ -246,7 +244,7 @@ where T::older_variants(self, timestamp).await } - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::remove_access(self, hash, variant).await } } @@ -436,7 +434,7 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo: BaseRepo { - type Stream: Stream>; + type Stream: Stream>; async fn size(&self) -> Result; @@ -444,49 +442,49 @@ pub(crate) trait HashRepo: BaseRepo { async fn create( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result, StoreError>; async fn update_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError>; async fn identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError>; async fn relate_variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, identifier: &I, ) -> Result<(), StoreError>; async fn variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, ) -> Result, StoreError>; async fn variants( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError>; - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>; + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn relate_motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError>; async fn motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError>; - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>; + async fn cleanup(&self, hash: Hash) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -506,7 +504,7 @@ where async fn create( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result, StoreError> { T::create(self, hash, identifier).await @@ -514,7 +512,7 @@ where async fn update_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError> { T::update_identifier(self, hash, identifier).await @@ -522,14 +520,14 @@ where async fn identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { T::identifier(self, hash).await } async fn relate_variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, identifier: &I, ) -> Result<(), StoreError> { @@ -538,7 +536,7 @@ where async fn variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, ) -> Result, StoreError> { T::variant_identifier(self, hash, variant).await @@ -546,18 +544,18 @@ where async fn variants( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { T::variants(self, hash).await } - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::remove_variant(self, hash, variant).await } async fn relate_motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError> { T::relate_motion_identifier(self, hash, identifier).await @@ -565,12 +563,12 @@ where async fn motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { T::motion_identifier(self, hash).await } - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { + async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { T::cleanup(self, hash).await } } @@ -581,14 +579,14 @@ pub(crate) trait AliasRepo: BaseRepo { &self, alias: &Alias, delete_token: &DeleteToken, - hash: Self::Bytes, + hash: Hash, ) -> Result, RepoError>; async fn delete_token(&self, alias: &Alias) -> Result, RepoError>; - async fn hash(&self, alias: &Alias) -> Result, RepoError>; + async fn hash(&self, alias: &Alias) -> Result, RepoError>; - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; + async fn for_hash(&self, hash: Hash) -> Result, RepoError>; async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>; } @@ -602,7 +600,7 @@ where &self, alias: &Alias, delete_token: &DeleteToken, - hash: Self::Bytes, + hash: Hash, ) -> Result, RepoError> { T::create(self, alias, delete_token, hash).await } @@ -611,11 +609,11 @@ where T::delete_token(self, alias).await } - async fn hash(&self, alias: &Alias) -> Result, RepoError> { + async fn hash(&self, alias: &Alias) -> Result, RepoError> { T::hash(self, alias).await } - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + async fn for_hash(&self, hash: Hash) -> Result, RepoError> { T::for_hash(self, hash).await } diff --git a/src/repo/hash.rs b/src/repo/hash.rs new file mode 100644 index 0000000..91c12e3 --- /dev/null +++ b/src/repo/hash.rs @@ -0,0 +1,65 @@ +use crate::formats::InternalFormat; +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) struct Hash { + hash: Arc<[u8; 32]>, + size: u64, + format: InternalFormat, +} + +impl Hash { + pub(crate) fn new(hash: Arc<[u8; 32]>, size: u64, format: InternalFormat) -> Self { + Self { hash, format, size } + } + + pub(super) fn to_bytes(&self) -> Vec { + let format = self.format.to_bytes(); + + let mut vec = Vec::with_capacity(32 + 8 + format.len()); + + vec.extend_from_slice(&self.hash[..]); + vec.extend(self.size.to_be_bytes()); + vec.extend(format); + + vec + } + + pub(super) fn to_ivec(&self) -> sled::IVec { + sled::IVec::from(self.to_bytes()) + } + + pub(super) fn from_ivec(ivec: sled::IVec) -> Option { + Self::from_bytes(&ivec) + } + + pub(super) fn from_bytes(bytes: &[u8]) -> Option { + if bytes.len() < 32 + 8 + 5 { + return None; + } + + let hash = &bytes[..32]; + let size = &bytes[32..40]; + let format = &bytes[40..]; + + let hash: [u8; 32] = hash.try_into().expect("Correct length"); + let size: [u8; 8] = size.try_into().expect("Correct length"); + let format = InternalFormat::from_bytes(format)?; + + Some(Self { + hash: Arc::new(hash), + size: u64::from_be_bytes(size), + format, + }) + } +} + +impl std::fmt::Debug for Hash { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Hash") + .field("hash", &hex::encode(&*self.hash)) + .field("format", &self.format) + .field("size", &self.size) + .finish() + } +} diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 986a75b..10c851f 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,7 +1,7 @@ use crate::{ details::MaybeHumanDate, repo::{ - Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, + hash::Hash, Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, MigrationRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, }, @@ -405,8 +405,9 @@ impl AliasAccessRepo for SledRepo { impl VariantAccessRepo for SledRepo { type VariantAccessStream = VariantAccessStream; - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn accessed(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let now_string = time::OffsetDateTime::now_utc() @@ -428,12 +429,9 @@ impl VariantAccessRepo for SledRepo { .map_err(RepoError::from) } - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn contains_variant( - &self, - hash: Self::Bytes, - variant: String, - ) -> Result { + #[tracing::instrument(level = "debug", skip(self))] + async fn contains_variant(&self, hash: Hash, variant: String) -> Result { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let timestamp = b!(self.variant_access, variant_access.get(key)); @@ -465,8 +463,9 @@ impl VariantAccessRepo for SledRepo { }) } - #[tracing::instrument(level = "debug", skip_all, fields(hash = %hex::encode(&hash), variant = %variant))] - async fn remove_access(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "debug", skip(self))] + async fn remove_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); let key = variant_access_key(&hash, &variant); let variant_access = self.variant_access.clone(); @@ -1012,7 +1011,7 @@ impl MigrationRepo for SledRepo { } } -type StreamItem = Result; +type StreamItem = Result; type LocalBoxStream<'a, T> = Pin + 'a>>; #[async_trait::async_trait(?Send)] @@ -1028,19 +1027,20 @@ impl HashRepo for SledRepo { } async fn hashes(&self) -> Self::Stream { - let iter = self - .hashes - .iter() - .keys() - .map(|res| res.map_err(SledError::from).map_err(RepoError::from)); + let iter = self.hashes.iter().keys().filter_map(|res| { + res.map_err(SledError::from) + .map_err(RepoError::from) + .map(Hash::from_ivec) + .transpose() + }); Box::pin(from_iterator(iter, 8)) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self))] async fn create( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result, StoreError> { let identifier: sled::IVec = identifier.to_bytes()?.into(); @@ -1048,14 +1048,16 @@ impl HashRepo for SledRepo { let hashes = self.hashes.clone(); let hash_identifiers = self.hash_identifiers.clone(); + let hash = hash.to_ivec(); + let res = actix_web::web::block(move || { (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| { - if hashes.get(&hash)?.is_some() { + if hashes.get(hash.clone())?.is_some() { return Ok(Err(HashAlreadyExists)); } - hashes.insert(&hash, &hash)?; - hash_identifiers.insert(&hash, &identifier)?; + hashes.insert(hash.clone(), hash.clone())?; + hash_identifiers.insert(hash.clone(), &identifier)?; Ok(Ok(())) }) @@ -1073,11 +1075,13 @@ impl HashRepo for SledRepo { async fn update_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError> { let identifier = identifier.to_bytes()?; + let hash = hash.to_ivec(); + b!( self.hash_identifiers, hash_identifiers.insert(hash, identifier) @@ -1086,11 +1090,13 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self))] async fn identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { + let hash = hash.to_ivec(); + let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { return Ok(None); }; @@ -1098,13 +1104,15 @@ impl HashRepo for SledRepo { Ok(Some(I::from_bytes(ivec.to_vec())?)) } - #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn relate_variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, identifier: &I, ) -> Result<(), StoreError> { + let hash = hash.to_bytes(); + let key = variant_key(&hash, &variant); let value = identifier.to_bytes()?; @@ -1116,12 +1124,14 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self))] async fn variant_identifier( &self, - hash: Self::Bytes, + hash: Hash, variant: String, ) -> Result, StoreError> { + let hash = hash.to_bytes(); + let key = variant_key(&hash, &variant); let opt = b!( @@ -1132,15 +1142,17 @@ impl HashRepo for SledRepo { opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() } - #[tracing::instrument(level = "debug", skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "debug", skip(self))] async fn variants( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { + let hash = hash.to_ivec(); + let vec = b!( self.hash_variant_identifiers, Ok(hash_variant_identifiers - .scan_prefix(&hash) + .scan_prefix(hash.clone()) .filter_map(|res| res.ok()) .filter_map(|(key, ivec)| { let identifier = I::from_bytes(ivec.to_vec()).ok(); @@ -1164,8 +1176,10 @@ impl HashRepo for SledRepo { Ok(vec) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> { + #[tracing::instrument(level = "trace", skip(self))] + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let hash = hash.to_bytes(); + let key = variant_key(&hash, &variant); b!( @@ -1176,12 +1190,13 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn relate_motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, identifier: &I, ) -> Result<(), StoreError> { + let hash = hash.to_ivec(); let bytes = identifier.to_bytes()?; b!( @@ -1192,11 +1207,13 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] + #[tracing::instrument(level = "trace", skip(self))] async fn motion_identifier( &self, - hash: Self::Bytes, + hash: Hash, ) -> Result, StoreError> { + let hash = hash.to_ivec(); + let opt = b!( self.hash_motion_identifiers, hash_motion_identifiers.get(hash) @@ -1205,8 +1222,10 @@ impl HashRepo for SledRepo { opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() } - #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> { + #[tracing::instrument(skip(self))] + async fn cleanup(&self, hash: Hash) -> Result<(), RepoError> { + let hash = hash.to_ivec(); + let hashes = self.hashes.clone(); let hash_identifiers = self.hash_identifiers.clone(); let hash_motion_identifiers = self.hash_motion_identifiers.clone(); @@ -1274,8 +1293,9 @@ impl AliasRepo for SledRepo { &self, alias: &Alias, delete_token: &DeleteToken, - hash: Self::Bytes, + hash: Hash, ) -> Result, RepoError> { + let hash = hash.to_ivec(); let alias: sled::IVec = alias.to_bytes().into(); let delete_token: sled::IVec = delete_token.to_bytes().into(); @@ -1328,16 +1348,18 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self))] - async fn hash(&self, alias: &Alias) -> Result, RepoError> { + async fn hash(&self, alias: &Alias) -> Result, RepoError> { let key = alias.to_bytes(); let opt = b!(self.alias_hashes, alias_hashes.get(key)); - Ok(opt) + Ok(opt.and_then(Hash::from_ivec)) } #[tracing::instrument(skip_all)] - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + async fn for_hash(&self, hash: Hash) -> Result, RepoError> { + let hash = hash.to_ivec(); + let v = b!(self.hash_aliases, { Ok(hash_aliases .scan_prefix(hash)