"""Module for local destructors such as adjacent-pixel-pair destructors."""
from __future__ import division, print_function
import logging
import warnings
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.externals.joblib import Parallel, delayed
from sklearn.utils.validation import check_array, check_is_fitted, check_random_state
from .base import DestructorMixin
from .independent import IndependentDensity, IndependentDestructor
from .univariate import HistogramUnivariateDensity
# noinspection PyProtectedMember
from .utils import _UNIT_SPACE
logger = logging.getLogger(__name__)
class FeatureGroupsDestructor(BaseEstimator, DestructorMixin):
    """Destructor that transforms groups of features independently.

    Parameters
    ----------
    groups_estimator : estimator, default=None
        Estimator that determines grouping. The original documentation
        lists ``RandomFeaturePairs`` as the effective default; the code
        that resolves ``None`` is not among the methods defined in this
        class body.

    group_canonical_destructor : estimator, default=None
        Destructor that will be fitted and applied to each group of features
        independently.

    n_jobs : int, default=1
        Number of jobs to use when fitting or transforming. Leverages joblib.

    Attributes
    ----------
    groups_ : array-like, shape (n_groups, n_feature_per_group)
        Feature indices for each group. Note that there should be no
        duplicate indices so that each group can be transformed independently.

    group_destructors_ : array of estimators, shape (n_groups,)
        Array of destructors for each feature group.

    n_features_ : int
        Number of features of the training data.

    See Also
    --------
    ImageFeaturePairs
    RandomFeaturePairs

    """

    def __init__(self, groups_estimator=None, group_canonical_destructor=None, n_jobs=1):
        # Parameters are stored untouched; nothing is validated here.
        self.groups_estimator = groups_estimator
        self.group_canonical_destructor = group_canonical_destructor
        self.n_jobs = n_jobs

    def fit(self, X, y=None, **fit_params):
        """Fit estimator to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : None, default=None
            Not used in the fitting process but kept for compatibility.

        fit_params : dict, optional
            Optional extra fit parameters.

        Returns
        -------
        self : estimator
            Returns the instance itself.

        """
        # Fitting is delegated to ``fit_transform`` and the transformed data
        # is discarded.
        # NOTE(review): ``fit_transform`` is not defined in this class body;
        # presumably it is inherited or defined elsewhere - confirm.
        self.fit_transform(X, y, **fit_params)
        return self

    def get_domain(self):
        """Get the domain of this destructor.

        Returns
        -------
        domain : array-like, shape (2,) or shape (n_features, 2)
            If shape is (2, ), then ``domain[0]`` is the minimum and
            ``domain[1]`` is the maximum for all features. If shape is
            (`n_features`, 2), then each feature's domain (which could
            be different for each feature) is given similar to the first
            case.

        """
        # We assume canonical destructors
        return _UNIT_SPACE

    def score_samples(self, X, y=None):
        """Compute log-likelihood (or log(det(Jacobian))) for each sample.

        The per-sample scores of every feature group are computed
        independently (optionally in parallel) and summed over the groups.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            New data, where n_samples is the number of samples and n_features
            is the number of features.

        y : None, default=None
            Not used but kept for compatibility.

        Returns
        -------
        log_likelihood : array, shape (n_samples,)
            Log likelihood of each data point in X. All zeros when there are
            no feature groups.

        """
        self._check_is_fitted()
        X = check_array(X, copy=True)
        # Column-major layout; presumably chosen so the column selections
        # below are cheap to extract.
        X = np.asfortranarray(X)
        group_score_samples = Parallel(n_jobs=self.n_jobs)(
            delayed(_score_samples)(X[:, group], d)
            for group, d in zip(self.groups_, self.group_destructors_)
        )
        if len(group_score_samples) == 0:
            # ``np.sum([], axis=0)`` would collapse to the scalar 0.0; keep
            # the documented (n_samples,) shape when no groups exist.
            return np.zeros(X.shape[0])
        return np.sum(group_score_samples, axis=0)

    def _check_is_fitted(self):
        """Raise (via sklearn's ``check_is_fitted``) unless the fitted attributes exist."""
        check_is_fitted(self, ['groups_', 'group_destructors_'])
def _fit_transform(Z_group, d):
    """Fit destructor `d` on one feature group and transform that group.

    Parameters
    ----------
    Z_group : array-like
        Data restricted to the features of a single group.

    d : estimator
        Object exposing ``fit_transform``.

    Returns
    -------
    Z_group : array-like
        Whatever ``d.fit_transform(Z_group)`` returned.

    d : estimator
        The same object that was passed in, returned alongside the data so
        the caller can collect it.

    """
    Z_group = d.fit_transform(Z_group)
    return Z_group, d
def _transform(Z_group, d):
    """Apply ``d.transform`` to the data of a single feature group.

    Parameters
    ----------
    Z_group : array-like
        Data restricted to the features of a single group.

    d : estimator
        Object exposing ``transform``.

    Returns
    -------
    Z_group : array-like
        Whatever ``d.transform(Z_group)`` returned.

    """
    return d.transform(Z_group)
def _inverse_transform(Z_group, d):
    """Apply ``d.inverse_transform`` to the data of a single feature group.

    Parameters
    ----------
    Z_group : array-like
        Data restricted to the features of a single group.

    d : estimator
        Object exposing ``inverse_transform``.

    Returns
    -------
    Z_group : array-like
        Whatever ``d.inverse_transform(Z_group)`` returned.

    """
    return d.inverse_transform(Z_group)
def _score_samples(X_group, d):
    """Apply ``d.score_samples`` to the data of a single feature group.

    Parameters
    ----------
    X_group : array-like
        Data restricted to the features of a single group.

    d : estimator
        Object exposing ``score_samples``.

    Returns
    -------
    scores : array-like
        Whatever ``d.score_samples(X_group)`` returned.

    """
    return d.score_samples(X_group)
class RandomFeaturePairs(BaseEstimator):
    """Random feature pairs estimator for use with FeatureGroupsDestructor.

    Randomly groups features into pairs.

    Parameters
    ----------
    random_state : int, RandomState instance or None, optional (default=None)
        If int, `random_state` is the seed used by the random number
        generator; If :class:`~numpy.random.RandomState` instance,
        `random_state` is the random number generator; If None, the random
        number generator is the :class:`~numpy.random.RandomState` instance
        used by :mod:`numpy.random`.

    Attributes
    ----------
    groups_ : array-like, shape (n_groups, 2)
        Feature indices for each group. Note that there should be no
        duplicate indices so that each group can be transformed independently.

    See Also
    --------
    FeatureGroupsDestructor

    """

    def __init__(self, random_state=None):
        self.random_state = random_state

    def fit(self, X, y=None):
        """Fit estimator to X.

        Draws a random permutation of the feature indices and splits it
        into consecutive pairs. With an odd number of features, the last
        index of the permutation is left out of every pair.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : None, default=None
            Not used in the fitting process but kept for compatibility.

        Returns
        -------
        self : estimator
            Returns the instance itself.

        """
        X = check_array(X)
        rng = check_random_state(self.random_state)
        n_features = X.shape[1]
        perm = rng.permutation(n_features)
        if np.mod(n_features, 2) == 1:
            # Drop one feature (the last of the permutation) so the
            # remaining indices split evenly into pairs.
            perm = perm[:-1]
            logger.info('Odd number of dimensions so one dimension is not paired')
        self.groups_ = perm.reshape(-1, 2)
        return self
class ImageFeaturePairs(BaseEstimator):
    """Generate pairs of pixels based on image layout.

    For use with :class:`~ddl.local.FeatureGroupsDestructor`.

    Parameters
    ----------
    image_shape : array-like, shape (n_image_dimensions,), default=None
        The shape such that ``X[0,:].reshape(image_shape)`` is converted to
        an image. Note that image_shape could have any length depending on
        the number of image channels, e.g. color images with rgb channels.
        If None, the flat shape ``(n_features,)`` of a row of `X` is used.

    relative_position : array-like, shape (n_image_dimensions,), default=None
        A relative position to pair with a selected feature. For example,
        if `relative_position` = (1, 0), then the pixels will be paired
        horizontally whereas if `relative_position` = (0, 1), then the pixels
        will be paired vertically. If None, an offset of 1 along the first
        image axis (and 0 along the others) is used.

    init_offset : array-like, shape (n_image_dimensions,), default=None
        The amount to offset in all directions on the image. For example,
        one might first do a init_offset of (0, 0) and then a init_offset of
        (1, 0) to couple the all horizontal pixels. If None, all zeros.

    step : array-like, shape (n_image_dimensions,), default=None
        Non-zero increment added to the current position whenever the
        current candidate pair is invalid, until the position leaves the
        image. If None, a step of 1 along the first image axis is used.

    wrap : bool, default=True
        Whether to wrap the pixels to the other side so that all features
        are paired. For example, if `relative_position = (1,0)` and
        `init_offset = (1,0)`, the last pixel on the row will match with the
        first pixel on the row.

    Attributes
    ----------
    groups_ : list of tuple, length n_groups
        Pairs ``(feature_idx, paired_feature_idx)`` of linear feature
        indices. Note that there should be no duplicate indices so that
        each group can be transformed independently.

    unpaired_features_ : array, shape (n_unpaired,)
        Sorted linear indices of the features left without a partner.

    See Also
    --------
    FeatureGroupsDestructor

    """

    def __init__(self, image_shape=None, relative_position=None, init_offset=None, step=None,
                 wrap=True):
        self.image_shape = image_shape
        self.relative_position = relative_position
        self.init_offset = init_offset
        self.step = step
        self.wrap = wrap

    def fit(self, X, y=None):
        """Fit estimator to X.

        Only the shape of `X` is used: the first row is reshaped to
        `image_shape` to validate the layout, then features are paired.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : None, default=None
            Not used in the fitting process but kept for compatibility.

        Returns
        -------
        self : estimator
            Returns the instance itself.

        Raises
        ------
        ValueError
            If a row of `X` cannot be reshaped to `image_shape`, if the
            position parameters do not all have one entry per image
            dimension, if `step` is all zeros, or if the image has fewer
            than two features.

        """
        def _check_image_shape(shape, x):
            if shape is None:
                return x.shape
            try:
                image_x = x.reshape(shape)
            except ValueError:
                raise ValueError('Could not reshape X[0,:] into image_shape')
            # Use the resolved shape of the reshaped row as the image shape.
            return image_x.shape

        def _unit_first_axis(shape):
            # One entry per image dimension: 1 along the first axis, else 0.
            unit = np.zeros(len(shape), dtype=int)
            if unit.size > 0:
                unit[0] = 1
            return unit

        def _check_relative_position(pos, shape):
            if pos is None:
                return _unit_first_axis(shape)
            return np.array(pos)

        def _check_init_offset(offset, shape):
            if offset is None:
                # One integer zero per image dimension (not an image-shaped array).
                return np.zeros(len(shape), dtype=int)
            # if np.sum(offset) > 1:
            #    raise ValueError('np.sum(init_offset) should be less than 1.')
            return np.array(offset)

        def _check_step(_step, shape):
            _step = _check_relative_position(_step, shape)
            if np.all(_step == 0):
                raise ValueError('step should be a non-zero array-like')
            return _step

        # Validate inputs and parameters
        X = check_array(X)
        if X.shape[0] < 1:
            raise ValueError('X must have one row so that the image_shape can be checked.')
        image_shape = _check_image_shape(self.image_shape, X[0, :])
        relative_position = _check_relative_position(self.relative_position, image_shape)
        init_offset = _check_init_offset(self.init_offset, image_shape)
        step = _check_step(self.step, image_shape)
        if (len(image_shape) != len(relative_position)
                or len(image_shape) != len(init_offset)
                or len(image_shape) != len(step)):
            raise ValueError('length of image_shape, relative_position and init_offset should all '
                             'be the same')

        # Setup unpaired features
        n_features = int(np.prod(image_shape))
        if n_features < 2:
            raise ValueError('n_features < 2 but this means there are no pairs')
        unpaired_features = set(range(n_features))
        shape_arr = np.array(image_shape)

        def _lin_idx(I):
            return np.ravel_multi_index(I, image_shape)

        def _wrap(I):
            # Only wrap pair if allowed
            if self.wrap:
                return np.mod(I, image_shape)
            return I

        def _in_bounds(I):
            # Negative coordinates are out of the image as well; checking
            # them here keeps them away from ``np.ravel_multi_index``.
            return bool(np.all(I >= 0) and np.all(I < shape_arr))

        def _check_I(I):
            # Check that it is within the bounds of the image
            # Wrapping should have already been performed
            if not _in_bounds(I):
                return False
            # Check if this feature_idx has already been paired
            return _lin_idx(I) in unpaired_features

        def _is_valid_pair(I, J):
            if not (_check_I(I) and _check_I(J)):
                return False
            # A feature cannot be its own partner (zero offset, or an offset
            # that wraps back onto the same pixel).
            return _lin_idx(I) != _lin_idx(J)

        pairs = []
        cur_I = init_offset
        pair_I = _wrap(cur_I + relative_position)
        while len(unpaired_features) > 0:
            unpaired_idx = -1  # -1 while stepping, >= 0 while scanning unpaired_arr
            unpaired_arr = None
            found_pair = True
            while not _is_valid_pair(cur_I, pair_I):
                if unpaired_idx == -1:
                    # If invalid pair, then step
                    cur_I = cur_I + step
                    pair_I = _wrap(cur_I + relative_position)
                    if _in_bounds(cur_I):
                        continue
                    # Stepped out of the image: fall back to scanning the
                    # remaining unpaired features in sorted order.
                    unpaired_idx = 0
                    unpaired_arr = sorted(unpaired_features)
                if unpaired_idx >= len(unpaired_arr):
                    # We have reached the end of the unpaired features
                    # without finding any valid pair.
                    warnings.warn('Did not pair all features.')
                    found_pair = False
                    break
                # Get I from linear idx
                cur_I = np.array(np.unravel_index(unpaired_arr[unpaired_idx], image_shape))
                pair_I = _wrap(cur_I + relative_position)
                unpaired_idx += 1

            # Break if did not find a pair
            if not found_pair:
                break

            # Add pair and remove
            new_pair = (_lin_idx(cur_I), _lin_idx(pair_I))
            pairs.append(new_pair)
            unpaired_features.remove(new_pair[0])
            unpaired_features.remove(new_pair[1])

        self.groups_ = pairs
        # Integer dtype even when empty (np.sort([]) would give a float array).
        self.unpaired_features_ = np.array(sorted(unpaired_features), dtype=int)
        return self