Add semaphore around process operations

This commit is contained in:
Aode (Lion) 2021-09-04 16:01:48 -05:00
parent 79c9db254b
commit e88b9a024a
4 changed files with 33 additions and 3 deletions

3
Cargo.lock generated
View File

@ -960,7 +960,7 @@ dependencies = [
[[package]] [[package]]
name = "pict-rs" name = "pict-rs"
version = "0.3.0-alpha.23" version = "0.3.0-alpha.24"
dependencies = [ dependencies = [
"actix-form-data", "actix-form-data",
"actix-rt", "actix-rt",
@ -970,6 +970,7 @@ dependencies = [
"base64", "base64",
"futures-core", "futures-core",
"mime", "mime",
"num_cpus",
"once_cell", "once_cell",
"rand", "rand",
"serde", "serde",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "pict-rs" name = "pict-rs"
description = "A simple image hosting service" description = "A simple image hosting service"
version = "0.3.0-alpha.23" version = "0.3.0-alpha.24"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -19,6 +19,7 @@ awc = { version = "3.0.0-beta.7", default-features = false }
base64 = "0.13.0" base64 = "0.13.0"
futures-core = "0.3.17" futures-core = "0.3.17"
mime = "0.3.1" mime = "0.3.1"
num_cpus = "1.13"
once_cell = "1.4.0" once_cell = "1.4.0"
rand = "0.8.0" rand = "0.8.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -18,6 +18,9 @@ pub(crate) enum UploadError {
#[error("Error interacting with filesystem, {0}")] #[error("Error interacting with filesystem, {0}")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("Failed to acquire the semaphore")]
Semaphore,
#[error("Panic in blocking operation")] #[error("Panic in blocking operation")]
Canceled, Canceled,
@ -97,6 +100,12 @@ impl From<actix_web::error::BlockingError> for UploadError {
} }
} }
impl From<tokio::sync::AcquireError> for UploadError {
fn from(_: tokio::sync::AcquireError) -> Self {
UploadError::Semaphore
}
}
impl ResponseError for UploadError { impl ResponseError for UploadError {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match self { match self {

View File

@ -7,7 +7,7 @@ use actix_web::{
}; };
use awc::Client; use awc::Client;
use futures_core::stream::Stream; use futures_core::stream::Stream;
use once_cell::sync::Lazy; use once_cell::sync::{Lazy, OnceCell};
use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime}; use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -59,6 +59,12 @@ static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
path path
}); });
static CONFIG: Lazy<Config> = Lazy::new(Config::from_args); static CONFIG: Lazy<Config> = Lazy::new(Config::from_args);
static PROCESS_SEMAPHORE: OnceCell<tokio::sync::Semaphore> = OnceCell::new();
fn process_semaphore() -> &'static tokio::sync::Semaphore {
PROCESS_SEMAPHORE
.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
}
// try moving a file // try moving a file
#[instrument] #[instrument]
@ -243,7 +249,9 @@ async fn download(
let stream = Box::pin(once(fut)); let stream = Box::pin(once(fut));
let permit = process_semaphore().acquire().await?;
let alias = manager.upload(stream).await?; let alias = manager.upload(stream).await?;
drop(permit);
let delete_token = manager.delete_token(alias.clone()).await?; let delete_token = manager.delete_token(alias.clone()).await?;
let name = manager.from_alias(alias.to_owned()).await?; let name = manager.from_alias(alias.to_owned()).await?;
@ -404,6 +412,7 @@ async fn process(
} }
} }
let permit = process_semaphore().acquire().await?;
let file = tokio::fs::File::open(original_path.clone()).await?; let file = tokio::fs::File::open(original_path.clone()).await?;
let mut processed_reader = let mut processed_reader =
@ -411,6 +420,8 @@ async fn process(
let mut vec = Vec::new(); let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?; processed_reader.read_to_end(&mut vec).await?;
drop(permit);
let bytes = web::Bytes::from(vec); let bytes = web::Bytes::from(vec);
let details = if let Some(details) = details { let details = if let Some(details) = details {
@ -673,11 +684,15 @@ async fn main() -> Result<(), anyhow::Error> {
let span = tracing::info_span!("file-upload", ?filename); let span = tracing::info_span!("file-upload", ?filename);
let entered = span.enter(); let entered = span.enter();
let permit = process_semaphore().acquire().await?;
let res = manager.upload(stream).await.map(|alias| { let res = manager.upload(stream).await.map(|alias| {
let mut path = PathBuf::new(); let mut path = PathBuf::new();
path.push(alias); path.push(alias);
Some(path) Some(path)
}); });
drop(permit);
drop(entered); drop(entered);
res res
} }
@ -702,6 +717,8 @@ async fn main() -> Result<(), anyhow::Error> {
let span = tracing::info_span!("file-import", ?filename); let span = tracing::info_span!("file-import", ?filename);
let entered = span.enter(); let entered = span.enter();
let permit = process_semaphore().acquire().await?;
let res = manager let res = manager
.import(filename, content_type, validate_imports, stream) .import(filename, content_type, validate_imports, stream)
.await .await
@ -710,6 +727,8 @@ async fn main() -> Result<(), anyhow::Error> {
path.push(alias); path.push(alias);
Some(path) Some(path)
}); });
drop(permit);
drop(entered); drop(entered);
res res
} }