diff --git a/Cargo.lock b/Cargo.lock index 807c0ac..8c13b2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" dependencies = [ "futures-core", "tokio", @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" +checksum = "b02303ce8d4e8be5b855af6cf3c3a08f3eff26880faad82bab679c22d3650cb5" dependencies = [ "actix-rt", "actix-service", @@ -1482,9 +1482,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.5.13" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" dependencies = [ "bitflags 1.3.2", "libc", @@ -1653,9 +1653,9 @@ checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "metrics" -version = "0.22.3" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" dependencies = [ "ahash", "portable-atomic", @@ -1663,9 +1663,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d58e362dc7206e9456ddbcdbd53c71ba441020e62104703075a69151e38d85f" +checksum = "26eb45aff37b45cff885538e1dcbd6c2b462c04fe84ce0155ea469f325672c98" dependencies = [ "base64 0.22.1", "http-body-util", @@ -1683,9 +1683,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.16.3" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -1809,9 +1809,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opentelemetry" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" dependencies = [ "futures-core", "futures-sink", @@ -1819,21 +1819,19 @@ dependencies = [ "once_cell", "pin-project-lite", "thiserror", - "urlencoding", ] [[package]] name = "opentelemetry-otlp" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" dependencies = [ "async-trait", "futures-core", "http 0.2.12", "opentelemetry", "opentelemetry-proto", - "opentelemetry-semantic-conventions", "opentelemetry_sdk", "prost", "thiserror", @@ -1843,9 +1841,9 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -1853,24 +1851,18 @@ dependencies = [ "tonic 0.11.0", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" - [[package]] name = "opentelemetry_sdk" -version = "0.22.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" dependencies = [ "async-trait", - "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", "glob", + "lazy_static", "once_cell", "opentelemetry", "ordered-float", @@ -1997,7 +1989,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.5.15" +version = "0.5.16" dependencies = [ "actix-form-data", "actix-web", @@ -2649,12 +2641,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -3138,14 +3124,14 @@ dependencies = [ [[package]] name = "tokio-uring" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" dependencies = [ "bytes", + "futures-util", "io-uring", "libc", - "scoped-tls", "slab", "socket2 0.4.10", "tokio", @@ -3355,9 +3341,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" dependencies = [ "js-sys", "once_cell", @@ -3465,12 +3451,6 @@ dependencies = [ "serde", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 3a67deb..02e0cb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.5.15" +version = "0.5.16" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -37,13 +37,13 @@ diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } futures-core = "0.3" hex = "0.4.3" md-5 = "0.10.5" -metrics = "0.22.0" -metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] } +metrics = "0.23.0" +metrics-exporter-prometheus = { version = "0.15.0", default-features = false, features = ["http-listener"] } mime = "0.3.1" nanorand = { version = "0.7", optional = true } -opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } -opentelemetry = "0.22" -opentelemetry-otlp = "0.15" +opentelemetry_sdk = { version = "0.23", features = ["rt-tokio"] } +opentelemetry = "0.23" +opentelemetry-otlp = "0.16" pin-project-lite = "0.2.7" refinery = { version = "0.8.10", features = ["tokio-postgres", "postgres"] } reqwest = { version = "0.12.0", default-features = false, features = ["json", "rustls-tls-no-provider", "stream"] } @@ -70,7 +70,7 @@ time = { version = "0.3.0", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] } tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] } -tokio-uring = { version = "0.4", optional = true, features = ["bytes"] } +tokio-uring = { version = "0.5", optional = true, features = ["bytes"] } tokio-util = { version = "0.7", default-features = false, features = [ "codec", "io", @@ -79,7 +79,7 @@ toml = "0.8.0" tracing = "0.1.15" tracing-error = "0.2.0" tracing-log = "0.2.0" -tracing-opentelemetry = "0.23" +tracing-opentelemetry = "0.24" tracing-subscriber = { version = "0.3.0", features = [ "ansi", "env-filter", @@ -96,7 +96,7 @@ webpki-roots = "0.26.0" [dependencies.tracing-actix-web] version = "0.7.10" default-features = false -features = ["opentelemetry_0_22"] +features = ["opentelemetry_0_23"] [patch.crates-io] reqwest = { git = "https://github.com/asonix/reqwest", branch = "asonix/limit-openssl-probe" } diff --git a/pict-rs.nix b/pict-rs.nix index 049bc05..d76165d 100644 --- a/pict-rs.nix +++ b/pict-rs.nix @@ -11,7 +11,7 @@ rustPlatform.buildRustPackage { pname = "pict-rs"; - version = "0.5.15"; + version = "0.5.16"; src = ./.; cargoLock = { diff --git a/releases/0.5.16.md b/releases/0.5.16.md new file mode 100644 index 0000000..56aecd7 --- /dev/null +++ b/releases/0.5.16.md @@ -0,0 +1,46 @@ +# pict-rs 0.5.16 + +pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images, +animations, and videos, as well as providing basic image processing functionality. + +## Overview + +pict-rs 0.5.16 includes a couple bugfixes for parsing query paramters, better handling of proxied +media, clearer postgres errors, and updated dependencies. + +### Fixes + +- [Query Fixes](#query-fixes) +- [Better Concurrent Proxies](#better-concurrent-proxies) + + +### Changes + +- [Improved Postgres Errors](#improved-postgres-errors) + + +## Upgrade Notes + +There are no significant changes from 0.5.15. Upgrading should be a simple as pulling a new version +of pict-rs. + + +## Descriptions + +### Query Fixes + +A couple boolean query parameters were not getting parsed properly, this impacted ?backgrounded for +image downloads and ?force for pruning media. pict-rs 0.5.16 resolves this. + + +### Better Concurrent Proxies + +When the same proxy endpoint was accessed more than once concurrently, only one request would end up +succeeding. Now pict-rs better handles when multiple concurrent requests are made. + + +### Improved Postgres Errors + +In some postgres errors, it is difficult to tell whether an error originated from within the +postgres client or the postgres server. pict-rs 0.5.16 now includes extra context when displaying +postgres errors to help discern this. diff --git a/src/file.rs b/src/file.rs index 3417fcd..ad39937 100644 --- a/src/file.rs +++ b/src/file.rs @@ -167,10 +167,10 @@ mod io_uring { while let Some(res) = stream.next().await { tracing::trace!("write_from_stream while: looping"); - let mut buf = res?; + let buf = res?; let len = buf.len(); - let mut position = 0; + let mut position: usize = 0; loop { tracing::trace!("write_from_stream: looping"); @@ -179,9 +179,8 @@ mod io_uring { break; } - let position_u64: u64 = position.try_into().unwrap(); - let (res, slice) = self - .write_at(buf.slice(position..len), cursor + position_u64) + let (res, _buf) = self + .write_at(buf.slice(position..), cursor + (position as u64)) .await; let n = res?; @@ -190,12 +189,10 @@ mod io_uring { } position += n; - - buf = slice.into_inner(); } - let position: u64 = position.try_into().unwrap(); - cursor += position; + let len: u64 = len.try_into().unwrap(); + cursor += len; } self.inner.sync_all().await?; @@ -220,7 +217,7 @@ mod io_uring { } async fn write_at(&self, buf: T, pos: u64) -> BufResult { - self.inner.write_at(buf, pos).await + self.inner.write_at(buf, pos).submit().await } } diff --git a/src/lib.rs b/src/lib.rs index 6b509f1..7b0b16f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,10 @@ use self::{ middleware::{Deadline, Internal, Log, Metrics, Payload}, migrate_store::migrate_store, queue::queue_generate, - repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult}, + repo::{ + sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, ProxyAlreadyExists, Repo, UploadId, + UploadResult, + }, serde_str::Serde, state::State, store::{file_store::FileStore, object_store::ObjectStore, Store}, @@ -1286,11 +1289,28 @@ async fn proxy_alias_from_query( } else if !state.config.server.read_only { let stream = download_stream(proxy.as_str(), state).await?; - let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?; + // some time has passed, see if we've proxied elsewhere + if let Some(alias) = state.repo.related(proxy.clone()).await? { + alias + } else { + let (alias, token, _) = + ingest_inline(stream, state, &Default::default()).await?; - state.repo.relate_url(proxy, alias.clone()).await?; + // last check, do we succeed or fail to relate the proxy alias + if let Err(ProxyAlreadyExists) = + state.repo.relate_url(proxy.clone(), alias.clone()).await? + { + queue::cleanup_alias(&state.repo, alias, token).await?; - alias + state + .repo + .related(proxy) + .await? + .ok_or(UploadError::MissingAlias)? + } else { + alias + } + } } else { return Err(UploadError::ReadOnly.into()); }; diff --git a/src/repo.rs b/src/repo.rs index dfe3fcb..4d01c84 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -46,6 +46,8 @@ pub(crate) struct HashAlreadyExists; pub(crate) struct AliasAlreadyExists; #[derive(Debug)] pub(crate) struct VariantAlreadyExists; +#[derive(Debug)] +pub(crate) struct ProxyAlreadyExists; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct UploadId { @@ -151,7 +153,11 @@ impl BaseRepo for Arc where T: BaseRepo {} #[async_trait::async_trait(?Send)] pub(crate) trait ProxyRepo: BaseRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>; + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result, RepoError>; async fn related(&self, url: Url) -> Result, RepoError>; @@ -163,7 +169,11 @@ impl ProxyRepo for Arc where T: ProxyRepo, { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result, RepoError> { T::relate_url(self, url, alias).await } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index a690d1c..edd9fc8 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -47,8 +47,8 @@ use super::{ notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, + ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, + UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; #[derive(Clone)] @@ -103,33 +103,71 @@ pub(crate) enum ConnectPostgresError { BuildPool(#[source] PoolError), } -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub(crate) enum PostgresError { - #[error("Error in db pool")] - Pool(#[source] RunError), - - #[error("Error in database")] - Diesel(#[from] diesel::result::Error), - - #[error("Error deserializing hex value")] - Hex(#[source] hex::FromHexError), - - #[error("Error serializing details")] - SerializeDetails(#[source] serde_json::Error), - - #[error("Error deserializing details")] - DeserializeDetails(#[source] serde_json::Error), - - #[error("Error serializing upload result")] - SerializeUploadResult(#[source] serde_json::Error), - - #[error("Error deserializing upload result")] - DeserializeUploadResult(#[source] serde_json::Error), - - #[error("Timed out waiting for postgres")] + Pool(RunError), + Diesel(diesel::result::Error), + Hex(hex::FromHexError), + SerializeDetails(serde_json::Error), + DeserializeDetails(serde_json::Error), + SerializeUploadResult(serde_json::Error), + DeserializeUploadResult(serde_json::Error), DbTimeout, } +impl std::fmt::Display for PostgresError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Pool(_) => write!(f, "Error in db pool"), + Self::Diesel(e) => match e { + diesel::result::Error::DatabaseError(kind, _) => { + write!(f, "Error in diesel: {kind:?}") + } + diesel::result::Error::InvalidCString(_) => { + write!(f, "Error in diesel: Invalid c string") + } + diesel::result::Error::QueryBuilderError(_) => { + write!(f, "Error in diesel: Query builder") + } + diesel::result::Error::SerializationError(_) => { + write!(f, "Error in diesel: Serialization") + } + diesel::result::Error::DeserializationError(_) => { + write!(f, "Error in diesel: Deserialization") + } + _ => write!(f, "Error in diesel"), + }, + Self::Hex(_) => write!(f, "Error deserializing hex value"), + Self::SerializeDetails(_) => write!(f, "Error serializing details"), + Self::DeserializeDetails(_) => write!(f, "Error deserializing details"), + Self::SerializeUploadResult(_) => write!(f, "Error serializing upload result"), + Self::DeserializeUploadResult(_) => write!(f, "Error deserializing upload result"), + Self::DbTimeout => write!(f, "Timed out waiting for postgres"), + } + } +} + +impl std::error::Error for PostgresError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Pool(e) => Some(e), + Self::Diesel(e) => Some(e), + Self::Hex(e) => Some(e), + Self::SerializeDetails(e) => Some(e), + Self::DeserializeDetails(e) => Some(e), + Self::SerializeUploadResult(e) => Some(e), + Self::DeserializeUploadResult(e) => Some(e), + Self::DbTimeout => None, + } + } +} + +impl From for PostgresError { + fn from(value: diesel::result::Error) -> Self { + Self::Diesel(value) + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum TlsError { #[error("Couldn't read configured certificate file")] @@ -1846,21 +1884,31 @@ impl StoreMigrationRepo for PostgresRepo { #[async_trait::async_trait(?Send)] impl ProxyRepo for PostgresRepo { #[tracing::instrument(level = "debug", skip(self))] - async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + input_url: Url, + input_alias: Alias, + ) -> Result, RepoError> { use schema::proxies::dsl::*; let mut conn = self.get_connection().await?; - diesel::insert_into(proxies) + let res = diesel::insert_into(proxies) .values((url.eq(input_url.as_str()), alias.eq(&input_alias))) .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; + .map_err(|_| PostgresError::DbTimeout)?; - Ok(()) + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(ProxyAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index af0e2cd..6f1a553 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -26,8 +26,8 @@ use super::{ notification_map::{NotificationEntry, NotificationMap}, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash, - ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, + ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, + UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo, }; macro_rules! b { @@ -218,20 +218,32 @@ impl FullRepo for SledRepo { #[async_trait::async_trait(?Send)] impl ProxyRepo for SledRepo { - async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + async fn relate_url( + &self, + url: Url, + alias: Alias, + ) -> Result, RepoError> { let proxy = self.proxy.clone(); let inverse_proxy = self.inverse_proxy.clone(); - crate::sync::spawn_blocking("sled-io", move || { - proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?; - inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; + let res = crate::sync::spawn_blocking("sled-io", move || { + match proxy.compare_and_swap( + url.as_str().as_bytes(), + Option::::None, + Some(alias.to_bytes()), + )? { + Ok(_) => { + inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; - Ok(()) as Result<(), SledError> + Ok(Ok(())) as Result, SledError> + } + Err(_) => Ok(Err(ProxyAlreadyExists)), + } }) .await .map_err(|_| RepoError::Canceled)??; - Ok(()) + Ok(res) } async fn related(&self, url: Url) -> Result, RepoError> {