Feature request: Evaluate results in master node #447

Closed
dipterix opened this issue Nov 26, 2020 · 8 comments

Comments

@dipterix

Hi @HenrikBengtsson, thanks for developing this amazing package. I've been using it for years.

I'm struggling with a memory issue right now: when a data object is too large, future raises an error. I know this should be avoided, but would it be possible to allow some large data objects, with a different behavior?

For example

x <- rnorm(1e10) # Very large data
future({
  sum_x <- sum(x)
  sum_x + 1
})

x is too large to be fed into a future. Would it be possible to have a function such as run_in_master() that allows an expression to be evaluated in the master node?

x <- rnorm(1e10) # Very large data
future({
  run_in_master({
    sum_x <- sum(x)
  })
  sum_x + 1
})

The globals detection would then skip serializing x if it is too large and not used elsewhere (i.e. inside the future but outside run_in_master()). Instead, sum_x <- sum(x) would be evaluated in the master node at run time.
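For context, the error described above is raised when the total size of a future's globals exceeds the `future.globals.maxSize` option, which future documents as 500 MiB by default. A minimal base-R sketch of checking an object against that limit before creating a future; `exceeds_globals_limit()` is a hypothetical helper, and `object.size()` only approximates the size future itself computes:

```r
# Hypothetical helper: would `obj` exceed future's size limit for globals?
# Falls back to the documented default of 500 MiB when the option is unset.
exceeds_globals_limit <- function(obj,
                                  limit = getOption("future.globals.maxSize", 500 * 1024^2)) {
  # object.size() is an approximation; future computes its own size estimate
  as.numeric(utils::object.size(obj)) > limit
}

x <- rnorm(1e6)                           # about 8 MB of doubles
exceeds_globals_limit(x)                  # FALSE with the default limit
exceeds_globals_limit(x, limit = 1024^2)  # TRUE with a 1 MiB limit
```

Raising the limit, e.g. `options(future.globals.maxSize = +Inf)`, removes the error, but on backends such as multisession x is then still serialized to the worker, which is exactly what run_in_master() is meant to avoid.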

@dipterix
Author

dipterix commented Nov 26, 2020

Here is a far-from-complete demo using the later and rlang packages. Please paste it into R and run it.

future_altered() returns both the master and the slave PID from a single future call.

library(future)
future_altered <- function(expr, envir = parent.frame(), ..., substitute = TRUE, globals = TRUE, tmpfile = tempfile()){
  
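  status_file <- sprintf('%s.status', tmpfile)
  result_file <- sprintf('%s.result', tmpfile)

  # Status codes: 0 = future finished, 1 = code is running in the slave node,
  # 2 = slave has posted a task for the master, 3 = master has saved the result
  saveRDS(1, status_file)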
  
  if(substitute){
    expr <- substitute(expr)
  }
  
  listener <- function(delay = 0.1){
    status_code <- readRDS(status_file)
    switch(
      as.character(status_code),
      '0' = {
        cat("listener stopped\n")
        return()
      }, 
      '2' = {
        cat("listener captured a task\n")
        expr <- readRDS(tmpfile)
        env <- new.env(parent = envir)
        tryCatch({
          eval(expr, envir = env)
          saveRDS(as.list(env), result_file)
          saveRDS(3, status_file)
        }, error = function(e){
          saveRDS(e, result_file)
          saveRDS(3, status_file)
        })
        
        later::later(listener, delay = delay)
      }, {
        cat("listener captured code:", status_code, "\n")
        later::later(listener, delay = delay)
      }
    )
  }
  
  listener()
  
  future(rlang::quo_squash(rlang::quo({
    run_in_master <- function(expr, env = parent.frame()){
      force(env)
      expr <- substitute(expr)
      saveRDS(expr, file = !!tmpfile)
      saveRDS(2, !!status_file)
      while(readRDS(!!status_file) != 3) {
        Sys.sleep(0.2)
      }
      # read results
      tryCatch({
        res <- readRDS(!!result_file)
        if(inherits(res, 'error')){
          stop(res)
        }
        list2env(res, env)
      }, finally = {
        saveRDS(1, !!status_file)
      })
      return()
    }
    tryCatch({
      !!expr
    }, finally = {
      saveRDS(0, !!status_file)
    })
  })), ..., globals = globals, substitute = FALSE, envir = envir)
}


# Run the future in a separate R session so that the two PIDs differ
future::plan('multisession', workers = 2)
tmpfile <- tempfile()
f <- future_altered({
  run_in_master({
    master_pid <- Sys.getpid()
  })
  slave_pid <- Sys.getpid()
  c(master_pid, slave_pid)
}, globals = FALSE, tmpfile = tmpfile)


value(f)
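The demo coordinates the two processes through a status file: 1 means the code is running in the slave node, 2 means the slave has posted an expression for the master, 3 means the master has saved the result, and 0 means the future has finished. A single-process, base-R sketch of one round trip of that handshake, without later or a real worker; `post_task()`, `serve_task()` and `collect_result()` are hypothetical names for the steps the demo performs inline:

```r
# File layout mirroring future_altered(): task, status and result files
tmpfile <- tempfile()
status_file <- paste0(tmpfile, ".status")
result_file <- paste0(tmpfile, ".result")

# Slave side: post an expression and flag it with status 2
post_task <- function(expr) {
  saveRDS(expr, tmpfile)
  saveRDS(2, status_file)
}

# Master side: if a task is pending, evaluate it and flag the result with status 3
serve_task <- function(envir = parent.frame()) {
  if (readRDS(status_file) != 2) return(invisible(FALSE))
  env <- new.env(parent = envir)
  res <- tryCatch({
    eval(readRDS(tmpfile), envir = env)
    as.list(env)              # every variable the expression assigned
  }, error = function(e) e)
  saveRDS(res, result_file)
  saveRDS(3, status_file)
  invisible(TRUE)
}

# Slave side: reset status to 1, then copy the results into `env` (or re-raise)
collect_result <- function(env) {
  stopifnot(readRDS(status_file) == 3)
  res <- readRDS(result_file)
  saveRDS(1, status_file)
  if (inherits(res, "error")) stop(res)
  list2env(res, env)
  invisible(env)
}

saveRDS(1, status_file)                   # the future is "running"
post_task(quote(master_pid <- Sys.getpid()))
serve_task()                              # in the demo, later::later() polls for this
worker_env <- new.env()
collect_result(worker_env)
worker_env$master_pid == Sys.getpid()     # TRUE: both sides share one process here
```

Because the master only serves tasks when its polling callback gets to run, anything that keeps the main session busy also stalls every slave waiting in the status-2 state.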

@HenrikBengtsson
Collaborator

Just for my clarification, what is "master" here? Do you mean the parent R process that spawns off a future? Are you asking for:

x <- rnorm(1e10) # Very large data
future({
  run_in_master({
    sum_x <- sum(x)
  })
  sum_x + 1
})

to behave as:

x <- rnorm(1e10) # Very large data
sum_x <- sum(x)
future({
  sum_x + 1
})

?

Knowing this will help me reply.

@dipterix
Author

The master node is the main process that the user is running. More precisely, I'm asking for

x <- rnorm(1e10) # Very large data
future({
  a <- 2
  run_in_master({
    sum_x <- sum(x) + a
  })
  sum_x + 1
})

to behave as:

x <- rnorm(1e10) # Very large data
a <- 2  # the value of `a` obtained from the future
sum_x <- sum(x) + a
future({
  a <- 2
  sum_x + 1
})
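In the demo, the master evaluates the expression in a fresh environment whose parent is the caller's environment, so a slave-side variable such as `a` is not visible there. One way to approximate the semantics above is to send along the slave-side values that the expression refers to, found with base R's `all.vars()`. A minimal sketch under that assumption; `package_task()` (slave side) and `evaluate_task()` (master side) are hypothetical helpers, and function calls inside the expression are still looked up on the master:

```r
# Slave side (hypothetical helper): bundle the expression with the
# slave-local values it references, so the master can evaluate it
package_task <- function(expr, env = parent.frame()) {
  expr <- substitute(expr)
  # all.vars() returns the symbols used as variables (not function names)
  vars <- intersect(all.vars(expr), ls(env, all.names = TRUE))
  list(expr = expr, locals = mget(vars, envir = env))
}

# Master side (hypothetical helper): the slave's locals mask the master's
# variables; everything else, such as `x`, is found in `envir`
evaluate_task <- function(task, envir = parent.frame()) {
  env <- list2env(task$locals, parent = envir)
  eval(task$expr, envir = env)
  env
}

x <- c(1, 2, 3)   # stands in for the large object, which stays on the master

worker <- function() {
  a <- 2
  package_task(sum_x <- sum(x) + a)  # ships `a`, but not `x`
}

task <- worker()
names(task$locals)        # "a"
res <- evaluate_task(task)
res$sum_x                 # 8
```

As in the demo, the assigned variables (here `sum_x`) would then be copied back to the slave.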

@HenrikBengtsson
Collaborator

OK, that's probably not possible, but I'll keep it in mind as a teaser going forward.

FWIW, what is doable is the simpler version:

future({
  outside_of_future({
    sum_x <- sum(x)
  })

  f(sum_x)
})

Although this is not immediately useful in the low-level Future API, I've got plans for it in the high-level map-reduce APIs, e.g.

x <- a_huge_matrix()
future.apply::future_lapply(seq_len(nrow(x)), function(rows) {
  outside_of_future({
    x_subset <- x[rows, ]
  })

  f(x_subset)
})

This would subset x outside of each future and thereby avoid copying all of x to each worker.
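Until something like outside_of_future() exists (it is a proposed, not an existing, function), a similar effect can be had by doing the subsetting in the parent before the map step, so that each element carries only its own rows. A base-R sketch, using `lapply()` as a stand-in for `future.apply::future_lapply()`; the small matrix, the row-sum `f()` and the choice of 3 chunks are placeholders:

```r
# Stand-ins for a_huge_matrix() and the per-row work f() from the comment above
x <- matrix(seq_len(20), nrow = 10)
f <- function(x_subset) rowSums(x_subset)

# Do the subsetting in the parent: split the row indices into 3 chunks and
# extract each chunk of rows before any work is dispatched
chunk_id <- cut(seq_len(nrow(x)), breaks = 3, labels = FALSE)
subsets <- lapply(split(seq_len(nrow(x)), chunk_id),
                  function(rows) x[rows, , drop = FALSE])

# f() does not reference x, so x is not a global of the mapped function;
# with a parallel plan, lapply() would be replaced by future.apply::future_lapply()
res <- lapply(subsets, f)
unlist(res, use.names = FALSE)  # same values as rowSums(x)
```

The trade-off is that the parent briefly holds both x and all of its subsets, and the split happens up front rather than lazily inside each future.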

@dipterix
Author

I see. Could you export the function tweakExpression()? I think it would be very useful for anyone who wants to pre-/post-process globals.

Thanks :)

@dipterix
Author

Hi @HenrikBengtsson, sorry for firing so many messages at you today.

I plan to create a new package to implement this feature. However, my current implementation encounters a problem.

For a single future there is no problem, because the main session is not blocked (demo). However, if I use 4 cores and run 10 futures at the same time (demo), the main session is blocked waiting for the previous futures to finish.

My questions are:

I guess there must be some await() function. Is there any way to override the default behavior of await()?

Also, if I want to extend your package and create new classes of Future, do you have any recommendations on which functions I should implement?

@dipterix
Author

For those who found this issue, please have a look at https://github.com/dipterix/futurenow

@HenrikBengtsson
Collaborator

Without having dived into what you're really trying to achieve, here are a few quick comments:

  1. Some futures are resolved on workers that are persistent (e.g. PSOCK workers), but that cannot be assumed for all backends.
  2. Note that futures have no memory of past/previous futures, i.e. when a future is resolved, it's gone from the worker.
  3. Futures may be resolved on a machine running in the cloud, on the other side of the Earth, ..., i.e. we cannot assume that the future has access to the local file system. There are thoughts on being able to declare required or optional "resources", e.g. resources = "localhost" (cf. DESIGN: Future API - Minimal/Core/Essential API and Extended/Optional API #172).
  4. Not sure if it applies, but for some backends you can pass information back to the parent process in a near-live fashion by signaling immediateCondition:s; that is currently used to signal progress updates (progressr package).
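Item 4 refers to conditions that inherit from the class `immediateCondition`; for some backends, future relays these to the parent process while the future is still running. A base-R sketch of creating and signaling such a condition; `signal_status()` and the `statusUpdate` class are hypothetical, and a calling handler in the same session stands in for the parent process, so the relaying itself is not shown:

```r
# Hypothetical helper: signal a condition that inherits from "immediateCondition"
signal_status <- function(msg) {
  cond <- structure(
    list(message = msg, call = NULL),
    class = c("statusUpdate", "immediateCondition", "condition")
  )
  signalCondition(cond)  # no-op if no handler is established
  invisible(NULL)
}

# A calling handler stands in for the parent session; it records each message
# and lets the code continue, unlike tryCatch(), which would unwind
received <- character()
withCallingHandlers({
  signal_status("step 1 done")
  signal_status("step 2 done")
}, immediateCondition = function(cond) {
  received <<- c(received, conditionMessage(cond))
})
received  # "step 1 done" "step 2 done"
```

This only carries messages from the future to the parent; it does not give the future a way to ask the parent to evaluate code, which is what run_in_master() needs.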

2 participants