Manual scheduling #227

Closed · krlmlr opened this issue Feb 4, 2018 · 32 comments

@krlmlr (Collaborator) commented Feb 4, 2018

  • use future for all parallelism
    • get rid of parLapply() and mclapply()
    • the parallelism argument is kept and forwards to the corresponding future implementation with a warning
  • for now, only one job per future; we can provide persistence later by running multiple targets in the same future
  • communicating changes to the storr:
    1. job writes directly to the storr
    2. job returns result through the future, master stores result into the storr
  • monitoring progress:
    • watch the storr and
    • check resolved(f)
  • failing workers
    • resolved(f) is TRUE, value(f) throws an error
  • scheduling with a priority queue ("decrease key" operation? datawookie/liqueueR#2)
    • keys in the pq: the number of unresolved upstream targets; this is why we need a "decrease key" operation
    • as soon as a job finishes, "decrease key" is called for all of its downstream targets
    • a job can be run when its key in the pq reaches zero
    • job priorities help resolve ties if the pq contains multiple jobs with key 0
      • one option is to use the position in the plan data frame as a proxy for priority if we don't have explicit priorities (a sketch of this scheduling idea follows below)
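
A minimal sketch of this scheduling idea in plain R (not drake's implementation; the helper names and the deps list are illustrative):

new_queue <- function(deps) {
  # deps: named list mapping each target to its upstream dependencies
  list(keys = vapply(deps, length, integer(1)), deps = deps)
}

ready_targets <- function(q) {
  names(q$keys)[q$keys == 0]  # key 0 means no unresolved upstream targets remain
}

decrease_keys <- function(q, finished) {
  # when `finished` completes, decrement the key of each downstream target
  downstream <- names(q$deps)[vapply(q$deps, function(d) finished %in% d, logical(1))]
  q$keys[downstream] <- q$keys[downstream] - 1L
  q$keys <- q$keys[names(q$keys) != finished]
  q$deps[[finished]] <- NULL
  q
}

q <- new_queue(list(a = character(0), b = "a", c = "a", d = c("b", "c")))
ready_targets(q)           # "a"
q <- decrease_keys(q, "a")
ready_targets(q)           # "b" "c": a tie that job priorities could break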

Nomenclature

  • target: the object to compute
  • command/job: the R code that computes a target
  • future: the vehicle that computes one or more commands/jobs on a worker
  • worker: the compute unit that computes futures
@wlandau (Member) commented Feb 4, 2018

Also: let's use lightly_parallelize() as the engine behind make_imports(). For imports, the order of processing does not matter.

@krlmlr (Collaborator, Author) commented Feb 4, 2018

If the keys are integers (like in our case), "insert" and "decrease key" can be implemented in O(1).
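
For illustration, a rough sketch of the bucket-queue idea behind that claim: one bucket per integer key, so insert and decrease-key each touch only two buckets. (With R's copy-on-modify semantics this is conceptual rather than literally O(1); the names are made up.)

new_bucket_queue <- function(max_key) {
  # bucket i + 1 holds the targets whose key is currently i
  list(buckets = replicate(max_key + 1, character(0), simplify = FALSE))
}

bq_insert <- function(q, target, key) {
  q$buckets[[key + 1]] <- c(q$buckets[[key + 1]], target)
  q
}

bq_decrease_key <- function(q, target, old_key, new_key) {
  old <- q$buckets[[old_key + 1]]
  q$buckets[[old_key + 1]] <- old[old != target]  # drop from the old bucket
  bq_insert(q, target, new_key)                   # reinsert with the new key
}

bq_ready <- function(q) q$buckets[[1]]            # targets whose key reached zero

q <- new_bucket_queue(3)
q <- bq_insert(q, "a", 0)
q <- bq_insert(q, "d", 2)
q <- bq_decrease_key(q, "d", 2, 1)
bq_ready(q)  # "a"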

@wlandau (Member) commented Feb 4, 2018

My main focus right now is this issue. After some sketching (which I will push to a branch as soon as I finish traveling), I think the infrastructure should emerge as an independent "future" backend separate from "future_lapply". The new scheduler will turn out great in the end, but until we test it over several months, I do not think it should replace any of the existing backends. I think this will be okay because there will not be much code to add, and I will make sure it is merge-safe.

@wlandau-lilly (Collaborator) commented Feb 4, 2018

Key data structures I am planning:

  • R6_worker: a persistent R6 object that can iterate over multiple transitory futures and targets, etc.
  • R6_queue: a trivial queue for targets that we will later turn into a priority queue.

@wlandau (Member) commented Feb 5, 2018

I was wrong in my comment above about make_imports(). I forgot that we cannot process the imports in embarrassingly parallel fashion. We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them. For this, we should stick to something low-overhead. The current mclapply-powered staged parallelism has worked well for my own projects, and the staging should not cause a bottleneck here.

wlandau-lilly added a commit that referenced this issue Feb 5, 2018
@krlmlr (Collaborator, Author) commented Feb 5, 2018

We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them.

Can you give an example where the order is important in make_imports()?

@wlandau (Member) commented Feb 5, 2018

library(drake)

f1 <- function(x){
  f2(x)
}

f2 <- function(x){
  f3(x)
}

f3 <- function(x){
  f4(x)
}

f4 <- function(x){
  x
}

plan <- drake_plan(a = f1(1))
config <- drake_config(plan)
vis_drake_graph(config)

(vis_drake_graph figure attached in the original comment as "fig" and "pdf" links)

make(plan)

## target a

outdated(config)

## character(0)

f4 <- function(x){
  x + 1
}

outdated(config)

## [1] "a"

Some terms:

  • kernel: the reproducibly-tracked, fingerprinted representation of a target or import.
  • dependency hash: an overarching hash of all the hashes of the kernels of an item's dependencies.

Drake accounts for nesting because the kernel of a function includes its own dependency hash.
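
To make the propagation concrete, here is a toy sketch of that idea (the helper and the deps list are invented for illustration; this is not drake's real hashing code):

library(digest)

# deps: named list mapping each function name to the functions it calls
kernel_hash <- function(fn_name, deps) {
  body_hash  <- digest(deparse(body(get(fn_name))))
  dep_hashes <- vapply(deps[[fn_name]], kernel_hash, character(1), deps = deps)
  digest(c(body_hash, dep_hashes))  # fingerprint folds in the callees' fingerprints
}

f1 <- function(x) f2(x)
f2 <- function(x) f3(x)
f3 <- function(x) f4(x)
f4 <- function(x) x

deps <- list(f1 = "f2", f2 = "f3", f3 = "f4", f4 = character(0))
h1 <- kernel_hash("f1", deps)

f4 <- function(x) x + 1     # edit a deeply nested function
h2 <- kernel_hash("f1", deps)
identical(h1, h2)           # FALSE: the change propagated up to f1's fingerprint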

@wlandau (Member) commented Feb 5, 2018

For completeness: f1() cannot be processed until f2() through f4() are processed, so that drake can tell when to update a.

@wlandau (Member) commented Feb 5, 2018

It just occurred to me: as a first step, what if we used GNU Make to schedule futures? We would just need to change the parallelism = "Makefile" backend:

  • Deploy and monitor a single future from the mk() function. Makefile rules are usually of the form Rscript -e 'drake::mk(...)', and the recipe allows some customization.
  • Add future::plan("next") to the drake_config() object so the separate R sessions see it. We could eventually extend this to solve #169 (use different computing resources for different targets), but that does not have to happen right away.

This way, we could debug and test the R6_worker class for monitoring individual futures without having to worry about the rest of the manual scheduler. We could also benchmark drake's upcoming manual scheduler against Make and try to outperform it.

The advantage of Make as a scheduler for futures is that parallel workers could simultaneously write to the cache even when remote jobs have no access.

@wlandau (Member) commented Feb 5, 2018

To clarify that last point: requiring the master process to do all the caching is a potential bottleneck. For large computations with small data, this isn't so bad. But a lot of big data work involves small computations on large data, so I think it is worth the time to make this alternative available.

@wlandau (Member) commented Feb 5, 2018

We could also check whether and how GNU Make builds a priority queue.

@wlandau (Member) commented Feb 5, 2018

FYI: to do this correctly, I think there is more I need to learn about future. It will take additional time.

@krlmlr (Collaborator, Author) commented Feb 5, 2018

Splitting up this issue further would help; I like that idea. Let's focus on the Makefile backend first.

@wlandau (Member) commented Feb 5, 2018

Sounds great. Let's discuss the future-powered Makefile backend in #237.

@wlandau (Member) commented Feb 10, 2018

I think I have a promising start on this issue in the i227-attempt2 branch. It still needs a ton of debugging and testing, but it's a start. There are two experimental future-based backends in that branch, each of which deploys one future per target.

  • "future_commands" - Do the caching on the master process. The future should not need to access the master process's file system except to read/write user-defined input/output files.
  • "future_total" - Tell each future to cache its own target.

The rest of the backends are unaffected. We'll see how all the backends compete against each other. Eventually, it may come time to deprecate and remove backends.
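
For illustration, a minimal sketch of the difference between the two caching strategies above, using future and storr directly (the target names and cache path are made up; this is not the backend code):

library(future)
library(storr)

plan(multisession)
cache <- storr_rds(".sketch-cache")

# "future_commands"-style: the worker only computes; the master stores the result.
f <- future(sqrt(2))
cache$set("target_a", value(f))           # caching happens on the master

# "future_total"-style: the worker opens the cache and stores its own target.
g <- future({
  worker_cache <- storr::storr_rds(".sketch-cache")
  worker_cache$set("target_b", sqrt(3))
  TRUE                                    # only a small status value travels back
})
value(g)
cache$get("target_b")                     # the master can read what the worker wrote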

One note on monitoring futures in case they crash: I personally think this should be the responsibility of the future package. I have added a comment to futureverse/future#172.

@wlandau (Member) commented Feb 13, 2018

I am noticing that future introduces some additional overhead for each target. Maybe with additional callr or processx workers based on the scheduler, we could get rid of the *lapply backends completely. The Makefile backend is still its own special case, unfortunately.

@wlandau (Member) commented Feb 14, 2018

Update: I have been working hard on the i227-attempt2 branch, and I am almost ready to involve more people in alpha testing. I am starting to test it on some of my own old work projects, and things appear to be going rather well so far. Main changes:

I implemented the "future"-based scheduler as a separate parallel backend because:

  1. I don't want to risk damaging the other backends.
  2. I want to compare it head-to-head with the staged parallelism backends we already have.
  3. I want to be able to merge it into master for easier alpha testing.

I will submit a PR early next week. At that point, I hope we can review the code together.

@wlandau (Member) commented Feb 14, 2018

Note for reference: we might be able to get non-staged mclapply-like functionality by combining mcfork() with the manual scheduler. This might remove enough overhead to make it practical to process imports this way.

wlandau pushed a commit that referenced this issue Feb 18, 2018
This commit documents the experimental "future"
backend and "caching" argument to `make()`.
These features need time to play out in real projects,
but they are ready for code review and hopefully alpha testing.
wlandau pushed a commit that referenced this issue Feb 20, 2018
Initial work on a manual scheduler (#227)
@wlandau (Member) commented Feb 22, 2018

Looking back at the summary of our discussion from the RStudio conference:

monitoring progress:

  • watch the storr and
  • check resolved(f)

As I understand it, the purpose of watching the storr is to see if a worker crashed even if resolved(f) is TRUE. Is that right? If so, using the cache would only work for caching = "worker", so I hesitate to implement it right now. For caching = "master", we should probably not assume that workers have cache access.

failing workers

  • resolved(f) is TRUE, value(f) throws an error

Unfortunately, this does not work for multicore futures at the moment (futureverse/future#199), but it does work for multisession parallelism. I just implemented it in #275.
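
As a concrete illustration of that detection pattern (a generic future sketch, not the drake code in #275):

library(future)
plan(multisession)

f <- future(stop("worker failed"))
while (!resolved(f)) Sys.sleep(0.1)             # once resolved, the failure can be inspected

result <- tryCatch(value(f), error = identity)  # value() rethrows the worker's error
inherits(result, "error")                       # TRUE: the failure surfaced on the master
conditionMessage(result)                        # contains "worker failed"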

scheduling with a priority queue ...

As I see it, that part hinges on @dirmeier's response to dirmeier/datastructures#4 (comment). If we can use his decrease-key, I will upgrade drake's current priority queue. If not, we might just leave the existing queue alone for now. Either way, I will then close this issue, and we can work on specific scheduling-related problems in more focused issues.

@krlmlr (Collaborator, Author) commented Feb 22, 2018

Yes, watching the storr seems necessary only for caching = "worker". Checking resolved(f) may be expensive on some architectures.

FWIW, I'm still using staged parallelism in my current experiments because of the overhead associated with futures. I like your idea of a pure callr worker, because even future.callr has some overhead that mainly comes from finding the environment in which the globals live (according to a very coarse profiling). Ideally, we'd pass commands to persistent workers that would run them and communicate back, but that feels like a long way to go.
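
As an aside, later callr releases expose a persistent-worker primitive along these lines (a generic sketch of the "persistent workers" idea, not something drake used at the time):

library(callr)

rs <- r_session$new()                     # one long-lived background R process
rs$run(function() Sys.getpid())           # the worker's PID stays the same across calls
rs$run(function(x) x^2, args = list(10))  # run a command on the worker: returns 100
rs$close()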

@wlandau (Member) commented Feb 23, 2018

Yeah, if we keep persistent workers, it seems like we either need to have some kind of message passing or make all the workers fight over the next available target. I don't think drake is ready for either yet.

If we had a pure callr worker to process the imports (and optionally targets), we could totally get rid of the mclapply and parLapply backends, and we would no longer need to process targets and imports in two separate stages. (I still think we should keep the Makefile backend even though it complicates the code base.) It seems very doable, but I need to get better at callr.

I do not know yet how to interact with a callr job the way we do with a future, but once we have that part figured out, I think we should define analogous S3 methods such as resolved.callr_worker() and value.callr_worker(). That way, we could just add on a callr worker and not need many modifications to R/future.R.
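
Something along these lines could work; a hypothetical sketch built on callr::r_bg(), where the class and generics are illustrative stand-ins rather than an existing drake or callr API:

library(callr)

callr_worker <- function(fun, args = list()) {
  # launch a background R process and wrap it in a class we can dispatch on
  structure(list(process = r_bg(fun, args = args)), class = "callr_worker")
}

# stand-ins for future's resolved()/value() generics
resolved <- function(x, ...) UseMethod("resolved")
resolved.callr_worker <- function(x, ...) !x$process$is_alive()

value <- function(x, ...) UseMethod("value")
value.callr_worker <- function(x, ...) {
  x$process$wait()
  x$process$get_result()  # rethrows any error from the background session
}

w <- callr_worker(function(a, b) a + b, args = list(a = 1, b = 2))
while (!resolved(w)) Sys.sleep(0.1)
value(w)  # 3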

@krlmlr (Collaborator, Author) commented Feb 23, 2018

Message passing via file I/O doesn't necessarily create concurrency problems if each worker is limited to its own namespace (directory) in the file system. But it seems safer to abstract away the message-passing part.

I can take care of implementing the callr backend. Would that backend bundle several targets into one callr process?

@wlandau (Member) commented Feb 23, 2018

I would love it if you wrote a callr worker, particularly one that behaves like the existing future worker. Maybe in new_worker(), we could optionally launch callr where a future is currently launched?

At the conference we talked about allowing multiple targets per worker, but I think we decided to table that for now. I do not think drake is ready for that yet.

@wlandau (Member) commented Feb 23, 2018

Once we have a callr worker, here is what I am thinking for the next steps:

  1. Allow an optional worker column in the workflow plan data frame, where users can choose between "future" and "callr" (sketched below after this list). There is no sense in deploying an HPC job for a knitr report just because the targets leading up to it need SLURM.
  2. Allow targets (via future or callr) and imports (via callr) to be scheduled all together instead of in separate stages.
  3. Remove all parallel backends except "future", "callr", and "Makefile". The "mclapply" and "parLapply" backends will redirect to "callr", and the "future_lapply" backend will redirect to "future".
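
A hypothetical illustration of the proposed optional worker column from step 1 (the column name and values are just the proposal above, not an existing drake interface; get_data(), fit_model(), and render_report() are placeholders):

library(drake)

plan <- drake_plan(
  data   = get_data(),
  fit    = fit_model(data),
  report = render_report(fit)
)
plan$worker <- ifelse(plan$target == "report", "callr", "future")  # keep the report off SLURM
plan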

@krlmlr (Collaborator, Author) commented Feb 23, 2018

We'll need to think about how to minimize the time needed to spin up a worker; otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.

krlmlr mentioned this issue Feb 23, 2018
@wlandau (Member) commented Feb 23, 2018

Agreed. I was thinking callr might be lower-overhead than future.

@wlandau (Member) commented Feb 24, 2018

I think it's a good time to close this issue: we already have a good start on the manual scheduler, and further work is starting to split up into a bunch of smaller issues.

wlandau closed this as completed Feb 24, 2018
@krlmlr (Collaborator, Author) commented Feb 24, 2018

Do you want to add these issues now, or maybe as comments to #278?

@wlandau (Member) commented Feb 24, 2018

I think continuing to plan from this thread is fine. I think the priority queue still needs to wait for dirmeier/datastructures#4, and transitioning imports from mclapply/parLapply to callr depends on how fast the new backend in #278 turns out to be. There are also other issues, like grouping multiple targets into a given worker, but I hesitate to open issues that drake is not ready for.

@wlandau (Member) commented Feb 27, 2018

We'll need to think about how to minimize the time needed to spin up a worker; otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.

Agreed:

library(callr)
library(parallel)
library(future.callr)
#> Loading required package: future
#> 
#> Attaching package: 'future'
#> The following object is masked from 'package:callr':
#> 
#>     run
library(microbenchmark)

w <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}), times = 1000)

x <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}, mc.cores = 2), times = 1000)

y <- microbenchmark(r(function() {
  print("done")
}), times = 1000)

plan(future.callr::callr)

z <- microbenchmark(future(print("done")), times = 1000)

out <- rbind(w, x, y, z)
out
#> Unit: milliseconds
#>                                                            expr        min
#>                mclapply(1:2, function(x) {     print("done") })   2.512824
#>  mclapply(1:2, function(x) {     print("done") }, mc.cores = 2)   2.619827
#>                             r(function() {     print("done") }) 101.140287
#>                                           future(print("done"))   7.744327
#>          lq       mean     median         uq       max neval
#>    2.979621   3.490104   3.094162   3.680197  11.02008  1000
#>    3.049609   3.403416   3.120213   3.509649   9.79317  1000
#>  107.533589 108.844388 108.231943 109.175948 183.09687  1000
#>  208.584417 209.172502 208.961368 209.789184 257.03640  1000
