# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..core.sparse_gp_mpi import SparseGP_MPI
from .. import kern
from ..core.parameterization import Param
from ..likelihoods import Gaussian
from GPy.core.parameterization.variational import SpikeAndSlabPrior, SpikeAndSlabPosterior,VariationalPrior
from ..inference.latent_function_inference.var_dtc_parallel import update_gradients, VarDTC_minibatch
from ..kern.src.psi_comp.ssrbf_psi_gpucomp import PSICOMP_SSRBF_GPU
[docs]class IBPPosterior(SpikeAndSlabPosterior):
'''
The SpikeAndSlab distribution for variational approximations.
'''
def __init__(self, means, variances, binary_prob, tau=None, sharedX=False, name='latent space'):
"""
binary_prob : the probability of the distribution on the slab part.
"""
from paramz.transformations import Logexp
super(IBPPosterior, self).__init__(means, variances, binary_prob, group_spike=True, name=name)
self.sharedX = sharedX
if sharedX:
self.mean.fix(warning=False)
self.variance.fix(warning=False)
self.tau = Param("tau_", np.ones((self.gamma_group.shape[0],2)), Logexp())
self.link_parameter(self.tau)
[docs] def set_gradients(self, grad):
self.mean.gradient, self.variance.gradient, self.gamma.gradient, self.tau.gradient = grad
def __getitem__(self, s):
if isinstance(s, (int, slice, tuple, list, np.ndarray)):
import copy
n = self.__new__(self.__class__, self.name)
dc = self.__dict__.copy()
dc['mean'] = self.mean[s]
dc['variance'] = self.variance[s]
dc['binary_prob'] = self.binary_prob[s]
dc['tau'] = self.tau
dc['parameters'] = copy.copy(self.parameters)
n.__dict__.update(dc)
n.parameters[dc['mean']._parent_index_] = dc['mean']
n.parameters[dc['variance']._parent_index_] = dc['variance']
n.parameters[dc['binary_prob']._parent_index_] = dc['binary_prob']
n.parameters[dc['tau']._parent_index_] = dc['tau']
n._gradient_array_ = None
oversize = self.size - self.mean.size - self.variance.size - self.gamma.size - self.tau.size
n.size = n.mean.size + n.variance.size + n.gamma.size+ n.tau.size + oversize
n.ndim = n.mean.ndim
n.shape = n.mean.shape
n.num_data = n.mean.shape[0]
n.input_dim = n.mean.shape[1] if n.ndim != 1 else 1
return n
else:
return super(IBPPosterior, self).__getitem__(s)
[docs]class IBPPrior(VariationalPrior):
def __init__(self, input_dim, alpha =2., name='IBPPrior', **kw):
super(IBPPrior, self).__init__(name=name, **kw)
from paramz.transformations import Logexp, __fixed__
self.input_dim = input_dim
self.variance = 1.
self.alpha = Param('alpha', alpha, __fixed__)
self.link_parameter(self.alpha)
[docs] def KL_divergence(self, variational_posterior):
mu, S, gamma, tau = variational_posterior.mean.values, variational_posterior.variance.values, variational_posterior.gamma_group.values, variational_posterior.tau.values
var_mean = np.square(mu)/self.variance
var_S = (S/self.variance - np.log(S))
part1 = (gamma* (np.log(self.variance)-1. +var_mean + var_S)).sum()/2.
ad = self.alpha/self.input_dim
from scipy.special import betaln,digamma
part2 = (gamma*np.log(gamma)).sum() + ((1.-gamma)*np.log(1.-gamma)).sum() + betaln(ad,1.)*self.input_dim \
-betaln(tau[:,0], tau[:,1]).sum() + ((tau[:,0]-gamma-ad)*digamma(tau[:,0])).sum() + \
((tau[:,1]+gamma-2.)*digamma(tau[:,1])).sum() + ((2.+ad-tau[:,0]-tau[:,1])*digamma(tau.sum(axis=1))).sum()
return part1+part2
[docs] def update_gradients_KL(self, variational_posterior):
mu, S, gamma, tau = variational_posterior.mean.values, variational_posterior.variance.values, variational_posterior.gamma_group.values, variational_posterior.tau.values
variational_posterior.mean.gradient -= gamma*mu/self.variance
variational_posterior.variance.gradient -= (1./self.variance - 1./S) * gamma /2.
from scipy.special import digamma,polygamma
dgamma = (np.log(gamma/(1.-gamma))+ digamma(tau[:,1])-digamma(tau[:,0]))/variational_posterior.num_data
variational_posterior.binary_prob.gradient -= dgamma+((np.square(mu)+S)/self.variance-np.log(S)+np.log(self.variance)-1.)/2.
ad = self.alpha/self.input_dim
common = (ad+2-tau[:,0]-tau[:,1])*polygamma(1,tau.sum(axis=1))
variational_posterior.tau.gradient[:,0] = -((tau[:,0]-gamma-ad)*polygamma(1,tau[:,0])+common)
variational_posterior.tau.gradient[:,1] = -((tau[:,1]+gamma-2)*polygamma(1,tau[:,1])+common)
[docs]class SLVMPosterior(SpikeAndSlabPosterior):
'''
The SpikeAndSlab distribution for variational approximations.
'''
def __init__(self, means, variances, binary_prob, tau=None, name='latent space'):
"""
binary_prob : the probability of the distribution on the slab part.
"""
from paramz.transformations import Logexp
super(SLVMPosterior, self).__init__(means, variances, binary_prob, group_spike=False, name=name)
self.tau = Param("tau_", np.ones((self.gamma.shape[1],2)), Logexp())
self.link_parameter(self.tau)
[docs] def set_gradients(self, grad):
self.mean.gradient, self.variance.gradient, self.gamma.gradient, self.tau.gradient = grad
def __getitem__(self, s):
if isinstance(s, (int, slice, tuple, list, np.ndarray)):
import copy
n = self.__new__(self.__class__, self.name)
dc = self.__dict__.copy()
dc['mean'] = self.mean[s]
dc['variance'] = self.variance[s]
dc['binary_prob'] = self.binary_prob[s]
dc['tau'] = self.tau
dc['parameters'] = copy.copy(self.parameters)
n.__dict__.update(dc)
n.parameters[dc['mean']._parent_index_] = dc['mean']
n.parameters[dc['variance']._parent_index_] = dc['variance']
n.parameters[dc['binary_prob']._parent_index_] = dc['binary_prob']
n.parameters[dc['tau']._parent_index_] = dc['tau']
n._gradient_array_ = None
oversize = self.size - self.mean.size - self.variance.size - self.gamma.size - self.tau.size
n.size = n.mean.size + n.variance.size + n.gamma.size+ n.tau.size + oversize
n.ndim = n.mean.ndim
n.shape = n.mean.shape
n.num_data = n.mean.shape[0]
n.input_dim = n.mean.shape[1] if n.ndim != 1 else 1
return n
else:
return super(IBPPosterior, self).__getitem__(s)
[docs]class SLVMPrior(VariationalPrior):
def __init__(self, input_dim, alpha =1., beta=1., Z=None, name='SLVMPrior', **kw):
super(SLVMPrior, self).__init__(name=name, **kw)
self.input_dim = input_dim
self.variance = 1.
self.alpha = alpha
self.beta = beta
self.Z = Z
if Z is not None:
assert np.all(np.unique(Z)==np.array([0,1]))
[docs] def KL_divergence(self, variational_posterior):
mu, S, gamma, tau = variational_posterior.mean.values, variational_posterior.variance.values, variational_posterior.gamma.values, variational_posterior.tau.values
var_mean = np.square(mu)/self.variance
var_S = (S/self.variance - np.log(S))
part1 = (gamma* (np.log(self.variance)-1. +var_mean + var_S)).sum()/2.
from scipy.special import betaln,digamma
part2 = (gamma*np.log(gamma)).sum() + ((1.-gamma)*np.log(1.-gamma)).sum() + betaln(self.alpha,self.beta)*self.input_dim \
-betaln(tau[:,0], tau[:,1]).sum() + ((tau[:,0]-(gamma*self.Z).sum(0)-self.alpha)*digamma(tau[:,0])).sum() + \
((tau[:,1]-((1-gamma)*self.Z).sum(0)-self.beta)*digamma(tau[:,1])).sum() + ((self.Z.sum(0)+self.alpha+self.beta-tau[:,0]-tau[:,1])*digamma(tau.sum(axis=1))).sum()
return part1+part2
[docs] def update_gradients_KL(self, variational_posterior):
mu, S, gamma, tau = variational_posterior.mean.values, variational_posterior.variance.values, variational_posterior.gamma.values, variational_posterior.tau.values
variational_posterior.mean.gradient -= gamma*mu/self.variance
variational_posterior.variance.gradient -= (1./self.variance - 1./S) * gamma /2.
from scipy.special import digamma,polygamma
dgamma = np.log(gamma/(1.-gamma))+ (digamma(tau[:,1])-digamma(tau[:,0]))*self.Z
variational_posterior.binary_prob.gradient -= dgamma+((np.square(mu)+S)/self.variance-np.log(S)+np.log(self.variance)-1.)/2.
common = (self.Z.sum(0)+self.alpha+self.beta-tau[:,0]-tau[:,1])*polygamma(1,tau.sum(axis=1))
variational_posterior.tau.gradient[:,0] = -((tau[:,0]-(gamma*self.Z).sum(0)-self.alpha)*polygamma(1,tau[:,0])+common)
variational_posterior.tau.gradient[:,1] = -((tau[:,1]-((1-gamma)*self.Z).sum(0)-self.beta)*polygamma(1,tau[:,1])+common)
[docs]class SSGPLVM(SparseGP_MPI):
"""
Spike-and-Slab Gaussian Process Latent Variable Model
:param Y: observed data (np.ndarray) or GPy.likelihood
:type Y: np.ndarray| GPy.likelihood instance
:param input_dim: latent dimensionality
:type input_dim: int
:param init: initialisation method for the latent space
:type init: 'PCA'|'random'
"""
def __init__(self, Y, input_dim, X=None, X_variance=None, Gamma=None, init='PCA', num_inducing=10,
Z=None, kernel=None, inference_method=None, likelihood=None, name='Spike_and_Slab GPLVM', group_spike=False, IBP=False,SLVM=False, alpha=2., beta=2., connM=None, tau=None, mpi_comm=None, pi=None, learnPi=False,normalizer=False, sharedX=False, variational_prior=None,**kwargs):
self.group_spike = group_spike
self.init = init
self.sharedX = sharedX
if X is None:
from ..util.initialization import initialize_latent
X, fracs = initialize_latent(init, input_dim, Y)
else:
fracs = np.ones(input_dim)
if X_variance is None: # The variance of the variational approximation (S)
X_variance = np.random.uniform(0,.1,X.shape)
if Gamma is None:
gamma = np.empty_like(X) # The posterior probabilities of the binary variable in the variational approximation
gamma[:] = 0.5 + 0.1 * np.random.randn(X.shape[0], input_dim)
gamma[gamma>1.-1e-9] = 1.-1e-9
gamma[gamma<1e-9] = 1e-9
else:
gamma = Gamma.copy()
if Z is None:
Z = np.random.permutation(X.copy())[:num_inducing]
assert Z.shape[1] == X.shape[1]
if likelihood is None:
likelihood = Gaussian()
if kernel is None:
kernel = kern.RBF(input_dim, lengthscale=fracs, ARD=True) # + kern.white(input_dim)
if kernel.useGPU:
kernel.psicomp = PSICOMP_SSRBF_GPU()
if inference_method is None:
inference_method = VarDTC_minibatch(mpi_comm=mpi_comm)
if pi is None:
pi = np.empty((input_dim))
pi[:] = 0.5
if IBP:
self.variational_prior = IBPPrior(input_dim=input_dim, alpha=alpha) if variational_prior is None else variational_prior
X = IBPPosterior(X, X_variance, gamma, tau=tau,sharedX=sharedX)
elif SLVM:
self.variational_prior = SLVMPrior(input_dim=input_dim, alpha=alpha, beta=beta, Z=connM) if variational_prior is None else variational_prior
X = SLVMPosterior(X, X_variance, gamma, tau=tau)
else:
self.variational_prior = SpikeAndSlabPrior(pi=pi,learnPi=learnPi, group_spike=group_spike) if variational_prior is None else variational_prior
X = SpikeAndSlabPosterior(X, X_variance, gamma, group_spike=group_spike,sharedX=sharedX)
super(SSGPLVM,self).__init__(X, Y, Z, kernel, likelihood, variational_prior=self.variational_prior, inference_method=inference_method, name=name, mpi_comm=mpi_comm, normalizer=normalizer, **kwargs)
self.link_parameter(self.X, index=0)
[docs] def set_X_gradients(self, X, X_grad):
"""Set the gradients of the posterior distribution of X in its specific form."""
X.mean.gradient, X.variance.gradient, X.binary_prob.gradient = X_grad
[docs] def get_X_gradients(self, X):
"""Get the gradients of the posterior distribution of X in its specific form."""
return X.mean.gradient, X.variance.gradient, X.binary_prob.gradient
def _propogate_X_val(self):
pass
[docs] def parameters_changed(self):
self.X.propogate_val()
if self.sharedX: self._highest_parent_._propogate_X_val()
super(SSGPLVM,self).parameters_changed()
if isinstance(self.inference_method, VarDTC_minibatch):
self.X.collate_gradient()
return
self._log_marginal_likelihood -= self.variational_prior.KL_divergence(self.X)
self.X.mean.gradient, self.X.variance.gradient, self.X.binary_prob.gradient = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.grad_dict['dL_dpsi0'], dL_dpsi1=self.grad_dict['dL_dpsi1'], dL_dpsi2=self.grad_dict['dL_dpsi2'])
# update for the KL divergence
self.variational_prior.update_gradients_KL(self.X)
self.X.collate_gradient()
[docs] def sample_W(self, nSamples, raw_samples=False):
"""
Sample the loading matrix if the kernel is linear.
"""
assert isinstance(self.kern, kern.Linear)
from ..util.linalg import pdinv
N, D = self.Y.shape
Q = self.X.shape[1]
noise_var = self.likelihood.variance.values
# Draw samples for X
Xs = np.random.randn(*((nSamples,)+self.X.shape))*np.sqrt(self.X.variance.values)+self.X.mean.values
b = np.random.rand(*((nSamples,)+self.X.shape))
Xs[b>self.X.gamma.values] = 0
invcov = (Xs[:,:,:,None]*Xs[:,:,None,:]).sum(1)/noise_var+np.eye(Q)
cov = np.array([pdinv(invcov[s_idx])[0] for s_idx in range(invcov.shape[0])])
Ws = np.empty((nSamples, Q, D))
tmp = (np.transpose(Xs, (0,2,1)).reshape(nSamples*Q,N).dot(self.Y)).reshape(nSamples,Q,D)
mean = (cov[:,:,:,None]*tmp[:,None,:,:]).sum(2)/noise_var
zeros = np.zeros((Q,))
for s_idx in range(Xs.shape[0]):
Ws[s_idx] = (np.random.multivariate_normal(mean=zeros,cov=cov[s_idx],size=(D,))).T+mean[s_idx]
if raw_samples:
return Ws
else:
return Ws.mean(0), Ws.std(0)