diff --git a/.github/unittest/linux_examples/scripts/run_test.sh b/.github/unittest/linux_examples/scripts/run_test.sh index de002389a41..112152ed7fc 100755 --- a/.github/unittest/linux_examples/scripts/run_test.sh +++ b/.github/unittest/linux_examples/scripts/run_test.sh @@ -146,7 +146,7 @@ python .github/unittest/helpers/coverage_run_parallel.py examples/dreamer/dreame python .github/unittest/helpers/coverage_run_parallel.py examples/td3/td3.py \ collector.total_frames=48 \ collector.init_random_frames=10 \ - optimization.batch_size=10 \ + optim.batch_size=10 \ collector.frames_per_batch=16 \ collector.num_workers=4 \ collector.env_per_collector=2 \ @@ -247,7 +247,7 @@ python .github/unittest/helpers/coverage_run_parallel.py examples/iql/iql_online python .github/unittest/helpers/coverage_run_parallel.py examples/td3/td3.py \ collector.total_frames=48 \ collector.init_random_frames=10 \ - optimization.batch_size=10 \ + optim.batch_size=10 \ collector.frames_per_batch=16 \ collector.num_workers=2 \ collector.env_per_collector=1 \ diff --git a/examples/td3/config.yaml b/examples/td3/config.yaml index 35a2d9f8b2f..4ef557ed50c 100644 --- a/examples/td3/config.yaml +++ b/examples/td3/config.yaml @@ -1,47 +1,50 @@ -# Environment +# task and env env: name: HalfCheetah-v3 task: "" - exp_name: "HalfCheetah-TD3" - library: gym - frame_skip: 1 + exp_name: ${env.name}_TD3 + library: gymnasium seed: 42 + max_episode_steps: 1000 -# Collection +# collector collector: total_frames: 1000000 - init_random_frames: 10000 + init_random_frames: 25_000 init_env_steps: 1000 frames_per_batch: 1000 - max_frames_per_traj: 1000 - async_collection: 1 + reset_at_each_iter: False collector_device: cpu env_per_collector: 1 num_workers: 1 -# Replay Buffer +# replay buffer replay_buffer: prb: 0 # use prioritized experience replay size: 1000000 + scratch_dir: ${env.exp_name}_${env.seed} -# Optimization -optimization: +# optim +optim: utd_ratio: 1.0 gamma: 0.99 loss_function: l2 - lr: 3e-4 - weight_decay: 2e-4 + lr: 3.0e-4 + weight_decay: 0.0 + adam_eps: 1e-4 batch_size: 256 target_update_polyak: 0.995 policy_update_delay: 2 + policy_noise: 0.2 + noise_clip: 0.5 -# Network +# network network: hidden_sizes: [256, 256] activation: relu device: "cuda:0" -# Logging +# logging logger: backend: wandb mode: online diff --git a/examples/td3/td3.py b/examples/td3/td3.py index f4d8707f404..7c9904f5300 100644 --- a/examples/td3/td3.py +++ b/examples/td3/td3.py @@ -11,8 +11,9 @@ The helper functions are coded in the utils.py associated with this script. """ -import hydra +import time +import hydra import numpy as np import torch import torch.cuda @@ -22,6 +23,7 @@ from torchrl.record.loggers import generate_exp_name, get_logger from utils import ( + log_metrics, make_collector, make_environment, make_loss_module, @@ -35,6 +37,7 @@ def main(cfg: "DictConfig"): # noqa: F821 device = torch.device(cfg.network.device) + # Create logger exp_name = generate_exp_name("TD3", cfg.env.exp_name) logger = None if cfg.logger.backend: @@ -45,140 +48,155 @@ def main(cfg: "DictConfig"): # noqa: F821 wandb_kwargs={"mode": cfg.logger.mode, "config": cfg}, ) + # Set seeds torch.manual_seed(cfg.env.seed) np.random.seed(cfg.env.seed) - # Create Environments + # Create environments train_env, eval_env = make_environment(cfg) - # Create Agent + # Create agent model, exploration_policy = make_td3_agent(cfg, train_env, eval_env, device) # Create TD3 loss loss_module, target_net_updater = make_loss_module(cfg, model) - # Make Off-Policy Collector + # Create off-policy collector collector = make_collector(cfg, train_env, exploration_policy) - # Make Replay Buffer + # Create replay buffer replay_buffer = make_replay_buffer( - batch_size=cfg.optimization.batch_size, + batch_size=cfg.optim.batch_size, prb=cfg.replay_buffer.prb, buffer_size=cfg.replay_buffer.size, + buffer_scratch_dir="/tmp/" + cfg.replay_buffer.scratch_dir, device=device, ) - # Make Optimizers + # Create optimizers optimizer_actor, optimizer_critic = make_optimizer(cfg, loss_module) - rewards = [] - rewards_eval = [] - # Main loop + start_time = time.time() collected_frames = 0 pbar = tqdm.tqdm(total=cfg.collector.total_frames) - r0 = None - q_loss = None init_random_frames = cfg.collector.init_random_frames num_updates = int( cfg.collector.env_per_collector * cfg.collector.frames_per_batch - * cfg.optimization.utd_ratio + * cfg.optim.utd_ratio ) - delayed_updates = cfg.optimization.policy_update_delay + delayed_updates = cfg.optim.policy_update_delay prb = cfg.replay_buffer.prb - env_per_collector = cfg.collector.env_per_collector - eval_rollout_steps = cfg.collector.max_frames_per_traj // cfg.env.frame_skip + eval_rollout_steps = cfg.env.max_episode_steps eval_iter = cfg.logger.eval_iter - frames_per_batch, frame_skip = cfg.collector.frames_per_batch, cfg.env.frame_skip + frames_per_batch = cfg.collector.frames_per_batch + update_counter = 0 - for i, tensordict in enumerate(collector): + sampling_start = time.time() + for tensordict in collector: + sampling_time = time.time() - sampling_start exploration_policy.step(tensordict.numel()) - # update weights of the inference policy + + # Update weights of the inference policy collector.update_policy_weights_() - if r0 is None: - r0 = tensordict["next", "reward"].sum(-1).mean().item() pbar.update(tensordict.numel()) tensordict = tensordict.reshape(-1) current_frames = tensordict.numel() + # Add to replay buffer replay_buffer.extend(tensordict.cpu()) collected_frames += current_frames - # optimization steps + # Optimization steps + training_start = time.time() if collected_frames >= init_random_frames: ( actor_losses, q_losses, ) = ([], []) - for j in range(num_updates): - # sample from replay buffer - sampled_tensordict = replay_buffer.sample().clone() + for _ in range(num_updates): + + # Update actor every delayed_updates + update_counter += 1 + update_actor = update_counter % delayed_updates == 0 - loss_td = loss_module(sampled_tensordict) + # Sample from replay buffer + sampled_tensordict = replay_buffer.sample().clone() - actor_loss = loss_td["loss_actor"] - q_loss = loss_td["loss_qvalue"] + # Compute loss + q_loss, *_ = loss_module.value_loss(sampled_tensordict) + # Update critic optimizer_critic.zero_grad() - update_actor = j % delayed_updates == 0 - q_loss.backward(retain_graph=update_actor) + q_loss.backward() optimizer_critic.step() q_losses.append(q_loss.item()) + # Update actor if update_actor: + actor_loss, *_ = loss_module.actor_loss(sampled_tensordict) optimizer_actor.zero_grad() actor_loss.backward() optimizer_actor.step() + actor_losses.append(actor_loss.item()) - # update qnet_target params + # Update target params target_net_updater.step() - # update priority + # Update priority if prb: replay_buffer.update_priority(sampled_tensordict) - rewards.append( - (i, tensordict["next", "reward"].sum().item() / env_per_collector) + training_time = time.time() - training_start + episode_end = ( + tensordict["next", "done"] + if tensordict["next", "done"].any() + else tensordict["next", "truncated"] ) - train_log = { - "train_reward": rewards[-1][1], - "collected_frames": collected_frames, - } - if q_loss is not None: - train_log.update( - { - "actor_loss": np.mean(actor_losses), - "q_loss": np.mean(q_losses), - } + episode_rewards = tensordict["next", "episode_reward"][episode_end] + + # Logging + metrics_to_log = {} + if len(episode_rewards) > 0: + episode_length = tensordict["next", "step_count"][episode_end] + metrics_to_log["train/reward"] = episode_rewards.mean().item() + metrics_to_log["train/episode_length"] = episode_length.sum().item() / len( + episode_length ) - if logger is not None: - for key, value in train_log.items(): - logger.log_scalar(key, value, step=collected_frames) - if abs(collected_frames % eval_iter) < frames_per_batch * frame_skip: + + if collected_frames >= init_random_frames: + metrics_to_log["train/q_loss"] = np.mean(q_losses) + if update_actor: + metrics_to_log["train/a_loss"] = np.mean(actor_losses) + metrics_to_log["train/sampling_time"] = sampling_time + metrics_to_log["train/training_time"] = training_time + + # Evaluation + if abs(collected_frames % eval_iter) < frames_per_batch: with set_exploration_type(ExplorationType.MODE), torch.no_grad(): + eval_start = time.time() eval_rollout = eval_env.rollout( eval_rollout_steps, exploration_policy, auto_cast_to_device=True, break_when_any_done=True, ) + eval_time = time.time() - eval_start eval_reward = eval_rollout["next", "reward"].sum(-2).mean().item() - rewards_eval.append((i, eval_reward)) - eval_str = f"eval cumulative reward: {rewards_eval[-1][1]: 4.4f} (init: {rewards_eval[0][1]: 4.4f})" - if logger is not None: - logger.log_scalar( - "evaluation_reward", rewards_eval[-1][1], step=collected_frames - ) - if len(rewards_eval): - pbar.set_description( - f"reward: {rewards[-1][1]: 4.4f} (r0 = {r0: 4.4f})," + eval_str - ) + metrics_to_log["eval/reward"] = eval_reward + metrics_to_log["eval/time"] = eval_time + if logger is not None: + log_metrics(logger, metrics_to_log, collected_frames) + sampling_start = time.time() collector.shutdown() + end_time = time.time() + execution_time = end_time - start_time + print(f"Training took {execution_time:.2f} seconds to finish") if __name__ == "__main__": diff --git a/examples/td3/utils.py b/examples/td3/utils.py index 9a8c5809f75..090529782fd 100644 --- a/examples/td3/utils.py +++ b/examples/td3/utils.py @@ -1,3 +1,10 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +import tempfile +from contextlib import nullcontext + import torch from torch import nn, optim @@ -10,10 +17,11 @@ EnvCreator, InitTracker, ParallelEnv, + RewardSum, + StepCounter, TransformedEnv, ) -from torchrl.envs.libs.gym import GymEnv -from torchrl.envs.transforms import RewardScaling +from torchrl.envs.libs.gym import GymEnv, set_gym_backend from torchrl.envs.utils import ExplorationType, set_exploration_type from torchrl.modules import ( AdditiveGaussianWrapper, @@ -33,17 +41,27 @@ # ----------------- -def env_maker(task, frame_skip=1, device="cpu", from_pixels=False): - return GymEnv(task, device=device, frame_skip=frame_skip, from_pixels=from_pixels) +def env_maker( + task, + device="cpu", + from_pixels=False, +): + with set_gym_backend("gym"): + return GymEnv( + task, + device=device, + from_pixels=from_pixels, + ) -def apply_env_transforms(env, reward_scaling=1.0): +def apply_env_transforms(env, max_episode_steps): transformed_env = TransformedEnv( env, Compose( + StepCounter(max_steps=max_episode_steps), InitTracker(), - RewardScaling(loc=0.0, scale=reward_scaling), DoubleToFloat(), + RewardSum(), ), ) return transformed_env @@ -53,16 +71,18 @@ def make_environment(cfg): """Make environments for training and evaluation.""" parallel_env = ParallelEnv( cfg.collector.env_per_collector, - EnvCreator(lambda: env_maker(task=cfg.env.name)), + EnvCreator(lambda task=cfg.env.name: env_maker(task=task)), ) parallel_env.set_seed(cfg.env.seed) - train_env = apply_env_transforms(parallel_env) + train_env = apply_env_transforms( + parallel_env, max_episode_steps=cfg.env.max_episode_steps + ) eval_env = TransformedEnv( ParallelEnv( cfg.collector.env_per_collector, - EnvCreator(lambda: env_maker(task=cfg.env.name)), + EnvCreator(lambda task=cfg.env.name: env_maker(task=task)), ), train_env.transform.clone(), ) @@ -79,9 +99,10 @@ def make_collector(cfg, train_env, actor_model_explore): collector = SyncDataCollector( train_env, actor_model_explore, + init_random_frames=cfg.collector.init_random_frames, frames_per_batch=cfg.collector.frames_per_batch, - max_frames_per_traj=cfg.collector.max_frames_per_traj, total_frames=cfg.collector.total_frames, + reset_at_each_iter=cfg.collector.reset_at_each_iter, device=cfg.collector.collector_device, ) collector.set_seed(cfg.env.seed) @@ -92,35 +113,40 @@ def make_replay_buffer( batch_size, prb=False, buffer_size=1000000, - buffer_scratch_dir="/tmp/", + buffer_scratch_dir=None, device="cpu", prefetch=3, ): - if prb: - replay_buffer = TensorDictPrioritizedReplayBuffer( - alpha=0.7, - beta=0.5, - pin_memory=False, - prefetch=prefetch, - storage=LazyMemmapStorage( - buffer_size, - scratch_dir=buffer_scratch_dir, - device=device, - ), - batch_size=batch_size, - ) - else: - replay_buffer = TensorDictReplayBuffer( - pin_memory=False, - prefetch=prefetch, - storage=LazyMemmapStorage( - buffer_size, - scratch_dir=buffer_scratch_dir, - device=device, - ), - batch_size=batch_size, - ) - return replay_buffer + with ( + tempfile.TemporaryDirectory() + if buffer_scratch_dir is None + else nullcontext(buffer_scratch_dir) + ) as scratch_dir: + if prb: + replay_buffer = TensorDictPrioritizedReplayBuffer( + alpha=0.7, + beta=0.5, + pin_memory=False, + prefetch=prefetch, + storage=LazyMemmapStorage( + buffer_size, + scratch_dir=scratch_dir, + device=device, + ), + batch_size=batch_size, + ) + else: + replay_buffer = TensorDictReplayBuffer( + pin_memory=False, + prefetch=prefetch, + storage=LazyMemmapStorage( + buffer_size, + scratch_dir=scratch_dir, + device=device, + ), + batch_size=batch_size, + ) + return replay_buffer # ==================================================================== @@ -128,17 +154,6 @@ def make_replay_buffer( # ----- -def get_activation(cfg): - if cfg.network.activation == "relu": - return nn.ReLU - elif cfg.network.activation == "tanh": - return nn.Tanh - elif cfg.network.activation == "leaky_relu": - return nn.LeakyReLU - else: - raise NotImplementedError - - def make_td3_agent(cfg, train_env, eval_env, device): """Make TD3 agent.""" # Define Actor Network @@ -222,17 +237,18 @@ def make_loss_module(cfg, model): actor_network=model[0], qvalue_network=model[1], num_qvalue_nets=2, - loss_function=cfg.optimization.loss_function, + loss_function=cfg.optim.loss_function, delay_actor=True, delay_qvalue=True, + gamma=cfg.optim.gamma, action_spec=model[0][1].spec, + policy_noise=cfg.optim.policy_noise, + noise_clip=cfg.optim.noise_clip, ) - loss_module.make_value_estimator(gamma=cfg.optimization.gamma) + loss_module.make_value_estimator(gamma=cfg.optim.gamma) # Define Target Network Updater - target_net_updater = SoftUpdate( - loss_module, eps=cfg.optimization.target_update_polyak - ) + target_net_updater = SoftUpdate(loss_module, eps=cfg.optim.target_update_polyak) return loss_module, target_net_updater @@ -241,11 +257,36 @@ def make_optimizer(cfg, loss_module): actor_params = list(loss_module.actor_network_params.flatten_keys().values()) optimizer_actor = optim.Adam( - actor_params, lr=cfg.optimization.lr, weight_decay=cfg.optimization.weight_decay + actor_params, + lr=cfg.optim.lr, + weight_decay=cfg.optim.weight_decay, + eps=cfg.optim.adam_eps, ) optimizer_critic = optim.Adam( critic_params, - lr=cfg.optimization.lr, - weight_decay=cfg.optimization.weight_decay, + lr=cfg.optim.lr, + weight_decay=cfg.optim.weight_decay, + eps=cfg.optim.adam_eps, ) return optimizer_actor, optimizer_critic + + +# ==================================================================== +# General utils +# --------- + + +def log_metrics(logger, metrics, step): + for metric_name, metric_value in metrics.items(): + logger.log_scalar(metric_name, metric_value, step) + + +def get_activation(cfg): + if cfg.network.activation == "relu": + return nn.ReLU + elif cfg.network.activation == "tanh": + return nn.Tanh + elif cfg.network.activation == "leaky_relu": + return nn.LeakyReLU + else: + raise NotImplementedError diff --git a/test/test_cost.py b/test/test_cost.py index c3d4e0b8086..6c38e6a8b65 100644 --- a/test/test_cost.py +++ b/test/test_cost.py @@ -2417,8 +2417,12 @@ def test_td3_notensordict( loss_val_td = loss(td) torch.manual_seed(0) loss_val = loss(**kwargs) - for i, key in enumerate(loss_val_td.keys()): + for i in loss_val: + assert i in loss_val_td.values(), f"{i} not in {loss_val_td.values()}" + + for i, key in enumerate(loss.out_keys): torch.testing.assert_close(loss_val_td.get(key), loss_val[i]) + # test select loss.select_out_keys("loss_actor", "loss_qvalue") torch.manual_seed(0) diff --git a/torchrl/collectors/collectors.py b/torchrl/collectors/collectors.py index 4ef979d11d6..0d5443b22b4 100644 --- a/torchrl/collectors/collectors.py +++ b/torchrl/collectors/collectors.py @@ -2,6 +2,8 @@ # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. +from __future__ import annotations + import _pickle import abc import inspect @@ -391,12 +393,12 @@ class SyncDataCollector(DataCollectorBase): If the environment wraps multiple environments together, the number of steps is tracked for each environment independently. Negative values are allowed, in which case this argument is ignored. - Defaults to ``-1`` (i.e. no maximum number of steps). + Defaults to ``None`` (i.e. no maximum number of steps). init_random_frames (int, optional): Number of frames for which the policy is ignored before it is called. This feature is mainly intended to be used in offline/model-based settings, where a batch of random trajectories can be used to initialize training. - Defaults to ``-1`` (i.e. no random frames). + Defaults to ``None`` (i.e. no random frames). reset_at_each_iter (bool, optional): Whether environments should be reset at the beginning of a batch collection. Defaults to ``False``. @@ -498,12 +500,12 @@ def __init__( total_frames: int, device: DEVICE_TYPING = None, storing_device: DEVICE_TYPING = None, - create_env_kwargs: Optional[dict] = None, - max_frames_per_traj: int = -1, - init_random_frames: int = -1, + create_env_kwargs: dict | None = None, + max_frames_per_traj: int | None = None, + init_random_frames: int | None = None, reset_at_each_iter: bool = False, - postproc: Optional[Callable[[TensorDictBase], TensorDictBase]] = None, - split_trajs: Optional[bool] = None, + postproc: Callable[[TensorDictBase], TensorDictBase] | None = None, + split_trajs: bool | None = None, exploration_type: ExplorationType = DEFAULT_EXPLORATION_TYPE, exploration_mode=None, return_same_td: bool = False, @@ -567,7 +569,7 @@ def __init__( self.env: EnvBase = self.env.to(self.device) self.max_frames_per_traj = max_frames_per_traj - if self.max_frames_per_traj > 0: + if self.max_frames_per_traj is not None and self.max_frames_per_traj > 0: # let's check that there is no StepCounter yet for key in self.env.output_spec.keys(True, True): if isinstance(key, str): @@ -823,7 +825,10 @@ def rollout(self) -> TensorDictBase: tensordicts = [] with set_exploration_type(self.exploration_type): for t in range(self.frames_per_batch): - if self._frames < self.init_random_frames: + if ( + self.init_random_frames is not None + and self._frames < self.init_random_frames + ): self.env.rand_step(self._tensordict) else: self.policy(self._tensordict) @@ -1016,12 +1021,12 @@ class _MultiDataCollector(DataCollectorBase): If the environment wraps multiple environments together, the number of steps is tracked for each environment independently. Negative values are allowed, in which case this argument is ignored. - Defaults to ``-1`` (i.e. no maximum number of steps). + Defaults to ``None`` (i.e. no maximum number of steps). init_random_frames (int, optional): Number of frames for which the policy is ignored before it is called. This feature is mainly intended to be used in offline/model-based settings, where a batch of random trajectories can be used to initialize training. - Defaults to ``-1`` (i.e. no random frames). + Defaults to ``None`` (i.e. no random frames). reset_at_each_iter (bool, optional): Whether environments should be reset at the beginning of a batch collection. Defaults to ``False``. @@ -1077,8 +1082,8 @@ def __init__( device: DEVICE_TYPING = None, storing_device: Optional[Union[DEVICE_TYPING, Sequence[DEVICE_TYPING]]] = None, create_env_kwargs: Optional[Sequence[dict]] = None, - max_frames_per_traj: int = -1, - init_random_frames: int = -1, + max_frames_per_traj: int | None = None, + init_random_frames: int | None = None, reset_at_each_iter: bool = False, postproc: Optional[Callable[[TensorDictBase], TensorDictBase]] = None, split_trajs: Optional[bool] = None, @@ -1633,7 +1638,10 @@ def iterator(self) -> Iterator[TensorDictBase]: self.update_policy_weights_() for idx in range(self.num_workers): - if self._frames < self.init_random_frames: + if ( + self.init_random_frames is not None + and self._frames < self.init_random_frames + ): msg = "continue_random" else: msg = "continue" @@ -1869,7 +1877,7 @@ def iterator(self) -> Iterator[TensorDictBase]: self.update_policy_weights_() for i in range(self.num_workers): - if self.init_random_frames > 0: + if self.init_random_frames is not None and self.init_random_frames > 0: self.pipes[i].send((None, "continue_random")) else: self.pipes[i].send((None, "continue")) @@ -1891,7 +1899,10 @@ def iterator(self) -> Iterator[TensorDictBase]: # the function blocks here until the next item is asked, hence we send the message to the # worker to keep on working in the meantime before the yield statement - if self._frames < self.init_random_frames: + if ( + self.init_random_frames is not None + and self._frames < self.init_random_frames + ): msg = "continue_random" else: msg = "continue" @@ -1918,7 +1929,10 @@ def reset(self, reset_idx: Optional[Sequence[bool]] = None) -> None: raise Exception("self.queue_out is full") if self.running: for idx in range(self.num_workers): - if self._frames < self.init_random_frames: + if ( + self.init_random_frames is not None + and self._frames < self.init_random_frames + ): self.pipes[idx].send((idx, "continue_random")) else: self.pipes[idx].send((idx, "continue")) @@ -1952,14 +1966,14 @@ class aSyncDataCollector(MultiaSyncDataCollector): environment wraps multiple environments together, the number of steps is tracked for each environment independently. Negative values are allowed, in which case this argument is ignored. - Default is -1 (i.e. no maximum number of steps) + Defaults to ``None`` (i.e. no maximum number of steps) frames_per_batch (int): Time-length of a batch. reset_at_each_iter and frames_per_batch == n_steps are equivalent configurations. - default: 200 + Defaults to ``200`` init_random_frames (int): Number of frames for which the policy is ignored before it is called. This feature is mainly intended to be used in offline/model-based settings, where a batch of random trajectories can be used to initialize training. - default=-1 (i.e. no random frames) + Defaults to ``None`` (i.e. no random frames) reset_at_each_iter (bool): Whether or not environments should be reset for each batch. default=False. postproc (callable, optional): A PostProcessor is an object that will read a batch of data and process it in a @@ -1994,9 +2008,9 @@ def __init__( ] = None, total_frames: Optional[int] = -1, create_env_kwargs: Optional[dict] = None, - max_frames_per_traj: int = -1, + max_frames_per_traj: int | None = None, frames_per_batch: int = 200, - init_random_frames: int = -1, + init_random_frames: int | None = None, reset_at_each_iter: bool = False, postproc: Optional[Callable[[TensorDictBase], TensorDictBase]] = None, split_trajs: Optional[bool] = None, diff --git a/torchrl/objectives/td3.py b/torchrl/objectives/td3.py index 72ffa64a4f2..9912c143ae6 100644 --- a/torchrl/objectives/td3.py +++ b/torchrl/objectives/td3.py @@ -357,129 +357,128 @@ def _cached_stack_actor_params(self): [self.actor_network_params, self.target_actor_network_params], 0 ) - @dispatch - def forward(self, tensordict: TensorDictBase) -> TensorDictBase: - obs_keys = self.actor_network.in_keys - tensordict_save = tensordict - tensordict = tensordict.clone(False) - act = tensordict.get(self.tensor_keys.action) - action_shape = act.shape - action_device = act.device - # computing early for reprod - noise = torch.normal( - mean=torch.zeros(action_shape), - std=torch.full(action_shape, self.policy_noise), - ).to(action_device) - noise = noise.clamp(-self.noise_clip, self.noise_clip) - - tensordict_actor_grad = tensordict.select( - *obs_keys - ) # to avoid overwriting keys - next_td_actor = step_mdp(tensordict).select( - *self.actor_network.in_keys - ) # next_observation -> - tensordict_actor = torch.stack([tensordict_actor_grad, next_td_actor], 0) - # DO NOT call contiguous bc we'll update the tds later - actor_output_td = self._vmap_actor_network00( - tensordict_actor, - self._cached_stack_actor_params, - ) - # add noise to target policy - actor_output_td1 = actor_output_td[1] - next_action = (actor_output_td1.get(self.tensor_keys.action) + noise).clamp( - self.min_action, self.max_action + def actor_loss(self, tensordict): + tensordict_actor_grad = tensordict.select(*self.actor_network.in_keys) + tensordict_actor_grad = self.actor_network( + tensordict_actor_grad, self.actor_network_params ) - actor_output_td1.set(self.tensor_keys.action, next_action) - tensordict_actor.set( - self.tensor_keys.action, - actor_output_td.get(self.tensor_keys.action), - ) - - # repeat tensordict_actor to match the qvalue size - _actor_loss_td = ( - tensordict_actor[0] - .select(*self.qvalue_network.in_keys) - .expand(self.num_qvalue_nets, *tensordict_actor[0].batch_size) + actor_loss_td = tensordict_actor_grad.select( + *self.qvalue_network.in_keys + ).expand( + self.num_qvalue_nets, *tensordict_actor_grad.batch_size ) # for actor loss - _qval_td = tensordict.select(*self.qvalue_network.in_keys).expand( - self.num_qvalue_nets, - *tensordict.select(*self.qvalue_network.in_keys).batch_size, - ) # for qvalue loss - _next_val_td = ( - tensordict_actor[1] - .select(*self.qvalue_network.in_keys) - .expand(self.num_qvalue_nets, *tensordict_actor[1].batch_size) - ) # for next value estimation - tensordict_qval = torch.cat( - [ - _actor_loss_td, - _next_val_td, - _qval_td, - ], - 0, - ) - - # cat params - qvalue_params = torch.cat( - [ + state_action_value_actor = ( + self._vmap_qvalue_network00( + actor_loss_td, self._cached_detach_qvalue_network_params, - self.target_qvalue_network_params, - self.qvalue_network_params, - ], - 0, - ) - tensordict_qval = self._vmap_qvalue_network00( - tensordict_qval, - qvalue_params, + ) + .get(self.tensor_keys.state_action_value) + .squeeze(-1) ) + loss_actor = -(state_action_value_actor[0]).mean() + metadata = { + "state_action_value_actor": state_action_value_actor.mean().detach(), + } + return loss_actor, metadata + + def value_loss(self, tensordict): + tensordict = tensordict.clone(False) + + act = tensordict.get(self.tensor_keys.action) - state_action_value = tensordict_qval.get( - self.tensor_keys.state_action_value - ).squeeze(-1) - ( - state_action_value_actor, - next_state_action_value_qvalue, - state_action_value_qvalue, - ) = state_action_value.split( - [self.num_qvalue_nets, self.num_qvalue_nets, self.num_qvalue_nets], - dim=0, + # computing early for reprod + noise = (torch.randn_like(act) * self.policy_noise).clamp( + -self.noise_clip, self.noise_clip ) - loss_actor = -(state_action_value_actor.min(0)[0]).mean() + with torch.no_grad(): + next_td_actor = step_mdp(tensordict).select( + *self.actor_network.in_keys + ) # next_observation -> + next_td_actor = self.actor_network( + next_td_actor, self.target_actor_network_params + ) + next_action = (next_td_actor.get(self.tensor_keys.action) + noise).clamp( + self.min_action, self.max_action + ) + next_td_actor.set( + self.tensor_keys.action, + next_action, + ) + next_val_td = next_td_actor.select(*self.qvalue_network.in_keys).expand( + self.num_qvalue_nets, *next_td_actor.batch_size + ) # for next value estimation + next_target_q1q2 = ( + self._vmap_qvalue_network00( + next_val_td, + self.target_qvalue_network_params, + ) + .get(self.tensor_keys.state_action_value) + .squeeze(-1) + ) + # min over the next target qvalues + next_target_qvalue = next_target_q1q2.min(0)[0] - next_state_value = next_state_action_value_qvalue.min(0)[0] + # set next target qvalues tensordict.set( ("next", self.tensor_keys.state_action_value), - next_state_value.unsqueeze(-1), + next_target_qvalue.unsqueeze(-1), + ) + + qval_td = tensordict.select(*self.qvalue_network.in_keys).expand( + self.num_qvalue_nets, + *tensordict.batch_size, ) + # preditcted current qvalues + current_qvalue = ( + self._vmap_qvalue_network00( + qval_td, + self.qvalue_network_params, + ) + .get(self.tensor_keys.state_action_value) + .squeeze(-1) + ) + + # compute target values for the qvalue loss (reward + gamma * next_target_qvalue * (1 - done)) target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1) - pred_val = state_action_value_qvalue - td_error = (pred_val - target_value).pow(2) + + td_error = (current_qvalue - target_value).pow(2) loss_qval = ( distance_loss( - pred_val, - target_value.expand_as(pred_val), + current_qvalue, + target_value.expand_as(current_qvalue), loss_function=self.loss_function, ) .mean(-1) .sum() - * 0.5 ) + metadata = { + "td_error": td_error, + "next_state_value": next_target_qvalue.mean().detach(), + "pred_value": current_qvalue.mean().detach(), + "target_value": target_value.mean().detach(), + } - tensordict_save.set(self.tensor_keys.priority, td_error.detach().max(0)[0]) + return loss_qval, metadata + @dispatch + def forward(self, tensordict: TensorDictBase) -> TensorDictBase: + tensordict_save = tensordict + loss_actor, metadata_actor = self.actor_loss(tensordict) + loss_qval, metadata_value = self.value_loss(tensordict_save) + tensordict_save.set( + self.tensor_keys.priority, metadata_value.pop("td_error").detach().max(0)[0] + ) if not loss_qval.shape == loss_actor.shape: raise RuntimeError( f"QVal and actor loss have different shape: {loss_qval.shape} and {loss_actor.shape}" ) td_out = TensorDict( source={ - "loss_actor": loss_actor.mean(), - "loss_qvalue": loss_qval.mean(), - "pred_value": pred_val.mean().detach(), - "state_action_value_actor": state_action_value_actor.mean().detach(), - "next_state_value": next_state_value.mean().detach(), - "target_value": target_value.mean().detach(), + "loss_actor": loss_actor, + "loss_qvalue": loss_qval, + **metadata_actor, + **metadata_value, }, batch_size=[], )