diff --git a/Cargo.lock b/Cargo.lock index 91bc995..ce8a3d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" dependencies = [ "futures-core", "tokio", @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" +checksum = "b02303ce8d4e8be5b855af6cf3c3a08f3eff26880faad82bab679c22d3650cb5" dependencies = [ "actix-rt", "actix-service", @@ -1482,9 +1482,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.5.13" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" dependencies = [ "bitflags 1.3.2", "libc", @@ -2657,12 +2657,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -3151,14 +3145,14 @@ dependencies = [ [[package]] name = "tokio-uring" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" dependencies = [ "bytes", + "futures-util", "io-uring", "libc", - "scoped-tls", "slab", "socket2 0.4.10", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 03d55d9..050083f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ time = { version = "0.3.0", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] } tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] } -tokio-uring = { version = "0.4", optional = true, features = ["bytes"] } +tokio-uring = { version = "0.5", optional = true, features = ["bytes"] } tokio-util = { version = "0.7", default-features = false, features = [ "codec", "io", diff --git a/src/file.rs b/src/file.rs index 3417fcd..ad39937 100644 --- a/src/file.rs +++ b/src/file.rs @@ -167,10 +167,10 @@ mod io_uring { while let Some(res) = stream.next().await { tracing::trace!("write_from_stream while: looping"); - let mut buf = res?; + let buf = res?; let len = buf.len(); - let mut position = 0; + let mut position: usize = 0; loop { tracing::trace!("write_from_stream: looping"); @@ -179,9 +179,8 @@ mod io_uring { break; } - let position_u64: u64 = position.try_into().unwrap(); - let (res, slice) = self - .write_at(buf.slice(position..len), cursor + position_u64) + let (res, _buf) = self + .write_at(buf.slice(position..), cursor + (position as u64)) .await; let n = res?; @@ -190,12 +189,10 @@ mod io_uring { } position += n; - - buf = slice.into_inner(); } - let position: u64 = position.try_into().unwrap(); - cursor += position; + let len: u64 = len.try_into().unwrap(); + cursor += len; } self.inner.sync_all().await?; @@ -220,7 +217,7 @@ mod io_uring { } async fn write_at(&self, buf: T, pos: u64) -> BufResult { - self.inner.write_at(buf, pos).await + self.inner.write_at(buf, pos).submit().await } } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index a690d1c..51a6a6a 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -103,33 +103,71 @@ pub(crate) enum ConnectPostgresError { BuildPool(#[source] PoolError), } -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub(crate) enum PostgresError { - #[error("Error in db pool")] - Pool(#[source] RunError), - - #[error("Error in database")] - Diesel(#[from] diesel::result::Error), - - #[error("Error deserializing hex value")] - Hex(#[source] hex::FromHexError), - - #[error("Error serializing details")] - SerializeDetails(#[source] serde_json::Error), - - #[error("Error deserializing details")] - DeserializeDetails(#[source] serde_json::Error), - - #[error("Error serializing upload result")] - SerializeUploadResult(#[source] serde_json::Error), - - #[error("Error deserializing upload result")] - DeserializeUploadResult(#[source] serde_json::Error), - - #[error("Timed out waiting for postgres")] + Pool(RunError), + Diesel(diesel::result::Error), + Hex(hex::FromHexError), + SerializeDetails(serde_json::Error), + DeserializeDetails(serde_json::Error), + SerializeUploadResult(serde_json::Error), + DeserializeUploadResult(serde_json::Error), DbTimeout, } +impl std::fmt::Display for PostgresError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Pool(_) => write!(f, "Error in db pool"), + Self::Diesel(e) => match e { + diesel::result::Error::DatabaseError(kind, _) => { + write!(f, "Error in diesel: {kind:?}") + } + diesel::result::Error::InvalidCString(_) => { + write!(f, "Error in diesel: Invalid c string") + } + diesel::result::Error::QueryBuilderError(_) => { + write!(f, "Error in diesel: Query builder") + } + diesel::result::Error::SerializationError(_) => { + write!(f, "Error in diesel: Serialization") + } + diesel::result::Error::DeserializationError(_) => { + write!(f, "Error in diesel: Deserialization") + } + _ => write!(f, "Error in diesel"), + }, + Self::Hex(_) => write!(f, "Error deserializing hex value"), + Self::SerializeDetails(_) => write!(f, "Error serializing details"), + Self::DeserializeDetails(_) => write!(f, "Error deserializing details"), + Self::SerializeUploadResult(_) => write!(f, "Error serializing upload result"), + Self::DeserializeUploadResult(_) => write!(f, "Error deserializing upload result"), + Self::DbTimeout => write!(f, "Timed out waiting for postgres"), + } + } +} + +impl std::error::Error for PostgresError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Pool(e) => Some(e), + Self::Diesel(e) => Some(e), + Self::Hex(e) => Some(e), + Self::SerializeDetails(e) => Some(e), + Self::DeserializeDetails(e) => Some(e), + Self::SerializeUploadResult(e) => Some(e), + Self::DeserializeUploadResult(e) => Some(e), + Self::DbTimeout => None, + } + } +} + +impl From for PostgresError { + fn from(value: diesel::result::Error) -> Self { + Self::Diesel(value) + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum TlsError { #[error("Couldn't read configured certificate file")]