From 445c99dbf1ceab2951f4c7e31dad8f6e5867795a Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 13 Jan 2022 18:11:46 -0600 Subject: [PATCH] Re-use existing BytesMut when streaming file --- Cargo.lock | 71 +++++++++++++++++++++++++++++++---------------------- Cargo.toml | 2 +- src/file.rs | 70 +++++++++++++++++++++++++++++----------------------- 3 files changed, 81 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b15bdf..f88055f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.5.1" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82cf33e04d9911b39bfb7be3c01309568b4315895d3358372dce64ed2c2bf32d" +checksum = "cdf3f2183be1241ed4dd22611850b85d38de0b08a09f1f7bcccbd0809084b359" dependencies = [ "actix-macros", "futures-core", @@ -125,9 +125,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.0.0-rc.3" +version = "2.0.0-rc.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9259b4f3cc9ca96d7d91a7da66b7b01c47653a0da5b0ba3f7f45a344480443b" +checksum = "fbdbff45cad841b3b20d9fa8ba0df99d1a80f69f397f5b6fad827e5032458bce" dependencies = [ "actix-rt", "actix-service", @@ -155,9 +155,9 @@ dependencies = [ [[package]] name = "actix-tls" -version = "3.0.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ef5760747cdfb108a1f35e6911a7a40939da893f95e035f9eee0c18b4b4025" +checksum = "b161450ff646361005a300716e85856780ddc2fde569fd81335cc3c15b2e0933" dependencies = [ "actix-codec", "actix-rt", @@ -385,9 +385,9 @@ dependencies = [ [[package]] name = "aws-region" -version = "0.23.3" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1efb67b8f201dd0deea4e1240bce7da0366f8df90509a7df054973604b20b34" +checksum = "4e37c2dc2c9047311911ef175e0ffbb3853f17c32b72cf3d562f455e5ff77267" dependencies = [ "anyhow", ] @@ -565,9 +565,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" dependencies = [ "cfg-if", "crossbeam-utils", @@ -575,9 +575,9 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +checksum = "97242a70df9b89a65d0b6df3c4bf5b9ce03c5b7309019777fbde37e7537f8762" dependencies = [ "cfg-if", "crossbeam-utils", @@ -588,9 +588,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" dependencies = [ "cfg-if", "lazy_static", @@ -703,6 +703,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "fastrand" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2" +dependencies = [ + "instant", +] + [[package]] name = "firestorm" version = "0.5.0" @@ -869,9 +878,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ "cfg-if", "libc", @@ -880,9 +889,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" +checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689" dependencies = [ "bytes", "fnv", @@ -1061,9 +1070,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", "hashbrown 0.11.2", @@ -1517,7 +1526,7 @@ dependencies = [ "rust-s3", "serde 1.0.133", "serde_json", - "sha2 0.10.0", + "sha2 0.10.1", "sled", "storage-path-generator", "structopt", @@ -1818,15 +1827,16 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" +checksum = "87f242f1488a539a79bac6dbe7c8609ae43b7914b7736210f239a37cccb32525" dependencies = [ "base64", "bytes", "encoding_rs", "futures-core", "futures-util", + "h2", "http", "http-body", "hyper", @@ -2086,9 +2096,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d964dd36bb15bcf2f2b35694c072feab74969a54f2bbeec7a2d725d2bdcb6" +checksum = "99c3bd8169c58782adad9290a9af5939994036b76187f7b4f0e6de91dbbfc0ec" dependencies = [ "cfg-if", "cpufeatures", @@ -2222,13 +2232,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ "cfg-if", + "fastrand", "libc", - "rand", "redox_syscall", "remove_dir_all", "winapi", @@ -2373,10 +2383,11 @@ dependencies = [ [[package]] name = "tokio-uring" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f54af096a39b937631659f1a5da60ab7c1af334025c33b87ed913072c3c8a9" +checksum = "062a33613d97344c5054d9635b35beb14492b66d9aa4bdbf21ecde1682d256df" dependencies = [ + "bytes", "io-uring", "libc", "scoped-tls", diff --git a/Cargo.toml b/Cargo.toml index 1c2221a..57e05f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } tokio = { version = "1", features = ["full", "tracing"] } -tokio-uring = { version = "0.1", optional = true } +tokio-uring = { version = "0.2", optional = true, features = ["bytes"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" tracing-error = "0.2.0" diff --git a/src/file.rs b/src/file.rs index 1c9f093..6f011e7 100644 --- a/src/file.rs +++ b/src/file.rs @@ -130,7 +130,7 @@ mod tokio_file { #[cfg(feature = "io-uring")] mod io_uring { use crate::store::file_store::FileError; - use actix_web::web::Bytes; + use actix_web::web::{Bytes, BytesMut}; use futures_util::stream::{Stream, StreamExt}; use std::{ convert::TryInto, @@ -172,8 +172,7 @@ mod io_uring { tokio::fs::metadata(&self.path).await } - pub(crate) async fn write_from_bytes(&mut self, bytes: Bytes) -> std::io::Result<()> { - let mut buf = bytes.to_vec(); + pub(crate) async fn write_from_bytes(&mut self, mut buf: Bytes) -> std::io::Result<()> { let len: u64 = buf.len().try_into().unwrap(); let mut cursor: u64 = 0; @@ -209,8 +208,7 @@ mod io_uring { let mut cursor: u64 = 0; while let Some(res) = stream.next().await { - let bytes = res?; - let mut buf = bytes.to_vec(); + let mut buf = res?; let len = buf.len(); let mut position = 0; @@ -313,9 +311,9 @@ mod io_uring { } let max_size = (size - cursor).min(65_536); - let buf = Vec::with_capacity(max_size.try_into().unwrap()); + let buf = BytesMut::with_capacity(max_size.try_into().unwrap()); - let (res, buf): (_, Vec) = self.read_at(buf, cursor).await; + let (res, buf): (_, BytesMut) = self.read_at(buf, cursor).await; let n: usize = res?; if n == 0 { @@ -342,7 +340,10 @@ mod io_uring { let size = len.unwrap_or(size - cursor) + cursor; Ok(BytesStream { - state: ReadFileState::File { file: Some(self) }, + state: ReadFileState::File { + file: Some(self), + bytes: Some(BytesMut::new()), + }, size, cursor, callback: read_file, @@ -375,6 +376,7 @@ mod io_uring { enum ReadFileState { File { file: Option, + bytes: Option, }, Future { #[pin] @@ -385,11 +387,9 @@ mod io_uring { async fn read_file( file: File, - capacity: usize, + buf: BytesMut, cursor: u64, - ) -> (File, BufResult>) { - let buf = Vec::with_capacity(capacity); - + ) -> (File, BufResult) { let buf_res = file.read_at(buf, cursor).await; (file, buf_res) @@ -397,8 +397,8 @@ mod io_uring { impl Stream for BytesStream where - F: Fn(File, usize, u64) -> Fut, - Fut: Future>)> + 'static, + F: Fn(File, BytesMut, u64) -> Fut, + Fut: Future)> + 'static, { type Item = std::io::Result; @@ -406,7 +406,7 @@ mod io_uring { let mut this = self.as_mut().project(); match this.state.as_mut().project() { - ReadFileStateProj::File { file } => { + ReadFileStateProj::File { file, bytes } => { let cursor = *this.cursor; let max_size = *this.size - *this.cursor; @@ -415,9 +415,14 @@ mod io_uring { } let capacity = max_size.min(65_356) as usize; + let mut bytes = bytes.take().unwrap(); let file = file.take().unwrap(); - let fut = (this.callback)(file, capacity, cursor); + if bytes.capacity() < capacity { + bytes.reserve(capacity - bytes.capacity()); + } + + let fut = (this.callback)(file, bytes, cursor); this.state.project_replace(ReadFileState::Future { fut }); self.poll_next(cx) @@ -425,10 +430,13 @@ mod io_uring { ReadFileStateProj::Future { fut } => match fut.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready((file, (Ok(n), mut buf))) => { - this.state - .project_replace(ReadFileState::File { file: Some(file) }); + let bytes = buf.split_off(n); + + this.state.project_replace(ReadFileState::File { + file: Some(file), + bytes: Some(bytes), + }); - let _ = buf.split_off(n); let n: u64 = match n.try_into() { Ok(n) => n, Err(_) => { @@ -482,17 +490,17 @@ mod io_uring { let mut source = std::fs::File::open(EARTH_GIF).unwrap(); let mut dest = std::fs::File::open(tmp).unwrap(); - let mut source_vec = Vec::new(); - source.read_to_end(&mut source_vec).unwrap(); + let mut source_bytes = Vec::new(); + source.read_to_end(&mut source_bytes).unwrap(); - let mut dest_vec = Vec::new(); - dest.read_to_end(&mut dest_vec).unwrap(); + let mut dest_bytes = Vec::new(); + dest.read_to_end(&mut dest_bytes).unwrap(); drop(dest); std::fs::remove_file(tmp).unwrap(); - assert_eq!(source_vec.len(), dest_vec.len()); - assert_eq!(source_vec, dest_vec); + assert_eq!(source_bytes.len(), dest_bytes.len()); + assert_eq!(source_bytes, dest_bytes); } #[test] @@ -508,17 +516,17 @@ mod io_uring { let mut source = std::fs::File::open(EARTH_GIF).unwrap(); let mut dest = std::fs::File::open(tmp).unwrap(); - let mut source_vec = Vec::new(); - source.read_to_end(&mut source_vec).unwrap(); + let mut source_bytes = Vec::new(); + source.read_to_end(&mut source_bytes).unwrap(); - let mut dest_vec = Vec::new(); - dest.read_to_end(&mut dest_vec).unwrap(); + let mut dest_bytes = Vec::new(); + dest.read_to_end(&mut dest_bytes).unwrap(); drop(dest); std::fs::remove_file(tmp).unwrap(); - assert_eq!(source_vec.len(), dest_vec.len()); - assert_eq!(source_vec, dest_vec); + assert_eq!(source_bytes.len(), dest_bytes.len()); + assert_eq!(source_bytes, dest_bytes); } } }