2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 19:31:35 +00:00

postgres: add already-claimed case, general: tracing paranoia

This commit is contained in:
asonix 2023-09-03 21:30:47 -05:00
parent 31caea438e
commit a43de122f9
15 changed files with 234 additions and 189 deletions

View file

@ -82,14 +82,12 @@ impl Drop for Backgrounded {
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(
actix_rt::spawn( async move {
async move { let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; }
} .instrument(cleanup_span),
.instrument(cleanup_span), );
)
});
} }
if let Some(upload_id) = self.upload_id { if let Some(upload_id) = self.upload_id {
@ -97,14 +95,12 @@ impl Drop for Backgrounded {
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(
actix_rt::spawn( async move {
async move { let _ = repo.claim(upload_id).await;
let _ = repo.claim(upload_id).await; }
} .instrument(cleanup_span),
.instrument(cleanup_span), );
)
});
} }
} }
} }

View file

@ -446,15 +446,15 @@ mod io_uring {
actix_rt::System::new().block_on(async move { actix_rt::System::new().block_on(async move {
let arbiter = actix_rt::Arbiter::new(); let arbiter = actix_rt::Arbiter::new();
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = crate::sync::channel(1);
arbiter.spawn(async move { arbiter.spawn(async move {
let handle = actix_rt::spawn($fut); let handle = crate::sync::spawn($fut);
let _ = tx.send(handle.await.unwrap()); let _ = tx.send(handle.await.unwrap());
}); });
rx.await.unwrap() rx.into_recv_async().await.unwrap()
}) })
}; };
} }

View file

@ -217,14 +217,12 @@ impl Drop for Session {
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup hash", hash = ?hash); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup hash", hash = ?hash);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(
actix_rt::spawn( async move {
async move { let _ = crate::queue::cleanup_hash(&repo, hash).await;
let _ = crate::queue::cleanup_hash(&repo, hash).await; }
} .instrument(cleanup_span),
.instrument(cleanup_span), );
)
});
} }
if let Some(alias) = self.alias.take() { if let Some(alias) = self.alias.take() {
@ -233,14 +231,12 @@ impl Drop for Session {
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(
actix_rt::spawn( async move {
async move { let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
let _ = crate::queue::cleanup_alias(&repo, alias, token).await; }
} .instrument(cleanup_span),
.instrument(cleanup_span), );
)
});
} }
if let Some(identifier) = self.identifier.take() { if let Some(identifier) = self.identifier.take() {
@ -248,14 +244,12 @@ impl Drop for Session {
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier); let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(
actix_rt::spawn( async move {
async move { let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; }
} .instrument(cleanup_span),
.instrument(cleanup_span), );
)
});
} }
} }
} }

View file

@ -82,16 +82,15 @@ mod test {
actix_rt::System::new().block_on(async move { actix_rt::System::new().block_on(async move {
let arbiter = actix_rt::Arbiter::new(); let arbiter = actix_rt::Arbiter::new();
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") let (tx, rx) = crate::sync::channel(1);
.in_scope(|| tokio::sync::oneshot::channel());
arbiter.spawn(async move { arbiter.spawn(async move {
let handle = actix_rt::spawn($fut); let handle = crate::sync::spawn($fut);
let _ = tx.send(handle.await.unwrap()); let _ = tx.send(handle.await.unwrap());
}); });
rx.await.unwrap() rx.into_recv_async().await.unwrap()
}) })
}; };
} }

View file

@ -26,6 +26,7 @@ mod repo_04;
mod serde_str; mod serde_str;
mod store; mod store;
mod stream; mod stream;
mod sync;
mod tmp_file; mod tmp_file;
mod validate; mod validate;
@ -84,8 +85,9 @@ const DAYS: u32 = 24 * HOURS;
const NOT_FOUND_KEY: &str = "404-alias"; const NOT_FOUND_KEY: &str = "404-alias";
static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| { static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
tracing::trace_span!(parent: None, "Initialize semaphore") let permits = num_cpus::get().saturating_sub(1).max(1);
.in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
crate::sync::bare_semaphore(permits)
}); });
async fn ensure_details<S: Store + 'static>( async fn ensure_details<S: Store + 'static>(
@ -1671,44 +1673,39 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
return; return;
} }
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(async move {
actix_rt::spawn(async move { let mut interval = actix_rt::time::interval(Duration::from_secs(30));
let mut interval = actix_rt::time::interval(Duration::from_secs(30));
loop { loop {
interval.tick().await; interval.tick().await;
if let Err(e) = queue::cleanup_outdated_variants(&repo).await { if let Err(e) = queue::cleanup_outdated_variants(&repo).await {
tracing::warn!( tracing::warn!(
"Failed to spawn cleanup for outdated variants:{}", "Failed to spawn cleanup for outdated variants:{}",
format!("\n{e}\n{e:?}") format!("\n{e}\n{e:?}")
); );
}
if let Err(e) = queue::cleanup_outdated_proxies(&repo).await {
tracing::warn!(
"Failed to spawn cleanup for outdated proxies:{}",
format!("\n{e}\n{e:?}")
);
}
} }
});
}) if let Err(e) = queue::cleanup_outdated_proxies(&repo).await {
tracing::warn!(
"Failed to spawn cleanup for outdated proxies:{}",
format!("\n{e}\n{e:?}")
);
}
}
});
} }
fn spawn_workers<S>(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap) fn spawn_workers<S>(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap)
where where
S: Store + 'static, S: Store + 'static,
{ {
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { crate::sync::spawn(queue::process_cleanup(
actix_rt::spawn(queue::process_cleanup( repo.clone(),
repo.clone(), store.clone(),
store.clone(), config.clone(),
config.clone(), ));
)) crate::sync::spawn(queue::process_images(repo, store, process_map, config));
});
tracing::trace_span!(parent: None, "Spawn task")
.in_scope(|| actix_rt::spawn(queue::process_images(repo, store, process_map, config)));
} }
async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>( async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(

View file

@ -61,7 +61,7 @@ where
tracing::warn!("Retrying migration +{failure_count}"); tracing::warn!("Retrying migration +{failure_count}");
} }
tokio::time::sleep(Duration::from_secs(3)).await; actix_rt::time::sleep(Duration::from_secs(3)).await;
} }
Ok(()) Ok(())
@ -364,7 +364,7 @@ where
tracing::warn!("Failed moving file. Retrying +{failure_count}"); tracing::warn!("Failed moving file. Retrying +{failure_count}");
} }
tokio::time::sleep(Duration::from_secs(3)).await; actix_rt::time::sleep(Duration::from_secs(3)).await;
} }
} }
} }

View file

@ -1,5 +1,6 @@
use actix_rt::task::JoinHandle; use actix_rt::task::JoinHandle;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use flume::r#async::RecvFut;
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -10,7 +11,6 @@ use std::{
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, ChildStdin, ChildStdout, Command}, process::{Child, ChildStdin, ChildStdout, Command},
sync::oneshot::{channel, Receiver},
}; };
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
@ -73,7 +73,7 @@ struct DropHandle {
pub(crate) struct ProcessRead<I> { pub(crate) struct ProcessRead<I> {
inner: I, inner: I,
err_recv: Receiver<std::io::Error>, err_recv: RecvFut<'static, std::io::Error>,
err_closed: bool, err_closed: bool,
#[allow(dead_code)] #[allow(dead_code)]
handle: DropHandle, handle: DropHandle,
@ -206,39 +206,37 @@ impl Process {
let stdin = child.stdin.take().expect("stdin exists"); let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists"); let stdout = child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %command) let (tx, rx) = crate::sync::channel::<std::io::Error>(1);
.in_scope(channel::<std::io::Error>); let rx = rx.into_recv_async();
let span = tracing::info_span!(parent: None, "Background process task", %command); let span = tracing::info_span!(parent: None, "Background process task", %command);
span.follows_from(Span::current()); span.follows_from(Span::current());
let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { let handle = crate::sync::spawn(
actix_rt::spawn( async move {
async move { let child_fut = async {
let child_fut = async { (f)(stdin).await?;
(f)(stdin).await?;
child.wait().await child.wait().await
}; };
let error = match actix_rt::time::timeout(timeout, child_fut).await { let error = match actix_rt::time::timeout(timeout, child_fut).await {
Ok(Ok(status)) if status.success() => { Ok(Ok(status)) if status.success() => {
guard.disarm(); guard.disarm();
return; return;
} }
Ok(Ok(status)) => { Ok(Ok(status)) => {
std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) std::io::Error::new(std::io::ErrorKind::Other, StatusError(status))
} }
Ok(Err(e)) => e, Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(), Err(_) => std::io::ErrorKind::TimedOut.into(),
}; };
let _ = tx.send(error); let _ = tx.send(error);
let _ = child.kill().await; let _ = child.kill().await;
} }
.instrument(span), .instrument(span),
) );
});
let sleep = actix_rt::time::sleep(timeout); let sleep = actix_rt::time::sleep(timeout);

View file

@ -86,7 +86,7 @@ where
let repo = repo.clone(); let repo = repo.clone();
let media = media.clone(); let media = media.clone();
let error_boundary = actix_rt::spawn(async move { let error_boundary = crate::sync::spawn(async move {
let stream = store2 let stream = store2
.to_stream(&ident, None, None) .to_stream(&ident, None, None)
.await? .await?

View file

@ -20,7 +20,8 @@ use diesel_async::{
AsyncConnection, AsyncPgConnection, RunQueryDsl, AsyncConnection, AsyncPgConnection, RunQueryDsl,
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio_postgres::{AsyncMessage, Notification}; use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
use tracing::Instrument;
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
@ -64,7 +65,7 @@ async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner:
inner inner
.queue_notifications .queue_notifications
.entry(queue_name) .entry(queue_name)
.or_insert_with(|| Arc::new(Notify::new())) .or_insert_with(crate::sync::notify)
.notify_waiters(); .notify_waiters();
} }
"upload_completion_channel" => { "upload_completion_channel" => {
@ -134,12 +135,11 @@ impl PostgresError {
impl PostgresRepo { impl PostgresRepo {
pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> { pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
let (mut client, conn) = let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls)
tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls) .await
.await .map_err(ConnectPostgresError::ConnectForMigration)?;
.map_err(ConnectPostgresError::ConnectForMigration)?;
let handle = actix_rt::spawn(conn); let handle = crate::sync::spawn(conn);
embedded::migrations::runner() embedded::migrations::runner()
.run_async(&mut client) .run_async(&mut client)
@ -166,10 +166,12 @@ impl PostgresRepo {
health_count: AtomicU64::new(0), health_count: AtomicU64::new(0),
pool, pool,
queue_notifications: DashMap::new(), queue_notifications: DashMap::new(),
upload_notifier: Notify::new(), upload_notifier: crate::sync::bare_notify(),
}); });
let notifications = Arc::new(actix_rt::spawn(delegate_notifications(rx, inner.clone()))); let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone()));
let notifications = Arc::new(handle);
Ok(PostgresRepo { Ok(PostgresRepo {
inner, inner,
@ -198,41 +200,53 @@ fn build_handler(sender: flume::Sender<Notification>) -> ConfigFn {
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> { move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
let sender = sender.clone(); let sender = sender.clone();
Box::pin(async move { let connect_span = tracing::trace_span!(parent: None, "connect future");
let (client, mut conn) =
tokio_postgres::connect(config, tokio_postgres::tls::NoTls)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
// not very cash money (structured concurrency) of me Box::pin(
actix_rt::spawn(async move { async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { let (client, conn) =
match res { tokio_postgres::connect(config, tokio_postgres::tls::NoTls)
Err(e) => { .await
tracing::error!("Database Connection {e:?}"); .map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
return;
}
Ok(AsyncMessage::Notice(e)) => {
tracing::warn!("Database Notice {e:?}");
}
Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?");
}
}
Ok(_) => {
tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application");
}
}
}
});
AsyncPgConnection::try_from(client).await // not very cash money (structured concurrency) of me
}) spawn_db_notification_task(sender, conn);
AsyncPgConnection::try_from(client).await
}
.instrument(connect_span),
)
}, },
) )
} }
fn spawn_db_notification_task(
sender: flume::Sender<Notification>,
mut conn: Connection<Socket, NoTlsStream>,
) {
crate::sync::spawn(async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
match res {
Err(e) => {
tracing::error!("Database Connection {e:?}");
return;
}
Ok(AsyncMessage::Notice(e)) => {
tracing::warn!("Database Notice {e:?}");
}
Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?");
}
}
Ok(_) => {
tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application");
}
}
}
});
}
fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime { fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime {
let timestamp = timestamp.to_offset(time::UtcOffset::UTC); let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
time::PrimitiveDateTime::new(timestamp.date(), timestamp.time()) time::PrimitiveDateTime::new(timestamp.date(), timestamp.time())
@ -849,7 +863,7 @@ impl QueueRepo for PostgresRepo {
.inner .inner
.queue_notifications .queue_notifications
.entry(String::from(queue_name)) .entry(String::from(queue_name))
.or_insert_with(|| Arc::new(Notify::new())) .or_insert_with(crate::sync::notify)
.clone(); .clone();
diesel::sql_query("LISTEN queue_status_channel;") diesel::sql_query("LISTEN queue_status_channel;")
@ -1330,20 +1344,27 @@ impl UploadRepo for PostgresRepo {
.await .await
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
let opt = uploads let nested_opt = uploads
.select(result) .select(result)
.filter(id.eq(upload_id.id)) .filter(id.eq(upload_id.id))
.get_result(&mut conn) .get_result(&mut conn)
.await .await
.optional() .optional()
.map_err(PostgresError::Diesel)? .map_err(PostgresError::Diesel)?;
.flatten();
if let Some(upload_result) = opt { match nested_opt {
let upload_result: InnerUploadResult = serde_json::from_value(upload_result) Some(opt) => {
.map_err(PostgresError::DeserializeUploadResult)?; if let Some(upload_result) = opt {
let upload_result: InnerUploadResult =
serde_json::from_value(upload_result)
.map_err(PostgresError::DeserializeUploadResult)?;
return Ok(upload_result.into()); return Ok(upload_result.into());
}
}
None => {
return Err(RepoError::AlreadyClaimed);
}
} }
drop(conn); drop(conn);

View file

@ -29,9 +29,7 @@ macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{ ($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone(); let $ident = $self.$ident.clone();
let span = tracing::Span::current(); crate::sync::spawn_blocking(move || $expr)
actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr))
.await .await
.map_err(SledError::from) .map_err(SledError::from)
.map_err(RepoError::from)? .map_err(RepoError::from)?
@ -175,7 +173,7 @@ impl SledRepo {
let this = self.db.clone(); let this = self.db.clone();
actix_rt::task::spawn_blocking(move || { crate::sync::spawn_blocking(move || {
let export = this.export(); let export = this.export();
export_db.import(export); export_db.import(export);
}) })
@ -258,7 +256,7 @@ impl AliasAccessRepo for SledRepo {
let alias_access = self.alias_access.clone(); let alias_access = self.alias_access.clone();
let inverse_alias_access = self.inverse_alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&alias_access, &inverse_alias_access).transaction( (&alias_access, &inverse_alias_access).transaction(
|(alias_access, inverse_alias_access)| { |(alias_access, inverse_alias_access)| {
if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? { if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? {
@ -324,7 +322,7 @@ impl AliasAccessRepo for SledRepo {
let alias_access = self.alias_access.clone(); let alias_access = self.alias_access.clone();
let inverse_alias_access = self.inverse_alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&alias_access, &inverse_alias_access).transaction( (&alias_access, &inverse_alias_access).transaction(
|(alias_access, inverse_alias_access)| { |(alias_access, inverse_alias_access)| {
if let Some(old) = alias_access.remove(alias.to_bytes())? { if let Some(old) = alias_access.remove(alias.to_bytes())? {
@ -364,7 +362,7 @@ impl VariantAccessRepo for SledRepo {
let variant_access = self.variant_access.clone(); let variant_access = self.variant_access.clone();
let inverse_variant_access = self.inverse_variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&variant_access, &inverse_variant_access).transaction( (&variant_access, &inverse_variant_access).transaction(
|(variant_access, inverse_variant_access)| { |(variant_access, inverse_variant_access)| {
if let Some(old) = variant_access.insert(&key, &value_bytes)? { if let Some(old) = variant_access.insert(&key, &value_bytes)? {
@ -434,7 +432,7 @@ impl VariantAccessRepo for SledRepo {
let variant_access = self.variant_access.clone(); let variant_access = self.variant_access.clone();
let inverse_variant_access = self.inverse_variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&variant_access, &inverse_variant_access).transaction( (&variant_access, &inverse_variant_access).transaction(
|(variant_access, inverse_variant_access)| { |(variant_access, inverse_variant_access)| {
if let Some(old) = variant_access.remove(&key)? { if let Some(old) = variant_access.remove(&key)? {
@ -678,7 +676,7 @@ impl QueueRepo for SledRepo {
let queue = self.queue.clone(); let queue = self.queue.clone();
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&queue, &job_state).transaction(|(queue, job_state)| { (&queue, &job_state).transaction(|(queue, job_state)| {
let state = JobState::pending(); let state = JobState::pending();
@ -705,7 +703,7 @@ impl QueueRepo for SledRepo {
.write() .write()
.unwrap() .unwrap()
.entry(queue_name) .entry(queue_name)
.or_insert_with(|| Arc::new(Notify::new())) .or_insert_with(crate::sync::notify)
.notify_one(); .notify_one();
metrics_guard.disarm(); metrics_guard.disarm();
@ -728,7 +726,7 @@ impl QueueRepo for SledRepo {
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let span = tracing::Span::current(); let span = tracing::Span::current();
let opt = actix_rt::task::spawn_blocking(move || { let opt = crate::sync::spawn_blocking(move || {
let _guard = span.enter(); let _guard = span.enter();
// Job IDs are generated with Uuid version 7 - defining their first bits as a // Job IDs are generated with Uuid version 7 - defining their first bits as a
// timestamp. Scanning a prefix should give us jobs in the order they were queued. // timestamp. Scanning a prefix should give us jobs in the order they were queued.
@ -802,9 +800,7 @@ impl QueueRepo for SledRepo {
notify notify
} else { } else {
let mut guard = self.queue_notifier.write().unwrap(); let mut guard = self.queue_notifier.write().unwrap();
let entry = guard let entry = guard.entry(queue_name).or_insert_with(crate::sync::notify);
.entry(queue_name)
.or_insert_with(|| Arc::new(Notify::new()));
Arc::clone(entry) Arc::clone(entry)
}; };
@ -823,7 +819,7 @@ impl QueueRepo for SledRepo {
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
actix_rt::task::spawn_blocking(move || { crate::sync::spawn_blocking(move || {
if let Some(state) = job_state.get(&key)? { if let Some(state) = job_state.get(&key)? {
let new_state = JobState::running(worker_id); let new_state = JobState::running(worker_id);
@ -853,7 +849,7 @@ impl QueueRepo for SledRepo {
let queue = self.queue.clone(); let queue = self.queue.clone();
let job_state = self.job_state.clone(); let job_state = self.job_state.clone();
let res = actix_rt::task::spawn_blocking(move || { let res = crate::sync::spawn_blocking(move || {
(&queue, &job_state).transaction(|(queue, job_state)| { (&queue, &job_state).transaction(|(queue, job_state)| {
queue.remove(&key[..])?; queue.remove(&key[..])?;
job_state.remove(&key[..])?; job_state.remove(&key[..])?;
@ -1112,7 +1108,7 @@ impl HashRepo for SledRepo {
None => (self.hashes_inverse.iter(), None), None => (self.hashes_inverse.iter(), None),
}; };
actix_rt::task::spawn_blocking(move || { crate::sync::spawn_blocking(move || {
let page_iter = page_iter let page_iter = page_iter
.keys() .keys()
.rev() .rev()
@ -1164,7 +1160,7 @@ impl HashRepo for SledRepo {
let page_iter = self.hashes_inverse.range(..=date_nanos); let page_iter = self.hashes_inverse.range(..=date_nanos);
let prev_iter = Some(self.hashes_inverse.range(date_nanos..)); let prev_iter = Some(self.hashes_inverse.range(date_nanos..));
actix_rt::task::spawn_blocking(move || { crate::sync::spawn_blocking(move || {
let page_iter = page_iter let page_iter = page_iter
.keys() .keys()
.rev() .rev()
@ -1292,7 +1288,7 @@ impl HashRepo for SledRepo {
let hash_variant_identifiers = self.hash_variant_identifiers.clone(); let hash_variant_identifiers = self.hash_variant_identifiers.clone();
actix_rt::task::spawn_blocking(move || { crate::sync::spawn_blocking(move || {
hash_variant_identifiers hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
.map(|res| res.map_err(|_| VariantAlreadyExists)) .map(|res| res.map_err(|_| VariantAlreadyExists))

View file

@ -34,9 +34,7 @@ macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{ ($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone(); let $ident = $self.$ident.clone();
let span = tracing::Span::current(); crate::sync::spawn_blocking(move || $expr)
actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr))
.await .await
.map_err(SledError::from) .map_err(SledError::from)
.map_err(RepoError::from)? .map_err(RepoError::from)?

View file

@ -277,7 +277,7 @@ impl Store for ObjectStore {
let object_id2 = object_id.clone(); let object_id2 = object_id.clone();
let upload_id2 = upload_id.clone(); let upload_id2 = upload_id.clone();
let handle = actix_rt::spawn( let handle = crate::sync::spawn(
async move { async move {
let response = this let response = this
.create_upload_part_request( .create_upload_part_request(

View file

@ -1,5 +1,6 @@
use actix_rt::{task::JoinHandle, time::Sleep}; use actix_rt::{task::JoinHandle, time::Sleep};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use flume::r#async::RecvStream;
use futures_core::Stream; use futures_core::Stream;
use std::{ use std::{
future::Future, future::Future,
@ -174,19 +175,25 @@ pin_project_lite::pin_project! {
} }
} }
enum IterStreamState<I, T> { enum IterStreamState<I, T>
where
T: 'static,
{
New { New {
iterator: I, iterator: I,
buffer: usize, buffer: usize,
}, },
Running { Running {
handle: JoinHandle<()>, handle: JoinHandle<()>,
receiver: tokio::sync::mpsc::Receiver<T>, receiver: RecvStream<'static, T>,
}, },
Pending, Pending,
} }
pub(crate) struct IterStream<I, T> { pub(crate) struct IterStream<I, T>
where
T: 'static,
{
state: IterStreamState<I, T>, state: IterStreamState<I, T>,
} }
@ -287,14 +294,13 @@ where
match std::mem::replace(&mut this.state, IterStreamState::Pending) { match std::mem::replace(&mut this.state, IterStreamState::Pending) {
IterStreamState::New { iterator, buffer } => { IterStreamState::New { iterator, buffer } => {
let (sender, receiver) = tracing::trace_span!(parent: None, "Create channel") let (sender, receiver) = crate::sync::channel(buffer);
.in_scope(|| tokio::sync::mpsc::channel(buffer));
let mut handle = actix_rt::task::spawn_blocking(move || { let mut handle = crate::sync::spawn_blocking(move || {
let iterator = iterator.into_iter(); let iterator = iterator.into_iter();
for item in iterator { for item in iterator {
if sender.blocking_send(item).is_err() { if sender.send(item).is_err() {
break; break;
} }
} }
@ -304,14 +310,17 @@ where
return Poll::Ready(None); return Poll::Ready(None);
} }
this.state = IterStreamState::Running { handle, receiver }; this.state = IterStreamState::Running {
handle,
receiver: receiver.into_stream(),
};
self.poll_next(cx) self.poll_next(cx)
} }
IterStreamState::Running { IterStreamState::Running {
mut handle, mut handle,
mut receiver, mut receiver,
} => match Pin::new(&mut receiver).poll_recv(cx) { } => match Pin::new(&mut receiver).poll_next(cx) {
Poll::Ready(Some(item)) => { Poll::Ready(Some(item)) => {
this.state = IterStreamState::Running { handle, receiver }; this.state = IterStreamState::Running { handle, receiver };

38
src/sync.rs Normal file
View file

@ -0,0 +1,38 @@
use std::sync::Arc;
use tokio::sync::{Notify, Semaphore};
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
tracing::trace_span!(parent: None, "make channel").in_scope(|| flume::bounded(bound))
}
pub(crate) fn notify() -> Arc<Notify> {
Arc::new(bare_notify())
}
pub(crate) fn bare_notify() -> Notify {
tracing::trace_span!(parent: None, "make notifier").in_scope(Notify::new)
}
pub(crate) fn bare_semaphore(permits: usize) -> Semaphore {
tracing::trace_span!(parent: None, "make semaphore").in_scope(|| Semaphore::new(permits))
}
pub(crate) fn spawn<F>(future: F) -> actix_rt::task::JoinHandle<F::Output>
where
F: std::future::Future + 'static,
F::Output: 'static,
{
tracing::trace_span!(parent: None, "spawn task").in_scope(|| actix_rt::spawn(future))
}
pub(crate) fn spawn_blocking<F, Out>(function: F) -> actix_rt::task::JoinHandle<Out>
where
F: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let outer_span = tracing::Span::current();
tracing::trace_span!(parent: None, "spawn blocking task")
.in_scope(|| actix_rt::task::spawn_blocking(move || outer_span.in_scope(function)))
}

View file

@ -13,8 +13,7 @@ struct TmpFile(PathBuf);
impl Drop for TmpFile { impl Drop for TmpFile {
fn drop(&mut self) { fn drop(&mut self) {
tracing::trace_span!(parent: None, "Spawn task") crate::sync::spawn(tokio::fs::remove_file(self.0.clone()));
.in_scope(|| actix_rt::spawn(tokio::fs::remove_file(self.0.clone())));
} }
} }