Manual scheduling #227
Also: let's use
If the keys are integers (like in our case), "insert" and "decrease key" can be implemented in O(1).
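A rough editorial sketch of one way to get that, using a bucket queue in R. This is not drake code, and it assumes (my guess) that the integer key is something like a target's number of unmet dependencies.

# Buckets indexed by key; keys start at 0.
new_bucket_queue <- function(max_key) {
  list(buckets = replicate(max_key + 1, character(0), simplify = FALSE))
}

# O(1): append the item to the bucket for its key.
bq_insert <- function(q, item, key) {
  q$buckets[[key + 1]] <- c(q$buckets[[key + 1]], item)
  q
}

# Conceptually O(1): move one item from bucket `key` to bucket `key - 1`,
# e.g. when one of its dependencies finishes. setdiff() is a linear scan
# kept for brevity; a real implementation would store a handle per item.
bq_decrease <- function(q, item, key) {
  q$buckets[[key + 1]] <- setdiff(q$buckets[[key + 1]], item)
  q$buckets[[key]] <- c(q$buckets[[key]], item)
  q
}

# Pop any item with key 0 (all dependencies met), if one exists.
bq_pop_ready <- function(q) {
  if (!length(q$buckets[[1]])) return(list(queue = q, item = NULL))
  item <- q$buckets[[1]][1]
  q$buckets[[1]] <- q$buckets[[1]][-1]
  list(queue = q, item = item)
}

q <- new_bucket_queue(max_key = 3)
q <- bq_insert(q, "a", key = 1)    # target "a" waits on 1 dependency
q <- bq_decrease(q, "a", key = 1)  # that dependency just finished
bq_pop_ready(q)$item
#> [1] "a"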
My main focus right now is this issue. After some sketching (which I will push to a branch as soon as I finish traveling), I think the infrastructure should emerge as an independent "future" backend separate from "future_lapply". The new scheduler will turn out great in the end, but until we test it over several months, I do not think it should replace any of the existing backends. I think this will be okay because there will not be much new code to add, and I will make sure it is merge-safe.
Key data structures I am planning:
I was wrong about this comment. I forgot that we cannot process the imports in embarrassingly parallel fashion. We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them. For this, we should stick to something low-overhead. The current
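For what it's worth, here is a minimal editorial sketch of such a topological pass over imports, using igraph rather than anything from drake's internals (the function names and edge list are made up). A concrete drake example of why the order matters follows in the next comment.

library(igraph)

# Edges point from a nested function to the function that calls it,
# so f4 -> f3 means "f3 depends on f4".
edges <- matrix(
  c("f4", "f3",
    "f3", "f2",
    "f2", "f1"),
  ncol = 2, byrow = TRUE
)
g <- graph_from_edgelist(edges, directed = TRUE)

# Visit imports in topological order so each function is processed only
# after everything it depends on is already up to date.
for (import in as_ids(topo_sort(g, mode = "out"))) {
  message("processing import ", import)
}
#> processing import f4
#> processing import f3
#> processing import f2
#> processing import f1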
Can you give an example where the order is important in
library(drake)
f1 <- function(x){
f2(x)
}
f2 <- function(x){
f3(x)
}
f3 <- function(x){
f4(x)
}
f4 <- function(x){
x
}
plan <- drake_plan(a = f1(1))
config <- drake_config(plan)
vis_drake_graph(config)
make(plan)
## target a
outdated(config)
## character(0)
f4 <- function(x){
x + 1
}
outdated(config)
## [1] "a"
Some terms:
For completeness:
It just occurred to me: as a first step, what if we used GNU Make to schedule futures? We just need to change the
This way, we could debug and test the
The advantage of Make as a scheduler for futures is that parallel workers could write to the cache simultaneously, even when the remote jobs themselves have no access to the cache.
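To make the idea concrete, a hedged sketch (not drake code; the helper, recipe, and placeholder computation are all made up): a generated Makefile rule for target a might run something like Rscript -e 'source("future_recipe.R"); build_with_future("a")', so GNU Make handles the scheduling, the future does the heavy work, and the local process does the caching.

library(future)
plan(multisession)  # could just as well be a remote plan, e.g. via future.batchtools

# Placeholder for the real work of building one target.
expensive_computation <- function(target) {
  Sys.sleep(1)
  toupper(target)
}

build_with_future <- function(target) {
  f <- future(expensive_computation(target))  # the heavy work can run remotely
  value_of_target <- value(f)                 # block so Make sees when the rule finishes
  # Stand-in for writing the result to drake's cache from the local process.
  saveRDS(value_of_target, file.path(tempdir(), paste0(target, ".rds")))
  invisible(value_of_target)
}

build_with_future("a")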
To clarify that last point: requiring the master process to do all the caching is a potential bottleneck. For large computations with small data, this isn't so bad. But a lot of big data work involves small computations on large data, so I think it is worth the time to make this alternative available.
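As a rough illustration of that alternative (the keys, cache path, and fake "large" result below are all made up): each worker writes its own result straight to a shared storr cache, so only a small status flag travels back to the master.

library(storr)
library(parallel)

cache_path <- tempfile("cache-")
cache <- storr_rds(cache_path)

# Each forked worker caches its own result (Unix-alike only because of mclapply).
status <- mclapply(c("a", "b"), function(target) {
  value <- rnorm(1e6)                      # stand-in for a large result
  storr_rds(cache_path)$set(target, value) # worker writes to the shared cache itself
  "done"                                   # only this tiny flag goes back to the master
}, mc.cores = 2)

unlist(status)
#> [1] "done" "done"
cache$exists("a")
#> [1] TRUE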
We could also check if or how GNU Make builds a priority queue.
FYI: to do this correctly, I think there is more I need to learn about
Splitting up this issue further would help; I like that idea. Let's focus on the
Sounds great. Let's discuss the
I think I have a promising start on this issue here (
The rest of the backends are unaffected. We'll see how all the backends compete against each other. Eventually, it may come time to deprecate and remove backends. One note on monitoring
Looks more promising: https://github.com/dirmeier/datastructures/blob/master/vignettes/datastructures.Rmd
I am noticing that
Update: I have been working hard on the
I implemented the
I will submit a PR early next week. At that point, I hope we can review the code together.
Note for reference: we might be able to get non-staged
Initial work on a manual scheduler (#227)
Looking back at the summary of our discussion from the RStudio conference:
As I understand it, the purpose of watching the storr
Unfortunately, this does not work for multicore futures at the moment (futureverse/future#199), but it does work for multisession parallelism. I just implemented it in #275.
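For reference, a small sketch of the kind of non-blocking check being described, assuming it refers to future::resolved() with multisession workers:

library(future)
plan(multisession)

f <- future({
  Sys.sleep(1)        # stand-in for building one target on a worker
  "target a is done"
})

while (!resolved(f)) {
  Sys.sleep(0.1)      # the master could launch or collect other targets here
}
value(f)
#> [1] "target a is done"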
As I see it, that part hinges on @dirmeier's response to dirmeier/datastructures#4 (comment). If we can use his decrease-key, I will upgrade
Yes, watching the storr seems necessary only for
FWIW, I'm still using staged parallelism in my current experiments because of the overhead associated with futures. I like your idea of a pure callr worker, because even future.callr has some overhead, which mainly comes from finding the environment in which the globals live (according to some very coarse profiling). Ideally, we would pass commands to persistent workers that would run them and communicate back, but that feels like a long way to go.
Yeah, if we keep persistent workers, it seems like we either need some kind of message passing or we make all the workers fight over the next available target. I don't think
If we had a pure
I do not know how to interact with a
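For what it's worth, one possible way to send commands to a persistent worker is callr's r_session class. This is just a sketch, not anything drake does, and the "commands" below are toy functions.

library(callr)

worker <- r_session$new()  # start a persistent background R process

# Synchronous command: send a function plus arguments, wait for the result.
worker$run(function(target) paste("built", target), args = list("a"))
#> [1] "built a"

# Asynchronous command: start it, poll the process, then read the result.
worker$call(function(target) {
  Sys.sleep(0.5)
  paste("built", target)
}, args = list("b"))
worker$poll_process(2000)  # wait up to two seconds
worker$read()$result
#> [1] "built b"

worker$close()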
Message passing via file I/O doesn't necessarily mean concurrency problems if each worker is limited to its own namespace (directory) in the file system. But it seems safer to abstract away the message-passing part. I can take care of implementing the callr backend. Would that backend bundle several targets into one callr process?
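A bare-bones sketch of that per-worker-directory protocol (the paths, file names, and message format here are all made up); in practice, the "worker" block would run in its own R process.

root <- tempfile("drake-msg-")
inbox <- file.path(root, "worker-1", "inbox")
outbox <- file.path(root, "worker-1", "outbox")
dir.create(inbox, recursive = TRUE)
dir.create(outbox, recursive = TRUE)

# Master: assign target "a" to worker-1 by dropping a message into its inbox.
saveRDS(list(target = "a", command = quote(1 + 1)),
        file.path(inbox, "a.rds"))

# Worker-1: poll only its own inbox, evaluate each command, reply through
# its own outbox, and delete the message to acknowledge it.
for (path in list.files(inbox, full.names = TRUE)) {
  msg <- readRDS(path)
  result <- eval(msg$command)
  saveRDS(result, file.path(outbox, paste0(msg$target, ".rds")))
  file.remove(path)
}

# Master: collect the finished result for target "a".
readRDS(file.path(outbox, "a.rds"))
#> [1] 2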
I would love it if you wrote a
At the conference, we talked about allowing multiple targets per worker, but I think we decided to table that for now. I do not think
Once we have a
We'll need to think about how to minimize the time needed to spin up a worker; otherwise, the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.
Agreed. I was thinking callr might be lower-overhead than future. |
I think it's a good time to close this issue: we already have a good start on the manual scheduler, and further work is starting to split up into a bunch of smaller issues.
Do you want to add these issues now, or maybe as comments to #278?
I think continuing to plan in this thread is fine. I think the priority queue still needs to wait for dirmeier/datastructures#4, and transitioning imports from
Agreed:
library(callr)
library(parallel)
library(future.callr)
#> Loading required package: future
#>
#> Attaching package: 'future'
#> The following object is masked from 'package:callr':
#>
#> run
library(microbenchmark)
w <- microbenchmark(mclapply(1:2, function(x) {
print("done")
}), times = 1000)
x <- microbenchmark(mclapply(1:2, function(x) {
print("done")
}, mc.cores = 2), times = 1000)
y <- microbenchmark(r(function() {
print("done")
}), times = 1000)
plan(future.callr::callr)
z <- microbenchmark(future(print("done")), times = 1000)
out <- rbind(w, x, y, z)
out
#> Unit: milliseconds
#> expr min
#> mclapply(1:2, function(x) { print("done") }) 2.512824
#> mclapply(1:2, function(x) { print("done") }, mc.cores = 2) 2.619827
#> r(function() { print("done") }) 101.140287
#> future(print("done")) 7.744327
#> lq mean median uq max neval
#> 2.979621 3.490104 3.094162 3.680197 11.02008 1000
#> 3.049609 3.403416 3.120213 3.509649 9.79317 1000
#> 107.533589 108.844388 108.231943 109.175948 183.09687 1000
#> 208.584417 209.172502 208.961368 209.789184 257.03640 1000