# Source code for pydeep.preprocessing

""" This module contains several classes for data preprocessing.

    :Implemented:
        - Standarizer
        - Principal Component Analysis (PCA)
        - Zero Phase Component Analysis (ZCA)
        - Independent Component Analysis (ICA)
        - Binarize data
        - Rescale data
        - Remove row means
        - Remove column means

    :Version:
        1.1.0

    :Date:
        04.04.2017

    :Author:
        Jan Melchior

    :Contact:
        JanMelchior@gmx.de

    :License:

        Copyright (C) 2017 Jan Melchior

        This file is part of the Python library PyDeep.

        PyDeep is free software: you can redistribute it and/or modify
        it under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
import numpy as numx
import pydeep.base.numpyextension as npext


def binarize_data(data):
    """ Converts data to binary values. \
        Every entry smaller than the fixed threshold 0.5 becomes 0, all other entries become 1.

    :param data: Data to be binarized.
    :type data: numpy array [num data point, data dimension]

    :return: Binarized data.
    :rtype: numpy array [num data point, data dimension]
    """
    # Boolean mask (entry >= 0.5) cast to integers gives the 0/1 coding.
    return (numx.asarray(data) >= 0.5).astype(int)


def rescale_data(data, new_min=0.0, new_max=1.0):
    """ Linearly rescales the values of a matrix. e.g. [min,max] -> [new_min,new_max]

    :param data: Data to be rescaled.
    :type data: numpy array [num data point, data dimension]

    :param new_min: New minimum value.
    :type new_min: float

    :param new_max: New maximum value.
    :type new_max: float

    :return: Rescaled data.
    :rtype: numpy array [num data point, data dimension]
    """
    # Work on a float copy so the input is never modified in place.
    datac = numx.array(data, numx.float64)
    # Shift so the global minimum becomes zero.
    datac -= numx.min(datac)
    # Scale [0, max] onto [0, new_max - new_min], then shift to new_min.
    # NOTE(review): constant input (max == 0 after shifting) still yields a
    # division by zero, exactly as the original implementation did.
    datac *= (new_max - new_min) / numx.max(datac)
    datac += new_min
    return datac


def remove_rows_means(data, return_means=False):
    """ Subtracts from every row of the data its own mean.

    :param data: Data to be normalized.
    :type data: numpy array [num data point, data dimension]

    :param return_means: If True the row means are returned as well.
    :type return_means: bool

    :return: Data without row means, row means (optional).
    :rtype: numpy array [num data point, data dimension], Means of the data (optional).
    """
    # Column vector of per-row means so subtraction broadcasts across columns.
    row_means = numx.mean(data, axis=1).reshape(-1, 1)
    centered = data - row_means
    if return_means is True:
        return centered, row_means
    return centered


def remove_cols_means(data, return_means=False):
    """ Subtracts from every column of the data its own mean.

    :param data: Data to be normalized.
    :type data: numpy array [num data point, data dimension]

    :param return_means: If True the column means are returned as well.
    :type return_means: bool

    :return: Data without column means, column means (optional).
    :rtype: numpy array [num data point, data dimension], Means of the data (optional).
    """
    # Row vector of per-column means so subtraction broadcasts across rows.
    col_means = numx.mean(data, axis=0).reshape(1, -1)
    centered = data - col_means
    if return_means is True:
        return centered, col_means
    return centered


class STANDARIZER(object):
    """ Shifts the data having zero mean and scales it having unit variances along the axis. """

    def __init__(self, input_dim):
        """ Constructor.

        :param input_dim: Data dimensionality.
        :type input_dim: int
        """
        self.input_dim = input_dim
        self.output_dim = input_dim
        # Statistics below are filled in by train().
        self.mean = None
        self.covariance_matrix = None
        self.standard_deviation = None
        self.trained = False

    def train(self, data):
        """ Training the model (full batch).

        :param data: Data for training.
        :type data: numpy array [num data point, data dimension]
        """
        if data.shape[1] != self.input_dim:
            raise ValueError("Wrong data dimensionality.")
        num_dims = data.shape[1]
        # Per-feature mean as a row vector for broadcasting.
        self.mean = numx.mean(data, axis=0).reshape(1, num_dims)
        # Covariance of the centered data, features as columns (rowvar=0).
        self.covariance_matrix = numx.cov(data - self.mean, rowvar=0)
        self.standard_deviation = numx.std(data, axis=0).reshape(1, num_dims)
        self.trained = True

    def project(self, data):
        """ Projects the data to normalized space.

        :param data: Data to project.
        :type data: numpy array [num data point, data dimension]

        :return: Projected data.
        :rtype: numpy array [num data point, data dimension]
        """
        if data.shape[1] != self.input_dim:
            raise ValueError("Wrong data dimensionality.")
        if not self.trained:
            raise ValueError("Train model first!")
        # Zero-mean, unit-variance per feature.
        return (data - self.mean) / self.standard_deviation

    def unproject(self, data):
        """ Projects the data back to the input space.

        :param data: Data to unproject.
        :type data: numpy array [num data point, data dimension]

        :return: Unprojected data.
        :rtype: numpy array [num data point, data dimension]
        """
        if data.shape[1] != self.input_dim:
            raise ValueError("Wrong data dimensionality.")
        if not self.trained:
            raise ValueError("Train model first!")
        # Inverse of project(): restore scale, then shift the mean back.
        return self.standard_deviation * data + self.mean
class PCA(STANDARIZER):
    """ Principal component analysis (PCA) using Singular Value Decomposition (SVD). """

    def __init__(self, input_dim, whiten=False):
        """ Constructor.

        :param input_dim: Data dimensionality.
        :type input_dim: int

        :param whiten: If true the projected data will be de-correlated in all directions.
        :type whiten: bool
        """
        super(PCA, self).__init__(input_dim)
        self.whiten = whiten
        # Results below are filled in by train().
        self.eigen_values = None
        self.projection_matrix = None
        self.unprojection_matrix = None

    def train(self, data):
        """ Training the model (full batch).

        :param data: data for training.
        :type data: numpy array [num data point, data dimension]
        """
        # Computes mean, standard deviation and covariance matrix of the data.
        super(PCA, self).train(data)
        # Compute Eigenvalue and Eigenvectors of Covariance matrix.
        # Since the covariance matrix is symmetric, the SVD of it yields its
        # eigenvectors (columns of U) and eigenvalues (singular values).
        self.projection_matrix, self.eigen_values, _ = numx.linalg.svd(
            self.covariance_matrix)
        # Sort Eigenvalues and Eigenvectors by Eigenvalues in decreasing order.
        index = numx.argsort(self.eigen_values)[::-1]
        self.eigen_values = self.eigen_values[index].reshape(1, index.shape[0])
        self.projection_matrix = self.projection_matrix[:, index]
        # For an orthonormal basis the inverse projection is the transpose.
        self.unprojection_matrix = self.projection_matrix.T
        # If true the projected data will be decorrelated in all directions
        # (each component is scaled by the inverse square root of its eigenvalue).
        if self.whiten is True:
            self.unprojection_matrix = (self.projection_matrix * numx.sqrt(self.eigen_values)).T
            self.projection_matrix = self.projection_matrix / numx.sqrt(self.eigen_values)
        self.trained = True

    def project(self, data, num_components=None):
        """ Projects the data to Eigenspace.

        :Info: projection_matrix has its projected vectors as its columns. i.e. if we project x by W into y where W is \
                the projection_matrix, then y = W.T * x

        :param data: Data to project.
        :type data: numpy array [num data point, data dimension]

        :param num_components: Number of components to keep; None keeps all.
        :type num_components: int or None

        :return: Projected data.
        :rtype: numpy array [num data point, data dimension]
        """
        if not self.trained:
            raise ValueError("Train model first!")
        n = self.output_dim
        if num_components is not None:
            n = num_components
        # Center the data, then project onto the first n principal directions.
        return numx.dot(data - self.mean, self.projection_matrix[:, 0:n])

    def unproject(self, data, num_components=None):
        """ Projects the data from Eigenspace to normal space.

        :param data: Data to be unprojected.
        :type data: numpy array [num data point, data dimension]

        :param num_components: Number of components to project.
        :type num_components: int

        :return: Unprojected data.
        :rtype: numpy array [num data point, num_components]
        """
        if not self.trained:
            raise ValueError("Train model first!")
        n = self.input_dim
        if num_components is not None:
            n = num_components
        # Map back using the (possibly truncated) unprojection matrix and
        # re-add the mean of the first n dimensions.
        return numx.dot(data, self.unprojection_matrix[0:data.shape[1], 0:n]) + self.mean[:, 0:n]
class ZCA(PCA):
    """ Zero phase component analysis (ZCA), i.e. whitening that rotates the data back to the original space. """

    def __init__(self, input_dim):
        """ Constructor.

        :param input_dim: Data dimensionality.
        :type input_dim: int
        """
        # Whitening is performed explicitly in train(), so PCA whitening is off.
        super(ZCA, self).__init__(input_dim, False)

    def train(self, data):
        """ Training the model (full batch).

        :param data: data for training.
        :type data: numpy array [num data point, data dimension]
        """
        # Standard PCA training yields eigenvalues and eigenvectors.
        super(ZCA, self).train(data)
        # ZCA whitening matrix: U * diag(1/sqrt(eigenvalues)) * U^T, which
        # whitens and rotates back into the input space.
        self.projection_matrix = numx.dot(self.projection_matrix / numx.sqrt(self.eigen_values),
                                          self.projection_matrix.T)
        # Corresponding inverse: U * diag(sqrt(eigenvalues)) * U^T.
        self.unprojection_matrix = numx.dot(self.unprojection_matrix.T * numx.sqrt(self.eigen_values),
                                            self.unprojection_matrix)
class ICA(PCA):
    """ Independent Component Analysis using FastICA. """

    def __init__(self, input_dim):
        """ Constructor.

        :param input_dim: Data dimensionality.
        :type input_dim: int
        """
        super(ICA, self).__init__(input_dim, False)
        self.input_dim = input_dim
        self.output_dim = input_dim
        self.trained = False

    def train(self, data, iterations=1000, convergence=0.0, status=False):
        """ Training the model (full batch).

        NOTE(review): the data is assumed to be whitened already (the mean is
        set to zero below, not estimated) — confirm against callers.

        :param data: data for training.
        :type data: numpy array [num data point, data dimension]

        :param iterations: Number of iterations
        :type iterations: int

        :param convergence: If the angle (in degrees) between filters of two updates is less than the given value, \
                            training is terminated.
        :type convergence: double

        :param status: If true the progress is printed to the console.
        :type status: bool
        """
        if self.input_dim != data.shape[1]:
            raise ValueError("Wrong data dimensionality.")
        # Random init
        self.projection_matrix = numx.random.randn(data.shape[1], data.shape[1])
        projection_matrix_old = numx.copy(self.projection_matrix)
        for epoch in range(0, iterations):
            # One iteration.
            # TODO: PendingDeprecationWarning: the matrix subclass is not the recommended
            # way to represent matrices or deal with linear algebra (see
            # https://docs.scipy.org/doc/numpy/user/numpy-for-matlab-users.html). Please
            # adjust your code to use regular ndarray.
            # Nonlinearity g(u) = tanh(u), written via exp to avoid numx.tanh.
            hyptan = 1.0 - 2.0 / (numx.exp(2.0 * numx.dot(data, self.projection_matrix)) + 1.0)
            # FastICA fixed-point update: W <- E[x g(Wx)] - E[g'(Wx)] * W,
            # with g'(u) = 1 - tanh(u)^2.
            self.projection_matrix = (numx.dot(data.T, hyptan) / data.shape[0] - numx.array(numx.dot(numx.ones(
                (data.shape[1], 1)), numx.matrix(numx.mean(1.0 - hyptan ** 2.0, axis=0)))) * self.projection_matrix)
            # Symmetric decorrelation: W <- W (W^T W)^(-1/2), realized through
            # the eigendecomposition of (W^T W)^(-1).
            tmp = numx.linalg.inv(numx.dot(self.projection_matrix.T, self.projection_matrix))
            ew, ev = numx.linalg.eig(tmp)
            self.projection_matrix = numx.dot(self.projection_matrix,
                                              numx.real(numx.dot(numx.dot(ev, numx.diag(ew) ** 0.5), ev.T)))
            # Mean angle (degrees) between old and new filters; 0 or 180
            # degrees means the filters (up to sign) stopped changing.
            angle = numx.mean(
                numx.diagonal(npext.angle_between_vectors(projection_matrix_old.T, self.projection_matrix.T, True)))
            if angle < convergence or 180.0 - angle < convergence:
                break
            projection_matrix_old = numx.copy(self.projection_matrix)
            if status is True:
                import pydeep.misc.measuring as mea
                mea.print_progress(epoch, iterations, True)
        # Set results
        self.mean = numx.zeros((1, data.shape[1]))
        self.unprojection_matrix = self.projection_matrix.T
        self.trained = True

    def log_likelihood(self, data):
        """ Calculates the Log-Likelihood (LL) for the given data.

        :param data: data to calculate the Log-Likelihood for.
        :type data: numpy array [num data point, data dimension]

        :return: log-likelihood.
        :rtype: numpy array [num data point]
        """
        if not self.trained:
            raise ValueError("Train model first!")
        # LL under the source density p(s) = 0.5 / cosh(s)^2 plus the
        # log |det W| volume-change term of the linear transform.
        return numx.sum(numx.log(0.5 / (numx.cosh(numx.dot(self.unprojection_matrix, data.T)) ** 2.0)),
                        axis=0) + numx.log(numx.abs(numx.linalg.det(self.projection_matrix)))