Add backgrounding routes

- Accept backgrounded uploads
- Allow backgrounded processing

Still TODO:
- Endpoint for waiting on/claiming an upload (rough sketch below)
Aode (lion) 2022-04-02 20:56:29 -05:00
parent c4d014597e
commit 8734dfbdc7
6 changed files with 252 additions and 10 deletions
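
The remaining TODO (waiting on and claiming an upload) can already be built from the wait/claim methods this commit adds to UploadRepo. A rough hypothetical sketch; the handler name, route shape, and the NoFiles error mapping are placeholders, not part of this commit:

    async fn claim_upload<R: FullRepo>(
        repo: web::Data<R>,
        upload_id: web::Path<String>,
    ) -> Result<HttpResponse, Error> {
        // UploadId: FromStr is added in this commit (src/repo.rs)
        let upload_id: UploadId = upload_id.parse().map_err(|_| UploadError::NoFiles)?;

        // wait() resolves once the background ingest job stores a result for this id;
        // serializing the UploadResult is omitted since its shape isn't shown in this diff
        let _result = repo.wait(upload_id).await?;

        // claim() clears the stored entry so a result is only handed out once
        repo.claim(upload_id).await?;

        Ok(HttpResponse::Ok().finish())
    }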

src/backgrounded.rs Normal file

@@ -0,0 +1,92 @@
use crate::{
error::Error,
repo::{FullRepo, UploadId, UploadRepo},
store::Store,
};
use actix_web::web::Bytes;
use futures_util::{Stream, TryStreamExt};
use tokio_util::io::StreamReader;
pub(crate) struct Backgrounded<R, S>
where
R: FullRepo + 'static,
S: Store,
{
repo: R,
identifier: Option<S::Identifier>,
upload_id: Option<UploadId>,
}
impl<R, S> Backgrounded<R, S>
where
R: FullRepo + 'static,
S: Store,
{
pub(crate) fn disarm(mut self) {
let _ = self.identifier.take();
let _ = self.upload_id.take();
}
pub(crate) fn upload_id(&self) -> Option<UploadId> {
self.upload_id
}
pub(crate) fn identifier(&self) -> Option<&S::Identifier> {
self.identifier.as_ref()
}
pub(crate) async fn proxy<P>(repo: R, store: S, stream: P) -> Result<Self, Error>
where
P: Stream<Item = Result<Bytes, Error>>,
{
let mut this = Self {
repo,
identifier: None,
upload_id: Some(UploadId::generate()),
};
this.do_proxy(store, stream).await?;
Ok(this)
}
async fn do_proxy<P>(&mut self, store: S, stream: P) -> Result<(), Error>
where
P: Stream<Item = Result<Bytes, Error>>,
{
UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?;
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut reader = StreamReader::new(Box::pin(stream));
let identifier = store.save_async_read(&mut reader).await?;
self.identifier = Some(identifier.clone());
Ok(())
}
}
impl<R, S> Drop for Backgrounded<R, S>
where
R: FullRepo + 'static,
S: Store,
{
fn drop(&mut self) {
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
});
}
if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone();
actix_rt::spawn(async move {
let _ = repo.claim(upload_id).await;
});
}
}
}
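
disarm() and Drop form a cleanup guard: if a request fails after the bytes are stored but before the ingest job is queued, Drop spawns tasks that delete the stored identifier and claim (cancel) the upload id; calling disarm() on the success path empties both Options so Drop does nothing. Condensed, the intended flow (mirroring upload_backgrounded in src/main.rs below; error handling elided):

    let bg = Backgrounded::proxy(repo.clone(), store, stream).await?; // bytes stored, upload id registered
    let identifier = bg.identifier().expect("identifier exists").to_bytes()?;
    let upload_id = bg.upload_id().expect("upload id exists");
    queue::queue_ingest(&repo, identifier, upload_id, None, true).await?; // job safely queued
    bg.disarm(); // success: Drop must no longer clean up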

src/main.rs

@@ -22,6 +22,7 @@ use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_futures::Instrument;
mod backgrounded;
mod concurrent_processor;
mod config;
mod details;
@@ -48,6 +49,7 @@ mod tmp_file;
mod validate;
use self::{
backgrounded::Backgrounded,
config::{Configuration, ImageFormat, Operation},
details::Details,
either::Either,
@@ -57,6 +59,7 @@ use self::{
magick::details_hint,
middleware::{Deadline, Internal},
migrate::LatestDb,
queue::queue_generate,
repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo},
serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
@@ -134,6 +137,48 @@ async fn upload<R: FullRepo, S: Store + 'static>(
})))
}
#[instrument(name = "Uploaded files", skip(value))]
async fn upload_backgrounded<R: FullRepo, S: Store>(
value: Value<Backgrounded<R, S>>,
repo: web::Data<R>,
) -> Result<HttpResponse, Error> {
let images = value
.map()
.and_then(|mut m| m.remove("images"))
.and_then(|images| images.array())
.ok_or(UploadError::NoFiles)?;
let mut files = Vec::new();
let images = images
.into_iter()
.filter_map(|i| i.file())
.collect::<Vec<_>>();
for image in &images {
let upload_id = image.result.upload_id().expect("Upload ID exists");
let identifier = image
.result
.identifier()
.expect("Identifier exists")
.to_bytes()?;
queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
files.push(serde_json::json!({
"file": upload_id.to_string(),
}));
}
for image in images {
image.result.disarm();
}
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
"files": files
})))
}
#[derive(Debug, serde::Deserialize)]
struct UrlQuery {
url: String,
@@ -339,6 +384,30 @@ async fn process<R: FullRepo, S: Store + 'static>(
))
}
/// Process files
#[instrument(name = "Spawning image process", skip(repo))]
async fn process_backgrounded<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
repo: web::Data<R>,
) -> Result<HttpResponse, Error> {
let (target_format, source, process_path, process_args) = prepare_process(query, ext.as_str())?;
let path_string = process_path.to_string_lossy().to_string();
let hash = repo.hash(&source).await?;
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
if identifier_opt.is_some() {
return Ok(HttpResponse::Accepted().finish());
}
queue_generate(&**repo, target_format, source, process_path, process_args).await?;
Ok(HttpResponse::Accepted().finish())
}
/// Fetch file details
#[instrument(name = "Fetching details", skip(repo))]
async fn details<R: FullRepo, S: Store + 'static>(
@@ -603,6 +672,31 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
})),
);
// Create a new Multipart Form validator for backgrounded uploads
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let repo2 = repo.clone();
let store2 = store.clone();
let backgrounded_form = Form::new()
.max_files(10)
.max_file_size(CONFIG.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
let repo = repo2.clone();
let store = store2.clone();
let span = tracing::info_span!("file-proxy", ?filename);
let stream = stream.map_err(Error::from);
Box::pin(
async move { Backgrounded::proxy(repo, store, stream).await }.instrument(span),
)
})),
);
HttpServer::new(move || {
let store = store.clone();
let repo = repo.clone();
@@ -632,6 +726,11 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
.wrap(form.clone())
.route(web::post().to(upload::<R, S>)),
)
.service(
web::resource("/backgrounded")
.wrap(backgrounded_form.clone())
.route(web::post().to(upload_backgrounded::<R, S>)),
)
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
@@ -642,6 +741,10 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
web::resource("/original/{filename}").route(web::get().to(serve::<R, S>)),
)
.service(web::resource("/process.{ext}").route(web::get().to(process::<R, S>)))
.service(
web::resource("/process_backgrounded.{ext}")
.route(web::get().to(process_backgrounded::<R, S>)),
)
.service(
web::scope("/details")
.service(

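Both new routes are fire-and-forget: they return as soon as the work is queued. A hypothetical client call against the processing route (the base URL, the /image scope, and the src query parameter are assumptions; the scope these resources are mounted under is truncated in this excerpt):

    // awc is already a dependency (see the tracing_awc import above)
    let client = awc::Client::new();
    let res = client
        .get("http://localhost:8080/image/process_backgrounded.webp?src=example.png")
        .send()
        .await?;
    // 202 Accepted on both branches: the variant already exists,
    // or a generate job was just queued for it
    assert_eq!(res.status(), awc::http::StatusCode::ACCEPTED);
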
src/queue.rs

@@ -1,13 +1,14 @@
use crate::{
config::ImageFormat,
error::Error,
- repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo},
+ repo::{
+ Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, UploadId,
+ },
serde_str::Serde,
store::{Identifier, Store},
};
use std::{future::Future, path::PathBuf, pin::Pin};
use tracing::Instrument;
- use uuid::Uuid;
mod cleanup;
mod process;
@@ -33,7 +34,7 @@ enum Cleanup {
enum Process {
Ingest {
identifier: Vec<u8>,
- upload_id: Uuid,
+ upload_id: Serde<UploadId>,
declared_alias: Option<Serde<Alias>>,
should_validate: bool,
},
@@ -80,14 +81,14 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
identifier: Vec<u8>,
- upload_id: Uuid,
+ upload_id: UploadId,
declared_alias: Option<Alias>,
should_validate: bool,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::Ingest {
identifier,
declared_alias: declared_alias.map(Serde::new),
- upload_id,
+ upload_id: Serde::new(upload_id),
should_validate,
})?;
repo.push(PROCESS_QUEUE, job.into()).await?;

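Note the queued job now carries Serde<UploadId> rather than a raw Uuid. Judging by the serde_str module name and the Display/FromStr impls added to UploadId further down, the wrapper round-trips values through their string form; under that assumption, the id serializes as a plain UUID string:

    // Hypothetical illustration; assumes Serde<T> serializes via Display.
    let id = UploadId::generate();
    let json = serde_json::to_string(&Serde::new(id))?;
    assert_eq!(json, format!("\"{}\"", id));
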
src/queue/process.rs

@@ -32,7 +32,7 @@ where
repo,
store,
identifier,
- upload_id.into(),
+ Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
should_validate,
)

src/repo.rs

@@ -31,7 +31,7 @@ pub(crate) struct DeleteToken {
pub(crate) struct AlreadyExists;
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+ #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
id: Uuid,
}
@@ -88,6 +88,8 @@ pub(crate) trait BaseRepo {
#[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error>;
async fn claim(&self, upload_id: UploadId) -> Result<(), Error>;
@@ -439,6 +441,26 @@ impl From<Uuid> for UploadId {
}
}
impl From<UploadId> for Uuid {
fn from(uid: UploadId) -> Self {
uid.id
}
}
impl std::str::FromStr for UploadId {
type Err = <Uuid as std::str::FromStr>::Err;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(UploadId { id: s.parse()? })
}
}
impl std::fmt::Display for UploadId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.id, f)
}
}
impl std::fmt::Display for MaybeUuid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

src/repo/sled.rs

@@ -125,14 +125,38 @@ impl From<InnerUploadResult> for UploadResult {
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
Ok(())
}
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
- while let Some(event) = (&mut subscriber).await {
- if let sled::Event::Insert { value, .. } = event {
- let result: InnerUploadResult = serde_json::from_slice(&value)?;
- return Ok(result.into());
- }
- }
+ let bytes = upload_id.as_bytes().to_vec();
+ let opt = b!(self.uploads, uploads.get(bytes));
+ if let Some(bytes) = opt {
+ if bytes != b"1" {
+ let result: InnerUploadResult = serde_json::from_slice(&bytes)?;
+ return Ok(result.into());
+ }
+ } else {
+ return Err(UploadError::NoFiles.into());
+ }
+ while let Some(event) = (&mut subscriber).await {
+ match event {
+ sled::Event::Remove { .. } => {
+ return Err(UploadError::NoFiles.into());
+ }
+ sled::Event::Insert { value, .. } => {
+ if value != b"1" {
+ let result: InnerUploadResult = serde_json::from_slice(&value)?;
+ return Ok(result.into());
+ }
+ }
+ }
+ }
Err(UploadError::Canceled.into())
}
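
The rewrite fixes a lost-wakeup bug: the old loop only saw Insert events that arrived after wait() was called, so a result stored earlier would block the caller forever. The new version subscribes first, then reads the current value, and only then falls back to the event loop. The b"1" sentinel written by create() distinguishes "pending" from a stored result, and a Remove event now surfaces as an error instead of being silently ignored. A standalone sketch of the subscribe-then-check pattern (temporary DB and key names are hypothetical; sled's blocking iterator stands in for the awaited subscriber):

    fn main() -> sled::Result<()> {
        let db = sled::Config::new().temporary(true).open()?;
        let uploads = db.open_tree("uploads")?;
        uploads.insert(b"some-id", &b"1"[..])?; // create(): b"1" marks "pending"

        // 1. subscribe before reading, so no insert can slip between the two
        let subscriber = uploads.watch_prefix(b"some-id");

        // 2. check what is already there
        match uploads.get(b"some-id")? {
            Some(v) if v != b"1" => println!("result already stored: {:?}", v),
            Some(_) => println!("still pending, wait on events"),
            None => println!("unknown or already-claimed id"),
        }

        // simulate the ingest worker finishing (normally another task does this)
        uploads.insert(b"some-id", &b"{\"ok\":true}"[..])?;

        // 3. only now consume events; the insert above wakes the subscriber
        for event in subscriber.take(1) {
            match event {
                sled::Event::Insert { value, .. } if value != b"1" => {
                    println!("result: {:?}", value)
                }
                sled::Event::Insert { .. } => println!("still pending"),
                sled::Event::Remove { .. } => println!("claimed or canceled"),
            }
        }
        Ok(())
    }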