ROBUSTNESS: Protect main-worker communication using suspendInterrupts()? #29

Open
HenrikBengtsson opened this issue Oct 25, 2020 · 14 comments


@HenrikBengtsson
Collaborator

HenrikBengtsson commented Oct 25, 2020

Issue

If the user hits Ctrl-C (signals a user interrupt) while the main R session and a worker are communicating data, the communication ends up in an unrecoverable, corrupt state. The only solution is to restart with a new cluster while waiting for the old cluster node to time out (30 days?).

Suggestion

In R (> 3.5.0), we have suspendInterrupts(expr) that suspends interrupts while evaluating an expression.

Could we wrap all communication calls, i.e. all serialize()/unserialize() calls in suspendInterrupts()?
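A minimal sketch of that suggestion, assuming the data is exchanged as serialized R objects over an open binary connection. The helper names `send_protected()` and `recv_protected()` are hypothetical (they are not part of parallel or parallelly), and a temporary file stands in for the socket connection a real cluster node would use.

```r
## Hypothetical helpers (not part of 'parallel' or 'parallelly'):
## each serialize()/unserialize() call is wrapped in suspendInterrupts()
## so that a user interrupt cannot cut a message in half.
send_protected <- function(con, value) {
  suspendInterrupts({
    serialize(value, connection = con)
    flush(con)
  })
  invisible(NULL)
}

recv_protected <- function(con) {
  suspendInterrupts(unserialize(con))
}

## Round trip through a temporary file, standing in for a socket connection
path <- tempfile()
out <- file(path, open = "wb")
send_protected(out, list(a = 1, b = "x"))
close(out)

inp <- file(path, open = "rb")
y <- recv_protected(inp)
close(inp)
```

One trade-off to keep in mind: if the other end never responds, a suspended `unserialize()` on a blocking socket could leave the session unresponsive to Ctrl-C, which is one reason to document Ctrl-\ as listed under Actions below.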

There should be no need to do this on workers. Also, this way a worker can still be terminated by the operating system or a job scheduler by signaling a nicer interrupt signal.

It should probably also be sufficient to protect only interactive R sessions. When running R in batch mode, hitting Ctrl-C often means we want the whole R process to terminate. OTOH, with proper interrupt handling (e.g. protecting the communication as above and then capturing user interrupts outside of it), our R process could terminate nicely, which here means calling stopCluster() etc.
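The idea of capturing user interrupts outside the protected communication, so that the R process can terminate nicely by calling stopCluster(), could look something like the sketch below. `with_interrupt_cleanup()` is a hypothetical helper: it catches an `interrupt` condition with `tryCatch()` and always runs a caller-supplied cleanup function on exit, for example one that calls `parallel::stopCluster()`.

```r
## Hypothetical helper (not part of 'parallel' or 'parallelly'):
## evaluate fun(); if the user interrupts, return NULL instead of
## aborting to the top level. The cleanup function runs in either case.
with_interrupt_cleanup <- function(fun, cleanup) {
  on.exit(cleanup(), add = TRUE)
  tryCatch(fun(), interrupt = function(cond) {
    message("User interrupt caught; cleaning up before returning")
    invisible(NULL)
  })
}

## Example with a real cluster (not run here):
## cl <- parallel::makeCluster(2L)
## res <- with_interrupt_cleanup(
##   function() parallel::parLapply(cl, 1:4, sqrt),
##   cleanup = function() parallel::stopCluster(cl)
## )
```

This only helps if the communication itself is protected, since `stopCluster()` still talks to the workers and may fail on a connection that is already out of sync.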

Actions

Investigate exactly which types of interrupt signals are suspended.

Protect what can be protected in the existing parallelly code.

Document that Ctrl-\ can be used to kill R if the above gets stuck. (What happens in RStudio?)

@HenrikBengtsson HenrikBengtsson changed the title Use suspendInterrupts() to protect communication? ROBUSTNESS: Protect main-worker communication using suspendInterrupts()? Oct 25, 2020
@HenrikBengtsson
Collaborator Author

One observation: the "suspend" part in suspendInterrupts() appears to mean: disable and drop all interrupts until resumed. Interrupts are not queued and re-signaled afterward. Here is the code I used on R 4.0.3 on Linux to test this:

## slow() will self-terminate after 'duration' seconds
slow <- function(duration = 3.0) {
  tmax <- Sys.time() + duration
  kk <- 0
  repeat {
    kk <- kk + 1
    cat(sprintf("\r%d", kk))
    if (Sys.time() > tmax) break
  }
  cat("\n")
}

## Run slow() with interrupts suspended, recording every condition signaled
test <- function(...) {
  conditions <- list()
  res <- withCallingHandlers({
    suspendInterrupts(slow(...))
  }, condition = function(cond) {
    conditions <<- c(conditions, list(cond))
    utils::str(cond)
  })
  conditions
}

which is then called as:

> test()
68646^C
> 

@HenrikBengtsson
Collaborator Author

Another observation, which I'm pretty sure I've already posted about on R-devel or somewhere: at least on Linux, it is not possible to suspend interrupts during Sys.sleep(). For example:

> system.time({ withCallingHandlers(suspendInterrupts(Sys.sleep(10.0)), interrupt = identity) })
^C
Timing stopped at: 0 0 1.012

@HenrikBengtsson
Collaborator Author

HenrikBengtsson commented Oct 25, 2020

At least on Linux, it looks like neither serialize() nor unserialize() can be interrupted. The following simple, interactive example illustrates this:

write_test <- function(x) {
  file <- tempfile()
  message("File name: ", file)
  if (utils::file_test("-f", file)) file.remove(file)
  con <- gzfile(file, open = "wb")
  on.exit(close(con))
  serialize(x, connection=con)
  file
}

read_test <- function(file) {
  con <- gzfile(file, open = "rb")
  on.exit(close(con))
  unserialize(connection=con)
}

## Test data
set.seed(42)
x <- rnorm(1e7)

serialize() cannot be interrupted

system.time(file <- write_test(x))
File name: /tmp/hb/RtmpwbTfWb/file1c835830a0e9
   user  system elapsed 
  3.232   0.012   3.245 
file.size(file)
## [1] 76783182

and while signaling user interrupts:

system.time(file <- write_test(x))
## ^C^C^C^C^C^C^C^C
## Timing stopped at: 3.252 0.084 5.479
file.size(file)
## [1] 76783182

unserialize() cannot be interrupted

unserialize() is much faster than serialize(), so we have to hit Ctrl-C very quickly:

system.time(y <- read_test(file))
## ^C   user  system elapsed 
##   0.445   0.027   0.472

> identical(y,x)
[1] TRUE

This is on Linux. This needs to be verified on other platforms too.

@king-of-poppk

king-of-poppk commented Jul 28, 2023

You could also try .tryResumeInterrupt(), even though it is meant for internal use:

withCallingHandlers({Sys.sleep(10)}, interrupt = \(cond) {.tryResumeInterrupt()})

This could be used to record that interrupts have been received, while still being able to finish important work.

@king-of-poppk

king-of-poppk commented Jul 29, 2023

Perhaps a better illustration:

> withCallingHandlers({for (i in 1:10) {Sys.sleep(1); print(i)}}, interrupt = \(cond) {print("INTERRUPTED")})
[1] 1
[1] 2
^C[1] "INTERRUPTED"

versus

> withCallingHandlers({for (i in 1:10) {Sys.sleep(1); print(i)}}, interrupt = \(cond) {print("INTERRUPTED"); .tryResumeInterrupt(); print("HU")})
[1] 1
^C[1] "INTERRUPTED"
[1] 2
^C[1] "INTERRUPTED"
[1] 3
^C[1] "INTERRUPTED"
[1] 4
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
[1] 5
[1] 6
^C[1] "INTERRUPTED"
[1] 7
^C[1] "INTERRUPTED"
[1] 8
[1] 9
[1] 10

@king-of-poppk

Also, there is some significant overhead, but it really seems to continue from where it left off (somewhere in the middle of a sleep call):

> withCallingHandlers({for (i in 1:3) {x <- Sys.time(); Sys.sleep(3); print(Sys.time() - x)}}, interrupt = \(cond) {print("INTERRUPTED"); .tryResumeInterrupt(); print("UH")})
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
Time difference of 10.33775 secs
Time difference of 3.002125 secs
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
Time difference of 11.1719 secs

@king-of-poppk

king-of-poppk commented Jul 29, 2023

Another option would be to have a dedicated channel through which cluster clients can request that a specific future be "cancelled"/"unscheduled". It seems to me that interpreting an interrupt to mean "interrupt the currently executing future on that worker" is too vague, given that "the currently executing future on that worker" is itself vague. And if you try to distinguish between those and interrupts that mean "interrupt the worker", I think you are in trouble. You can only rely on SIGINT and SIGTERM, and I guess that is not enough to squeeze in more than what those are meant to be used for (graceful vs. non-graceful kill of the worker?).

I assume you already have some sort of communication thread running on the worker, so that such messages could be received while the future is executing?

@king-of-poppk

Maybe an easier first attempt is to simply drop in the following (or the C equivalent) at the very beginning of the worker:

interruptCount <- 0L
globalCallingHandlers(
  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }
)

and check/reset the count where relevant. Can also have different handling depending on whether job or management code is running. Or leave it to the user to explicitly handle interrupts within a job to abort the job early for example.

@king-of-poppk

king-of-poppk commented Jul 31, 2023

I just tried

future::plan(
  future::multisession,
  rscript_startup = "globalCallingHandlers(interrupt = \\(.) .tryResumeInterrupt())"
)

and it does make things more robust, although not perfect.

PS: Upon further inspection, it does not seem to make much of a difference...

@HenrikBengtsson
Collaborator Author

HenrikBengtsson commented Jul 31, 2023

Maybe an easier first attempt is to simply drop in the following (or the C equivalent) at the very beginning of the worker:

interruptCount <- 0L
globalCallingHandlers(
  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }
)

and check/reset the count where relevant. Can also have different handling depending on whether job or management code is running. Or leave it to the user to explicitly handle interrupts within a job to abort the job early for example.

Thanks for looking into this and thinking about it. My gut feeling says that one cannot assume a calling handler can handle the case when there is a burst of interrupts coming in. For instance, what happens if another interrupt is signaled while we're in the middle of the

  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }

handler? Will that interrupt the handler? AFAIK, suspendInterrupts() should handle such cases.

Note that this issue is specifically about interrupts occurring while the main process and a worker process communicate (it has nothing to do with interrupting futures). If such communication is interrupted, further communication attempts will be out of sync and will most likely result in hard failures. To handle that gracefully, one can either improve the communication protocol or ignore interrupts. This issue aims for the simpler solution, i.e. the latter.
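For comparison, the other option, improving the communication protocol, could for example prefix every message with its length, so that the receiver can at least detect a message that was cut short instead of silently reading out of sync. This is a sketch of that idea only: the 4-byte length header and the helper names `write_frame()` and `read_frame()` are assumptions and do not reflect how parallel's socket protocol actually works.

```r
## Hypothetical framing helpers (not the protocol used by 'parallel'):
## each message is a 4-byte little-endian length header followed by
## the serialized payload.
write_frame <- function(con, value) {
  payload <- serialize(value, connection = NULL)
  writeBin(length(payload), con, size = 4L, endian = "little")
  writeBin(payload, con)
  invisible(length(payload))
}

read_frame <- function(con) {
  n <- readBin(con, what = "integer", n = 1L, size = 4L, endian = "little")
  if (length(n) == 0L) stop("no frame header available")
  payload <- readBin(con, what = "raw", n = n)
  ## A short read means the message was cut off before it was complete
  if (length(payload) < n) {
    stop(sprintf("truncated frame: expected %d bytes, got %d", n, length(payload)))
  }
  unserialize(payload)
}

## Write one frame into an in-memory raw connection
out <- rawConnection(raw(0), open = "w")
write_frame(out, list(id = 1L, value = pi))
bytes <- rawConnectionValue(out)
close(out)

## A complete frame round-trips
inp <- rawConnection(bytes, open = "r")
msg <- read_frame(inp)
close(inp)

## A frame missing its last 5 bytes is detected as truncated
inp <- rawConnection(bytes[seq_len(length(bytes) - 5L)], open = "r")
err <- tryCatch(read_frame(inp), error = function(e) conditionMessage(e))
close(inp)
```

On a real socket, a short read may simply mean the remaining bytes have not arrived yet, so a reader there would need to keep waiting (with a timeout) rather than fail immediately.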

FWIW, in R devel ("4.4.0"), serialize()/unserialize() can now be interrupted, i.e. they are no longer atomic, cf. #29 (comment).

Also, there is some significant overhead, but it really seems to continue from where it left off (somewhere in the middle of a sleep call)

Note that Sys.sleep() cannot be interrupted, e.g. #29 (comment)

@king-of-poppk

king-of-poppk commented Aug 1, 2023

Note that Sys.sleep() cannot be interrupted, e.g. #29 (comment)

OK. I am on macOS Ventura 13.5 with R 4.2, and I observe the following, which seems contradictory:

y <- Sys.time()

globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    y <<- Sys.time()
    .tryResumeInterrupt()
  }
)

for (i in 1:10) {
  x <- Sys.time()
  y <- x
  Sys.sleep(3)
  if (y != x) {
    print("Time elapsed since last interrupt:")
    print(Sys.time() - y)
  }
  print("Time elapsed in this loop iteration:")
  print(Sys.time() - x)
}

which logs

[1] "Time elapsed in this loop iteration:"
Time difference of 3.004497 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.0034 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.7835841 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.005537 secs
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 2.884698 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003882 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.01319003 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002887 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003268 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00247 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00239 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003064 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002458 secs

PS: OK, now I understand. You meant you cannot SUSPEND interrupts that occur within Sys.sleep.

@king-of-poppk

My gut feeling says that one cannot assume a calling handler can handle the case when there is a burst of interrupts coming in.

Indeed:

globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    Sys.sleep(2)
    print("RESUME")
    .tryResumeInterrupt()
  }
)

Sys.sleep(10)

which logs

[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
Called from: Sys.sleep(2)

if you hit Ctrl-C too quickly.

@king-of-poppk

AFAIK, suspendInterrupts() should handle such cases.

Here is what I tried, without success:

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(parallel:::.workRSOCK())",
      type = "sh"
    )
  )
)

and the following, in case there is a Sys.sleep statement somewhere:

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(withCallingHandlers(parallel:::.workRSOCK(), interrupt = \\(.) .tryResumeInterrupt()))",
      type = "sh"
    )
  )
)

@king-of-poppk

For instance, what happens if there is another interrupt signalled when we're in the middle

I understand this can happen in general; however, I was hoping that it would never happen in my use case, since I send at most one SIGINT per instantiated future (after it has been instantiated, via killNode(f$workers[[f$node]], signal = tools::SIGINT)).
