""" This module provides different types of training algorithms for RBMs
running on CPU. The structure is kept modular to simplify the
understanding of the code and the mathematics. In addition the modularity
helps to create other kinds of training algorithms by inheritance.
:Implemented:
- CD (Contrastive Divergence)
- PCD (Persistent Contrastive Divergence)
- PT (Parallel Tempering)
- IPT (Independent Parallel Tempering)
- GD (Exact Gradient descent (only for small binary models))
:Info:
For the derivations .. seealso::
https://www.ini.rub.de/PEOPLE/wiskott/Reprints/Melchior-2012-MasterThesis-RBMs.pdf
:Version:
1.1.0
:Date:
04.04.2017
:Author:
Jan Melchior
:Contact:
JanMelchior@gmx.de
:License:
Copyright (C) 2017 Jan Melchior
This file is part of the Python library PyDeep.
PyDeep is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import pydeep.rbm.sampler as sampler
import pydeep.rbm.model as models
import pydeep.rbm.estimator as estimator
import pydeep.base.numpyextension as numxext
import numpy as numx
class CD(object):
    """ Implementation of the training algorithm Contrastive Divergence (CD).

    :INFO:
        A fast learning algorithm for deep belief nets, Geoffrey E. Hinton
        and Simon Osindero Yee-Whye Teh Department of Computer Science
        University of Toronto Yee-Whye Teh 10 Kings College Road National
        University of Singapore.

    """

    @classmethod
    def _calculate_centered_gradient(cls,
                                     gradients,
                                     visible_offsets,
                                     hidden_offsets):
        """ Calculates the centered gradient from the normal CD gradient for the parameters W, bv, bh and the
            corresponding offset values. The list is modified in place and also returned.

        :param gradients: Original gradients, ordered as [W, bv, bh].
        :type gradients: List of 2D numpy arrays

        :param visible_offsets: Visible offsets to be used.
        :type visible_offsets: numpy array[1,input dim]

        :param hidden_offsets: Hidden offsets to be used.
        :type hidden_offsets: numpy array[1,output dim]

        :return: Enhanced gradients for all parameters.
        :rtype: numpy arrays (num parameters x [parameter.shape])
        """
        # The weight gradient is centered first, using the uncentered bias gradients.
        gradients[0] -= numx.dot(gradients[1].T, hidden_offsets) + numx.dot(visible_offsets.T, gradients[2])
        # Both bias gradients are then corrected with the already centered weight gradient,
        # so the order of these three statements must not be changed.
        gradients[1] -= numx.dot(hidden_offsets, gradients[0].T)
        gradients[2] -= numx.dot(visible_offsets, gradients[0])
        return gradients

    @staticmethod
    def _select_offset(offset_code, mean_pos, mean_neg):
        """ Returns the new offset value selected by a single character of 'offset_typ'.

        :param offset_code: One of 'A' (average of both phases), 'D' (positive/data phase mean),
                            'M' (negative/model phase mean) or '0' (zeros shaped like mean_pos).
        :type offset_code: string

        :param mean_pos: Mean activity of the positive phase.
        :type mean_pos: numpy array or float

        :param mean_neg: Mean activity of the negative phase.
        :type mean_neg: numpy array or float

        :return: The selected offset, 0.0 if the code is unknown.
        :rtype: numpy array or float
        """
        # Strings are compared by value ('=='); an identity test ('is') only works by accident of interning.
        if offset_code == 'A':
            return (mean_pos + mean_neg) * 0.5
        if offset_code == 'D':
            return mean_pos
        if offset_code == 'M':
            return mean_neg
        if offset_code == '0':
            return 0.0 * mean_pos
        return 0.0

    def __init__(self, model, data=None):
        """ The constructor initializes the CD trainer with a given model and data.

        :param model: The model to sample from.
        :type model: Valid model class.

        :param data: Data for initialization, only has effect if the centered gradient is used.
        :type data: numpy array [num. samples x input dim]

        :raises ValueError: If data is given and its second dimension differs from the model's input dimension.
        """
        # Store model variable
        self.model = model

        # Create the Gibbs-sampler
        self.sampler = sampler.GibbsSampler(model)

        # Count the number of parameters
        parameters = self.model.get_parameters()
        self.num_parameters = len(parameters)

        # Offsets used by the centered gradient: 0.5 for the hiddens, data mean (or 0.5) for the visibles
        self.hidden_offsets = 0.5 * numx.ones((1, self.model.output_dim))
        if data is not None:
            if self.model.input_dim != data.shape[1]:
                raise ValueError("Data dimension and model input dimension have to be equal!")
            self.visible_offsets = data.mean(axis=0).reshape(1, data.shape[1])
        else:
            self.visible_offsets = 0.5 * numx.ones((1, self.model.input_dim))

        # Storage variables for the gradients, one zero array per model parameter
        self.parameter_updates = [numx.zeros((parameter.shape[0], parameter.shape[1]), dtype=model.dtype)
                                  for parameter in parameters]

    def _adapt_gradient(self,
                        pos_gradients,
                        neg_gradients,
                        batch_size,
                        epsilon,
                        momentum,
                        reg_l1norm,
                        reg_l2norm,
                        reg_sparseness,
                        desired_sparseness,
                        mean_hidden_activity,
                        visible_offsets,
                        hidden_offsets,
                        use_centered_gradient,
                        restrict_gradient,
                        restriction_norm):
        """ This function updates the parameter gradients stored in self.parameter_updates (in place).

        :param pos_gradients: Positive Gradients.
        :type pos_gradients: numpy array[parameter index, parameter shape]

        :param neg_gradients: Negative Gradients.
        :type neg_gradients: numpy array[parameter index, parameter shape]

        :param batch_size: The batch_size of the data.
        :type batch_size: float

        :param epsilon: The learning rate.
        :type epsilon: numpy array[num parameters]

        :param momentum: The momentum term.
        :type momentum: numpy array[num parameters]

        :param reg_l1norm: The parameter for the L1 regularization
        :type reg_l1norm: float

        :param reg_l2norm: The parameter for the L2 regularization also known as weight decay.
        :type reg_l2norm: float

        :param reg_sparseness: The parameter for the desired_sparseness regularization, 0 disables it.
        :type reg_sparseness: float

        :param desired_sparseness: Desired average hidden activation or None for no regularization.
        :type desired_sparseness: None or float

        :param mean_hidden_activity: Average hidden activation <P(h_i=1|x)>_h_i
        :type mean_hidden_activity: numpy array [1, output dim] or float

        :param visible_offsets: Visible offsets used if the centered gradient is requested.
        :type visible_offsets: numpy array [1, input dim]

        :param hidden_offsets: Hidden offsets used if the centered gradient is requested.
        :type hidden_offsets: numpy array [1, output dim]

        :param use_centered_gradient: Uses the centered gradient instead of centering.
        :type use_centered_gradient: bool

        :param restrict_gradient: If a positive scalar is given the norm of the weight update is restricted to stay
                                  below this value.
        :type restrict_gradient: None, float

        :param restriction_norm: Restricts the column norm, row norm or Matrix norm.
        :type restriction_norm: string, 'Cols','Rows', 'Mat'
        """
        # calculate normal gradient
        gradients = [(pos_gradients[i] - neg_gradients[i]) / batch_size for i in range(self.num_parameters)]

        # adapt to centered gradient
        if use_centered_gradient:
            gradients = self._calculate_centered_gradient(gradients, visible_offsets, hidden_offsets)

        # adapt parameters: decay the previous update by the momentum and add the scaled gradient
        for i in range(self.num_parameters):
            self.parameter_updates[i] *= momentum[i]
            self.parameter_updates[i] += epsilon[i] * gradients[i]

        # Add sparse penalty (acts on the hidden bias only)
        if reg_sparseness != 0:
            if desired_sparseness is not None:
                self.parameter_updates[2] += (epsilon[2] * reg_sparseness *
                                              (desired_sparseness - mean_hidden_activity))
                # Alternative penalty, currently disabled:
                # st = numx.clip(mean_hidden_activity,0.001,0.999)
                # st = -desired_sparseness/st+(1.0-desired_sparseness)/(1.0-st)
                # self.parameter_updates[2] -= epsilon[2] * reg_sparseness * st

        # add weight decay
        if reg_l1norm != 0:
            self.parameter_updates[0] -= (epsilon[0] * reg_l1norm * numx.sign(self.model.w))

        if reg_l2norm != 0:
            self.parameter_updates[0] -= (epsilon[0] * reg_l2norm * self.model.w)

        # Restricts the gradient
        if numx.isscalar(restrict_gradient):
            if restrict_gradient > 0:
                # Compare by value: an identity test against a string literal is unreliable and
                # would silently skip the restriction for non-interned strings.
                if restriction_norm == 'Cols':
                    self.parameter_updates[0] = numxext.restrict_norms(self.parameter_updates[0],
                                                                       restrict_gradient, 0)
                elif restriction_norm == 'Rows':
                    self.parameter_updates[0] = numxext.restrict_norms(self.parameter_updates[0],
                                                                       restrict_gradient, 1)
                elif restriction_norm == 'Mat':
                    self.parameter_updates[0] = numxext.restrict_norms(self.parameter_updates[0],
                                                                       restrict_gradient, None)

    def _train(self,
               data,
               epsilon,
               k,
               momentum,
               reg_l1norm,
               reg_l2norm,
               reg_sparseness,
               desired_sparseness,
               update_visible_offsets,
               update_hidden_offsets,
               offset_typ,
               use_centered_gradient,
               restrict_gradient,
               restriction_norm,
               use_hidden_states):
        """ The training for one batch is performed using Contrastive Divergence (CD) for k sampling steps.

        :param data: The data used for training.
        :type data: numpy array [batch_size, input dimension]

        :param epsilon: The learning rate.
        :type epsilon: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param k: Number of sampling steps.
        :type k: int

        :param momentum: The momentum term.
        :type momentum: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param reg_l1norm: The parameter for the L1 regularization
        :type reg_l1norm: float

        :param reg_l2norm: The parameter for the L2 regularization also known as weight decay.
        :type reg_l2norm: float

        :param reg_sparseness: The parameter for the desired_sparseness regularization, 0 disables it.
        :type reg_sparseness: float

        :param desired_sparseness: Desired average hidden activation or None for no regularization.
        :type desired_sparseness: None or float

        :param update_visible_offsets: The update step size for the models visible offsets.
        :type update_visible_offsets: float

        :param update_hidden_offsets: The update step size for the models hidden offsets.
        :type update_hidden_offsets: float

        :param offset_typ: | Different offsets can be used to center the gradient.
                           | :Example: 'DM' uses the positive phase visible mean and the negative phase hidden mean.
                             'A0' uses the average of positive and negative phase mean for visible, zero for the
                             hiddens. Possible values are out of {A,D,M,0}x{A,D,M,0}
        :type offset_typ: string

        :param use_centered_gradient: Uses the centered gradient instead of centering.
        :type use_centered_gradient: bool

        :param restrict_gradient: If a positive scalar is given the norm of the weight update is restricted to stay
                                  below this value.
        :type restrict_gradient: None, float

        :param restriction_norm: Restricts the column norm, row norm or Matrix norm.
        :type restriction_norm: string, 'Cols','Rows', 'Mat'

        :param use_hidden_states: If True, the hidden states are used for the gradient calculations, the hiddens
                                  probabilities otherwise.
        :type use_hidden_states: bool
        """
        # Sample the first time
        hid_probs_pos = self.model.probability_h_given_v(data)
        hid_states_pos = self.model.sample_h(hid_probs_pos)

        # Positive phase means, only calculated if they are needed later on
        if update_visible_offsets != 0.0:
            xmean_pos = numx.mean(data, axis=0).reshape(1, self.model.input_dim)
        hmean_pos = 0.0
        if update_hidden_offsets != 0.0 or reg_sparseness != 0.0:
            if use_hidden_states:
                hmean_pos = numx.mean(hid_states_pos, axis=0).reshape(1, self.model.output_dim)
            else:
                hmean_pos = numx.mean(hid_probs_pos, axis=0).reshape(1, self.model.output_dim)

        # Perform k steps of Gibbs sampling; a plain Gibbs sampler starts from the positive hidden states,
        # every other sampler is only asked for as many samples as the batch contains
        if isinstance(self.sampler, sampler.GibbsSampler):
            vis_states_neg = self.sampler.sample_from_h(hid_states_pos, k=k)
        else:
            vis_states_neg = self.sampler.sample(data.shape[0], k=k)
        hid_probs_neg = self.model.probability_h_given_v(vis_states_neg)
        if use_hidden_states:
            hid_states_neg = self.model.sample_h(hid_probs_neg)

        # Negative phase means
        if update_visible_offsets != 0.0:
            xmean_neg = numx.mean(vis_states_neg, axis=0).reshape(1, self.model.input_dim)
        hmean_neg = 0.0
        if update_hidden_offsets != 0.0:
            if use_hidden_states:
                hmean_neg = numx.mean(hid_states_neg, axis=0).reshape(1, self.model.output_dim)
            else:
                hmean_neg = numx.mean(hid_probs_neg, axis=0).reshape(1, self.model.output_dim)

        # Select the new offsets according to offset_typ (first character: visibles, second: hiddens)
        new_visible_offsets = 0.0
        if update_visible_offsets != 0.0:
            new_visible_offsets = self._select_offset(offset_typ[0], xmean_pos, xmean_neg)
        new_hidden_offsets = 0.0
        if update_hidden_offsets != 0.0:
            new_hidden_offsets = self._select_offset(offset_typ[1], hmean_pos, hmean_neg)

        # Truthiness test, consistent with the check performed in _adapt_gradient
        if not use_centered_gradient:
            # update the centers
            self.model.update_offsets(new_visible_offsets,
                                      new_hidden_offsets,
                                      update_visible_offsets,
                                      update_hidden_offsets)
            self.visible_offsets = 0.0
            self.hidden_offsets = 0.0
        else:
            # The trainer keeps its own exponentially moving average of the offsets
            self.hidden_offsets = ((1.0 - update_hidden_offsets) * self.hidden_offsets
                                   + update_hidden_offsets * new_hidden_offsets)
            self.visible_offsets = ((1.0 - update_visible_offsets) * self.visible_offsets
                                    + update_visible_offsets * new_visible_offsets)

        # Calculate positive and negative phase gradient using states or probabilities
        if use_hidden_states:
            pos_gradients = self.model.calculate_gradients(data, hid_states_pos)
            neg_gradients = self.model.calculate_gradients(vis_states_neg, hid_states_neg)
        else:
            pos_gradients = self.model.calculate_gradients(data, hid_probs_pos)
            neg_gradients = self.model.calculate_gradients(vis_states_neg, hid_probs_neg)

        # Adapt the gradients by weight decay momentum and learning rate
        self._adapt_gradient(pos_gradients=pos_gradients,
                             neg_gradients=neg_gradients,
                             batch_size=data.shape[0],
                             epsilon=epsilon,
                             momentum=momentum,
                             reg_l1norm=reg_l1norm,
                             reg_l2norm=reg_l2norm,
                             reg_sparseness=reg_sparseness,
                             desired_sparseness=desired_sparseness,
                             mean_hidden_activity=hmean_pos,
                             visible_offsets=self.visible_offsets,
                             hidden_offsets=self.hidden_offsets,
                             use_centered_gradient=use_centered_gradient,
                             restrict_gradient=restrict_gradient,
                             restriction_norm=restriction_norm)

        # update the parameters with the calculated gradient
        self.model.update_parameters(self.parameter_updates)

    def train(self,
              data,
              num_epochs=1,
              epsilon=0.01,
              k=1,
              momentum=0.0,
              reg_l1norm=0.0,
              reg_l2norm=0.0,
              reg_sparseness=0.0,
              desired_sparseness=None,
              update_visible_offsets=0.01,
              update_hidden_offsets=0.01,
              offset_typ='DD',
              use_centered_gradient=False,
              restrict_gradient=False,
              restriction_norm='Mat',
              use_hidden_states=False):
        """ Train the models with all batches using Contrastive Divergence (CD) for k sampling steps.

        :param data: The data used for training, either a single batch or a list of batches.
        :type data: numpy array [batch_size, input dimension] or list of such arrays

        :param num_epochs: Number of epochs (loop through the data).
        :type num_epochs: int

        :param epsilon: The learning rate.
        :type epsilon: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param k: Number of sampling steps.
        :type k: int

        :param momentum: The momentum term.
        :type momentum: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param reg_l1norm: The parameter for the L1 regularization
        :type reg_l1norm: float

        :param reg_l2norm: The parameter for the L2 regularization also known as weight decay.
        :type reg_l2norm: float

        :param reg_sparseness: The parameter for the desired_sparseness regularization, 0 disables it.
        :type reg_sparseness: float

        :param desired_sparseness: Desired average hidden activation or None for no regularization.
        :type desired_sparseness: None or float

        :param update_visible_offsets: The update step size for the models visible offsets.
        :type update_visible_offsets: float

        :param update_hidden_offsets: The update step size for the models hidden offsets.
        :type update_hidden_offsets: float

        :param offset_typ: | Different offsets can be used to center the gradient.
                           | Example:'DM' uses the positive phase visible mean and the negative phase hidden mean.
                             'A0' uses the average of positive and negative phase mean for visible, zero for the
                             hiddens. Possible values are out of {A,D,M,0}x{A,D,M,0}
        :type offset_typ: string

        :param use_centered_gradient: Uses the centered gradient instead of centering.
        :type use_centered_gradient: bool

        :param restrict_gradient: If a positive scalar is given the norm of the weight update is restricted to stay
                                  below this value.
        :type restrict_gradient: None, float

        :param restriction_norm: Restricts the column norm, row norm or Matrix norm.
        :type restriction_norm: string, 'Cols','Rows', 'Mat'

        :param use_hidden_states: If True, the hidden states are used for the gradient calculations, the hiddens
                                  probabilities otherwise.
        :type use_hidden_states: bool
        """
        # Set learning rates: a scalar is expanded to one value per parameter
        if numx.isscalar(epsilon):
            epsilon = numx.zeros(self.num_parameters) + epsilon

        # Set momenti: a scalar is expanded to one value per parameter
        if numx.isscalar(momentum):
            momentum = numx.zeros(self.num_parameters) + momentum

        # A list is interpreted as a sequence of batches, anything else as one single batch
        batches = data if isinstance(data, list) else [data]

        for _ in range(num_epochs):
            # gradient update for all batches
            for batch in batches:
                self._train(data=batch,
                            epsilon=epsilon,
                            k=k,
                            momentum=momentum,
                            reg_l1norm=reg_l1norm,
                            reg_l2norm=reg_l2norm,
                            reg_sparseness=reg_sparseness,
                            desired_sparseness=desired_sparseness,
                            update_visible_offsets=update_visible_offsets,
                            update_hidden_offsets=update_hidden_offsets,
                            offset_typ=offset_typ,
                            use_centered_gradient=use_centered_gradient,
                            restrict_gradient=restrict_gradient,
                            restriction_norm=restriction_norm,
                            use_hidden_states=use_hidden_states)
class PCD(CD):
    """ Implementation of the training algorithm Persistent Contrastive Divergence (PCD).

    :Reference: | Training Restricted Boltzmann Machines using Approximations to the
                | Likelihood Gradient, Tijmen Tieleman, Department of Computer
                | Science, University of Toronto, Toronto, Ontario M5S 3G4, Canada

    """

    def __init__(self, model, num_chains, data=None):
        """ Sets up a PCD trainer: the CD trainer state is created first and its sampler is then
            replaced by a persistent Gibbs sampler.

        :param model: The model to sample from.
        :type model: Valid model class.

        :param num_chains: The number of chains that should be used.
                           .. Note:: You should use the data's batch size!
        :type num_chains: int

        :param data: Data for initialization, only has effect if the centered gradient is used.
        :type data: numpy array [num. samples x input dim]
        """
        # The CD constructor creates the offsets and the update buffers
        super(PCD, self).__init__(model, data)

        # Swap the sampler created by CD for the persistent one
        persistent_sampler = sampler.PersistentGibbsSampler(model, num_chains)
        self.sampler = persistent_sampler
class PT(CD):
    """ Implementation of the training algorithm Parallel Tempering Contrastive Divergence (PT).

    :Reference: | Parallel Tempering for Training of Restricted Boltzmann Machines,
                | Guillaume Desjardins, Aaron Courville, Yoshua Bengio, Pascal
                | Vincent, Olivier Delalleau, Dept. IRO, Universite de Montreal P.O.
                | Box 6128, Succ. Centre-Ville, Montreal, H3C 3J7, Qc, Canada.

    """

    def __init__(self, model, betas=3, data=None):
        """ The constructor initializes the PT trainer with a given model and data.

        :param model: The model to sample from.
        :type model: Valid model class.

        :param betas: List of inverse temperatures to sample from. If a scalar is given, the temperatures will be set
                      linearly from 0.0 to 1.0 in 'betas' steps.
        :type betas: int, list or numpy array [num betas]

        :param data: Data for initialization, only has effect if the centered gradient is used.
        :type data: numpy array [num. samples x input dim]
        """
        # Call super constructor of CD
        super(PT, self).__init__(model, data)

        if numx.isscalar(betas):
            # Only the number of temperatures is given, the sampler receives no explicit betas
            self.sampler = sampler.ParallelTemperingSampler(model, betas, None)
        else:
            # Plain Python sequences have no 'shape' attribute, so convert them first;
            # numpy arrays are passed through unchanged by asarray.
            betas = numx.asarray(betas)
            self.sampler = sampler.ParallelTemperingSampler(model, betas.shape[0], betas)
class IPT(CD):
    """ Implementation of the training algorithm Independent Parallel Tempering Contrastive Divergence (IPT).
        As normal PT but the chain's switches are done only from one batch to the next instead of from one sample to
        the next.

    :Reference: | Parallel Tempering for Training of Restricted Boltzmann Machines,
                | Guillaume Desjardins, Aaron Courville, Yoshua Bengio, Pascal
                | Vincent, Olivier Delalleau, Dept. IRO, Universite de Montreal P.O.
                | Box 6128, Succ. Centre-Ville, Montreal, H3C 3J7, Qc, Canada.

    """

    def __init__(self, model, num_samples, betas=3, data=None):
        """ The constructor initializes the IPT trainer with a given model and data.

        :param model: The model to sample from.
        :type model: Valid model class.

        :param num_samples: The number of Samples to produce. .. Note:: you should use the batchsize.
        :type num_samples: int

        :param betas: List of inverse temperatures to sample from. If a scalar is given, the temperatures will be set
                      linearly from 0.0 to 1.0 in 'betas' steps.
        :type betas: int, list or numpy array [num betas]

        :param data: Data for initialization, only has effect if the centered gradient is used.
        :type data: numpy array [num. samples x input dim]
        """
        # Call super constructor of CD
        super(IPT, self).__init__(model, data)

        if numx.isscalar(betas):
            # Only the number of temperatures is given, the sampler receives no explicit betas
            self.sampler = sampler.IndependentParallelTemperingSampler(model, num_samples, betas, None)
        else:
            # Plain Python sequences have no 'shape' attribute, so convert them first;
            # numpy arrays are passed through unchanged by asarray.
            betas = numx.asarray(betas)
            self.sampler = sampler.IndependentParallelTemperingSampler(model, num_samples, betas.shape[0], betas)
class GD(CD):
    """ Implementation of the training algorithm Gradient descent. Since it involves the calculation of the partition
        function for each update, it is only possible for small BBRBMs.

    """

    def __init__(self, model, data=None):
        """ The constructor initializes the Gradient trainer with a given model.

        :param model: The model to sample from.
        :type model: Valid model class.

        :param data: Data for initialization, only has effect if the centered gradient is used.
        :type data: numpy array [num. samples x input dim]

        :raises ValueError: If the model is not a BinaryBinaryRBM.
        """
        if not isinstance(model, models.BinaryBinaryRBM):
            raise ValueError("True gradient only possible for Binary Binary RBMs!")

        # Call super constructor of CD
        super(GD, self).__init__(model, data)

    def _train(self,
               data,
               epsilon,
               k,
               momentum,
               reg_l1norm,
               reg_l2norm,
               reg_sparseness,
               desired_sparseness,
               update_visible_offsets,
               update_hidden_offsets,
               offset_typ,
               use_centered_gradient,
               restrict_gradient,
               restriction_norm,
               use_hidden_states):
        """ The training for one batch is performed using True Gradient (GD). The negative phase is calculated
            exactly by enumerating all visible states, so no Gibbs sampling takes place.

        :param data: The data used for training.
        :type data: numpy array [batch_size, input dimension]

        :param epsilon: The learning rate.
        :type epsilon: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param k: Number of sampling steps. Not used by this trainer, only kept for a common interface.
        :type k: int

        :param momentum: The momentum term.
        :type momentum: scalar or numpy array[num parameters] or numpy array[num parameters, parameter shape]

        :param reg_l1norm: The parameter for the L1 regularization
        :type reg_l1norm: float

        :param reg_l2norm: The parameter for the L2 regularization also known as weight decay.
        :type reg_l2norm: float

        :param reg_sparseness: The parameter for the desired_sparseness regularization, 0 disables it.
        :type reg_sparseness: float

        :param desired_sparseness: Desired average hidden activation or None for no regularization.
        :type desired_sparseness: None or float

        :param update_visible_offsets: The update step size for the models visible offsets.
        :type update_visible_offsets: float

        :param update_hidden_offsets: The update step size for the models hidden offsets.
        :type update_hidden_offsets: float

        :param offset_typ: | Different offsets can be used to center the gradient.
                           | Example: 'DM' uses the positive phase visible mean and the negative phase hidden mean.
                           | 'A0' uses the average of positive and negative phase mean for visible, zero for the
                           | hiddens. Possible values are out of {A,D,M,0}x{A,D,M,0}
        :type offset_typ: string

        :param use_centered_gradient: Uses the centered gradient instead of centering.
        :type use_centered_gradient: bool

        :param restrict_gradient: If a positive scalar is given the norm of the weight update is restricted to stay
                                  below this value.
        :type restrict_gradient: None, float

        :param restriction_norm: Restricts the column norm, row norm or Matrix norm.
        :type restriction_norm: string, 'Cols','Rows', 'Mat'

        :param use_hidden_states: If True, the hidden states are used for the gradient calculations, the hiddens
                                  probabilities otherwise.
        :type use_hidden_states: bool
        """
        # Positive phase hidden probabilities
        hid_probs_pos = self.model.probability_h_given_v(data)

        # The hidden states are needed for the positive gradient whenever use_hidden_states is set,
        # independent of whether the hidden mean is required, so they are sampled here unconditionally.
        hid_states_pos = None
        if use_hidden_states:
            hid_states_pos = self.model.sample_h(hid_probs_pos)

        # Positive phase means, only calculated if they are needed later on
        if update_visible_offsets != 0.0:
            xmean_pos = numx.mean(data, axis=0).reshape(1, self.model.input_dim)
        hmean_pos = 0.0
        if update_hidden_offsets != 0.0 or reg_sparseness != 0.0:
            if use_hidden_states:
                hmean_pos = numx.mean(hid_states_pos, axis=0).reshape(1, self.model.output_dim)
            else:
                hmean_pos = numx.mean(hid_probs_pos, axis=0).reshape(1, self.model.output_dim)

        # Calculate the partition function by factorizing over the smaller layer
        if self.model.input_dim < self.model.output_dim:
            batch_size = numx.min([self.model.input_dim, 12])
            ln_z = estimator.partition_function_factorize_v(self.model, beta=1.0, batchsize_exponent=batch_size,
                                                            status=False)
        else:
            batch_size = numx.min([self.model.output_dim, 12])
            ln_z = estimator.partition_function_factorize_h(self.model, beta=1.0, batchsize_exponent=batch_size,
                                                            status=False)

        # empty negative phase parts
        neg_gradients = [numx.zeros(self.model.w.shape),
                         numx.zeros(self.model.bv.shape),
                         numx.zeros(self.model.bh.shape)]

        # Calculate gradient stepwise in batches of 2^batch_size visible states
        bit_length = self.model.input_dim
        batchsize = numx.power(2, batch_size)
        num_combinations = numx.power(2, bit_length)
        num_batches = num_combinations // batchsize
        for batch in range(0, num_batches):
            # Generate current batch
            bit_combinations = numxext.generate_binary_code(bit_length, batch_size, batch)
            centered_visibles = bit_combinations - self.model.ov

            # P(x)
            prob_x = numx.exp(self.model.log_probability_v(ln_z, bit_combinations))

            # P(h|x)
            prob_h_x = self.model.probability_h_given_v(bit_combinations)

            # Calculate gradient: expectations under the model are probability weighted sums
            neg_gradients[1] += numx.sum(numx.tile(prob_x, (1, self.model.input_dim)) * centered_visibles, axis=0)
            weighted_hiddens = numx.tile(prob_x, (1, self.model.output_dim)) * (prob_h_x - self.model.oh)
            neg_gradients[0] += numx.dot(centered_visibles.T, weighted_hiddens)
            neg_gradients[2] += numx.sum(weighted_hiddens, axis=0)

        # Exact model means are only required for the offset types 'A' and 'M'
        if update_visible_offsets != 0.0 and offset_typ[0] in ('A', 'M'):
            bit_combinations = numxext.generate_binary_code(self.model.input_dim, None, 0)
            prob_x = numx.exp(self.model.log_probability_v(ln_z, bit_combinations))
            xmean_neg = numx.sum(prob_x * bit_combinations, axis=0).reshape(1, self.model.input_dim)

        if update_hidden_offsets != 0.0 and offset_typ[1] in ('A', 'M'):
            bit_combinations = numxext.generate_binary_code(self.model.output_dim, None, 0)
            prob_h = numx.exp(self.model.log_probability_h(ln_z, bit_combinations))
            hmean_neg = numx.sum(prob_h * bit_combinations, axis=0).reshape(1, self.model.output_dim)

        # Select the new offsets; strings are compared by value since an identity test ('is')
        # against a literal only works by accident of interning.
        new_visible_offsets = 0.0
        if update_visible_offsets != 0.0:
            if offset_typ[0] == 'A':
                new_visible_offsets = (xmean_pos + xmean_neg) * 0.5
            elif offset_typ[0] == 'D':
                new_visible_offsets = xmean_pos
            elif offset_typ[0] == 'M':
                new_visible_offsets = xmean_neg
            elif offset_typ[0] == '0':
                new_visible_offsets = 0.0 * xmean_pos

        new_hidden_offsets = 0.0
        if update_hidden_offsets != 0.0:
            if offset_typ[1] == 'A':
                new_hidden_offsets = (hmean_pos + hmean_neg) * 0.5
            elif offset_typ[1] == 'D':
                new_hidden_offsets = hmean_pos
            elif offset_typ[1] == 'M':
                new_hidden_offsets = hmean_neg
            elif offset_typ[1] == '0':
                new_hidden_offsets = 0.0 * hmean_pos

        # Truthiness test, consistent with the check performed in _adapt_gradient
        if not use_centered_gradient:
            # update the centers
            self.model.update_offsets(new_visible_offsets, new_hidden_offsets, update_visible_offsets,
                                      update_hidden_offsets)
            self.visible_offsets = 0.0
            self.hidden_offsets = 0.0
        else:
            # The trainer keeps its own exponentially moving average of the offsets
            self.hidden_offsets = ((1.0 - update_hidden_offsets) * self.hidden_offsets
                                   + update_hidden_offsets * new_hidden_offsets)
            self.visible_offsets = ((1.0 - update_visible_offsets) * self.visible_offsets
                                    + update_visible_offsets * new_visible_offsets)

        # Calculate positive phase gradient using states or probabilities
        if use_hidden_states:
            pos_gradients = self.model.calculate_gradients(data, hid_states_pos)
        else:
            pos_gradients = self.model.calculate_gradients(data, hid_probs_pos)

        # Times batch size since _adapt_gradient divides by the batch size
        for index in range(3):
            neg_gradients[index] *= data.shape[0]

        # Adapt the gradients by weight decay momentum and learning rate
        self._adapt_gradient(pos_gradients=pos_gradients,
                             neg_gradients=neg_gradients,
                             batch_size=data.shape[0],
                             epsilon=epsilon,
                             momentum=momentum,
                             reg_l1norm=reg_l1norm,
                             reg_l2norm=reg_l2norm,
                             reg_sparseness=reg_sparseness,
                             desired_sparseness=desired_sparseness,
                             mean_hidden_activity=hmean_pos,
                             visible_offsets=self.visible_offsets,
                             hidden_offsets=self.hidden_offsets,
                             use_centered_gradient=use_centered_gradient,
                             restrict_gradient=restrict_gradient,
                             restriction_norm=restriction_norm)

        # update the parameters with the calculated gradient
        self.model.update_parameters(self.parameter_updates)