Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve warnings on Linux with latest Rust(1.68.0) and threadpool configuration #72

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"
backtrace = "0.3"
chan-signal = "0.2"
clap = "2.29.0"
daemonize = "0.2"
daemonize = "0.5"
env_logger = "0.4"
fd = { git = "https://github.com/stemjail/fd-rs.git", rev = "3bc3e3587f8904cce8bf29163a2021c2f5906557" }
fuse = "0.3.0"
Expand Down
126 changes: 67 additions & 59 deletions src/catfs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Handle {
if !valid && (flags & rlibc::O_TRUNC) == 0 {
debug!("read ahead {:?}", path.as_ref());
handle.has_page_in_thread = true;
let mut h = handle.clone();
let h = handle.clone();
let path = path.as_ref().to_path_buf();
tp.lock().unwrap().execute(move || {
if let Err(e) = h.copy(true, disable_splice) {
Expand All @@ -190,10 +190,6 @@ impl Handle {
debug!("read ahead {:?} canceled", path);
}
}
// the files are always closed in the main IO path, consume
// the fds to prevent closing
h.src_file.into_raw();
h.cache_file.into_raw();
});
}

Expand Down Expand Up @@ -248,20 +244,19 @@ impl Handle {
Err(e) => {
return Err(RError::from(e));
}
Ok(mut cache) => {
let mut src = File::openat(src_dir, path, rlibc::O_RDONLY, 0)?;
Ok(cache) => {
let src = File::openat(src_dir, path, rlibc::O_RDONLY, 0)?;
cache.set_xattr(
"user.catfs.src_chksum",
Handle::src_chksum(&src)?.as_slice(),
)?;
src.close()?;
cache.close()?;
}
}

return Ok(());
}

#[cfg(target_os = "linux")]
pub fn set_pristine(&self, pristine: bool) -> error::Result<()> {
if pristine {
self.cache_file.set_xattr(
Expand All @@ -272,7 +267,26 @@ impl Handle {
} else {
if let Err(e) = self.cache_file.remove_xattr("user.catfs.src_chksum") {
let my_errno = e.raw_os_error().unwrap();
if my_errno != libc::ENODATA && my_errno != libc::ENOATTR {
if my_errno != libc::ENODATA {
return Err(RError::from(e));
}
}
}
return Ok(());
}

#[cfg(not(target_os = "linux"))]
pub fn set_pristine(&self, pristine: bool) -> error::Result<()> {
if pristine {
self.cache_file.set_xattr(
"user.catfs.src_chksum",
Handle::src_chksum(&self.src_file)?
.as_slice(),
)?;
} else {
if let Err(e) = self.cache_file.remove_xattr("user.catfs.src_chksum") {
let my_errno = e.raw_os_error().unwrap();
if my_errno != libc::ENODATA {
return Err(RError::from(e));
}
}
Expand Down Expand Up @@ -308,9 +322,9 @@ impl Handle {
check_only: bool,
) -> error::Result<bool> {
match File::openat(src_dir, path, rlibc::O_RDONLY, 0) {
Ok(mut src_file) => {
Ok(src_file) => {
match File::openat(cache_dir, path, rlibc::O_RDONLY, 0) {
Ok(mut cache_file) => {
Ok(cache_file) => {
let valid: bool;
if cache_valid_if_present || Handle::is_pristine(&src_file, &cache_file)? {
valid = true;
Expand All @@ -321,12 +335,9 @@ impl Handle {
rlibc::unlinkat(cache_dir, path, 0)?;
}
}
src_file.close()?;
cache_file.close()?;
return Ok(valid);
}
Err(e) => {
src_file.close()?;
if error::try_enoent(e)? {
return Ok(false);
}
Expand Down Expand Up @@ -476,7 +487,7 @@ impl Handle {
if let Err(e) = self.src_file.flush() {
error!("!flush(src) = {}", e);
// flush failed, now the fd is invalid, get rid of it
self.src_file.into_raw();
std::mem::drop(&self.src_file);

// we couldn't flush the src_file, because of some
// linux vfs oddity the file would appear to be
Expand Down Expand Up @@ -569,7 +580,7 @@ impl Handle {
path: &dyn AsRef<Path>,
create: bool,
) -> error::Result<()> {
let _ = self.page_in_res.0.lock().unwrap();
let _unused = self.page_in_res.0.lock().unwrap();

let mut buf = [0u8; 0];
let mut flags = rlibc::O_RDWR;
Expand All @@ -589,56 +600,64 @@ impl Handle {
flags |= rlibc::O_CREAT;
}

if let Err(e) = self.src_file.close() {
// normal for this close to fail
if e.raw_os_error().unwrap() != libc::ENOTSUP {
return Err(RError::from(e));
}
}

self.src_file = File::openat(dir, path, flags, mode)?;
return Ok(());
}

fn copy_user(&self, rh: &File, wh: &File) -> error::Result<i64> {
let mut buf = [0u8; 32 * 1024];
let mut offset = 0;
loop {
let nread = rh.read_at(&mut buf, offset)?;
if nread == 0 {
break;
}
wh.write_at(&buf[..nread], offset)?;
offset += nread as i64;
match wh.write_lock() {
Ok(write_fd) => {
loop {
let nread = rh.read_at(&mut buf, offset)?;
if nread == 0 {
break;
}
wh.write_at_nolock(&buf[..nread], offset, *write_fd)?;
offset += nread as i64;

self.notify_offset(Ok(offset), false)?;
self.notify_offset(Ok(offset), false)?;
}
return Ok(offset);
},
Err(_) => return Err(RError::from(io::Error::new(io::ErrorKind::Other, "couldn't get write lock!"))),
}

return Ok(offset);
}

#[cfg(not(target_os = "macos"))]
fn copy_splice(&self, rh: &File, wh: &File) -> error::Result<i64> {
let (pin, pout) = rlibc::pipe()?;
let pin = fd::FileDesc::new(pin, /*close_on_drop=*/ true);
let pout = fd::FileDesc::new(pout, /*close_on_drop=*/ true);

let mut offset = 0;
loop {
let nread = rlibc::splice(rh.as_raw_fd(), offset, pout.as_raw_fd(), -1, 128 * 1024)?;
if nread == 0 {
break;
}

let mut written = 0;
while written < nread {
let nxfer = rlibc::splice(pin.as_raw_fd(), -1, wh.as_raw_fd(), offset, 128 * 1024)?;
match wh.write_lock() {
Ok( write_fd ) => {
match rh.read_lock() {
Ok(read_fd) => {
loop {
let nread = rlibc::splice(*read_fd, offset, pout.as_raw_fd(), -1, 128 * 1024)?;
if nread == 0 {
break;
}

written += nxfer;
offset += nxfer as i64;
let mut written = 0;
while written < nread {
let nxfer = rlibc::splice(pin.as_raw_fd(), -1, *write_fd, offset, 128 * 1024)?;

self.notify_offset(Ok(offset), false)?;
}
written += nxfer;
offset += nxfer as i64;

self.notify_offset(Ok(offset), false)?;
}
}
},
Err(_) => return Err(RError::from(io::Error::new(io::ErrorKind::Other, "couldn't get read lock!"))),
}
},
Err(_) => return Err(RError::from(io::Error::new(io::ErrorKind::Other, "couldn't get write lock!"))),
}

if let Err(e) = rlibc::close(pin.into_raw_fd()) {
Expand Down Expand Up @@ -696,25 +715,14 @@ impl Handle {

impl Drop for Handle {
fn drop(&mut self) {
if self.cache_file.valid() {
if let Err(e) = self.cache_file.close() {
error!("!close(cache) = {}", RError::from(e));
}
}

if self.src_file.valid() {
if let Err(e) = self.src_file.close() {
error!("!close(src) = {}", RError::from(e));
}
}
}
}

impl Clone for Handle {
fn clone(&self) -> Self {
return Handle {
src_file: File::with_fd(self.src_file.as_raw_fd()),
cache_file: File::with_fd(self.cache_file.as_raw_fd()),
src_file: self.src_file.clone(),
cache_file: self.cache_file.clone(),
dirty: self.dirty,
write_through_failed: self.write_through_failed,
has_page_in_thread: false,
Expand Down
2 changes: 2 additions & 0 deletions src/catfs/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct FlagStorage {
pub free_space: DiskSpace,
pub uid: libc::uid_t,
pub gid: libc::gid_t,
pub catfs_threadpool_size: usize,
pub pcatfs_threadpool_size: usize
}

#[cfg(test)]
Expand Down
7 changes: 3 additions & 4 deletions src/catfs/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,13 @@ impl Inode {
}

pub fn truncate(&mut self, size: u64) -> error::Result<()> {
let mut f = File::openat(self.src_dir, &self.path, rlibc::O_WRONLY, 0)?;
let f = File::openat(self.src_dir, &self.path, rlibc::O_WRONLY, 0)?;
f.set_size(size)?;
f.close()?;
std::mem::drop(&f);

match File::openat(self.cache_dir, &self.path, rlibc::O_WRONLY, 0) {
Ok(mut f) => {
Ok(f) => {
f.set_size(size)?;
f.close()?;
}
Err(e) => {
error::try_enoent(e)?;
Expand Down
4 changes: 2 additions & 2 deletions src/catfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn make_self<T>(s: &mut T) -> &'static T {
}

impl CatFS {
pub fn new(from: &dyn AsRef<Path>, to: &dyn AsRef<Path>) -> error::Result<CatFS> {
pub fn new(from: &dyn AsRef<Path>, to: &dyn AsRef<Path>, n_threads : usize) -> error::Result<CatFS> {
let src_dir = rlibc::open(from, rlibc::O_RDONLY, 0)?;
let cache_dir = rlibc::open(to, rlibc::O_RDONLY, 0)?;

Expand All @@ -123,7 +123,7 @@ impl CatFS {
store: Mutex::new(Default::default()),
dh_store: Mutex::new(Default::default()),
fh_store: Mutex::new(Default::default()),
tp: Mutex::new(ThreadPool::new(5)),
tp: Mutex::new(ThreadPool::new(n_threads)),
};

catfs.make_root()?;
Expand Down
Loading