From 6ed592c432614b75f96b1c7ba3b1fde9762bde5a Mon Sep 17 00:00:00 2001
From: "Aode (lion)" <asonix@asonix.dog>
Date: Fri, 1 Apr 2022 16:51:12 -0500
Subject: [PATCH] In Progress: process jobs

- Is this a good idea? It definitely will make interacting with pict-rs
  harder. Maybe it's best not to do this.
---
 src/details.rs        |   2 +-
 src/magick.rs         |   4 +-
 src/main.rs           |  31 +++++----
 src/process.rs        |   2 +-
 src/queue.rs          | 144 ++++++++++++++++++++++++++++--------
 src/queue/cleanup.rs  |  35 ++++++++--
 src/queue/process.rs  |  87 +++++++++++++++++++++++++
 src/repo.rs           |  42 ++++++++++++
 src/store.rs          |  54 +++++++++++++++-
 src/upload_manager.rs |   2 +-
 10 files changed, 329 insertions(+), 74 deletions(-)
 create mode 100644 src/queue/process.rs

diff --git a/src/details.rs b/src/details.rs
index 1a9e9b8..37ddc50 100644
--- a/src/details.rs
+++ b/src/details.rs
@@ -30,7 +30,7 @@ impl Details {
     }
 
     #[tracing::instrument("Details from store")]
-    pub(crate) async fn from_store<S: Store>(
+    pub(crate) async fn from_store<S: Store + 'static>(
         store: S,
         identifier: S::Identifier,
         expected_format: Option<ValidInputType>,
diff --git a/src/magick.rs b/src/magick.rs
index 8efbf23..e0c1c0d 100644
--- a/src/magick.rs
+++ b/src/magick.rs
@@ -140,7 +140,7 @@ pub(crate) async fn details_bytes(
 }
 
 #[tracing::instrument(skip(store))]
-pub(crate) async fn details_store<S: Store>(
+pub(crate) async fn details_store<S: Store + 'static>(
     store: S,
     identifier: S::Identifier,
     hint: Option<ValidInputType>,
@@ -255,7 +255,7 @@ pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Er
 }
 
 #[tracing::instrument(skip(store))]
-pub(crate) fn process_image_store_read<S: Store>(
+pub(crate) fn process_image_store_read<S: Store + 'static>(
     store: S,
     identifier: S::Identifier,
     args: Vec<String>,
diff --git a/src/main.rs b/src/main.rs
index 3a3a868..f243f58 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,7 +11,6 @@ use futures_util::{
 };
 use once_cell::sync::Lazy;
 use std::{
-    collections::BTreeSet,
     future::ready,
     path::PathBuf,
     sync::atomic::{AtomicU64, Ordering},
@@ -142,7 +141,7 @@ struct UrlQuery {
 
 /// download an image from a URL
 #[instrument(name = "Downloading file", skip(client, manager))]
-async fn download<S: Store>(
+async fn download<S: Store + 'static>(
     client: web::Data<Client>,
     manager: web::Data<UploadManager>,
     store: web::Data<S>,
@@ -214,7 +213,6 @@ type ProcessQuery = Vec<(String, String)>;
 fn prepare_process(
     query: web::Query<ProcessQuery>,
     ext: &str,
-    filters: &BTreeSet<String>,
 ) -> Result<(ImageFormat, Alias, PathBuf, Vec<String>), Error> {
     let (alias, operations) =
         query
@@ -237,7 +235,7 @@ fn prepare_process(
 
     let operations = operations
         .into_iter()
-        .filter(|(k, _)| filters.contains(&k.to_lowercase()))
+        .filter(|(k, _)| CONFIG.media.filters.contains(&k.to_lowercase()))
        .collect::<Vec<_>>();
 
     let format = ext
@@ -251,14 +249,13 @@ fn prepare_process(
     Ok((format, alias, thumbnail_path, thumbnail_args))
 }
 
-#[instrument(name = "Fetching derived details", skip(manager, filters))]
+#[instrument(name = "Fetching derived details", skip(manager))]
 async fn process_details<S: Store>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     manager: web::Data<UploadManager>,
-    filters: web::Data<BTreeSet<String>>,
 ) -> Result<HttpResponse, Error> {
-    let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
+    let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
 
     let identifier = manager
         .variant_identifier::<S>(&alias, &thumbnail_path)
@@ -273,17 +270,15 @@ async fn process_details(
 }
 
 /// Process files
-#[instrument(name = "Serving processed image", skip(manager, filters))]
+#[instrument(name = "Serving processed image", skip(manager))]
 async fn process<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     manager: web::Data<UploadManager>,
     store: web::Data<S>,
-    filters: web::Data<BTreeSet<String>>,
 ) -> Result<HttpResponse, Error> {
-    let (format, alias, thumbnail_path, thumbnail_args) =
-        prepare_process(query, ext.as_str(), &filters)?;
+    let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
 
     let identifier_opt = manager
         .variant_identifier::<S>(&alias, &thumbnail_path)
@@ -376,7 +371,7 @@ async fn process(
 
 /// Fetch file details
 #[instrument(name = "Fetching details", skip(manager))]
-async fn details<S: Store>(
+async fn details<S: Store + 'static>(
     alias: web::Path<String>,
     manager: web::Data<UploadManager>,
     store: web::Data<S>,
@@ -402,7 +397,7 @@ async fn details(
 
 /// Serve files
 #[instrument(name = "Serving file", skip(manager))]
-async fn serve<S: Store>(
+async fn serve<S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<String>,
     manager: web::Data<UploadManager>,
@@ -426,7 +421,7 @@ async fn serve(
     ranged_file_resp(&**store, identifier, range, details).await
 }
 
-async fn ranged_file_resp<S: Store>(
+async fn ranged_file_resp<S: Store + 'static>(
     store: &S,
     identifier: S::Identifier,
     range: Option<web::Header<Range>>,
@@ -652,7 +647,12 @@ async fn launch(
             let manager = manager.clone();
             let store = store.clone();
 
-            actix_rt::spawn(queue::process_jobs(
+            actix_rt::spawn(queue::process_cleanup(
+                manager.repo().clone(),
+                store.clone(),
+                next_worker_id(),
+            ));
+            actix_rt::spawn(queue::process_images(
                 manager.repo().clone(),
                 store.clone(),
                 next_worker_id(),
@@ -664,7 +664,6 @@ async fn launch(
             .app_data(web::Data::new(store))
             .app_data(web::Data::new(manager))
             .app_data(web::Data::new(build_client()))
-            .app_data(web::Data::new(CONFIG.media.filters.clone()))
             .service(
                 web::scope("/image")
                     .service(
diff --git a/src/process.rs b/src/process.rs
index 057d809..4d5b628 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -144,7 +144,7 @@ impl Process {
         })
     }
 
-    pub(crate) fn store_read<S: Store>(
+    pub(crate) fn store_read<S: Store + 'static>(
         mut self,
         store: S,
         identifier: S::Identifier,
diff --git a/src/queue.rs b/src/queue.rs
index 9987b8b..9eceb7c 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,83 +1,135 @@
 use crate::{
+    config::ImageFormat,
     error::Error,
-    repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
+    repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
+    serde_str::Serde,
     store::Store,
 };
+use std::{future::Future, path::PathBuf, pin::Pin};
+use uuid::Uuid;
 
 mod cleanup;
+mod process;
 
 const CLEANUP_QUEUE: &str = "cleanup";
+const PROCESS_QUEUE: &str = "process";
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
-enum Job {
+enum Cleanup {
     CleanupHash { hash: Vec<u8> },
     CleanupIdentifier { identifier: Vec<u8> },
 }
 
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+enum Process {
+    Ingest {
+        identifier: Vec<u8>,
+        upload_id: Uuid,
+        declared_alias: Option<Serde<Alias>>,
+        should_validate: bool,
+    },
+    Generate {
+        target_format: ImageFormat,
+        source: Serde<Alias>,
+        process_path: PathBuf,
+        process_args: Vec<String>,
+    },
+}
+
 pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
-    let job = serde_json::to_vec(&Job::CleanupHash {
+    let job = serde_json::to_vec(&Cleanup::CleanupHash {
         hash: hash.as_ref().to_vec(),
     })?;
     repo.push(CLEANUP_QUEUE, job.into()).await?;
     Ok(())
 }
 
-pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: String) {
+pub(crate) async fn queue_ingest<R: QueueRepo>(
+    repo: &R,
+    identifier: Vec<u8>,
+    upload_id: Uuid,
+    declared_alias: Option<Alias>,
+    should_validate: bool,
+) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Process::Ingest {
+        identifier,
+        declared_alias: declared_alias.map(Serde::new),
+        upload_id,
+        should_validate,
+    })?;
+    repo.push(PROCESS_QUEUE, job.into()).await?;
+    Ok(())
+}
+
+pub(crate) async fn queue_generate<R: QueueRepo>(
+    repo: &R,
+    target_format: ImageFormat,
+    source: Alias,
+    process_path: PathBuf,
+    process_args: Vec<String>,
+) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Process::Generate {
+        target_format,
+        source: Serde::new(source),
+        process_path,
+        process_args,
+    })?;
+    repo.push(PROCESS_QUEUE, job.into()).await?;
+    Ok(())
+}
+
+pub(crate) async fn process_cleanup<S: Store>(repo: Repo, store: S, worker_id: String) {
     match repo {
-        Repo::Sled(ref repo) => {
-            if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
-                if let Err(e) = run_job(repo, &store, &job).await {
-                    tracing::warn!("Failed to run previously dropped job: {}", e);
-                    tracing::warn!("{:?}", e);
-                }
-            }
-            loop {
-                let res = job_loop(repo, &store, worker_id.clone()).await;
-
-                if let Err(e) = res {
-                    tracing::warn!("Error processing jobs: {}", e);
-                    tracing::warn!("{:?}", e);
-                    continue;
-                }
-
-                break;
-            }
-        }
+        Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, cleanup::perform).await,
     }
 }
 
-async fn job_loop<R, S>(repo: &R, store: &S, worker_id: String) -> Result<(), Error>
+pub(crate) async fn process_images<S: Store>(repo: Repo, store: S, worker_id: String) {
+    match repo {
+        Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, process::perform).await,
+    }
+}
+
+type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
+
+async fn process_jobs<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F)
 where
     R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
     R::Bytes: Clone,
     S: Store,
+    for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
+{
+    if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
+        if let Err(e) = (callback)(repo, store, job.as_ref()).await {
+            tracing::warn!("Failed to run previously dropped job: {}", e);
+            tracing::warn!("{:?}", e);
+        }
+    }
+    loop {
+        let res = job_loop(repo, store, worker_id.clone(), callback).await;
+
+        if let Err(e) = res {
+            tracing::warn!("Error processing jobs: {}", e);
+            tracing::warn!("{:?}", e);
+            continue;
+        }
+
+        break;
+    }
+}
+
+async fn job_loop<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F) -> Result<(), Error>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
+    R::Bytes: Clone,
+    S: Store,
+    for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
 {
     loop {
         let bytes = repo
             .pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
             .await?;
 
-        run_job(repo, store, bytes.as_ref()).await?;
+        (callback)(repo, store, bytes.as_ref()).await?;
     }
 }
-
-async fn run_job<R, S>(repo: &R, store: &S, job: &[u8]) -> Result<(), Error>
-where
-    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
-    S: Store,
-{
-    match serde_json::from_slice(job) {
-        Ok(job) => match job {
-            Job::CleanupHash { hash } => cleanup::hash::<R, S>(repo, hash).await?,
-            Job::CleanupIdentifier { identifier } => {
-                cleanup::identifier(repo, store, identifier).await?
-            }
-        },
-        Err(e) => {
-            tracing::warn!("Invalid job: {}", e);
-        }
-    }
-
-    Ok(())
-}
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index 9bed6c7..1abec81 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -1,13 +1,40 @@
 use crate::{
     error::Error,
-    queue::{Job, CLEANUP_QUEUE},
+    queue::{Cleanup, LocalBoxFuture, CLEANUP_QUEUE},
     repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
     store::{Identifier, Store},
 };
 use tracing::error;
 
+pub(super) fn perform<'a, R, S>(
+    repo: &'a R,
+    store: &'a S,
+    job: &'a [u8],
+) -> LocalBoxFuture<'a, Result<(), Error>>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    Box::pin(async move {
+        match serde_json::from_slice(job) {
+            Ok(job) => match job {
+                Cleanup::CleanupHash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
+                Cleanup::CleanupIdentifier {
+                    identifier: in_identifier,
+                } => identifier(repo, &store, in_identifier).await?,
+            },
+            Err(e) => {
+                tracing::warn!("Invalid job: {}", e);
+            }
+        }
+
+        Ok(())
+    })
+}
+
 #[tracing::instrument(skip(repo, store))]
-pub(super) async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
+async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
 where
     R: QueueRepo + HashRepo + IdentifierRepo,
     R::Bytes: Clone,
@@ -38,7 +65,7 @@ where
 }
 
 #[tracing::instrument(skip(repo))]
-pub(super) async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
+async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
 where
     R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
     R::Bytes: Clone,
@@ -63,7 +90,7 @@ where
 
     for identifier in idents {
         if let Ok(identifier) = identifier.to_bytes() {
-            let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
+            let job = serde_json::to_vec(&Cleanup::CleanupIdentifier { identifier })?;
             repo.push(CLEANUP_QUEUE, job.into()).await?;
         }
     }
diff --git a/src/queue/process.rs b/src/queue/process.rs
new file mode 100644
index 0000000..261f095
--- /dev/null
+++ b/src/queue/process.rs
@@ -0,0 +1,87 @@
+use crate::{
+    config::ImageFormat,
+    error::Error,
+    queue::{LocalBoxFuture, Process},
+    repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
+    serde_str::Serde,
+    store::Store,
+};
+use std::path::PathBuf;
+use uuid::Uuid;
+
+pub(super) fn perform<'a, R, S>(
+    repo: &'a R,
+    store: &'a S,
+    job: &'a [u8],
+) -> LocalBoxFuture<'a, Result<(), Error>>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    Box::pin(async move {
+        match serde_json::from_slice(job) {
+            Ok(job) => match job {
+                Process::Ingest {
+                    identifier,
+                    upload_id,
+                    declared_alias,
+                    should_validate,
+                } => {
+                    ingest(
+                        repo,
+                        store,
+                        identifier,
+                        upload_id,
+                        declared_alias.map(Serde::into_inner),
+                        should_validate,
+                    )
+                    .await?
+                }
+                Process::Generate {
+                    target_format,
+                    source,
+                    process_path,
+                    process_args,
+                } => {
+                    generate(
+                        repo,
+                        store,
+                        target_format,
+                        Serde::into_inner(source),
+                        process_path,
+                        process_args,
+                    )
+                    .await?
+                }
+            },
+            Err(e) => {
+                tracing::warn!("Invalid job: {}", e);
+            }
+        }
+
+        Ok(())
+    })
+}
+
+async fn ingest<R, S>(
+    repo: &R,
+    store: &S,
+    identifier: Vec<u8>,
+    upload_id: Uuid,
+    declared_alias: Option<Alias>,
+    should_validate: bool,
+) -> Result<(), Error> {
+    unimplemented!("do this")
+}
+
+async fn generate<R, S>(
+    repo: &R,
+    store: &S,
+    target_format: ImageFormat,
+    source: Alias,
+    process_path: PathBuf,
+    process_args: Vec<String>,
+) -> Result<(), Error> {
+    unimplemented!("do this")
+}
diff --git a/src/repo.rs b/src/repo.rs
index 145c3e7..e77a482 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -30,10 +30,29 @@ pub(crate) struct DeleteToken {
 
 pub(crate) struct AlreadyExists;
 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(crate) struct UploadId {
+    id: Uuid,
+}
+
+pub(crate) enum UploadResult {
+    Success { alias: Alias, token: DeleteToken },
+    Failure { message: String },
+}
+
 pub(crate) trait BaseRepo {
     type Bytes: AsRef<[u8]> + From<Vec<u8>>;
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait UploadRepo: BaseRepo {
+    async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error>;
+
+    async fn claim(&self, upload_id: UploadId) -> Result<(), Error>;
+
+    async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error>;
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait QueueRepo: BaseRepo {
     async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
@@ -362,6 +381,21 @@ impl DeleteToken {
     }
 }
 
+impl UploadId {
+    pub(crate) fn generate() -> Self {
+        Self { id: Uuid::new_v4() }
+    }
+
+    pub(crate) fn as_bytes(&self) -> &[u8] {
+        &self.id.as_bytes()[..]
+    }
+
+    pub(crate) fn from_bytes(&self, bytes: &[u8]) -> Option<Self> {
+        let id = Uuid::from_slice(bytes).ok()?;
+        Some(Self { id })
+    }
+}
+
 impl std::fmt::Display for MaybeUuid {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
@@ -377,6 +411,14 @@ impl std::fmt::Display for DeleteToken {
     }
 }
 
+impl std::str::FromStr for Alias {
+    type Err = std::convert::Infallible;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Alias::from_existing(s))
+    }
+}
+
 impl std::fmt::Display for Alias {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         if let Some(ext) = self.extension() {
diff --git a/src/store.rs b/src/store.rs
index fea8374..4f8f0b0 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -16,9 +16,9 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
 }
 
 #[async_trait::async_trait(?Send)]
-pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
-    type Identifier: Identifier;
-    type Stream: Stream<Item = std::io::Result<Bytes>>;
+pub(crate) trait Store: Send + Sync + Clone + Debug {
+    type Identifier: Identifier + 'static;
+    type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
 
     async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
     where
@@ -45,3 +45,51 @@
 
     async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>;
 }
+
+#[async_trait::async_trait(?Send)]
+impl<'a, T> Store for &'a T
+where
+    T: Store,
+{
+    type Identifier = T::Identifier;
+    type Stream = T::Stream;
+
+    async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
+    where
+        Reader: AsyncRead + Unpin,
+    {
+        T::save_async_read(self, reader).await
+    }
+
+    async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
+        T::save_bytes(self, bytes).await
+    }
+
+    async fn to_stream(
+        &self,
+        identifier: &Self::Identifier,
+        from_start: Option<u64>,
+        len: Option<u64>,
+    ) -> Result<Self::Stream, Error> {
+        T::to_stream(self, identifier, from_start, len).await
+    }
+
+    async fn read_into<Writer>(
+        &self,
+        identifier: &Self::Identifier,
+        writer: &mut Writer,
+    ) -> Result<(), std::io::Error>
+    where
+        Writer: AsyncWrite + Send + Unpin,
+    {
+        T::read_into(self, identifier, writer).await
+    }
+
+    async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
+        T::len(self, identifier).await
+    }
+
+    async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
+        T::remove(self, identifier).await
+    }
+}
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 1d1f48c..a5c916c 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -61,7 +61,7 @@ impl UploadManager {
         }
     }
 
-    pub(crate) async fn still_identifier_from_alias<S: Store>(
+    pub(crate) async fn still_identifier_from_alias<S: Store + 'static>(
        &self,
        store: S,
        alias: &Alias,
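
Reviewer note: the core mechanism in src/queue.rs is the callback-driven
worker loop. Because each job handler borrows the repo/store context and
the raw job bytes, it must hand back a boxed future tied to that borrow's
lifetime, which is why cleanup::perform and process::perform return a
LocalBoxFuture instead of being plain `async fn`s. Below is a minimal,
self-contained sketch of that pattern. The `Context` struct,
`perform_cleanup` handler, and the `futures::executor::block_on` driver
(from the `futures` crate) are hypothetical stand-ins for illustration,
not code from this patch:

    use std::{future::Future, pin::Pin};

    // Same alias the patch introduces in src/queue.rs.
    type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

    // Hypothetical stand-in for the (repo, store) pair the real worker
    // threads through to every job.
    struct Context {
        jobs: Vec<Vec<u8>>,
    }

    // A handler borrows the context and the job bytes for 'a, so it
    // returns a boxed future bound to that same 'a -- the same shape as
    // cleanup::perform and process::perform above.
    fn perform_cleanup<'a>(
        ctx: &'a Context,
        job: &'a [u8],
    ) -> LocalBoxFuture<'a, Result<(), String>> {
        Box::pin(async move {
            println!("cleanup job: {} bytes ({} queued)", job.len(), ctx.jobs.len());
            Ok(())
        })
    }

    // Generic worker loop: the for<'a> bound lets it invoke the handler
    // with borrows of any lifetime, mirroring process_jobs in src/queue.rs.
    async fn run_worker<F>(ctx: &Context, callback: F)
    where
        for<'a> F: Fn(&'a Context, &'a [u8]) -> LocalBoxFuture<'a, Result<(), String>> + Copy,
    {
        for job in &ctx.jobs {
            if let Err(e) = (callback)(ctx, job.as_slice()).await {
                eprintln!("job failed: {e}");
            }
        }
    }

    fn main() {
        let ctx = Context {
            jobs: vec![b"abc".to_vec(), b"defg".to_vec()],
        };
        // Any executor works here; block_on keeps the sketch dependency-light.
        futures::executor::block_on(run_worker(&ctx, perform_cleanup));
    }

The payoff of this shape is that job_loop stays queue-agnostic: the cleanup
and process workers share one loop and differ only in which `perform`
callback they are handed.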