Re-use existing BytesMut when streaming file

This commit is contained in:
Aode (Lion) 2022-01-13 18:11:46 -06:00
parent 0edad8c015
commit 445c99dbf1
3 changed files with 81 additions and 62 deletions

71
Cargo.lock generated
View File

@ -113,9 +113,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-rt" name = "actix-rt"
version = "2.5.1" version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82cf33e04d9911b39bfb7be3c01309568b4315895d3358372dce64ed2c2bf32d" checksum = "cdf3f2183be1241ed4dd22611850b85d38de0b08a09f1f7bcccbd0809084b359"
dependencies = [ dependencies = [
"actix-macros", "actix-macros",
"futures-core", "futures-core",
@ -125,9 +125,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-server" name = "actix-server"
version = "2.0.0-rc.3" version = "2.0.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9259b4f3cc9ca96d7d91a7da66b7b01c47653a0da5b0ba3f7f45a344480443b" checksum = "fbdbff45cad841b3b20d9fa8ba0df99d1a80f69f397f5b6fad827e5032458bce"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"actix-service", "actix-service",
@ -155,9 +155,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-tls" name = "actix-tls"
version = "3.0.0" version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5ef5760747cdfb108a1f35e6911a7a40939da893f95e035f9eee0c18b4b4025" checksum = "b161450ff646361005a300716e85856780ddc2fde569fd81335cc3c15b2e0933"
dependencies = [ dependencies = [
"actix-codec", "actix-codec",
"actix-rt", "actix-rt",
@ -385,9 +385,9 @@ dependencies = [
[[package]] [[package]]
name = "aws-region" name = "aws-region"
version = "0.23.3" version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1efb67b8f201dd0deea4e1240bce7da0366f8df90509a7df054973604b20b34" checksum = "4e37c2dc2c9047311911ef175e0ffbb3853f17c32b72cf3d562f455e5ff77267"
dependencies = [ dependencies = [
"anyhow", "anyhow",
] ]
@ -565,9 +565,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.1" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -575,9 +575,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.5" version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" checksum = "97242a70df9b89a65d0b6df3c4bf5b9ce03c5b7309019777fbde37e7537f8762"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -588,9 +588,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.5" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"lazy_static", "lazy_static",
@ -703,6 +703,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "fastrand"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2"
dependencies = [
"instant",
]
[[package]] [[package]]
name = "firestorm" name = "firestorm"
version = "0.5.0" version = "0.5.0"
@ -869,9 +878,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.3" version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@ -880,9 +889,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.9" version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -1061,9 +1070,9 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown 0.11.2", "hashbrown 0.11.2",
@ -1517,7 +1526,7 @@ dependencies = [
"rust-s3", "rust-s3",
"serde 1.0.133", "serde 1.0.133",
"serde_json", "serde_json",
"sha2 0.10.0", "sha2 0.10.1",
"sled", "sled",
"storage-path-generator", "storage-path-generator",
"structopt", "structopt",
@ -1818,15 +1827,16 @@ dependencies = [
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.8" version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" checksum = "87f242f1488a539a79bac6dbe7c8609ae43b7914b7736210f239a37cccb32525"
dependencies = [ dependencies = [
"base64", "base64",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2",
"http", "http",
"http-body", "http-body",
"hyper", "hyper",
@ -2086,9 +2096,9 @@ dependencies = [
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.0" version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900d964dd36bb15bcf2f2b35694c072feab74969a54f2bbeec7a2d725d2bdcb6" checksum = "99c3bd8169c58782adad9290a9af5939994036b76187f7b4f0e6de91dbbfc0ec"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cpufeatures", "cpufeatures",
@ -2222,13 +2232,13 @@ dependencies = [
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.2.0" version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"fastrand",
"libc", "libc",
"rand",
"redox_syscall", "redox_syscall",
"remove_dir_all", "remove_dir_all",
"winapi", "winapi",
@ -2373,10 +2383,11 @@ dependencies = [
[[package]] [[package]]
name = "tokio-uring" name = "tokio-uring"
version = "0.1.0" version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f54af096a39b937631659f1a5da60ab7c1af334025c33b87ed913072c3c8a9" checksum = "062a33613d97344c5054d9635b35beb14492b66d9aa4bdbf21ecde1682d256df"
dependencies = [ dependencies = [
"bytes",
"io-uring", "io-uring",
"libc", "libc",
"scoped-tls", "scoped-tls",

View File

@ -44,7 +44,7 @@ structopt = "0.3.14"
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.0", features = ["serde"] } time = { version = "0.3.0", features = ["serde"] }
tokio = { version = "1", features = ["full", "tracing"] } tokio = { version = "1", features = ["full", "tracing"] }
tokio-uring = { version = "0.1", optional = true } tokio-uring = { version = "0.2", optional = true, features = ["bytes"] }
tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] }
tracing = "0.1.15" tracing = "0.1.15"
tracing-error = "0.2.0" tracing-error = "0.2.0"

View File

@ -130,7 +130,7 @@ mod tokio_file {
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
mod io_uring { mod io_uring {
use crate::store::file_store::FileError; use crate::store::file_store::FileError;
use actix_web::web::Bytes; use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
use std::{ use std::{
convert::TryInto, convert::TryInto,
@ -172,8 +172,7 @@ mod io_uring {
tokio::fs::metadata(&self.path).await tokio::fs::metadata(&self.path).await
} }
pub(crate) async fn write_from_bytes(&mut self, bytes: Bytes) -> std::io::Result<()> { pub(crate) async fn write_from_bytes(&mut self, mut buf: Bytes) -> std::io::Result<()> {
let mut buf = bytes.to_vec();
let len: u64 = buf.len().try_into().unwrap(); let len: u64 = buf.len().try_into().unwrap();
let mut cursor: u64 = 0; let mut cursor: u64 = 0;
@ -209,8 +208,7 @@ mod io_uring {
let mut cursor: u64 = 0; let mut cursor: u64 = 0;
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {
let bytes = res?; let mut buf = res?;
let mut buf = bytes.to_vec();
let len = buf.len(); let len = buf.len();
let mut position = 0; let mut position = 0;
@ -313,9 +311,9 @@ mod io_uring {
} }
let max_size = (size - cursor).min(65_536); let max_size = (size - cursor).min(65_536);
let buf = Vec::with_capacity(max_size.try_into().unwrap()); let buf = BytesMut::with_capacity(max_size.try_into().unwrap());
let (res, buf): (_, Vec<u8>) = self.read_at(buf, cursor).await; let (res, buf): (_, BytesMut) = self.read_at(buf, cursor).await;
let n: usize = res?; let n: usize = res?;
if n == 0 { if n == 0 {
@ -342,7 +340,10 @@ mod io_uring {
let size = len.unwrap_or(size - cursor) + cursor; let size = len.unwrap_or(size - cursor) + cursor;
Ok(BytesStream { Ok(BytesStream {
state: ReadFileState::File { file: Some(self) }, state: ReadFileState::File {
file: Some(self),
bytes: Some(BytesMut::new()),
},
size, size,
cursor, cursor,
callback: read_file, callback: read_file,
@ -375,6 +376,7 @@ mod io_uring {
enum ReadFileState<Fut> { enum ReadFileState<Fut> {
File { File {
file: Option<File>, file: Option<File>,
bytes: Option<BytesMut>,
}, },
Future { Future {
#[pin] #[pin]
@ -385,11 +387,9 @@ mod io_uring {
async fn read_file( async fn read_file(
file: File, file: File,
capacity: usize, buf: BytesMut,
cursor: u64, cursor: u64,
) -> (File, BufResult<usize, Vec<u8>>) { ) -> (File, BufResult<usize, BytesMut>) {
let buf = Vec::with_capacity(capacity);
let buf_res = file.read_at(buf, cursor).await; let buf_res = file.read_at(buf, cursor).await;
(file, buf_res) (file, buf_res)
@ -397,8 +397,8 @@ mod io_uring {
impl<F, Fut> Stream for BytesStream<F, Fut> impl<F, Fut> Stream for BytesStream<F, Fut>
where where
F: Fn(File, usize, u64) -> Fut, F: Fn(File, BytesMut, u64) -> Fut,
Fut: Future<Output = (File, BufResult<usize, Vec<u8>>)> + 'static, Fut: Future<Output = (File, BufResult<usize, BytesMut>)> + 'static,
{ {
type Item = std::io::Result<Bytes>; type Item = std::io::Result<Bytes>;
@ -406,7 +406,7 @@ mod io_uring {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state.as_mut().project() { match this.state.as_mut().project() {
ReadFileStateProj::File { file } => { ReadFileStateProj::File { file, bytes } => {
let cursor = *this.cursor; let cursor = *this.cursor;
let max_size = *this.size - *this.cursor; let max_size = *this.size - *this.cursor;
@ -415,9 +415,14 @@ mod io_uring {
} }
let capacity = max_size.min(65_356) as usize; let capacity = max_size.min(65_356) as usize;
let mut bytes = bytes.take().unwrap();
let file = file.take().unwrap(); let file = file.take().unwrap();
let fut = (this.callback)(file, capacity, cursor); if bytes.capacity() < capacity {
bytes.reserve(capacity - bytes.capacity());
}
let fut = (this.callback)(file, bytes, cursor);
this.state.project_replace(ReadFileState::Future { fut }); this.state.project_replace(ReadFileState::Future { fut });
self.poll_next(cx) self.poll_next(cx)
@ -425,10 +430,13 @@ mod io_uring {
ReadFileStateProj::Future { fut } => match fut.poll(cx) { ReadFileStateProj::Future { fut } => match fut.poll(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready((file, (Ok(n), mut buf))) => { Poll::Ready((file, (Ok(n), mut buf))) => {
this.state let bytes = buf.split_off(n);
.project_replace(ReadFileState::File { file: Some(file) });
this.state.project_replace(ReadFileState::File {
file: Some(file),
bytes: Some(bytes),
});
let _ = buf.split_off(n);
let n: u64 = match n.try_into() { let n: u64 = match n.try_into() {
Ok(n) => n, Ok(n) => n,
Err(_) => { Err(_) => {
@ -482,17 +490,17 @@ mod io_uring {
let mut source = std::fs::File::open(EARTH_GIF).unwrap(); let mut source = std::fs::File::open(EARTH_GIF).unwrap();
let mut dest = std::fs::File::open(tmp).unwrap(); let mut dest = std::fs::File::open(tmp).unwrap();
let mut source_vec = Vec::new(); let mut source_bytes = Vec::new();
source.read_to_end(&mut source_vec).unwrap(); source.read_to_end(&mut source_bytes).unwrap();
let mut dest_vec = Vec::new(); let mut dest_bytes = Vec::new();
dest.read_to_end(&mut dest_vec).unwrap(); dest.read_to_end(&mut dest_bytes).unwrap();
drop(dest); drop(dest);
std::fs::remove_file(tmp).unwrap(); std::fs::remove_file(tmp).unwrap();
assert_eq!(source_vec.len(), dest_vec.len()); assert_eq!(source_bytes.len(), dest_bytes.len());
assert_eq!(source_vec, dest_vec); assert_eq!(source_bytes, dest_bytes);
} }
#[test] #[test]
@ -508,17 +516,17 @@ mod io_uring {
let mut source = std::fs::File::open(EARTH_GIF).unwrap(); let mut source = std::fs::File::open(EARTH_GIF).unwrap();
let mut dest = std::fs::File::open(tmp).unwrap(); let mut dest = std::fs::File::open(tmp).unwrap();
let mut source_vec = Vec::new(); let mut source_bytes = Vec::new();
source.read_to_end(&mut source_vec).unwrap(); source.read_to_end(&mut source_bytes).unwrap();
let mut dest_vec = Vec::new(); let mut dest_bytes = Vec::new();
dest.read_to_end(&mut dest_vec).unwrap(); dest.read_to_end(&mut dest_bytes).unwrap();
drop(dest); drop(dest);
std::fs::remove_file(tmp).unwrap(); std::fs::remove_file(tmp).unwrap();
assert_eq!(source_vec.len(), dest_vec.len()); assert_eq!(source_bytes.len(), dest_bytes.len());
assert_eq!(source_vec, dest_vec); assert_eq!(source_bytes, dest_bytes);
} }
} }
} }