Source code for ddl.validation

"""Module for validating destructor interface and testing properties."""
from __future__ import division, print_function

import logging
import sys
import warnings
from functools import wraps

import numpy as np
from sklearn.base import clone
from sklearn.utils import check_random_state

# noinspection PyProtectedMember
from .base import BoundaryWarning, UniformDensity, _NumDimWarning
from .utils import (check_domain, check_X_in_interval, get_domain_or_default,
                    get_support_or_default, has_method)

try:
    from sklearn.exceptions import SkipTestWarning, DataConversionWarning
    from sklearn.utils.estimator_checks import check_estimator
except ImportError:
    warnings.warn('Could not import sklearn\'s SkipTestWarning, '
                  'DataConversionWarning or check_estimator '
                  '(likely because nose is not installed) but continuing '
                  'so that documentation can be generated without nose.')
try:
    import ot  # Python optimal transport module (pot)
except ImportError:
    warnings.warn('Could not import python optimal transport (pip install pot) (import ot)')

logger = logging.getLogger(__name__)


# TODO What about self-evaluation?
# Sample from learned model and see if we can relearn current model?? -- shows how stable it is...
# Compare samples of "assumed" model with samples from relearned model
# Is this learnable from the number of samples given (vs null distribution)?
class _ShouldOnlyBeInTestWarning(UserWarning):
    """Warning that should only occur in testing."""

    pass


def _ignore_boundary_warnings(func):
    @wraps(func)
    def _wrapper(*args, **kwargs):
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', BoundaryWarning)
            return func(*args, **kwargs)

    return _wrapper


[docs]@_ignore_boundary_warnings def check_density(density, random_state=0): """Check that an estimator implements density methods correctly. First, check that the estimator implements sklearn's estimator interface via :func:`sklearn.utils.estimator_checks.check_estimator`. Then, check for all of the following methods, and possibly check their basic functionality. Some methods will raise errors and others will issue warnings. Required methods (error if fails or does not exist): * :meth:`fit` Optional methods primary (warn if does not exist, error if fails): * :meth:`sample` * :meth:`score_samples` Optional methods secondary (warn if does not exist, warn if fails): * :meth:`score` (mixin) * :meth:`get_support` (required to pass tests if support is not real-valued unbounded) Parameters ---------- density : estimator Density estimator to check. random_state : int, RandomState instance or None, optional (default=0) Note that random_state defaults to 0 so that checks are reproducible by default. If int, `random_state` is the seed used by the random number generator; If :class:`~numpy.random.RandomState` instance, `random_state` is the random number generator; If None, the random number generator is the :class:`~numpy.random.RandomState` instance used by :mod:`numpy.random`. Notes ----- The :meth:`score` method must be equal to the average of :meth:`score_samples` for consistency. Note that some sklearn density estimators such as :class:`sklearn.mixture.GaussianMixture` return the total log probability instead of the average log probability. Thus, these estimators need to be subclassed and their :meth:`score` method overwritten. See Also -------- check_destructor """ # Standard check rng = check_random_state(random_state) density = clone(density) # Remove fitted properties if exist _check_estimator_but_ignore_warnings(density) # Attempt to get support (show warning since optional) support = get_support_or_default(density, warn=True) logger.info('(Before fitting) Density support is %s' % str(np.array(support).tolist())) # Sample based on support n = 10 d = _get_support_n_features(support) X_train = _sample_demo(support, n, d, random_state=rng) X_test = _sample_demo(support, n, d, random_state=rng) # Check that fit, sample, score_samples and score are implemented try: density.fit(X_train) except ValueError: raise ValueError('The demo data raised a value error when calling fit. This should not' ' happen if the data is within the density support via the' ' get_support method or the default real-valued support.') support = get_support_or_default(density, warn=True) logger.info('(After fitting) Density support is %s' % str(np.array(support).tolist())) # Run primary optional tests (error on failure) if has_method(density, 'sample'): # Check multiple samples X_sample = density.sample(n, random_state=rng) if isinstance(X_sample, tuple) and len(X_sample) == 2: raise TypeError('Sample returned tuple of length 2 instead of data matrix.' ' This is likely because the density estimator returned (X,y)' ' instead of just X.') if np.any(X_sample.shape != X_train.shape): raise RuntimeError('Samples from density.sample() do not match training data shape.') # Check single sample X_1 = density.sample(1, random_state=rng) if len(np.array(X_1).shape) != 2: raise TypeError( 'A single sample should still return a 2D matrix with shape (1,n_features).') score_vec = None if has_method(density, 'score_samples'): score_vec = density.score_samples(X_test) if len(score_vec) != n: raise RuntimeError('Output of score_samples is not of length n_samples.') # Run secondary optional tests (warning on failure) _check_score_method(density, score_vec, X_test) logger.info(_success('check_density')) return True
[docs]@_ignore_boundary_warnings def check_destructor(destructor, fitted_density=None, is_canonical=True, properties_to_skip=None, random_state=0): """Check for the required interface and properties of a destructor. First, check the interface of the destructor (see below) and then check the 4 properties of a destructor (see Inouye & Ravikumar 2018 paper). A destructor estimator should have the following interface: Required methods (error if fails or does not exist): * :meth:`fit` * :meth:`transform` * :meth:`inverse_transform` Optional methods primary (warn if does not exist, error if fails): * :meth:`sample` (mixin, uniform->inverse) * :meth:`score_samples` Optional methods secondary (warn if does not exist, warn if fails): * :meth:`fit_from_density` (better for uniformability test than fit) * :meth:`get_domain` (mixin with :attr:`density_`, required to pass tests if domain is not real-valued unbounded) * :meth:`get_support` (required to pass tests if support is not real-valued unbounded) * :meth:`score` (mixin) Optional attributes secondary (warn if does not exist, warn if fails): * :attr:`density_` attribute (provides default for uniformability test if fitted_density not given) In addition to checking the interface, this function will empirically check to see if the destructor seems to have the following 4 properties (see Inouye & Ravikumar 2018 paper). Note that these are just empirical sanity checks based on sampling or the like and thus do not ensure that this estimator always has these properties. 1. Uniformability (required) 2. Invertibility (required) 3. Canonical domain (optional if canonical = False) 4. Identity element (optional if canonical = False) Parameters ---------- destructor : estimator Destructor estimator to check. fitted_density : estimator, optional A fitted density estimator to use in some checks. If None (default), then use simple but interesting distributions based on domain of destructor. Can be used to check a specific density and destructor pair. is_canonical : bool, default=True Whether this destructor should be checked for canonical destructor properties including canonical domain and identity element. properties_to_skip : list, optional Optional list of properties to skip designated by the following strings ['uniformability', 'invertibility', 'canonical-domain', 'identity-element'] random_state : int, RandomState instance or None, optional (default=0) Note that random_state defaults to 0 so that checks are reproducible by default. If int, `random_state` is the seed used by the random number generator; If :class:`~numpy.random.RandomState` instance, `random_state` is the random number generator; If None, the random number generator is the :class:`~numpy.random.RandomState` instance used by :mod:`numpy.random`. See Also -------- check_density Notes ----- *Uniformability check* - Numerically *and* approximately checks one density/destructor pair to determine if the destructor appropriately destroys the density (i.e. transforms the samples to be uniform). Note that this does not mean it will work for all cases but it does provide a basic sanity check on at least one case. This will also run sklearn's estimator checks, i.e. :func:`sklearn.utils.estimator_checks.check_estimator`. *Invertibility check* - Simple numerical check for invertiblity by applying the transformation and then applying the inverse transformation, and vice versa to see if the original data is recovered. *Canonical domain check* - Simply check if the domain is the unit hypercube. Checks if warnings are issued when data is passed that is not on the unit hypercube. *Identity element check* - Numerical approximation for checking if the destructor class includes an identity element. Note this check fits the destructor on uniform samples and then check whether the learned transformation is the identity. Thus, if the test fails, there can be *two* possible causes: 1) The fitting procedure overfitted the uniform samples such that the implied density is far from uniform. 2) The transformation does not appropriately produce an identity transformation. This is a bit stricter than the official property since we train on uniform samples. However, this is probably a better check because we want the destructor to fit an identity transformation if the distribution is truly uniform. `destructor.score_samples(x)` = abs(det(J(x))) should be close to `density.score_samples(x)` and equal if no approximation (such as a piecewise linear function) is made. `destructor.sample(n_samples)` will produce slightly different samples than `destructor.density_.sample(n_samples)` unless no approximation is used. Other fitted parameters other than `density_` should be strictly destructive destructorformation parameters. Some of the property tests rely on calculating the Wasserstein distance between two sets of samples and thus the python optimal transport module (pot) is used. References ---------- D. I. Inouye, P. Ravikumar. Deep Density Destructors. In International Conference on Machine Learning (ICML), 2018. """ def _check_property(p): if p in properties_to_skip: warnings.warn(SkipTestWarning('Skipping checks for property "%s" because ' '`properties_to_skip` included this property.' % p)) return False return True possible_properties = ['uniformability', 'invertibility', 'canonical-domain', 'identity-element'] if properties_to_skip is None: properties_to_skip = [] properties_to_skip = [prop.lower() for prop in properties_to_skip] for prop in properties_to_skip: if prop not in possible_properties: raise ValueError( 'Properties to skip must be one of %s but property to skip was %s.' % (prop, str(possible_properties)) ) # Check destructive transformer interface _check_destructor_interface(destructor, fitted_density, random_state=random_state) # Check destructive transformer properties if _check_property('uniformability'): _check_uniformability(destructor, fitted_density, random_state=random_state) if _check_property('invertibility'): _check_invertibility(destructor, random_state=random_state) if not is_canonical: logger.info('Not testing canonical destructive properties because parameter ' '`is_canonical=False`.') else: try: # Check canonical destructive transformer properties if _check_property('canonical-domain'): _check_canonical_domain(destructor, random_state=random_state) if _check_property('identity-element'): _check_identity_element(destructor, random_state=random_state) except Exception as e: warnings.warn('An exception occured while testing for the canonical properties of the ' 'destructor. Please fix the errors below. (If this is not a ' 'canonical destructor, please set the option `is_canonical=False`.)') raise e logger.info(_success('check_destructor')) return True
@_ignore_boundary_warnings def _check_destructor_interface(trans, fitted_density=None, random_state=None): def _check_density_attr(T, method_name='fit'): if hasattr(T, 'density_'): logger.info(_checking('transformer.density_')) check_density(T.density_) if np.any(get_domain_or_default(T) != get_support_or_default(T.density_)): raise ValueError('Transformer domain does not match density support' ' when using method `%s`.' % method_name) return True else: warnings.warn(SkipTestWarning( 'Fitting destructive transformer with `%s` did not create a "density_" attribute. ' 'Skipping tests that require the "density_" attribute.' % method_name)) return False rng = check_random_state(random_state) trans = clone(trans) # Remove fitted properties if exist _check_estimator_but_ignore_warnings(trans) # Attempt to get domain (show warning since optional) domain = get_domain_or_default(trans, warn=True) logger.info('Transformer domain is %s' % str(np.array(domain).tolist())) # Call get_domain and sample some demo data n = 10 d = _get_domain_n_features(domain) X_train = _sample_demo(domain, n, d, random_state=rng) X_test = _sample_demo(domain, n, d, random_state=rng) X_train_copy = X_train.copy() # Needed for checking mutability X_test_copy = X_test.copy() # Needed for checking mutability # # Check required methods # Fit try: trans.fit(X_train) except ValueError: raise ValueError('The demo data raised a value error when calling fit. This should not' ' happen if the data is within the transformer domain via the' ' get_domain method or the default real-valued domain.') else: np.testing.assert_array_equal(X_train, X_train_copy, 'fit() function should not mutate input data matrix `X`') has_density_attr = _check_density_attr(trans) # Check that transform and inverse_transform work U_sample = trans.transform(X_test) np.testing.assert_array_equal(X_test, X_test_copy, 'transform() function should not mutate input data matrix `X`') U_sample_copy = U_sample.copy() trans.inverse_transform(U_sample) np.testing.assert_array_equal(U_sample, U_sample_copy, 'inverse_transform() function should not mutate input data ' 'matrix `X`') # # Check primary methods if has_method(trans, 'sample'): X_trans_sample = trans.sample(n, random_state=rng) if np.any(X_trans_sample.shape != X_train.shape): raise RuntimeError('Transformer samples do not have the same shape as the training ' 'data.') score_vec = None if has_method(trans, 'score_samples'): score_vec = trans.score_samples(X_test) np.testing.assert_array_equal(X_test, X_test_copy, 'score_samples() function should not mutate input data ' 'matrix `X`') if len(score_vec) != n: raise RuntimeError('Output of score_samples is not of length n') # # Check secondary methods # Check fit_from_density T2 = clone(trans) if has_method(T2, 'fit_from_density'): temp_dens = None if fitted_density is not None: logger.info(_checking('fitted_density')) check_density(fitted_density, random_state=rng) temp_dens = fitted_density elif has_density_attr: temp_dens = trans.density_ if temp_dens is None: warnings.warn('Not testing `fit_for_density` because parameter `fitted_density`' ' = None and `fit` did not provide `density_` property.') else: try: T2.fit_from_density(temp_dens) except ValueError: raise ValueError('The demo data raised a value error when calling ' '`fit_from_density`. This should not happen if the data is ' 'within the transformer domain via the get_domain method or the ' 'default real-valued domain.') _check_density_attr(T2, method_name='fit_from_density') # Check that transformer "log-likelihood" is close to density log-likelihood if has_density_attr: dens_score_vec = trans.density_.score_samples(X_test) # Normalize by both number of dimensions (d) and by max likelihood from density avg_normalized_diff = np.mean( np.abs(np.exp(score_vec / d) - np.exp(dens_score_vec / d)) / np.max(np.exp(dens_score_vec / d)) ) warn_thresh = 0.01 if avg_normalized_diff > warn_thresh: warnings.warn( 'Apparent mismatch between transformer log-likelihood (via `score_samples`)' ' and density log-likelihood;' ' Either there is an error in the log likelihood calculations' ' or the approximation in the transformer is bad. Average normalized' ' exp(score) = likelihood difference is %g (threshold is %g)' % (float(avg_normalized_diff), warn_thresh)) logger.info( 'Average normalized difference between density and transformer `score_samples`: %g' % avg_normalized_diff) _check_score_method(trans, score_vec, X_test) logger.info(_success('_check_destructor_interface')) return True @_ignore_boundary_warnings def _check_uniformability(destructor, fitted_density=None, random_state=0): # Init n = 100 destructor = clone(destructor) rng = check_random_state(random_state) # Create a dummy density by fitting a transformer on demo data and getting the density_ if fitted_density is not None: true_density = fitted_density else: logger.info('Fitting a simple density because `fitted_density` was not provided to ' '`check_uniformabilty`') # Sample demo data based on domain trans_temp = clone(destructor) domain = get_domain_or_default(trans_temp) d = _get_domain_n_features(domain) n_train = n X_temp = _sample_demo(domain, n_train, d, random_state=rng) # Fit transformer fitted_trans = trans_temp.fit(X_temp) # Check and extract "true" density from transformer if hasattr(fitted_trans, 'density_'): true_density = fitted_trans.density_ else: raise RuntimeError('Parameter `fitted_density` is None and fitted destructor' ' did not provide `density_` attribute' ' so unable to continue with uniformability test.') # Extract number of dimensions n_features = np.array(true_density.sample(1, random_state=rng)).shape[1] # Fit transformer ideally via `fit_from_density` if it exists if has_method(destructor, 'fit_from_density'): destructor.fit_from_density(true_density) else: X_train = true_density.sample(n, random_state=rng) destructor.fit(X_train) msg = ('Because the `trans.fit_from_density` method does not exist, fitting the ' 'destructor on a separate training set. Note: This is not ideal for numerically ' 'checking uniformability because there are two sources of variability (training ' 'algorithm in `fit` and sampling rather than just sampling.') warnings.warn(msg) k = 30 fitted_trans_domain = get_domain_or_default(destructor) logger.info('Fitted transformer domain is %s' % str(np.array(fitted_trans_domain).tolist())) def _transformer_emd(_trans, _true_density): # Sample from true densities X_true = _true_density.sample(n, random_state=rng) U_true = rng.rand(n, n_features) # Apply transforms to get (approximate) samples from the other distribution X_trans = _trans.inverse_transform(U_true) U_trans = _trans.transform(X_true) # Check output ranges _assert_X_in_interval(X_trans, fitted_trans_domain) _assert_X_in_interval(U_trans, np.array([0, 1])) # Check that inverse_transform domain is 0, 1 _assert_unit_domain(rng, _trans.inverse_transform, n_features=n_features) # Compute emd between the transformed samples and independent samples from the true # distribution _X_emd = _compute_emd(X_trans, X_true) _U_emd = _compute_emd(U_trans, U_true) return _X_emd, _U_emd temp = np.array([ _transformer_emd(destructor, true_density) for _ in range(k) ]).transpose() X_emd_vec = temp[0] U_emd_vec = temp[1] # Get samples of emd scores to compute a percentile X_emd_true_vec = _emd_sample(true_density, n, k, random_state=rng) U_emd_true_vec = _emd_sample(UniformDensity().fit(np.zeros((1, n_features))), n, k, random_state=rng) # Check whether these look like good samples def _p_val(val, true_vec): return np.sum(true_vec > val) / len(true_vec) def _avg_p_val(vec, true_vec): return np.mean([ _p_val(val, true_vec) for val in vec ]) X_avg_p_val = _avg_p_val(X_emd_vec, X_emd_true_vec) U_avg_p_val = _avg_p_val(U_emd_vec, U_emd_true_vec) logger.info('X_sampled average p value = %g (higher better in this case)' % float(X_avg_p_val)) logger.info('U_sampled average p value = %g (higher better in this case)' % float(U_avg_p_val)) # Check p-values avg_p_threshold = np.maximum(0.01, 1.0 / k) err_msg = ('The `trans.%s` of %s samples does not seem to yield samples from ' 'the %s density at the p value threshold of %g (i.e. the null hypothesis that the ' 'distributions are the same can be rejected with high confidence). This may be ' 'caused by a problem in `trans.%s`, `fitted_density.sample`, or too high ' 'of a p-value threshold.') if X_avg_p_val <= avg_p_threshold: def _plot_data_for_debug(): n_samples = 1000 X_true = true_density.sample(n_samples, random_state=rng) U_true = rng.rand(n_samples, n_features) X_trans = destructor.inverse_transform(U_true) U_trans = destructor.transform(X_true) # noinspection PyPackageRequirements import matplotlib.pyplot as plt axes = plt.subplots(2, 2)[1] for ax, title, X in zip(axes.ravel(), ['X_true', 'U_true', 'X_trans', 'U_trans'], [X_true, U_true, X_trans, U_trans]): ax.scatter(X[:, 0], X[:, 1], s=3) ax.axis('equal') ax.set_title(title) plt.show(block=False) raise _UniformabilityError( err_msg % ('inverse_transform', 'uniform', 'original (assumed)', avg_p_threshold, 'inverse_transform') ) _plot_data_for_debug() if U_avg_p_val <= avg_p_threshold: raise _UniformabilityError( err_msg % ('transform', 'original (assumed)', 'uniform', avg_p_threshold, 'transform') ) logger.info(_success('_check_uniformability')) return True @_ignore_boundary_warnings def _check_invertibility(destructor, random_state=0): # Sample and fit transformer destructor = clone(destructor) rng = check_random_state(random_state) domain = get_domain_or_default(destructor, warn=True) n = 100 d = _get_support_n_features(domain) X_train = _sample_demo(domain, n, d, random_state=rng) destructor.fit(X_train) # Get demo samples and transform or inverse transform in both directions X_orig = _sample_demo(domain, n, d, random_state=rng) U_trans = destructor.transform(X_orig) X_back = destructor.inverse_transform(U_trans) U_orig = rng.rand(n, d) X_trans = destructor.inverse_transform(U_orig) U_back = destructor.transform(X_trans) # Check that the original and back transformed are nearly (numerically) the same def _check_nearly_equal(orig, back, middle): rel_diff = _relative_diff(orig, back) logger.info('Relative diff for invertibility check = %g' % rel_diff) if rel_diff > 1e-14: warnings.warn('Relative diff for invertibility check is larger than 1e-14. This may ' 'be a stability issue.') if rel_diff > 1e-9: logger.debug(orig) logger.debug(middle) logger.debug(back) def _plot_debug_data(): # noinspection PyPackageRequirements import matplotlib.pyplot as plt n_samples = 1000 X_true = destructor.sample(n_samples=n_samples) U_true = rng.rand(n_samples, d) _X_trans = destructor.inverse_transform(U_true) _U_trans = destructor.transform(X_true) axes = plt.subplots(2, 2)[1] for ax, title, X in zip(axes.ravel(), ['X_true', 'U_true', 'X_trans', 'U_trans'], [X_true, U_true, _X_trans, _U_trans]): ax.scatter(X[:, 0], X[:, 1], s=3) ax.axis('equal') ax.set_title(title) plt.show(block=False) axes = plt.subplots(2, 2)[1] for ax, title, X in zip(axes.ravel(), ['orig', 'middle', 'back'], [orig, middle, back]): ax.scatter(X[:, 0], X[:, 1], s=3) ax.axis('equal') ax.set_title(title) plt.show(block=False) raise _InvertibilityError( 'Transforming and then inverse transforming does not seem to ' 'give the original data. Relative difference = %g.' % rel_diff) _plot_debug_data() _check_nearly_equal(X_orig, X_back, U_trans) _check_nearly_equal(U_orig, U_back, X_trans) logger.info(_success('_check_invertibility')) return True @_ignore_boundary_warnings def _check_canonical_domain(destructor, random_state=0): # Setup functions to test rng = check_random_state(random_state) fitted = clone(destructor).fit(np.array([[0], [1], [0], [1], [0], [1]])) _assert_unit_domain(rng, clone(destructor).fit) _assert_unit_domain(rng, fitted.transform) if has_method(fitted, 'score', warn=False): _assert_unit_domain(rng, fitted.score) if has_method(fitted, 'score_samples', warn=False): _assert_unit_domain(rng, fitted.score_samples) logger.info(_success('_check_canonical_domain')) return True @_ignore_boundary_warnings def _check_identity_element(destructor, random_state=0): # Setup rng = check_random_state(random_state) domain = get_domain_or_default(destructor, warn=True) n = 1000 d = _get_support_n_features(domain) # Sample uniform samples in order to fit destructor X_train = rng.rand(n, d) destructor.fit(X_train) # Transform data X = rng.rand(n, d) U = destructor.transform(X) diff = _relative_diff(X, U) if diff > 0.02: # 2% movement of particles on average raise _IdentityElementError('Given that the transformer was trained on uniform samples, ' 'the transformation does not appear to be the identity. There ' 'are *two* possible causes: 1) The fitting procedure ' 'overfitted the uniform samples such that the implied density ' 'is far from uniform. 2) The transformation does not ' 'appropriately produce an identity transformation. Relative ' 'diff = %g' % diff) logger.info('Relative difference after fitting and then transforming uniform samples (shape ' '%s) = %g' % (str(X_train.shape), diff)) logger.info(_success('_check_identity_element')) return True class _DestructorError(RuntimeError): pass class _UniformabilityError(_DestructorError): pass class _InvertibilityError(_DestructorError): pass class _CanonicalDomainError(_DestructorError): pass class _IdentityElementError(_DestructorError): pass def _sample_demo(support, n_samples, n_features, random_state=None): """Sample demo dataset based on support of density.""" # Update to the number of dimensions as needed assert n_samples > 2, 'n_samples should be greater than 2 so that at' \ 'least endpoints can be given supplied' rng = check_random_state(random_state) support = check_domain(support, n_features) X = np.vstack(( rng.randn(n_samples) * 10 if s[0] == -np.inf and s[1] == np.inf else np.hstack((s[0] + (s[1] - s[0]) * rng.beta(1, 0.5, size=n_samples - 2), s)) if s[0] != -np.inf and s[1] != np.inf else np.hstack((rng.exponential(scale=10, size=n_samples - 1) + s[0], [s[0]])) if s[0] != -np.inf and s[1] == np.inf else np.hstack((s[1] - rng.exponential(scale=10, size=n_samples - 1), [s[1]])) if s[0] == -np.inf and s[1] != np.inf else np.nan # Triggers error below since cannot explicitly raise error here for s in support )).transpose() if len(X.shape) == 1: raise ValueError('Given support is not implemented; only real-valued, left-bounded, ' 'right-bounded, and bounded are implemented.') if np.any(np.isnan(X)): raise ValueError('Samples have NaN values.') if np.any(X.shape != np.array([n_samples, n_features])): raise RuntimeError('Demo data is different than specified shape.') return X def _emd_sample(density, n_samples, n_emd_samples, random_state=None): """Compute emd distances between independent samples from density. Sample the earth-mover distance (emd) between independent same-size samples of given density. Useful for statistically estimating whether a sample comes from the given density or not. """ rng = check_random_state(random_state) X_arr = [] i_sample = 0 emd_sample = np.zeros(n_emd_samples) # Generate new samples only if needed for n_emd_samples while i_sample < n_emd_samples: X = density.sample(n_samples, random_state=rng) # Compare to all previous samples for X_other in X_arr: emd_sample[i_sample] = _compute_emd(X, X_other) i_sample += 1 # Break if all needed samples have been computed if i_sample == n_emd_samples: return emd_sample X_arr.append(X) raise RuntimeError('Should have returned inside the nested loop. Must be bug in code.') def _compute_emd(X1, X2): # Setup if np.any(X1.shape != X2.shape): raise ValueError('X1 and X2 should have the same shape.') n, d = X1.shape a, b = np.ones((n,)) / n, np.ones((n,)) / n # uniform distribution on samples # Compute cost matrix M = ot.dist(X1, X2, 'euclidean') M /= M.max() return ot.emd2(a, b, M) def _check_score_method(est, score_vec, X_sample): if has_method(est, 'score'): X_sample_copy = X_sample.copy() score = est.score(X_sample) np.testing.assert_array_equal(X_sample, X_sample_copy, 'score() function should not mutate input data matrix `X`') if score_vec is not None: # 1e-100 to avoid division by 0 mean_diff = np.abs((score - np.mean(score_vec)) / np.maximum(np.abs(score), 1e-100)) sum_diff = np.abs((score - np.sum(score_vec)) / np.maximum(np.abs(score), 1e-100)) if mean_diff > 1e-14: if sum_diff < 1e-14: warnings.warn(DeprecationWarning( 'It seems that the score method returns a sum of per-sample ' 'log-likelihood. The current best practice is to return the average ' 'per-sample log-likelihood. relative_sum_diff = %g, relative_mean_diff = %g' % (sum_diff, mean_diff) )) else: warnings.warn( 'The est.score() method does not return the average log-likelihood. For ' 'standardized estimators, the score method should be the average ' 'per-sample log-likelihood. relative_mean_diff = %g' % mean_diff ) else: warnings.warn('Could not check score method output because score_samples not ' 'implemented') def _check_support(support): """Check support has dimension either (2,) or (n_features,2).""" support = np.array(support) if len(support.shape) == 1 and support.shape[0] == 2: return support elif len(support.shape) == 2 and support.shape[1] == 2: return support else: raise ValueError('The shape of support should either be (2,) or (n_features,2) but shape ') def _get_support_n_features(support, default_n_features=2): """Get the number of features based on support or return the default.""" support = _check_support(support) if len(support.shape) == 1: return default_n_features elif len(support.shape) == 2: return support.shape[0] # Alias since domain and support are handled the same _get_domain_n_features = _get_support_n_features def _relative_diff(orig, back): return np.linalg.norm(np.abs(orig - back), ord='fro') / np.linalg.norm(orig, ord='fro') def _clean_warning_registry(): """Safe way to reset warnings. (Copied from module `sklearn.utils.testing` (v. 0.19.1).) """ warnings.resetwarnings() reg = "__warningregistry__" for mod_name, mod in list(sys.modules.items()): if 'six.moves' in mod_name: continue if hasattr(mod, reg): getattr(mod, reg).clear() def _assert_no_warnings(func, *args, allow_these_warnings=None, **kw): """Assert that no warnings are issued when calling function `func`. (Edited from module `sklearn.utils.testing` (v. 0.19.1).) """ if allow_these_warnings is None: allow_these_warnings = [BoundaryWarning, _ShouldOnlyBeInTestWarning] _clean_warning_registry() with warnings.catch_warnings(record=True) as w_arr: warnings.simplefilter('always') result = func(*args, **kw) # Only keep warning if not in allowed list w_arr = [w for w in w_arr if not np.any([isinstance(w.message, allowed) for allowed in allow_these_warnings])] if len(w_arr) > 0: raise AssertionError("Got warning(s) when calling %s: [%s]" % (func.__name__, ', '.join(str(w) for w in w_arr))) return result def _assert_warns(warning_class, func, *args, **kw): """Test that a certain warning occurs. (Edited from module `sklearn.utils.testing` (v. 0.19.1).) Parameters ---------- warning_class : the warning class The class to test for, e.g. UserWarning. func : callable Calable object to trigger warnings. *args : the positional arguments to `func`. **kw : the keyword arguments to `func` Returns ------- result : the return value of `func` """ # very important to avoid uncontrolled state propagation _clean_warning_registry() with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Trigger a warning. result = func(*args, **kw) # Verify some things if not len(w) > 0: raise AssertionError("No warning raised when calling %s" % func.__name__) found = any(warning.category is warning_class for warning in w) if not found: raise AssertionError("%s did not give warning: %s( is %s)" % (func.__name__, warning_class, w)) return result def _assert_X_in_interval(X, interval): return _assert_no_warnings(check_X_in_interval, X, interval) def _assert_unit_domain(rng, func, n_features=1): # Setup test cases X_good = [ a * np.ones((10, n_features)) for a in (0, 1, 0.5) ] X_good.append(rng.rand(1000, n_features)) X_bad = [ a * np.ones((10, n_features)) for a in (2, 1.1, -0.1, -1, -2, -1e5, 1e5) ] # Attempt to fit data inside the canonical domain (should not raise warning) for X in X_good: _assert_no_warnings(func, X, allow_these_warnings=[BoundaryWarning, _ShouldOnlyBeInTestWarning, _NumDimWarning]) # Attempt to fit data outside the canonical domain (should raise warning) for X in X_bad: _assert_warns(DataConversionWarning, func, X) return True def _success(name): return 'Sucessfully passed `%s`' % name def _checking(name): return 'Checking `%s`' % name def _check_estimator_but_ignore_warnings(est): with warnings.catch_warnings(record=True) as w_arr: check_estimator(est) # Show warnings that are not data conversion warnings for w in w_arr: if isinstance(w.message, SkipTestWarning): warnings.warn(w.message)