From c38375ee9264fd11742517d0110ca152bfb347f1 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 9 Jun 2024 14:20:51 -0500 Subject: [PATCH 1/2] Update metrics to 0.23 --- Cargo.lock | 12 ++++++------ Cargo.toml | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2165a69..7c289d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1653,9 +1653,9 @@ checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "metrics" -version = "0.22.3" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" dependencies = [ "ahash", "portable-atomic", @@ -1663,9 +1663,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d58e362dc7206e9456ddbcdbd53c71ba441020e62104703075a69151e38d85f" +checksum = "26eb45aff37b45cff885538e1dcbd6c2b462c04fe84ce0155ea469f325672c98" dependencies = [ "base64 0.22.1", "http-body-util", @@ -1683,9 +1683,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.16.3" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" dependencies = [ "crossbeam-epoch", "crossbeam-utils", diff --git a/Cargo.toml b/Cargo.toml index b15c3ef..b64924e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,8 @@ diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } futures-core = "0.3" hex = "0.4.3" md-5 = "0.10.5" -metrics = "0.22.0" -metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] } +metrics = "0.23.0" 
+metrics-exporter-prometheus = { version = "0.15.0", default-features = false, features = ["http-listener"] } mime = "0.3.1" nanorand = { version = "0.7", optional = true } opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } From b56960785467d680706102842434370ffa900bd6 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 9 Jun 2024 14:44:18 -0500 Subject: [PATCH 2/2] Better handle concurrent proxies --- src/lib.rs | 28 ++++++++++++++++++++++++---- src/repo.rs | 14 ++++++++++++-- src/repo/postgres.rs | 24 +++++++++++++++++------- src/repo/sled.rs | 28 ++++++++++++++++++++-------- 4 files changed, 73 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6fb89d9..9f68624 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,10 @@ use self::{ middleware::{Deadline, Internal, Log, Metrics, Payload}, migrate_store::migrate_store, queue::queue_generate, - repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult}, + repo::{ + sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, ProxyAlreadyExists, Repo, UploadId, + UploadResult, + }, serde_str::Serde, state::State, store::{file_store::FileStore, object_store::ObjectStore, Store}, @@ -1286,11 +1289,28 @@ async fn proxy_alias_from_query( } else if !state.config.server.read_only { let stream = download_stream(proxy.as_str(), state).await?; - let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?; + // some time has passed, see if we've proxied elsewhere + if let Some(alias) = state.repo.related(proxy.clone()).await? { + alias + } else { + let (alias, token, _) = + ingest_inline(stream, state, &Default::default()).await?; - state.repo.relate_url(proxy, alias.clone()).await?; + // last check, do we succeed or fail to relate the proxy alias + if let Err(ProxyAlreadyExists) = + state.repo.relate_url(proxy.clone(), alias.clone()).await? + { + queue::cleanup_alias(&state.repo, alias, token).await?; - alias + state + .repo + .related(proxy) + .await? 
+ .ok_or(UploadError::MissingAlias)? + } else { + alias + } + } } else { return Err(UploadError::ReadOnly.into()); }; diff --git a/src/repo.rs b/src/repo.rs index dfe3fcb..4d01c84 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -46,6 +46,8 @@ pub(crate) struct HashAlreadyExists; pub(crate) struct AliasAlreadyExists; #[derive(Debug)] pub(crate) struct VariantAlreadyExists; +#[derive(Debug)] +pub(crate) struct ProxyAlreadyExists; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct UploadId { @@ -151,7 +153,11 @@ impl<T> BaseRepo for Arc<T> where T: BaseRepo {} #[async_trait::async_trait(?Send)] pub(crate) trait ProxyRepo: BaseRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>; + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result<Result<(), ProxyAlreadyExists>, RepoError>; async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError>; @@ -163,7 +169,11 @@ impl<T> ProxyRepo for Arc<T> where T: ProxyRepo, { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result<Result<(), ProxyAlreadyExists>, RepoError> { T::relate_url(self, url, alias).await } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 51a6a6a..edd9fc8 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -47,8 +47,8 @@ use super::{ notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, + ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, + UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; #[derive(Clone)] @@ -1884,21 +1884,31 @@ impl StoreMigrationRepo for PostgresRepo { 
#[async_trait::async_trait(?Send)] impl ProxyRepo for PostgresRepo { #[tracing::instrument(level = "debug", skip(self))] - async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + input_url: Url, + input_alias: Alias, + ) -> Result<Result<(), ProxyAlreadyExists>, RepoError> { use schema::proxies::dsl::*; let mut conn = self.get_connection().await?; - diesel::insert_into(proxies) + let res = diesel::insert_into(proxies) .values((url.eq(input_url.as_str()), alias.eq(&input_alias))) .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; + .map_err(|_| PostgresError::DbTimeout)?; - Ok(()) + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(ProxyAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index af0e2cd..6f1a553 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -26,8 +26,8 @@ use super::{ notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, + ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, + UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; macro_rules! 
b { @@ -218,20 +218,32 @@ impl FullRepo for SledRepo { #[async_trait::async_trait(?Send)] impl ProxyRepo for SledRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result<Result<(), ProxyAlreadyExists>, RepoError> { let proxy = self.proxy.clone(); let inverse_proxy = self.inverse_proxy.clone(); - crate::sync::spawn_blocking("sled-io", move || { - proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?; - inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; + let res = crate::sync::spawn_blocking("sled-io", move || { + match proxy.compare_and_swap( + url.as_str().as_bytes(), + Option::<IVec>::None, + Some(alias.to_bytes()), + )? { + Ok(_) => { + inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; - Ok(()) as Result<(), SledError> + Ok(Ok(())) as Result<Result<(), ProxyAlreadyExists>, SledError> + } + Err(_) => Ok(Err(ProxyAlreadyExists)), + } }) .await .map_err(|_| RepoError::Canceled)??; - Ok(()) + Ok(res) } async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {