Feature request: Evaluate results in master node #447
Comments
A far-from-complete demo using
library(future)
# Status codes written to status_file:
#   0 = future finished (listener stops), 1 = code is running in the worker,
#   2 = worker posted an expression, 3 = result is ready
future_altered <- function(expr, envir = parent.frame(), ..., substitute = TRUE, globals = TRUE, tmpfile = tempfile()){
  status_file <- sprintf('%s.status', tmpfile)
  result_file <- sprintf('%s.result', tmpfile)
  # Current status: code is running in the slave node
  saveRDS(1, status_file)
  if(substitute){
    expr <- substitute(expr)
  }
  # Polls status_file in the master session and re-schedules itself via later
  listener <- function(delay = 0.1){
    status_code <- readRDS(status_file)
    switch(
      as.character(status_code),
      '0' = {
        cat("listener stopped\n")
        return()
      },
      '2' = {
        cat("listener captured a task\n")
        expr <- readRDS(tmpfile)
        env <- new.env(parent = envir)
        tryCatch({
          eval(expr, envir = env)
          saveRDS(as.list(env), result_file)
          saveRDS(3, status_file)
        }, error = function(e){
          saveRDS(e, result_file)
          saveRDS(3, status_file)
        })
        later::later(listener, delay = delay)
      }, {
        cat("listener captured code:", status_code, "\n")
        later::later(listener, delay = delay)
      }
    )
  }
  listener()
  future(rlang::quo_squash(rlang::quo({
    # Defined inside the future: posts expr to the master and waits for the result
    run_in_master <- function(expr, env = parent.frame()){
      force(env)
      expr <- substitute(expr)
      saveRDS(expr, file = !!tmpfile)
      saveRDS(2, !!status_file)
      while(readRDS(!!status_file) != 3) {
        Sys.sleep(0.2)
      }
      # read results
      tryCatch({
        res <- readRDS(!!result_file)
        if(inherits(res, 'error')){
          stop(res)
        }
        list2env(res, env)
      }, finally = {
        saveRDS(1, !!status_file)
      })
      return()
    }
    tryCatch({
      !!expr
    }, finally = {
      saveRDS(0, !!status_file)
    })
  })), ..., globals = globals, substitute = FALSE, envir = envir)
}
# future::plan('multisession', workers=2)
# tmpfile = tempfile()
f <- future_altered({
  run_in_master({
    master_pid <- Sys.getpid()
  })
  slave_pid <- Sys.getpid()
  c(master_pid, slave_pid)
}, globals = FALSE, tmpfile=tmpfile)
value(f)
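The file-based handshake in the demo can be walked through in a single R session, without future or later. This is only a sketch of the protocol under that simplification: the helpers post_task(), handle_task() and collect_result() are hypothetical names introduced here, and the polling loop and the separate worker process are left out.

```r
# Status codes, as in the demo:
# 0 = worker finished, 1 = worker running, 2 = task pending, 3 = result ready
tmpfile     <- tempfile()
status_file <- paste0(tmpfile, ".status")
result_file <- paste0(tmpfile, ".result")

# Worker side (hypothetical helper): post an expression for the parent
post_task <- function(expr) {
  saveRDS(expr, tmpfile)
  saveRDS(2, status_file)
}

# Parent side (hypothetical helper): handle one pending task, if any
handle_task <- function(envir = parent.frame()) {
  if (readRDS(status_file) != 2) return(invisible(FALSE))
  env <- new.env(parent = envir)
  res <- tryCatch({
    eval(readRDS(tmpfile), envir = env)
    as.list(env)                      # variables created by the expression
  }, error = function(e) e)           # errors are shipped back as objects
  saveRDS(res, result_file)
  saveRDS(3, status_file)
  invisible(TRUE)
}

# Worker side (hypothetical helper): collect the result, then reset the status
collect_result <- function(env = parent.frame()) {
  stopifnot(readRDS(status_file) == 3)
  res <- readRDS(result_file)
  saveRDS(1, status_file)
  if (inherits(res, "error")) stop(res)
  list2env(res, env)
  invisible(names(res))
}

x <- c(1, 2, 3)                       # lives only on the parent side
saveRDS(1, status_file)
post_task(quote(sum_x <- sum(x)))     # worker asks for sum(x)
handle_task()                         # parent evaluates it
worker_env <- new.env()
collect_result(worker_env)            # worker receives sum_x
worker_env$sum_x
```

In the real demo the parent side runs inside the later-scheduled listener and the worker side inside the future, so the two halves never share an R session.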
Just for my clarification, what is "master" here? Do you mean the parent R process that spawns a future? Are you asking for:
x <- rnorm(1e10) # Very large data
future({
  run_in_master({
    sum_x <- sum(x)
  })
  sum_x + 1
})
to behave as:
x <- rnorm(1e10) # Very large data
sum_x <- sum(x)
future({
  sum_x + 1
})
? Knowing this helps me to reply.
x <- rnorm(1e10) # Very large data
future({
  a <- 2
  run_in_master({
    sum_x <- sum(x) + a
  })
  sum_x + 1
})
to behave as:
x <- rnorm(1e10) # Very large data
a <- {get from future...}
sum_x <- sum(x) + a
future({
  a <- 2
  sum_x + 1
})
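The evaluation semantics asked for here can be sketched in a single R session, with no futures involved: the expression is evaluated in an environment that holds the worker's variables and whose parent is the environment that owns x. The helper eval_with_worker_vars() is a hypothetical name and not part of future, and moving worker_vars between processes is deliberately left out.

```r
# Hypothetical helper (not part of the future package): evaluate `expr` where
# `x` lives, with the worker's variables layered on top.
eval_with_worker_vars <- function(expr, worker_vars, parent_env) {
  env <- list2env(worker_vars, parent = parent_env)
  eval(expr, envir = env)
  # Return only what the expression created, not the inputs
  new_names <- setdiff(ls(env), names(worker_vars))
  mget(new_names, envir = env)
}

parent_env <- new.env()
parent_env$x <- rnorm(1e3)   # stands in for the very large object

res <- eval_with_worker_vars(
  quote(sum_x <- sum(x) + a),
  worker_vars = list(a = 2),   # what the worker would have to send over
  parent_env  = parent_env
)
res$sum_x + 1
```

Only a and sum_x would need to cross the process boundary in this scheme; x stays where it was created.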
Ok, that's probably not possible but I'll keep it in mind as a teaser going forward. FWIW, what is doable is the simpler version:
future({
  outside_of_future({
    sum_x <- sum(x)
  })
  f(sum_x)
})
Although not immediately useful at the low-level Future API, I've got plans for this at high-level map-reduce APIs, e.g.
x <- a_huge_matrix()
future.apply::future_lapply(seq_len(nrow(x)), function(rows) {
  outside_of_future({
    x_subset <- x[rows, ]
  })
  f(x_subset)
})
This would subset
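A similar effect can be approximated without outside_of_future() by subsetting in the parent before the map call, so that each task receives only its own row. The following is a minimal sketch under that assumption: map_rows() is a hypothetical wrapper, and map_fun defaults to base lapply so the block runs without extra packages. Passing future.apply::future_lapply as map_fun should then only need to ship the row elements to the workers rather than the whole matrix.

```r
# Hypothetical wrapper: split the matrix in the calling (parent) process,
# then map over the pieces.
map_rows <- function(x, f, map_fun = lapply) {
  # Subset in the parent, one list element per row
  rows <- lapply(seq_len(nrow(x)), function(i) x[i, ])
  # `f` sees a single row, never the full matrix
  map_fun(rows, f)
}

x <- matrix(1:6, nrow = 2)
res <- map_rows(x, sum)
```

The trade-off is that the parent temporarily holds a second copy of the data as a list of rows, which the outside_of_future() idea would avoid.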
I see. Could you expose function Thanks :)
Hi @HenrikBengtsson, sorry to fire lots of messages at you today. I plan to create a new package to implement this feature. However, my current implementation runs into a problem. With a single future instance there is no problem, because the main session is not blocked (demo). However, if I schedule 4 cores and run 10 futures at the same time (demo), the main session is blocked waiting for previous future objects to finish.
My questions are: I guess there must be some Also, if I want to extend your package and create new classes of
For those who found this issue, please have a look at https://github.com/dipterix/futurenow |
Without having dived into what you're really trying to achieve, here are a few quick comments:
Hi @HenrikBengtsson, thanks for developing this amazing package. I've been using it for years.
I'm struggling with a memory issue right now: when a data object is too large, future raises errors. I know this should be avoided, but is it possible to allow some large data objects with different behaviors?
For example, x is too large to be fed into future. Is it possible to have some function such as run_in_master() that allows expressions to be evaluated in the master node? globals will skip serializing x if it's too large and not used elsewhere (inside of future but outside of run_in_master). Instead, sum_x <- sum(x) will be evaluated in the master node at runtime.
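As a rough illustration of the problem, the sketch below assumes the error in question is the one future raises when exported globals exceed the R option future.globals.maxSize (in bytes, with a documented default of 500 MiB). It only compares an object's size with that limit and shows the manual workaround of reducing the data in the parent first; it does not call future itself, and the small x stands in for the large object.

```r
# Assumption: the size limit comes from the option `future.globals.maxSize`;
# fall back to the documented default of 500 MiB if it is unset.
limit <- getOption("future.globals.maxSize", 500 * 1024^2)

x <- rnorm(1e5)                        # stand-in for the large object
size <- as.numeric(object.size(x))     # size in bytes
too_large <- size > limit

# Manual workaround: reduce in the parent, so only the small `sum_x`
# would need to be exported to a future.
sum_x <- sum(x)
```

The feature requested here would make that reduction step happen on demand from inside the future instead of by hand beforehand.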