Skip to content

enhancement: using std::os::unix::fs::FileExt to random read tokio file #150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions benches/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn write(c: &mut Criterion) {

let fs = TokioFs;
let file = Rc::new(RefCell::new(runtime.block_on(async {
fs.open_options(&path, OpenOptions::default().write(true).append(true))
fs.open_options(&path, OpenOptions::default().write(true))
.await
.unwrap()
})));
Expand All @@ -52,7 +52,7 @@ fn write(c: &mut Criterion) {

async move {
tokio::io::AsyncWriteExt::write_all(
&mut *(*file).borrow_mut(),
file.borrow_mut().as_mut(),
&bytes.as_ref()[..],
)
.await
Expand Down Expand Up @@ -82,11 +82,13 @@ fn read(c: &mut Criterion) {
let fs = TokioFs;
let file = Rc::new(RefCell::new(runtime.block_on(async {
let mut file = fs
.open_options(&path, OpenOptions::default().write(true).append(true))
.open_options(&path, OpenOptions::default().write(true))
.await
.unwrap();
let (result, _) = file.write_all(&write_bytes[..]).await;
result.unwrap();
for _ in 0..1024 * 1024 {
let (result, _) = file.write_all(&write_bytes[..]).await;
result.unwrap();
}
file
})));

Expand All @@ -96,11 +98,11 @@ fn read(c: &mut Criterion) {
let mut bytes = [0u8; 4096];

async move {
fusio::dynamic::DynSeek::seek(&mut *(*file).borrow_mut(), 0)
.await
.unwrap();
let random_pos = rand::thread_rng().gen_range(0..4096 * 1024 * 1024 - 4096);
let file = file.clone();
let (result, _) =
fusio::Read::read_exact(&mut *(*file).borrow_mut(), &mut bytes[..]).await;
fusio::Read::read_exact_at(&mut *file.borrow_mut(), &mut bytes[..], random_pos)
.await;
result.unwrap();
}
})
Expand All @@ -112,12 +114,16 @@ fn read(c: &mut Criterion) {
let mut bytes = [0u8; 4096];

async move {
let random_pos = rand::thread_rng().gen_range(0..4096 * 1024 * 1024 - 4096);
let file = file.clone();
let _ = tokio::io::AsyncSeekExt::seek(
file.borrow_mut().as_mut(),
SeekFrom::Start(random_pos),
)
.await
.unwrap();
let _ =
tokio::io::AsyncSeekExt::seek(&mut *(*file).borrow_mut(), SeekFrom::Start(0))
.await
.unwrap();
let _ =
tokio::io::AsyncReadExt::read_exact(&mut *(*file).borrow_mut(), &mut bytes[..])
tokio::io::AsyncReadExt::read_exact(file.borrow_mut().as_mut(), &mut bytes[..])
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/fs/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
serdes::{Decode, Encode},
};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let mut bytes = Vec::new();
let mut cursor = Cursor::new(&mut bytes);
Expand Down
8 changes: 4 additions & 4 deletions fusio-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mod tests {
items
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_write_u8() {
let temp_dir = TempDir::new().unwrap();
let path = Path::from_filesystem_path(temp_dir.path())
Expand Down Expand Up @@ -280,7 +280,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_write_struct() {
let temp_dir = TempDir::new().unwrap();
let path = Path::from_filesystem_path(temp_dir.path())
Expand Down Expand Up @@ -333,7 +333,7 @@ mod tests {
}

#[ignore = "s3"]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_write_s3() {
let path = Path::from_url_path("log").unwrap();
let option = Options::new(path).fs(FsOptions::S3 {
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_recover_empty() {
let temp_dir = TempDir::new().unwrap();
let path = Path::from_filesystem_path(temp_dir.path())
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source_0 = Arc::new(1u64);
let source_1 = Arc::new("Hello! Tonbo".to_string());
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source_0 = true;
let source_1 = false;
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source = Bytes::from_static(b"hello! Tonbo");

Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_u8_encode_decode() {
let source = b"hello! Tonbo".to_vec();

Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {

use super::*;

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
// Implement a simple struct that implements Encode and Decode
struct TestStruct(u32);
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/num.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source_0 = 8u8;
let source_1 = 16u16;
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source_0 = Some(1u64);
let source_1 = None;
Expand Down
2 changes: 1 addition & 1 deletion fusio-log/src/serdes/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod tests {

use crate::serdes::{Decode, Encode};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_encode_decode() {
let source_0 = "Hello! World";
let source_1 = "Hello! Tonbo".to_string();
Expand Down
2 changes: 1 addition & 1 deletion fusio-object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl<O: ObjectStore> Write for S3File<O> {
#[cfg(test)]
mod tests {

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_s3() {
use std::{env, env::VarError, sync::Arc};

Expand Down
4 changes: 2 additions & 2 deletions fusio-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ mod tests {
}

#[cfg(feature = "tokio")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tokio_async_reader_with_prefetch_footer_size() {
async_reader_with_prefetch_footer_size().await;
}
Expand All @@ -444,7 +444,7 @@ mod tests {
}

#[cfg(feature = "tokio")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tokio_async_reader_with_large_metadata() {
async_reader_with_large_metadata().await;
}
Expand Down
4 changes: 2 additions & 2 deletions fusio-parquet/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ mod tests {
}

#[cfg(feature = "tokio")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tokio_basic_write() {
basic_write().await;
}
Expand All @@ -234,7 +234,7 @@ mod tests {
}

#[cfg(feature = "tokio")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tokio_async_writer() {
async_writer().await;
}
Expand Down
1 change: 1 addition & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ serde_urlencoded = { version = "0.7", optional = true }
thiserror = "1"
tokio = { version = "1", optional = true, default-features = false, features = [
"io-util",
"rt-multi-thread",
] }
url = { version = "2.5.3", default-features = false, features = ["std"] }

Expand Down
6 changes: 3 additions & 3 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,21 +208,21 @@ pub async fn copy(
mod tests {

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_dyn_fs() {
use tempfile::tempfile;

use crate::{disk::tokio::TokioFile, Write};

let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()), 0);
let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let mut dyn_file: Box<dyn super::DynFile> = Box::new(file);
let buf = [24, 9, 24, 0];
let (result, _) = dyn_file.write_all(&buf[..]).await;
result.unwrap();
}

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_dyn_buf_fs() {
use tempfile::NamedTempFile;

Expand Down
2 changes: 1 addition & 1 deletion fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tests {
feature = "aws",
not(feature = "completion-based")
))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_diff_fs_copy() -> Result<(), crate::Error> {
use std::sync::Arc;

Expand Down
12 changes: 6 additions & 6 deletions fusio/src/impls/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ pub(crate) mod tests {
}

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_buf_read() {
use tempfile::tempfile;

use crate::disk::tokio::TokioFile;

let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()), 0);
let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let _ = file
.write_all([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15].as_slice())
.await;
Expand Down Expand Up @@ -236,7 +236,7 @@ pub(crate) mod tests {
}

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_buf_read_write() {
use tempfile::tempfile;

Expand All @@ -245,7 +245,7 @@ pub(crate) mod tests {
Read, Write,
};

let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()), 0);
let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let mut writer = BufWriter::new(file, 4);
{
let _ = writer.write_all("Hello".as_bytes()).await;
Expand Down Expand Up @@ -284,13 +284,13 @@ pub(crate) mod tests {
}

#[cfg(all(feature = "tokio", not(feature = "completion-based")))]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_buf_read_eof() {
use tempfile::tempfile;

use crate::disk::tokio::TokioFile;

let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()), 0);
let mut file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
let _ = file.write_all([0, 1, 2].as_slice()).await;

let mut reader = BufReader::new(file, 8).await.unwrap();
Expand Down
10 changes: 3 additions & 7 deletions fusio/src/impls/disk/tokio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@ impl Fs for TokioFs {
.read(options.read)
.write(options.write)
.create(options.create)
.truncate(options.truncate)
.append(!options.truncate)
.open(&local_path)
.await?;

let pos = if options.truncate {
0
} else {
file.metadata().await?.len()
};

Ok(TokioFile::new(file, pos))
Ok(TokioFile::new(file))
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
Expand Down
Loading
Loading