"""Module for independent densities and destructors."""
from __future__ import division, print_function
import itertools
import logging
import warnings
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_array, check_is_fitted, check_random_state
from .base import BaseDensityDestructor, ScoreMixin
from .univariate import STANDARD_NORMAL_DENSITY, ScipyUnivariateDensity
# noinspection PyProtectedMember
from .utils import (_UNIT_SPACE, check_X_in_interval, get_domain_or_default, get_support_or_default,
make_interior_probability)
# Module-level logger; its name is this module's ``__name__``.
logger = logging.getLogger(__name__)
class IndependentDestructor(BaseDensityDestructor):
    """Coordinate-wise destructor backed by an independent density.

    The underlying density is assumed to factorize over the features (i.e.
    it is an :class:`~ddl.independent.IndependentDensity`), so the
    destructive transformation is meant to apply a univariate CDF to each
    feature independently of the others, and the inverse transformation
    the corresponding inverse CDFs. The univariate densities for each
    feature can be chosen by the user, e.g. from the random variables
    defined in :mod:`scipy.stats`. Fitting amounts to fitting the
    independent density.

    NOTE(review): this class body only supplies the density estimator,
    a factory for pre-fitted instances, and a dimension check; the
    fit/transform methods are presumably provided elsewhere (e.g. by the
    base class) -- confirm.

    Parameters
    ----------
    independent_density : IndependentDensity, optional
        The independent density estimator for this destructor. When
        None, a default-constructed :class:`IndependentDensity` is used.

    Attributes
    ----------
    density_ : IndependentDensity
        Fitted underlying independent density.

    See Also
    --------
    IndependentDensity

    """

    def __init__(self, independent_density=None):
        self.independent_density = independent_density

    def _get_density_estimator(self):
        """Return a fresh, *unfitted* density estimator for this destructor.

        This is the hook that subclasses of
        :class:`~ddl.base.BaseDensityDestructor` override. The returned
        object is never the fitted density: it is either a clone of the
        configured ``independent_density`` or a newly constructed
        :class:`IndependentDensity`.

        Returns
        -------
        density : estimator
            The *unfitted* density estimator associated with this
            destructor.

        """
        configured = self.independent_density
        if configured is not None:
            return clone(configured)
        return IndependentDensity()

    @classmethod
    def create_fitted(cls, fitted_density, **kwargs):
        """Build a destructor around an already fitted density.

        Parameters
        ----------
        fitted_density : Density
            Fitted density; stored as-is on the ``density_`` attribute.

        **kwargs
            Other parameters forwarded to the destructor constructor.

        Returns
        -------
        fitted_transformer : Transformer
            Destructor whose ``density_`` is ``fitted_density``.

        """
        fitted_transformer = cls(**kwargs)
        fitted_transformer.density_ = fitted_density
        return fitted_transformer

    def _check_dim(self, X):
        """Raise ``ValueError`` unless X has one column per fitted density."""
        n_given = X.shape[1]
        n_expected = len(self.density_.univariate_densities_)
        if n_given == n_expected:
            return
        raise ValueError('Dimension of input does not match dimension of the original '
                         'training data.')
class IndependentDensity(BaseEstimator, ScoreMixin):
    """Independent density estimator.

    This density assumes that the underlying density is independent, i.e.
    the joint density is the product of one univariate density per
    feature. The user can specify the univariate densities for each
    feature.

    Parameters
    ----------
    univariate_estimators : estimator or array-like of shape (n_features,)
        Univariate estimator(s) for this independent density. Default
        assumes univariate Gaussian densities for all features. Should be
        one of the following:

        1. None (default, assumes independent Gaussian density).
        2. Univariate density estimator (assumes all features have
           the same density class, but the fitted parameters can be
           different, e.g. the means of features 1 and 2 could be
           different even though they are both Gaussian estimators.).
        3. Array-like of univariate density estimators for each feature.

    Attributes
    ----------
    univariate_densities_ : array, shape (n_features, )
        *Fitted* univariate estimators for each feature.

    n_features_ : int
        Number of features.

    See Also
    --------
    TreeDestructor
    ddl.univariate
    ddl.univariate.ScipyUnivariateDensity
    ddl.univariate.HistogramUnivariateDensity

    """

    def __init__(self, univariate_estimators=None):
        self.univariate_estimators = univariate_estimators

    def fit(self, X, y=None, **fit_params):
        """Fit estimator to X.

        A clone of the univariate estimator assigned to each feature is
        fitted on the corresponding column of X (reshaped to a single
        column).

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : None, default=None
            Not used in the fitting process but kept for compatibility.

        fit_params : dict, optional
            Optional extra fit parameters (not used here).

        Returns
        -------
        self : estimator
            Returns the instance itself.

        """

        def _check_univariate(estimators, n_features):
            # Expand the `univariate_estimators` parameter into one
            # (unfitted) estimator per feature.
            if estimators is None:
                return [IndependentDensity._get_default_univariate() for _ in range(n_features)]
            elif len(np.array(estimators).shape) == 0:
                # A single estimator: reuse it for every feature (each is cloned before fit)
                return [estimators for _ in range(n_features)]
            elif len(estimators) == n_features:
                return estimators
            else:
                # Length mismatch: repeat (or truncate) the given estimators to n_features
                try:
                    temp = itertools.cycle(estimators)
                except TypeError:
                    raise ValueError('`univariate_estimators` should be either None, a single '
                                     'estimator, or an array-like of estimators.')
                else:
                    return list(itertools.islice(temp, n_features))

        X = check_array(X)
        est_arr = _check_univariate(self.univariate_estimators, X.shape[1])

        # Fit univariate densities for each column
        self.univariate_densities_ = np.array([
            clone(est).fit(np.reshape(x_col, (-1, 1)))
            for est, x_col in zip(est_arr, X.transpose())
        ])
        self.n_features_ = len(self.univariate_densities_)
        return self

    @classmethod
    def create_fitted(cls, fitted_univariate_densities, n_features=None, **kwargs):
        """Create fitted density.

        Parameters
        ----------
        fitted_univariate_densities : array-like of Density or Density, shape (n_features,)
            Fitted univariate densities. If a single fitted density then `n_features`
            parameter must be provided to appropriately expand to an array.

        n_features : int, optional
            Number of features. If not supplied, will be inferred from
            `fitted_univariate_densities`.

        **kwargs
            Other parameters to pass to object constructor.

        Returns
        -------
        fitted_density : Density
            Fitted density.

        Raises
        ------
        ValueError
            If a single density is given without `n_features`, or if an
            array-like is given whose length differs from `n_features`.

        """
        if len(np.array(fitted_univariate_densities).shape) == 0:
            if n_features is None:
                raise ValueError('Must supply either array-like of fitted_univariate_densities '
                                 'or a single fitted and n_features')
            fitted_univariate_densities = [fitted_univariate_densities
                                           for _ in range(n_features)]
        elif n_features is not None and len(fitted_univariate_densities) != n_features:
            # Explicit error instead of `assert`, which is stripped under `python -O`
            raise ValueError('Length of fitted_univariate_densities (%d) does not match '
                             'n_features (%d).'
                             % (len(fitted_univariate_densities), n_features))
        density = cls(**kwargs)
        density.univariate_densities_ = np.array(fitted_univariate_densities)
        density.n_features_ = len(fitted_univariate_densities)
        return density

    def sample(self, n_samples=1, random_state=None):
        """Generate random samples from this density/destructor.

        Parameters
        ----------
        n_samples : int, default=1
            Number of samples to generate. Defaults to 1.

        random_state : int, RandomState instance or None, optional (default=None)
            If int, `random_state` is the seed used by the random number
            generator; If :class:`~numpy.random.RandomState` instance,
            `random_state` is the random number generator; If None, the random
            number generator is the :class:`~numpy.random.RandomState` instance
            used by :mod:`numpy.random`.

        Returns
        -------
        X : array, shape (n_samples, n_features)
            Randomly generated sample.

        """
        self._check_is_fitted()
        rng = check_random_state(random_state)
        # One row per feature, then transpose so that features become columns.
        # The same generator is shared by all univariate densities.
        X = np.array([
            np.ravel(u_dens.sample(n_samples=n_samples, random_state=rng))
            for u_dens in self.univariate_densities_
        ]).transpose()
        return X

    def score_samples(self, X, y=None):
        """Compute log-likelihood (or log(det(Jacobian))) for each sample.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            New data, where n_samples is the number of samples and n_features
            is the number of features.

        y : None, default=None
            Not used but kept for compatibility.

        Returns
        -------
        log_likelihood : array, shape (n_samples,)
            Log likelihood of each data point in X.

        Raises
        ------
        ValueError
            If the number of columns of X differs from the number of
            fitted univariate densities.

        """
        self._check_is_fitted()
        X = check_array(X)
        # Without this check `zip` below would silently drop the surplus columns (or
        # densities) and return a likelihood over only a subset of the features.
        if X.shape[1] != len(self.univariate_densities_):
            raise ValueError('Dimension of input does not match dimension of the original '
                             'training data.')
        # Extract log-likelihood for all dimensions
        independent_scores = np.array([
            u_dens.score_samples(np.reshape(x_col, (-1, 1))).ravel()
            for u_dens, x_col in zip(self.univariate_densities_, X.transpose())
        ]).transpose()

        # Sum of log-likelihood is product of likelihoods because independent variables
        return independent_scores.sum(axis=1)

    def conditional_densities(self, X, cond_idx, not_cond_idx):
        """Return the density of some features conditioned on the others.

        Because the features are independent, conditioning has no effect
        and the result is simply the marginal density over
        `not_cond_idx`.

        Parameters
        ----------
        X : array-like
            Values of the conditioning features. Not used.

        cond_idx : array-like of int
            Indices of the conditioning features. Not used.

        not_cond_idx : array-like of int
            Indices of the features whose density is returned.

        Returns
        -------
        marginal_density : IndependentDensity
            Result of ``self.marginal_density(not_cond_idx)``.

        """
        # Since independent, the conditional is equal to the marginal
        return self.marginal_density(not_cond_idx)

    def marginal_density(self, marginal_idx):
        """Return the independent density restricted to a subset of features.

        Parameters
        ----------
        marginal_idx : array-like of int
            Indices used to select entries of `univariate_densities_`.
            Must support ``len``.

        Returns
        -------
        marginal_density : IndependentDensity
            A clone of this estimator whose fitted univariate densities
            are the selected ones (the density objects themselves are
            shared with this estimator, not copied).

        """
        self._check_is_fitted()
        marginal_density = clone(self)
        marginal_density.univariate_densities_ = self.univariate_densities_[marginal_idx]
        marginal_density.n_features_ = len(marginal_idx)
        # noinspection PyProtectedMember
        marginal_density._check_is_fitted()
        return marginal_density

    def marginal_cdf(self, x, target_idx):
        """Evaluate the CDF of a single feature's fitted density.

        Parameters
        ----------
        x : array-like
            Values at which to evaluate the CDF (any shape).

        target_idx : int
            Index of the feature whose univariate density is used.

        Returns
        -------
        u : array
            CDF values with the same shape as `x`.

        """
        self._check_is_fitted()
        x = np.array(x)
        # Univariate densities take a single-column 2D input
        return self.univariate_densities_[target_idx].cdf(x.reshape(-1, 1)).reshape(x.shape)

    def marginal_inverse_cdf(self, x, target_idx):
        """Evaluate the inverse CDF of a single feature's fitted density.

        Parameters
        ----------
        x : array-like
            Values at which to evaluate the inverse CDF (any shape).

        target_idx : int
            Index of the feature whose univariate density is used.

        Returns
        -------
        z : array
            Inverse CDF values with the same shape as `x`.

        """
        self._check_is_fitted()
        x = np.array(x)
        # Univariate densities take a single-column 2D input
        return self.univariate_densities_[target_idx].inverse_cdf(
            x.reshape(-1, 1)).reshape(x.shape)

    def get_support(self):
        """Get the support of this density (i.e. the positive density region).

        If the estimator is fitted, the supports of the fitted univariate
        densities are used; otherwise the supports are taken from the
        `univariate_estimators` parameter (or the default univariate
        estimator when that parameter is None).

        Returns
        -------
        support : array-like, shape (2,) or shape (n_features, 2)
            If shape is (2, ), then ``support[0]`` is the minimum and
            ``support[1]`` is the maximum for all features. If shape is
            (`n_features`, 2), then each feature's support (which could
            be different for each feature) is given similar to the first
            case.

        """

        def _unwrap_support(est):
            # Univariate density estimators should return [[a,b]] because there is only one
            # dimension, thus this unwraps this even if the default is returned of [a,b]
            return np.array(get_support_or_default(est)).ravel()

        # Check if fitted first
        try:
            self._check_is_fitted()
        except NotFittedError:
            # Use defaults from parameters
            estimators = self.univariate_estimators
            if estimators is None:
                return _unwrap_support(IndependentDensity._get_default_univariate())
            elif len(np.array(estimators).shape) == 0:
                return _unwrap_support(estimators)
            else:
                return np.array([_unwrap_support(est) for est in estimators])
        else:
            # Use fitted support
            return np.array([_unwrap_support(dens) for dens in self.univariate_densities_])

    def _check_is_fitted(self):
        """Raise ``NotFittedError`` unless the fitted attributes are present."""
        check_is_fitted(self, ['univariate_densities_', 'n_features_'])

    @staticmethod
    def _get_default_univariate():
        """Return the default (unfitted) univariate density estimator."""
        return ScipyUnivariateDensity()
class IndependentInverseCdf(BaseEstimator, ScoreMixin, TransformerMixin):
    """Independent inverse CDF transformer applied coordinate-wise.

    A transformer (or *relative* destructor) that performs the inverse CDF
    transform independently for the fitted univariate densities
    corresponding to each feature. The default is the inverse CDF of the
    standard normal; this default is useful to make linear projection
    destructors canonical by prepending this as a preprocessing step so that
    the domain of the destructor is the unit hypercube (i.e. canonical
    domain).

    Prefer :func:`create_fitted` for construction; :func:`fit` is
    deprecated and only reads the number of features from its input.

    Attributes
    ----------
    fitted_densities_ : array, shape (n_features,)
        Fitted univariate densities for each feature. Note that these must
        be passed in (to :func:`create_fitted` or :func:`fit`) rather than
        estimated from data. All needed scoring information is built into
        the univariate densities; e.g. :func:`score_samples` only uses
        their ``inverse_cdf`` and ``score_samples`` methods.

    See Also
    --------
    ddl.univariate
    IndependentDestructor

    """

    def fit(self, X, y=None, fitted_densities=None, **fit_params):
        """Fit estimator to X.

        X is only used to get the number of features.

        .. deprecated::
            Use the class factory method :func:`create_fitted` instead.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : None, default=None
            Not used in the fitting process but kept for compatibility.

        fitted_densities : array-like of estimators
            Default assumes that `fitted_densities` are standard Gaussian.
            `fitted_densities` should be fitted versions of the following
            similar to the `univariate_estimators` parameter of
            `IndependentDensity`:

            #. None (defaults to the standard normal density),
            #. univariate density estimator,
            #. array-like of univariate density estimators.

        Returns
        -------
        self : estimator
            Returns the instance itself.

        """
        # stacklevel=2 attributes the warning to the caller of `fit`
        warnings.warn('Class factory method `create_fitted` '
                      'should be used instead.', DeprecationWarning, stacklevel=2)
        X = check_array(X)

        # Mainly just get default and make array of densities if needed
        dens_arr = self._get_densities_or_default(fitted_densities, X.shape[1])
        self.fitted_densities_ = dens_arr
        return self

    @classmethod
    def create_fitted(cls, n_features, fitted_densities=None, **kwargs):
        """Create fitted transformer.

        Parameters
        ----------
        n_features : int
            Number of features. Used to expand `fitted_densities` when it
            is None or a single density.

        fitted_densities : array-like of Density, Density or None, optional
            Fitted univariate densities. None selects the standard normal
            density for every feature; a single density is repeated for
            every feature; an array-like is used as given.

        **kwargs
            Other parameters to pass to constructor.

        Returns
        -------
        fitted_transformer : Transformer
            Fitted transformer.

        """
        destructor = cls(**kwargs)
        dens_arr = cls._get_densities_or_default(fitted_densities, n_features)
        destructor.fitted_densities_ = dens_arr
        return destructor

    def score_samples(self, X, y=None):
        """Compute log-likelihood (or log(det(Jacobian))) for each sample.

        For every feature the score is the negated ``score_samples`` of
        the fitted univariate density evaluated at its ``inverse_cdf`` of
        the input; the per-feature scores are summed for each sample.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            New data, where n_samples is the number of samples and n_features
            is the number of features.

        y : None, default=None
            Not used but kept for compatibility.

        Returns
        -------
        log_likelihood : array, shape (n_samples,)
            Log likelihood of each data point in X.

        Raises
        ------
        ValueError
            If the number of columns of X differs from the number of
            fitted densities.

        """
        self._check_is_fitted()
        X = check_array(X)
        self._check_dim(X)
        X = check_X_in_interval(X, get_domain_or_default(self))
        X = make_interior_probability(X)
        # Kept in a local variable: scoring must not write state onto the estimator.
        independent_scores = np.array([
            # Derivative of inversecdf = 1/pdf(inversecdf(X)) -> -logpdf(inversecdf(X)), which is
            # the log(J^{-1}), because Jacobian is diagonal
            -d.score_samples(d.inverse_cdf(x_col.reshape(-1, 1))).ravel()
            for d, x_col in zip(self.fitted_densities_, X.transpose())
        ]).transpose()

        # Sum of log-likelihood is product of likelihoods because independent variables
        return independent_scores.sum(axis=1)

    def get_domain(self):
        """Get the domain of this destructor.

        Returns
        -------
        domain : array-like, shape (2,) or shape (n_features, 2)
            If shape is (2, ), then ``domain[0]`` is the minimum and
            ``domain[1]`` is the maximum for all features. If shape is
            (`n_features`, 2), then each feature's domain (which could
            be different for each feature) is given similar to the first
            case. This implementation always returns the module constant
            ``_UNIT_SPACE``.

        """
        return _UNIT_SPACE

    def _get_density_support(self):
        """Return the stacked supports of the fitted univariate densities.

        Returns
        -------
        support : array, shape (n_features, 2)
            One ``[min, max]`` row per fitted density.

        Raises
        ------
        RuntimeError
            If any density reports a support whose shape is not (1, 2).

        """
        # Get the density support which is the same as the range of this transformer (or the
        # domain of the inverse transformation
        def _check_univariate_support(support):
            # Convert once so list/tuple supports can be validated *and* flattened
            support = np.asarray(support)
            shape = support.shape
            if len(shape) != 2 or shape[0] != 1 or shape[1] != 2:
                raise RuntimeError('Should be univariate support with shape (1,2), i.e. the '
                                   'number of dimensions is fixed at 1 but the following shape '
                                   'was given: %s.' % str(shape))
            return support.ravel()

        self._check_is_fitted()
        return np.array([
            _check_univariate_support(get_support_or_default(d))
            for d in self.fitted_densities_
        ])

    @staticmethod
    def _get_densities_or_default(fitted_densities, n_features):
        """Expand `fitted_densities` into an array of per-feature densities.

        None yields ``STANDARD_NORMAL_DENSITY`` repeated `n_features`
        times, a single (0-d) density is repeated `n_features` times, and
        an array-like is converted to an array unchanged.
        """
        if fitted_densities is None:
            return np.array([STANDARD_NORMAL_DENSITY for _ in range(n_features)])
        elif len(np.array(fitted_densities).shape) == 0:
            return np.array([fitted_densities for _ in range(n_features)])
        else:
            return np.array(fitted_densities)

    def _check_dim(self, X):
        """Raise ``ValueError`` unless X has one column per fitted density."""
        if X.shape[1] != len(self.fitted_densities_):
            raise ValueError('Dimension of input does not match dimension of the original '
                             'training data.')

    def _check_is_fitted(self):
        """Raise ``NotFittedError`` unless `fitted_densities_` is present."""
        check_is_fitted(self, ['fitted_densities_'])