diff --git a/src/main.rs b/src/main.rs index 2978211..f7d1c7d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,11 @@ use actix_web::{ }; use awc::Client; use dashmap::{mapref::entry::Entry, DashMap}; -use futures_util::{stream::{LocalBoxStream, once}, Stream}; -use once_cell::sync::{Lazy, OnceCell}; +use futures_util::{ + stream::{once, LocalBoxStream}, + Stream, +}; +use once_cell::sync::Lazy; use std::{ collections::HashSet, future::{ready, Future}, @@ -69,15 +72,12 @@ static TMP_DIR: Lazy = Lazy::new(|| { path }); static CONFIG: Lazy = Lazy::new(Config::from_args); -static PROCESS_SEMAPHORE: OnceCell = OnceCell::new(); +static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| tokio::sync::Semaphore::new( + num_cpus::get().saturating_sub(1).max(1), +)); static PROCESS_MAP: Lazy>>> = Lazy::new(DashMap::new); -fn process_semaphore() -> &'static tokio::sync::Semaphore { - PROCESS_SEMAPHORE - .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) -} - struct CancelSafeProcessor { path: PathBuf, receiver: Option>, @@ -329,7 +329,7 @@ async fn download( let stream = Box::pin(once(fut)); - let permit = process_semaphore().acquire().await?; + let permit = PROCESS_SEMAPHORE.acquire().await?; let alias = manager.upload(stream).await?; drop(permit); let delete_token = manager.delete_token(alias.clone()).await?; @@ -495,7 +495,7 @@ async fn process( } } - let permit = process_semaphore().acquire().await?; + let permit = PROCESS_SEMAPHORE.acquire().await?; let file = tokio::fs::File::open(original_path.clone()).await?; @@ -774,7 +774,7 @@ async fn main() -> Result<(), anyhow::Error> { let span = tracing::info_span!("file-upload", ?filename); let entered = span.enter(); - let permit = process_semaphore().acquire().await?; + let permit = PROCESS_SEMAPHORE.acquire().await?; let res = manager.upload(stream).await.map(|alias| { let mut path = PathBuf::new(); @@ -807,7 +807,7 @@ async fn main() -> Result<(), anyhow::Error> { let span = tracing::info_span!("file-import", ?filename); let entered = span.enter(); - let permit = process_semaphore().acquire().await?; + let permit = PROCESS_SEMAPHORE.acquire().await?; let res = manager .import(filename, content_type, validate_imports, stream)