diff --git a/src/garage/experiment/snapshotter.py b/src/garage/experiment/snapshotter.py index 549569fbf7..69b87c7be0 100644 --- a/src/garage/experiment/snapshotter.py +++ b/src/garage/experiment/snapshotter.py @@ -3,8 +3,12 @@ import errno import os import pathlib +import sys import cloudpickle +from dowel import logger + +# pylint: disable=no-name-in-module SnapshotConfig = collections.namedtuple( 'SnapshotConfig', ['snapshot_dir', 'snapshot_mode', 'snapshot_gap']) @@ -82,6 +86,7 @@ def snapshot_gap(self): """ return self._snapshot_gap + # pylint: disable=too-many-branches def save_itr_params(self, itr, params): """Save the parameters if at the right iteration. @@ -94,8 +99,13 @@ def save_itr_params(self, itr, params): "gap_overwrite", "gap_and_last", or "none". """ + # pylint: disable=import-outside-toplevel + torch = False + if 'torch' in sys.modules: + import torch + from garage.torch import global_device file_name = None - + # pylint: enable=import-outside-toplevel if self._snapshot_mode == 'all': file_name = os.path.join(self._snapshot_dir, 'itr_%d.pkl' % itr) elif self._snapshot_mode == 'gap_overwrite': @@ -113,8 +123,11 @@ def save_itr_params(self, itr, params): file_name = os.path.join(self._snapshot_dir, 'itr_%d.pkl' % itr) file_name_last = os.path.join(self._snapshot_dir, 'params.pkl') - with open(file_name_last, 'wb') as file: - cloudpickle.dump(params, file) + if torch: + torch.save(params, file_name_last, pickle_module=cloudpickle) + else: + with open(file_name_last, 'wb') as file: + cloudpickle.dump(params, file) elif self._snapshot_mode == 'none': pass else: @@ -122,8 +135,20 @@ def save_itr_params(self, itr, params): self._snapshot_mode)) if file_name: - with open(file_name, 'wb') as file: - cloudpickle.dump(params, file) + if torch: + + class _pickle_module: + dump = cloudpickle.dump + Pickler = cloudpickle.CloudPickler + + params['global_device'] = global_device() + torch.save(params, + file_name, + pickle_module=_pickle_module, + _use_new_zipfile_serialization=False) + else: + with open(file_name, 'wb') as file: + cloudpickle.dump(params, file) def load(self, load_dir, itr='last'): # pylint: disable=no-self-use @@ -145,6 +170,12 @@ def load(self, load_dir, itr='last'): NotAFileError: If the snapshot exists but is not a file. """ + torch = False + # pylint: disable=import-outside-toplevel + if 'torch' in sys.modules: + import torch + from garage.torch import global_device + # pylint: enable=import-outside-toplevel if isinstance(itr, int) or itr.isdigit(): load_from_file = os.path.join(load_dir, 'itr_{}.pkl'.format(itr)) else: @@ -165,7 +196,13 @@ def load(self, load_dir, itr='last'): if not os.path.isfile(load_from_file): raise NotAFileError('File not existing: ', load_from_file) - + if torch: + device = global_device() + params = torch.load(load_from_file, map_location=device) + origin_device = params['global_device'] + del params['global_device'] + logger.log(f'Resuming experiment from {origin_device} on {device}') + return params with open(load_from_file, 'rb') as file: return cloudpickle.load(file) diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py index 07d875604b..8ec18ac96a 100644 --- a/tests/fixtures/__init__.py +++ b/tests/fixtures/__init__.py @@ -1,9 +1,10 @@ """Test fixtures.""" # yapf: disable -from tests.fixtures.fixtures import (snapshot_config, - TfGraphTestCase, - TfTestCase) +from tests.fixtures.fixtures import (reset_gpu_mode, snapshot_config, + TfGraphTestCase, TfTestCase) # yapf: enable -__all__ = ['snapshot_config', 'TfGraphTestCase', 'TfTestCase'] +__all__ = [ + 'reset_gpu_mode', 'snapshot_config', 'TfGraphTestCase', 'TfTestCase' +] diff --git a/tests/fixtures/fixtures.py b/tests/fixtures/fixtures.py index edda5962b0..15bd98ce72 100644 --- a/tests/fixtures/fixtures.py +++ b/tests/fixtures/fixtures.py @@ -9,6 +9,7 @@ from garage.experiment import deterministic from garage.experiment.snapshotter import SnapshotConfig +from garage.torch import set_gpu_mode from tests.fixtures.logger import NullOutput @@ -64,3 +65,8 @@ def teardown_method(self): del self.graph del self.sess gc.collect() + + +def reset_gpu_mode(): + """Reset mode to CPU after test.""" + set_gpu_mode(False) diff --git a/tests/garage/experiment/test_snapshotter.py b/tests/garage/experiment/test_snapshotter.py index 358d5887d1..1a61fca36a 100644 --- a/tests/garage/experiment/test_snapshotter.py +++ b/tests/garage/experiment/test_snapshotter.py @@ -22,6 +22,7 @@ class TestSnapshotter: def setup_method(self): + # pylint: disable=consider-using-with self.temp_dir = tempfile.TemporaryDirectory() def teardown_method(self): @@ -44,6 +45,7 @@ def test_snapshotter(self, mode, files): assert osp.exists(filename) with open(filename, 'rb') as pkl_file: data = pickle.load(pkl_file) + snapshot_data[num]['global_device'] = None assert data == snapshot_data[num] def test_gap_overwrite(self): @@ -60,7 +62,7 @@ def test_gap_overwrite(self): assert osp.exists(filename) with open(filename, 'rb') as pkl_file: data = pickle.load(pkl_file) - assert data == snapshot_data[1] + assert data == {'global_device': None, 'testparam': 4} def test_invalid_snapshot_mode(self): with pytest.raises(ValueError): diff --git a/tests/garage/experiment/test_torch_resume.py b/tests/garage/experiment/test_torch_resume.py new file mode 100644 index 0000000000..c72505d5a4 --- /dev/null +++ b/tests/garage/experiment/test_torch_resume.py @@ -0,0 +1,248 @@ +"""This script creates a test which fails when + saving/resuming a model is unsuccessful.""" + +import tempfile + +import numpy as np +import pytest +import torch +from torch.nn import functional as F + +from garage.envs import GymEnv, normalize +from garage.experiment import deterministic, SnapshotConfig +from garage.replay_buffer import PathBuffer +from garage.sampler import FragmentWorker, LocalSampler +from garage.torch import set_gpu_mode +from garage.torch.algos import SAC +from garage.torch.policies import TanhGaussianMLPPolicy +from garage.torch.q_functions import ContinuousMLPQFunction +from garage.trainer import Trainer + + +@pytest.mark.mujoco +def test_torch_cpu_resume_cpu(): + """Test saving on CPU and resuming on CPU.""" + with tempfile.TemporaryDirectory() as temp_dir: + snapshot_config = SnapshotConfig(snapshot_dir=temp_dir, + snapshot_mode='last', + snapshot_gap=1) + env = normalize( + GymEnv('InvertedDoublePendulum-v2', max_episode_length=100)) + deterministic.set_seed(0) + policy = TanhGaussianMLPPolicy( + env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=torch.nn.ReLU, + output_nonlinearity=None, + min_std=np.exp(-20.), + max_std=np.exp(2.), + ) + + qf1 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + + qf2 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + replay_buffer = PathBuffer(capacity_in_transitions=int(1e6), ) + trainer = Trainer(snapshot_config=snapshot_config) + sampler = LocalSampler(agents=policy, + envs=env, + max_episode_length=env.spec.max_episode_length, + worker_class=FragmentWorker) + sac = SAC(env_spec=env.spec, + policy=policy, + qf1=qf1, + qf2=qf2, + sampler=sampler, + gradient_steps_per_itr=100, + replay_buffer=replay_buffer, + min_buffer_size=1e3, + target_update_tau=5e-3, + discount=0.99, + buffer_batch_size=64, + reward_scale=1., + steps_per_epoch=2) + sac.has_lambda = lambda x: x + 1 + trainer.setup(sac, env) + set_gpu_mode(False) + sac.to() + trainer.setup(algo=sac, env=env) + trainer.train(n_epochs=10, batch_size=100) + trainer = Trainer(snapshot_config) + trainer.restore(temp_dir) + trainer.resume(n_epochs=20) + + +@pytest.mark.gpu +@pytest.mark.mujoco +def test_torch_cpu_resume_gpu(): + """Test saving on CPU and resuming on GPU.""" + with tempfile.TemporaryDirectory() as temp_dir: + snapshot_config = SnapshotConfig(snapshot_dir=temp_dir, + snapshot_mode='last', + snapshot_gap=1) + env = normalize( + GymEnv('InvertedDoublePendulum-v2', max_episode_length=100)) + deterministic.set_seed(0) + policy = TanhGaussianMLPPolicy( + env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=torch.nn.ReLU, + output_nonlinearity=None, + min_std=np.exp(-20.), + max_std=np.exp(2.), + ) + + qf1 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + + qf2 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + replay_buffer = PathBuffer(capacity_in_transitions=int(1e6), ) + trainer = Trainer(snapshot_config=snapshot_config) + sampler = LocalSampler(agents=policy, + envs=env, + max_episode_length=env.spec.max_episode_length, + worker_class=FragmentWorker) + sac = SAC(env_spec=env.spec, + policy=policy, + qf1=qf1, + qf2=qf2, + sampler=sampler, + gradient_steps_per_itr=100, + replay_buffer=replay_buffer, + min_buffer_size=1e3, + target_update_tau=5e-3, + discount=0.99, + buffer_batch_size=64, + reward_scale=1., + steps_per_epoch=2) + sac.has_lambda = lambda x: x + 1 + trainer.setup(sac, env) + set_gpu_mode(False) + sac.to() + trainer.setup(algo=sac, env=env) + trainer.train(n_epochs=10, batch_size=100) + trainer = Trainer(snapshot_config) + set_gpu_mode(True) + trainer.restore(temp_dir) + trainer.resume(n_epochs=20) + + +@pytest.mark.gpu +@pytest.mark.mujoco +def test_torch_gpu_resume_cpu(): + """Test saving on GPU and resuming on CPU.""" + with tempfile.TemporaryDirectory() as temp_dir: + snapshot_config = SnapshotConfig(snapshot_dir=temp_dir, + snapshot_mode='last', + snapshot_gap=1) + env = normalize( + GymEnv('InvertedDoublePendulum-v2', max_episode_length=100)) + deterministic.set_seed(0) + policy = TanhGaussianMLPPolicy( + env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=torch.nn.ReLU, + output_nonlinearity=None, + min_std=np.exp(-20.), + max_std=np.exp(2.), + ) + + qf1 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + + qf2 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + replay_buffer = PathBuffer(capacity_in_transitions=int(1e6), ) + trainer = Trainer(snapshot_config=snapshot_config) + sampler = LocalSampler(agents=policy, + envs=env, + max_episode_length=env.spec.max_episode_length, + worker_class=FragmentWorker) + sac = SAC(env_spec=env.spec, + policy=policy, + qf1=qf1, + qf2=qf2, + sampler=sampler, + gradient_steps_per_itr=100, + replay_buffer=replay_buffer, + min_buffer_size=1e3, + target_update_tau=5e-3, + discount=0.99, + buffer_batch_size=64, + reward_scale=1., + steps_per_epoch=2) + sac.has_lambda = lambda x: x + 1 + trainer.setup(sac, env) + set_gpu_mode(True) + sac.to() + trainer.setup(algo=sac, env=env) + trainer.train(n_epochs=10, batch_size=100) + set_gpu_mode(False) + trainer = Trainer(snapshot_config) + trainer.restore(temp_dir) + trainer.resume(n_epochs=20) + + +@pytest.mark.gpu +@pytest.mark.mujoco +def test_torch_gpu_resume_gpu(): + """Test saving on GPU and resuming on GPU.""" + with tempfile.TemporaryDirectory() as temp_dir: + snapshot_config = SnapshotConfig(snapshot_dir=temp_dir, + snapshot_mode='last', + snapshot_gap=1) + env = normalize( + GymEnv('InvertedDoublePendulum-v2', max_episode_length=100)) + deterministic.set_seed(0) + policy = TanhGaussianMLPPolicy( + env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=torch.nn.ReLU, + output_nonlinearity=None, + min_std=np.exp(-20.), + max_std=np.exp(2.), + ) + + qf1 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + + qf2 = ContinuousMLPQFunction(env_spec=env.spec, + hidden_sizes=[32, 32], + hidden_nonlinearity=F.relu) + replay_buffer = PathBuffer(capacity_in_transitions=int(1e6), ) + trainer = Trainer(snapshot_config=snapshot_config) + sampler = LocalSampler(agents=policy, + envs=env, + max_episode_length=env.spec.max_episode_length, + worker_class=FragmentWorker) + sac = SAC(env_spec=env.spec, + policy=policy, + qf1=qf1, + qf2=qf2, + sampler=sampler, + gradient_steps_per_itr=100, + replay_buffer=replay_buffer, + min_buffer_size=1e3, + target_update_tau=5e-3, + discount=0.99, + buffer_batch_size=64, + reward_scale=1., + steps_per_epoch=2) + sac.has_lambda = lambda x: x + 1 + trainer.setup(sac, env) + set_gpu_mode(True) + sac.to() + trainer.setup(algo=sac, env=env) + trainer.train(n_epochs=10, batch_size=100) + trainer = Trainer(snapshot_config) + trainer.restore(temp_dir) + trainer.resume(n_epochs=20) diff --git a/tests/garage/torch/algos/test_mtsac.py b/tests/garage/torch/algos/test_mtsac.py index 9995551bb1..8a048ced70 100644 --- a/tests/garage/torch/algos/test_mtsac.py +++ b/tests/garage/torch/algos/test_mtsac.py @@ -15,7 +15,7 @@ from garage.torch.q_functions import ContinuousMLPQFunction from garage.trainer import Trainer -from tests.fixtures import snapshot_config +from tests.fixtures import reset_gpu_mode, snapshot_config @pytest.mark.mujoco @@ -178,6 +178,7 @@ def test_mtsac_inverted_double_pendulum(): assert ret > 0 +@pytest.mark.serial def test_to(): """Test the torch function that moves modules to GPU. @@ -236,8 +237,10 @@ def test_to(): for param in mtsac.policy.parameters(): assert param.device == device assert mtsac._log_alpha.device == device + reset_gpu_mode() +@pytest.mark.serial @pytest.mark.mujoco def test_fixed_alpha(): """Test if using fixed_alpha ensures that alpha is non differentiable.""" @@ -298,3 +301,4 @@ def test_fixed_alpha(): assert torch.allclose(torch.Tensor([0.5] * num_tasks), mtsac._log_alpha.to('cpu')) assert not mtsac._use_automatic_entropy_tuning + reset_gpu_mode() diff --git a/tests/garage/torch/algos/test_sac.py b/tests/garage/torch/algos/test_sac.py index 856946aae3..fb87d05f8f 100644 --- a/tests/garage/torch/algos/test_sac.py +++ b/tests/garage/torch/algos/test_sac.py @@ -16,7 +16,7 @@ from garage.torch.q_functions import ContinuousMLPQFunction from garage.trainer import Trainer -from tests.fixtures import snapshot_config +from tests.fixtures import reset_gpu_mode, snapshot_config class _MockDistribution: @@ -177,6 +177,7 @@ def testTemperatureLoss(): assert np.all(np.isclose(loss, expected_loss)) +@pytest.mark.serial @pytest.mark.mujoco def test_sac_inverted_double_pendulum(): """Test Sac performance on inverted pendulum.""" @@ -234,6 +235,7 @@ def test_sac_inverted_double_pendulum(): assert not torch.allclose(torch.Tensor([1.]), sac._log_alpha.to('cpu')) # check that policy is learning beyond predecided threshold assert ret > 80 + reset_gpu_mode() @pytest.mark.mujoco @@ -286,6 +288,7 @@ def test_fixed_alpha(): assert not sac._use_automatic_entropy_tuning +@pytest.mark.serial @pytest.mark.gpu def test_sac_to(): """Test moving Sac between CPU and GPU.""" @@ -339,3 +342,4 @@ def test_sac_to(): set_gpu_mode(False) sac.to() assert torch.allclose(log_alpha, sac._log_alpha) + reset_gpu_mode() diff --git a/tests/garage/torch/algos/test_td3.py b/tests/garage/torch/algos/test_td3.py index 524649a1d6..74aecb1170 100644 --- a/tests/garage/torch/algos/test_td3.py +++ b/tests/garage/torch/algos/test_td3.py @@ -15,12 +15,13 @@ from garage.torch.q_functions import ContinuousMLPQFunction from garage.trainer import Trainer -from tests.fixtures import snapshot_config, TfGraphTestCase +from tests.fixtures import reset_gpu_mode, snapshot_config, TfGraphTestCase class TestTD3(TfGraphTestCase): """Test class for TD3.""" + @pytest.mark.serial @pytest.mark.mujoco def test_td3_inverted_double_pendulum(self): deterministic.set_seed(0) @@ -67,7 +68,9 @@ def test_td3_inverted_double_pendulum(self): td3.to() trainer.setup(td3, env) trainer.train(n_epochs=n_epochs, batch_size=sampler_batch_size) + reset_gpu_mode() + @pytest.mark.serial @pytest.mark.mujoco def test_pickling(self): """Test pickle and unpickle.""" @@ -116,3 +119,4 @@ def test_pickling(self): pickled = pickle.dumps(td3) unpickled = pickle.loads(pickled) assert unpickled + reset_gpu_mode()