Initial work on a manual scheduler (#227) #259
Conversation
There's a problem with file targets: we need to make sure they stay up to date. This creates bugs too.
Codecov Report
@@           Coverage Diff           @@
##           master     #259   +/-   ##
========================================
  Coverage     100%     100%
========================================
  Files          64       66     +2
  Lines        4682     4871   +189
========================================
+ Hits         4682     4871   +189
Continue to review full report at Codecov.
FYI: just wrapped in a solution to #169. Seems to work fine on an old project I keep around for testing purposes. Details are here in the parallelism vignette. cc @kendonB
Amazing and I look forward to testing this! Just glancing at the vignette:
So good that this functionality is getting in. No more making in multiple stages! Sent with thumbs only.
Glad to hear your enthusiasm, Kendon. I can think of no better alpha/beta tester than you.
library(future)
> remote <- future::plan(multisession)
> local <- future::plan(multicore)
> remote
sequential:
- args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, label = NULL, ...)
- tweaked: FALSE
- call: plan("default", .init = FALSE)
> local
multisession:
- args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, persistent = FALSE, workers = availableCores(), gc = FALSE, earlySignal = FALSE, label = NULL, ...)
- tweaked: FALSE
- call: future::plan(multisession)
@HenrikBengtsson, what is the safest way to collect a bunch of heterogeneous custom evaluators in advance?
A plain R list should be fine; "evaluators" are just plain functions with some extra attributes.
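For illustration only (not from the thread): a minimal sketch of such a plain list, assuming future::tweak() to pre-configure a strategy. Note that future::plan() returns the previous plan, which is why the printout above looks swapped.

library(future)
# Collect evaluators (future strategy functions) in a plain named list.
# Referring to the strategy functions directly avoids calling plan(),
# which switches the active plan and returns the previous one.
evaluators <- list(
  remote = future::tweak(multisession, workers = 2),
  local  = multicore
)
# Activate one of them only when needed, then restore the old plan:
old_plan <- future::plan(evaluators$remote)
f <- future::future(Sys.getpid())
value(f)
future::plan(old_plan)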
@HenrikBengtsson that's great to hear. It means #169 should be possible. Is there a better way to generate that list of evaluators? As I show in the above code fragment, it seems like I am either not using
Without having gone into the details of what you're doing, I should share that there is a long-term(!) plan to support resource specifications when setting up futures. It's on my todo list to write some of these ideas up to get the discussion going and to help others like you to plan ahead. I'm thinking of something like:

a <- future({ expr }, resources = c(ram = ">= 5 GB", mount = "/home/alice/data", local = TRUE))
b <- future({ expr }, resources = c(ram = ">= 3 GB"))

This would make it possible to control which worker / type of worker each future will be resolved on. It sounds related to what's in this issue (at least from a quick look). To implement this will take a lot of time to get right, but it might help converge thoughts and ideas. I need to find time to fully understand where you're going, but for you to go forward, the rule of thumb would be to avoid hacking into the internals of futures "too much". If there's a use case of yours that is not currently provided by the Future API, it's not unlikely we can figure out a way to add it. Your and others' higher-level usage of the Future API helps form its identity.
Thanks Henrik. Now I understand
Thanks. Just a few minor nits. I'm really excited to try this!
workers <- initialize_workers(config)
# While any targets are queued or running...
while (work_remains(queue = queue, workers = workers, config = config)){
  for (id in seq_along(workers)){
for (id in get_idle_workers(workers)) {
would reduce indent by one, but I'm not sure of the consequences.
Do you mean it should be outside the while (work_remains(...)){ loop?
No, I meant collapsing the for and the if. Probably not worth it.
)
}
}
Sys.sleep(1e-9)
Does this actually sleep? Maybe it's cleaner to offload to a function that waits for a task to finish; some parallel backends may provide blocking methods that don't poll.
In my preliminary debugging/testing, I found it helpful to set the interval to 0.1, and I did notice the sleeping. I'm definitely in favor of a better alternative; I just do not know specifically what that would be.
Just to make sure we're on the same page: there's polling (which always works), and there's notification/passive waiting (which needs backend support). To avoid full CPU load when polling, it's advisable to sleep -- not too much, but also not too briefly.
Here, I'd suggest extracting a function that returns as soon as the scheduler needs to do some work. This allows using passive waiting where available, and polling otherwise.
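A rough sketch of such a helper (the name and the assumption that each busy worker holds a future object are hypothetical, not drake internals): return as soon as any outstanding future is resolved, with a short sleep as the polling fallback.

wait_for_any <- function(futures, interval = 0.1) {
  repeat {
    # Non-blocking check of each outstanding future.
    done <- vapply(futures, future::resolved, logical(1))
    if (any(done)) {
      return(which(done))  # indices the scheduler should handle now
    }
    Sys.sleep(interval)    # avoid full CPU load while polling
  }
}

A backend with true notification support could swap the body of this function for a blocking wait.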
future::plan("next") | ||
structure( | ||
future::future( | ||
expr = drake_future_task( |
To avoid exporting drake_future_task, we'd need to sneak in the triple colon here somehow to avoid R CMD check failures. Not sure if it's worth the effort.
Yeah, I'm not sure it's worth the effort either.
R/future.R
}

all_concluded <- function(workers, config){
  lapply(
This is one of the rare situations where I'd use a for loop for early exit. I haven't found a purrr verb that implements this.
And as far as looping is concerned, the number of workers is likely to be small. See c753067.
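For reference, a minimal sketch of the early-exit pattern under discussion (the is_concluded() helper is hypothetical, not drake's actual internals):

all_concluded <- function(workers, config){
  for (worker in workers){
    if (!is_concluded(worker, config)){
      return(FALSE)  # early exit: no need to scan the remaining workers
    }
  }
  TRUE
}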
if (is_idle(worker)){
  NULL
} else {
  # It's hard to make this line run in a small test workflow
Would this line be hit in a parallel workflow with Sleep(1) till Sleep(5) and two processors? Maybe also with Sleep(0.1) till Sleep(0.5)?
I would hope so. I have had problems with Sys.sleep() in the past, but I think those issues are unrelated.
out
}

decrease_revdep_keys <- function(worker, config, queue){
Nomenclature: "rev" vs. "downstream"/"upstream"?
I chose "revdeps" here because drake talks about "dependencies" a lot in its internals. There is far less "downstream"/"upstream" terminology, and maybe we should make all the terms agree everywhere.
# The real priority queue will be
# https://github.com/dirmeier/datastructures
# once the CRAN version has decrease-key
# (https://github.com/dirmeier/datastructures/issues/4).
We could add

Remotes:
    dirmeier/datastructures

to DESCRIPTION.
Thinking about it. I haven't had a chance to play with decrease-key in datastructures, but I will soon.
self$pop(n = 1, what = what)
  }
},
# This is all wrong and inefficient.
It's vectorized, so it's not too bad ;-) We'll have to implement efficient name lookups (or a mapping from target names to integers) for full efficiency. I wouldn't worry too much unless it becomes a bottleneck.
Thanks for the encouragement. It was the best I could do for a cheap imitation priority queue given R's suboptimal scalability when it comes to recursion and looping. I will be thinking about this as we move to datastructures.
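For readers following along, a toy sketch (not drake's actual queue) of the "cheap imitation" idea: keep a numeric key per target, decrease the keys of reverse dependencies when a target finishes, and pop targets whose key reaches zero.

new_queue <- function(targets, ndeps){
  data.frame(target = targets, key = ndeps, stringsAsFactors = FALSE)
}
decrease_keys <- function(queue, targets){
  hit <- queue$target %in% targets
  queue$key[hit] <- queue$key[hit] - 1
  queue
}
pop_ready <- function(queue){
  ready <- queue$target[queue$key <= 0]
  list(ready = ready, queue = queue[queue$key > 0, , drop = FALSE])
}

A real decrease-key priority queue (as in datastructures) avoids the repeated vector scans.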
R/workplan.R
@@ -153,7 +153,7 @@ drake_plan_override <- function(target, field, config){
   if (is.null(in_plan)){
     return(config[[field]])
   } else {
-    return(in_plan[config$plan$target == target])
+    return(in_plan[[which(config$plan$target == target)]])
What happens if which() returns a vector of length != 1?
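For illustration (a standalone snippet, not drake code), this is what [[ does with a which() result of length 0 or 2:

x <- list(a = 1, b = 2)
i0 <- which(c(FALSE, FALSE))  # integer(0)
# x[[i0]]  # error: attempt to select less than one element
i2 <- which(c(TRUE, TRUE))    # c(1L, 2L)
# x[[i2]]  # a length-2 index triggers recursive indexing, which also errors here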
Thanks for catching that. Fixed in 825f146.
# suitable enough for unit testing, but
# I did artificially stall targets and verified that this line
# is reached in the future::multisession backend as expected.
next # nocov
Do we also need to Sys.sleep() here?
I did not see a reason, but maybe I am missing something. What would it do there?
Avoid full CPU load?
I'm now missing progress output with
Do you mean metadata returned by
The green
Quick comment on the Future API: adding support for querying the progress of a worker/future will have to fall under "optional" features, since we cannot demand that all future backends support it. Adding support for optional features to the Future API has to be done in a way that will be compatible with whatever possible future backends a random person will invent in the future (pun/sic!) - that is - extending the API has to be done with great care, which is why it does not "just" exist (it's easy for some backends). I've started futureverse/future#172 for discussing how to go forward and find minimal common denominators for these types of features.
Kirill and Henrik: thanks for clarifying. It sounds like
Thanks. I'd expect the scheduler (master) to print progress messages. Will file a new issue.
This PR addresses #227. Here, make(..., parallelism = "future") uses a manual scheduler to process the targets (imports are processed using staged parallelism with the default backend). Users may choose between caching = "master" and caching = "worker" to select whether caching happens on the master process or on each worker individually. (Workers may not have cache access, and non-storr_rds() caches may not be thread-safe). The queue in queue.R is not really a priority queue, just a cheap imitation and placeholder. When dirmeier/datastructures#4 is fixed, we can move to a much better and more scalable one. This scheduler is a new backend, and none of the other backends are affected. This is a good opportunity to iterate on the new scheduler and make it more efficient as we compare it to the other backends.
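A minimal usage sketch based on the description above (the plan and the choice of backend are illustrative, not from the PR):

library(drake)
library(future)
future::plan(multisession)  # any future backend should work
my_plan <- drake_plan(
  small = Sys.sleep(2),
  large = Sys.sleep(4),
  both  = list(small, large)
)
make(my_plan, parallelism = "future", jobs = 2, caching = "worker")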