2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-11-20 11:21:14 +00:00

Merge branch 'main' into asonix/update-opentelemetry-023

This commit is contained in:
asonix 2024-06-09 19:16:31 +00:00
commit 8f0fc2054c
4 changed files with 78 additions and 49 deletions

24
Cargo.lock generated
View file

@ -112,9 +112,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-rt" name = "actix-rt"
version = "2.9.0" version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"tokio", "tokio",
@ -123,9 +123,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-server" name = "actix-server"
version = "2.3.0" version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" checksum = "b02303ce8d4e8be5b855af6cf3c3a08f3eff26880faad82bab679c22d3650cb5"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"actix-service", "actix-service",
@ -1482,9 +1482,9 @@ dependencies = [
[[package]] [[package]]
name = "io-uring" name = "io-uring"
version = "0.5.13" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c" checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"libc", "libc",
@ -2657,12 +2657,6 @@ dependencies = [
"pin-utils", "pin-utils",
] ]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -3151,14 +3145,14 @@ dependencies = [
[[package]] [[package]]
name = "tokio-uring" name = "tokio-uring"
version = "0.4.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef" checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-util",
"io-uring", "io-uring",
"libc", "libc",
"scoped-tls",
"slab", "slab",
"socket2 0.4.10", "socket2 0.4.10",
"tokio", "tokio",

View file

@ -70,7 +70,7 @@ time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] } tokio = { version = "1", features = ["full", "tracing"] }
tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] } tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] } tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] }
tokio-uring = { version = "0.4", optional = true, features = ["bytes"] } tokio-uring = { version = "0.5", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = [ tokio-util = { version = "0.7", default-features = false, features = [
"codec", "codec",
"io", "io",

View file

@ -167,10 +167,10 @@ mod io_uring {
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {
tracing::trace!("write_from_stream while: looping"); tracing::trace!("write_from_stream while: looping");
let mut buf = res?; let buf = res?;
let len = buf.len(); let len = buf.len();
let mut position = 0; let mut position: usize = 0;
loop { loop {
tracing::trace!("write_from_stream: looping"); tracing::trace!("write_from_stream: looping");
@ -179,9 +179,8 @@ mod io_uring {
break; break;
} }
let position_u64: u64 = position.try_into().unwrap(); let (res, _buf) = self
let (res, slice) = self .write_at(buf.slice(position..), cursor + (position as u64))
.write_at(buf.slice(position..len), cursor + position_u64)
.await; .await;
let n = res?; let n = res?;
@ -190,12 +189,10 @@ mod io_uring {
} }
position += n; position += n;
buf = slice.into_inner();
} }
let position: u64 = position.try_into().unwrap(); let len: u64 = len.try_into().unwrap();
cursor += position; cursor += len;
} }
self.inner.sync_all().await?; self.inner.sync_all().await?;
@ -220,7 +217,7 @@ mod io_uring {
} }
async fn write_at<T: IoBuf>(&self, buf: T, pos: u64) -> BufResult<usize, T> { async fn write_at<T: IoBuf>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
self.inner.write_at(buf, pos).await self.inner.write_at(buf, pos).submit().await
} }
} }

View file

@ -103,33 +103,71 @@ pub(crate) enum ConnectPostgresError {
BuildPool(#[source] PoolError), BuildPool(#[source] PoolError),
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug)]
pub(crate) enum PostgresError { pub(crate) enum PostgresError {
#[error("Error in db pool")] Pool(RunError),
Pool(#[source] RunError), Diesel(diesel::result::Error),
Hex(hex::FromHexError),
#[error("Error in database")] SerializeDetails(serde_json::Error),
Diesel(#[from] diesel::result::Error), DeserializeDetails(serde_json::Error),
SerializeUploadResult(serde_json::Error),
#[error("Error deserializing hex value")] DeserializeUploadResult(serde_json::Error),
Hex(#[source] hex::FromHexError),
#[error("Error serializing details")]
SerializeDetails(#[source] serde_json::Error),
#[error("Error deserializing details")]
DeserializeDetails(#[source] serde_json::Error),
#[error("Error serializing upload result")]
SerializeUploadResult(#[source] serde_json::Error),
#[error("Error deserializing upload result")]
DeserializeUploadResult(#[source] serde_json::Error),
#[error("Timed out waiting for postgres")]
DbTimeout, DbTimeout,
} }
impl std::fmt::Display for PostgresError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pool(_) => write!(f, "Error in db pool"),
Self::Diesel(e) => match e {
diesel::result::Error::DatabaseError(kind, _) => {
write!(f, "Error in diesel: {kind:?}")
}
diesel::result::Error::InvalidCString(_) => {
write!(f, "Error in diesel: Invalid c string")
}
diesel::result::Error::QueryBuilderError(_) => {
write!(f, "Error in diesel: Query builder")
}
diesel::result::Error::SerializationError(_) => {
write!(f, "Error in diesel: Serialization")
}
diesel::result::Error::DeserializationError(_) => {
write!(f, "Error in diesel: Deserialization")
}
_ => write!(f, "Error in diesel"),
},
Self::Hex(_) => write!(f, "Error deserializing hex value"),
Self::SerializeDetails(_) => write!(f, "Error serializing details"),
Self::DeserializeDetails(_) => write!(f, "Error deserializing details"),
Self::SerializeUploadResult(_) => write!(f, "Error serializing upload result"),
Self::DeserializeUploadResult(_) => write!(f, "Error deserializing upload result"),
Self::DbTimeout => write!(f, "Timed out waiting for postgres"),
}
}
}
impl std::error::Error for PostgresError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Pool(e) => Some(e),
Self::Diesel(e) => Some(e),
Self::Hex(e) => Some(e),
Self::SerializeDetails(e) => Some(e),
Self::DeserializeDetails(e) => Some(e),
Self::SerializeUploadResult(e) => Some(e),
Self::DeserializeUploadResult(e) => Some(e),
Self::DbTimeout => None,
}
}
}
impl From<diesel::result::Error> for PostgresError {
fn from(value: diesel::result::Error) -> Self {
Self::Diesel(value)
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum TlsError { pub(crate) enum TlsError {
#[error("Couldn't read configured certificate file")] #[error("Couldn't read configured certificate file")]