Source code for objectrl.models.ddpg

# -----------------------------------------------------------------------------------
# ObjectRL: An Object-Oriented Reinforcement Learning Codebase
# Copyright (C) 2025 ADIN Lab

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------------

import math
import typing

import torch

from objectrl.models.basic.ac import ActorCritic
from objectrl.models.basic.actor import Actor
from objectrl.models.basic.critic import CriticEnsemble
from objectrl.utils.utils import totorch

if typing.TYPE_CHECKING:
    from objectrl.config.config import MainConfig


[docs] class OrnsteinUhlenbeckNoise: """ Implements Ornstein-Uhlenbeck process to generate temporally correlated noise. Commonly used in DDPG to add exploration noise to continuous actions. Args: dim_act (int): Shape of the action space. mu (float): Long-running mean. theta (float): Speed of mean reversion. sigma (float): Volatility (standard deviation). dt (float): Time step size. x0 (float, optional): Initial value of the process. """
[docs] def __init__( self, dim_act: int, mu: float = 0, theta: float = 0.15, sigma: float = 0.2, dt: float = 1e-2, x0: float | None = None, ) -> None: self.dim_act = dim_act self.mu = mu self.theta = theta self.sigma = sigma self.state = mu self.dt = dt self.x0 = x0 self.reset()
[docs] def reset(self) -> None: """Resets internal state to initial value or mean (mu).""" self.state = self.x0 if self.x0 is not None else self.mu
[docs] def evolve_state(self) -> torch.Tensor: """Applies the OU process to evolve the noise state.""" x = self.state dx = self.theta * (self.mu - x) * self.dt + self.sigma * math.sqrt( self.dt ) * torch.randn(self.dim_act) self.state = x + dx return self.state
[docs] def sample(self) -> torch.Tensor: """ Returns a sample from the current noise state. Args: None Returns: torch.Tensor: A sample tensor generated from the noise process. """ return self.evolve_state()
def __call__(self) -> torch.Tensor: """ Enables the object to be called directly like a function. Args: None Returns: torch.Tensor: A sample tensor generated from the noise process. """ return self.sample()
[docs] class DDPGActor(Actor): """ DDPG actor module that produces actions and applies OU noise during training. Attributes: interaction_iter (int): Counter for interaction steps. sampling_rate (int): Frequency of head sampling for posterior sampling. idx_active_critic (int): Index of the currently active critic head. is_episode_end (bool): Flag indicating if the current episode has ended. """
[docs] def __init__(self, config: "MainConfig", dim_state: int, dim_act: int) -> None: super().__init__(config, dim_state, dim_act) noise = config.model.noise self.noise = OrnsteinUhlenbeckNoise( dim_act, mu=noise.mu, theta=noise.theta, sigma=noise.sigma, dt=noise.dt, x0=noise.x0, ) self.action_limit_low = totorch( config.env.env.action_space.low, device=self.device # type: ignore ) self.action_limit_high = totorch( config.env.env.action_space.high, device=self.device # type: ignore )
[docs] def act(self, state: torch.Tensor, is_training: bool = True) -> dict: """ Produces an action given a state. Adds noise during training. Args: state (torch.Tensor): The input state. is_training (bool): Whether to add exploration noise. Returns: dict: Dictionary containing 'action' and 'action_wo_noise'. """ action_dict = super().act(state) action = action_dict["action"] action_dict["action_wo_noise"] = action.clone() if is_training: noise = self.noise().to(action.device) action += noise action = torch.clip(action, self.action_limit_low, self.action_limit_high) action_dict["action"] = action return action_dict
[docs] def loss(self, state: torch.Tensor, critics: CriticEnsemble) -> torch.Tensor: """ Computes the loss for the actor by maximizing the critic's Q-value. Args: state (torch.Tensor): Batch of input states. critics (CriticEnsemble): Critic network(s). Returns: torch.Tensor: Actor loss (negative Q-value). """ act_dict = self.act(state, is_training=False) action = act_dict["action"] q_values = critics.Q(state, action) q = critics.reduce(q_values, reduce_type=self.config.model.critic.reduce) return (-q).mean()
[docs] class DDPGCritic(CriticEnsemble): """ DDPG critic module implementing Q-value estimation and Bellman target computation. Attributes: loss (PACBayesLoss): Loss function for training. gamma (float): Discount factor for future rewards. """
[docs] def __init__(self, config: "MainConfig", dim_state: int, dim_act: int) -> None: super().__init__(config, dim_state, dim_act)
[docs] @torch.no_grad() def get_bellman_target( self, reward: torch.Tensor, next_state: torch.Tensor, done: torch.Tensor, actor: DDPGActor, ) -> torch.Tensor: """ Computes the Bellman target for critic training. Args: reward (torch.Tensor): Reward signal. next_state (torch.Tensor): Next state. done (torch.Tensor): Done flag (1 if terminal, else 0). actor (DDPGActor): Actor network (used for target action). Returns: torch.Tensor: Bellman target (y). """ next_action_dict = actor.act_target(next_state) next_action = next_action_dict["action"] target_values = self.Q_t(next_state, next_action) target_value = self.reduce( target_values, reduce_type=self.config.model.critic.target_reduce ) y = reward.unsqueeze(-1) + self._gamma * target_value * (1 - done.unsqueeze(-1)) return y
[docs] class DeepDeterministicPolicyGradient(ActorCritic): """ Full DDPG agent, combining actor and critic with target networks and experience replay. Lillicrap et al. (2015): Continuous Control with Deep Reinforcement Learning """ _agent_name = "DDPG"
[docs] def __init__( self, config: "MainConfig", critic_type: type = DDPGCritic, actor_type: type = DDPGActor, ) -> None: """ Initializes DDPG agent. Args: config (MainConfig): Configuration dataclass instance. critic_type (type): Critic class type. actor_type (type): Actor class type. Returns: None """ super().__init__(config, critic_type, actor_type)
[docs] def store_transition(self, transition: dict) -> None: """ Stores a transition in the replay buffer. Resets noise if episode is done. Args: transition (dict): A dictionary containing keys like 'state', 'action', 'reward', 'done'. """ done = transition["terminated"] or transition["truncated"] if done: self.actor.noise.reset() return super().store_transition(transition)