From c795c1edfadd305caad77ed12d8b1bc71eb6e2dc Mon Sep 17 00:00:00 2001
From: asonix
Date: Fri, 20 Oct 2023 19:08:11 -0500
Subject: [PATCH] Replace most of actix-rt with tokio, give names to tasks

---
 src/backgrounded.rs       |  2 ++
 src/file.rs               |  2 +-
 src/future.rs             |  4 +--
 src/ingest.rs             |  3 +++
 src/ingest/hasher.rs      |  2 +-
 src/lib.rs                | 25 ++++++++-----------
 src/middleware/payload.rs |  6 ++---
 src/migrate_store.rs      |  4 +--
 src/process.rs            |  7 +++---
 src/queue.rs              |  6 ++---
 src/queue/process.rs      |  2 +-
 src/repo/postgres.rs      | 11 ++++++---
 src/repo/sled.rs          | 30 +++++++++++------------
 src/repo_04/sled.rs       |  6 ++---
 src/store/object_store.rs |  1 +
 src/stream.rs             |  6 ++---
 src/sync.rs               | 51 +++++++++++++++++++++++++++++++++------
 src/tmp_file.rs           |  2 +-
 18 files changed, 105 insertions(+), 65 deletions(-)

diff --git a/src/backgrounded.rs b/src/backgrounded.rs
index 31c784c..fce27e0 100644
--- a/src/backgrounded.rs
+++ b/src/backgrounded.rs
@@ -83,6 +83,7 @@ impl Drop for Backgrounded {
             let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier);
 
             crate::sync::spawn(
+                "backgrounded-cleanup-identifier",
                 async move {
                     let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
                 }
@@ -96,6 +97,7 @@ impl Drop for Backgrounded {
             let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
 
             crate::sync::spawn(
+                "backgrounded-claim-upload",
                 async move {
                     let _ = repo.claim(upload_id).await;
                 }
diff --git a/src/file.rs b/src/file.rs
index 9f0cd0b..ba7ae6b 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -381,7 +381,7 @@ mod io_uring {
     macro_rules! test_async {
         ($fut:expr) => {
             actix_web::rt::System::new()
-                .block_on(async move { crate::sync::spawn($fut).await.unwrap() })
+                .block_on(async move { crate::sync::spawn("tests", $fut).await.unwrap() })
         };
     }
diff --git a/src/future.rs b/src/future.rs
index c6f5fbd..4be1820 100644
--- a/src/future.rs
+++ b/src/future.rs
@@ -35,11 +35,11 @@ pub(crate) trait NowOrNever: Future {
 }
 
 pub(crate) trait WithTimeout: Future {
-    fn with_timeout(self, duration: Duration) -> actix_web::rt::time::Timeout<Self>
+    fn with_timeout(self, duration: Duration) -> tokio::time::Timeout<Self>
     where
        Self: Sized,
     {
-        actix_web::rt::time::timeout(duration, self)
+        tokio::time::timeout(duration, self)
     }
 }
diff --git a/src/ingest.rs b/src/ingest.rs
index df95a9b..b553e54 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -248,6 +248,7 @@ impl Drop for Session {
             let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup hash", hash = ?hash);
 
             crate::sync::spawn(
+                "session-cleanup-hash",
                 async move {
                     let _ = crate::queue::cleanup_hash(&repo, hash).await;
                 }
@@ -262,6 +263,7 @@ impl Drop for Session {
             let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias);
 
             crate::sync::spawn(
+                "session-cleanup-alias",
                 async move {
                     let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
                 }
@@ -275,6 +277,7 @@ impl Drop for Session {
             let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
 
             crate::sync::spawn(
+                "session-cleanup-identifier",
                 async move {
                     let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
                 }
diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs
index 5a6ef73..2a4ee50 100644
--- a/src/ingest/hasher.rs
+++ b/src/ingest/hasher.rs
@@ -80,7 +80,7 @@ mod test {
test_async { ($fut:expr) => { actix_web::rt::System::new() - .block_on(async move { crate::sync::spawn($fut).await.unwrap() }) + .block_on(async move { crate::sync::spawn("tests", $fut).await.unwrap() }) }; } diff --git a/src/lib.rs b/src/lib.rs index c485ba2..bc131fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1772,8 +1772,8 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { return; } - crate::sync::spawn(async move { - let mut interval = actix_web::rt::time::interval(Duration::from_secs(30)); + crate::sync::spawn("queue-cleanup", async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; @@ -1805,19 +1805,14 @@ fn spawn_workers( ) where S: Store + 'static, { - crate::sync::spawn(queue::process_cleanup( - repo.clone(), - store.clone(), - config.clone(), - )); - crate::sync::spawn(queue::process_images( - tmp_dir, - repo, - store, - client, - process_map, - config, - )); + crate::sync::spawn( + "cleanup-worker", + queue::process_cleanup(repo.clone(), store.clone(), config.clone()), + ); + crate::sync::spawn( + "process-worker", + queue::process_images(tmp_dir, repo, store, client, process_map, config), + ); } async fn launch_file_store( diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 37bcebb..057735e 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -47,7 +47,7 @@ async fn drain(rx: flume::Receiver) { } #[derive(Clone)] -struct DrainHandle(Option>>); +struct DrainHandle(Option>>); pub(crate) struct Payload { sender: flume::Sender, @@ -65,7 +65,7 @@ pub(crate) struct PayloadStream { } impl DrainHandle { - fn new(handle: actix_web::rt::task::JoinHandle<()>) -> Self { + fn new(handle: tokio::task::JoinHandle<()>) -> Self { Self(Some(Rc::new(handle))) } } @@ -74,7 +74,7 @@ impl Payload { pub(crate) fn new() -> Self { let (tx, rx) = crate::sync::channel(LIMIT); - let handle = DrainHandle::new(crate::sync::spawn(drain(rx))); + let handle = DrainHandle::new(crate::sync::spawn("drain-payloads", drain(rx))); Payload { sender: tx, handle } } diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 94d8dda..661ae61 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -67,7 +67,7 @@ where tracing::warn!("Retrying migration +{failure_count}"); } - actix_web::rt::time::sleep(Duration::from_secs(3)).await; + tokio::time::sleep(Duration::from_secs(3)).await; } Ok(()) @@ -398,7 +398,7 @@ where tracing::warn!("Failed moving file. 
             }
 
-            actix_web::rt::time::sleep(Duration::from_secs(3)).await;
+            tokio::time::sleep(Duration::from_secs(3)).await;
         }
     }
 }
diff --git a/src/process.rs b/src/process.rs
index fb81ecf..e79aaa2 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,4 +1,3 @@
-use actix_web::rt::task::JoinHandle;
 use actix_web::web::Bytes;
 use flume::r#async::RecvFut;
 use std::{
@@ -11,6 +10,7 @@ use std::{
 use tokio::{
     io::{AsyncRead, AsyncWriteExt, ReadBuf},
     process::{Child, ChildStdin, ChildStdout, Command},
+    task::JoinHandle,
 };
 use tracing::{Instrument, Span};
@@ -78,7 +78,7 @@ pub(crate) struct ProcessRead {
     #[allow(dead_code)]
     handle: DropHandle,
     eof: bool,
-    sleep: Pin<Box<actix_web::rt::time::Sleep>>,
+    sleep: Pin<Box<tokio::time::Sleep>>,
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -213,6 +213,7 @@ impl Process {
         span.follows_from(Span::current());
 
         let handle = crate::sync::spawn(
+            "await-process",
             async move {
                 let child_fut = async {
                     (f)(stdin).await?;
@@ -238,7 +239,7 @@
             .instrument(span),
         );
 
-        let sleep = actix_web::rt::time::sleep(timeout);
+        let sleep = tokio::time::sleep(timeout);
 
         ProcessRead {
             inner: stdout,
diff --git a/src/queue.rs b/src/queue.rs
index 57ade78..1db5778 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -203,7 +203,7 @@ async fn process_jobs(
                 tracing::warn!("{}", format!("{e:?}"));
 
                 if e.is_disconnected() {
-                    actix_web::rt::time::sleep(Duration::from_secs(10)).await;
+                    tokio::time::sleep(Duration::from_secs(10)).await;
                 }
 
                 continue;
@@ -341,7 +341,7 @@ async fn process_image_jobs(
                 tracing::warn!("{}", format!("{e:?}"));
 
                 if e.is_disconnected() {
-                    actix_web::rt::time::sleep(Duration::from_secs(3)).await;
+                    tokio::time::sleep(Duration::from_secs(3)).await;
                 }
 
                 continue;
@@ -423,7 +423,7 @@ where
     let mut fut =
         std::pin::pin!(fut.instrument(tracing::info_span!("job-future", job_id = ?job_id)));
 
-    let mut interval = actix_web::rt::time::interval(Duration::from_secs(5));
+    let mut interval = tokio::time::interval(Duration::from_secs(5));
 
     let mut hb = None;
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 33f64b9..6587528 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -136,7 +136,7 @@ where
         let client = client.clone();
         let media = media.clone();
 
-        let error_boundary = crate::sync::spawn(async move {
+        let error_boundary = crate::sync::spawn("ingest-media", async move {
             let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
 
             let session = crate::ingest::ingest(
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index f221443..4a22efa 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -49,7 +49,7 @@ use super::{
 pub(crate) struct PostgresRepo {
     inner: Arc<Inner>,
     #[allow(dead_code)]
-    notifications: Arc<actix_web::rt::task::JoinHandle<()>>,
+    notifications: Arc<tokio::task::JoinHandle<()>>,
 }
 
 struct Inner {
@@ -151,7 +151,7 @@ impl PostgresRepo {
             .await
             .map_err(ConnectPostgresError::ConnectForMigration)?;
 
-        let handle = crate::sync::spawn(conn);
+        let handle = crate::sync::spawn("postgres-migrations", conn);
 
         embedded::migrations::runner()
             .run_async(&mut client)
@@ -199,7 +199,10 @@ impl PostgresRepo {
             upload_notifications: DashMap::new(),
         });
 
-        let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone(), parallelism * 8));
+        let handle = crate::sync::spawn(
+            "postgres-delegate-notifications",
+            delegate_notifications(rx, inner.clone(), parallelism * 8),
+        );
 
         let notifications = Arc::new(handle);
 
@@ -411,7 +414,7 @@ fn spawn_db_notification_task(
     sender: flume::Sender,
     mut conn: Connection,
 ) {
-    crate::sync::spawn(async move {
+    crate::sync::spawn("postgres-notifications", async move {
         while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
             match res {
                 Err(e) => {
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 632e5eb..3400f0a 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -30,7 +30,7 @@ macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
 
-        crate::sync::spawn_blocking(move || $expr)
+        crate::sync::spawn_blocking("sled-io", move || $expr)
             .await
             .map_err(SledError::from)
             .map_err(RepoError::from)?
@@ -174,7 +174,7 @@ impl SledRepo {
         let this = self.db.clone();
 
-        crate::sync::spawn_blocking(move || {
+        crate::sync::spawn_blocking("sled-io", move || {
             let export = this.export();
             export_db.import(export);
         })
@@ -257,7 +257,7 @@ impl AliasAccessRepo for SledRepo {
         let alias_access = self.alias_access.clone();
         let inverse_alias_access = self.inverse_alias_access.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&alias_access, &inverse_alias_access).transaction(
                 |(alias_access, inverse_alias_access)| {
                     if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? {
@@ -323,7 +323,7 @@ impl AliasAccessRepo for SledRepo {
         let alias_access = self.alias_access.clone();
         let inverse_alias_access = self.inverse_alias_access.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&alias_access, &inverse_alias_access).transaction(
                 |(alias_access, inverse_alias_access)| {
                     if let Some(old) = alias_access.remove(alias.to_bytes())? {
@@ -363,7 +363,7 @@ impl VariantAccessRepo for SledRepo {
         let variant_access = self.variant_access.clone();
         let inverse_variant_access = self.inverse_variant_access.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&variant_access, &inverse_variant_access).transaction(
                 |(variant_access, inverse_variant_access)| {
                     if let Some(old) = variant_access.insert(&key, &value_bytes)? {
@@ -433,7 +433,7 @@ impl VariantAccessRepo for SledRepo {
         let variant_access = self.variant_access.clone();
         let inverse_variant_access = self.inverse_variant_access.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&variant_access, &inverse_variant_access).transaction(
                 |(variant_access, inverse_variant_access)| {
                     if let Some(old) = variant_access.remove(&key)? {
@@ -633,7 +633,7 @@ impl QueueRepo for SledRepo {
         let queue = self.queue.clone();
         let job_state = self.job_state.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&queue, &job_state).transaction(|(queue, job_state)| {
                 let state = JobState::pending();
@@ -683,7 +683,7 @@ impl QueueRepo for SledRepo {
         let job_state = self.job_state.clone();
 
         let span = tracing::Span::current();
 
-        let opt = crate::sync::spawn_blocking(move || {
+        let opt = crate::sync::spawn_blocking("sled-io", move || {
             let _guard = span.enter();
             // Job IDs are generated with Uuid version 7 - defining their first bits as a
             // timestamp. Scanning a prefix should give us jobs in the order they were queued.
@@ -776,7 +776,7 @@ impl QueueRepo for SledRepo {
         let job_state = self.job_state.clone();
 
-        crate::sync::spawn_blocking(move || {
+        crate::sync::spawn_blocking("sled-io", move || {
             if let Some(state) = job_state.get(&key)? {
                 let new_state = JobState::running(worker_id);
@@ -806,7 +806,7 @@ impl QueueRepo for SledRepo {
         let queue = self.queue.clone();
         let job_state = self.job_state.clone();
 
-        let res = crate::sync::spawn_blocking(move || {
+        let res = crate::sync::spawn_blocking("sled-io", move || {
             (&queue, &job_state).transaction(|(queue, job_state)| {
                 queue.remove(&key[..])?;
                 job_state.remove(&key[..])?;
@@ -1065,7 +1065,7 @@ impl HashRepo for SledRepo {
             None => (self.hashes_inverse.iter(), None),
         };
 
-        crate::sync::spawn_blocking(move || {
+        crate::sync::spawn_blocking("sled-io", move || {
             let page_iter = page_iter
                 .keys()
                 .rev()
@@ -1117,7 +1117,7 @@ impl HashRepo for SledRepo {
         let page_iter = self.hashes_inverse.range(..=date_nanos);
         let prev_iter = Some(self.hashes_inverse.range(date_nanos..));
 
-        crate::sync::spawn_blocking(move || {
+        crate::sync::spawn_blocking("sled-io", move || {
             let page_iter = page_iter
                 .keys()
                 .rev()
@@ -1245,7 +1245,7 @@ impl HashRepo for SledRepo {
         let hash_variant_identifiers = self.hash_variant_identifiers.clone();
 
-        crate::sync::spawn_blocking(move || {
+        crate::sync::spawn_blocking("sled-io", move || {
             hash_variant_identifiers
                 .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
                 .map(|res| res.map_err(|_| VariantAlreadyExists))
@@ -1537,8 +1537,8 @@ impl std::fmt::Debug for SledRepo {
     }
 }
 
-impl From<actix_web::rt::task::JoinError> for SledError {
-    fn from(_: actix_web::rt::task::JoinError) -> Self {
+impl From<tokio::task::JoinError> for SledError {
+    fn from(_: tokio::task::JoinError) -> Self {
         SledError::Panic
     }
 }
diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs
index e60f2ea..7ce7373 100644
--- a/src/repo_04/sled.rs
+++ b/src/repo_04/sled.rs
@@ -34,7 +34,7 @@ macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
 
-        crate::sync::spawn_blocking(move || $expr)
+        crate::sync::spawn_blocking("04-sled-io", move || $expr)
             .await
             .map_err(SledError::from)
             .map_err(RepoError::from)?
@@ -313,8 +313,8 @@ impl std::fmt::Debug for SledRepo {
     }
 }
 
-impl From<actix_web::rt::task::JoinError> for SledError {
-    fn from(_: actix_web::rt::task::JoinError) -> Self {
+impl From<tokio::task::JoinError> for SledError {
+    fn from(_: tokio::task::JoinError) -> Self {
         SledError::Panic
     }
 }
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index a5f9e56..3d6f667 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -274,6 +274,7 @@ impl Store for ObjectStore {
         let object_id2 = object_id.clone();
         let upload_id2 = upload_id.to_string();
         let handle = crate::sync::spawn(
+            "upload-multipart-part",
             async move {
                 let response = this
                     .create_upload_part_request(
diff --git a/src/stream.rs b/src/stream.rs
index a9b1f10..3cf6641 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -30,7 +30,7 @@ where
 {
     let (tx, rx) = crate::sync::channel(1);
 
-    let handle = crate::sync::spawn(async move {
+    let handle = crate::sync::spawn("send-stream", async move {
         let stream = std::pin::pin!(stream);
         let mut streamer = stream.into_streamer();
@@ -59,7 +59,7 @@ where
 {
     let (tx, rx) = crate::sync::channel(buffer);
 
-    let handle = crate::sync::spawn_blocking(move || {
+    let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
         for value in iterator {
             if tx.send(value).is_err() {
                 break;
@@ -148,7 +148,7 @@ where
     S::Item: 'static,
 {
     streem::try_from_fn(|yielder| async move {
-        actix_web::rt::time::timeout(duration, async move {
+        tokio::time::timeout(duration, async move {
             let stream = std::pin::pin!(stream);
             let mut streamer = stream.into_streamer();
diff --git a/src/sync.rs b/src/sync.rs
index 059b19c..3c5ce28 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -4,7 +4,13 @@ use tokio::sync::{Notify, Semaphore};
 
 #[track_caller]
 pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
-    tracing::trace_span!(parent: None, "make channel").in_scope(|| flume::bounded(bound))
+    let span = tracing::trace_span!(parent: None, "make channel");
+    let guard = span.enter();
+
+    let channel = flume::bounded(bound);
+
+    drop(guard);
+    channel
 }
 
 #[track_caller]
@@ -14,31 +20,60 @@ pub(crate) fn notify() -> Arc<Notify> {
 
 #[track_caller]
 pub(crate) fn bare_notify() -> Notify {
-    tracing::trace_span!(parent: None, "make notifier").in_scope(Notify::new)
+    let span = tracing::trace_span!(parent: None, "make notifier");
+    let guard = span.enter();
+
+    let notify = Notify::new();
+
+    drop(guard);
+    notify
 }
 
 #[track_caller]
 pub(crate) fn bare_semaphore(permits: usize) -> Semaphore {
-    tracing::trace_span!(parent: None, "make semaphore").in_scope(|| Semaphore::new(permits))
+    let span = tracing::trace_span!(parent: None, "make semaphore");
+    let guard = span.enter();
+
+    let semaphore = Semaphore::new(permits);
+
+    drop(guard);
+    semaphore
 }
 
 #[track_caller]
-pub(crate) fn spawn<F>(future: F) -> actix_web::rt::task::JoinHandle<F::Output>
+pub(crate) fn spawn<F>(name: &str, future: F) -> tokio::task::JoinHandle<F::Output>
 where
     F: std::future::Future + 'static,
     F::Output: 'static,
 {
-    tracing::trace_span!(parent: None, "spawn task").in_scope(|| actix_web::rt::spawn(future))
+    let span = tracing::trace_span!(parent: None, "spawn task");
+    let guard = span.enter();
+
+    let handle = tokio::task::Builder::new()
+        .name(name)
+        .spawn_local(future)
+        .expect("Failed to spawn");
+
+    drop(guard);
+    handle
 }
 
 #[track_caller]
-pub(crate) fn spawn_blocking<F, Out>(function: F) -> actix_web::rt::task::JoinHandle<Out>
+pub(crate) fn spawn_blocking<F, Out>(name: &str, function: F) -> tokio::task::JoinHandle<Out>
 where
     F: FnOnce() -> Out + Send + 'static,
     Out: Send + 'static,
 {
     let outer_span = tracing::Span::current();
 
-    tracing::trace_span!(parent: None, "spawn blocking task")
-        .in_scope(|| actix_web::rt::task::spawn_blocking(move || outer_span.in_scope(function)))
+    let span = tracing::trace_span!(parent: None, "spawn blocking task");
+    let guard = span.enter();
+
+    let handle = tokio::task::Builder::new()
+        .name(name)
+        .spawn_blocking(move || outer_span.in_scope(function))
+        .expect("Failed to spawn");
+
+    drop(guard);
+    handle
 }
diff --git a/src/tmp_file.rs b/src/tmp_file.rs
index ef9c7e9..793165c 100644
--- a/src/tmp_file.rs
+++ b/src/tmp_file.rs
@@ -51,7 +51,7 @@ struct TmpFile(PathBuf);
 
 impl Drop for TmpFile {
     fn drop(&mut self) {
-        crate::sync::spawn(tokio::fs::remove_file(self.0.clone()));
+        crate::sync::spawn("remove-tmpfile", tokio::fs::remove_file(self.0.clone()));
     }
 }
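
Usage sketch (not part of the patch): after this change every spawn helper takes a static task name as its first argument and returns a tokio::task::JoinHandle instead of an actix_web::rt::task::JoinHandle. The snippet below is illustrative only; the task name and the awaited future are invented, and it assumes tokio is built with the tracing feature and compiled with --cfg tokio_unstable (which tokio::task::Builder requires) and that the caller runs on the same current-thread runtime the helpers spawn_local onto.

    // Hypothetical call site; the name and body are examples, not code from this patch.
    let handle = crate::sync::spawn("example-task", async move {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        "done"
    });

    // Panics in the task now surface as tokio::task::JoinError when the handle is awaited.
    assert_eq!(handle.await.expect("task panicked"), "done");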