diff --git a/src/file.rs b/src/file.rs index 2d4c6b8..a31f0e1 100644 --- a/src/file.rs +++ b/src/file.rs @@ -147,7 +147,7 @@ mod io_uring { }; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio_uring::{ - buf::{IoBuf, IoBufMut, Slice}, + buf::{IoBuf, IoBufMut}, BufResult, }; @@ -204,6 +204,8 @@ mod io_uring { cursor += n; } + self.inner.sync_all().await?; + Ok(()) } @@ -214,13 +216,10 @@ mod io_uring { where R: AsyncRead + Unpin, { - let metadata = self.metadata().await?; - let size = metadata.len(); - let mut cursor: u64 = 0; loop { - let max_size = (size - cursor).min(65_536); + let max_size = 65_536; let mut buf = Vec::with_capacity(max_size.try_into().unwrap()); let n = (&mut reader).take(max_size).read_to_end(&mut buf).await?; @@ -229,16 +228,24 @@ mod io_uring { break; } - let mut buf: Slice> = buf.slice(..n); let mut position = 0; loop { - if position == buf.len() { + if position == n { break; } - let (res, slice) = self.write_at(buf.slice(position..), cursor).await; - position += res?; + let position_u64: u64 = position.try_into().unwrap(); + let (res, slice) = self + .write_at(buf.slice(position..n), cursor + position_u64) + .await; + + let n = res?; + if n == 0 { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + } + + position += n; buf = slice.into_inner(); } @@ -247,6 +254,8 @@ mod io_uring { cursor += position; } + self.inner.sync_all().await?; + Ok(()) } @@ -368,4 +377,81 @@ mod io_uring { } } } + + #[cfg(test)] + mod tests { + use std::io::Read; + + macro_rules! test_on_arbiter { + ($fut:expr) => { + actix_rt::System::new().block_on(async move { + let arbiter = actix_rt::Arbiter::new(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + + arbiter.spawn(async move { + let handle = actix_rt::spawn($fut); + + let _ = tx.send(handle.await.unwrap()); + }); + + rx.await.unwrap() + }) + }; + } + + const EARTH_GIF: &'static str = "client-examples/earth.gif"; + + #[test] + fn read() { + let tmp = "/tmp/read-test"; + + test_on_arbiter!(async move { + let mut file = super::File::open(EARTH_GIF).await.unwrap(); + let mut tmp_file = tokio::fs::File::create(tmp).await.unwrap(); + file.read_to_async_write(&mut tmp_file).await.unwrap(); + }); + + let mut source = std::fs::File::open(EARTH_GIF).unwrap(); + let mut dest = std::fs::File::open(tmp).unwrap(); + + let mut source_vec = Vec::new(); + source.read_to_end(&mut source_vec).unwrap(); + + let mut dest_vec = Vec::new(); + dest.read_to_end(&mut dest_vec).unwrap(); + + drop(dest); + std::fs::remove_file(tmp).unwrap(); + + assert_eq!(source_vec.len(), dest_vec.len()); + assert_eq!(source_vec, dest_vec); + } + + #[test] + fn write() { + let tmp = "/tmp/write-test"; + + test_on_arbiter!(async move { + let mut file = tokio::fs::File::open(EARTH_GIF).await.unwrap(); + let mut tmp_file = super::File::create(tmp).await.unwrap(); + tmp_file.write_from_async_read(&mut file).await.unwrap(); + }); + + let mut source = std::fs::File::open(EARTH_GIF).unwrap(); + let mut dest = std::fs::File::open(tmp).unwrap(); + + let mut source_vec = Vec::new(); + source.read_to_end(&mut source_vec).unwrap(); + + let mut dest_vec = Vec::new(); + dest.read_to_end(&mut dest_vec).unwrap(); + + drop(dest); + std::fs::remove_file(tmp).unwrap(); + + assert_eq!(source_vec.len(), dest_vec.len()); + assert_eq!(source_vec, dest_vec); + } + } }