Source code for epsie.proposals.angular

# Copyright (C) 2020  Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import numpy
from scipy import stats

from .normal import (Normal, AdaptiveSupport, SSAdaptiveSupport,
                     ATAdaptiveSupport)
from .bounded_normal import Boundaries


[docs]class Angular(Normal): r"""A propsal distribution for parameters that are cyclic over :math:`[0, 2\pi)`. To generate a jump proposal from a given point, a truncated normal distribution centered on the point and with bounds at :math:`\pm \pi` around the point. Cyclic bounds are then applied, resulting in a proposed point that is in :math:`[0, 2\pi)`. For example, if a the given point is at :math:`1.75\pi` and the jump is :math:`+0.5\pi`, then proposed point will be at :math:`0.25\pi`. Since the same proposal distribution is used regardless of the location, this is a symmetric proposal. This proposal can handle more than one parameter; however, parameters must all be independent of each other. Parameters ---------- parameters : (list of) str The names of the parameters to create proposals for. cov : (array of) float, optional The covariance to use for the underlying truncated normal distribution, in squared radians. May provide either a single float, a 1D array with length ``ndim``, or an ``ndim x ndim`` array, where ``ndim`` = the number of parameters given. If 2D array is given, the off-diagonal terms must be zero. Default is 1 sq. radian for all parameters. jump_interval : int, optional The jump interval of the proposal, the proposal only gets called every jump interval-th time. After ``jump_interval_duration`` number of proposal steps elapses the proposal will again be called on every chain iteration. By default ``jump_interval`` = 1. jump_interval_duration : int, optional The number of proposals steps during which values of ``jump_interval`` other than 1 are used. After this elapses the proposal is called on each iteration. """ name = 'angular' symmetric = True def __init__(self, parameters, cov=None, jump_interval=1, jump_interval_duration=None): super().__init__( parameters, cov=cov, jump_interval=jump_interval, jump_interval_duration=jump_interval_duration) # _halfwidth is half the width of the interval, in factors of pi self._halfwidth = 1. self._factor = numpy.pi # for speed up self._invfactor = 1./numpy.pi self._logfactor = numpy.log(self._factor) def _apply_cyclic(self, value): r"""Applies cyclic boundaries to the given value. This causes the value to lie within 0 and ``2*_halfwidth``. With the default ``_halfwidth`` = 1, this means the value will be within 0 and 2. Parameters ---------- value : float The value to apply the boundaries to. offset : float, optional Offset to apply. If none provided, will use the ``_offset`` attribute. Returns ------- float : The value remapped to be within the boundaries. """ return value % (2*self._halfwidth) def _jump(self, fromx): # we'll do something similar as the bounded normal here: draw points # using a bounded normal centered on 0 with bounds at -pi and pi. # we'll just use rejection sampling for this to_x = {} for ii, p in enumerate(self.parameters): # remove the common factor from the std std = self._std[ii] * self._invfactor newpt = self.random_generator.normal(scale=std) while abs(newpt) > self._halfwidth: newpt = self.random_generator.normal(scale=std) # add to fromx, then put back within bounds newpt += fromx[p] * self._invfactor newpt = self._apply_cyclic(newpt) # now convert back to radians to_x[p] = newpt * self._factor return to_x def _logpdf(self, xi, givenx): # convert to arrays xi = numpy.array([xi[p]*self._invfactor for p in self.parameters]) givenx = numpy.array([givenx[p]*self._invfactor for p in self.parameters]) # make sure givenx is in bounds and figure out how far it is from the # middle of the interval givenx = self._apply_cyclic(givenx) dx = self._halfwidth - givenx # apply cyclic boundaries to xi xi = self._apply_cyclic(xi) # shift xi and apply cyclic boundaries again xi = self._apply_cyclic(xi + dx) # now evaluate using a truncated normal centered on the middle of # the interval # remove the common factor from the std std = self._std * self._invfactor b = self._halfwidth/std a = -b return stats.truncnorm.logpdf(xi, a, b, loc=self._halfwidth, scale=std).sum() - self._logfactor
[docs]class AdaptiveAngular(AdaptiveSupport, Angular): r"""An angular proposoal with adaptive variance, using the algorithm from Veitch et al. See :py:class:`AdaptiveSupport` for details on the adaptation algorithm. Parameters ---------- parameters : (list of) str The names of the parameters. boundaries : dict Dictionary mapping parameters to boundaries. Boundaries must be a tuple or iterable of length two. The boundaries will be used for the prior widths in the adaptation algorithm. adaptation_duration: int The number of proposal steps over which to apply the adaptation. No more adaptation will be done once a proposal exceeds this value. jump_interval : int, optional The jump interval of the proposal, the proposal only gets called every jump interval-th time. After ``adaptation_duration`` number of proposal steps elapses the proposal will again be called on every chain iteration. By default ``jump_interval`` = 1. start_step : int, optional The proposal step to start doing the adaptation (:math:`k_0+1` in the equation below). Must be greater than zero. Default is 1. \**kwargs : All other keyword arguments are passed to :py:func:`AdaptiveSupport.setup_adaptation`. See that function for details. """ name = 'adaptive_angular' symmetric = True def __init__(self, parameters, adaptation_duration, jump_interval=1, start_step=1, **kwargs): # set the parameters, initialize the covariance matrix super().__init__( parameters, jump_interval=jump_interval, jump_interval_duration=adaptation_duration) # all parameters have the same (cyclic) boundaries boundaries = {p: Boundaries((0, 2*self._halfwidth*self._factor)) for p in self.parameters} # set up the adaptation parameters self.setup_adaptation(boundaries, adaptation_duration, start_step=start_step, **kwargs)
[docs]class SSAdaptiveAngular(SSAdaptiveSupport, Angular): r"""An angular proposoal with adaptive variance, using the algorithm from Sivia and Skilling. See :py:class:`SSAdaptiveSupport` for details on the adaptation algorithm. Parameters ---------- parameters : (list of) str The names of the parameters. boundaries : dict Dictionary mapping parameters to boundaries. Boundaries must be a tuple or iterable of length two. The boundaries will be used for the prior widths in the adaptation algorithm. jump_interval : int, optional The jump interval of the proposal, the proposal only gets called every jump interval-th time. After ``jump_interval_duration`` number of proposal steps elapses the proposal will again be called on every chain iteration. By default ``jump_interval`` = 1. jump_interval_duration : int, optional The number of proposals steps during which values of ``jump_interval`` other than 1 are used. After this elapses the proposal is called on each iteration. \**kwargs : All other keyword arguments are passed to :py:func:`SSAdaptiveSupport.setup_adaptation`. See that function for details. """ name = 'ss_adaptive_angular' symmetric = True def __init__(self, parameters, cov=None, jump_interval=1, jump_interval_duration=None, **kwargs): # set the parameters, initialize the covariance matrix super().__init__( parameters, cov=cov, jump_interval=jump_interval, jump_interval_duration=jump_interval_duration) # set up the adaptation parameters if 'max_cov' not in kwargs: # set the max std to be (1.49*2*pi) kwargs['max_cov'] = (1.49 * 2 * numpy.pi)**2 self.setup_adaptation(**kwargs)
[docs]class ATAdaptiveAngular(ATAdaptiveSupport, Angular): r"""An angular proposal with adaptive variance, using the algorithm from Andrieu & Thoms. See :py:class:`ATAdaptiveSupport` for details on the adaptation algorithm. Parameters ---------- parameters : (list of) str The names of the parameters. adaptation_duration: int The number of proposal steps over which to apply the adaptation. No more adaptation will be done once a proposal exceeds this value. start_step: int, optional The proposal step index when adaptation phase begins. target_rate: float, optional Target acceptance ratio. By default 0.234 and 0.48 for componentwise scaling. jump_interval : int, optional The jump interval of the proposal, the proposal only gets called every jump interval-th time. After ``adaptation_duration`` number of proposal steps elapses the proposal will again be called on every chain iteration. By default ``jump_interval`` = 1. """ name = 'at_adaptive_angular' symmetric = True def __init__(self, parameters, adaptation_duration, componentwise=False, start_step=1, target_rate=None, jump_interval=1): # set the parameters, initialize the covariance matrix super().__init__( parameters, jump_interval=jump_interval, jump_interval_duration=adaptation_duration) # set up the adaptation parameters self.setup_adaptation(adaptation_duration=adaptation_duration, diagonal=True, componentwise=componentwise, start_step=start_step, target_rate=target_rate)