Source code for coax.utils._action_noise

import numpy as onp


__all__ = (
    'OrnsteinUhlenbeckNoise',
)


[docs]class OrnsteinUhlenbeckNoise: r""" Add `Ornstein-Uhlenbeck <https://en.wikipedia.org/wiki/Ornstein-Uhlenbeck_process>`_ noise to continuous actions. .. math:: A_t\ \mapsto\ \widetilde{A}_t = A_t + X_t As a side effect, the Ornstein-Uhlenbeck noise :math:`X_t` is updated with every function call: .. math:: X_t\ =\ X_{t-1} - \theta\,\left(X_{t-1} - \mu\right) + \sigma\,\varepsilon where :math:`\varepsilon` is white noise, i.e. :math:`\varepsilon\sim\mathcal{N}(0,\mathbb{I})`. The authors of the `DDPG paper <https://arxiv.org/abs/1509.02971>`_ chose to use Ornstein-Uhlenbeck noise "*[...] in order to explore well in physical environments that have momentum.*" Parameters ---------- mu : float or ndarray, optional The mean :math:`\mu` towards which the Ornstein-Uhlenbeck process should revert; must be broadcastable with the input actions. sigma : positive float or ndarray, optional The spread of the noise :math:`\sigma>0` of the Ornstein-Uhlenbeck process; must be broadcastable with the input actions. theta : positive float or ndarray, optional The (element-wise) dissipation rate :math:`\theta>0` of the Ornstein-Uhlenbeck process; must be broadcastable with the input actions. min_value : float or ndarray, optional The lower bound used for clipping the output action; must be broadcastable with the input actions. max_value : float or ndarray, optional The upper bound used for clipping the output action; must be broadcastable with the input actions. random_seed : int, optional Sets the random state to get reproducible results. """ def __init__( self, mu=0., sigma=1., theta=0.15, min_value=None, max_value=None, random_seed=None): self.mu = mu self.theta = theta self.sigma = sigma self.min_value = -1e15 if min_value is None else min_value self.max_value = 1e15 if max_value is None else max_value self.random_seed = random_seed self.rnd = onp.random.RandomState(self.random_seed) self.reset()
[docs] def reset(self): r""" Reset the Ornstein-Uhlenbeck process. """ self._noise = None
[docs] def __call__(self, a): r""" Add some Ornstein-Uhlenbeck to a continuous action. Parameters ---------- a : action A single action :math:`A_t`. Returns ------- a_noisy : action An action with noise added :math:`\widetilde{A}_t = A_t + X_t`. """ a = onp.asarray(a) if self._noise is None: self._noise = onp.ones_like(a) * self.mu white_noise = onp.asarray(self.rnd.randn(*a.shape), dtype=a.dtype) self._noise += self.theta * (self.mu - self._noise) + self.sigma * white_noise self._noise = onp.clip(self._noise, self.min_value, self.max_value) return a + self._noise