Source code for pydeep.ae.model

''' This module provides a general implementation of a three-layer tied-weights Auto-encoder (x-h-y).
    The code focuses on readability and clarity while keeping efficiency and flexibility high.
    Several activation functions are available for the visible and hidden units and can be mixed arbitrarily.
    The code can easily be adapted to AEs without tied weights; for deep AEs the FFN code can be adapted.

    :Implemented:
        -  AE  - Auto-encoder (centered)
        - DAE  - Denoising Auto-encoder (centered)
        - SAE  - Sparse Auto-encoder (centered)
        - CAE  - Contractive Auto-encoder (centered)
        - SLAE - Slow Auto-encoder (centered)

    :Info:
        http://ufldl.stanford.edu/wiki/index.php/Sparse_Coding:_Autoencoder_Interpretation

    :Version:
        1.0

    :Date:
        08.02.2016

    :Author:
        Jan Melchior

    :Contact:
        JanMelchior@gmx.de

    :License:

        Copyright (C) 2016 Jan Melchior

        This program is free software: you can redistribute it and/or modify
        it under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program.  If not, see <http://www.gnu.org/licenses/>.

'''

import numpy as numx

from pydeep.base.activationfunction import Sigmoid, SoftMax
from pydeep.base.basicstructure import BipartiteGraph
from pydeep.base.costfunction import CrossEntropyError


class AutoEncoder(BipartiteGraph):
    ''' Class for a 3 layer Auto-encoder (x-h-y) with tied weights.
    '''
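
    # Note on the model structure: the weight matrix self.w is shared (tied) between the
    # encoder and the decoder, and self.ov / self.oh hold the visible and hidden offsets
    # used for centering (see _get_gradients, which works on x - self.ov and h - self.oh).
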
    def __init__(self, number_visibles, number_hiddens, data=None, visible_activation_function=Sigmoid,
                 hidden_activation_function=Sigmoid, cost_function=CrossEntropyError, initial_weights='AUTO',
                 initial_visible_bias='AUTO', initial_hidden_bias='AUTO', initial_visible_offsets='AUTO',
                 initial_hidden_offsets='AUTO', dtype=numx.float64):
        ''' This function initializes all necessary parameters and data structures. It is recommended
            to pass the training data to initialize the network automatically.

            :Parameters:
                number_visibles: Number of the visible variables.
                    -type: int
                number_hiddens: Number of hidden variables.
                    -type: int
                data: The training data for parameter initialization if 'AUTO' is chosen.
                    -type: None or numpy array [num samples, input dim]
                           or List of numpy arrays [num samples, input dim]
                visible_activation_function: A non-linear transformation function for the visible units
                                             (default: Sigmoid).
                    -type: Subclass of ActivationFunction()
                hidden_activation_function: A non-linear transformation function for the hidden units
                                            (default: Sigmoid).
                    -type: Subclass of ActivationFunction()
                cost_function: A cost function (default: CrossEntropyError()).
                    -type: Subclass of FNNCostFunction()
                initial_weights: Initial weights. 'AUTO' is random.
                    -type: 'AUTO', scalar or numpy array [input dim, output_dim]
                initial_visible_bias: Initial visible bias. 'AUTO' is random,
                                      'INVERSE_SIGMOID' is the inverse Sigmoid of the visible mean.
                    -type: 'AUTO', 'INVERSE_SIGMOID', scalar or numpy array [1, input dim]
                initial_hidden_bias: Initial hidden bias. 'AUTO' is random,
                                     'INVERSE_SIGMOID' is the inverse Sigmoid of the hidden mean.
                    -type: 'AUTO', 'INVERSE_SIGMOID', scalar or numpy array [1, output_dim]
                initial_visible_offsets: Initial visible mean values. 'AUTO' is the data mean,
                                         or 0.5 if no data is given.
                    -type: 'AUTO', scalar or numpy array [1, input dim]
                initial_hidden_offsets: Initial hidden mean values. 'AUTO' is 0.5.
                    -type: 'AUTO', scalar or numpy array [1, output_dim]
                dtype: Used data type, i.e. numpy.float64.
                    -type: numpy.float32, numpy.float64 or numpy.longdouble

        '''
        if (cost_function == CrossEntropyError) and not (visible_activation_function == Sigmoid):
            raise Exception("The Cross entropy cost should only be used with Sigmoid units or units of "
                            "interval (0,1)", UserWarning)

        # Set the initial_visible_bias to zero if the activation function is not a Sigmoid
        if (isinstance(initial_visible_bias, str)
                and initial_visible_bias in ('AUTO', 'INVERSE_SIGMOID')
                and not (visible_activation_function == Sigmoid)):
            initial_visible_bias = 0.0

        # Set the AUTO initial_hidden_bias to zero if the activation function is not a Sigmoid
        if (isinstance(initial_hidden_bias, str)
                and initial_hidden_bias in ('AUTO', 'INVERSE_SIGMOID')
                and not (hidden_activation_function == Sigmoid)):
            initial_hidden_bias = 0.0

        if visible_activation_function == SoftMax or hidden_activation_function == SoftMax:
            raise Exception("Softmax not supported but you can use FNN instead!")

        # Call the constructor of the superclass
        super(AutoEncoder, self).__init__(number_visibles=number_visibles,
                                          number_hiddens=number_hiddens,
                                          data=data,
                                          visible_activation_function=visible_activation_function,
                                          hidden_activation_function=hidden_activation_function,
                                          initial_weights=initial_weights,
                                          initial_visible_bias=initial_visible_bias,
                                          initial_hidden_bias=initial_hidden_bias,
                                          initial_visible_offsets=initial_visible_offsets,
                                          initial_hidden_offsets=initial_hidden_offsets,
                                          dtype=dtype)

        # Store the cost function
        self.cost_function = cost_function

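    # A minimal construction sketch (shapes and values below are illustrative assumptions,
    # not taken from the original source):
    #
    #     data = numx.random.rand(100, 16)                        # 100 samples, 16 visible units
    #     ae = AutoEncoder(number_visibles=16, number_hiddens=8, data=data)
    #     h = ae.encode(data)                                     # hidden code, shape [100, 8]
    #     y = ae.decode(h)                                        # reconstruction, shape [100, 16]
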
    def _get_contractive_penalty(self, a_h, factor):
        ''' Calculates the contractive penalty cost for a data point x.

            :Parameters:
                a_h: Pre-synaptic activation of h: a_h = (Wx+c).
                    -type: numpy array [num samples, hidden dim]
                factor: Influence factor (lambda) for the penalty.
                    -type: float

            :Returns:
                Contractive penalty costs for x.
                -type: numpy array [num samples]

        '''
        w2_sum = numx.sum(self.w ** 2.0, axis=0).reshape(1, self.output_dim)
        df2 = self.hidden_activation_function.df(a_h) ** 2.0
        return factor * numx.sum(df2 * w2_sum, axis=1)

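    # The penalty computed above is factor * sum_j f'(a_h_j)^2 * sum_i W_ij^2, i.e. the
    # squared Frobenius norm of the Jacobian of the hidden representation with respect
    # to the input, scaled by the influence factor (lambda).
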
    def _get_sparse_penalty(self, h, factor, desired_sparseness):
        ''' Calculates the sparseness penalty cost for a data point x.

            .. Warning:: Different penalties are used depending on the hidden activation function.

            :Parameters:
                h: Hidden activation.
                    -type: numpy array [num samples, hidden dim]
                factor: Influence factor (beta) for the penalty.
                    -type: float
                desired_sparseness: Desired average hidden activation.
                    -type: float

            :Returns:
                Sparseness penalty costs for x.
                -type: numpy array [num samples]

        '''
        mean_h = numx.atleast_2d(numx.mean(h, axis=0))
        if self.hidden_activation_function == Sigmoid:
            min_value = 1e-10
            max_value = 1.0 - min_value
            mean_h = numx.atleast_2d(numx.clip(mean_h, min_value, max_value))
            sparseness = desired_sparseness * numx.log(desired_sparseness / mean_h) \
                         + (1.0 - desired_sparseness) * numx.log((1.0 - desired_sparseness) / (1.0 - mean_h))
        else:
            sparseness = (desired_sparseness - mean_h) ** 2.0
        return factor * numx.sum(sparseness, axis=1)

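    # For Sigmoid hidden units the penalty above is the KL divergence
    #     KL(rho || mean_h) = rho * log(rho / mean_h) + (1 - rho) * log((1 - rho) / (1 - mean_h))
    # between the desired activation rho and the mean hidden activation; for other activation
    # functions the squared difference (rho - mean_h)^2 is used instead.
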
    def _get_slowness_penalty(self, h, h_next, factor):
        ''' Calculates the slowness penalty cost for a data point x.

            :Parameters:
                h: Hidden activation.
                    -type: numpy array [num samples, hidden dim]
                h_next: Hidden activation of the next data point in a sequence.
                    -type: numpy array [num samples, hidden dim]
                factor: Influence factor for the penalty.
                    -type: float

            :Returns:
                Slowness penalty costs for x.
                -type: numpy array [num samples]

        '''
        return factor * numx.sum((h - h_next) ** 2.0, axis=1)

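    # The penalty above is simply factor * ||h - h_next||^2 per sample, which encourages
    # consecutive points of a sequence to be mapped to similar hidden representations.
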
    def energy(self, x, contractive_penalty=0.0, sparse_penalty=0.0, desired_sparseness=0.01, x_next=None,
               slowness_penalty=0.0):
        ''' Calculates the energy/cost for a data point x.

            :Parameters:
                x: Data points.
                    -type: numpy array [num samples, input dim]
                contractive_penalty: If a value > 0.0 is given, the contractive penalty is added to the cost.
                    -type: float
                sparse_penalty: If a value > 0.0 is given, the sparseness penalty is added to the cost.
                    -type: float
                desired_sparseness: Desired average hidden activation.
                    -type: float
                x_next: Next data points.
                    -type: None or numpy array [num samples, input dim]
                slowness_penalty: If a value > 0.0 is given, the slowness penalty is added to the cost.
                    -type: float

            :Returns:
                Costs for x.
                -type: numpy array [num samples]

        '''
        a_h, h = self._encode(x)
        _, y = self._decode(h)
        cost = self.cost_function.f(y, x)
        if contractive_penalty > 0.0:
            cost += self._get_contractive_penalty(a_h, contractive_penalty)
        if sparse_penalty > 0.0:
            cost += self._get_sparse_penalty(h, sparse_penalty, desired_sparseness)
        if slowness_penalty > 0.0 and x_next is not None:
            h_next = self.encode(x_next)
            cost += self._get_slowness_penalty(h, h_next, slowness_penalty)
        return cost

    def _encode(self, x):
        ''' Propagates the activation of the input layer through the network to the hidden/output layer.

            :Parameters:
                x: Input of the network.
                    -type: numpy array [num samples, input dim]

            :Returns:
                Pre and post synaptic output.
                -type: List of arrays [num samples, hidden dim]

        '''
        # Compute pre-synaptic activation
        pre_act_h = self._hidden_pre_activation(x)
        # Compute post-synaptic activation
        h = self._hidden_post_activation(pre_act_h)
        return pre_act_h, h

    def encode(self, x):
        ''' Propagates the activation of the input layer through the network to the hidden/output layer.

            :Parameters:
                x: Input of the network.
                    -type: numpy array [num samples, input dim]

            :Returns:
                Output of the network.
                -type: array [num samples, hidden dim]

        '''
        return self._encode(numx.atleast_2d(x))[1]

    def _decode(self, h):
        ''' Propagates the activation of the hidden layer backwards through the network to the input layer.

            :Parameters:
                h: Output of the network.
                    -type: numpy array [num samples, hidden dim]

            :Returns:
                Pre and post synaptic input.
                -type: List of arrays [num samples, input dim]

        '''
        # Compute pre-synaptic activation
        pre_act_y = self._visible_pre_activation(h)
        # Compute post-synaptic activation
        y = self._visible_post_activation(pre_act_y)
        return pre_act_y, y

    def decode(self, h):
        ''' Propagates the activation of the hidden layer backwards through the network to the input layer.

            :Parameters:
                h: Output of the network.
                    -type: numpy array [num samples, hidden dim]

            :Returns:
                Reconstructed input of the network.
                -type: array [num samples, input dim]

        '''
        return self._decode(numx.atleast_2d(h))[1]

    def reconstruction_error(self, x, absolut=False):
        ''' Calculates the reconstruction error for given training data.

            :Parameters:
                x: Data points.
                    -type: numpy array [num samples, input dim]
                absolut: If True, the absolute error is calculated instead of the squared error.
                    -type: bool

            :Returns:
                Reconstruction error per sample.
                -type: numpy array [num samples]

        '''
        diff = x - self.decode(self.encode(x))
        if absolut is True:
            diff = numx.abs(diff)
        else:
            diff = diff ** 2
        return numx.sum(diff, axis=1)

    def _get_gradients(self, x, a_h, h, a_y, y, reg_contractive, reg_sparseness, desired_sparseness, reg_slowness,
                       x_next, a_h_next, h_next):
        ''' Computes the gradients of the weights, the visible bias and the hidden bias. Depending on whether
            the contractive penalty and/or the sparse penalty is used, the gradient changes.

            :Parameters:
                x: Training data.
                    -type: numpy array [num samples, input dim]
                a_h: Pre-synaptic activation of h: a_h = (Wx+c).
                    -type: numpy array [num samples, output dim]
                h: Post-synaptic activation of h: h = f(a_h).
                    -type: numpy array [num samples, output dim]
                a_y: Pre-synaptic activation of y: a_y = (Wh+b).
                    -type: numpy array [num samples, input dim]
                y: Post-synaptic activation of y: y = f(a_y).
                    -type: numpy array [num samples, input dim]
                reg_contractive: Contractive influence factor (lambda).
                    -type: float
                reg_sparseness: Sparseness influence factor (beta).
                    -type: float
                desired_sparseness: Desired average hidden activation.
                    -type: float
                reg_slowness: Slowness influence factor.
                    -type: float
                x_next: Next training data in the sequence.
                    -type: numpy array [num samples, input dim]
                a_h_next: Next pre-synaptic activation of h: a_h = (Wx+c).
                    -type: numpy array [num samples, output dim]
                h_next: Next post-synaptic activation of h: h = f(a_h).
                    -type: numpy array [num samples, output dim]

            :Returns:
                Gradients of the weights, the visible bias and the hidden bias.
                -type: List of numpy arrays
                       [[input dim, output dim], [1, input dim], [1, output dim]]

        '''
        # Calculate derivatives of the activation functions
        df_a_y = self.visible_activation_function.df(a_y)
        df_a_h = self.hidden_activation_function.df(a_h)

        # Calculate the visible bias gradient
        grad_b = self.cost_function.df(y, x) * df_a_y

        # Calculate the hidden bias gradient
        grad_c = numx.dot(grad_b, self.w)
        # Add the sparse penalty gradient part
        if reg_sparseness > 0.0:
            grad_c += reg_sparseness * self.__get_sparse_penalty_gradient_part(h, desired_sparseness)
        grad_c *= df_a_h

        # Calculate the weight gradient
        grad_W = numx.dot((x - self.ov).T, grad_c) + numx.dot(grad_b.T, (h - self.oh))

        # Average / normalize over the batch
        grad_b = numx.mean(grad_b, axis=0).reshape(1, self.input_dim)
        grad_c = numx.mean(grad_c, axis=0).reshape(1, self.output_dim)
        grad_W /= x.shape[0]

        # Add the contractive penalty gradient
        if reg_contractive > 0.0:
            pW, pc = self._get_contractive_penalty_gradient(x, a_h, df_a_h)
            grad_c += reg_contractive * pc
            grad_W += reg_contractive * pW

        # Add the slowness penalty gradient
        if reg_slowness > 0.0 and x_next is not None:
            df_a_h_next = self.hidden_activation_function.df(a_h_next)
            pW, pc = self._get_slowness_penalty_gradient(x, x_next, h, h_next, df_a_h, df_a_h_next)
            grad_c += reg_slowness * pc
            grad_W += reg_slowness * pW

        return [grad_W, grad_b, grad_c]

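    # Note on the gradient above: with tied weights the weight gradient receives two terms,
    # one from the decoder path (grad_b combined with the centered hidden states h - self.oh)
    # and one from the encoder path (grad_c combined with the centered inputs x - self.ov);
    # the offsets self.ov and self.oh implement the centering of the model.
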
    def __get_sparse_penalty_gradient_part(self, h, desired_sparseness):
        ''' Computes the part of the sparse penalty gradient that does not yet include the derivative
            of the activation function. Only used for efficiency.

            :Parameters:
                h: Hidden activations.
                    -type: numpy array [num samples, hidden dim]
                desired_sparseness: Desired average hidden activation.
                    -type: float

            :Returns:
                The computed gradient part.
                -type: numpy array [1, hidden dim]

        '''
        mean_h = numx.atleast_2d(numx.mean(h, axis=0))
        if self.hidden_activation_function == Sigmoid or isinstance(self.hidden_activation_function, Sigmoid):
            min_value = 1e-10
            max_value = 1.0 - min_value
            mean_h = numx.clip(mean_h, min_value, max_value)
            grad = -desired_sparseness / mean_h + (1.0 - desired_sparseness) / (1.0 - mean_h)
        else:
            grad = -2.0 * (desired_sparseness - mean_h)
        return grad

    def _get_sparse_penalty_gradient(self, h, df_a_h, desired_sparseness):
        ''' Computes the gradient for the sparse penalty term.

            :Parameters:
                h: Hidden activations.
                    -type: numpy array [num samples, hidden dim]
                df_a_h: Derivative of the untransformed hidden activations.
                    -type: numpy array [num samples, hidden dim]
                desired_sparseness: Desired average hidden activation.
                    -type: float

            :Returns:
                The computed gradient.
                -type: numpy array [num samples, hidden dim]

        '''
        return self.__get_sparse_penalty_gradient_part(h, desired_sparseness) * df_a_h

    def _get_contractive_penalty_gradient(self, x, a_h, df_a_h):
        ''' Computes the gradient for the contractive penalty term.

            :Parameters:
                x: Training data.
                    -type: numpy array [num samples, input dim]
                a_h: Untransformed hidden activations.
                    -type: numpy array [num samples, hidden dim]
                df_a_h: Derivative of the untransformed hidden activations.
                    -type: numpy array [num samples, hidden dim]

            :Returns:
                The computed gradients for the weights and the hidden bias.
                -type: List of numpy arrays [[input dim, hidden dim], [1, hidden dim]]

        '''
        ddf_a_h = self.hidden_activation_function.ddf(a_h)
        w2_sum = numx.sum(self.w ** 2, axis=0).reshape(1, self.output_dim)
        grad_c = 2.0 * df_a_h * ddf_a_h * w2_sum
        grad_w = numx.dot((x - self.ov).T, 2.0 * df_a_h * ddf_a_h) * w2_sum / x.shape[0] \
                 + 2.0 * self.w * numx.mean(df_a_h ** 2.0, axis=0)
        grad_c = numx.mean(grad_c, axis=0).reshape(1, self.output_dim)
        return [grad_w, grad_c]

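    # The terms above follow from the chain rule: the contractive penalty depends on the
    # weights both through the pre-activations a_h (giving the 2 * f'(a_h) * f''(a_h) terms)
    # and directly through the squared weight norms (giving the 2 * W * mean(f'(a_h)^2) term).
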
    def _get_slowness_penalty_gradient(self, x, x_next, h, h_next, df_a_h, df_a_h_next):
        ''' Computes the gradient for the slowness penalty term.

            :Parameters:
                x: Training data.
                    -type: numpy array [num samples, input dim]
                x_next: Next training data points in the sequence.
                    -type: numpy array [num samples, input dim]
                h: Corresponding hidden activations.
                    -type: numpy array [num samples, output dim]
                h_next: Corresponding next hidden activations.
                    -type: numpy array [num samples, output dim]
                df_a_h: Derivative of the untransformed hidden activations.
                    -type: numpy array [num samples, output dim]
                df_a_h_next: Derivative of the untransformed next hidden activations.
                    -type: numpy array [num samples, output dim]

            :Returns:
                The computed gradients for the weights and the hidden bias.
                -type: List of numpy arrays [input dim, output dim] and [output dim]

        '''
        diff = 2.0 * (h - h_next)
        grad_w = (numx.dot((x - self.ov).T, diff * df_a_h)
                  - numx.dot((x_next - self.ov).T, diff * df_a_h_next)) / x.shape[0]
        grad_c = numx.mean(diff * (df_a_h - df_a_h_next), axis=0)
        return [grad_w, grad_c]

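    # The gradient above differentiates ||h - h_next||^2 with respect to the shared weights;
    # since both h and h_next depend on W, the result consists of two dot-product terms with
    # opposite signs.
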
    def finit_differences(self, data, delta, reg_sparseness, desired_sparseness, reg_contractive, reg_slowness,
                          data_next):
        ''' Finite differences test for AEs. The finite differences test involves all functions of the model
            except init and reconstruction_error.

            :Parameters:
                data: The training data.
                    -type: numpy array [num samples, input dim]
                delta: The step size used for the finite differences.
                    -type: float
                reg_sparseness: The influence factor for the sparseness regularization.
                    -type: float
                desired_sparseness: Desired average hidden activation.
                    -type: float
                reg_contractive: The influence factor for the contractive regularization.
                    -type: float
                reg_slowness: The influence factor for the slowness regularization.
                    -type: float
                data_next: The next training data in the sequence.
                    -type: numpy array [num samples, input dim]

            :Returns:
                Differences between the analytical and the numerical gradients of the weights,
                the visible bias and the hidden bias.
                -type: List of numpy arrays

        '''
        data = numx.atleast_2d(data)
        diff_w = numx.zeros((data.shape[0], self.w.shape[0], self.w.shape[1]))
        diff_b = numx.zeros((data.shape[0], self.bv.shape[0], self.bv.shape[1]))
        diff_c = numx.zeros((data.shape[0], self.bh.shape[0], self.bh.shape[1]))
        for d in range(data.shape[0]):
            batch = data[d].reshape(1, data.shape[1])
            a_h, h = self._encode(batch)
            a_y, y = self._decode(h)
            if data_next is not None:
                batch_next = data_next[d].reshape(1, data.shape[1])
                a_h_next, h_next = self._encode(batch_next)
            else:
                batch_next = None
                a_h_next, h_next = None, None
            for i in range(self.input_dim):
                for j in range(self.output_dim):
                    grad_w_ij = self._get_gradients(batch, a_h, h, a_y, y, reg_contractive, reg_sparseness,
                                                    desired_sparseness, reg_slowness, batch_next, a_h_next,
                                                    h_next)[0][i, j]
                    self.w[i, j] += delta
                    E_pos = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                        reg_slowness)
                    self.w[i, j] -= 2 * delta
                    E_neg = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                        reg_slowness)
                    self.w[i, j] += delta
                    diff_w[d, i, j] = grad_w_ij - (E_pos - E_neg) / (2.0 * delta)
            for i in range(self.input_dim):
                grad_b_i = self._get_gradients(batch, a_h, h, a_y, y, reg_contractive, reg_sparseness,
                                               desired_sparseness, reg_slowness, batch_next, a_h_next,
                                               h_next)[1][0, i]
                self.bv[0, i] += delta
                E_pos = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                    reg_slowness)
                self.bv[0, i] -= 2 * delta
                E_neg = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                    reg_slowness)
                self.bv[0, i] += delta
                diff_b[d, 0, i] = grad_b_i - (E_pos - E_neg) / (2.0 * delta)
            for j in range(self.output_dim):
                grad_c_j = self._get_gradients(batch, a_h, h, a_y, y, reg_contractive, reg_sparseness,
                                               desired_sparseness, reg_slowness, batch_next, a_h_next,
                                               h_next)[2][0, j]
                self.bh[0, j] += delta
                E_pos = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                    reg_slowness)
                self.bh[0, j] -= 2 * delta
                E_neg = self.energy(batch, reg_contractive, reg_sparseness, desired_sparseness, batch_next,
                                    reg_slowness)
                self.bh[0, j] += delta
                diff_c[d, 0, j] = grad_c_j - (E_pos - E_neg) / (2.0 * delta)
        return [diff_w, diff_b, diff_c]
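

# A minimal usage sketch (illustrative only; the sizes, regularization strengths and the delta
# value below are assumptions, not part of the original source). It builds a small centered AE,
# computes the reconstruction error and runs the finite differences check against the
# analytical gradients.
if __name__ == "__main__":
    numx.random.seed(42)
    example_data = numx.random.rand(10, 6)
    ae = AutoEncoder(number_visibles=6, number_hiddens=4, data=example_data)
    print(ae.reconstruction_error(example_data))
    diffs = ae.finit_differences(example_data[0:2],
                                 delta=1e-6,
                                 reg_sparseness=0.1,
                                 desired_sparseness=0.01,
                                 reg_contractive=0.1,
                                 reg_slowness=0.0,
                                 data_next=None)
    # Each entry should be close to zero if the analytical gradients are correct.
    print([numx.max(numx.abs(d)) for d in diffs])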