Skip to content

Commit 00695bf

Browse files
crwenethe
authored andcommitted
feat: add support for FileSystemSyncAccessHandle
1 parent a9e34e3 commit 00695bf

File tree

5 files changed

+186
-3
lines changed

5 files changed

+186
-3
lines changed

fusio/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ opfs = [
4949
"dep:web-sys",
5050
"no-send",
5151
]
52+
sync = ["opfs"]
5253
tokio = ["async-stream", "dep:tokio"]
5354
tokio-http = ["dep:reqwest", "http"]
5455
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]
@@ -116,7 +117,9 @@ web-sys = { version = "0.3", optional = true, features = [
116117
"FileSystemFileHandle",
117118
"FileSystemGetDirectoryOptions",
118119
"FileSystemGetFileOptions",
120+
"FileSystemReadWriteOptions",
119121
"FileSystemRemoveOptions",
122+
"FileSystemSyncAccessHandle",
120123
"FileSystemWritableFileStream",
121124
"Navigator",
122125
"ReadableStream",

fusio/src/impls/disk/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ pub(crate) mod opfs;
88
#[cfg(all(feature = "opfs", target_arch = "wasm32", feature = "fs"))]
99
#[allow(unused)]
1010
pub use opfs::fs::*;
11+
#[cfg(all(feature = "opfs", feature = "sync", target_arch = "wasm32"))]
12+
#[allow(unused)]
13+
pub use opfs::sync::OPFSSyncFile;
1114
#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
1215
#[allow(unused)]
1316
pub use opfs::OPFSFile;

fusio/src/impls/disk/opfs/fs.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::future::Future;
2-
31
use async_stream::stream;
42
use futures_core::Stream;
53
use futures_util::StreamExt;
@@ -13,7 +11,10 @@ use web_sys::{
1311
FileSystemGetFileOptions, FileSystemRemoveOptions,
1412
};
1513

14+
#[cfg(not(feature = "sync"))]
1615
use super::OPFSFile;
16+
#[cfg(feature = "sync")]
17+
use crate::disk::OPFSSyncFile;
1718
use crate::{
1819
disk::opfs::{promise, storage},
1920
error::wasm_err,
@@ -26,6 +27,9 @@ use crate::{
2627
pub struct OPFS;
2728

2829
impl Fs for OPFS {
30+
#[cfg(feature = "sync")]
31+
type File = OPFSSyncFile;
32+
#[cfg(not(feature = "sync"))]
2933
type File = OPFSFile;
3034

3135
fn file_system(&self) -> FileSystemTag {
@@ -59,7 +63,13 @@ impl Fs for OPFS {
5963
)
6064
.await?;
6165

62-
Ok(OPFSFile::new(file_handle))
66+
cfg_if::cfg_if! {
67+
if #[cfg(feature = "sync")] {
68+
Ok(Self::File::new(file_handle).await?)
69+
} else {
70+
Ok(OPFSFile::new(file_handle))
71+
}
72+
}
6373
}
6474

6575
/// Recursively creates a directory and all of its parent components if they are missing.

fusio/src/impls/disk/opfs/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#[cfg(feature = "fs")]
22
pub mod fs;
33

4+
#[cfg(feature = "sync")]
5+
pub mod sync;
6+
47
use std::{io, sync::Arc};
58

69
use js_sys::Uint8Array;
@@ -62,6 +65,7 @@ impl FileHandle {
6265

6366
(result, buf)
6467
}
68+
6569
/// Attempts to write an entire buffer into the stream.
6670
///
6771
/// No changes are written to the actual file on disk until the stream is closed.
@@ -199,6 +203,7 @@ pub struct OPFSFile {
199203
}
200204

201205
impl OPFSFile {
206+
#[allow(unused)]
202207
pub(crate) fn new(file_handle: FileSystemFileHandle) -> Self {
203208
Self {
204209
file_handle: Some(Arc::new(FileHandle::new(file_handle))),

fusio/src/impls/disk/opfs/sync/mod.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use std::io;
2+
3+
use web_sys::{FileSystemFileHandle, FileSystemReadWriteOptions, FileSystemSyncAccessHandle};
4+
5+
use crate::{disk::opfs::promise, error::wasm_err, Error, IoBuf, IoBufMut, Read, Write};
6+
7+
/// OPFS based on [FileSystemWritableFileStream](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle)
8+
/// This file is only accessible inside dedicated Web Workers.
9+
pub struct OPFSSyncFile {
10+
file_handle: FileSystemFileHandle,
11+
access_handle: Option<FileSystemSyncAccessHandle>,
12+
}
13+
14+
impl OPFSSyncFile {
15+
pub(crate) async fn new(file_handle: FileSystemFileHandle) -> Result<Self, Error> {
16+
let js_promise = file_handle.create_sync_access_handle();
17+
let access_handle = Some(promise::<FileSystemSyncAccessHandle>(js_promise).await?);
18+
Ok(Self {
19+
file_handle,
20+
access_handle,
21+
})
22+
}
23+
24+
pub(crate) async fn access_handle(&mut self) -> &FileSystemSyncAccessHandle {
25+
if self.access_handle.is_none() {
26+
let js_promise = self.file_handle.create_sync_access_handle();
27+
self.access_handle = Some(
28+
promise::<FileSystemSyncAccessHandle>(js_promise)
29+
.await
30+
.unwrap(),
31+
);
32+
}
33+
self.access_handle.as_ref().unwrap()
34+
}
35+
}
36+
37+
impl Write for OPFSSyncFile {
38+
/// Attempts to write an entire buffer into the file.
39+
///
40+
/// No changes are written to the actual file on disk until [`OPFSFile::close`] has been called.
41+
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/write)
42+
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
43+
match self
44+
.access_handle()
45+
.await
46+
.write_with_u8_array(buf.as_slice())
47+
{
48+
Ok(_) => (Ok(()), buf),
49+
Err(err) => (Err(wasm_err(err)), buf),
50+
}
51+
}
52+
53+
/// Persists any changes made to the file.
54+
/// See more detail in [write](https://developer.mozilla.org/en-US/docs/Web/API/FileSystemSyncAccessHandle/flush)
55+
async fn flush(&mut self) -> Result<(), Error> {
56+
self.access_handle().await.flush().map_err(wasm_err)
57+
}
58+
59+
async fn close(&mut self) -> Result<(), Error> {
60+
if let Some(access_handle) = self.access_handle.take() {
61+
access_handle.close();
62+
}
63+
Ok(())
64+
}
65+
}
66+
67+
impl Read for OPFSSyncFile {
68+
/// Reads the exact number of bytes required to fill `buf` at `pos`.
69+
///
70+
/// # Errors
71+
///
72+
/// If the operation encounters an "end of file" before completely
73+
/// filling the buffer, it returns an error of [`crate::Error`].
74+
async fn read_exact_at<B: IoBufMut>(&mut self, mut buf: B, pos: u64) -> (Result<(), Error>, B) {
75+
let buf_len = buf.bytes_init() as i32;
76+
let options = FileSystemReadWriteOptions::new();
77+
options.set_at(pos as f64);
78+
79+
let access_handle = self.access_handle().await;
80+
let size = access_handle
81+
.get_size()
82+
.expect("InvalidStateError: file is already closed.");
83+
if (size.round() as u64) < pos + buf_len as u64 {
84+
return (
85+
Err(Error::Io(io::Error::new(
86+
io::ErrorKind::UnexpectedEof,
87+
"Read unexpected eof",
88+
))),
89+
buf,
90+
);
91+
}
92+
match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
93+
Ok(_) => (Ok(()), buf),
94+
Err(err) => (Err(wasm_err(err)), buf),
95+
}
96+
}
97+
98+
/// Reads all bytes until EOF in this source, placing them into `buf`.
99+
///
100+
/// # Errors
101+
///
102+
/// If an error is encountered then the `read_to_end_at` operation
103+
/// immediately completes.
104+
async fn read_to_end_at(&mut self, mut buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
105+
let options = FileSystemReadWriteOptions::new();
106+
options.set_at(pos as f64);
107+
108+
let access_handle = self.access_handle().await;
109+
let size = access_handle
110+
.get_size()
111+
.expect("InvalidStateError: file is already closed.");
112+
let buf_len = size.round() as usize - pos as usize;
113+
if buf_len == 0 {
114+
return (
115+
Err(Error::Io(io::Error::new(
116+
io::ErrorKind::UnexpectedEof,
117+
"Read unexpected eof",
118+
))),
119+
buf,
120+
);
121+
}
122+
buf.resize(buf_len, 0);
123+
124+
match access_handle.read_with_u8_array_and_options(buf.as_slice_mut(), &options) {
125+
Ok(_) => (Ok(()), buf),
126+
Err(err) => (Err(wasm_err(err)), buf),
127+
}
128+
}
129+
130+
/// Return the size of file in bytes.
131+
async fn size(&self) -> Result<u64, Error> {
132+
match self.access_handle.as_ref() {
133+
Some(access_handle) => access_handle
134+
.get_size()
135+
.map(|sz| sz.round() as u64)
136+
.map_err(wasm_err),
137+
None => {
138+
// FIXME: here should throw an error
139+
let js_promise = self.file_handle.create_sync_access_handle();
140+
let access_handle = promise::<FileSystemSyncAccessHandle>(js_promise)
141+
.await
142+
.unwrap();
143+
let result = access_handle
144+
.get_size()
145+
.map(|sz| sz.round() as u64)
146+
.map_err(wasm_err);
147+
148+
access_handle.close();
149+
150+
result
151+
}
152+
}
153+
}
154+
}
155+
156+
impl Drop for OPFSSyncFile {
157+
fn drop(&mut self) {
158+
if let Some(access_handle) = self.access_handle.take() {
159+
access_handle.close();
160+
}
161+
}
162+
}

0 commit comments

Comments
 (0)