This commit is contained in:
Aode (lion) 2021-10-13 19:50:07 -05:00
parent 09cb2a53b0
commit 786d8469ee
1 changed files with 95 additions and 9 deletions

View File

@ -147,7 +147,7 @@ mod io_uring {
}; };
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_uring::{ use tokio_uring::{
buf::{IoBuf, IoBufMut, Slice}, buf::{IoBuf, IoBufMut},
BufResult, BufResult,
}; };
@ -204,6 +204,8 @@ mod io_uring {
cursor += n; cursor += n;
} }
self.inner.sync_all().await?;
Ok(()) Ok(())
} }
@ -214,13 +216,10 @@ mod io_uring {
where where
R: AsyncRead + Unpin, R: AsyncRead + Unpin,
{ {
let metadata = self.metadata().await?;
let size = metadata.len();
let mut cursor: u64 = 0; let mut cursor: u64 = 0;
loop { loop {
let max_size = (size - cursor).min(65_536); let max_size = 65_536;
let mut buf = Vec::with_capacity(max_size.try_into().unwrap()); let mut buf = Vec::with_capacity(max_size.try_into().unwrap());
let n = (&mut reader).take(max_size).read_to_end(&mut buf).await?; let n = (&mut reader).take(max_size).read_to_end(&mut buf).await?;
@ -229,16 +228,24 @@ mod io_uring {
break; break;
} }
let mut buf: Slice<Vec<u8>> = buf.slice(..n);
let mut position = 0; let mut position = 0;
loop { loop {
if position == buf.len() { if position == n {
break; break;
} }
let (res, slice) = self.write_at(buf.slice(position..), cursor).await; let position_u64: u64 = position.try_into().unwrap();
position += res?; let (res, slice) = self
.write_at(buf.slice(position..n), cursor + position_u64)
.await;
let n = res?;
if n == 0 {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
position += n;
buf = slice.into_inner(); buf = slice.into_inner();
} }
@ -247,6 +254,8 @@ mod io_uring {
cursor += position; cursor += position;
} }
self.inner.sync_all().await?;
Ok(()) Ok(())
} }
@ -368,4 +377,81 @@ mod io_uring {
} }
} }
} }
#[cfg(test)]
mod tests {
use std::io::Read;
macro_rules! test_on_arbiter {
($fut:expr) => {
actix_rt::System::new().block_on(async move {
let arbiter = actix_rt::Arbiter::new();
let (tx, rx) = tokio::sync::oneshot::channel();
arbiter.spawn(async move {
let handle = actix_rt::spawn($fut);
let _ = tx.send(handle.await.unwrap());
});
rx.await.unwrap()
})
};
}
const EARTH_GIF: &'static str = "client-examples/earth.gif";
#[test]
fn read() {
let tmp = "/tmp/read-test";
test_on_arbiter!(async move {
let mut file = super::File::open(EARTH_GIF).await.unwrap();
let mut tmp_file = tokio::fs::File::create(tmp).await.unwrap();
file.read_to_async_write(&mut tmp_file).await.unwrap();
});
let mut source = std::fs::File::open(EARTH_GIF).unwrap();
let mut dest = std::fs::File::open(tmp).unwrap();
let mut source_vec = Vec::new();
source.read_to_end(&mut source_vec).unwrap();
let mut dest_vec = Vec::new();
dest.read_to_end(&mut dest_vec).unwrap();
drop(dest);
std::fs::remove_file(tmp).unwrap();
assert_eq!(source_vec.len(), dest_vec.len());
assert_eq!(source_vec, dest_vec);
}
#[test]
fn write() {
let tmp = "/tmp/write-test";
test_on_arbiter!(async move {
let mut file = tokio::fs::File::open(EARTH_GIF).await.unwrap();
let mut tmp_file = super::File::create(tmp).await.unwrap();
tmp_file.write_from_async_read(&mut file).await.unwrap();
});
let mut source = std::fs::File::open(EARTH_GIF).unwrap();
let mut dest = std::fs::File::open(tmp).unwrap();
let mut source_vec = Vec::new();
source.read_to_end(&mut source_vec).unwrap();
let mut dest_vec = Vec::new();
dest.read_to_end(&mut dest_vec).unwrap();
drop(dest);
std::fs::remove_file(tmp).unwrap();
assert_eq!(source_vec.len(), dest_vec.len());
assert_eq!(source_vec, dest_vec);
}
}
} }