diff --git a/src/lib.rs b/src/lib.rs
index ad02a69..caee89c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -575,6 +575,80 @@ async fn do_download_backgrounded(
     })))
 }
 
+#[derive(Debug, serde::Deserialize)]
+struct PageQuery {
+    slug: Option<String>,
+    limit: Option<usize>,
+}
+
+#[derive(serde::Serialize)]
+struct PageJson {
+    limit: usize,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    prev: Option<String>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    next: Option<String>,
+
+    hashes: Vec<HashJson>,
+}
+
+#[derive(serde::Serialize)]
+struct HashJson {
+    hex: String,
+    aliases: Vec<String>,
+    details: Option<Details>,
+}
+
+/// Get a page of hashes
+#[tracing::instrument(name = "Hash Page", skip(repo))]
+async fn page(
+    repo: web::Data<ArcRepo>,
+    web::Query(PageQuery { slug, limit }): web::Query<PageQuery>,
+) -> Result<HttpResponse, Error> {
+    let limit = limit.unwrap_or(20);
+
+    let page = repo.hash_page(slug, limit).await?;
+
+    let mut hashes = Vec::with_capacity(page.hashes.len());
+
+    for hash in &page.hashes {
+        let hex = hash.to_hex();
+        let aliases = repo
+            .for_hash(hash.clone())
+            .await?
+            .into_iter()
+            .map(|a| a.to_string())
+            .collect();
+
+        let identifier = repo.identifier(hash.clone()).await?;
+        let details = if let Some(identifier) = identifier {
+            repo.details(&identifier).await?
+        } else {
+            None
+        };
+
+        hashes.push(HashJson {
+            hex,
+            aliases,
+            details,
+        });
+    }
+
+    let page = PageJson {
+        limit: page.limit,
+        prev: page.prev(),
+        next: page.next(),
+        hashes,
+    };
+
+    Ok(HttpResponse::Ok().json(serde_json::json!({
+        "msg": "ok",
+        "page": page,
+    })))
+}
+
 /// Delete aliases and files
 #[tracing::instrument(name = "Deleting file", skip(repo, config))]
 async fn delete(
@@ -1558,6 +1632,7 @@ fn configure_endpoints(
             .service(web::resource("/aliases").route(web::get().to(aliases)))
             .service(web::resource("/identifier").route(web::get().to(identifier::<S>)))
             .service(web::resource("/set_not_found").route(web::post().to(set_not_found)))
+            .service(web::resource("/hashes").route(web::get().to(page)))
            .configure(extra_config),
    );
 }
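An aside on the wire format: given the serde derives above, a page response comes out roughly like the sketch below. Every concrete value here is invented, and `prev`/`next` disappear from the JSON entirely when `None` thanks to `skip_serializing_if`.

```rust
fn main() {
    // Shape sketch only: field names follow PageJson/HashJson above,
    // all concrete values are made up.
    let example = serde_json::json!({
        "msg": "ok",
        "page": {
            "limit": 20,
            // opaque base64url cursor; omitted (not null) on the last page
            "next": "AAAAGHF3vNLBq4AAAXo9xkcy",
            "hashes": [{
                "hex": "0123abcd",             // Hash::to_hex(), shortened here
                "aliases": ["GFVthnYrqN.png"], // one Alias::to_string() per alias
                "details": null                // a None Details serializes as null
            }]
        }
    });

    println!("{example}");
}
```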
diff --git a/src/repo.rs b/src/repo.rs
index 3c602ba..a96380c 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -4,6 +4,7 @@ use crate::{
     store::{Identifier, StoreError},
     stream::LocalBoxStream,
 };
+use base64::Engine;
 use std::{fmt::Debug, sync::Arc};
 use url::Url;
 use uuid::Uuid;
@@ -499,12 +500,63 @@ where
     }
 }
 
+#[derive(Clone)]
+pub(crate) struct OrderedHash {
+    timestamp: time::OffsetDateTime,
+    hash: Hash,
+}
+
+pub(crate) struct HashPage {
+    pub(crate) limit: usize,
+    prev: Option<OrderedHash>,
+    next: Option<OrderedHash>,
+    pub(crate) hashes: Vec<Hash>,
+}
+
+fn ordered_hash_to_string(OrderedHash { timestamp, hash }: &OrderedHash) -> String {
+    let mut bytes: Vec<u8> = timestamp.unix_timestamp_nanos().to_be_bytes().into();
+    bytes.extend(hash.to_bytes());
+    base64::prelude::BASE64_URL_SAFE.encode(bytes)
+}
+
+fn ordered_hash_from_string(s: &str) -> Option<OrderedHash> {
+    let bytes = base64::prelude::BASE64_URL_SAFE.decode(s).ok()?;
+    let timestamp: [u8; 16] = bytes.get(0..16)?.try_into().ok()?;
+    let timestamp = i128::from_be_bytes(timestamp);
+    let timestamp = time::OffsetDateTime::from_unix_timestamp_nanos(timestamp).ok()?;
+    let hash = Hash::from_bytes(&bytes[16..])?;
+
+    Some(OrderedHash { timestamp, hash })
+}
+
+impl HashPage {
+    pub(crate) fn next(&self) -> Option<String> {
+        self.next.as_ref().map(ordered_hash_to_string)
+    }
+
+    pub(crate) fn prev(&self) -> Option<String> {
+        self.prev.as_ref().map(ordered_hash_to_string)
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait HashRepo: BaseRepo {
     async fn size(&self) -> Result<u64, RepoError>;
 
     async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>>;
 
+    async fn hash_page(&self, slug: Option<String>, limit: usize) -> Result<HashPage, RepoError> {
+        let bound = slug.as_deref().and_then(ordered_hash_from_string);
+
+        self.hashes_ordered(bound, limit).await
+    }
+
+    async fn hashes_ordered(
+        &self,
+        bound: Option<OrderedHash>,
+        limit: usize,
+    ) -> Result<HashPage, RepoError>;
+
     async fn create_hash(
         &self,
         hash: Hash,
@@ -556,6 +608,14 @@ where
         T::hashes(self).await
     }
 
+    async fn hashes_ordered(
+        &self,
+        bound: Option<OrderedHash>,
+        limit: usize,
+    ) -> Result<HashPage, RepoError> {
+        T::hashes_ordered(self, bound, limit).await
+    }
+
     async fn create_hash(
         &self,
         hash: Hash,
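An aside on the slug format: a cursor is the base64url encoding of the 16 big-endian bytes of an `i128` nanosecond timestamp, followed by the raw hash bytes. Big-endian is the load-bearing choice here, since it makes lexicographic key order agree with chronological order, which the sled range scans further down depend on. A standalone round-trip sketch, assuming the same `base64` 0.21 and `time` crates the diff uses, with invented hash bytes:

```rust
use base64::Engine;

// Same layout as ordered_hash_to_string / ordered_hash_from_string above:
// 16 big-endian bytes of i128 nanoseconds, then the raw hash bytes.
fn encode_cursor(timestamp: time::OffsetDateTime, hash_bytes: &[u8]) -> String {
    let mut bytes: Vec<u8> = timestamp.unix_timestamp_nanos().to_be_bytes().into();
    bytes.extend_from_slice(hash_bytes);
    base64::prelude::BASE64_URL_SAFE.encode(bytes)
}

fn decode_cursor(s: &str) -> Option<(time::OffsetDateTime, Vec<u8>)> {
    let bytes = base64::prelude::BASE64_URL_SAFE.decode(s).ok()?;
    // get() rather than indexing, so a short slug yields None, not a panic
    let timestamp: [u8; 16] = bytes.get(0..16)?.try_into().ok()?;
    let timestamp =
        time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp)).ok()?;

    Some((timestamp, bytes[16..].to_vec()))
}

fn main() {
    let now = time::OffsetDateTime::now_utc();
    let slug = encode_cursor(now, b"not-a-real-hash"); // hash bytes invented

    assert_eq!(decode_cursor(&slug), Some((now, b"not-a-real-hash".to_vec())));
}
```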
diff --git a/src/repo/hash.rs b/src/repo/hash.rs
index e4501ef..6c57964 100644
--- a/src/repo/hash.rs
+++ b/src/repo/hash.rs
@@ -26,6 +26,10 @@ impl Hash {
         }
     }
 
+    pub(crate) fn to_hex(&self) -> String {
+        hex::encode(self.to_bytes())
+    }
+
     pub(super) fn to_bytes(&self) -> Vec<u8> {
         let format = self.format.to_bytes();
 
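The sled changes below reduce to one pattern: maintain an inverse tree keyed by the timestamp-prefixed bytes from above, and serve "newest first" as a reversed range scan bounded above by the cursor key. A minimal sketch of that scan against a hypothetical tree holding such keys (`page_keys` is illustrative, not a pict-rs function):

```rust
// Cursor pagination over a sled tree whose keys sort oldest-to-newest,
// as the big-endian timestamp prefix guarantees.
fn page_keys(
    tree: &sled::Tree,
    cursor: Option<&[u8]>,
    limit: usize,
) -> Result<Vec<sled::IVec>, sled::Error> {
    let iter = match cursor {
        // everything strictly before the cursor key...
        Some(key) => tree.range(..key.to_vec()),
        // ...or the whole tree for the first page
        None => tree.iter(),
    };

    // walk backwards (newest first) and stop after one page
    iter.keys().rev().take(limit).collect()
}
```

`hashes_ordered` below does exactly this for the page itself, and additionally runs the mirror-image forward scan from the same cursor to recover a `prev` cursor.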
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 827563b..fa30792 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,11 +1,5 @@
 use crate::{
     details::MaybeHumanDate,
-    repo::{
-        hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
-        Details, DetailsRepo, FullRepo, HashAlreadyExists, HashRepo, Identifier, JobId, ProxyRepo,
-        QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
-        VariantAccessRepo, VariantAlreadyExists,
-    },
     serde_str::Serde,
     store::StoreError,
     stream::{from_iterator, LocalBoxStream},
@@ -24,6 +18,13 @@ use tokio::sync::Notify;
 use url::Url;
 use uuid::Uuid;
 
+use super::{
+    hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
+    Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, Identifier, JobId,
+    OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId,
+    UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists,
+};
+
 macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
@@ -64,6 +65,7 @@ pub(crate) struct SledRepo {
     settings: Tree,
     identifier_details: Tree,
     hashes: Tree,
+    hashes_inverse: Tree,
     hash_aliases: Tree,
     hash_identifiers: Tree,
     hash_variant_identifiers: Tree,
@@ -102,6 +104,7 @@ impl SledRepo {
             settings: db.open_tree("pict-rs-settings-tree")?,
             identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
             hashes: db.open_tree("pict-rs-hashes-tree")?,
+            hashes_inverse: db.open_tree("pict-rs-hashes-inverse-tree")?,
             hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
             hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
             hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
@@ -996,6 +999,32 @@ impl StoreMigrationRepo for SledRepo {
     }
 }
 
+fn parse_ordered_hash(key: IVec) -> Option<OrderedHash> {
+    if key.len() < 16 {
+        return None;
+    }
+
+    let timestamp_bytes: [u8; 16] = key[0..16].try_into().ok()?;
+    let timestamp_i128 = i128::from_be_bytes(timestamp_bytes);
+    let timestamp = time::OffsetDateTime::from_unix_timestamp_nanos(timestamp_i128).ok()?;
+
+    let hash = Hash::from_bytes(&key[16..])?;
+
+    Some(OrderedHash { timestamp, hash })
+}
+
+fn serialize_ordered_hash(ordered_hash: &OrderedHash) -> IVec {
+    let mut bytes: Vec<u8> = ordered_hash
+        .timestamp
+        .unix_timestamp_nanos()
+        .to_be_bytes()
+        .into();
+
+    bytes.extend(ordered_hash.hash.to_bytes());
+
+    IVec::from(bytes)
+}
+
 #[async_trait::async_trait(?Send)]
 impl HashRepo for SledRepo {
     async fn size(&self) -> Result<u64, RepoError> {
@@ -1017,6 +1046,58 @@ impl HashRepo for SledRepo {
         Box::pin(from_iterator(iter, 8))
     }
 
+    async fn hashes_ordered(
+        &self,
+        bound: Option<OrderedHash>,
+        limit: usize,
+    ) -> Result<HashPage, RepoError> {
+        let (page_iter, prev_iter) = match &bound {
+            Some(ordered_hash) => {
+                let hash_bytes = serialize_ordered_hash(ordered_hash);
+                (
+                    self.hashes_inverse.range(..hash_bytes.clone()),
+                    Some(self.hashes_inverse.range(hash_bytes..)),
+                )
+            }
+            None => (self.hashes_inverse.iter(), None),
+        };
+
+        actix_rt::task::spawn_blocking(move || {
+            let page_iter = page_iter
+                .keys()
+                .rev()
+                .filter_map(|res| res.map(parse_ordered_hash).transpose())
+                .take(limit);
+
+            let prev = prev_iter
+                .and_then(|prev_iter| {
+                    prev_iter
+                        .keys()
+                        .filter_map(|res| res.map(parse_ordered_hash).transpose())
+                        .take(limit)
+                        .last()
+                })
+                .transpose()?;
+
+            let hashes = page_iter.collect::<Result<Vec<_>, _>>()?;
+
+            let next = hashes.last().cloned();
+
+            Ok(HashPage {
+                limit,
+                prev,
+                next,
+                hashes: hashes
+                    .into_iter()
+                    .map(|OrderedHash { hash, .. }| hash)
+                    .collect(),
+            }) as Result<HashPage, SledError>
+        })
+        .await
+        .map_err(|_| RepoError::Canceled)?
+        .map_err(RepoError::from)
+    }
+
     #[tracing::instrument(level = "trace", skip(self))]
     async fn create_hash(
         &self,
@@ -1026,21 +1107,30 @@
         let identifier: sled::IVec = identifier.to_bytes()?.into();
 
         let hashes = self.hashes.clone();
+        let hashes_inverse = self.hashes_inverse.clone();
         let hash_identifiers = self.hash_identifiers.clone();
 
+        let created_key = serialize_ordered_hash(&OrderedHash {
+            timestamp: time::OffsetDateTime::now_utc(),
+            hash: hash.clone(),
+        });
+
         let hash = hash.to_ivec();
 
         let res = actix_web::web::block(move || {
-            (&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
-                if hashes.get(hash.clone())?.is_some() {
-                    return Ok(Err(HashAlreadyExists));
-                }
+            (&hashes, &hashes_inverse, &hash_identifiers).transaction(
+                |(hashes, hashes_inverse, hash_identifiers)| {
+                    if hashes.get(hash.clone())?.is_some() {
+                        return Ok(Err(HashAlreadyExists));
+                    }
 
-                hashes.insert(hash.clone(), hash.clone())?;
-                hash_identifiers.insert(hash.clone(), &identifier)?;
+                    hashes.insert(hash.clone(), created_key.clone())?;
+                    hashes_inverse.insert(created_key.clone(), hash.clone())?;
+                    hash_identifiers.insert(hash.clone(), &identifier)?;
 
-                Ok(Ok(()))
-            })
+                    Ok(Ok(()))
+                },
+            )
         })
         .await
         .map_err(|_| RepoError::Canceled)?;
@@ -1198,6 +1288,7 @@
         let hash = hash.to_ivec();
 
         let hashes = self.hashes.clone();
+        let hashes_inverse = self.hashes_inverse.clone();
         let hash_identifiers = self.hash_identifiers.clone();
         let hash_motion_identifiers = self.hash_motion_identifiers.clone();
         let hash_variant_identifiers = self.hash_variant_identifiers.clone();
@@ -1216,6 +1307,7 @@
         let res = actix_web::web::block(move || {
             (
                 &hashes,
+                &hashes_inverse,
                 &hash_identifiers,
                 &hash_motion_identifiers,
                 &hash_variant_identifiers,
@@ -1223,11 +1315,15 @@
             )
                 .transaction(
                     |(
                         hashes,
+                        hashes_inverse,
                         hash_identifiers,
                         hash_motion_identifiers,
                         hash_variant_identifiers,
                     )| {
-                        hashes.remove(&hash)?;
+                        if let Some(value) = hashes.remove(&hash)? {
+                            hashes_inverse.remove(value)?;
+                        }
+
                         hash_identifiers.remove(&hash)?;
                         hash_motion_identifiers.remove(&hash)?;
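Finally, a sketch of a consumer, assuming `reqwest` (with its `json` feature), `tokio`, no authentication, and a mount point of `/internal/hashes` on the default port; none of those details are fixed by the diff. Each response's `next` cursor is fed back as `slug` until the server stops returning one:

```rust
use serde_json::Value;

// Hypothetical client that walks every page of the new endpoint.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let mut slug: Option<String> = None;

    loop {
        let mut req = client
            .get("http://localhost:8080/internal/hashes") // assumed mount point
            .query(&[("limit", "20")]);
        if let Some(s) = &slug {
            req = req.query(&[("slug", s.as_str())]);
        }

        let body: Value = req.send().await?.json().await?;

        for hash in body["page"]["hashes"].as_array().into_iter().flatten() {
            println!("{}", hash["hex"]);
        }

        // `next` is omitted once the pages run out, ending the loop
        match body["page"]["next"].as_str() {
            Some(next) => slug = Some(next.to_owned()),
            None => break,
        }
    }

    Ok(())
}
```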