Skip to content

Commit

Permalink
Sketch out a first look at #227
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Feb 5, 2018
1 parent 0d8bca6 commit d403d72
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
50 changes: 50 additions & 0 deletions R/future.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
make_future <- function(config){
config$queue <- R6_queue$new(nodes = toposort_targets(config))
config$workers <- new_future_workers(config = config)
while (length(config$queue)){
config <- monitor_future_workers(config = config)
sleep(1e-9)
}
}

new_future_workers <- function(config){
num_workers <- min(length(config$queue), config$max_hpc_workers)
lightly_parallelize(
X = seq_len(num_workers),
FUN = function(id) worker$new(id = id, config = config),
jobs = config$max_local_workers
)
}

monitor_future_workers <- function(config){
status <- lightly_parallelize(
X = config$workers,
FUN = function(worker){
worker$status()
},
jobs = config$max_local_workers
)
if (any(status == "error")){
report_failed_targets(status, config)
}
is_idle <- status == "idle"
n_deploy <- min(sum(is_idle), config$queue$n())
next_targets <- config$queue$pop(n = n_deploy)
idle_workers <- config$workers[is_idle]
config$workers[is_idle] <- lightly_parallelize(
X = seq_along(next_targets),
FUN = function(index){
idle_workers[[i]]$assign(target = next_targets[[i]])
idle_workers[[i]]$deploy()
},
jobs = config$max_local_workers
)
lightly_parallelize(
X = config$workers,
FUN = function(worker){
worker$finalize()
},
jobs = config$max_local_workers
)
config
}
22 changes: 22 additions & 0 deletions R/queue.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
R6_queue <- R6::R6Class(
"R6_queue",
public = list(
nodes = NULL,
initialize = function(nodes){
self$nodes <- nodes
},
push = function(x){
self$nodes <- c(self$nodes, x)
},
pop = function(n = 1){
n <- min(length(self$nodes), n)
index <- seq_len(n)
out <- self$nodes[index]
self$nodes <- self$nodes[-index]
out
},
n = function(){
length(self$nodes)
}
)
)
79 changes: 79 additions & 0 deletions R/worker.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
R6_worker <- R6::R6Class(
"R6_worker",
public = list(
id = NULL,
target = NULL,
meta = NULL,
config = NULL,
evaluator = NULL,
value = NULL,
future = NULL,
start = NULL
initialize = function(
id, target, config = list(), lazy = FALSE, evaluator = future::plan("next")
){
self$id <- id
self$target <- target
self$config <- config
self$lazy <- lazy
if (!is.NULL(evaluator)){
self$evaluator <- evaluator
}
},
assign = function(target){
self$target <- target
},
deploy = function(){
prune_envir(targets = target, config = config)
self$meta <- drake_meta(target = target, config = config)
should_build <- should_build_target(
target = self$target,
meta = self$meta,
config = self$config
)
self$start <- proc.time()
set_progress(target = target, value = "in progress", config = config)
if (should_build){
self$future <- future::future(
drake_build(target = self$target, config = self$config),
packages = config$packages,
lazy = self$lazy,
evaluator = self$evaluator
)
}
},
set_status = function(status){
self$config$cache$get(
key = self$id,
value = status,
namespace = "workers"
)
},
get_status = function(){
self$config$cache$get(
key = self$id,
namespace = "workers"
)
}
status = function(){
resolved <- future::resolved(self$future)
idle <- self$get_status()
if (resolved && idle){
"done"
} else if (!resolved && !idle){
"running"
} else {
"error"
}
},
finalize = function(){
value <- future::value(self$future)
assign(x = self$target, value = value, envir = self$config$envir)
store_target(
target = target, value = value, meta = meta,
start = start, config = config)
)
self$assign(target = NULL)
}
)
)

0 comments on commit d403d72

Please sign in to comment.