ROBUSTNESS: Protect main-worker communication using suspendInterrupts()? #29
Comments
One observation: the "suspend" part in

## Slow will self terminate after 'duration' seconds
slow <- function(duration = 3.0) {
  tmax <- Sys.time() + duration
  kk <- 0
  repeat {
    kk <- kk + 1
    cat(sprintf("\r%d", kk))
    if (Sys.time() > tmax) break
  }
  cat("\n")
}

## Record every condition signaled while slow() runs with interrupts suspended
## (the handler name must be the condition class, i.e. 'condition')
test <- function(...) {
  conditions <- list()
  res <- withCallingHandlers({
    suspendInterrupts(slow(...))
  }, condition = function(cond) {
    conditions <<- c(conditions, list(cond))
    utils::str(cond)
  })
  conditions
}

which is then called as:
Another observation, which I'm pretty sure I've already posted about on R-devel or somewhere: at least on Linux, interrupts are not suspended in the following example. The 10-second sleep is interrupted after about one second:

> system.time({ withCallingHandlers(suspendInterrupts(Sys.sleep(10.0)), interrupt = identity) })
^C
Timing stopped at: 0 0 1.012
At least on Linux, it looks like neither serialize() nor unserialize() can be interrupted. Setup:

write_test <- function(x) {
  file <- tempfile()
  message("File name: ", file)
  if (utils::file_test("-f", file)) file.remove(file)
  con <- gzfile(file, open = "wb")
  on.exit(close(con))
  serialize(x, connection = con)
  file
}

read_test <- function(file) {
  con <- gzfile(file, open = "rb")
  on.exit(close(con))
  unserialize(connection = con)
}

## Test data
set.seed(42)
x <- rnorm(1e7)

serialize() cannot be interrupted

system.time(file <- write_test(x))
File name: /tmp/hb/RtmpwbTfWb/file1c835830a0e9
user system elapsed
3.232 0.012 3.245
file.size(file)
## [1] 76783182

and while signaling user interrupts:

system.time(file <- write_test(x))
## ^C^C^C^C^C^C^C^C
## Timing stopped at: 3.252 0.084 5.479
file.size(file)
## [1] 76783182

unserialize() cannot be interrupted

system.time(y <- read_test(file))
## ^C user system elapsed
## 0.445 0.027 0.472
> identical(y, x)
[1] TRUE

This is on Linux. This needs to be verified on other platforms too.
You could also try:
This could be used to record that interrupts have been received, while still being able to finish important work.
A better illustration maybe:
versus
Also, there is significant overhead, but it really does seem to continue from where it left off (somewhere in the middle of a sleep call):
Another option would be to have a dedicated channel that cluster clients could use to request that a specific future be "cancelled"/"unscheduled". It seems to me that interpreting interrupts to mean "interrupt the currently executing future on that worker" is too vague, given that "the currently executing future on that worker" is itself ill-defined. And if you try to distinguish between those and interrupts that mean "interrupt the worker", I think you are in trouble. You can only rely on

I assume you already have some sort of communication thread running on the worker, so that those messages could be received while the future is executing?
Maybe an easier first attempt is to simply drop in the following (or the C equivalent) at the very beginning of the worker:

interruptCount <- 0L
globalCallingHandlers(
  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }
)

and then check/reset the count where relevant. The handling could also differ depending on whether job code or management code is running. Alternatively, leave it to the user to handle interrupts explicitly within a job, for example to abort the job early.
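To make "check/reset the count where relevant" concrete, here is a minimal sketch of a counter that job or management code could poll. The names `make_interrupt_counter()`, `increment()` and `check_and_reset()` are hypothetical and not part of parallelly or future. The registration in a global `interrupt` handler that also calls `.tryResumeInterrupt()`, as in the snippet above, is assumed and only shown as a comment. Interrupts are simulated by calling `increment()` directly.

```r
## Hypothetical helper (not part of parallelly/future): a closure-based
## counter of received interrupts that can be polled and reset.
make_interrupt_counter <- function() {
  count <- 0L
  list(
    ## Record one more interrupt
    increment = function() {
      count <<- count + 1L
      invisible(count)
    },
    ## Return the number of interrupts seen since the last call, then reset
    check_and_reset = function() {
      n <- count
      count <<- 0L
      n
    }
  )
}

counter <- make_interrupt_counter()

## Assumed registration on the worker (R >= 4.1 for the \(.) syntax), e.g.
## globalCallingHandlers(interrupt = \(.) {
##   counter$increment()
##   .tryResumeInterrupt()
## })

## Simulate two interrupts arriving while a job runs
counter$increment()
counter$increment()
n <- counter$check_and_reset()  # 2; the count is now back to 0
```

Management code would call `check_and_reset()` at points where it is safe to act, for example between receiving a job and sending back its result, and decide there whether to abort.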
I just tried

future::plan(
  future::multisession,
  rscript_startup = "globalCallingHandlers(interrupt = \\(.) .tryResumeInterrupt())"
)

and

PS: Upon further inspection, it does not seem to make much of a difference...
Thanks for looking into this and thinking about it. My gut feeling says that one cannot assume a calling handler can cope with a burst of incoming interrupts. For instance, what happens if another interrupt is signaled while we're in the middle of the

interrupt = \(.) {
  interruptCount <<- interruptCount + 1L
  .tryResumeInterrupt()
}

handler? Will that interrupt the handler? AFAIK,

Note that this issue is specifically about interrupts that occur while the main process and a worker process are communicating (it has nothing to do with interrupting futures). If such a communication is interrupted, further communication attempts will be out of sync and will most likely result in hard failures. To handle that gracefully, one can either improve the communication protocol or ignore interrupts. This issue aims for the simpler solution, i.e. the latter.

FWIW, in R devel ("4.4.0"),
Note that
OK. I am on macOS Ventura 13.5 with R 4.2, and I observe the following, which seems contradictory:

y <- Sys.time()
globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    y <<- Sys.time()
    .tryResumeInterrupt()
  }
)
for (i in 1:10) {
  x <- Sys.time()
  y <- x
  Sys.sleep(3)
  if (y != x) {
    print("Time elapsed since last interrupt:")
    print(Sys.time() - y)
  }
  print("Time elapsed in this loop iteration:")
  print(Sys.time() - x)
}

which logs

[1] "Time elapsed in this loop iteration:"
Time difference of 3.004497 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.0034 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.7835841 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.005537 secs
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 2.884698 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003882 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.01319003 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002887 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003268 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00247 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00239 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003064 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002458 secs

PS: OK, now I understand. You meant you cannot SUSPEND interrupts that occur within
Indeed:

globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    Sys.sleep(2)
    print("RESUME")
    .tryResumeInterrupt()
  }
)
Sys.sleep(10)

which logs

[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
Called from: Sys.sleep(2)

if you
Here is what I tried, without success:

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(parallel:::.workRSOCK())",
      type = "sh"
    )
  )
)

and the following, in case there is a

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(withCallingHandlers(parallel:::.workRSOCK(), interrupt = \\(.) .tryResumeInterrupt()))",
      type = "sh"
    )
  )
)
I understand this can happen in general. However, I was hoping that it would never happen in my use case, since I send at most one
Issue
If the user hits Ctrl-C (signals a user interrupt) while the main R session and a worker are communicating data, the communication ends up in an unrecoverable, corrupt state. The only solution is to restart with a new cluster while waiting for the old cluster node to time out (30 days?).
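To show why an interrupted exchange cannot be recovered, the sketch below writes two serialized messages to a file and then simulates a read that was aborted after the first 10 bytes. It uses `readBin()` in place of a real interrupt, an assumption made only to keep the example deterministic, and a file connection stands in for the socket to a worker. The next `unserialize()` call starts in the middle of the first message and fails. This is the kind of out-of-sync state that leaves the connection unusable.

```r
## Sender: write two messages back to back, as over a socket
path <- tempfile()
con <- file(path, open = "wb")
serialize(list(id = 1L, value = "first"), connection = con)
serialize(list(id = 2L, value = "second"), connection = con)
close(con)

## Receiver: simulate a read of the first message aborted after 10 bytes
con <- file(path, open = "rb")
partial <- readBin(con, what = "raw", n = 10L)

## The stream is now out of sync: the next read starts mid-message and
## cannot find a valid serialization header
res <- tryCatch(unserialize(connection = con), error = function(e) e)
close(con)
print(inherits(res, "error"))
```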
Suggestion
In R (>= 3.5.0), we have suspendInterrupts(expr), which suspends interrupts while evaluating an expression. Could we wrap all communication calls, i.e. all serialize()/unserialize() calls, in suspendInterrupts()? There should be no need to do this on workers. That way, a worker can also still be terminated by the operating system or a job scheduler sending it a nicer interrupt signal.
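Here is a minimal sketch of the suggestion. It assumes two hypothetical helpers, `send_protected()` and `receive_protected()`, which are not parallelly's actual internals. Each one wraps a single `serialize()`/`unserialize()` call in `suspendInterrupts()`, with the intent that a Ctrl-C is deferred until the whole message has been written or read. A file connection stands in for the socket connection to a worker.

```r
## Hypothetical helpers: write/read one message with user interrupts
## suspended, so that Ctrl-C cannot leave a partial message behind
send_protected <- function(x, con) {
  suspendInterrupts(serialize(x, connection = con))
  invisible(NULL)
}

receive_protected <- function(con) {
  suspendInterrupts(unserialize(connection = con))
}

## Round trip through a file connection (standing in for a worker socket)
msg <- list(id = 1L, value = 1:3)
path <- tempfile()
con <- file(path, open = "wb")
send_protected(msg, con)
close(con)

con <- file(path, open = "rb")
res <- receive_protected(con)
close(con)
stopifnot(identical(res, msg))
```

This is not a complete guarantee. Per the observations in this thread, `Sys.sleep()` was still interrupted inside `suspendInterrupts()` on Linux, so blocking waits on a socket may need to be tested separately on each platform.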
It should probably also be sufficient to protect only interactive R sessions. When running R in batch mode, hitting Ctrl-C often means we want the whole R process to terminate. OTOH, with proper interrupt handling (e.g. protecting communication as above and then capturing user interrupts outside of it), our R process could terminate nicely, which here means calling stopCluster() etc.
Actions
Investigate exactly which types of interrupt signals are suspended.
Protect what can be protected in the existing parallelly code.
Document that Ctrl-\ can be used to kill R if the above gets stuck. (What happens in RStudio?)
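For the "capture user interrupts outside" part of the suggestion, here is a minimal sketch built on a hypothetical `with_interrupt_cleanup()` helper. The helper catches an `interrupt` condition that escapes the work and then runs a cleanup function, which in practice could be `function() parallel::stopCluster(cl)`. The usage example simulates Ctrl-C by signaling a condition of class `interrupt` with `signalCondition()`, not by sending a real interrupt.

```r
## Hypothetical helper: run work(); if a user interrupt escapes it, run
## cleanup() (e.g. function() parallel::stopCluster(cl)) and return NULL
with_interrupt_cleanup <- function(work, cleanup) {
  tryCatch(
    work(),
    interrupt = function(cond) {
      message("User interrupt caught; cleaning up")
      cleanup()
      invisible(NULL)
    }
  )
}

## Usage: simulate a Ctrl-C with an 'interrupt' condition instead of a real one
cleaned_up <- FALSE
res <- with_interrupt_cleanup(
  work = function() {
    signalCondition(structure(list(), class = c("interrupt", "condition")))
    "finished"
  },
  cleanup = function() cleaned_up <<- TRUE
)
```

Because `tryCatch()` uses an exiting handler, the work is abandoned at the point of the interrupt. That fits batch mode, where Ctrl-C usually means "shut everything down".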