diff --git a/Cargo.lock b/Cargo.lock index 8ceaad7..37ff9af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -960,7 +960,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.3.0-alpha.23" +version = "0.3.0-alpha.24" dependencies = [ "actix-form-data", "actix-rt", @@ -970,6 +970,7 @@ dependencies = [ "base64", "futures-core", "mime", + "num_cpus", "once_cell", "rand", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8f1b70e..cdffe68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.3.0-alpha.23" +version = "0.3.0-alpha.24" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -19,6 +19,7 @@ awc = { version = "3.0.0-beta.7", default-features = false } base64 = "0.13.0" futures-core = "0.3.17" mime = "0.3.1" +num_cpus = "1.13" once_cell = "1.4.0" rand = "0.8.0" serde = { version = "1.0", features = ["derive"] } diff --git a/src/error.rs b/src/error.rs index eee7e05..d73b4cc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,6 +18,9 @@ pub(crate) enum UploadError { #[error("Error interacting with filesystem, {0}")] Io(#[from] std::io::Error), + #[error("Failed to acquire the semaphore")] + Semaphore, + #[error("Panic in blocking operation")] Canceled, @@ -97,6 +100,12 @@ impl From for UploadError { } } +impl From for UploadError { + fn from(_: tokio::sync::AcquireError) -> Self { + UploadError::Semaphore + } +} + impl ResponseError for UploadError { fn status_code(&self) -> StatusCode { match self { diff --git a/src/main.rs b/src/main.rs index dd55389..3d48de7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use actix_web::{ }; use awc::Client; use futures_core::stream::Stream; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime}; use structopt::StructOpt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -59,6 +59,12 @@ static TMP_DIR: Lazy = Lazy::new(|| { path }); static CONFIG: Lazy = Lazy::new(Config::from_args); +static PROCESS_SEMAPHORE: OnceCell = OnceCell::new(); + +fn process_semaphore() -> &'static tokio::sync::Semaphore { + PROCESS_SEMAPHORE + .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) +} // try moving a file #[instrument] @@ -243,7 +249,9 @@ async fn download( let stream = Box::pin(once(fut)); + let permit = process_semaphore().acquire().await?; let alias = manager.upload(stream).await?; + drop(permit); let delete_token = manager.delete_token(alias.clone()).await?; let name = manager.from_alias(alias.to_owned()).await?; @@ -404,6 +412,7 @@ async fn process( } } + let permit = process_semaphore().acquire().await?; let file = tokio::fs::File::open(original_path.clone()).await?; let mut processed_reader = @@ -411,6 +420,8 @@ async fn process( let mut vec = Vec::new(); processed_reader.read_to_end(&mut vec).await?; + drop(permit); + let bytes = web::Bytes::from(vec); let details = if let Some(details) = details { @@ -673,11 +684,15 @@ async fn main() -> Result<(), anyhow::Error> { let span = tracing::info_span!("file-upload", ?filename); let entered = span.enter(); + let permit = process_semaphore().acquire().await?; + let res = manager.upload(stream).await.map(|alias| { let mut path = PathBuf::new(); path.push(alias); Some(path) }); + + drop(permit); drop(entered); res } @@ -702,6 +717,8 @@ async fn main() -> Result<(), anyhow::Error> { let span = tracing::info_span!("file-import", ?filename); let entered = span.enter(); + let permit = process_semaphore().acquire().await?; + let res = manager .import(filename, content_type, validate_imports, stream) .await @@ -710,6 +727,8 @@ async fn main() -> Result<(), anyhow::Error> { path.push(alias); Some(path) }); + + drop(permit); drop(entered); res }