From 8921f57a21ff202a21f270f218ca21a1dbbfd610 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 2 Sep 2023 18:30:45 -0500 Subject: [PATCH] Remove Identifier trait, Replace with Arc --- Cargo.lock | 3 + Cargo.toml | 2 +- src/backgrounded.rs | 29 ++- src/details.rs | 4 +- src/ffmpeg.rs | 4 +- src/generate.rs | 6 +- src/ingest.rs | 27 +-- src/lib.rs | 74 +++---- src/magick.rs | 4 +- src/migrate_store.rs | 29 ++- src/queue.rs | 50 ++--- src/queue/cleanup.rs | 30 +-- src/queue/process.rs | 20 +- src/range.rs | 4 +- src/repo.rs | 185 ++++++++++------ src/repo/hash.rs | 47 +++- src/repo/migrate.rs | 18 +- src/repo/postgres.rs | 201 +++++++++++++++++- .../migrations/V0001__create_hashes.rs | 2 +- .../migrations/V0002__create_variants.rs | 2 +- .../migrations/V0003__create_aliases.rs | 2 +- .../migrations/V0005__create_details.rs | 2 +- .../migrations/V0006__create_queue.rs | 4 +- src/repo/postgres/schema.rs | 40 ++-- src/repo/sled.rs | 163 +++++++------- src/repo_04.rs | 20 +- src/repo_04/sled.rs | 54 ++--- src/store.rs | 200 ++++++++--------- src/store/file_store.rs | 47 ++-- src/store/file_store/file_id.rs | 57 ----- src/store/object_store.rs | 62 +++--- src/store/object_store/object_id.rs | 37 ---- 32 files changed, 799 insertions(+), 630 deletions(-) delete mode 100644 src/store/file_store/file_id.rs delete mode 100644 src/store/object_store/object_id.rs diff --git a/Cargo.lock b/Cargo.lock index c7e9463..1a31c37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,6 +730,9 @@ dependencies = [ "byteorder", "diesel_derives", "itoa", + "serde_json", + "time", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7f6e6db..449d844 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ config = "0.13.0" console-subscriber = "0.1" dashmap = "5.1.0" deadpool = { version = "0.9.5", features = ["rt_tokio_1"] } -diesel = "2.1.1" +diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } diesel-async = { version = "0.4.1", features = ["postgres", "deadpool"] } flume = "0.11.0" futures-core = "0.3" diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 0210dac..94839a1 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ error::Error, repo::{ArcRepo, UploadId}, @@ -9,19 +11,13 @@ use futures_core::Stream; use mime::APPLICATION_OCTET_STREAM; use tracing::{Instrument, Span}; -pub(crate) struct Backgrounded -where - S: Store, -{ +pub(crate) struct Backgrounded { repo: ArcRepo, - identifier: Option, + identifier: Option>, upload_id: Option, } -impl Backgrounded -where - S: Store, -{ +impl Backgrounded { pub(crate) fn disarm(mut self) { let _ = self.identifier.take(); let _ = self.upload_id.take(); @@ -31,12 +27,13 @@ where self.upload_id } - pub(crate) fn identifier(&self) -> Option<&S::Identifier> { + pub(crate) fn identifier(&self) -> Option<&Arc> { self.identifier.as_ref() } - pub(crate) async fn proxy
(repo: ArcRepo, store: S, stream: P) -> Result + pub(crate) async fn proxy(repo: ArcRepo, store: S, stream: P) -> Result where + S: Store, P: Stream> + Unpin + 'static, { let mut this = Self { @@ -50,8 +47,9 @@ where Ok(this) } - async fn do_proxy
(&mut self, store: S, stream: P) -> Result<(), Error> + async fn do_proxy(&mut self, store: S, stream: P) -> Result<(), Error> where + S: Store, P: Stream> + Unpin + 'static, { self.upload_id = Some(self.repo.create_upload().await?); @@ -68,10 +66,7 @@ where } } -impl Drop for Backgrounded -where - S: Store, -{ +impl Drop for Backgrounded { fn drop(&mut self) { let any_items = self.identifier.is_some() || self.upload_id.is_some(); @@ -90,7 +85,7 @@ where tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; } .instrument(cleanup_span), ) diff --git a/src/details.rs b/src/details.rs index edf921b..d632e32 100644 --- a/src/details.rs +++ b/src/details.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ bytes_stream::BytesStream, discover::Discovery, @@ -103,7 +105,7 @@ impl Details { pub(crate) async fn from_store( store: &S, - identifier: &S::Identifier, + identifier: &Arc, timeout: u64, ) -> Result { let mut buf = BytesStream::new(); diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 3602b5f..bab1582 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ error_code::ErrorCode, formats::InternalVideoFormat, @@ -132,7 +134,7 @@ impl ThumbnailFormat { #[tracing::instrument(skip(store))] pub(crate) async fn thumbnail( store: S, - from: S::Identifier, + from: Arc, input_format: InternalVideoFormat, format: ThumbnailFormat, timeout: u64, diff --git a/src/generate.rs b/src/generate.rs index 1976012..6808590 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -5,7 +5,7 @@ use crate::{ ffmpeg::ThumbnailFormat, formats::{InputProcessableFormat, InternalVideoFormat}, repo::{Alias, ArcRepo, Hash, VariantAlreadyExists}, - store::{Identifier, Store}, + store::Store, }; use actix_web::web::Bytes; use std::{path::PathBuf, time::Instant}; @@ -91,7 +91,7 @@ async fn process( let permit = crate::PROCESS_SEMAPHORE.acquire().await; let identifier = if let Some(identifier) = repo.still_identifier_from_alias(&alias).await? { - S::Identifier::from_arc(identifier)? + identifier } else { let Some(identifier) = repo.identifier(hash.clone()).await? 
else { return Err(UploadError::MissingIdentifier.into()); @@ -101,7 +101,7 @@ async fn process( let reader = crate::ffmpeg::thumbnail( store.clone(), - S::Identifier::from_arc(identifier)?, + identifier, input_format.unwrap_or(InternalVideoFormat::Mp4), thumbnail_format, media.process_timeout, diff --git a/src/ingest.rs b/src/ingest.rs index 938e277..e0cd34b 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ bytes_stream::BytesStream, either::Either, @@ -15,15 +17,12 @@ mod hasher; use hasher::Hasher; #[derive(Debug)] -pub(crate) struct Session -where - S: Store, -{ +pub(crate) struct Session { repo: ArcRepo, delete_token: DeleteToken, hash: Option, alias: Option, - identifier: Option, + identifier: Option>, } #[tracing::instrument(skip(stream))] @@ -49,7 +48,7 @@ pub(crate) async fn ingest( stream: impl Stream> + Unpin + 'static, declared_alias: Option, media: &crate::config::Media, -) -> Result, Error> +) -> Result where S: Store, { @@ -131,11 +130,11 @@ where #[tracing::instrument(level = "trace", skip_all)] async fn save_upload( - session: &mut Session, + session: &mut Session, repo: &ArcRepo, store: &S, hash: Hash, - identifier: &S::Identifier, + identifier: &Arc, ) -> Result<(), Error> where S: Store, @@ -153,10 +152,7 @@ where Ok(()) } -impl Session -where - S: Store, -{ +impl Session { pub(crate) fn disarm(mut self) -> DeleteToken { let _ = self.hash.take(); let _ = self.alias.take(); @@ -206,10 +202,7 @@ where } } -impl Drop for Session -where - S: Store, -{ +impl Drop for Session { fn drop(&mut self) { let any_items = self.hash.is_some() || self.alias.is_some() || self.identifier.is_some(); @@ -258,7 +251,7 @@ where tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; } .instrument(cleanup_span), ) diff --git a/src/lib.rs b/src/lib.rs index 21952fc..e6476c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,7 @@ use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rusty_s3::UrlStyle; use std::{ + marker::PhantomData, path::Path, path::PathBuf, sync::Arc, @@ -69,7 +70,7 @@ use self::{ queue::queue_generate, repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, - store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, + store::{file_store::FileStore, object_store::ObjectStore, Store}, stream::{empty, once, StreamLimit, StreamMap, StreamTimeout}, }; @@ -93,7 +94,7 @@ async fn ensure_details( config: &Configuration, alias: &Alias, ) -> Result { - let Some(identifier) = repo.identifier_from_alias(alias).await?.map(S::Identifier::from_arc).transpose()? else { + let Some(identifier) = repo.identifier_from_alias(alias).await? 
else { return Err(UploadError::MissingAlias.into()); }; @@ -117,10 +118,10 @@ async fn ensure_details( } } -struct Upload(Value>); +struct Upload(Value, PhantomData); impl FormData for Upload { - type Item = Session; + type Item = Session; type Error = Error; fn form(req: &HttpRequest) -> Form { @@ -172,14 +173,14 @@ impl FormData for Upload { } fn extract(value: Value) -> Result { - Ok(Upload(value)) + Ok(Upload(value, PhantomData)) } } -struct Import(Value>); +struct Import(Value, PhantomData); impl FormData for Import { - type Item = Session; + type Item = Session; type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { @@ -241,14 +242,14 @@ impl FormData for Import { where Self: Sized, { - Ok(Import(value)) + Ok(Import(value, PhantomData)) } } /// Handle responding to successful uploads #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))] async fn upload( - Multipart(Upload(value)): Multipart>, + Multipart(Upload(value, _)): Multipart>, repo: web::Data, store: web::Data, config: web::Data, @@ -259,7 +260,7 @@ async fn upload( /// Handle responding to successful uploads #[tracing::instrument(name = "Imported files", skip(value, repo, store, config))] async fn import( - Multipart(Import(value)): Multipart>, + Multipart(Import(value, _)): Multipart>, repo: web::Data, store: web::Data, config: web::Data, @@ -270,7 +271,7 @@ async fn import( /// Handle responding to successful uploads #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))] async fn handle_upload( - value: Value>, + value: Value, repo: web::Data, store: web::Data, config: web::Data, @@ -312,10 +313,10 @@ async fn handle_upload( }))) } -struct BackgroundedUpload(Value>); +struct BackgroundedUpload(Value, PhantomData); impl FormData for BackgroundedUpload { - type Item = Backgrounded; + type Item = Backgrounded; type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { @@ -371,13 +372,13 @@ impl FormData for BackgroundedUpload { where Self: Sized, { - Ok(BackgroundedUpload(value)) + Ok(BackgroundedUpload(value, PhantomData)) } } #[tracing::instrument(name = "Uploaded files", skip(value, repo))] async fn upload_backgrounded( - Multipart(BackgroundedUpload(value)): Multipart>, + Multipart(BackgroundedUpload(value, _)): Multipart>, repo: web::Data, ) -> Result { let images = value @@ -394,11 +395,7 @@ async fn upload_backgrounded( for image in &images { let upload_id = image.result.upload_id().expect("Upload ID exists"); - let identifier = image - .result - .identifier() - .expect("Identifier exists") - .to_bytes()?; + let identifier = image.result.identifier().expect("Identifier exists"); queue::queue_ingest(&repo, identifier, upload_id, None).await?; @@ -560,10 +557,7 @@ async fn do_download_backgrounded( let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?; let upload_id = backgrounded.upload_id().expect("Upload ID exists"); - let identifier = backgrounded - .identifier() - .expect("Identifier exists") - .to_bytes()?; + let identifier = backgrounded.identifier().expect("Identifier exists"); queue::queue_ingest(&repo, identifier, upload_id, None).await?; @@ -764,8 +758,6 @@ async fn process_details( let identifier = repo .variant_identifier(hash, thumbnail_string) .await? - .map(S::Identifier::from_arc) - .transpose()? 
.ok_or(UploadError::MissingAlias)?; let details = repo.details(&identifier).await?; @@ -856,11 +848,7 @@ async fn process( .await?; } - let identifier_opt = repo - .variant_identifier(hash.clone(), path_string) - .await? - .map(S::Identifier::from_arc) - .transpose()?; + let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; if let Some(identifier) = identifier_opt { let details = repo.details(&identifier).await?; @@ -980,11 +968,7 @@ async fn process_head( .await?; } - let identifier_opt = repo - .variant_identifier(hash.clone(), path_string) - .await? - .map(S::Identifier::from_arc) - .transpose()?; + let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; if let Some(identifier) = identifier_opt { let details = repo.details(&identifier).await?; @@ -1047,11 +1031,7 @@ async fn process_backgrounded( return Ok(HttpResponse::BadRequest().finish()); }; - let identifier_opt = repo - .variant_identifier(hash.clone(), path_string) - .await? - .map(S::Identifier::from_arc) - .transpose()?; + let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; if identifier_opt.is_some() { return Ok(HttpResponse::Accepted().finish()); @@ -1185,7 +1165,7 @@ async fn do_serve( (hash, alias, true) }; - let Some(identifier) = repo.identifier(hash.clone()).await?.map(Identifier::from_arc).transpose()? else { + let Some(identifier) = repo.identifier(hash.clone()).await? else { tracing::warn!( "Original File identifier for hash {hash:?} is missing, queue cleanup task", ); @@ -1250,7 +1230,7 @@ async fn do_serve_head( store: web::Data, config: web::Data, ) -> Result { - let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? else { + let Some(identifier) = repo.identifier_from_alias(&alias).await? else { // Invalid alias return Ok(HttpResponse::NotFound().finish()); }; @@ -1268,7 +1248,7 @@ async fn do_serve_head( async fn ranged_file_head_resp( store: &S, - identifier: S::Identifier, + identifier: Arc, range: Option>, details: Details, ) -> Result { @@ -1303,7 +1283,7 @@ async fn ranged_file_head_resp( async fn ranged_file_resp( store: &S, - identifier: S::Identifier, + identifier: Arc, range: Option>, details: Details, not_found: bool, @@ -1555,7 +1535,7 @@ async fn identifier( } }; - let Some(identifier) = repo.identifier_from_alias(&alias).await?.map(S::Identifier::from_arc).transpose()? else { + let Some(identifier) = repo.identifier_from_alias(&alias).await? 
else { // Invalid alias return Ok(HttpResponse::NotFound().json(serde_json::json!({ "msg": "No identifiers associated with provided alias" @@ -1564,7 +1544,7 @@ async fn identifier( Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", - "identifier": identifier.string_repr(), + "identifier": identifier.as_ref(), }))) } diff --git a/src/magick.rs b/src/magick.rs index b9efaa6..29c9636 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ error_code::ErrorCode, formats::ProcessableFormat, @@ -140,7 +142,7 @@ where pub(crate) async fn process_image_store_read( store: &S, - identifier: &S::Identifier, + identifier: &Arc, args: Vec, input_format: ProcessableFormat, format: ProcessableFormat, diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 5be16b9..78cb0b0 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -1,6 +1,9 @@ use std::{ rc::Rc, - sync::atomic::{AtomicU64, Ordering}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }; @@ -8,7 +11,7 @@ use crate::{ details::Details, error::{Error, UploadError}, repo::{ArcRepo, Hash}, - store::{Identifier, Store}, + store::Store, stream::IntoStreamer, }; @@ -103,7 +106,7 @@ where } // Hashes are read in a consistent order - let mut stream = repo.hashes().await.into_streamer(); + let mut stream = repo.hashes().into_streamer(); let state = Rc::new(MigrateState { repo: repo.clone(), @@ -169,7 +172,7 @@ where let current_index = index.fetch_add(1, Ordering::Relaxed); let original_identifier = match repo.identifier(hash.clone()).await { - Ok(Some(identifier)) => S1::Identifier::from_arc(identifier)?, + Ok(Some(identifier)) => identifier, Ok(None) => { tracing::warn!( "Original File identifier for hash {hash:?} is missing, queue cleanup task", @@ -214,8 +217,6 @@ where } if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { - let identifier = S1::Identifier::from_arc(identifier)?; - if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { @@ -245,8 +246,6 @@ where } for (variant, identifier) in repo.variants(hash.clone()).await? { - let identifier = S1::Identifier::from_arc(identifier)?; - if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { @@ -339,10 +338,10 @@ async fn migrate_file( repo: &ArcRepo, from: &S1, to: &S2, - identifier: &S1::Identifier, + identifier: &Arc, skip_missing_files: bool, timeout: u64, -) -> Result +) -> Result, MigrateError> where S1: Store, S2: Store, @@ -382,9 +381,9 @@ async fn do_migrate_file( repo: &ArcRepo, from: &S1, to: &S2, - identifier: &S1::Identifier, + identifier: &Arc, timeout: u64, -) -> Result +) -> Result, MigrateError> where S1: Store, S2: Store, @@ -421,11 +420,7 @@ where Ok(new_identifier) } -async fn migrate_details(repo: &ArcRepo, from: &I1, to: &I2) -> Result<(), Error> -where - I1: Identifier, - I2: Identifier, -{ +async fn migrate_details(repo: &ArcRepo, from: &Arc, to: &Arc) -> Result<(), Error> { if let Some(details) = repo.details(from).await? 
{ repo.relate_details(to, &details).await?; repo.cleanup_details(from).await?; diff --git a/src/queue.rs b/src/queue.rs index 8a3fc20..a2c9deb 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -5,7 +5,7 @@ use crate::{ formats::InputProcessableFormat, repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId}, serde_str::Serde, - store::{Identifier, Store}, + store::Store, }; use base64::{prelude::BASE64_STANDARD, Engine}; use std::{ @@ -55,7 +55,7 @@ enum Cleanup { hash: Hash, }, Identifier { - identifier: Base64Bytes, + identifier: String, }, Alias { alias: Serde, @@ -74,7 +74,7 @@ enum Cleanup { #[derive(Debug, serde::Deserialize, serde::Serialize)] enum Process { Ingest { - identifier: Base64Bytes, + identifier: String, upload_id: Serde, declared_alias: Option>, }, @@ -91,7 +91,7 @@ pub(crate) async fn cleanup_alias( alias: Alias, token: DeleteToken, ) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::Alias { + let job = serde_json::to_string(&Cleanup::Alias { alias: Serde::new(alias), token: Serde::new(token), }) @@ -101,17 +101,17 @@ pub(crate) async fn cleanup_alias( } pub(crate) async fn cleanup_hash(repo: &Arc, hash: Hash) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; + let job = serde_json::to_string(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } -pub(crate) async fn cleanup_identifier( +pub(crate) async fn cleanup_identifier( repo: &Arc, - identifier: I, + identifier: &Arc, ) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::Identifier { - identifier: Base64Bytes(identifier.to_bytes()?), + let job = serde_json::to_string(&Cleanup::Identifier { + identifier: identifier.to_string(), }) .map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; @@ -124,37 +124,37 @@ async fn cleanup_variants( variant: Option, ) -> Result<(), Error> { let job = - serde_json::to_vec(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?; + serde_json::to_string(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } pub(crate) async fn cleanup_outdated_proxies(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; + let job = serde_json::to_string(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } pub(crate) async fn cleanup_outdated_variants(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; + let job = serde_json::to_string(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } pub(crate) async fn cleanup_all_variants(repo: &Arc) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::AllVariants).map_err(UploadError::PushJob)?; + let job = serde_json::to_string(&Cleanup::AllVariants).map_err(UploadError::PushJob)?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } pub(crate) async fn queue_ingest( repo: &Arc, - identifier: Vec, + identifier: &Arc, upload_id: UploadId, declared_alias: Option, ) -> Result<(), Error> { - let job = serde_json::to_vec(&Process::Ingest { - identifier: Base64Bytes(identifier), + let job = serde_json::to_string(&Process::Ingest { + identifier: identifier.to_string(), declared_alias: declared_alias.map(Serde::new), upload_id: Serde::new(upload_id), 
}) @@ -170,7 +170,7 @@ pub(crate) async fn queue_generate( process_path: PathBuf, process_args: Vec, ) -> Result<(), Error> { - let job = serde_json::to_vec(&Process::Generate { + let job = serde_json::to_string(&Process::Generate { target_format, source: Serde::new(source), process_path, @@ -220,7 +220,7 @@ async fn process_jobs( &'a Arc, &'a S, &'a Configuration, - &'a [u8], + &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { @@ -284,13 +284,13 @@ where &'a Arc, &'a S, &'a Configuration, - &'a [u8], + &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { loop { let fut = async { - let (job_id, bytes) = repo.pop(queue, worker_id).await?; + let (job_id, string) = repo.pop(queue, worker_id).await?; let span = tracing::info_span!("Running Job"); @@ -303,7 +303,7 @@ where queue, worker_id, job_id, - (callback)(repo, store, config, bytes.as_ref()), + (callback)(repo, store, config, string.as_ref()), ) }) .instrument(span) @@ -337,7 +337,7 @@ async fn process_image_jobs( &'a S, &'a ProcessMap, &'a Configuration, - &'a [u8], + &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { @@ -373,13 +373,13 @@ where &'a S, &'a ProcessMap, &'a Configuration, - &'a [u8], + &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, { loop { let fut = async { - let (job_id, bytes) = repo.pop(queue, worker_id).await?; + let (job_id, string) = repo.pop(queue, worker_id).await?; let span = tracing::info_span!("Running Job"); @@ -392,7 +392,7 @@ where queue, worker_id, job_id, - (callback)(repo, store, process_map, config, bytes.as_ref()), + (callback)(repo, store, process_map, config, string.as_ref()), ) }) .instrument(span) diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 34b2424..a6b76fd 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use crate::{ config::Configuration, error::{Error, UploadError}, - queue::{Base64Bytes, Cleanup, LocalBoxFuture}, + queue::{Cleanup, LocalBoxFuture}, repo::{Alias, ArcRepo, DeleteToken, Hash}, serde_str::Serde, - store::{Identifier, Store}, + store::Store, stream::IntoStreamer, }; @@ -12,18 +14,18 @@ pub(super) fn perform<'a, S>( repo: &'a ArcRepo, store: &'a S, configuration: &'a Configuration, - job: &'a [u8], + job: &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> where S: Store, { Box::pin(async move { - match serde_json::from_slice(job) { + match serde_json::from_str(job) { Ok(job) => match job { Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?, Cleanup::Identifier { - identifier: Base64Bytes(in_identifier), - } => identifier(repo, store, in_identifier).await?, + identifier: in_identifier, + } => identifier(repo, store, Arc::from(in_identifier)).await?, Cleanup::Alias { alias: stored_alias, token, @@ -50,20 +52,18 @@ where } #[tracing::instrument(skip_all)] -async fn identifier(repo: &ArcRepo, store: &S, identifier: Vec) -> Result<(), Error> +async fn identifier(repo: &ArcRepo, store: &S, identifier: Arc) -> Result<(), Error> where S: Store, { - let identifier = S::Identifier::from_bytes(identifier)?; - let mut errors = Vec::new(); if let Err(e) = store.remove(&identifier).await { - errors.push(e); + errors.push(UploadError::from(e)); } if let Err(e) = repo.cleanup_details(&identifier).await { - errors.push(e); + errors.push(UploadError::from(e)); } for error in errors { @@ -100,7 +100,7 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { idents.extend(repo.motion_identifier(hash.clone()).await?); for identifier in idents { - let _ = 
super::cleanup_identifier(repo, identifier).await; + let _ = super::cleanup_identifier(repo, &identifier).await; } repo.cleanup_hash(hash).await?; @@ -136,7 +136,7 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E #[tracing::instrument(skip_all)] async fn all_variants(repo: &ArcRepo) -> Result<(), Error> { - let mut hash_stream = repo.hashes().await.into_streamer(); + let mut hash_stream = repo.hashes().into_streamer(); while let Some(res) = hash_stream.next().await { let hash = res?; @@ -193,7 +193,7 @@ async fn hash_variant( .variant_identifier(hash.clone(), target_variant.clone()) .await? { - super::cleanup_identifier(repo, identifier).await?; + super::cleanup_identifier(repo, &identifier).await?; } repo.remove_variant(hash.clone(), target_variant.clone()) @@ -203,7 +203,7 @@ async fn hash_variant( for (variant, identifier) in repo.variants(hash.clone()).await? { repo.remove_variant(hash.clone(), variant.clone()).await?; repo.remove_variant_access(hash.clone(), variant).await?; - super::cleanup_identifier(repo, identifier).await?; + super::cleanup_identifier(repo, &identifier).await?; } } diff --git a/src/queue/process.rs b/src/queue/process.rs index 39a1fb9..a5f8e43 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -4,36 +4,36 @@ use crate::{ error::{Error, UploadError}, formats::InputProcessableFormat, ingest::Session, - queue::{Base64Bytes, LocalBoxFuture, Process}, + queue::{LocalBoxFuture, Process}, repo::{Alias, ArcRepo, UploadId, UploadResult}, serde_str::Serde, - store::{Identifier, Store}, + store::Store, stream::StreamMap, }; -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; pub(super) fn perform<'a, S>( repo: &'a ArcRepo, store: &'a S, process_map: &'a ProcessMap, config: &'a Configuration, - job: &'a [u8], + job: &'a str, ) -> LocalBoxFuture<'a, Result<(), Error>> where S: Store + 'static, { Box::pin(async move { - match serde_json::from_slice(job) { + match serde_json::from_str(job) { Ok(job) => match job { Process::Ingest { - identifier: Base64Bytes(identifier), + identifier, upload_id, declared_alias, } => { process_ingest( repo, store, - identifier, + Arc::from(identifier), Serde::into_inner(upload_id), declared_alias.map(Serde::into_inner), &config.media, @@ -72,7 +72,7 @@ where async fn process_ingest( repo: &ArcRepo, store: &S, - unprocessed_identifier: Vec, + unprocessed_identifier: Arc, upload_id: UploadId, declared_alias: Option, media: &crate::config::Media, @@ -81,8 +81,6 @@ where S: Store + 'static, { let fut = async { - let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?; - let ident = unprocessed_identifier.clone(); let store2 = store.clone(); let repo = repo.clone(); @@ -97,7 +95,7 @@ where let session = crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?; - Ok(session) as Result, Error> + Ok(session) as Result }) .await; diff --git a/src/range.rs b/src/range.rs index 976f384..cf76319 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ error::{Error, UploadError}, store::Store, @@ -26,7 +28,7 @@ pub(crate) fn chop_bytes( pub(crate) async fn chop_store( byte_range: &ByteRangeSpec, store: &S, - identifier: &S::Identifier, + identifier: &Arc, length: u64, ) -> Result>, Error> { if let Some((start, end)) = byte_range.to_satisfiable_range(length) { diff --git a/src/repo.rs b/src/repo.rs index 4c5d923..22318d4 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -2,7 +2,6 @@ use crate::{ config, details::Details, 
error_code::{ErrorCode, OwnedErrorCode}, - store::{Identifier, StoreError}, stream::LocalBoxStream, }; use base64::Engine; @@ -86,6 +85,7 @@ impl RepoError { pub(crate) const fn error_code(&self) -> ErrorCode { match self { Self::SledError(e) => e.error_code(), + Self::PostgresError(e) => e.error_code(), Self::AlreadyClaimed => ErrorCode::ALREADY_CLAIMED, Self::Canceled => ErrorCode::PANIC, } @@ -111,7 +111,7 @@ pub(crate) trait FullRepo: async fn health_check(&self) -> Result<(), RepoError>; #[tracing::instrument(skip(self))] - async fn identifier_from_alias(&self, alias: &Alias) -> Result>, RepoError> { + async fn identifier_from_alias(&self, alias: &Alias) -> Result>, RepoError> { let Some(hash) = self.hash(alias).await? else { return Ok(None); }; @@ -132,7 +132,7 @@ pub(crate) trait FullRepo: async fn still_identifier_from_alias( &self, alias: &Alias, - ) -> Result>, StoreError> { + ) -> Result>, RepoError> { let Some(hash) = self.hash(alias).await? else { return Ok(None); }; @@ -372,13 +372,13 @@ impl JobId { #[async_trait::async_trait(?Send)] pub(crate) trait QueueRepo: BaseRepo { - async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result; + async fn push(&self, queue: &'static str, job: Arc) -> Result; async fn pop( &self, queue: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc<[u8]>), RepoError>; + ) -> Result<(JobId, Arc), RepoError>; async fn heartbeat( &self, @@ -400,7 +400,7 @@ impl QueueRepo for Arc where T: QueueRepo, { - async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result { + async fn push(&self, queue: &'static str, job: Arc) -> Result { T::push(self, queue, job).await } @@ -408,7 +408,7 @@ where &self, queue: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc<[u8]>), RepoError> { + ) -> Result<(JobId, Arc), RepoError> { T::pop(self, queue, worker_id).await } @@ -460,12 +460,12 @@ where pub(crate) trait DetailsRepo: BaseRepo { async fn relate_details( &self, - identifier: &dyn Identifier, + identifier: &Arc, details: &Details, - ) -> Result<(), StoreError>; - async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError>; + ) -> Result<(), RepoError>; + async fn details(&self, identifier: &Arc) -> Result, RepoError>; - async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError>; + async fn cleanup_details(&self, identifier: &Arc) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -475,17 +475,17 @@ where { async fn relate_details( &self, - identifier: &dyn Identifier, + identifier: &Arc, details: &Details, - ) -> Result<(), StoreError> { + ) -> Result<(), RepoError> { T::relate_details(self, identifier, details).await } - async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError> { + async fn details(&self, identifier: &Arc) -> Result, RepoError> { T::details(self, identifier).await } - async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { + async fn cleanup_details(&self, identifier: &Arc) -> Result<(), RepoError> { T::cleanup_details(self, identifier).await } } @@ -496,11 +496,11 @@ pub(crate) trait StoreMigrationRepo: BaseRepo { async fn mark_migrated( &self, - old_identifier: &dyn Identifier, - new_identifier: &dyn Identifier, - ) -> Result<(), StoreError>; + old_identifier: &Arc, + new_identifier: &Arc, + ) -> Result<(), RepoError>; - async fn is_migrated(&self, identifier: &dyn Identifier) -> Result; + async fn is_migrated(&self, identifier: &Arc) -> Result; async fn clear(&self) -> Result<(), RepoError>; } @@ -516,13 
+516,13 @@ where async fn mark_migrated( &self, - old_identifier: &dyn Identifier, - new_identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + old_identifier: &Arc, + new_identifier: &Arc, + ) -> Result<(), RepoError> { T::mark_migrated(self, old_identifier, new_identifier).await } - async fn is_migrated(&self, identifier: &dyn Identifier) -> Result { + async fn is_migrated(&self, identifier: &Arc) -> Result { T::is_migrated(self, identifier).await } @@ -569,12 +569,87 @@ impl HashPage { } } +pub(crate) struct HashStream { + repo: Option, + page_future: + Option>>>>, + page: Option, +} + +impl futures_core::Stream for HashStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + loop { + let Some(repo) = &this.repo else { + return std::task::Poll::Ready(None); + }; + + let slug = if let Some(page) = &mut this.page { + // popping last in page is fine - we reversed them + if let Some(hash) = page.hashes.pop() { + return std::task::Poll::Ready(Some(Ok(hash))); + } + + let slug = page.next(); + this.page.take(); + + if let Some(slug) = slug { + Some(slug) + } else { + this.repo.take(); + return std::task::Poll::Ready(None); + } + } else { + None + }; + + if let Some(page_future) = &mut this.page_future { + let res = std::task::ready!(page_future.as_mut().poll(cx)); + + this.page_future.take(); + + match res { + Ok(mut page) => { + // reverse because we use `pop` to fetch next + page.hashes.reverse(); + + this.page = Some(page); + } + Err(e) => { + this.repo.take(); + + return std::task::Poll::Ready(Some(Err(e))); + } + } + } else { + let repo = repo.clone(); + + this.page_future = Some(Box::pin(async move { repo.hash_page(slug, 100).await })); + } + } + } +} + +impl dyn FullRepo { + pub(crate) fn hashes(self: &Arc) -> HashStream { + HashStream { + repo: Some(self.clone()), + page_future: None, + page: None, + } + } +} + #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo: BaseRepo { async fn size(&self) -> Result; - async fn hashes(&self) -> LocalBoxStream<'static, Result>; - async fn hash_page(&self, slug: Option, limit: usize) -> Result { let hash = slug.as_deref().and_then(hash_from_slug); @@ -604,8 +679,8 @@ pub(crate) trait HashRepo: BaseRepo { async fn create_hash( &self, hash: Hash, - identifier: &dyn Identifier, - ) -> Result, StoreError> { + identifier: &Arc, + ) -> Result, RepoError> { self.create_hash_with_timestamp(hash, identifier, time::OffsetDateTime::now_utc()) .await } @@ -613,38 +688,34 @@ pub(crate) trait HashRepo: BaseRepo { async fn create_hash_with_timestamp( &self, hash: Hash, - identifier: &dyn Identifier, + identifier: &Arc, timestamp: time::OffsetDateTime, - ) -> Result, StoreError>; + ) -> Result, RepoError>; - async fn update_identifier( - &self, - hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError>; + async fn update_identifier(&self, hash: Hash, identifier: &Arc) -> Result<(), RepoError>; - async fn identifier(&self, hash: Hash) -> Result>, RepoError>; + async fn identifier(&self, hash: Hash) -> Result>, RepoError>; async fn relate_variant_identifier( &self, hash: Hash, variant: String, - identifier: &dyn Identifier, - ) -> Result, StoreError>; + identifier: &Arc, + ) -> Result, RepoError>; async fn variant_identifier( &self, hash: Hash, variant: String, - ) -> Result>, RepoError>; - async fn variants(&self, hash: Hash) -> Result)>, RepoError>; + ) -> Result>, RepoError>; + async fn variants(&self, hash: 
Hash) -> Result)>, RepoError>; async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn relate_motion_identifier( &self, hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError>; - async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError>; + identifier: &Arc, + ) -> Result<(), RepoError>; + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError>; async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError>; } @@ -658,10 +729,6 @@ where T::size(self).await } - async fn hashes(&self) -> LocalBoxStream<'static, Result> { - T::hashes(self).await - } - async fn bound(&self, hash: Hash) -> Result, RepoError> { T::bound(self, hash).await } @@ -685,21 +752,17 @@ where async fn create_hash_with_timestamp( &self, hash: Hash, - identifier: &dyn Identifier, + identifier: &Arc, timestamp: time::OffsetDateTime, - ) -> Result, StoreError> { + ) -> Result, RepoError> { T::create_hash_with_timestamp(self, hash, identifier, timestamp).await } - async fn update_identifier( - &self, - hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + async fn update_identifier(&self, hash: Hash, identifier: &Arc) -> Result<(), RepoError> { T::update_identifier(self, hash, identifier).await } - async fn identifier(&self, hash: Hash) -> Result>, RepoError> { + async fn identifier(&self, hash: Hash) -> Result>, RepoError> { T::identifier(self, hash).await } @@ -707,8 +770,8 @@ where &self, hash: Hash, variant: String, - identifier: &dyn Identifier, - ) -> Result, StoreError> { + identifier: &Arc, + ) -> Result, RepoError> { T::relate_variant_identifier(self, hash, variant, identifier).await } @@ -716,11 +779,11 @@ where &self, hash: Hash, variant: String, - ) -> Result>, RepoError> { + ) -> Result>, RepoError> { T::variant_identifier(self, hash, variant).await } - async fn variants(&self, hash: Hash) -> Result)>, RepoError> { + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { T::variants(self, hash).await } @@ -731,12 +794,12 @@ where async fn relate_motion_identifier( &self, hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + identifier: &Arc, + ) -> Result<(), RepoError> { T::relate_motion_identifier(self, hash, identifier).await } - async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { T::motion_identifier(self, hash).await } diff --git a/src/repo/hash.rs b/src/repo/hash.rs index 18c6dab..b3c8036 100644 --- a/src/repo/hash.rs +++ b/src/repo/hash.rs @@ -1,13 +1,42 @@ +use diesel::{backend::Backend, sql_types::VarChar, AsExpression, FromSqlRow}; + use crate::formats::InternalFormat; use std::sync::Arc; -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, AsExpression, FromSqlRow)] +#[diesel(sql_type = VarChar)] pub(crate) struct Hash { hash: Arc<[u8; 32]>, size: u64, format: InternalFormat, } +impl diesel::serialize::ToSql for Hash { + fn to_sql<'b>( + &'b self, + out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>, + ) -> diesel::serialize::Result { + let s = self.to_base64(); + + >::to_sql( + &s, + &mut out.reborrow(), + ) + } +} + +impl diesel::deserialize::FromSql for Hash +where + B: Backend, + String: diesel::deserialize::FromSql, +{ + fn from_sql(bytes: ::RawValue<'_>) -> diesel::deserialize::Result { + let s = String::from_sql(bytes)?; + + Self::from_base64(s).ok_or_else(|| format!("Invalid base64 hash").into()) + } +} + 
impl Hash { pub(crate) fn new(hash: [u8; 32], size: u64, format: InternalFormat) -> Self { Self { @@ -30,6 +59,22 @@ impl Hash { hex::encode(self.to_bytes()) } + pub(crate) fn to_base64(&self) -> String { + use base64::Engine; + + base64::engine::general_purpose::STANDARD.encode(self.to_bytes()) + } + + pub(crate) fn from_base64(input: String) -> Option { + use base64::Engine; + + let bytes = base64::engine::general_purpose::STANDARD + .decode(input) + .ok()?; + + Self::from_bytes(&bytes) + } + pub(super) fn to_bytes(&self) -> Vec { let format_byte = self.format.to_byte(); diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index d27dd24..60dbe68 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use tokio::task::JoinSet; use crate::{ @@ -33,7 +35,7 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result tracing::warn!("Checks complete, migrating repo"); tracing::warn!("{total_size} hashes will be migrated"); - let mut hash_stream = old_repo.hashes().await.into_streamer(); + let mut hash_stream = old_repo.hashes().into_streamer(); let mut index = 0; while let Some(res) = hash_stream.next().await { @@ -266,7 +268,7 @@ async fn do_migrate_hash_04( config: &Configuration, old_hash: sled::IVec, ) -> Result<(), Error> { - let Some(identifier) = old_repo.identifier::(old_hash.clone()).await? else { + let Some(identifier) = old_repo.identifier(old_hash.clone()).await? else { tracing::warn!("Skipping hash {}, no identifier", hex::encode(&old_hash)); return Ok(()); }; @@ -276,10 +278,8 @@ async fn do_migrate_hash_04( let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?; let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?; - let variants = old_repo.variants::(old_hash.clone()).await?; - let motion_identifier = old_repo - .motion_identifier::(old_hash.clone()) - .await?; + let variants = old_repo.variants(old_hash.clone()).await?; + let motion_identifier = old_repo.motion_identifier(old_hash.clone()).await?; let hash = old_hash[..].try_into().expect("Invalid hash size"); @@ -326,7 +326,7 @@ async fn set_details( new_repo: &ArcRepo, store: &S, config: &Configuration, - identifier: &S::Identifier, + identifier: &Arc, ) -> Result { if let Some(details) = new_repo.details(identifier).await? 
{ Ok(details) @@ -342,9 +342,9 @@ async fn fetch_or_generate_details( old_repo: &OldSledRepo, store: &S, config: &Configuration, - identifier: &S::Identifier, + identifier: &Arc, ) -> Result { - let details_opt = old_repo.details(identifier).await?; + let details_opt = old_repo.details(identifier.clone()).await?; if let Some(details) = details_opt { Ok(details) diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index c66303c..9ffd3f4 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1,6 +1,8 @@ mod embedded; mod schema; +use std::sync::Arc; + use diesel::prelude::*; use diesel_async::{ pooled_connection::{ @@ -11,7 +13,12 @@ use diesel_async::{ }; use url::Url; -use super::{BaseRepo, HashRepo, RepoError}; +use crate::error_code::ErrorCode; + +use super::{ + BaseRepo, Hash, HashAlreadyExists, HashPage, HashRepo, OrderedHash, RepoError, + VariantAlreadyExists, +}; #[derive(Clone)] pub(crate) struct PostgresRepo { @@ -39,6 +46,12 @@ pub(crate) enum PostgresError { Diesel(#[source] diesel::result::Error), } +impl PostgresError { + pub(super) const fn error_code(&self) -> ErrorCode { + todo!() + } +} + impl PostgresRepo { pub(crate) async fn connect(postgres_url: Url) -> Result { let (mut client, conn) = @@ -65,6 +78,11 @@ impl PostgresRepo { } } +fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime { + let timestamp = timestamp.to_offset(time::UtcOffset::UTC); + time::PrimitiveDateTime::new(timestamp.date(), timestamp.time()) +} + impl BaseRepo for PostgresRepo {} #[async_trait::async_trait(?Send)] @@ -82,6 +100,187 @@ impl HashRepo for PostgresRepo { Ok(count.try_into().expect("non-negative count")) } + + async fn bound(&self, input_hash: Hash) -> Result, RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let timestamp = hashes + .select(created_at) + .filter(hash.eq(&input_hash)) + .first(&mut conn) + .await + .map(time::PrimitiveDateTime::assume_utc) + .optional() + .map_err(PostgresError::Diesel)?; + + Ok(timestamp.map(|timestamp| OrderedHash { + timestamp, + hash: input_hash, + })) + } + + async fn hash_page_by_date( + &self, + date: time::OffsetDateTime, + limit: usize, + ) -> Result { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let timestamp = to_primitive(date); + + let ordered_hash = hashes + .select((created_at, hash)) + .filter(created_at.lt(timestamp)) + .order(created_at.desc()) + .first::<(time::PrimitiveDateTime, Hash)>(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)? 
+ .map(|tup| OrderedHash { + timestamp: tup.0.assume_utc(), + hash: tup.1, + }); + + self.hashes_ordered(ordered_hash, limit).await + } + + async fn hashes_ordered( + &self, + bound: Option, + limit: usize, + ) -> Result { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let (mut page, prev) = if let Some(OrderedHash { + timestamp, + hash: bound_hash, + }) = bound + { + let timestamp = to_primitive(timestamp); + + let page = hashes + .select(hash) + .filter(created_at.lt(timestamp)) + .or_filter(created_at.eq(timestamp).and(hash.le(&bound_hash))) + .order(created_at.desc()) + .then_order_by(hash.desc()) + .limit(limit as i64 + 1) + .load::(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + let prev = hashes + .select(hash) + .filter(created_at.gt(timestamp)) + .or_filter(created_at.eq(timestamp).and(hash.gt(&bound_hash))) + .order(created_at) + .then_order_by(hash) + .offset(limit.saturating_sub(1) as i64) + .first::(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)?; + + (page, prev) + } else { + let page = hashes + .select(hash) + .order(created_at.desc()) + .then_order_by(hash.desc()) + .limit(limit as i64 + 1) + .load::(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + (page, None) + }; + + let next = if page.len() > limit { page.pop() } else { None }; + + Ok(HashPage { + limit, + prev, + next, + hashes: page, + }) + } + + async fn create_hash_with_timestamp( + &self, + input_hash: Hash, + input_identifier: &Arc, + timestamp: time::OffsetDateTime, + ) -> Result, RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let timestamp = to_primitive(timestamp); + + /* + insert_into(hashes).values(( + hash.eq(&input_hash), + identifier.eq(&input_identifier) + )) + */ + + todo!() + } + + async fn update_identifier(&self, hash: Hash, identifier: &Arc) -> Result<(), RepoError> { + todo!() + } + + async fn identifier(&self, hash: Hash) -> Result>, RepoError> { + todo!() + } + + async fn relate_variant_identifier( + &self, + hash: Hash, + variant: String, + identifier: &Arc, + ) -> Result, RepoError> { + todo!() + } + + async fn variant_identifier( + &self, + hash: Hash, + variant: String, + ) -> Result>, RepoError> { + todo!() + } + + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { + todo!() + } + + async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + todo!() + } + + async fn relate_motion_identifier( + &self, + hash: Hash, + identifier: &Arc, + ) -> Result<(), RepoError> { + todo!() + } + + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { + todo!() + } + + async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { + todo!() + } } impl std::fmt::Debug for PostgresRepo { diff --git a/src/repo/postgres/migrations/V0001__create_hashes.rs b/src/repo/postgres/migrations/V0001__create_hashes.rs index 9829585..1c97f08 100644 --- a/src/repo/postgres/migrations/V0001__create_hashes.rs +++ b/src/repo/postgres/migrations/V0001__create_hashes.rs @@ -8,7 +8,7 @@ pub(crate) fn migration() -> String { m.create_table("hashes", |t| { t.add_column( "hash", - types::binary() + types::text() .primary(true) .unique(true) .nullable(false) diff --git a/src/repo/postgres/migrations/V0002__create_variants.rs b/src/repo/postgres/migrations/V0002__create_variants.rs index c49c62c..4e62f1b 100644 --- a/src/repo/postgres/migrations/V0002__create_variants.rs +++ 
b/src/repo/postgres/migrations/V0002__create_variants.rs @@ -7,7 +7,7 @@ pub(crate) fn migration() -> String { m.create_table("variants", |t| { t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#); - t.add_column("hash", types::binary().nullable(false)); + t.add_column("hash", types::text().nullable(false)); t.add_column("variant", types::text().nullable(false)); t.add_column("identifier", types::text().nullable(false)); t.add_column( diff --git a/src/repo/postgres/migrations/V0003__create_aliases.rs b/src/repo/postgres/migrations/V0003__create_aliases.rs index a8aa37f..007f123 100644 --- a/src/repo/postgres/migrations/V0003__create_aliases.rs +++ b/src/repo/postgres/migrations/V0003__create_aliases.rs @@ -14,7 +14,7 @@ pub(crate) fn migration() -> String { .unique(true) .nullable(false), ); - t.add_column("hash", types::binary().nullable(false)); + t.add_column("hash", types::text().nullable(false)); t.add_column("token", types::text().size(60).nullable(false)); t.add_foreign_key(&["hash"], "hashes", &["hash"]); diff --git a/src/repo/postgres/migrations/V0005__create_details.rs b/src/repo/postgres/migrations/V0005__create_details.rs index e324caa..440aa02 100644 --- a/src/repo/postgres/migrations/V0005__create_details.rs +++ b/src/repo/postgres/migrations/V0005__create_details.rs @@ -10,7 +10,7 @@ pub(crate) fn migration() -> String { "identifier", types::text().primary(true).unique(true).nullable(false), ); - t.add_column("details", types::custom("jsonb").nullable(false)); + t.add_column("json", types::custom("jsonb").nullable(false)); }); m.make::().to_string() diff --git a/src/repo/postgres/migrations/V0006__create_queue.rs b/src/repo/postgres/migrations/V0006__create_queue.rs index 293ff97..d5161bf 100644 --- a/src/repo/postgres/migrations/V0006__create_queue.rs +++ b/src/repo/postgres/migrations/V0006__create_queue.rs @@ -7,7 +7,7 @@ pub(crate) fn migration() -> String { m.inject_custom("CREATE TYPE job_status AS ENUM ('new', 'running');"); - m.create_table("queue", |t| { + m.create_table("job_queue", |t| { t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#); t.add_column("queue", types::text().size(50).nullable(false)); t.add_column("job", types::custom("jsonb").nullable(false)); @@ -42,7 +42,7 @@ $$ LANGUAGE plpgsql; r#" CREATE TRIGGER queue_status AFTER INSERT OR UPDATE OF status - ON queue + ON job_queue FOR EACH ROW EXECUTE PROCEDURE queue_status_notify(); "# diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs index 3ab3e46..1b6aba4 100644 --- a/src/repo/postgres/schema.rs +++ b/src/repo/postgres/schema.rs @@ -9,7 +9,7 @@ pub mod sql_types { diesel::table! { aliases (alias) { alias -> Text, - hash -> Bytea, + hash -> Text, token -> Text, } } @@ -17,20 +17,33 @@ diesel::table! { diesel::table! { details (identifier) { identifier -> Text, - #[sql_name = "details"] - details_json -> Jsonb, + json -> Jsonb, } } diesel::table! { hashes (hash) { - hash -> Bytea, + hash -> Text, identifier -> Text, motion_identifier -> Nullable, created_at -> Timestamp, } } +diesel::table! { + use diesel::sql_types::*; + use super::sql_types::JobStatus; + + job_queue (id) { + id -> Uuid, + queue -> Text, + job -> Jsonb, + status -> JobStatus, + queue_time -> Timestamp, + heartbeat -> Timestamp, + } +} + diesel::table! { proxies (url) { url -> Text, @@ -39,21 +52,6 @@ diesel::table! { } } -diesel::table! 
{ - use diesel::sql_types::*; - use super::sql_types::JobStatus; - - queue (id) { - id -> Uuid, - #[sql_name = "queue"] - queue_name -> Text, - job -> Jsonb, - status -> JobStatus, - queue_time -> Timestamp, - heartbeat -> Timestamp, - } -} - diesel::table! { refinery_schema_history (version) { version -> Int4, @@ -89,7 +87,7 @@ diesel::table! { diesel::table! { variants (id) { id -> Uuid, - hash -> Bytea, + hash -> Text, variant -> Text, identifier -> Text, accessed -> Timestamp, @@ -104,8 +102,8 @@ diesel::allow_tables_to_appear_in_same_query!( aliases, details, hashes, + job_queue, proxies, - queue, refinery_schema_history, settings, store_migrations, diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 57527c2..33cf611 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -2,7 +2,6 @@ use crate::{ details::HumanDate, error_code::{ErrorCode, OwnedErrorCode}, serde_str::Serde, - store::StoreError, stream::{from_iterator, LocalBoxStream}, }; use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree}; @@ -21,9 +20,9 @@ use uuid::Uuid; use super::{ hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, - Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, Identifier, JobId, - OrderedHash, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, - UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, + Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, + ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, + UploadResult, VariantAccessRepo, VariantAlreadyExists, }; macro_rules! b { @@ -55,6 +54,9 @@ pub(crate) enum SledError { #[error("Error parsing variant key")] VariantKey(#[from] VariantKeyError), + #[error("Invalid string data in db")] + Utf8(#[source] std::str::Utf8Error), + #[error("Operation panicked")] Panic, @@ -65,7 +67,7 @@ pub(crate) enum SledError { impl SledError { pub(super) const fn error_code(&self) -> ErrorCode { match self { - Self::Sled(_) | Self::VariantKey(_) => ErrorCode::SLED_ERROR, + Self::Sled(_) | Self::VariantKey(_) | Self::Utf8(_) => ErrorCode::SLED_ERROR, Self::Details(_) => ErrorCode::EXTRACT_DETAILS, Self::UploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT, Self::Panic => ErrorCode::PANIC, @@ -648,10 +650,17 @@ fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> { Arc::from(key) } +fn try_into_arc_str(ivec: IVec) -> Result, SledError> { + std::str::from_utf8(&ivec[..]) + .map_err(SledError::Utf8) + .map(String::from) + .map(Arc::from) +} + #[async_trait::async_trait(?Send)] impl QueueRepo for SledRepo { - #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] - async fn push(&self, queue_name: &'static str, job: Arc<[u8]>) -> Result { + #[tracing::instrument(skip(self))] + async fn push(&self, queue_name: &'static str, job: Arc) -> Result { let metrics_guard = PushMetricsGuard::guard(queue_name); let id = JobId::gen(); @@ -700,7 +709,7 @@ impl QueueRepo for SledRepo { &self, queue_name: &'static str, worker_id: Uuid, - ) -> Result<(JobId, Arc<[u8]>), RepoError> { + ) -> Result<(JobId, Arc), RepoError> { let metrics_guard = PopMetricsGuard::guard(queue_name); let now = time::OffsetDateTime::now_utc(); @@ -753,11 +762,10 @@ impl QueueRepo for SledRepo { tracing::Span::current().record("job_id", &format!("{job_id:?}")); - let opt = queue - .get(&key)? 
- .map(|job_bytes| (job_id, Arc::from(job_bytes.to_vec()))); + let opt = queue.get(&key)?.map(try_into_arc_str).transpose()?; - return Ok(opt) as Result)>, SledError>; + return Ok(opt.map(|job| (job_id, job))) + as Result)>, SledError>; } Ok(None) @@ -949,43 +957,46 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl DetailsRepo for SledRepo { - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self))] async fn relate_details( &self, - identifier: &dyn Identifier, + identifier: &Arc, details: &Details, - ) -> Result<(), StoreError> { - let key = identifier.to_bytes()?; - let details = serde_json::to_vec(&details.inner) - .map_err(SledError::Details) - .map_err(RepoError::from)?; + ) -> Result<(), RepoError> { + let key = identifier.clone(); + let details = serde_json::to_vec(&details.inner).map_err(SledError::Details)?; b!( self.identifier_details, - identifier_details.insert(key, details) + identifier_details.insert(key.as_bytes(), details) ); Ok(()) } - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn details(&self, identifier: &dyn Identifier) -> Result, StoreError> { - let key = identifier.to_bytes()?; + #[tracing::instrument(level = "trace", skip(self))] + async fn details(&self, identifier: &Arc) -> Result, RepoError> { + let key = identifier.clone(); - let opt = b!(self.identifier_details, identifier_details.get(key)); + let opt = b!( + self.identifier_details, + identifier_details.get(key.as_bytes()) + ); opt.map(|ivec| serde_json::from_slice(&ivec).map(|inner| Details { inner })) .transpose() .map_err(SledError::Details) .map_err(RepoError::from) - .map_err(StoreError::from) } - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> { - let key = identifier.to_bytes()?; + #[tracing::instrument(level = "trace", skip(self))] + async fn cleanup_details(&self, identifier: &Arc) -> Result<(), RepoError> { + let key = identifier.clone(); - b!(self.identifier_details, identifier_details.remove(key)); + b!( + self.identifier_details, + identifier_details.remove(key.as_bytes()) + ); Ok(()) } @@ -999,24 +1010,28 @@ impl StoreMigrationRepo for SledRepo { async fn mark_migrated( &self, - old_identifier: &dyn Identifier, - new_identifier: &dyn Identifier, - ) -> Result<(), StoreError> { - let key = new_identifier.to_bytes()?; - let value = old_identifier.to_bytes()?; + old_identifier: &Arc, + new_identifier: &Arc, + ) -> Result<(), RepoError> { + let key = new_identifier.clone(); + let value = old_identifier.clone(); b!( self.migration_identifiers, - migration_identifiers.insert(key, value) + migration_identifiers.insert(key.as_bytes(), value.as_bytes()) ); Ok(()) } - async fn is_migrated(&self, identifier: &dyn Identifier) -> Result { - let key = identifier.to_bytes()?; + async fn is_migrated(&self, identifier: &Arc) -> Result { + let key = identifier.clone(); - Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some()) + Ok(b!( + self.migration_identifiers, + migration_identifiers.get(key.as_bytes()) + ) + .is_some()) } async fn clear(&self) -> Result<(), RepoError> { @@ -1062,17 +1077,6 @@ impl HashRepo for SledRepo { )) } - async fn hashes(&self) -> LocalBoxStream<'static, Result> { - let iter = 
self.hashes.iter().keys().filter_map(|res| { - res.map_err(SledError::from) - .map_err(RepoError::from) - .map(Hash::from_ivec) - .transpose() - }); - - Box::pin(from_iterator(iter, 8)) - } - async fn bound(&self, hash: Hash) -> Result, RepoError> { let opt = b!(self.hashes, hashes.get(hash.to_ivec())); @@ -1197,10 +1201,10 @@ impl HashRepo for SledRepo { async fn create_hash_with_timestamp( &self, hash: Hash, - identifier: &dyn Identifier, + identifier: &Arc, timestamp: time::OffsetDateTime, - ) -> Result, StoreError> { - let identifier: sled::IVec = identifier.to_bytes()?.into(); + ) -> Result, RepoError> { + let identifier: sled::IVec = identifier.as_bytes().to_vec().into(); let hashes = self.hashes.clone(); let hashes_inverse = self.hashes_inverse.clone(); @@ -1234,63 +1238,56 @@ impl HashRepo for SledRepo { match res { Ok(res) => Ok(res), Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { - Err(StoreError::from(RepoError::from(SledError::from(e)))) + Err(RepoError::from(SledError::from(e))) } } } - async fn update_identifier( - &self, - hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError> { - let identifier = identifier.to_bytes()?; + async fn update_identifier(&self, hash: Hash, identifier: &Arc) -> Result<(), RepoError> { + let identifier = identifier.clone(); let hash = hash.to_ivec(); b!( self.hash_identifiers, - hash_identifiers.insert(hash, identifier) + hash_identifiers.insert(hash, identifier.as_bytes()) ); Ok(()) } #[tracing::instrument(level = "trace", skip(self))] - async fn identifier(&self, hash: Hash) -> Result>, RepoError> { + async fn identifier(&self, hash: Hash) -> Result>, RepoError> { let hash = hash.to_ivec(); - let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { - return Ok(None); - }; + let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); - Ok(Some(Arc::from(ivec.to_vec()))) + Ok(opt.map(try_into_arc_str).transpose()?) } - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self))] async fn relate_variant_identifier( &self, hash: Hash, variant: String, - identifier: &dyn Identifier, - ) -> Result, StoreError> { + identifier: &Arc, + ) -> Result, RepoError> { let hash = hash.to_bytes(); let key = variant_key(&hash, &variant); - let value = identifier.to_bytes()?; + let value = identifier.clone(); let hash_variant_identifiers = self.hash_variant_identifiers.clone(); actix_rt::task::spawn_blocking(move || { hash_variant_identifiers - .compare_and_swap(key, Option::<&[u8]>::None, Some(value)) + .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) .map(|res| res.map_err(|_| VariantAlreadyExists)) }) .await .map_err(|_| RepoError::Canceled)? .map_err(SledError::from) .map_err(RepoError::from) - .map_err(StoreError::from) } #[tracing::instrument(level = "trace", skip(self))] @@ -1298,7 +1295,7 @@ impl HashRepo for SledRepo { &self, hash: Hash, variant: String, - ) -> Result>, RepoError> { + ) -> Result>, RepoError> { let hash = hash.to_bytes(); let key = variant_key(&hash, &variant); @@ -1308,11 +1305,11 @@ impl HashRepo for SledRepo { hash_variant_identifiers.get(key) ); - Ok(opt.map(|ivec| Arc::from(ivec.to_vec()))) + Ok(opt.map(try_into_arc_str).transpose()?) 
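// Illustrative sketch, not part of the diff: with the Identifier trait gone,
// DetailsRepo and HashRepo key their trees off the Arc<str> bytes directly and
// report plain RepoError instead of StoreError. Hypothetical caller, assuming a
// SledRepo `repo` and a Details value `details` are in scope:
//
//     let id: Arc<str> = Arc::from("some/object/path".to_string());
//     repo.relate_details(&id, &details).await?;   // keyed by id.as_bytes()
//     let found = repo.details(&id).await?;        // Option<Details>
//
// Identifier reads (identifier, variant_identifier, motion_identifier) decode
// the stored bytes back through try_into_arc_str, so both directions share the
// same UTF-8 invariant.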
} #[tracing::instrument(level = "debug", skip(self))] - async fn variants(&self, hash: Hash) -> Result)>, RepoError> { + async fn variants(&self, hash: Hash) -> Result)>, RepoError> { let hash = hash.to_ivec(); let vec = b!( @@ -1321,14 +1318,14 @@ impl HashRepo for SledRepo { .scan_prefix(hash.clone()) .filter_map(|res| res.ok()) .filter_map(|(key, ivec)| { - let identifier = Arc::from(ivec.to_vec()); + let identifier = try_into_arc_str(ivec).ok(); let variant = variant_from_key(&hash, &key); if variant.is_none() { tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); } - Some((variant?, identifier)) + Some((variant?, identifier?)) }) .collect::>()) as Result, SledError> ); @@ -1350,25 +1347,25 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] + #[tracing::instrument(level = "trace", skip(self))] async fn relate_motion_identifier( &self, hash: Hash, - identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + identifier: &Arc, + ) -> Result<(), RepoError> { let hash = hash.to_ivec(); - let bytes = identifier.to_bytes()?; + let bytes = identifier.clone(); b!( self.hash_motion_identifiers, - hash_motion_identifiers.insert(hash, bytes) + hash_motion_identifiers.insert(hash, bytes.as_bytes()) ); Ok(()) } #[tracing::instrument(level = "trace", skip(self))] - async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { + async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { let hash = hash.to_ivec(); let opt = b!( @@ -1376,7 +1373,7 @@ impl HashRepo for SledRepo { hash_motion_identifiers.get(hash) ); - Ok(opt.map(|ivec| Arc::from(ivec.to_vec()))) + Ok(opt.map(try_into_arc_str).transpose()?) } #[tracing::instrument(skip(self))] diff --git a/src/repo_04.rs b/src/repo_04.rs index 174882a..0d63d55 100644 --- a/src/repo_04.rs +++ b/src/repo_04.rs @@ -2,10 +2,9 @@ use crate::{ config, details::Details, repo::{Alias, DeleteToken}, - store::{Identifier, StoreError}, }; use futures_core::Stream; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; pub(crate) use self::sled::SledRepo; @@ -46,7 +45,7 @@ pub(crate) trait SettingsRepo: BaseRepo { #[async_trait::async_trait(?Send)] pub(crate) trait IdentifierRepo: BaseRepo { - async fn details(&self, identifier: &I) -> Result, StoreError>; + async fn details(&self, identifier: Arc) -> Result, RepoError>; } #[async_trait::async_trait(?Send)] @@ -57,20 +56,11 @@ pub(crate) trait HashRepo: BaseRepo { async fn hashes(&self) -> Self::Stream; - async fn identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError>; + async fn identifier(&self, hash: Self::Bytes) -> Result>, RepoError>; - async fn variants( - &self, - hash: Self::Bytes, - ) -> Result, StoreError>; + async fn variants(&self, hash: Self::Bytes) -> Result)>, RepoError>; - async fn motion_identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError>; + async fn motion_identifier(&self, hash: Self::Bytes) -> Result>, RepoError>; } #[async_trait::async_trait(?Send)] diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs index 36595b7..09a684a 100644 --- a/src/repo_04/sled.rs +++ b/src/repo_04/sled.rs @@ -1,10 +1,9 @@ use crate::{ details::HumanDate, repo_04::{ - Alias, AliasRepo, BaseRepo, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo, - RepoError, SettingsRepo, + Alias, AliasRepo, BaseRepo, DeleteToken, Details, HashRepo, IdentifierRepo, RepoError, + SettingsRepo, }, - store::StoreError, stream::{from_iterator, 
LocalBoxStream}, }; use sled::{Db, IVec, Tree}; @@ -56,6 +55,9 @@ pub(crate) enum SledError { #[error("Operation panicked")] Panic, + + #[error("Error reading string")] + Utf8(#[from] std::str::Utf8Error), } #[derive(Clone)] @@ -179,17 +181,17 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl IdentifierRepo for SledRepo { - #[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))] - async fn details(&self, identifier: &I) -> Result, StoreError> { - let key = identifier.to_bytes()?; - - let opt = b!(self.identifier_details, identifier_details.get(key)); + #[tracing::instrument(level = "trace", skip(self))] + async fn details(&self, key: Arc) -> Result, RepoError> { + let opt = b!( + self.identifier_details, + identifier_details.get(key.as_bytes()) + ); opt.map(|ivec| serde_json::from_slice::(&ivec)) .transpose() .map_err(SledError::from) .map_err(RepoError::from) - .map_err(StoreError::from) .map(|opt| opt.and_then(OldDetails::into_details)) } } @@ -219,29 +221,27 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { + async fn identifier(&self, hash: Self::Bytes) -> Result>, RepoError> { let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else { return Ok(None); }; - Ok(Some(I::from_bytes(ivec.to_vec())?)) + Ok(Some(Arc::from( + std::str::from_utf8(&ivec[..]) + .map_err(SledError::from)? + .to_string(), + ))) } #[tracing::instrument(level = "debug", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn variants( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { + async fn variants(&self, hash: Self::Bytes) -> Result)>, RepoError> { let vec = b!( self.hash_variant_identifiers, Ok(hash_variant_identifiers .scan_prefix(&hash) .filter_map(|res| res.ok()) .filter_map(|(key, ivec)| { - let identifier = I::from_bytes(ivec.to_vec()).ok(); + let identifier = String::from_utf8(ivec.to_vec()).ok(); if identifier.is_none() { tracing::warn!( "Skipping an identifier: {}", @@ -254,7 +254,7 @@ impl HashRepo for SledRepo { tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key)); } - Some((variant?, identifier?)) + Some((variant?, Arc::from(identifier?))) }) .collect::>()) as Result, SledError> ); @@ -263,16 +263,20 @@ impl HashRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))] - async fn motion_identifier( - &self, - hash: Self::Bytes, - ) -> Result, StoreError> { + async fn motion_identifier(&self, hash: Self::Bytes) -> Result>, RepoError> { let opt = b!( self.hash_motion_identifiers, hash_motion_identifiers.get(hash) ); - opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose() + opt.map(|ivec| { + Ok(Arc::from( + std::str::from_utf8(&ivec[..]) + .map_err(SledError::from)? 
+ .to_string(), + )) + }) + .transpose() } } diff --git a/src/store.rs b/src/store.rs index 66c9337..3293237 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,10 +1,9 @@ use actix_web::web::Bytes; -use base64::{prelude::BASE64_STANDARD, Engine}; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::error_code::ErrorCode; +use crate::{error_code::ErrorCode, stream::LocalBoxStream}; pub(crate) mod file_store; pub(crate) mod object_store; @@ -70,32 +69,15 @@ impl From for StoreError { } } -pub(crate) trait Identifier: Send + Sync + Debug { - fn to_bytes(&self) -> Result, StoreError>; - - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized; - - fn from_arc(arc: Arc<[u8]>) -> Result - where - Self: Sized; - - fn string_repr(&self) -> String; -} - #[async_trait::async_trait(?Send)] pub(crate) trait Store: Clone + Debug { - type Identifier: Identifier + Clone + 'static; - type Stream: Stream> + Unpin + 'static; - async fn health_check(&self) -> Result<(), StoreError>; async fn save_async_read( &self, reader: Reader, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static; @@ -103,7 +85,7 @@ pub(crate) trait Store: Clone + Debug { &self, stream: S, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where S: Stream> + Unpin + 'static; @@ -111,28 +93,28 @@ pub(crate) trait Store: Clone + Debug { &self, bytes: Bytes, content_type: mime::Mime, - ) -> Result; + ) -> Result, StoreError>; - fn public_url(&self, _: &Self::Identifier) -> Option; + fn public_url(&self, _: &Arc) -> Option; async fn to_stream( &self, - identifier: &Self::Identifier, + identifier: &Arc, from_start: Option, len: Option, - ) -> Result; + ) -> Result>, StoreError>; async fn read_into( &self, - identifier: &Self::Identifier, + identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin; - async fn len(&self, identifier: &Self::Identifier) -> Result; + async fn len(&self, identifier: &Arc) -> Result; - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError>; + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] @@ -140,9 +122,6 @@ impl Store for actix_web::web::Data where T: Store, { - type Identifier = T::Identifier; - type Stream = T::Stream; - async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } @@ -151,7 +130,7 @@ where &self, reader: Reader, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { @@ -162,7 +141,7 @@ where &self, stream: S, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { @@ -173,26 +152,26 @@ where &self, bytes: Bytes, content_type: mime::Mime, - ) -> Result { + ) -> Result, StoreError> { T::save_bytes(self, bytes, content_type).await } - fn public_url(&self, identifier: &Self::Identifier) -> Option { + fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, - identifier: &Self::Identifier, + identifier: &Arc, from_start: Option, len: Option, - ) -> Result { + ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn read_into( &self, - identifier: &Self::Identifier, + identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where @@ -201,11 +180,83 @@ where T::read_into(self, identifier, writer).await } - async 
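// Illustrative sketch, not part of the diff: Store drops its associated
// Identifier and Stream types. Identifiers are plain Arc<str> and streams come
// back boxed via the LocalBoxStream alias imported above, so generic callers
// shrink to something like this (signatures reconstructed, not verbatim):
//
//     async fn probe<S: Store>(store: &S, id: &Arc<str>) -> Result<u64, StoreError> {
//         let len = store.len(id).await?;
//         let _stream = store.to_stream(id, None, None).await?;
//         Ok(len)
//     }
//
// The blanket impls that follow (actix_web::web::Data<T>, the new Arc<T>, and
// &'a T) simply forward every method to the inner T.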
fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { + T::remove(self, identifier).await + } +} + +#[async_trait::async_trait(?Send)] +impl Store for Arc +where + T: Store, +{ + async fn health_check(&self) -> Result<(), StoreError> { + T::health_check(self).await + } + + async fn save_async_read( + &self, + reader: Reader, + content_type: mime::Mime, + ) -> Result, StoreError> + where + Reader: AsyncRead + Unpin + 'static, + { + T::save_async_read(self, reader, content_type).await + } + + async fn save_stream( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result, StoreError> + where + S: Stream> + Unpin + 'static, + { + T::save_stream(self, stream, content_type).await + } + + async fn save_bytes( + &self, + bytes: Bytes, + content_type: mime::Mime, + ) -> Result, StoreError> { + T::save_bytes(self, bytes, content_type).await + } + + fn public_url(&self, identifier: &Arc) -> Option { + T::public_url(self, identifier) + } + + async fn to_stream( + &self, + identifier: &Arc, + from_start: Option, + len: Option, + ) -> Result>, StoreError> { + T::to_stream(self, identifier, from_start, len).await + } + + async fn read_into( + &self, + identifier: &Arc, + writer: &mut Writer, + ) -> Result<(), std::io::Error> + where + Writer: AsyncWrite + Unpin, + { + T::read_into(self, identifier, writer).await + } + + async fn len(&self, identifier: &Arc) -> Result { + T::len(self, identifier).await + } + + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } @@ -215,9 +266,6 @@ impl<'a, T> Store for &'a T where T: Store, { - type Identifier = T::Identifier; - type Stream = T::Stream; - async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } @@ -226,7 +274,7 @@ where &self, reader: Reader, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { @@ -237,7 +285,7 @@ where &self, stream: S, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { @@ -248,26 +296,26 @@ where &self, bytes: Bytes, content_type: mime::Mime, - ) -> Result { + ) -> Result, StoreError> { T::save_bytes(self, bytes, content_type).await } - fn public_url(&self, identifier: &Self::Identifier) -> Option { + fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, - identifier: &Self::Identifier, + identifier: &Arc, from_start: Option, len: Option, - ) -> Result { + ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn read_into( &self, - identifier: &Self::Identifier, + identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where @@ -276,59 +324,11 @@ where T::read_into(self, identifier, writer).await } - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } - -impl Identifier for Vec { - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized, - { - Ok(bytes) - } - - fn from_arc(arc: Arc<[u8]>) -> Result - 
where - Self: Sized, - { - Ok(Vec::from(&arc[..])) - } - - fn to_bytes(&self) -> Result, StoreError> { - Ok(self.clone()) - } - - fn string_repr(&self) -> String { - BASE64_STANDARD.encode(self.as_slice()) - } -} - -impl Identifier for Arc<[u8]> { - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized, - { - Ok(Arc::from(bytes)) - } - - fn from_arc(arc: Arc<[u8]>) -> Result - where - Self: Sized, - { - Ok(arc) - } - - fn to_bytes(&self) -> Result, StoreError> { - Ok(Vec::from(&self[..])) - } - - fn string_repr(&self) -> String { - BASE64_STANDARD.encode(&self[..]) - } -} diff --git a/src/store/file_store.rs b/src/store/file_store.rs index b1738cb..dafa8f7 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,18 +1,17 @@ -use crate::{error_code::ErrorCode, file::File, repo::ArcRepo, store::Store}; +use crate::{ + error_code::ErrorCode, file::File, repo::ArcRepo, store::Store, stream::LocalBoxStream, +}; use actix_web::web::Bytes; use futures_core::Stream; use std::{ path::{Path, PathBuf}, - pin::Pin, + sync::Arc, }; use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; use tracing::Instrument; -mod file_id; -pub(crate) use file_id::FileId; - use super::StoreError; // - Settings Tree @@ -28,11 +27,8 @@ pub(crate) enum FileError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), - #[error("Error formatting file store ID")] - IdError, - - #[error("Malformed file store ID")] - PrefixError, + #[error("Couldn't convert Path to String")] + StringError, #[error("Tried to save over existing file")] FileExists, @@ -44,7 +40,7 @@ impl FileError { Self::Io(_) => ErrorCode::FILE_IO_ERROR, Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR, Self::FileExists => ErrorCode::FILE_EXISTS, - Self::IdError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR, + Self::StringError => ErrorCode::FORMAT_FILE_ID_ERROR, } } } @@ -58,9 +54,6 @@ pub(crate) struct FileStore { #[async_trait::async_trait(?Send)] impl Store for FileStore { - type Identifier = FileId; - type Stream = Pin>>>; - async fn health_check(&self) -> Result<(), StoreError> { tokio::fs::metadata(&self.root_dir) .await @@ -74,7 +67,7 @@ impl Store for FileStore { &self, mut reader: Reader, _content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { @@ -92,7 +85,7 @@ impl Store for FileStore { &self, stream: S, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { @@ -105,7 +98,7 @@ impl Store for FileStore { &self, bytes: Bytes, _content_type: mime::Mime, - ) -> Result { + ) -> Result, StoreError> { let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { @@ -116,17 +109,17 @@ impl Store for FileStore { Ok(self.file_id_from_path(path)?) 
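// Illustrative sketch, not part of the diff: FileStore identifiers are now the
// saved file's path rendered as a string, so a save hands back an Arc<str> that
// can flow straight into the repo. Hypothetical usage, assuming a FileStore
// `store`, an ArcRepo `repo`, and a Hash `hash` are in scope:
//
//     let id: Arc<str> = store
//         .save_bytes(Bytes::from_static(b"data"), mime::APPLICATION_OCTET_STREAM)
//         .await?;
//     repo.update_identifier(hash, &id).await?;
//
// The old IdError and PrefixError variants collapse into FileError::StringError,
// which only fires when a path is not valid UTF-8.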
} - fn public_url(&self, _identifier: &Self::Identifier) -> Option { + fn public_url(&self, _identifier: &Arc) -> Option { None } #[tracing::instrument] async fn to_stream( &self, - identifier: &Self::Identifier, + identifier: &Arc, from_start: Option, len: Option, - ) -> Result { + ) -> Result>, StoreError> { let path = self.path_from_file_id(identifier); let file_span = tracing::trace_span!(parent: None, "File Stream"); @@ -147,7 +140,7 @@ impl Store for FileStore { #[tracing::instrument(skip(writer))] async fn read_into( &self, - identifier: &Self::Identifier, + identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where @@ -161,7 +154,7 @@ impl Store for FileStore { } #[tracing::instrument] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Arc) -> Result { let path = self.path_from_file_id(identifier); let len = tokio::fs::metadata(path) @@ -173,7 +166,7 @@ impl Store for FileStore { } #[tracing::instrument] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { let path = self.path_from_file_id(identifier); self.safe_remove_file(path).await?; @@ -196,6 +189,14 @@ impl FileStore { }) } + fn file_id_from_path(&self, path: PathBuf) -> Result, FileError> { + path.to_str().ok_or(FileError::StringError).map(Into::into) + } + + fn path_from_file_id(&self, file_id: &Arc) -> PathBuf { + self.root_dir.join(file_id.as_ref()) + } + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs deleted file mode 100644 index 279ddbc..0000000 --- a/src/store/file_store/file_id.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::store::{ - file_store::{FileError, FileStore}, - Identifier, StoreError, -}; -use std::path::PathBuf; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct FileId(PathBuf); - -impl Identifier for FileId { - fn to_bytes(&self) -> Result, StoreError> { - let vec = self - .0 - .to_str() - .ok_or(FileError::IdError)? 
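// Illustrative sketch, not part of the diff: the FileId newtype is replaced by
// the two private helpers added above. file_id_from_path renders the PathBuf as
// an Arc<str> (erroring on non-UTF-8 paths), and path_from_file_id joins the
// stored string back onto root_dir:
//
//     let id = store.file_id_from_path(PathBuf::from("ab/cd/ef0123"))?; // hypothetical path
//     let path = store.path_from_file_id(&id);                          // root_dir.join("ab/cd/ef0123")
//
// The deleted impl below shows the prefix stripping the old FileId used to do;
// the new representation keeps whatever string next_file() produced.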
- .as_bytes() - .to_vec(); - - Ok(vec) - } - - fn from_bytes(bytes: Vec) -> Result - where - Self: Sized, - { - let string = String::from_utf8(bytes).map_err(|_| FileError::IdError)?; - - let id = FileId(PathBuf::from(string)); - - Ok(id) - } - - fn from_arc(arc: std::sync::Arc<[u8]>) -> Result - where - Self: Sized, - { - Self::from_bytes(Vec::from(&arc[..])) - } - - fn string_repr(&self) -> String { - self.0.to_string_lossy().into_owned() - } -} - -impl FileStore { - pub(super) fn file_id_from_path(&self, path: PathBuf) -> Result { - let stripped = path - .strip_prefix(&self.root_dir) - .map_err(|_| FileError::PrefixError)?; - - Ok(FileId(stripped.to_path_buf())) - } - - pub(super) fn path_from_file_id(&self, file_id: &FileId) -> PathBuf { - self.root_dir.join(&file_id.0) - } -} diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 301c181..4c1aab7 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -3,7 +3,7 @@ use crate::{ error_code::ErrorCode, repo::ArcRepo, store::Store, - stream::{IntoStreamer, StreamMap}, + stream::{IntoStreamer, LocalBoxStream, StreamMap}, }; use actix_rt::task::JoinError; use actix_web::{ @@ -19,16 +19,13 @@ use futures_core::Stream; use reqwest::{header::RANGE, Body, Response}; use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; -use std::{pin::Pin, string::FromUtf8Error, time::Duration}; +use std::{string::FromUtf8Error, sync::Arc, time::Duration}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::io::ReaderStream; use tracing::Instrument; use url::Url; -mod object_id; -pub(crate) use object_id::ObjectId; - use super::StoreError; const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); @@ -189,9 +186,6 @@ async fn status_error(response: Response) -> StoreError { #[async_trait::async_trait(?Send)] impl Store for ObjectStore { - type Identifier = ObjectId; - type Stream = Pin>>>; - async fn health_check(&self) -> Result<(), StoreError> { let response = self .head_bucket_request() @@ -211,7 +205,7 @@ impl Store for ObjectStore { &self, reader: Reader, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { @@ -224,7 +218,7 @@ impl Store for ObjectStore { &self, mut stream: S, content_type: mime::Mime, - ) -> Result + ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { @@ -363,7 +357,7 @@ impl Store for ObjectStore { &self, bytes: Bytes, content_type: mime::Mime, - ) -> Result { + ) -> Result, StoreError> { let (req, object_id) = self.put_object_request(bytes.len(), content_type).await?; let response = req.body(bytes).send().await.map_err(ObjectError::from)?; @@ -375,9 +369,9 @@ impl Store for ObjectStore { Ok(object_id) } - fn public_url(&self, identifier: &Self::Identifier) -> Option { + fn public_url(&self, identifier: &Arc) -> Option { self.public_endpoint.clone().map(|mut endpoint| { - endpoint.set_path(identifier.as_str()); + endpoint.set_path(identifier.as_ref()); endpoint }) } @@ -385,10 +379,10 @@ impl Store for ObjectStore { #[tracing::instrument(skip(self))] async fn to_stream( &self, - identifier: &Self::Identifier, + identifier: &Arc, from_start: Option, len: Option, - ) -> Result { + ) -> Result>, StoreError> { let response = self .get_object_request(identifier, from_start, len) .send() @@ -409,7 +403,7 @@ impl Store for ObjectStore { #[tracing::instrument(skip(self, 
writer))] async fn read_into( &self, - identifier: &Self::Identifier, + identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where @@ -440,7 +434,7 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(self))] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Arc) -> Result { let response = self .head_object_request(identifier) .send() @@ -464,7 +458,7 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(self))] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> { + async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { let response = self .delete_object_request(identifier) .send() @@ -523,7 +517,7 @@ impl ObjectStore { &self, length: usize, content_type: mime::Mime, - ) -> Result<(RequestBuilder, ObjectId), StoreError> { + ) -> Result<(RequestBuilder, Arc), StoreError> { let path = self.next_file().await?; let mut action = self.bucket.put_object(Some(&self.credentials), &path); @@ -535,13 +529,13 @@ impl ObjectStore { .headers_mut() .insert("content-length", length.to_string()); - Ok((self.build_request(action), ObjectId::from_string(path))) + Ok((self.build_request(action), Arc::from(path))) } async fn create_multipart_request( &self, content_type: mime::Mime, - ) -> Result<(RequestBuilder, ObjectId), StoreError> { + ) -> Result<(RequestBuilder, Arc), StoreError> { let path = self.next_file().await?; let mut action = self @@ -552,13 +546,13 @@ impl ObjectStore { .headers_mut() .insert("content-type", content_type.as_ref()); - Ok((self.build_request(action), ObjectId::from_string(path))) + Ok((self.build_request(action), Arc::from(path))) } async fn create_upload_part_request( &self, buf: BytesStream, - object_id: &ObjectId, + object_id: &Arc, part_number: u16, upload_id: &str, ) -> Result { @@ -566,7 +560,7 @@ impl ObjectStore { let mut action = self.bucket.upload_part( Some(&self.credentials), - object_id.as_str(), + object_id.as_ref(), part_number, upload_id, ); @@ -601,13 +595,13 @@ impl ObjectStore { async fn send_complete_multipart_request<'a, I: Iterator>( &'a self, - object_id: &'a ObjectId, + object_id: &'a Arc, upload_id: &'a str, etags: I, ) -> Result { let mut action = self.bucket.complete_multipart_upload( Some(&self.credentials), - object_id.as_str(), + object_id.as_ref(), upload_id, etags, ); @@ -628,12 +622,12 @@ impl ObjectStore { fn create_abort_multipart_request( &self, - object_id: &ObjectId, + object_id: &Arc, upload_id: &str, ) -> RequestBuilder { let action = self.bucket.abort_multipart_upload( Some(&self.credentials), - object_id.as_str(), + object_id.as_ref(), upload_id, ); @@ -671,13 +665,13 @@ impl ObjectStore { fn get_object_request( &self, - identifier: &ObjectId, + identifier: &Arc, from_start: Option, len: Option, ) -> RequestBuilder { let action = self .bucket - .get_object(Some(&self.credentials), identifier.as_str()); + .get_object(Some(&self.credentials), identifier.as_ref()); let req = self.build_request(action); @@ -695,18 +689,18 @@ impl ObjectStore { ) } - fn head_object_request(&self, identifier: &ObjectId) -> RequestBuilder { + fn head_object_request(&self, identifier: &Arc) -> RequestBuilder { let action = self .bucket - .head_object(Some(&self.credentials), identifier.as_str()); + .head_object(Some(&self.credentials), identifier.as_ref()); self.build_request(action) } - fn delete_object_request(&self, identifier: &ObjectId) -> RequestBuilder { + fn delete_object_request(&self, identifier: &Arc) -> RequestBuilder { let 
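// Illustrative sketch, not part of the diff: the S3 object key is now the
// Arc<str> identifier itself, so every rusty_s3 action takes it via as_ref()
// where ObjectId::as_str() was used before. Assuming a Bucket and Credentials
// are in scope:
//
//     let identifier: Arc<str> = Arc::from("dir/0/1/2/object".to_string());
//     let action = bucket.head_object(Some(&credentials), identifier.as_ref());
//
// public_url works the same way, setting the identifier directly as the URL
// path on the configured public endpoint.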
action = self
             .bucket
-            .delete_object(Some(&self.credentials), identifier.as_str());
+            .delete_object(Some(&self.credentials), identifier.as_ref());
 
         self.build_request(action)
     }
diff --git a/src/store/object_store/object_id.rs b/src/store/object_store/object_id.rs
deleted file mode 100644
index f8a7dd9..0000000
--- a/src/store/object_store/object_id.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-use crate::store::{object_store::ObjectError, Identifier, StoreError};
-
-#[derive(Debug, Clone)]
-pub(crate) struct ObjectId(String);
-
-impl Identifier for ObjectId {
-    fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
-        Ok(self.0.as_bytes().to_vec())
-    }
-
-    fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError> {
-        Ok(ObjectId(
-            String::from_utf8(bytes).map_err(ObjectError::from)?,
-        ))
-    }
-
-    fn from_arc(arc: std::sync::Arc<[u8]>) -> Result<Self, StoreError>
-    where
-        Self: Sized,
-    {
-        Self::from_bytes(Vec::from(&arc[..]))
-    }
-
-    fn string_repr(&self) -> String {
-        self.0.clone()
-    }
-}
-
-impl ObjectId {
-    pub(super) fn from_string(string: String) -> Self {
-        ObjectId(string)
-    }
-
-    pub(super) fn as_str(&self) -> &str {
-        &self.0
-    }
-}
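Annotation, not part of the patch: across the whole change, store identifiers
become plain Arc<str> values, stored as UTF-8 bytes in sled and as text columns
in Postgres, and both ObjectId and FileId disappear along with the Identifier
trait. A minimal, hypothetical sketch of the conversion that identifiers written
by the old byte-based representation would go through, assuming they were valid
UTF-8:

    use std::sync::Arc;

    // Interpret a legacy byte identifier as the new string form, failing on
    // non-UTF-8 data instead of guessing.
    fn upgrade_identifier(old: Arc<[u8]>) -> Result<Arc<str>, std::str::Utf8Error> {
        std::str::from_utf8(&old).map(Arc::from)
    }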