Don't process images that are already being processed

This commit is contained in:
Aode (Lion) 2021-09-05 17:15:30 -05:00
parent af1cdbbb3f
commit 810831ca73
3 changed files with 153 additions and 57 deletions

13
Cargo.lock generated
View File

@ -461,6 +461,16 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.16" version = "0.99.16"
@ -960,7 +970,7 @@ dependencies = [
[[package]] [[package]]
name = "pict-rs" name = "pict-rs"
version = "0.3.0-alpha.24" version = "0.3.0-alpha.25"
dependencies = [ dependencies = [
"actix-form-data", "actix-form-data",
"actix-rt", "actix-rt",
@ -968,6 +978,7 @@ dependencies = [
"anyhow", "anyhow",
"awc", "awc",
"base64", "base64",
"dashmap",
"futures-core", "futures-core",
"mime", "mime",
"num_cpus", "num_cpus",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "pict-rs" name = "pict-rs"
description = "A simple image hosting service" description = "A simple image hosting service"
version = "0.3.0-alpha.24" version = "0.3.0-alpha.25"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -17,6 +17,7 @@ actix-web = { version = "4.0.0-beta.8", default-features = false }
anyhow = "1.0" anyhow = "1.0"
awc = { version = "3.0.0-beta.7", default-features = false } awc = { version = "3.0.0-beta.7", default-features = false }
base64 = "0.13.0" base64 = "0.13.0"
dashmap = "4.0.2"
futures-core = "0.3.17" futures-core = "0.3.17"
mime = "0.3.1" mime = "0.3.1"
num_cpus = "1.13" num_cpus = "1.13"

View File

@ -6,11 +6,15 @@ use actix_web::{
web, App, HttpResponse, HttpResponseBuilder, HttpServer, web, App, HttpResponse, HttpResponseBuilder, HttpServer,
}; };
use awc::Client; use awc::Client;
use dashmap::{mapref::entry::Entry, DashMap};
use futures_core::stream::Stream; use futures_core::stream::Stream;
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::{Lazy, OnceCell};
use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime}; use std::{collections::HashSet, future::{Future, ready}, path::PathBuf, time::SystemTime, task::{Context, Poll}, pin::Pin};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot::{Sender, Receiver},
};
use tracing::{debug, error, info, instrument, Span}; use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@ -60,12 +64,82 @@ static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
}); });
static CONFIG: Lazy<Config> = Lazy::new(Config::from_args); static CONFIG: Lazy<Config> = Lazy::new(Config::from_args);
static PROCESS_SEMAPHORE: OnceCell<tokio::sync::Semaphore> = OnceCell::new(); static PROCESS_SEMAPHORE: OnceCell<tokio::sync::Semaphore> = OnceCell::new();
static PROCESS_MAP: Lazy<DashMap<PathBuf, Vec<Sender<(Details, web::Bytes)>>>> =
Lazy::new(DashMap::new);
fn process_semaphore() -> &'static tokio::sync::Semaphore { fn process_semaphore() -> &'static tokio::sync::Semaphore {
PROCESS_SEMAPHORE PROCESS_SEMAPHORE
.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
} }
struct CancelSafeProcessor<F> {
path: PathBuf,
receiver: Option<Receiver<(Details, web::Bytes)>>,
fut: F,
}
impl<F> CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin,
{
pub(crate) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
let receiver = match entry {
Entry::Vacant(vacant) => {
vacant.insert(Vec::new());
None
}
Entry::Occupied(mut occupied) => {
let (tx, rx) = tokio::sync::oneshot::channel();
occupied.get_mut().push(tx);
Some(rx)
}
};
CancelSafeProcessor { path, receiver, fut }
}
}
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin,
{
type Output = Result<(Details, web::Bytes), UploadError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut rx) = self.receiver {
Pin::new(rx).poll(cx).map(|res| res.map_err(|_| UploadError::Canceled))
} else {
match Pin::new(&mut self.fut).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => {
let opt = PROCESS_MAP.remove(&self.path);
match res {
Err(e) => Poll::Ready(Err(e)),
Ok(tup) => {
if let Some((_, vec)) = opt {
for sender in vec {
let _ = sender.send(tup.clone());
}
}
Poll::Ready(Ok(tup))
}
}
}
}
}
}
}
impl<F> Drop for CancelSafeProcessor<F> {
fn drop(&mut self) {
if self.receiver.is_none() {
PROCESS_MAP.remove(&self.path);
}
}
}
// try moving a file // try moving a file
#[instrument] #[instrument]
async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> {
@ -389,68 +463,78 @@ async fn process(
let mut original_path = manager.image_dir(); let mut original_path = manager.image_dir();
original_path.push(name.clone()); original_path.push(name.clone());
// Create and save a JPG for motion images (gif, mp4) let thumbnail_path2 = thumbnail_path.clone();
if let Some((updated_path, exists)) = let process_fut = async {
self::processor::prepare_image(original_path.clone()).await? let thumbnail_path = thumbnail_path2;
{ // Create and save a JPG for motion images (gif, mp4)
original_path = updated_path.clone(); if let Some((updated_path, exists)) =
self::processor::prepare_image(original_path.clone()).await?
{
original_path = updated_path.clone();
if exists.is_new() { if exists.is_new() {
// Save the transcoded file in another task // Save the transcoded file in another task
debug!("Spawning storage task"); debug!("Spawning storage task");
let span = Span::current(); let span = Span::current();
let manager2 = manager.clone(); let manager2 = manager.clone();
let name = name.clone(); let name = name.clone();
actix_rt::spawn(async move { actix_rt::spawn(async move {
let entered = span.enter(); let entered = span.enter();
if let Err(e) = manager2.store_variant(updated_path, name).await { if let Err(e) = manager2.store_variant(updated_path, name).await {
error!("Error storing variant, {}", e); error!("Error storing variant, {}", e);
return; return;
} }
drop(entered); drop(entered);
}); });
}
} }
}
let permit = process_semaphore().acquire().await?; let permit = process_semaphore().acquire().await?;
let file = tokio::fs::File::open(original_path.clone()).await?;
let mut processed_reader = let file = tokio::fs::File::open(original_path.clone()).await?;
crate::magick::process_image_write_read(file, thumbnail_args, format)?;
let mut vec = Vec::new(); let mut processed_reader =
processed_reader.read_to_end(&mut vec).await?; crate::magick::process_image_write_read(file, thumbnail_args, format)?;
drop(permit);
let bytes = web::Bytes::from(vec); let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?;
let bytes = web::Bytes::from(vec);
let details = if let Some(details) = details { drop(permit);
details
} else { let details = if let Some(details) = details {
Details::from_bytes(bytes.clone()).await? details
} else {
Details::from_bytes(bytes.clone()).await?
};
let span = tracing::Span::current();
let details2 = details.clone();
let bytes2 = bytes.clone();
actix_rt::spawn(async move {
let entered = span.enter();
if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e);
return;
}
if let Err(e) = manager
.store_variant_details(thumbnail_path.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await {
tracing::warn!("Error saving variant info: {}", e);
}
drop(entered);
});
Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError>
}; };
let span = tracing::Span::current(); let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?;
let details2 = details.clone();
let bytes2 = bytes.clone();
actix_rt::spawn(async move {
let entered = span.enter();
if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e);
return;
}
if let Err(e) = manager
.store_variant_details(thumbnail_path.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await {
tracing::warn!("Error saving variant info: {}", e);
}
drop(entered);
});
return Ok(srv_response( return Ok(srv_response(
HttpResponse::Ok(), HttpResponse::Ok(),