diff --git a/src/backgrounded.rs b/src/backgrounded.rs
new file mode 100644
index 0000000..58b3b04
--- /dev/null
+++ b/src/backgrounded.rs
@@ -0,0 +1,98 @@
+use crate::{
+    error::Error,
+    repo::{FullRepo, UploadId, UploadRepo},
+    store::Store,
+};
+use actix_web::web::Bytes;
+use futures_util::{Stream, TryStreamExt};
+use tokio_util::io::StreamReader;
+
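+/// Guard for an upload that has been proxied into the store but whose
+/// ingest job has not yet been queued.
+///
+/// Unless `disarm` is called, dropping this value cleans up both artifacts.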
+pub(crate) struct Backgrounded<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    repo: R,
+    identifier: Option<S::Identifier>,
+    upload_id: Option<UploadId>,
+}
+
+impl<R, S> Backgrounded<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    pub(crate) fn disarm(mut self) {
+        let _ = self.identifier.take();
+        let _ = self.upload_id.take();
+    }
+
+    pub(crate) fn upload_id(&self) -> Option<UploadId> {
+        self.upload_id
+    }
+
+    pub(crate) fn identifier(&self) -> Option<&S::Identifier> {
+        self.identifier.as_ref()
+    }
+
+    pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
+    where
+        P: Stream<Item = Result<Bytes, Error>>,
+    {
+        let mut this = Self {
+            repo,
+            identifier: None,
+            upload_id: Some(UploadId::generate()),
+        };
+
+        this.do_proxy(store, stream).await?;
+
+        Ok(this)
+    }
+
+    async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
+    where
+        P: Stream<Item = Result<Bytes, Error>>,
+    {
+        UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
+
+        let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
+        let mut reader = StreamReader::new(Box::pin(stream));
+
+        let identifier = store.save_async_read(&mut reader).await?;
+
+        self.identifier = Some(identifier.clone());
+
+        Ok(())
+    }
+}
+
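+// Dropped without `disarm`: delete the proxied bytes and claim the upload id
+// so any client `wait`-ing on it gets an error instead of hanging forever.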
+impl<R, S> Drop for Backgrounded<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    fn drop(&mut self) {
+        if let Some(identifier) = self.identifier.take() {
+            let repo = self.repo.clone();
+
+            actix_rt::spawn(async move {
+                let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
+            });
+        }
+
+        if let Some(upload_id) = self.upload_id {
+            let repo = self.repo.clone();
+
+            actix_rt::spawn(async move {
+                let _ = repo.claim(upload_id).await;
+            });
+        }
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index a022a46..e096f62 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -22,6 +22,7 @@ use tracing_actix_web::TracingLogger;
 use tracing_awc::Tracing;
 use tracing_futures::Instrument;
 
+mod backgrounded;
 mod concurrent_processor;
 mod config;
 mod details;
@@ -48,6 +49,7 @@ mod tmp_file;
 mod validate;
 
 use self::{
+    backgrounded::Backgrounded,
     config::{Configuration, ImageFormat, Operation},
     details::Details,
     either::Either,
@@ -57,6 +59,7 @@ use self::{
     magick::details_hint,
     middleware::{Deadline, Internal},
     migrate::LatestDb,
+    queue::queue_generate,
     repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo},
     serde_str::Serde,
     store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
@@ -134,6 +137,50 @@ async fn upload(
     })))
 }
 
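+/// Queue ingest jobs for files the multipart form has already proxied into
+/// the store, returning the upload ids without waiting for ingest to finish.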
+#[instrument(name = "Uploaded files", skip(value))]
+async fn upload_backgrounded<R: FullRepo + 'static, S: Store>(
+    value: Value<Backgrounded<R, S>>,
+    repo: web::Data<R>,
+) -> Result<HttpResponse, Error> {
+    let images = value
+        .map()
+        .and_then(|mut m| m.remove("images"))
+        .and_then(|images| images.array())
+        .ok_or(UploadError::NoFiles)?;
+
+    let mut files = Vec::new();
+    let images = images
+        .into_iter()
+        .filter_map(|i| i.file())
+        .collect::<Vec<_>>();
+
+    for image in &images {
+        let upload_id = image.result.upload_id().expect("Upload ID exists");
+        let identifier = image
+            .result
+            .identifier()
+            .expect("Identifier exists")
+            .to_bytes()?;
+
+        queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
+
+        files.push(serde_json::json!({
+            "file": upload_id.to_string(),
+        }));
+    }
+
+    for image in images {
+        image.result.disarm();
+    }
+
+    Ok(HttpResponse::Created().json(&serde_json::json!({
+        "msg": "ok",
+        "files": files
+    })))
+}
+
 #[derive(Debug, serde::Deserialize)]
 struct UrlQuery {
     url: String,
@@ -339,6 +386,30 @@ async fn process(
     ))
 }
 
+/// Process files
+#[instrument(name = "Spawning image process", skip(repo))]
+async fn process_backgrounded<R: FullRepo, S: Store>(
+    query: web::Query<ProcessQuery>,
+    ext: web::Path<String>,
+    repo: web::Data<R>,
+) -> Result<HttpResponse, Error> {
+    let (target_format, source, process_path, process_args) = prepare_process(query, ext.as_str())?;
+
+    let path_string = process_path.to_string_lossy().to_string();
+    let hash = repo.hash(&source).await?;
+    let identifier_opt = repo
+        .variant_identifier::<S::Identifier>(hash.clone(), path_string)
+        .await?;
+
+    if identifier_opt.is_some() {
+        return Ok(HttpResponse::Accepted().finish());
+    }
+
+    queue_generate(&**repo, target_format, source, process_path, process_args).await?;
+
+    Ok(HttpResponse::Accepted().finish())
+}
+
 /// Fetch file details
 #[instrument(name = "Fetching details", skip(repo))]
 async fn details(
@@ -603,6 +674,31 @@ async fn launch(
         })),
     );
 
+    // Create a new Multipart Form validator for backgrounded uploads
+    //
+    // This form is expecting a single array field, 'images' with at most 10 files in it
+    let repo2 = repo.clone();
+    let store2 = store.clone();
+    let backgrounded_form = Form::new()
+        .max_files(10)
+        .max_file_size(CONFIG.media.max_file_size * MEGABYTES)
+        .transform_error(transform_error)
+        .field(
+            "images",
+            Field::array(Field::file(move |filename, _, stream| {
+                let repo = repo2.clone();
+                let store = store2.clone();
+
+                let span = tracing::info_span!("file-proxy", ?filename);
+
+                let stream = stream.map_err(Error::from);
+
+                Box::pin(
+                    async move { Backgrounded::proxy(repo, store, stream).await }.instrument(span),
+                )
+            })),
+        );
+
     HttpServer::new(move || {
         let store = store.clone();
         let repo = repo.clone();
@@ -632,6 +728,11 @@ async fn launch(
                             .wrap(form.clone())
                            .route(web::post().to(upload::<R, S>)),
                    )
+                    .service(
+                        web::resource("/backgrounded")
+                            .wrap(backgrounded_form.clone())
+                            .route(web::post().to(upload_backgrounded::<R, S>)),
+                    )
                    .service(web::resource("/download").route(web::get().to(download::<R, S>)))
                    .service(
                        web::resource("/delete/{delete_token}/{filename}")
@@ -642,6 +743,10 @@
                        web::resource("/original/{filename}").route(web::get().to(serve::<R, S>)),
                    )
                    .service(web::resource("/process.{ext}").route(web::get().to(process::<R, S>)))
+                    .service(
+                        web::resource("/process_backgrounded.{ext}")
+                            .route(web::get().to(process_backgrounded::<R, S>)),
+                    )
                    .service(
                        web::scope("/details")
                            .service(
diff --git a/src/queue.rs b/src/queue.rs
index fbfc1ae..22e095f 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,13 +1,14 @@
 use crate::{
     config::ImageFormat,
     error::Error,
-    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo},
+    repo::{
+        Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, UploadId,
+    },
     serde_str::Serde,
     store::{Identifier, Store},
 };
 use std::{future::Future, path::PathBuf, pin::Pin};
 use tracing::Instrument;
-use uuid::Uuid;
 
 mod cleanup;
 mod process;
@@ -33,7 +34,7 @@ enum Cleanup {
 enum Process {
     Ingest {
         identifier: Vec<u8>,
-        upload_id: Uuid,
+        upload_id: Serde<UploadId>,
         declared_alias: Option<Serde<Alias>>,
         should_validate: bool,
     },
@@ -80,14 +81,14 @@ pub(crate) async fn cleanup_identifier(
 pub(crate) async fn queue_ingest<R: QueueRepo>(
     repo: &R,
     identifier: Vec<u8>,
-    upload_id: Uuid,
+    upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
 ) -> Result<(), Error> {
     let job = serde_json::to_vec(&Process::Ingest {
         identifier,
         declared_alias: declared_alias.map(Serde::new),
-        upload_id,
+        upload_id: Serde::new(upload_id),
         should_validate,
     })?;
     repo.push(PROCESS_QUEUE, job.into()).await?;
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 24f7e30..d8be535 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -32,7 +32,7 @@ where
                 repo,
                 store,
                 identifier,
-                upload_id.into(),
+                Serde::into_inner(upload_id),
                 declared_alias.map(Serde::into_inner),
                 should_validate,
             )
diff --git a/src/repo.rs b/src/repo.rs
index e823bcf..a90755b 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -31,7 +31,7 @@ pub(crate) struct DeleteToken {
 
 pub(crate) struct AlreadyExists;
 
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub(crate) struct UploadId {
     id: Uuid,
 }
@@ -88,6 +88,10 @@ pub(crate) trait BaseRepo {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait UploadRepo: BaseRepo {
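+    /// Register `upload_id` as pending so that `wait` and `claim` have a
+    /// record to resolve against before the ingest job completes.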
+    async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
+
     async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error>;
 
     async fn claim(&self, upload_id: UploadId) -> Result<(), Error>;
@@ -439,6 +443,26 @@ impl From<Uuid> for UploadId {
     }
 }
 
+impl From<UploadId> for Uuid {
+    fn from(uid: UploadId) -> Self {
+        uid.id
+    }
+}
+
+impl std::str::FromStr for UploadId {
+    type Err = <Uuid as std::str::FromStr>::Err;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(UploadId { id: s.parse()? })
+    }
+}
+
+impl std::fmt::Display for UploadId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        std::fmt::Display::fmt(&self.id, f)
+    }
+}
+
 impl std::fmt::Display for MaybeUuid {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index ab568b0..7955309 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -125,14 +125,38 @@ impl From<InnerUploadResult> for UploadResult {
 
 #[async_trait::async_trait(?Send)]
 impl UploadRepo for SledRepo {
+    async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
+        b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
+        Ok(())
+    }
+
     async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
         let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
 
-        while let Some(event) = (&mut subscriber).await {
-            if let sled::Event::Insert { value, .. } = event {
-                let result: InnerUploadResult = serde_json::from_slice(&value)?;
+        let bytes = upload_id.as_bytes().to_vec();
+        let opt = b!(self.uploads, uploads.get(bytes));
+
+        if let Some(bytes) = opt {
+            if bytes != b"1" {
+                let result: InnerUploadResult = serde_json::from_slice(&bytes)?;
                 return Ok(result.into());
             }
+        } else {
+            return Err(UploadError::NoFiles.into());
+        }
+
+        while let Some(event) = (&mut subscriber).await {
+            match event {
+                sled::Event::Remove { .. } => {
+                    return Err(UploadError::NoFiles.into());
+                }
+                sled::Event::Insert { value, .. } => {
+                    if value != b"1" {
+                        let result: InnerUploadResult = serde_json::from_slice(&value)?;
+                        return Ok(result.into());
+                    }
+                }
+            }
         }
 
         Err(UploadError::Canceled.into())
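
Taken together, the repo changes give each upload id a small state machine: absent, pending (the b"1" sentinel), or complete (a serialized InnerUploadResult). A minimal sketch of that lifecycle against the UploadRepo trait above, using a hypothetical upload_lifecycle helper; the write that replaces the sentinel with the real result happens in the ingest worker, outside this diff:

    // Sketch only: `R` is any UploadRepo implementor, such as SledRepo.
    async fn upload_lifecycle<R: UploadRepo>(repo: &R) -> Result<(), Error> {
        let upload_id = UploadId::generate();

        // Mark the id as pending; the sled impl stores the b"1" sentinel.
        repo.create(upload_id).await?;

        // Resolves once a non-sentinel value lands under this id; errors with
        // NoFiles if the record is removed, or Canceled if the watch ends.
        let _result = repo.wait(upload_id).await?;

        // Drop the record once its result has been consumed.
        repo.claim(upload_id).await?;

        Ok(())
    }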