diff --git a/.cargo/config b/.cargo/config
index bff29e6..ba416f7 100644
--- a/.cargo/config
+++ b/.cargo/config
@@ -1,2 +1,2 @@
 [build]
-rustflags = ["--cfg", "tokio_unstable"]
+rustflags = ["--cfg", "tokio_unstable", "--cfg", "uuid_unstable"]
diff --git a/Cargo.lock b/Cargo.lock
index 3fcfa63..4cdfa17 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -320,6 +320,12 @@ dependencies = [
  "syn 2.0.28",
 ]

+[[package]]
+name = "atomic"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
+
 [[package]]
 name = "autocfg"
 version = "1.1.0"
@@ -2969,6 +2975,7 @@ version = "1.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
 dependencies = [
+ "atomic",
  "getrandom",
  "serde",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index ffd657c..88ca16e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,7 +74,7 @@ tracing-subscriber = { version = "0.3.0", features = [
   "tracing-log",
 ] }
 url = { version = "2.2", features = ["serde"] }
-uuid = { version = "1", features = ["v4", "serde"] }
+uuid = { version = "1", features = ["serde", "std", "v4", "v7"] }

 [dependencies.tracing-actix-web]
 version = "0.7.5"
diff --git a/defaults.toml b/defaults.toml
index b6c9371..243ae0e 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -1,6 +1,5 @@
 [server]
 address = "0.0.0.0:8080"
-worker_id = "pict-rs-1"
 read_only = false
 max_file_count = 1

diff --git a/dev.toml b/dev.toml
index ac01239..947c2d1 100644
--- a/dev.toml
+++ b/dev.toml
@@ -1,6 +1,5 @@
 [server]
 address = '0.0.0.0:8080'
-worker_id = 'pict-rs-1'
 api_key = 'api-key'
 max_file_count = 10

diff --git a/pict-rs.toml b/pict-rs.toml
index 2bc5a3f..17e7862 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -5,14 +5,6 @@
 # default: 0.0.0.0:8080
 address = '0.0.0.0:8080'

-## Optional: pict-rs worker id
-# environment variable PICTRS__SERVER__WORKER_ID
-# default: pict-rs-1
-#
-# This is used for the internal job queue. It will have more meaning once a shared metadata
-# repository (like postgres) can be defined.
-worker_id = 'pict-rs-1'
-
 ## Optional: shared secret for internal endpoints
 # environment variable: PICTRS__SERVER__API_KEY
 # default: empty
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index 52cc4d1..9322fcd 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -47,7 +47,6 @@ impl Args {
             Command::Run(Run {
                 address,
                 api_key,
-                worker_id,
                 client_pool_size,
                 client_timeout,
                 metrics_prometheus_address,
@@ -100,7 +99,6 @@ impl Args {
                 let server = Server {
                     address,
                     api_key,
-                    worker_id,
                     read_only,
                     max_file_count,
                 };
@@ -386,8 +384,6 @@ struct Server {
     #[serde(skip_serializing_if = "Option::is_none")]
     address: Option<SocketAddr>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    worker_id: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
     api_key: Option<String>,
     #[serde(skip_serializing_if = "std::ops::Not::not")]
     read_only: bool,
@@ -763,10 +759,6 @@ struct Run {
     #[arg(long)]
     api_key: Option<String>,

-    /// ID of this pict-rs node. Doesn't do much yet
-    #[arg(long)]
-    worker_id: Option<String>,
-
     /// Number of connections the internel HTTP client should maintain in its pool
     ///
     /// This number defaults to 100, and the total number is multiplied by the number of cores
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 8a626a2..187158c 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -21,7 +21,6 @@ pub(crate) struct Defaults {
 #[serde(rename_all = "snake_case")]
 struct ServerDefaults {
     address: SocketAddr,
-    worker_id: String,
     read_only: bool,
     max_file_count: u32,
 }
@@ -183,7 +182,6 @@ impl Default for ServerDefaults {
     fn default() -> Self {
         ServerDefaults {
             address: "0.0.0.0:8080".parse().expect("Valid address string"),
-            worker_id: String::from("pict-rs-1"),
             read_only: false,
             max_file_count: 1,
         }
diff --git a/src/config/file.rs b/src/config/file.rs
index 7f3e57c..7716618 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -95,8 +95,6 @@ pub(crate) enum Repo {
 pub(crate) struct Server {
     pub(crate) address: SocketAddr,

-    pub(crate) worker_id: String,
-
     #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) api_key: Option<String>,

diff --git a/src/lib.rs b/src/lib.rs
index bd30421..89818ca 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -48,7 +48,6 @@ use std::{
     future::ready,
     path::Path,
     path::PathBuf,
-    sync::atomic::{AtomicU64, Ordering},
     time::{Duration, SystemTime},
 };
 use tokio::sync::Semaphore;
@@ -1495,14 +1494,6 @@ fn build_client(config: &Configuration) -> Result {
         .build())
 }

-fn next_worker_id(config: &Configuration) -> String {
-    static WORKER_ID: AtomicU64 = AtomicU64::new(0);
-
-    let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
-
-    format!("{}-{}", config.server.worker_id, next_id)
-}
-
 fn configure_endpoints<
     R: FullRepo + 'static,
     S: Store + 'static,
@@ -1638,26 +1629,15 @@ where
     R: FullRepo + 'static,
     S: Store + 'static,
 {
-    let worker_id_1 = next_worker_id(&config);
-    let worker_id_2 = next_worker_id(&config);
-
     tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
         actix_rt::spawn(queue::process_cleanup(
             repo.clone(),
             store.clone(),
             config.clone(),
-            worker_id_1,
         ))
     });
-    tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
-        actix_rt::spawn(queue::process_images(
-            repo,
-            store,
-            process_map,
-            config,
-            worker_id_2,
-        ))
-    });
+    tracing::trace_span!(parent: None, "Spawn task")
+        .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, process_map, config)));
 }

 async fn launch_file_store(
diff --git a/src/queue.rs b/src/queue.rs
index 1613bd5..ba932ce 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -187,17 +187,8 @@ pub(crate) async fn process_cleanup(
     repo: R,
     store: S,
     config: Configuration,
-    worker_id: String,
 ) {
-    process_jobs(
-        &repo,
-        &store,
-        &config,
-        worker_id,
-        CLEANUP_QUEUE,
-        cleanup::perform,
-    )
-    .await
+    process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await
 }

 pub(crate) async fn process_images(
@@ -205,14 +196,12 @@ pub(crate) async fn process_images(
     store: S,
     process_map: ProcessMap,
     config: Configuration,
-    worker_id: String,
 ) {
     process_image_jobs(
         &repo,
         &store,
         &process_map,
         &config,
-        worker_id,
         PROCESS_QUEUE,
         process::perform,
     )
@@ -225,7 +214,6 @@ async fn process_jobs(
     repo: &R,
     store: &S,
     config: &Configuration,
-    worker_id: String,
     queue: &'static str,
     callback: F,
 ) where
@@ -235,8 +223,10 @@ async fn process_jobs(
     for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
         + Copy,
 {
+    let worker_id = uuid::Uuid::new_v4();
+
     loop {
-        let res = job_loop(repo, store, config, worker_id.clone(), queue, callback).await;
+        let res = job_loop(repo, store, config, worker_id, queue, callback).await;

         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -249,15 +239,15 @@
 }

 struct MetricsGuard {
-    worker_id: String,
+    worker_id: uuid::Uuid,
     queue: &'static str,
     start: Instant,
     armed: bool,
 }

 impl MetricsGuard {
-    fn guard(worker_id: String, queue: &'static str) -> Self {
-        metrics::increment_counter!("pict-rs.job.start", "queue" => queue, "worker-id" => worker_id.clone());
+    fn guard(worker_id: uuid::Uuid, queue: &'static str) -> Self {
+        metrics::increment_counter!("pict-rs.job.start", "queue" => queue, "worker-id" => worker_id.to_string());

         Self {
             worker_id,
@@ -274,8 +264,8 @@ impl MetricsGuard {

 impl Drop for MetricsGuard {
     fn drop(&mut self) {
-        metrics::histogram!("pict-rs.job.duration", self.start.elapsed().as_secs_f64(), "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string());
-        metrics::increment_counter!("pict-rs.job.end", "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string());
+        metrics::histogram!("pict-rs.job.duration", self.start.elapsed().as_secs_f64(), "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string());
+        metrics::increment_counter!("pict-rs.job.end", "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string());
     }
 }
@@ -283,7 +273,7 @@ async fn job_loop(
     repo: &R,
     store: &S,
     config: &Configuration,
-    worker_id: String,
+    worker_id: uuid::Uuid,
     queue: &'static str,
     callback: F,
 ) -> Result<(), Error>
@@ -295,21 +285,27 @@ where
         + Copy,
 {
     loop {
-        let (job_id, bytes) = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
+        let (job_id, bytes) = repo.pop(queue).await?;

         let span = tracing::info_span!("Running Job", worker_id = ?worker_id);

-        let guard = MetricsGuard::guard(worker_id.clone(), queue);
+        let guard = MetricsGuard::guard(worker_id, queue);

-        span.in_scope(|| {
-            heartbeat(
-                repo,
-                job_id,
-                (callback)(repo, store, config, bytes.as_ref()),
-            )
-        })
-        .instrument(span)
-        .await?;
+        let res = span
+            .in_scope(|| {
+                heartbeat(
+                    repo,
+                    queue,
+                    job_id,
+                    (callback)(repo, store, config, bytes.as_ref()),
+                )
+            })
+            .instrument(span)
+            .await;
+
+        repo.complete_job(queue, job_id).await?;
+
+        res?;

         guard.disarm();
     }
@@ -320,7 +316,6 @@ async fn process_image_jobs(
     store: &S,
     process_map: &ProcessMap,
     config: &Configuration,
-    worker_id: String,
     queue: &'static str,
     callback: F,
 ) where
@@ -336,17 +331,11 @@ async fn process_image_jobs(
     ) -> LocalBoxFuture<'a, Result<(), Error>>
         + Copy,
 {
+    let worker_id = uuid::Uuid::new_v4();
+
     loop {
-        let res = image_job_loop(
-            repo,
-            store,
-            process_map,
-            config,
-            worker_id.clone(),
-            queue,
-            callback,
-        )
-        .await;
+        let res =
+            image_job_loop(repo, store, process_map, config, worker_id, queue, callback).await;

         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -363,7 +352,7 @@ async fn image_job_loop(
     store: &S,
     process_map: &ProcessMap,
     config: &Configuration,
-    worker_id: String,
+    worker_id: uuid::Uuid,
     queue: &'static str,
     callback: F,
 ) -> Result<(), Error>
@@ -381,27 +370,33 @@ where
         + Copy,
 {
     loop {
-        let (job_id, bytes) = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
+        let (job_id, bytes) = repo.pop(queue).await?;

         let span = tracing::info_span!("Running Job", worker_id = ?worker_id);

-        let guard = MetricsGuard::guard(worker_id.clone(), queue);
+        let guard = MetricsGuard::guard(worker_id, queue);

-        span.in_scope(|| {
-            heartbeat(
-                repo,
-                job_id,
-                (callback)(repo, store, process_map, config, bytes.as_ref()),
-            )
-        })
-        .instrument(span)
-        .await?;
+        let res = span
+            .in_scope(|| {
+                heartbeat(
+                    repo,
+                    queue,
+                    job_id,
+                    (callback)(repo, store, process_map, config, bytes.as_ref()),
+                )
+            })
+            .instrument(span)
+            .await;
+
+        repo.complete_job(queue, job_id).await?;
+
+        res?;

         guard.disarm();
     }
 }

-async fn heartbeat<R, Fut>(repo: &R, job_id: JobId, fut: Fut) -> Fut::Output
+async fn heartbeat<R, Fut>(repo: &R, queue: &'static str, job_id: JobId, fut: Fut) -> Fut::Output
 where
     R: QueueRepo,
     Fut: std::future::Future,
@@ -419,7 +414,7 @@ where
             }
             _ = interval.tick() => {
                 if hb.is_none() {
-                    hb = Some(repo.heartbeat(job_id));
+                    hb = Some(repo.heartbeat(queue, job_id));
                 }
             }
             opt = poll_opt(hb.as_mut()), if hb.is_some() => {
diff --git a/src/repo.rs b/src/repo.rs
index 122b0fd..d324ba2 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -289,7 +289,7 @@ pub(crate) struct JobId(Uuid);

 impl JobId {
     pub(crate) fn gen() -> Self {
-        Self(Uuid::new_v4())
+        Self(Uuid::now_v7())
     }

     pub(crate) const fn as_bytes(&self) -> &[u8; 16] {
@@ -303,19 +303,13 @@ impl JobId {
     }
 }

 #[async_trait::async_trait(?Send)]
 pub(crate) trait QueueRepo: BaseRepo {
-    async fn requeue_timed_out(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError>;
-
     async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>;

-    async fn pop(
-        &self,
-        queue: &'static str,
-        worker_id: Vec<u8>,
-    ) -> Result<(JobId, Self::Bytes), RepoError>;
+    async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError>;

-    async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError>;
+    async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>;

-    async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError>;
+    async fn complete_job(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>;
 }

 #[async_trait::async_trait(?Send)]
@@ -323,28 +317,20 @@
 impl QueueRepo for actix_web::web::Data<T>
 where
     T: QueueRepo,
 {
-    async fn requeue_timed_out(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError> {
-        T::requeue_timed_out(self, worker_prefix).await
-    }
-
     async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
         T::push(self, queue, job).await
     }

-    async fn pop(
-        &self,
-        queue: &'static str,
-        worker_id: Vec<u8>,
-    ) -> Result<(JobId, Self::Bytes), RepoError> {
-        T::pop(self, queue, worker_id).await
+    async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
+        T::pop(self, queue).await
     }

-    async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError> {
-        T::heartbeat(self, job_id).await
+    async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError> {
+        T::heartbeat(self, queue, job_id).await
     }

-    async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError> {
-        T::complete_job(self, job_id).await
+    async fn complete_job(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError> {
+        T::complete_job(self, queue, job_id).await
     }
 }
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 515a898..6bb276e 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -57,6 +57,9 @@ pub(crate) enum SledError {

     #[error("Operation panicked")]
     Panic,
+
+    #[error("Another process updated this value before us")]
+    Conflict,
 }

 #[derive(Clone)]
@@ -129,7 +132,7 @@ impl SledRepo {
     }

     fn open(mut path: PathBuf, cache_capacity: u64) -> Result<Self, SledError> {
-        path.push("v0.4.0-alpha.1");
+        path.push("v0.5.0");

         let db = ::sled::Config::new()
             .cache_capacity(cache_capacity)
@@ -626,7 +629,7 @@ impl UploadRepo for SledRepo {

 enum JobState {
     Pending,
-    Running(Vec<u8>),
+    Running([u8; 8]),
 }

 impl JobState {
@@ -637,35 +640,34 @@ impl JobState {
     fn running() -> Self {
         Self::Running(
             time::OffsetDateTime::now_utc()
-                .format(&time::format_description::well_known::Rfc3339)
-                .expect("Can format")
-                .into_bytes(),
+                .unix_timestamp()
+                .to_be_bytes(),
         )
     }

     fn as_bytes(&self) -> &[u8] {
         match self {
-            Self::Pending => b"pending",
+            Self::Pending => b"pend",
             Self::Running(ref bytes) => bytes,
         }
     }
 }

+fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> {
+    let mut key = queue.as_bytes().to_vec();
+    key.extend(job_id.as_bytes());
+
+    Arc::from(key)
+}
+
 #[async_trait::async_trait(?Send)]
 impl QueueRepo for SledRepo {
-    #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))]
-    async fn requeue_timed_out(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError> {
-        todo!()
-    }
-
     #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
     async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
         let metrics_guard = PushMetricsGuard::guard(queue_name);

         let id = JobId::gen();
-        let mut key = queue_name.as_bytes().to_vec();
-        key.push(0);
-        key.extend(id.as_bytes());
+        let key = job_key(queue_name, id);

         let queue = self.queue.clone();
         let job_state = self.job_state.clone();
@@ -674,8 +676,8 @@ impl QueueRepo for SledRepo {
             (&queue, &job_state).transaction(|(queue, job_state)| {
                 let state = JobState::pending();

-                queue.insert(key.as_slice(), &job)?;
-                job_state.insert(key.as_slice(), state.as_bytes())?;
+                queue.insert(&key[..], &job)?;
+                job_state.insert(&key[..], state.as_bytes())?;

                 Ok(())
             })
@@ -705,25 +707,35 @@ impl QueueRepo for SledRepo {
         Ok(())
     }

-    #[tracing::instrument(skip(self, worker_id), fields(worker_id = %String::from_utf8_lossy(&worker_id)))]
-    async fn pop(
-        &self,
-        queue_name: &'static str,
-        worker_id: Vec<u8>,
-    ) -> Result<(JobId, Self::Bytes), RepoError> {
+    #[tracing::instrument(skip(self))]
+    async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
         let metrics_guard = PopMetricsGuard::guard(queue_name);

+        let now = time::OffsetDateTime::now_utc();
+
         loop {
             let queue = self.queue.clone();
             let job_state = self.job_state.clone();

             let opt = actix_rt::task::spawn_blocking(move || {
+                // Job IDs are generated with Uuid version 7 - defining their first bits as a
+                // timestamp. Scanning a prefix should give us jobs in the order they were queued.
                 for res in job_state.scan_prefix(queue_name) {
                     let (key, value) = res?;

-                    if value != "pending" {
-                        // TODO: requeue dead jobs
-                        continue;
+                    if value.len() == 8 {
+                        let unix_timestamp =
+                            i64::from_be_bytes(value[0..8].try_into().expect("Verified length"));
+
+                        let timestamp = time::OffsetDateTime::from_unix_timestamp(unix_timestamp)
+                            .expect("Valid timestamp");
+
+                        // heartbeats should update every 5 seconds, so 30 seconds without an
+                        // update is 6 missed beats
+                        if timestamp.saturating_add(time::Duration::seconds(30)) > now {
+                            // job hasn't expired
+                            continue;
+                        }
                     }

                     let state = JobState::running();
@@ -738,7 +750,7 @@ impl QueueRepo for SledRepo {
                         }
                     }

-                    let id_bytes = &key[queue_name.len() + 1..];
+                    let id_bytes = &key[queue_name.len()..];

                     let id_bytes: [u8; 16] = id_bytes.try_into().expect("Key length");

@@ -780,12 +792,52 @@ impl QueueRepo for SledRepo {
         }
     }

-    async fn heartbeat(&self, job_id: JobId) -> Result<(), RepoError> {
-        todo!()
+    #[tracing::instrument(skip(self))]
+    async fn heartbeat(&self, queue_name: &'static str, job_id: JobId) -> Result<(), RepoError> {
+        let key = job_key(queue_name, job_id);
+
+        let job_state = self.job_state.clone();
+
+        actix_rt::task::spawn_blocking(move || {
+            if let Some(state) = job_state.get(&key)? {
+                let new_state = JobState::running();
+
+                match job_state.compare_and_swap(&key, Some(state), Some(new_state.as_bytes()))? {
+                    Ok(_) => Ok(()),
+                    Err(_) => Err(SledError::Conflict),
+                }
+            } else {
+                Ok(())
+            }
+        })
+        .await
+        .map_err(|_| RepoError::Canceled)??;
+
+        Ok(())
     }

-    async fn complete_job(&self, job_id: JobId) -> Result<(), RepoError> {
-        todo!()
+    #[tracing::instrument(skip(self))]
+    async fn complete_job(&self, queue_name: &'static str, job_id: JobId) -> Result<(), RepoError> {
+        let key = job_key(queue_name, job_id);
+
+        let queue = self.queue.clone();
+        let job_state = self.job_state.clone();
+
+        let res = actix_rt::task::spawn_blocking(move || {
+            (&queue, &job_state).transaction(|(queue, job_state)| {
+                queue.remove(&key[..])?;
+                job_state.remove(&key[..])?;
+
+                Ok(())
+            })
+        })
+        .await
+        .map_err(|_| RepoError::Canceled)?;
+
+        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
+            return Err(RepoError::from(SledError::from(e)));
+        }
+
+        Ok(())
     }
 }