Skip to content

Commit

Permalink
Merge pull request #3939 from tiif/blockeventfd
Browse files Browse the repository at this point in the history
Implement blocking eventfd
  • Loading branch information
oli-obk authored Nov 13, 2024
2 parents 1576f64 + eabee96 commit 9d9da34
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 109 deletions.
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ pub enum BlockReason {
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
/// Blocked on eventfd.
Eventfd,
}

/// The state of a thread.
Expand Down
201 changes: 153 additions & 48 deletions src/shims/unix/linux/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::io::ErrorKind;

use crate::concurrency::VClock;
use crate::shims::unix::fd::FileDescriptionRef;
use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -26,6 +26,10 @@ struct Event {
counter: Cell<u64>,
is_nonblock: bool,
clock: RefCell<VClock>,
/// A list of thread ids blocked on eventfd::read.
blocked_read_tid: RefCell<Vec<ThreadId>>,
/// A list of thread ids blocked on eventfd::write.
blocked_write_tid: RefCell<Vec<ThreadId>>,
}

impl FileDescription for Event {
Expand Down Expand Up @@ -72,31 +76,8 @@ impl FileDescription for Event {
// eventfd read at the size of u64.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);

// Block when counter == 0.
let counter = self.counter.get();
if counter == 0 {
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

throw_unsup_format!("eventfd: blocking is unsupported");
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&self.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;
self.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Tell userspace how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)?;
}

interp_ok(())
let weak_eventfd = self_ref.downgrade();
eventfd_read(buf_place, dest, weak_eventfd, ecx)
}

/// A write call adds the 8-byte integer value supplied in
Expand Down Expand Up @@ -127,7 +108,7 @@ impl FileDescription for Event {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

// Read the user supplied value from the pointer.
// Read the user-supplied value from the pointer.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
let num = ecx.read_scalar(&buf_place)?.to_u64()?;

Expand All @@ -137,27 +118,8 @@ impl FileDescription for Event {
}
// If the addition does not let the counter to exceed the maximum value, update the counter.
// Else, block.
match self.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
self.clock.borrow_mut().join(clock);
});
self.counter.set(new_count);
}
None | Some(u64::MAX) =>
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
throw_unsup_format!("eventfd: blocking is unsupported");
},
};
// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Return how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
let weak_eventfd = self_ref.downgrade();
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
}
}

Expand Down Expand Up @@ -217,8 +179,151 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
counter: Cell::new(val.into()),
is_nonblock,
clock: RefCell::new(VClock::default()),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
});

interp_ok(Scalar::from_i32(fd_value))
}
}

/// Block thread if the value addition will exceed u64::MAX -1,
/// else just add the user-supplied value to current counter.
fn eventfd_write<'tcx>(
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

match eventfd.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
eventfd.clock.borrow_mut().join(clock);
});

// When this function is called, the addition is guaranteed to not exceed u64::MAX - 1.
eventfd.counter.set(new_count);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock *all* threads previously blocked on `read`.
// We need to take out the blocked thread ids and unblock them together,
// because `unblock_threads` may block them again and end up re-adding the
// thread to the blocked list.
let waiting_threads = std::mem::take(&mut *eventfd.blocked_read_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}

// Return how many bytes we wrote.
return ecx.write_int(buf_place.layout.size.bytes(), dest);
}
None | Some(u64::MAX) => {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

let dest = dest.clone();

eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
}
),
);
}
};
interp_ok(())
}

/// Block thread if the current counter is 0,
/// else just return the current counter value to the caller and set the counter to 0.
fn eventfd_read<'tcx>(
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Block when counter == 0.
let counter = eventfd.counter.replace(0);

if counter == 0 {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
let dest = dest.clone();

eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
eventfd_read(buf_place, &dest, weak_eventfd, this)
}
),
);
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&eventfd.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;

// When any of the events happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock *all* threads previously blocked on `write`.
// We need to take out the blocked thread ids and unblock them together,
// because `unblock_threads` may block them again and end up re-adding the
// thread to the blocked list.
let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}

// Tell userspace how many bytes we read.
return ecx.write_int(buf_place.layout.size.bytes(), dest);
}
interp_ok(())
}
65 changes: 65 additions & 0 deletions tests/fail-dep/libc/eventfd_block_read_twice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//@only-target: linux
//~^ERROR: deadlocked
//~^^ERROR: deadlocked
//@compile-flags: -Zmiri-preemption-rate=0
//@error-in-other-file: deadlock

use std::thread;

// Test the behaviour of a thread being blocked on an eventfd read, get unblocked, and then
// get blocked again.

// The expected execution is
// 1. Thread 1 blocks.
// 2. Thread 2 blocks.
// 3. Thread 3 unblocks both thread 1 and thread 2.
// 4. Thread 1 reads.
// 5. Thread 2's `read` deadlocked.

fn main() {
// eventfd write will block when EFD_NONBLOCK flag is clear
// and the addition caused counter to exceed u64::MAX - 1.
let flags = libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };

let thread1 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread2 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially, then get unblocked by thread3, then get blocked again
// because the `read` in thread1 executes first and set the counter to 0 again.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
//~^ERROR: deadlocked
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread3 = thread::spawn(move || {
thread::park();
let sized_8_data = 1_u64.to_ne_bytes();
// Write 1 to the counter, so both thread1 and thread2 will unblock.
let res: i64 = unsafe {
libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap()
};
// Make sure that write is successful.
assert_eq!(res, 8);
});

thread1.thread().unpark();
thread2.thread().unpark();
thread3.thread().unpark();

thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
41 changes: 41 additions & 0 deletions tests/fail-dep/libc/eventfd_block_read_twice.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
error: deadlock: the evaluated program deadlocked
--> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
|
LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) };
| ^ the evaluated program deadlocked
|
= note: BACKTRACE:
= note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
= note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
= note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
note: inside `main`
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
|
LL | thread2.join().unwrap();
| ^^^^^^^^^^^^^^

error: deadlock: the evaluated program deadlocked
|
= note: the evaluated program deadlocked
= note: (no span available)
= note: BACKTRACE on thread `unnamed-ID`:

error: deadlock: the evaluated program deadlocked
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
|
LL | let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
| ^ the evaluated program deadlocked
|
= note: BACKTRACE on thread `unnamed-ID`:
= note: inside closure at tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC

error: deadlock: the evaluated program deadlocked
|
= note: the evaluated program deadlocked
= note: (no span available)
= note: BACKTRACE on thread `unnamed-ID`:

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 4 previous errors

Loading

0 comments on commit 9d9da34

Please sign in to comment.