diff --git a/pymc/distributions/timeseries.py b/pymc/distributions/timeseries.py
index 6691ad2e93..c1925d2e78 100644
--- a/pymc/distributions/timeseries.py
+++ b/pymc/distributions/timeseries.py
@@ -12,13 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import Tuple, Union
+
 import aesara.tensor as at
 import numpy as np
 
 from aesara import scan
-from scipy import stats
+from aesara.tensor.random.op import RandomVariable
 
-from pymc.distributions import distribution, multivariate
+from pymc.aesaraf import change_rv_size, floatX, intX
+from pymc.distributions import distribution, logprob, multivariate
 from pymc.distributions.continuous import Flat, Normal, get_tau_sigma
 from pymc.distributions.shape_utils import to_tuple
 
@@ -32,6 +35,191 @@
     "MvStudentTRandomWalk",
 ]
 
+from pymc.util import check_dist_not_registered
+
+
+class GaussianRandomWalkRV(RandomVariable):
+    """
+    GaussianRandomWalk Random Variable
+    """
+
+    name = "GaussianRandomWalk"
+    ndim_supp = 1
+    ndims_params = [0, 0, 0, 0]
+    dtype = "floatX"
+    _print_name = ("GaussianRandomWalk", "\\operatorname{GaussianRandomWalk}")
+
+    # TODO: Assert steps is a scalar!
+
+    def _shape_from_params(self, dist_params, **kwargs):
+        steps = dist_params[3]
+        # TODO: Ask Ricardo why this is correct. Isn't the shape different if size is passed?
+        return (steps + 1,)
+
+    @classmethod
+    def rng_fn(
+        cls,
+        rng: np.random.RandomState,
+        mu: Union[np.ndarray, float],
+        sigma: Union[np.ndarray, float],
+        init: float,
+        steps: int,
+        size: Tuple[int],
+    ) -> np.ndarray:
+        """Gaussian Random Walk generator.
+
+        The init value is the first value of the series; by default it is drawn
+        from a Normal distribution with the same mu and sigma as the innovations.
+
+        Notes
+        -----
+        Currently does not support a custom init distribution.
+
+        Parameters
+        ----------
+        rng: np.random.RandomState
+            NumPy random number generator
+        mu: array_like of float
+            Random walk mean
+        sigma: array_like of float
+            Standard deviation of the innovations (sigma > 0)
+        init: float
+            Initial value of the GaussianRandomWalk
+        steps: int
+            Length of the random walk, must be greater than 0. The returned array
+            has steps + 1 entries, because the first entry is the initial value.
+        size: tuple of int
+            The number of Random Walk time series generated
+
+        Returns
+        -------
+        ndarray
+        """
+        # TODO: Maybe we can remove this constraint?
+        if steps is None or steps < 1:
+            raise ValueError("Steps must not be None and must be greater than 0")
+
+        # If size is None then the returned series should have shape (steps + 1,)
+        if size is None:
+            init_size = 1
+            steps_size = steps
+        # Otherwise the returned series should have shape (*size, steps + 1)
+        else:
+            init_size = (*size, 1)
+            steps_size = (*size, steps)
+
+        init = np.reshape(init, init_size)
+        innovations = rng.normal(loc=mu, scale=sigma, size=steps_size)
+
+        grw = np.concatenate([init, innovations], axis=-1)
+
+        return np.cumsum(grw, axis=-1)
+
+
+gaussianrandomwalk = GaussianRandomWalkRV()
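+
+# A minimal sketch of drawing from the raw op (illustrative only; the op is an
+# internal object and the values below are hypothetical):
+#
+#     draw = gaussianrandomwalk(0.0, 1.0, 0.0, 9).eval()
+#     draw.shape  # (10,): steps + 1 values, with the init value prepended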
+
+
+class GaussianRandomWalk(distribution.Continuous):
+    r"""Random Walk with Normal innovations.
+
+    Notes
+    -----
+    init is currently drawn from a Normal distribution with the same mu and sigma
+    as the innovations.
+
+    Parameters
+    ----------
+    mu: tensor_like of float
+        innovation drift, defaults to 0.0
+    sigma: tensor_like of float, optional
+        sigma > 0, innovation standard deviation, defaults to 1.0
+    init: Scalar PyMC distribution
+        Scalar distribution of the initial value, created with the `.dist()` API.
+        Defaults to a Normal with the same `mu` and `sigma` as the GaussianRandomWalk
+    steps: int
+        Number of steps in the Gaussian Random Walk (steps > 0)
+    size: int
+        Number of independent Gaussian Random Walks
+    """
+
+    rv_op = gaussianrandomwalk
+
+    def __new__(cls, name, mu=0.0, sigma=1.0, init=None, steps=None, **kwargs):
+        check_dist_not_registered(init)
+        return super().__new__(cls, name, mu, sigma, init, steps, **kwargs)
+
+    @classmethod
+    def dist(
+        cls, mu=0.0, sigma=1.0, init=None, steps=None, size=None, shape=None, **kwargs
+    ) -> RandomVariable:
+
+        mu = at.as_tensor_variable(floatX(mu))
+        sigma = at.as_tensor_variable(floatX(sigma))
+
+        if steps is None:
+            # We can infer steps from the shape, if it was given
+            if shape is not None:
+                steps = to_tuple(shape)[-1] - 1
+            else:
+                # TODO: Raise a ValueError instead of silently defaulting to one step?
+                steps = 1
+
+        steps = at.as_tensor_variable(intX(steps))
+
+        if init is None:
+            init = Normal.dist(mu, sigma, size=size)
+        else:
+            if not (
+                isinstance(init, at.TensorVariable)
+                and init.owner is not None
+                and isinstance(init.owner.op, RandomVariable)
+                and init.owner.op.ndim_supp == 0
+            ):
+                raise TypeError("init must be a scalar distribution variable")
+            if size is not None or shape is not None:
+                init = change_rv_size(init, to_tuple(size or shape))
+            else:
+                # If not explicit, size is determined by the shapes of mu and sigma
+                mu_ = at.broadcast_arrays(mu, sigma)[0]
+                init = change_rv_size(init, mu_.shape)
+
+        return super().dist([mu, sigma, init, steps], size=size, shape=shape, **kwargs)
+
+    def logp(
+        value: at.Variable,
+        mu: at.Variable,
+        sigma: at.Variable,
+        init: at.Variable,
+        steps: at.Variable,
+    ) -> at.TensorVariable:
+        """Calculate the log-probability of a Gaussian Random Walk at the specified value.
+
+        Parameters
+        ----------
+        value: at.Variable
+            Value at which the log-probability is evaluated
+        mu: at.Variable
+            Innovation drift
+        sigma: at.Variable
+            Innovation standard deviation
+        init: at.Variable
+            Distribution of the initial value
+        steps: at.Variable
+            Not used
+
+        Returns
+        -------
+        TensorVariable
+        """
+        # Calculate the logp of the initial value
+        init_logp = logprob.logp(init, value[..., 0])
+
+        # Differencing the series turns the random walk into iid Normal innovations
+        stationary_series = at.diff(value, axis=-1)
+        series_logp = logprob.logp(Normal.dist(mu, sigma), stationary_series)
+
+        return init_logp + series_logp.sum(axis=-1)
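+
+    # A sketch of evaluating this logp directly (illustrative; it mirrors
+    # test_grw_logp in pymc/tests/test_distributions_timeseries.py, and the
+    # numbers are hypothetical). With the default init of Normal(mu, sigma):
+    #
+    #     vals = at.vector("vals")
+    #     grw = GaussianRandomWalk.dist(mu=1.0, sigma=1.0, steps=2)
+    #     logprob.logp(grw, vals).eval({vals: [0.0, 1.0, 2.0]})
+    #     # = Normal(1, 1).logp(0) + 2 * Normal(1, 1).logp(1),
+    #     # since the differenced series is [1, 1]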
 
 
 class AR1(distribution.Continuous):
     """
@@ -176,127 +364,6 @@ def logp(self, value):
         return at.sum(innov_like) + at.sum(init_like)
 
 
-class GaussianRandomWalk(distribution.Continuous):
-    r"""Random Walk with Normal innovations
-
-    Note that this is mainly a user-friendly wrapper to enable an easier specification
-    of GRW. You are not restricted to use only Normal innovations but can use any
-    distribution: just use `aesara.tensor.cumsum()` to create the random walk behavior.
-
-    Parameters
-    ----------
-    mu : tensor_like of float, default 0
-        innovation drift, defaults to 0.0
-        For vector valued `mu`, first dimension must match shape of the random walk, and
-        the first element will be discarded (since there is no innovation in the first timestep)
-    sigma : tensor_like of float, optional
-        `sigma` > 0, innovation standard deviation (only required if `tau` is not specified)
-        For vector valued `sigma`, first dimension must match shape of the random walk, and
-        the first element will be discarded (since there is no innovation in the first timestep)
-    tau : tensor_like of float, optional
-        `tau` > 0, innovation precision (only required if `sigma` is not specified)
-        For vector valued `tau`, first dimension must match shape of the random walk, and
-        the first element will be discarded (since there is no innovation in the first timestep)
-    init : pymc.Distribution, optional
-        distribution for initial value (Defaults to Flat())
-    """
-
-    def __init__(self, tau=None, init=None, sigma=None, mu=0.0, sd=None, *args, **kwargs):
-        kwargs.setdefault("shape", 1)
-        super().__init__(*args, **kwargs)
-        if sum(self.shape) == 0:
-            raise TypeError("GaussianRandomWalk must be supplied a non-zero shape argument!")
-        if sd is not None:
-            sigma = sd
-        tau, sigma = get_tau_sigma(tau=tau, sigma=sigma)
-        self.tau = at.as_tensor_variable(tau)
-        sigma = at.as_tensor_variable(sigma)
-        self.sigma = self.sd = sigma
-        self.mu = at.as_tensor_variable(mu)
-        self.init = init or Flat.dist()
-        self.mean = at.as_tensor_variable(0.0)
-
-    def _mu_and_sigma(self, mu, sigma):
-        """Helper to get mu and sigma if they are high dimensional."""
-        if sigma.ndim > 0:
-            sigma = sigma[1:]
-        if mu.ndim > 0:
-            mu = mu[1:]
-        return mu, sigma
-
-    def logp(self, x):
-        """
-        Calculate log-probability of Gaussian Random Walk distribution at specified value.
-
-        Parameters
-        ----------
-        x : numeric
-            Value for which log-probability is calculated.
-
-        Returns
-        -------
-        TensorVariable
-        """
-        if x.ndim > 0:
-            x_im1 = x[:-1]
-            x_i = x[1:]
-            mu, sigma = self._mu_and_sigma(self.mu, self.sigma)
-            innov_like = Normal.dist(mu=x_im1 + mu, sigma=sigma).logp(x_i)
-            return self.init.logp(x[0]) + at.sum(innov_like)
-        return self.init.logp(x)
-
-    def random(self, point=None, size=None):
-        """Draw random values from GaussianRandomWalk.
-
-        Parameters
-        ----------
-        point : dict or Point, optional
-            Dict of variable values on which random values are to be
-            conditioned (uses default point if not specified).
-        size : int, optional
-            Desired size of random sample (returns one sample if not
-            specified).
-
-        Returns
-        -------
-        array
-        """
-        # sigma, mu = distribution.draw_values([self.sigma, self.mu], point=point, size=size)
-        # return distribution.generate_samples(
-        #     self._random,
-        #     sigma=sigma,
-        #     mu=mu,
-        #     size=size,
-        #     dist_shape=self.shape,
-        #     not_broadcast_kwargs={"sample_shape": to_tuple(size)},
-        # )
-        pass
-
-    def _random(self, sigma, mu, size, sample_shape):
-        """Implement a Gaussian random walk as a cumulative sum of normals.
-
-        axis = len(size) - 1 denotes the axis along which the cumulative sum is
-        calculated. This might need to be corrected in the future when issue #4010
-        is fixed.
-        """
-        if size[len(sample_shape)] == sample_shape:
-            axis = len(sample_shape)
-        else:
-            axis = len(size) - 1
-        rv = stats.norm(mu, sigma)
-        data = rv.rvs(size).cumsum(axis=axis)
-        data = np.array(data)
-
-        # the following lines center the random walk to start at the origin.
-        if len(data.shape) > 1:
-            for i in range(data.shape[0]):
-                data[i] = data[i] - data[i][0]
-        else:
-            data = data - data[0]
-        return data
-
-    def _distr_parameters_for_repr(self):
-        return ["mu", "sigma"]
-
-
 class GARCH11(distribution.Continuous):
     r"""
     GARCH(1,1) with Normal innovations. The model is specified by
diff --git a/pymc/tests/test_distributions.py b/pymc/tests/test_distributions.py
index 67a9137620..fc427726b9 100644
--- a/pymc/tests/test_distributions.py
+++ b/pymc/tests/test_distributions.py
@@ -2616,6 +2616,16 @@ def test_interpolated_transform(self, transform):
         assert not np.isfinite(m.compile_logp()({"x": -1.0}))
         assert not np.isfinite(m.compile_logp()({"x": 11.0}))
 
+    @pytest.mark.skip("Unsure how to use this test fixture")
+    def test_grw(self):
+        self.check_logp(
+            pm.GaussianRandomWalk,
+            R,
+            {"mu": R, "sigma": Rplus, "steps": Nat},
+            lambda value, mu, sigma: sp.norm.logpdf(value, mu, sigma).cumsum().sum(),
+            decimal=select_by_precision(float64=6, float32=1),
+        )
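+
+    # A hedged sketch of a reference density for when this test is unskipped
+    # (an assumption based on the new logp, not a verified fixture): the first
+    # value follows the default init Normal(mu, sigma) and each increment is
+    # Normal(mu, sigma), i.e. something like
+    #
+    #     lambda value, mu, sigma: (
+    #         sp.norm.logpdf(value[0], mu, sigma)
+    #         + sp.norm.logpdf(np.diff(value), mu, sigma).sum()
+    #     )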
+
 
 class TestBound:
     """Tests for pm.Bound distribution"""
diff --git a/pymc/tests/test_distributions_random.py b/pymc/tests/test_distributions_random.py
index c400d8e57b..514aff2ca5 100644
--- a/pymc/tests/test_distributions_random.py
+++ b/pymc/tests/test_distributions_random.py
@@ -157,109 +157,6 @@ def pymc_random_discrete(
         assert p > alpha, str(pt)
 
 
-class BaseTestCases:
-    class BaseTestCase(SeededTest):
-        shape = 5
-        # the following are the default values of the distribution that take effect
-        # when the parametrized shape/size in the test case is None.
-        # For every distribution that defaults to non-scalar shapes they must be
-        # specified by the inheriting Test class. example: TestGaussianRandomWalk
-        default_shape = ()
-        default_size = ()
-
-        def setup_method(self, *args, **kwargs):
-            super().setup_method(*args, **kwargs)
-            self.model = pm.Model()
-
-        def get_random_variable(self, shape, with_vector_params=False, name=None):
-            """Creates a RandomVariable of the parametrized distribution."""
-            if with_vector_params:
-                params = {
-                    key: value * np.ones(self.shape, dtype=np.dtype(type(value)))
-                    for key, value in self.params.items()
-                }
-            else:
-                params = self.params
-            if name is None:
-                name = self.distribution.__name__
-            with self.model:
-                try:
-                    if shape is None:
-                        # in the test case parametrization "None" means "no specified (default)"
-                        return self.distribution(name, transform=None, **params)
-                    else:
-                        ndim_supp = self.distribution.rv_op.ndim_supp
-                        if ndim_supp == 0:
-                            size = shape
-                        else:
-                            size = shape[:-ndim_supp]
-                        return self.distribution(name, size=size, transform=None, **params)
-                except TypeError:
-                    if np.sum(np.atleast_1d(shape)) == 0:
-                        pytest.skip("Timeseries must have positive shape")
-                    raise
-
-        @staticmethod
-        def sample_random_variable(random_variable, size):
-            """Draws samples from a RandomVariable."""
-            if size:
-                random_variable = change_rv_size(random_variable, size, expand=True)
-            return random_variable.eval()
-
-        @pytest.mark.parametrize("size", [None, (), 1, (1,), 5, (4, 5)], ids=str)
-        @pytest.mark.parametrize("shape", [None, ()], ids=str)
-        def test_scalar_distribution_shape(self, shape, size):
-            """Draws samples of different [size] from a scalar [shape] RV."""
-            rv = self.get_random_variable(shape)
-            exp_shape = self.default_shape if shape is None else tuple(np.atleast_1d(shape))
-            exp_size = self.default_size if size is None else tuple(np.atleast_1d(size))
-            expected = exp_size + exp_shape
-            actual = np.shape(self.sample_random_variable(rv, size))
-            assert (
-                expected == actual
-            ), f"Sample size {size} from {shape}-shaped RV had shape {actual}. Expected: {expected}"
-            # check that negative size raises an error
-            with pytest.raises(ValueError):
-                self.sample_random_variable(rv, size=-2)
-            with pytest.raises(ValueError):
-                self.sample_random_variable(rv, size=(3, -2))
-
-        @pytest.mark.parametrize("size", [None, ()], ids=str)
-        @pytest.mark.parametrize(
-            "shape", [None, (), (1,), (1, 1), (1, 2), (10, 11, 1), (9, 10, 2)], ids=str
-        )
-        def test_scalar_sample_shape(self, shape, size):
-            """Draws samples of scalar [size] from a [shape] RV."""
-            rv = self.get_random_variable(shape)
-            exp_shape = self.default_shape if shape is None else tuple(np.atleast_1d(shape))
-            exp_size = self.default_size if size is None else tuple(np.atleast_1d(size))
-            expected = exp_size + exp_shape
-            actual = np.shape(self.sample_random_variable(rv, size))
-            assert (
-                expected == actual
-            ), f"Sample size {size} from {shape}-shaped RV had shape {actual}. Expected: {expected}"
-
-        @pytest.mark.parametrize("size", [None, 3, (4, 5)], ids=str)
-        @pytest.mark.parametrize("shape", [None, 1, (10, 11, 1)], ids=str)
-        def test_vector_params(self, shape, size):
-            shape = self.shape
-            rv = self.get_random_variable(shape, with_vector_params=True)
-            exp_shape = self.default_shape if shape is None else tuple(np.atleast_1d(shape))
-            exp_size = self.default_size if size is None else tuple(np.atleast_1d(size))
-            expected = exp_size + exp_shape
-            actual = np.shape(self.sample_random_variable(rv, size))
-            assert (
-                expected == actual
-            ), f"Sample size {size} from {shape}-shaped RV had shape {actual}. Expected: {expected}"
-
-
-@pytest.mark.xfail(reason="This distribution has not been refactored for v4")
-class TestGaussianRandomWalk(BaseTestCases.BaseTestCase):
-    distribution = pm.GaussianRandomWalk
-    params = {"mu": 1.0, "sigma": 1.0}
-    default_shape = (1,)
-
-
 class BaseTestDistributionRandom(SeededTest):
     """
     This class provides a base for tests that new RandomVariables are correctly
@@ -418,19 +315,29 @@ def seeded_numpy_distribution_builder(dist_name: str) -> Callable:
     )
 
 
-class TestFlat(BaseTestDistributionRandom):
-    pymc_dist = pm.Flat
-    pymc_dist_params = {}
-    expected_rv_op_params = {}
+class TestGaussianRandomWalk(BaseTestDistributionRandom):
+    pymc_dist = pm.GaussianRandomWalk
+    pymc_dist_params = {"mu": 1.0, "sigma": 3, "init": 1, "steps": 2}
+    expected_rv_op_params = {"mu": 1.0, "sigma": 3, "init": 1, "steps": 2}
     checks_to_run = [
         "check_pymc_params_match_rv_op",
        "check_rv_inferred_size",
-        "check_not_implemented",
     ]
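+
+    # Unlike purely scalar distributions, every draw carries a trailing support
+    # dimension of length steps + 1; e.g. size=(1,) yields draws of shape
+    # (1, steps + 1). check_rv_inferred_size below encodes that expectation.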
 
     def check_rv_inferred_size(self):
-        sizes_to_check = [None, (), 1, (1,), 5, (4, 5), (2, 4, 2)]
-        sizes_expected = [(), (), (1,), (1,), (5,), (4, 5), (2, 4, 2)]
+        steps = self.pymc_dist_params["steps"]
+        sizes_to_check = [None, (), 1, (1,)]
+        sizes_expected = [(steps + 1,), (steps + 1,), (1, steps + 1), (1, steps + 1)]
+
         for size, expected in zip(sizes_to_check, sizes_expected):
             pymc_rv = self.pymc_dist.dist(**self.pymc_dist_params, size=size)
             expected_symbolic = tuple(pymc_rv.shape.eval())
diff --git a/pymc/tests/test_distributions_timeseries.py b/pymc/tests/test_distributions_timeseries.py
index 3736b0134b..c059fdb10a 100644
--- a/pymc/tests/test_distributions_timeseries.py
+++ b/pymc/tests/test_distributions_timeseries.py
@@ -12,20 +12,125 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import aesara.tensor as at
 import numpy as np
 import pytest
 
+from scipy import stats
+
+import pymc as pm
+
 from pymc.aesaraf import floatX
 from pymc.distributions.continuous import Flat, Normal
-from pymc.distributions.timeseries import AR, AR1, GARCH11, EulerMaruyama
+from pymc.distributions.timeseries import (
+    AR,
+    AR1,
+    GARCH11,
+    EulerMaruyama,
+    GaussianRandomWalk,
+    gaussianrandomwalk,
+)
 from pymc.model import Model
 from pymc.sampling import sample, sample_posterior_predictive
 from pymc.tests.helpers import select_by_precision
 
-# pytestmark = pytest.mark.usefixtures("seeded_test")
-pytestmark = pytest.mark.xfail(reason="Timeseries not refactored")
+
+class TestGaussianRandomWalk:
+    @pytest.mark.parametrize(
+        "kwargs,expected",
+        [
+            ({"steps": 5}, (6,)),
+            ({"size": 1}, (5,)),
+            ({"size": 2}, (5,)),
+            # implied dims are not working
+            pytest.param({"mu": [0, 0]}, (2, 5), marks=pytest.mark.xfail),
+        ],
+    )
+    def test_grw_rv_op_shape(self, kwargs, expected):
+        """Basic test for the GRW RV op."""
+        default_kwargs = dict(init=1, mu=3, sd=0.0000001, steps=4, size=None)
+
+        combined_kwargs = {**default_kwargs, **kwargs}
+        grw = gaussianrandomwalk(
+            combined_kwargs["mu"],
+            combined_kwargs["sd"],
+            combined_kwargs["init"],
+            combined_kwargs["steps"],
+        ).eval()
+
+        assert grw.shape == expected
+
+    def test_grw_logp(self):
+        # `at.diff` is currently broken with constants, so use a symbolic vector
+        test_vals = [0, 1, 2]
+        vals = at.vector("vals")
+        mu = 1
+        sigma = 1
+        init = pm.Normal.dist(0, sigma)
+
+        with pm.Model():
+            grw = GaussianRandomWalk("grw", mu, sigma, init=init, steps=2)
+
+        logp = pm.logp(grw, vals)
+        logp_eval = logp.eval({vals: test_vals})
+
+        logp_reference = (
+            stats.norm(0, sigma).logpdf(test_vals[0])
+            + stats.norm(mu, sigma).logpdf(np.diff(test_vals)).sum()
+        )
+
+        np.testing.assert_almost_equal(logp_eval, logp_reference)
+
+    def test_grw_inference(self):
+        mu, sigma, steps = 2, 1, 10000
+        obs = np.concatenate([[0], np.random.normal(mu, sigma, size=steps)]).cumsum()
+
+        with pm.Model():
+            _mu = pm.Uniform("mu", -10, 10)
+            _sigma = pm.Uniform("sigma", 0, 10)
+            # Workaround for a bug in `at.diff` when the data is constant
+            obs_data = pm.MutableData("obs_data", obs)
+            grw = GaussianRandomWalk("grw", _mu, _sigma, steps=steps, observed=obs_data)
+
+            trace = pm.sample()
+
+        recovered_mu = trace.posterior["mu"].mean()
+        recovered_sigma = trace.posterior["sigma"].mean()
+        np.testing.assert_allclose([mu, sigma], [recovered_mu, recovered_sigma], atol=0.2)
+
+    @pytest.mark.parametrize(
+        "steps,size,expected",
+        (
+            (1, None, (2,)),
+            (2, 1, (1, 3)),
+            (2, 5, (5, 3)),
+            (10, 5, (5, 11)),
+        ),
+    )
+    def test_grw_shape(self, steps, size, expected):
+        grw_dist = pm.GaussianRandomWalk.dist(mu=0, sigma=1, steps=steps, size=size)
+        expected_symbolic = tuple(grw_dist.shape.eval())
+        assert expected_symbolic == expected
+
+    @pytest.mark.parametrize("size", (None, (1, 2), (10, 2), (3, 100, 2)))
+    def test_init_automatically_resized(self, size):
+        x = GaussianRandomWalk.dist(mu=[0, 1], init=pm.Normal.dist(), size=size)
+        init = x.owner.inputs[-2]
+        assert init.eval().shape == (size if size is not None else (2,))
+
+        x = GaussianRandomWalk.dist(mu=[0, 1], init=pm.Normal.dist(size=5), shape=size)
+        init = x.owner.inputs[-2]
+        assert init.eval().shape == (size if size is not None else (2,))
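+
+    # Steps-inference sketch (restates the logic in GaussianRandomWalk.dist;
+    # illustrative only): the trailing dimension of `shape` implies the number
+    # of steps, so shape=(3, 6) gives steps = 6 - 1 = 5, while no shape falls
+    # back to steps = 1:
+    #
+    #     pm.GaussianRandomWalk.dist(shape=(3, 6)).owner.inputs[-1].eval()  # 5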
GaussianRandomWalk("x", shape=shape) + steps = x.owner.inputs[-1] + assert steps.eval() == 5 if shape is not None else 1 +@pytest.mark.xfail(reason="Timeseries not refactored") def test_AR(): # AR1 data = np.array([0.3, 1, 2, 3, 4]) @@ -63,6 +168,7 @@ def test_AR(): np.testing.assert_allclose(ar_like, reg_like) +@pytest.mark.xfail(reason="Timeseries not refactored") def test_AR_nd(): # AR2 multidimensional p, T, n = 3, 100, 5 @@ -82,6 +188,7 @@ def test_AR_nd(): ) +@pytest.mark.xfail(reason="Timeseries not refactored") def test_GARCH11(): # test data ~ N(0, 1) data = np.array( @@ -142,6 +249,7 @@ def _gen_sde_path(sde, pars, dt, n, x0): return np.array(xs) +@pytest.mark.xfail(reason="Timeseries not refactored") def test_linear(): lam = -0.78 sig2 = 5e-3