Distributed training with Kubernetes #17

Open
jlewi opened this issue Nov 9, 2017 · 28 comments

@jlewi
Contributor

jlewi commented Nov 9, 2017

Opening this issue to start a discussion about whether it would be worth investing in making it easy to run tensorflow/agents on K8s.

For some inspiration, you can look at the TfJob CRD.

Some questions:

  1. Is there a need to be able to distribute the environments across multiple machines?
  2. What is the communication pattern between the simulations and TensorFlow job?
    * Is data fetched from all simulations simultaneously?
    * Does each simulation need to be individually addressable?
@danijar
Contributor

danijar commented Nov 9, 2017

This is an interesting topic. I would imagine that a simple distribution pattern would be to run multiple instances of the current code on multiple machines, each simulating its own set of environments. The network parameters would need to be shared using parameter servers.

@jlewi
Contributor Author

jlewi commented Nov 9, 2017

@danijar So basically you want to do data-parallel, replicated training with TensorFlow, where each worker gets its data from a set of environments running colocated with the TF process.

I think that would work out of the box with the TfJob CRD. The CRD will take care of provisioning a set of workers and parameter servers. The workers can run any code (docker image); so they could use tensorflow/agents to launch a bunch of environments as subprocesses.

You could also move the environments into 1 or more sidecars and communicate via the filesystem or sockets. This might be beneficial if

  • You want better monitoring/isolation of the simulation environments compared to TF workers
  • You have different docker images for your TensorFlow process and environments

Would it be beneficial to run the simulations on different machines from the workers?

It sounds like the pattern is many simulations to 1 worker; i.e. you have N workers and each worker uses B simulations to generate a batch of size B.

If tensorflow/agents adds an RPC mode of communication, then I think K8s should be able to handle distributing both TensorFlow workers and the simulations across multiple machines.

Using StatefulSets you could start N * B simulations; each with a stable DNS name. Each worker could then use B of these simulations.
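
For illustration, here is a rough sketch of how a worker might address its slice of such a StatefulSet. The StatefulSet, headless-service, and namespace names below are made up for the example, not part of any existing setup:

def simulation_hosts(worker_index, num_sims_per_worker,
                     statefulset_name="sim", service="sim", namespace="agents"):
  """Returns the stable DNS names of the B simulations owned by one worker."""
  start = worker_index * num_sims_per_worker
  return ["%s-%d.%s.%s.svc.cluster.local" % (statefulset_name, i, service, namespace)
          for i in range(start, start + num_sims_per_worker)]

# Worker 2 with B=4 would talk to sim-8 ... sim-11.
print(simulation_hosts(worker_index=2, num_sims_per_worker=4))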

@danijar
Contributor

danijar commented Nov 10, 2017

I think in many scenarios it makes sense to simulate and train on the same machine, and just scale the number of those machines. That's mainly because it seems to reduce communication overhead significantly by only sharing parameters, not all the collected data. I'd be super curious to know if that works out of the box as you mentioned it might!

In some new scenarios we cannot simulate the environments on the same machine, let's say because the data is generated by some specialized service or on a real robot. In that case, we could implement a Gym environment that under the hood communicates with the data service via RPCs. But instead of communicating for every frame, we might want to send a policy to the data collector and get back full trajectories, which would require some more changes.
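
For concreteness, here is a minimal sketch of what such a proxy environment could look like. The RPC layer (Python's built-in xmlrpc) and the remote reset/step API are purely assumptions for illustration, not an existing service:

import gym
import numpy as np
from gym import spaces
from xmlrpc.client import ServerProxy


class RemoteEnv(gym.Env):
  """Gym environment that forwards reset()/step() to a remote simulator."""

  def __init__(self, url):
    self._server = ServerProxy(url)
    # In practice the spaces would be queried from the service; hard-coded here.
    self.observation_space = spaces.Box(-np.inf, np.inf, shape=(28,), dtype=np.float32)
    self.action_space = spaces.Box(-1.0, 1.0, shape=(8,), dtype=np.float32)

  def reset(self):
    return np.asarray(self._server.reset(), dtype=np.float32)

  def step(self, action):
    obs, reward, done, info = self._server.step(action.tolist())
    return np.asarray(obs, dtype=np.float32), reward, done, info

Returning full trajectories instead of single frames, as mentioned above, would turn step() into something more like a rollout(policy) call.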

@jlewi
Contributor Author

jlewi commented Nov 10, 2017

I think in many scenarios it makes sense to simulate and train on the same machine, and just scale the number of those machines. That's mainly because it seems to reduce communication overhead significantly by only sharing parameters, not all the collected data. I'd be super curious to know if that works out of the box as you mentioned it might!

So would I. Is there a simple example we could try to parallelize across multiple machines using TfJob?

@danijar
Contributor

danijar commented Nov 10, 2017

Maybe something like this: https://gist.github.com/danijar/1dde5eb528b61560734c16d4fd0d93a1

We will probably have to modify agents/scripts/train.py though.

@jlewi
Contributor Author

jlewi commented Nov 12, 2017

@cwbeitel any interest in trying to turn @danijar's gist into an example running on K8s?

@cwbeitel

Sure!

@cwbeitel

Still working on it! Hit some issues with my kube deployment when trying to re-deploy with accelerators. I'll push a version here in a minute to share for discussion.

@cwbeitel

So here's the work in progress; let me know if you have thoughts about this model for how model code is built into images and run. It doesn't have to be so prescriptive, but this is a workable model for now.

@cwbeitel

OK, so here's a cleaner version of that PR: kubeflow/training-operator#159. Next up is extending the example with some form of parallelization, initially with the environments still running in the same containers where replicas of the model are being trained. @danijar @jlewi do you have thoughts on including the example in tf/k8s vs tf/agents? I don't want to go out of scope on k8s, and this issue was originally posted here.

@danijar changed the title from "agents on kubernetes?" to "Distributed training with Kubernetes" on Nov 19, 2017
@danijar
Contributor

danijar commented Nov 19, 2017

Cool, I looked at the PR and it looks quite nice. The examples directory of the k8s repo is probably a good place for this code. I'm planning on adding some tutorials for Agents and would be happy to include a page about this and a link to the example.

Did you do a full distributed training run and have some graphs to look at?

@cwbeitel

Thank you. Also made the example a bit simpler per @jlewi's suggestion, see cwbeitel/k8s@790dfb1.

That sounds good about the tutorials; happy to help user-test those.

You should be able to point TensorBoard at the logs from one of the runs (let me know if you can't access it; it's marked "make public"):

tensorboard --logdir gs://dev01-181118-181500-k8s/jobs/tensorflow-20171117102413/20171117T182424-pybullet_ant

Rendering is running, so I'll upload a GIF to the README when that's done and I have a chance. Do you have a suggestion for which graphs to include in the README, or did you just mean for browsing?

Haven't made it distributed yet, that's next up!

@danijar
Contributor

danijar commented Nov 19, 2017

Thanks. TensorBoard is getting a 401 Unauthorized return code. Yep, checked the code and saw that it's single-instance training for now. Let me know if you run into problems when working on the distributed version, maybe I can help. For the readme, the graph with the mean score is probably most interesting, or a screenshot of all the scores on TensorBoard.

@cwbeitel

Ok gs://agents-k8s-share/jobs/... should work going forward including gs://agents-k8s-share/jobs/tensorflow-20171117102413/20171117T182424-pybullet_ant

Figures (attached): mean_score, pg1, pg2

Thanks, happy to have help. I'm getting up to speed on this class of algorithm and am away this week. Maybe at some point we can write up in pseudocode exactly what we want to parallelize. Here's pseudocode for the KL and clipped versions of PPO:

## PPO-1 (KL)
for iteration do
  Run policy for T timesteps over N trajectories
  Estimate advantage function at all timesteps
  Do SGD on above objective for some number of epochs
  If KL too high, increase beta
  If KL too low, decrease beta
end for
## PPO-2 (Clipped objective)
for iteration do
  Run policy for T timesteps or N trajectories
  Estimate advantage function at all timesteps
  Do SGD on L^CLIP(theta) objective for some number of epochs
end for

It seems like we basically want to do distributed SGD around some policy parameterization, where individual agents accumulate their own experience trajectories, perform a bit of private SGD, and then share their results with a central policy vector? Or were you thinking more that there would be a single node for policy optimization but distributed accumulation of experience (i.e. many parallel actors but a single critic)?

@danijar
Contributor

danijar commented Nov 21, 2017

I think we will have multiple machines that each run a batch of environments and an agent. The agents just need to synchronize their gradients. Since most of the time is spent collecting data, this will hopefully be only a small overhead.

@jlewi
Contributor Author

jlewi commented Nov 21, 2017

@danijar This sounds like standard between-graph replication training. Do we need to do anything special?

@cwbeitel

Agree, this sounds like a standard instance of between-graph replication, and the remaining task is to put the policy parameters on the parameter servers. I'm just locating those parameters and getting a feel for the structure of agents and the PPO algorithm. If I understand correctly, in this example our policy is a parameterization of agents.scripts.networks.feed_forward_gaussian?
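
For reference, a rough sketch of standard TF 1.x between-graph replication (the cluster addresses and job names are illustrative, and this is not the actual agents integration): variables created under the replica_device_setter, e.g. inside the policy network, land on the parameter servers, while ops stay on the local worker.

import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["ps-0:2222"],
    "worker": ["worker-0:2222", "worker-1:2222"],
})
task_index = 0  # would come from TF_CONFIG / the TfJob spec
server = tf.train.Server(cluster, job_name="worker", task_index=task_index)

with tf.device(tf.train.replica_device_setter(cluster=cluster)):
  # Variables defined here (e.g. by feed_forward_gaussian) are placed on ps tasks.
  global_step = tf.train.get_or_create_global_step()
  # ... build the rest of the training graph here ...

with tf.train.MonitoredTrainingSession(
    master=server.target, is_chief=(task_index == 0)) as sess:
  pass  # run training ops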

@danijar
Contributor

danijar commented Nov 22, 2017

Yes, the model parameters are created within agents.scripts.networks.feed_forward_gaussian. Other variables that we might want to share are the current value of the KL penalty and the normalizers for rewards and observations. I would say let's try with only synchronizing the model parameters for now.

@cwbeitel

Sounds good!

@cwbeitel

cwbeitel commented Dec 8, 2017

So, just to update: I'm having some issues with variable initialization; the current version is here.

Currently non-chief nodes hang, repeating:

  INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:
  Variables not initialized: ppo_temporary/episodes/Variable,
  ppo_temporary/episodes/Variable_1, ppo_temporary/episodes/Variable_2,
  ppo_temporary/episodes/Variable_3, ppo_temporary/episodes/Variable_4,
  ppo_temporary/episodes/Variable_5, ppo_temporary/last_action,
  ppo_temporary/last_mean, ppo_temporary/last_logstd, ready: None

Presumably this is because these variables, which are private to an individual worker, are not initialized by the chief. It seems these variables should be initialized by the local_init_op once ready_for_local_init_op passes, rather than having the readiness check block until they are initialized. And there should be more variables on this list: environment simulation variables should be private to workers as well.

I'm guessing there's something simple to be done to either signal workers to initialize their local variables or mark these variables in a way that ready_for_local_init_op won't block on them (it's probably treating them as global variables?).
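
For what it's worth, a minimal sketch of the usual TF 1.x way to keep per-worker variables out of the chief's readiness check: put them in the LOCAL_VARIABLES collection so they are covered by local_init_op rather than ready_op. The variable name and shape below are just for illustration:

import tensorflow as tf

last_action = tf.get_variable(
    "ppo_temporary/last_action", shape=[8], trainable=False,
    collections=[tf.GraphKeys.LOCAL_VARIABLES])

# With the variable in LOCAL_VARIABLES, MonitoredTrainingSession's default
# Scaffold initializes it per worker via tf.local_variables_initializer()
# instead of waiting for the chief to initialize it.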

@danijar
Contributor

danijar commented Dec 9, 2017

Hmm, I thought that the tf.local_variables_initializer() would do exactly that (utility.py line 114). Maybe these variables are mistakenly placed on the parameter server?

@cwbeitel

cwbeitel commented Dec 9, 2017

Yeah me too. They don't appear to be on the param servers.

@cwbeitel

cwbeitel commented Dec 9, 2017

I'm wondering if I'm mixing things up a bit here. I probably shouldn't be putting anything on the master and should use it only for initialization of variables on the workers and PSs.

@jlewi
Contributor Author

jlewi commented Dec 13, 2017

It really depends on how the code is set up. Running ops on the master ("chief") is quite common. Typically worker 0 is the chief and is also running computations.

@danijar
Contributor

danijar commented Jan 29, 2018

@cwbeitel Are you still working on fixing this?

@cwbeitel

I took a break from it (this has been a bit frustrating) but still mean to fix what can be fixed.

As I was explaining in my email, there is currently an error that occurs at the 10-minute mark when the Saver is called, caused by it expecting a Session but receiving a MonitoredSession:

Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/app/trainer/task.py", line 299, in <module>
    tf.app.run()
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "/app/trainer/task.py", line 282, in main
    for score in train(agents_config):
  File "trainer/train.py", line 214, in train
    loop._store_checkpoint(sess, saver, global_step)
  File "/app/agents/agents/tools/loop.py", line 233, in _store_checkpoint
    saver.save(sess, filename, global_step)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/saver.py", line 1565, in save
    raise TypeError("'sess' must be a Session; %s" % sess)
TypeError: 'sess' must be a Session; <tensorflow.python.training.monitored_session.MonitoredSession object at 0x7ff9586de710>

This causes jobs to crash then re-start, logging to the same directory as the previous run, causing the following error:

  INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:
  Variables not initialized: ppo_temporary/episodes/Variable,
  ppo_temporary/episodes/Variable_1, ppo_temporary/episodes/Variable_2,
  ppo_temporary/episodes/Variable_3, ppo_temporary/episodes/Variable_4,
  ppo_temporary/episodes/Variable_5, ppo_temporary/last_action,
  ppo_temporary/last_mean, ppo_temporary/last_logstd, ready: None

One approach here would be to disable Agents' checkpoint-writing behavior and use that of MTS instead, in favor of the latter's various other benefits, including convenient initialization of ops related to distributed training and a convenient mechanism for introducing SessionRunHooks, e.g. for triggering render jobs.

I believe this issue is distinct from the problem of local variables not being initialized in the context of distributed training, which we discussed at length (and which was solved by adding non-shared variables to the local collection).

I also believe this problem is distinct from the "op not fetchable" error that occurs when using SyncReplicasOptimizer, since that error occurs immediately, whereas the above only occurs once the checkpoint saver is triggered (after ~10 min; or maybe it has a step-count periodicity).

The issue with SRO should be solvable. It sounds like punting that to an expert hasn't worked. Going forward, I think the approach should be to look through the SRO code to see where it's adding new ops to the graph, since it seems like the problem is that it creates an op inside one of the tf.scan and/or tf.cond contexts. My initial attempt at constructing it outside of those and passing the initialized SyncReplicasOptimizer in via the agents config did not fix the problem, but my guess is that something along these lines would.
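
To sketch the idea (this is just the generic SyncReplicasOptimizer pattern, not a verified fix for the agents integration; num_workers, is_chief, and the base optimizer below are placeholders): construct the optimizer once at top-level graph construction, outside any tf.cond/tf.scan body, and register its session hook.

import tensorflow as tf

num_workers = 2
is_chief = True  # worker 0

base_opt = tf.train.AdamOptimizer(1e-4)
sync_opt = tf.train.SyncReplicasOptimizer(
    base_opt,
    replicas_to_aggregate=num_workers,
    total_num_replicas=num_workers)

# The training graph (e.g. built from the agents config) would then call
# sync_opt.minimize(loss, global_step=global_step) outside any control-flow
# context, and the session needs this hook to coordinate aggregation:
sync_hook = sync_opt.make_session_run_hook(is_chief)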

Another important observation is that since introducing MTS, the mean_score is highly variable. This is with an unmodified version of the Agents codebase.

Also, when not using MTS, when a run does reach a successful conclusion it (always) exits with a system error that causes jobs to restart, after which they crash-loop.

@danijar
Contributor

danijar commented Jan 29, 2018

When using MonitoredTrainingSession, you should disable its logging and checkpointing logic. I was using it in the beginning but it was too restrictive so I switched to a standard session. I believe you can get the underlying session from a MonitoredTrainingSession as sess._sess to make the saver happy.

Instead of SyncReplicasOptimizer, you could also try to manually combine the gradients across workers, but I'm unsure how much effort that would be.
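
To make that concrete, a rough sketch of the suggestion (is_chief and the empty master string are placeholders; unwrapping via the private _sess attribute is an implementation detail that may differ across TF versions):

import tensorflow as tf

is_chief = True
master = ""  # a real run would use server.target

with tf.train.MonitoredTrainingSession(
    master=master,
    is_chief=is_chief,
    save_checkpoint_secs=None,    # let Agents, not MTS, write checkpoints
    save_summaries_steps=None,
    save_summaries_secs=None) as sess:

  # tf.train.Saver.save() type-checks for a raw tf.Session, so hand it the
  # underlying session rather than the monitored wrapper:
  raw_sess = sess._sess  # may need further unwrapping depending on TF version
  # saver.save(raw_sess, checkpoint_path, global_step=global_step)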

@cwbeitel

Thanks for the suggestion. I'll give that a try.

Yeah, it's been in the back of my mind as well that we could go that route. I'd like to do some more investigating with SRO before doing that myself. But it would be interesting! Certainly SRO provides a clear model of how to do this kind of synchronization.

Also, it would be good to see what the training quality actually looks like with asynchronous distributed training before deeming synchronous training worth investing in. My bad on that: during development I added both the use of MTS and the pooling of gradients among multiple workers, and assumed at the time that the variability in mean_score was due to syncing stale gradients. But now it's clear that this variability is the result of some other change made in switching over to MTS, since it can be seen when running the current MTS setup on a single worker.
