"""Module for univariate densities (see also :mod:`ddl.independent`)."""
from __future__ import division, print_function
import logging
import warnings
import numpy as np
import scipy.stats
from sklearn.base import BaseEstimator
from sklearn.exceptions import DataConversionWarning, NotFittedError
from sklearn.utils.validation import check_array, check_is_fitted, check_random_state, column_or_1d
from .base import BoundaryWarning, ScoreMixin
# noinspection PyProtectedMember
from .utils import (_DEFAULT_SUPPORT, check_X_in_interval, make_finite, make_interior,
make_interior_probability, make_positive)
logger = logging.getLogger(__name__)
SCIPY_RV_NON_NEGATIVE = ['expon', 'chi']
SCIPY_RV_STRICLTY_POSITIVE = ['gamma', 'invgamma', 'chi2', 'lognorm']
SCIPY_RV_UNIT_SUPPORT = ['rv_histgoram', 'uniform', 'beta']
def _check_univariate_X(X, support, inverse=False):
X = check_array(X, ensure_2d=True) # ensure_2d=True is default but just making explicit
# Check that X is a column vector but first ravel because check_estimator passes
# a matrix to fit
if X.shape[1] > 1:
warnings.warn(DataConversionWarning(
'Input should be column vector with shape (n, 1) but found matrix. Converting to '
'column vector via `np.mean(X, axis=1).reshape((-1, 1))`. '
'Ideally, this would raise an error but in order to pass the checks in '
'`sklearn.utils.check_estimator`, we convert the data rather than raise an error. '
))
X = np.mean(X, axis=1).reshape((-1, 1))
# Check that values are within support or range(inverse)
if inverse:
X = check_X_in_interval(X, np.array([0, 1]))
else:
X = check_X_in_interval(X, support)
return np.array(X)
[docs]class ScipyUnivariateDensity(BaseEstimator, ScoreMixin):
"""Density estimator via random variables defined in :mod:`scipy.stats`.
A univariate density estimator that can fit any distribution defined in
:mod:`scipy.stats`. This includes common distributions such as Gaussian,
laplace, beta, gamma and log-normal distributions but also many other
distributions as well.
Note that this density estimator is strictly univariate and therefore
expects the input data to be a single array with shape (n_samples, 1).
Parameters
----------
scipy_rv : object or None, default=None
Default random variable is a Gaussian (i.e.
:class:`scipy.stats.norm`) if `scipy_rv=None`. Other examples include
:class:`scipy.stats.gamma` or :class:`scipy.stats.beta`.
scipy_fit_kwargs : dict or None, optional
Keyword arguments as a dictionary for the fit function of the scipy
random variable (e.g. ``dict(floc=0, fscale=1)`` to fix the location
and scale parameters to 0 and 1 respectively). Defaults are
different depending on `scipy_rv` parameter. For example for the
`scipy.stats.beta` we set `floc=0` and `fscale=1`, i.e. fix the
location and scale of the beta distribution.
Attributes
----------
rv_ : object
Frozen :mod:`scipy.stats` random variable object. Fitted parameters
of distribution can be accessed via `args` property.
See Also
--------
scipy.stats
"""
[docs] def __init__(self, scipy_rv=None, scipy_fit_kwargs=None):
self.scipy_rv = scipy_rv
self.scipy_fit_kwargs = scipy_fit_kwargs
[docs] def fit(self, X, y=None, **fit_params):
"""Fit estimator to X.
Parameters
----------
X : array-like, shape (n_samples, 1)
Training data, where `n_samples` is the number of samples. Note
that the shape must have a second dimension of 1 since this is a
univariate density estimator.
y : None, default=None
Not used in the fitting process but kept for compatibility.
fit_params : dict, optional
Optional extra fit parameters.
Returns
-------
self : estimator
Returns the instance itself.
"""
def _check_scipy_kwargs(kwargs, _scipy_rv):
if kwargs is None:
if self._is_special(_scipy_rv, SCIPY_RV_UNIT_SUPPORT):
return dict(floc=0, fscale=1)
elif self._is_special(_scipy_rv,
SCIPY_RV_NON_NEGATIVE + SCIPY_RV_STRICLTY_POSITIVE):
return dict(floc=0)
else:
return {}
elif isinstance(kwargs, dict):
return kwargs
else:
raise ValueError('`scipy_fit_kwargs` should be either None or a `dict` object.')
# Input validation
scipy_rv = self._get_scipy_rv_or_default()
scipy_fit_kwargs = _check_scipy_kwargs(self.scipy_fit_kwargs, scipy_rv)
X = self._check_X(X)
# MLE fit based on scipy implementation
if scipy_rv.numargs == 0 and 'floc' in scipy_fit_kwargs and 'fscale' in scipy_fit_kwargs:
params = (scipy_fit_kwargs['floc'], scipy_fit_kwargs['fscale'])
else:
try:
params = scipy_rv.fit(X.ravel(), **scipy_fit_kwargs)
except RuntimeError as e:
warnings.warn('Unable to fit to data using scipy_rv so attempting to use default '
'parameters for the distribution. Original error:\n%s' % str(e))
params = self._get_default_params(scipy_rv)
except ValueError:
# warnings.warn(
# 'Trying to use fixed parameters instead. Original error:\n%s' % str(e))
# try to extract fixed parameters in a certain order
params = []
for k in ['fa', 'f0', 'fb', 'f1', 'floc', 'fscale']:
try:
params.append(scipy_fit_kwargs.pop(k))
except KeyError:
pass
# Avoid degenerate case when scale = 0
if len(params) >= 2 and params[-1] == 0:
params = list(params)
if isinstance(X.dtype, np.floating):
params[-1] = np.finfo(X.dtype).eps
else:
params[-1] = 1 # Integer types
params = tuple(params)
# Create "frozen" version of random variable so that parameters do not need to be
# specified
self.rv_ = scipy_rv(*params)
# Check for a fit error in the domain of the parameters
try:
self.rv_.rvs(1)
except ValueError:
warnings.warn('Parameters discovered by fit are not in the domain of the '
'parameters so attempting to use default parameters for the '
'distribution.')
self.rv_ = scipy_rv(*self._get_default_params(scipy_rv))
return self
[docs] @classmethod
def create_fitted(cls, scipy_rv_params=None, **kwargs):
"""Create fitted density.
Parameters
----------
scipy_rv : object or None, default=None
Default random variable is a Gaussian (i.e.
:class:`scipy.stats.norm`) if `scipy_rv=None`. Other examples include
:class:`scipy.stats.gamma` or :class:`scipy.stats.beta`.
scipy_rv_params : dict, optional
Parameters to pass to scipy_rv when creating frozen random variable.
Default parameters have been set for various distributions.
**kwargs
Other parameters to pass to object constructor.
Returns
-------
fitted_density : Density
Fitted density.
"""
density = cls(**kwargs)
# Get default if scipy_rv=None
scipy_rv = density._get_scipy_rv_or_default()
# Fit scipy random variable
if scipy_rv_params is None:
try:
params = cls._get_default_params(scipy_rv)
except NotImplementedError:
params = []
rv = scipy_rv(*params)
else:
rv = scipy_rv(**scipy_rv_params)
density.rv_ = rv
return density
@classmethod
def _get_default_params(cls, scipy_rv):
if cls._is_special(scipy_rv, ['beta']):
return [1, 1]
elif cls._is_special(scipy_rv, ['uniform', 'norm', 'expon', 'lognorm']):
return [] # Empty since no parameters needed
else:
raise NotImplementedError('The distribution given by the `scipy_rv = %s` does not '
'have any associated default parameters.'
% str(scipy_rv))
[docs] def sample(self, n_samples=1, random_state=None):
"""Generate random samples from this density/destructor.
Parameters
----------
n_samples : int, default=1
Number of samples to generate. Defaults to 1.
random_state : int, RandomState instance or None, optional (default=None)
If int, `random_state` is the seed used by the random number
generator; If :class:`~numpy.random.RandomState` instance,
`random_state` is the random number generator; If None, the random
number generator is the :class:`~numpy.random.RandomState` instance
used by :mod:`numpy.random`.
Returns
-------
X : array, shape (n_samples, n_features)
Randomly generated sample.
"""
self._check_is_fitted()
rng = check_random_state(random_state)
return np.array(self.rv_.rvs(size=n_samples, random_state=rng)).reshape((n_samples, 1))
[docs] def score_samples(self, X, y=None):
"""Compute log-likelihood (or log(det(Jacobian))) for each sample.
Parameters
----------
X : array-like, shape (n_samples, n_features)
New data, where n_samples is the number of samples and n_features
is the number of features.
y : None, default=None
Not used but kept for compatibility.
Returns
-------
log_likelihood : array, shape (n_samples,)
Log likelihood of each data point in X.
"""
self._check_is_fitted()
X = self._check_X(X)
return self.rv_.logpdf(X.ravel()).reshape((-1, 1))
[docs] def cdf(self, X, y=None):
"""[Placeholder].
Parameters
----------
X :
y :
Returns
-------
obj : object
"""
self._check_is_fitted()
X = self._check_X(X)
return self.rv_.cdf(X.ravel()).reshape((-1, 1))
[docs] def inverse_cdf(self, X, y=None):
"""[Placeholder].
Parameters
----------
X :
y :
Returns
-------
obj : object
"""
self._check_is_fitted()
X = self._check_X(X, inverse=True)
return self.rv_.ppf(X.ravel()).reshape((-1, 1))
[docs] def get_support(self):
"""Get the support of this density (i.e. the positive density region).
Returns
-------
support : array-like, shape (2,) or shape (n_features, 2)
If shape is (2, ), then ``support[0]`` is the minimum and
``support[1]`` is the maximum for all features. If shape is
(`n_features`, 2), then each feature's support (which could
be different for each feature) is given similar to the first
case.
"""
# Assumes density is univariate
try:
self._check_is_fitted()
except NotFittedError:
# Get upper and lower bounds of support from scipy random variable properties
if self.scipy_rv is None:
default_rv = ScipyUnivariateDensity._get_default_scipy_rv()
return np.array([[default_rv.a, default_rv.b]])
else:
return np.array([[self.scipy_rv.a, self.scipy_rv.b]])
else:
# Scale and shift if fitted
try:
loc = self.rv_.args[-2]
except IndexError:
try:
loc = self.rv_.args[-1]
except IndexError:
loc = 0
scale = 1
else:
scale = self.rv_.args[-1]
if scale == 0: # Handle special degenerate case to avoid nans in domain
scale += np.finfo(float).eps
return loc + scale * np.array([[self.rv_.a, self.rv_.b]])
def _check_X(self, X, inverse=False):
# Check that X is univariate or warn otherwise
X = _check_univariate_X(X, self.get_support(), inverse=inverse)
scipy_rv = self._get_scipy_rv_or_default()
# Move away from support/domain boundaries if necessary
if inverse and (np.any(X <= 0) or np.any(X >= 1)):
warnings.warn(BoundaryWarning(
'Some probability values (input to inverse functions) are either 0 or 1. Bounding '
'values away from 0 or 1 to avoid infinities in output. For example, the inverse '
'cdf of a Gaussian at 0 will yield `-np.inf`.'))
X = make_interior_probability(X)
if self._is_special(scipy_rv, SCIPY_RV_UNIT_SUPPORT) and (np.any(X <= 0) or np.any(X >= 1)):
warnings.warn(BoundaryWarning(
'Input to random variable function has at least one value either 0 or 1 '
'but all input should be in (0,1) exclusive. Bounding values away from 0 or 1 by '
'eps=%g'))
X = make_interior_probability(X)
if self._is_special(scipy_rv, SCIPY_RV_STRICLTY_POSITIVE) and np.any(X <= 0):
warnings.warn(BoundaryWarning(
'Input to random variable function has at least one value less than or equal to '
'zero but all input should be strictly positive. Making all input greater than or '
'equal to some small positive constant.'))
X = make_positive(X)
if np.any(np.isinf(X)):
warnings.warn(BoundaryWarning(
'Input to random variable function has at least one value that is `np.inf` or '
'`-np.inf`. Making all input finite via a very large constant.'))
X = make_finite(X)
return X
def _get_scipy_rv_or_default(self):
if self.scipy_rv is None:
return ScipyUnivariateDensity._get_default_scipy_rv()
else:
return self.scipy_rv
@staticmethod
def _get_default_scipy_rv():
return scipy.stats.norm
@staticmethod
def _is_special(scipy_rv, scipy_str_set):
# Modify string set for special case of rv_histogram
scipy_str_set = [
'.' + dstr + '_gen' if dstr != 'rv_histogram' else '.' + dstr
for dstr in scipy_str_set
]
return np.any([
dstr in str(scipy_rv)
for dstr in scipy_str_set
])
def _check_is_fitted(self):
check_is_fitted(self, ['rv_'])
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=UserWarning)
STANDARD_NORMAL_DENSITY = ScipyUnivariateDensity(
scipy_rv=scipy.stats.norm,
scipy_fit_kwargs=dict(floc=0, fscale=1)
).fit(np.array([[0]]))
[docs]class HistogramUnivariateDensity(ScipyUnivariateDensity):
"""Histogram univariate density estimator.
Parameters
----------
bins : int or sequence of scalars or str, optional
Same ase the parameter of :func:`numpy.histogram`. Copied from numpy
documentation:
If `bins` is an int, it defines the number of equal-width
bins in the given range (10, by default). If `bins` is a
sequence, it defines the bin edges, including the rightmost
edge, allowing for non-uniform bin widths.
.. versionadded:: 1.11.0
If `bins` is a string from the list below, `histogram` will use
the method chosen to calculate the optimal bin width and
consequently the number of bins (see `Notes` for more detail on
the estimators) from the data that falls within the requested
range. While the bin width will be optimal for the actual data
in the range, the number of bins will be computed to fill the
entire range, including the empty portions. For visualisation,
using the 'auto' option is suggested. Weighted data is not
supported for automated bin size selection.
'auto'
Maximum of the 'sturges' and 'fd' estimators. Provides good
all around performance.
'fd' (Freedman Diaconis Estimator)
Robust (resilient to outliers) estimator that takes into
account data variability and data size.
'doane'
An improved version of Sturges' estimator that works better
with non-normal datasets.
'scott'
Less robust estimator that that takes into account data
variability and data size.
'rice'
Estimator does not take variability into account, only data
size. Commonly overestimates number of bins required.
'sturges'
R's default method, only accounts for data size. Only
optimal for gaussian data and underestimates number of bins
for large non-gaussian datasets.
'sqrt'
Square root (of data size) estimator, used by Excel and
other programs for its speed and simplicity.
bounds : float or array-like of shape (2,)
Specification for the finite bounds of the histogram. Bounds can be
percentage extension or a specified interval [a,b].
alpha : float
Regularization parameter corresponding to the number of
pseudo-counts to add to each bin of the histogram. This can be seen
as putting a Dirichlet prior on the empirical bin counts with
Dirichlet parameter alpha.
Attributes
----------
bin_edges_ : array of shape (n_bins + 1,)
Edges of bins.
pdf_bin_ : array of shape (n_bins,)
pdf values of bins. Note that histograms have a constant pdf
value within each bin.
cdf_bin_ : array of shape (n_bins + 1,)
cdf values at bin edges. Used with linear interpolation to
compute pdf, cdf and inverse cdf.
"""
[docs] def __init__(self, bins=None, bounds=0.1, alpha=1e-6):
self.bins = bins
self.bounds = bounds
self.alpha = alpha
[docs] def fit(self, X, y=None, histogram_params=None):
"""Fit estimator to X.
Parameters
----------
X : array-like, shape (n_samples, 1)
Training data, where `n_samples` is the number of samples. Note
that the shape must have a second dimension of 1 since this is a
univariate density estimator.
y : None, default=None
Not used in the fitting process but kept for compatibility.
histogram_params : list or tuple of size 2
Tuple or list of values of bins and bin edges. For example,
from :func:`numpy.histogram`.
Returns
-------
self : estimator
Returns the instance itself.
"""
if X is not None and histogram_params is not None:
raise ValueError('Either X or histogram_params can be provided (i.e. not None) '
'but not both.')
if histogram_params is not None:
hist, bin_edges = histogram_params
warnings.warn(DeprecationWarning('Class factory method `create_fitted` should '
'be used instead of passing histogram_params '
'to fit.'))
else:
X = self._check_X(X)
# Get percent extension but do not modify bounds
bounds = self._check_bounds(X)
bins = self.bins if self.bins is not None else 'auto'
# Fit numpy histogram
hist, bin_edges = np.histogram(X, bins=bins, range=bounds)
hist = np.array(hist, dtype=float) # Make float so we can add non-integer alpha
hist += self.alpha # Smooth histogram by alpha so no areas have 0 probability
# Normalize bins by bin_edges
rv = self._hist_params_to_rv(hist, bin_edges)
self.rv_ = rv
return self
@staticmethod
def _hist_params_to_rv(hist, bin_edges):
hist = np.array(hist)
bin_edges = np.array(bin_edges)
hist = hist / (bin_edges[1:] - bin_edges[:-1])
rv = scipy.stats.rv_histogram((hist, bin_edges))
return rv
[docs] @classmethod
def create_fitted(cls, hist, bin_edges, **kwargs):
"""Create fitted density.
Parameters
----------
hist : counts
fitted_univariate_densities : array-like of Density, shape (n_features,)
Fitted univariate densities.
**kwargs
Other parameters to pass to object constructor.
Returns
-------
fitted_density : Density
Fitted density.
"""
density = cls(**kwargs)
rv = cls._hist_params_to_rv(hist, bin_edges)
density.rv_ = rv
return density
[docs] def get_support(self):
"""Get the support of this density (i.e. the positive density region).
Returns
-------
support : array-like, shape (2,) or shape (n_features, 2)
If shape is (2, ), then ``support[0]`` is the minimum and
``support[1]`` is the maximum for all features. If shape is
(`n_features`, 2), then each feature's support (which could
be different for each feature) is given similar to the first
case.
"""
# Make [[a,b]] so that it is explicitly a univariate density
return np.array([self._check_bounds()])
def _check_bounds(self, X=None, extend=True):
# If bounds is extension
if np.isscalar(self.bounds):
if X is None:
# If no X than just return -inf, inf
return _DEFAULT_SUPPORT
else:
# If X is not None than extract bounds and extend as necessary
perc_extension = self.bounds
_domain = np.array([np.min(X), np.max(X)])
center = np.mean(_domain)
_domain = (1 + perc_extension) * (_domain - center) + center
return _domain
# If bounds is just an array then directly return it
else:
_domain = column_or_1d(self.bounds).copy()
if _domain.shape[0] != 2:
raise ValueError('Domain should either be a two element array-like or a'
' scalar indicating percentage extension of domain')
return _domain
def _check_X(self, X, inverse=False):
X = super(HistogramUnivariateDensity, self)._check_X(X, inverse)
bounds = self._check_bounds()
if np.any(X <= bounds[0]) or np.any(X >= bounds[1]):
warnings.warn(BoundaryWarning(
'Input to random variable function has at least one value outside of bounds '
'but all input should be in (bounds[0], bounds[1]) exclusive. Bounding '
'values away from bounds[0] or bounds[1]'))
X = make_interior(X, bounds)
return X
def _get_scipy_rv_or_default(self):
return scipy.stats.rv_histogram