"""Module for mixture densities and destructors."""
from __future__ import division, print_function
import logging
import warnings
import numpy as np
from scipy.optimize import brentq as scipy_brentq
from scipy.special import logsumexp as scipy_logsumexp
from sklearn.base import BaseEstimator, clone
from sklearn.cluster import KMeans
from sklearn.exceptions import NotFittedError
from sklearn.mixture import BayesianGaussianMixture, GaussianMixture
from sklearn.utils.validation import check_array, check_is_fitted, check_random_state, column_or_1d
from .base import ScoreMixin
from .gaussian import GaussianDensity
from .independent import IndependentDensity
logger = logging.getLogger(__name__)
class _MixtureMixin(ScoreMixin):
def _get_component_densities(self):
"""Return the density components."""
return self.component_densities_
def _check_is_fitted(self):
check_is_fitted(self, ['weights_', 'component_densities_'])
def _process_conditionals(self, conditionals, n_samples):
if not hasattr(conditionals, '__len__'):
return np.array([conditionals for _ in range(n_samples)])
else:
return conditionals
def _set_fit_params(self, mixture, w, c_arr):
mixture.weights_ = w
mixture.component_densities_ = c_arr
return mixture
def score_samples(self, X, y=None):
"""Compute log-likelihood (or log(det(Jacobian))) for each sample.
Parameters
----------
X : array-like, shape (n_samples, n_features)
New data, where n_samples is the number of samples and n_features
is the number of features.
y : None, default=None
Not used but kept for compatibility.
Returns
-------
log_likelihood : array, shape (n_samples,)
Log likelihood of each data point in X.
"""
self._check_is_fitted()
components = self._get_component_densities()
component_log_likelihood = np.array([
comp.score_samples(X)
for comp in components
]) # (n_components, n_samples)
weighted_log_likelihood = np.log(self.weights_).reshape(-1, 1) + component_log_likelihood
return scipy_logsumexp(weighted_log_likelihood, axis=0)
def sample(self, n_samples=1, random_state=None):
"""Generate random samples from this density/destructor.
Parameters
----------
n_samples : int, default=1
Number of samples to generate. Defaults to 1.
random_state : int, RandomState instance or None, optional (default=None)
If int, `random_state` is the seed used by the random number
generator; If :class:`~numpy.random.RandomState` instance,
`random_state` is the random number generator; If None, the random
number generator is the :class:`~numpy.random.RandomState` instance
used by :mod:`numpy.random`.
Returns
-------
X : array, shape (n_samples, n_features)
Randomly generated sample.
"""
self._check_is_fitted()
rng = check_random_state(random_state)
n_samples_per_comp = rng.multinomial(n_samples, self.weights_)
X = np.vstack([
comp.sample(comp_n_samples, random_state=rng)
for comp, comp_n_samples in zip(self._get_component_densities(), n_samples_per_comp)
])
# Not used but if clusters are needed, see here
# y = np.concatenate([j * np.ones(comp_n_samples, dtype=int)
# for j, comp_n_samples in enumerate(n_samples_per_comp)])
return X
def conditional_densities(self, X, cond_idx, not_cond_idx):
"""Compute conditional densities.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Data to condition on based on `cond_idx`.
cond_idx : array-like of int
Indices to condition on.
not_cond_idx :
Indices not to condition on.
Returns
-------
conditional_densities : array-like of estimators
Either a single density if all the same or a list of Gaussian
densities with conditional variances and means.
"""
if len(cond_idx) + len(not_cond_idx) != X.shape[1]:
raise ValueError('`cond_idx_arr` and `not_cond_idx_arr` should be complements that '
'have the a total of X.shape[1] number of values.')
# Handle trivial case
if len(cond_idx) == 0:
return self
self._check_is_fitted()
# Retrieve components
components = self._get_component_densities() # (n_components,)
n_samples, n_features = X.shape
# Get new weights
X_cond = X[:, cond_idx]
marginal_log_likelihood = np.array([
component.marginal_density(cond_idx).score_samples(X_cond)
for component in components
]).transpose() # (n_samples, n_components)
# Numerically safe way to get new weights if marginal_log_likelihood is very low e.g.
# -1000, which would mean np.exp(-1000) = 0
log_cond_weights = np.log(self.weights_) + marginal_log_likelihood
log_cond_weights -= scipy_logsumexp(log_cond_weights, axis=1).reshape(-1, 1)
cond_weights = np.exp(log_cond_weights)
cond_weights = np.maximum(1e-100, cond_weights) # Just to avoid complete zeros
# Condition each Gaussian
cond_components = np.array([
self._process_conditionals(component.conditional_densities(X, cond_idx, not_cond_idx),
n_samples)
for component in components
]).transpose() # (n_samples, n_components)
# Create final conditional densities
conditional_densities = np.array([
self._set_fit_params(clone(self), cond_w, cond_c_arr)
for cond_w, cond_c_arr in zip(cond_weights, cond_components)
])
return conditional_densities
def marginal_cdf(self, x, target_idx):
"""Return the marginal cdf of `x` at feature `target_idx`."""
cdf_components = np.array([
np.reshape(comp.marginal_cdf(x, target_idx), (-1,))
for j, comp in enumerate(self._get_component_densities())
]).transpose() # (n_samples, n_components)
if not hasattr(x, '__len__'):
return np.dot(cdf_components.ravel(), self.weights_)
else:
return np.dot(cdf_components, self.weights_) # (n_samples,)
def marginal_inverse_cdf(self, x, target_idx):
"""Return the marginal inverse cdf of `x` at feature `target_idx`."""
# Get bounds on left and right from min and max of components
# Note that these are global bounds for given x so they only have to be computed once
components = self._get_component_densities()
bound_left = np.min([comp.marginal_inverse_cdf(np.min(x), target_idx)
for comp in components])
bound_right = np.max([comp.marginal_inverse_cdf(np.max(x), target_idx)
for comp in components])
# Handle trivial case where bounds are equal
if bound_left == bound_right:
return bound_left * np.ones(np.array(x).shape)
# Scale bounds by 10% to ensure they will produce positive and negative values
# because there may be numerical error
bound_center = (bound_right + bound_left) / 2
bound_left, bound_right = (1.1 * (np.array([bound_left, bound_right]) - bound_center)
+ bound_center)
weights = np.array(self.weights_, copy=False)
# Setup function to get inverse cdf for a scalar value
def _get_inverse_cdf(_x_scalar):
def _bound_marginal_cdf(a):
# Rewrote for efficiency
# u - self.marginal_cdf(_x_scalar, target_idx)
marginal_cdf = np.dot(
weights,
[comp.marginal_cdf(a, target_idx) for comp in components]
)
return marginal_cdf - _x_scalar
# If requesting an inverse_cdf for 1 - eps, just return inverse_cdf
if (1 - _x_scalar) < 2 * np.finfo(float).eps:
return _bound_marginal_cdf(bound_right)
return scipy_brentq(_bound_marginal_cdf, bound_left, bound_right)
if not hasattr(x, '__len__'):
return _get_inverse_cdf(x)
else:
return np.array([_get_inverse_cdf(x_scalar) for x_scalar in x])
class _MixtureDensity(BaseEstimator, _MixtureMixin):
"""Mixture of independent types."""
def __init__(self, cluster_estimator=None, component_density_estimator=None):
self.cluster_estimator = cluster_estimator
self.component_density_estimator = component_density_estimator
def fit(self, X, y=None):
"""Fit estimator to X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where `n_samples` is the number of samples and
`n_features` is the number of features.
y : None, default=None
Not used in the fitting process but kept for compatibility.
Returns
-------
self : estimator
Returns the instance itself.
"""
X = check_array(X)
cluster_estimator = self._get_cluster_or_default()
density_estimator = self._get_density_estimator_or_default()
# Fit y if not given using cluster_estimator
if y is None:
try:
y = cluster_estimator.fit_predict(X)
except AttributeError:
y = cluster_estimator.fit(X).predict(X)
else:
warnings.warn('y was given so not clustering. If you want to cluster the data, '
'then set y=None when fitting.')
y = column_or_1d(y)
self.weights_ = np.array([np.sum(y == label) for label in np.unique(y)]) / len(y)
# Fit component densities using cluster labels
self.component_densities_ = np.array([
clone(density_estimator).fit(X[y == label, :])
for label in np.unique(y)
])
self.n_features_ = X.shape[1]
return self
def _get_density_estimator_or_default(self):
if self.component_density_estimator is None:
return IndependentDensity()
else:
return self.component_density_estimator
def _get_cluster_or_default(self):
if self.cluster_estimator is None:
return KMeans(n_clusters=2, random_state=0)
else:
return self.cluster_estimator
class _GaussianMixtureMixin(object):
"""Overrides several methods in GaussianMixture.
Needed to comply with density specifications and adds a few methods.
"""
def fit(self, X, y=None):
"""Fit estimator to X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where `n_samples` is the number of samples and
`n_features` is the number of features.
y : None, default=None
Not used in the fitting process but kept for compatibility.
Returns
-------
self : estimator
Returns the instance itself.
"""
super(_GaussianMixtureMixin, self).fit(X, y)
X = self._check_X(X)
self.n_features_ = X.shape[1]
return self
def sample(self, n_samples=1, random_state=None):
"""Sample from GaussianMixture and return only X instead of (X, y)."""
# Set random state of this object before calling sample
old_random_state = self.random_state
self.random_state = random_state
X, y = super(_GaussianMixtureMixin, self).sample(n_samples)
self.random_state = old_random_state
return X
def sample_joint(self, n_samples=1, random_state=None):
old_random_state = self.random_state
self.random_state = random_state
X, y = super(_GaussianMixtureMixin, self).sample(n_samples)
self.random_state = old_random_state
return X, y
def _get_component_densities(self):
n_components, _ = self.means_.shape
# noinspection PyProtectedMember
return np.array([
GaussianDensity(covariance_type=self.covariance_type)._fit_direct(
mean=self.means_[j],
covariance=self._get_covariance(j),
precision=self._get_precision(j),
precision_cholesky=self._get_precision_cholesky(j),
copy=False)
for j in range(n_components)
]) # (n_components,)
# noinspection PyProtectedMember
def _process_conditionals(self, conditionals, n_samples):
if not hasattr(conditionals, '__len__'):
conditionals._fit_auxiliary()
return np.array([conditionals for _ in range(n_samples)])
else:
return np.array([cond._fit_auxiliary() for cond in conditionals])
def _set_fit_params(self, mixture, w, c_arr):
def _check_tied(attr):
if mixture.covariance_type == 'tied':
return getattr(c_arr[0], attr)
else:
return np.array([getattr(gaussian, attr) for gaussian in c_arr])
mixture.weights_ = w
mixture.means_ = np.array([gaussian.mean_ for gaussian in c_arr], copy=False)
mixture.covariances_ = _check_tied('covariance_')
mixture.precisions_ = _check_tied('precision_')
mixture.precisions_cholesky_ = _check_tied('precision_cholesky_')
mixture.converged_ = False
mixture.n_iter_ = 0
mixture.lower_bound_ = np.NaN
return mixture
def _get_covariance(self, component_idx):
return _get_component_array(self.covariances_, component_idx, self.covariance_type)
def _get_precision(self, component_idx):
return _get_component_array(self.precisions_, component_idx, self.covariance_type)
def _get_precision_cholesky(self, component_idx):
return _get_component_array(self.precisions_cholesky_, component_idx, self.covariance_type)
def _check_X(self, X):
"""Taken mostly from `sklearn.mixture.base._check_X`."""
X = check_array(X, dtype=[np.float64, np.float32])
# Try to get shape from fitted means
try:
self._check_is_fitted()
except NotFittedError:
n_components = None
n_features = None
else:
n_components, n_features = self.means_.shape
# Check that X conform
if n_components is not None and X.shape[0] < n_components:
raise ValueError('Expected n_samples >= n_components '
'but got n_components = %d, n_samples = %d'
% (n_components, X.shape[0]))
if n_features is not None and X.shape[1] != n_features:
raise ValueError("Expected the input data X have %d features, "
"but got %d features"
% (n_features, X.shape[1]))
return X
def _create_fitted_mixture(cls, means, covariances, weights=None,
covariance_type='spherical', **kwargs):
means = check_array(means)
n_components, n_features = means.shape
if weights is None:
weights = np.ones(n_components) / n_components
weights = np.array(weights)
assert len(weights) == n_components
covariances = np.array(covariances)
if covariance_type == 'full':
assert np.all(covariances.shape == (n_components, n_features, n_features))
precisions = np.array([np.linalg.inv(c) for c in covariances])
precisions_cholesky = np.array([np.linalg.cholesky(p) for p in precisions])
elif covariance_type == 'tied':
assert np.all(covariances.shape == (n_features, n_features))
precisions = np.linalg.inv(covariances)
precisions_cholesky = np.linalg.cholesky(precisions)
elif covariance_type == 'diag':
assert np.all(covariances.shape == (n_components, n_features))
precisions = 1/covariances
precisions_cholesky = np.sqrt(precisions)
elif covariance_type == 'spherical':
assert np.all(covariances.shape == (n_components,))
precisions = 1/covariances
precisions_cholesky = np.sqrt(precisions)
else:
raise RuntimeError('covariance type "%s" not recognized' % covariance_type)
if 'n_components' in kwargs:
assert kwargs['n_components'] == n_components
else:
kwargs['n_components'] = n_components
mixture = cls(covariance_type=covariance_type, **kwargs)
mixture.weights_ = weights
mixture.means_ = means
mixture.covariances_ = covariances
mixture.precisions_ = precisions
mixture.precisions_cholesky_ = precisions_cholesky
mixture.converged_ = False
mixture.n_iter_ = 0
mixture.lower_bound_ = np.NaN
return mixture
[docs]class GaussianMixtureDensity(_GaussianMixtureMixin, GaussianMixture, _MixtureMixin):
"""Gaussian mixture density that can be used with AutoregressiveDestructor.
This subclasses off of :class:`sklearn.mixture.GaussianMixture`. It
overrides several methods such as :func:`sample` and :func:`score` to
ensure that the interface conforms to the other density estimators. In
addition, the necessary conditional and marginal distribution methods
needed for :class:`ddl.autoregressive.AutoregressiveDestructor` were
added. See :class:`sklearn.mixture.GaussianMixture` for parameters and
attributes.
Note that _GaussianMixtureMixin must override some things in
GaussianMixture but _MixtureMixin should not override GaussianMixture.
Thus, the order of multiple inheritance should remain
_GaussianMixtureMixin, GaussianMixture, _MixtureMixin.
See Also
--------
sklearn.mixture.GaussianMixture
ddl.autoregressive.AutoregressiveDestructor
BayesianGaussianMixtureDensity
"""
[docs] @classmethod
def create_fitted(*args, **kwargs):
return _create_fitted_mixture(*args, **kwargs)
class _BayesianGaussianMixtureDensity(_GaussianMixtureMixin, BayesianGaussianMixture,
_MixtureMixin):
"""Bayesian Gaussian mixture, useful for AutoregressiveDestructor.
This subclasses off of :class:`sklearn.mixture.BayesianGaussianMixture`.
It overrides several methods such as :func:`sample` and :func:`score` to
ensure that the interface conforms to the other density estimators. In
addition, the necessary conditional and marginal distribution methods
needed for :class:`ddl.autoregressive.AutoregressiveDestructor` were
added. See :class:`sklearn.mixture.BayesianGaussianMixture` for
parameters and attributes.
Note that _GaussianMixtureMixin must override some things in
GaussianMixture but _MixtureMixin should not override GaussianMixture.
Thus, the order of multiple inheritance should remain
_GaussianMixtureMixin, GaussianMixture, _MixtureMixin.
See Also
--------
sklearn.mixture.BayesianGaussianMixture
ddl.autoregressive.AutoregressiveDestructor
GaussianMixtureDensity
"""
@classmethod
def create_fitted(*args, **kwargs):
return _create_fitted_mixture(*args, **kwargs)
[docs]class FirstFixedGaussianMixtureDensity(GaussianMixtureDensity):
"""Mixture density where one component is fixed as the standard normal.
This is useful for creating a regularized Gaussian mixture destructor.
In particular, if this is paired with an inverse Gaussian cdf (i.e.
:class:`~ddl.independent.IndependentInverseCdf`), and the weight of the
fixed standard normal approaches 1, then the composite destructor
approaches an identity. Thus, the `fixed_weight` parameter can be used
to control the amount of regularization.
Note this is implemented by overriding the :func:`_m_step` private
method of :class:`sklearn.mixture.GaussianMixture` so it may not be
compatible with future releases of sklearn.
More specifically first n_components-1 Gaussian components are fit using
the standard Gaussian mixture estimator. Then, we manually add a fixed
standard normal component with the desired fixed weight. Then, we refit
but override the M step so that the fixed weight component does not
change.
Parameters
----------
fixed_weight : float, default=0.5
The fixed weight between 0 and 1 that is given to the first Gaussian
component. As this weight approaches 1, there is full regularization
and no learning from the data. If the weight approaches 0,
then there is no regularization and the fitting is determined
entirely from the data.
n_components : int, default=1
The number of mixture components to fit.
covariance_type : {'full', 'tied', 'diag', 'spherical'}, default='full'
String describing the type of covariance parameters to use.
Must be one of::
'full' (each component has its own general covariance matrix),
'tied' (all components share the same general covariance matrix),
'diag' (each component has its own diagonal covariance matrix),
'spherical' (each component has its own single variance).
"""
[docs] def __init__(self, fixed_weight=0.5, n_components=1, covariance_type='full'):
self.fixed_weight = fixed_weight
super(FirstFixedGaussianMixtureDensity, self).__init__(
n_components=n_components, covariance_type=covariance_type,
)
def _m_step(self, X, log_resp):
super(FirstFixedGaussianMixtureDensity, self)._m_step(X, log_resp)
_, n_features = X.shape
# Fix first component during second stage of fitting
if self._second_stage:
self.means_[0, :] = 0
if self.covariance_type == 'full':
self._set_first_standard_covariance(
np.eye(n_features, n_features).reshape(n_features, n_features, 1)
)
elif self.covariance_type == 'tied':
raise RuntimeError('FirstFixed cannot support covariance_type="tied"')
elif self.covariance_type == 'diag':
self._set_first_standard_covariance(np.ones(n_features).reshape(1, -1))
elif self.covariance_type == 'spherical':
self._set_first_standard_covariance(np.array([1]))
else:
raise RuntimeError('Invalid covariance_type "%s"' % self.covariance_type)
# Reset mixture weights
self.weights_[0] = self.fixed_weight
self.weights_[1:] *= (1 - self.fixed_weight) / np.sum(self.weights_[1:])
# logger.debug('Second stage n_components=%d, weights[0]=%g, means_[0]=%s'
# % (self.weights_.shape[0], self.weights_[0], str(self.means_[0])))
else:
# logger.debug('First stage n_components=%d, weights[0]=%g, means_[0]=%s'
# % (self.weights_.shape[0], self.weights_[0], str(self.means_[0])))
pass
[docs] def fit(self, X, y=None):
"""Fit estimator to X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where `n_samples` is the number of samples and
`n_features` is the number of features.
y : None, default=None
Not used in the fitting process but kept for compatibility.
Returns
-------
self : estimator
Returns the instance itself.
"""
# Fit with one less component
self.n_components -= 1
self._second_stage = False
super(FirstFixedGaussianMixtureDensity, self).fit(X, y)
# Update means, etc. with single Gaussian component
self.n_components += 1
n_samples, n_features = X.shape
self.means_ = np.append(self.means_, np.zeros(n_features).reshape(1, -1), axis=0)
if self.covariance_type == 'full':
self._append_standard_covariance(
np.eye(n_features, n_features).reshape(n_features, n_features, 1)
)
elif self.covariance_type == 'tied':
raise RuntimeError('FirstFixed cannot support covariance_type="tied"')
elif self.covariance_type == 'diag':
self._append_standard_covariance(np.ones(n_features).reshape(1, -1))
elif self.covariance_type == 'spherical':
self._append_standard_covariance(np.array([1]))
else:
raise RuntimeError('Invalid covariance_type "%s"' % self.covariance_type)
# Rescale weights
if self.fixed_weight < 0 or self.fixed_weight > 1:
raise ValueError('weight should be between 0 and 1')
self.weights_ *= (1 - self.fixed_weight)
self.weights_ = np.append(self.weights_, np.array([self.fixed_weight]))
# Reset parameters for second fitting
old_warm_start = self.warm_start
self.warm_start = True
self.converged_ = False
self.lower_bound_ = -np.infty
# Refit with second stage set (i.e. fixed standard normal)
self._second_stage = True
super(FirstFixedGaussianMixtureDensity, self).fit(X, y)
self.warm_start = old_warm_start
return self
def _append_standard_covariance(self, cov):
"""Append standard covariance matrix.
Only works for standard covariance (i.e. identity covariance).
"""
self.covariances_ = np.append(self.covariances_, cov, axis=0)
self.precisions_ = np.append(self.precisions_, cov, axis=0)
self.precisions_cholesky_ = np.append(self.precisions_cholesky_, cov, axis=0)
def _set_first_standard_covariance(self, cov):
"""Set first to standard covariance.
Only works for standard covariance (i.e. identity covariance).
"""
self.covariances_[0] = cov
self.precisions_[0] = cov
self.precisions_cholesky_[0] = cov
def _get_component_array(array, component_idx, covariance_type):
if covariance_type == 'full':
return array[component_idx, :, :]
elif covariance_type == 'tied':
return array
elif covariance_type == 'diag':
return array[component_idx, :]
elif covariance_type == 'spherical':
return array[component_idx]
else:
raise ValueError('Covariance type is invalid.')