Source code for GPy.inference.latent_function_inference.inferenceX

# Copyright (c) 2014, Zhenwen Dai
# Licensed under the BSD 3-clause license (see LICENSE.txt)

import numpy as np
from ...core import Model
from GPy.core.parameterization import variational
from ...util.linalg import tdot

def infer_newX(model, Y_new, optimize=True, init='L2'):
    """
    Infer the distribution of X for the new observed data *Y_new*.

    :param model: the GPy model used in inference
    :type model: GPy.core.Model
    :param Y_new: the new observed data for inference
    :type Y_new: numpy.ndarray
    :param optimize: whether to optimize the location of the new X (True by default)
    :type optimize: boolean
    :param init: the distance metric of Y for initializing X with the nearest neighbour ('L2', 'NCC' or 'rand')
    :type init: str
    :return: a tuple containing the estimated posterior distribution of X and the model that optimizes X
    :rtype: (GPy.core.parameterization.variational.VariationalPosterior, GPy.core.Model)
    """
    infr_m = InferenceX(model, Y_new, init=init)

    if optimize:
        infr_m.optimize()

    return infr_m.X, infr_m

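# Usage sketch (illustrative only, not part of the original module): fit a
# Bayesian GPLVM on a toy dataset and infer the latent positions of unseen
# observations. The array and model names below are made up for this example.
#
#     import numpy as np
#     import GPy
#
#     Y_train = np.random.randn(50, 5)                # 50 points, 5 output dimensions
#     m_toy = GPy.models.BayesianGPLVM(Y_train, input_dim=2, num_inducing=10)
#     m_toy.optimize(max_iters=200)
#
#     Y_star = np.random.randn(3, 5)                  # 3 unseen observations
#     X_star, infr_m = infer_newX(m_toy, Y_star)      # variational posterior over the new X
#     print(X_star.mean, X_star.variance)
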
class InferenceX(Model):
    """
    The model class for inferring a new X given a new Y (replacing "do_test_latent" in Bayesian GPLVM).

    It is a tiny inference model created from the original GP model. The kernel, likelihood (only Gaussian is supported at the moment) and posterior distribution are taken from the original model. For regression models and GPLVM, a point estimate of the latent variable X is inferred. For Bayesian GPLVM, the variational posterior of X is inferred. X is inferred through a gradient-based optimization of the inference model.

    :param model: the GPy model used in inference
    :type model: GPy.core.Model
    :param Y: the new observed data for inference
    :type Y: numpy.ndarray
    :param init: the distance metric of Y for initializing X with the nearest neighbour.
    :type init: 'L2', 'NCC' or 'rand'
    """
    def __init__(self, model, Y, name='inferenceX', init='L2'):
        if np.isnan(Y).any() or getattr(model, 'missing_data', False):
            assert Y.shape[0] == 1, "The current implementation of inference X only supports one data point at a time with missing data!"
            self.missing_data = True
            self.valid_dim = np.logical_not(np.isnan(Y[0]))
            self.ninan = getattr(model, 'ninan', None)
        else:
            self.missing_data = False
        super(InferenceX, self).__init__(name)
        self.likelihood = model.likelihood.copy()
        self.kern = model.kern.copy()
        # if model.kern.useGPU:
        #     from ...models import SSGPLVM
        #     if isinstance(model, SSGPLVM):
        #         self.kern.GPU_SSRBF(True)
        #     else:
        #         self.kern.GPU(True)
        from copy import deepcopy
        self.posterior = deepcopy(model.posterior)
        if isinstance(model.X, variational.VariationalPosterior):
            self.uncertain_input = True
            from ...models.ss_gplvm import IBPPrior
            from ...models.ss_mrd import IBPPrior_SSMRD
            if isinstance(model.variational_prior, (IBPPrior, IBPPrior_SSMRD)):
                self.variational_prior = variational.SpikeAndSlabPrior(pi=0.5, learnPi=False, group_spike=False)
            else:
                self.variational_prior = model.variational_prior.copy()
        else:
            self.uncertain_input = False
        if hasattr(model, 'Z'):
            self.sparse_gp = True
            self.Z = model.Z.copy()
        else:
            self.sparse_gp = False
            self.uncertain_input = False
            self.Z = model.X.copy()
        self.Y = Y
        self.X = self._init_X(model, Y, init=init)
        self.compute_dL()

        self.link_parameter(self.X)

    def _init_X(self, model, Y_new, init='L2'):
        # Initialize the new X by finding the nearest point in Y space.
        Y = model.Y
        if self.missing_data:
            Y = Y[:, self.valid_dim]
            Y_new = Y_new[:, self.valid_dim]
            dist = -2. * Y_new.dot(Y.T) + np.square(Y_new).sum(axis=1)[:, None] + np.square(Y).sum(axis=1)[None, :]
        else:
            if init == 'L2':
                dist = -2. * Y_new.dot(Y.T) + np.square(Y_new).sum(axis=1)[:, None] + np.square(Y).sum(axis=1)[None, :]
            elif init == 'NCC':
                dist = Y_new.dot(Y.T)
            elif init == 'rand':
                dist = np.random.rand(Y_new.shape[0], Y.shape[0])
        idx = dist.argmin(axis=1)

        from ...models import SSGPLVM
        if isinstance(model, SSGPLVM):
            X = variational.SpikeAndSlabPosterior(model.X.mean[idx].values, model.X.variance[idx].values, model.X.gamma[idx].values)
            if model.group_spike:
                X.gamma.fix()
        else:
            if self.uncertain_input and self.sparse_gp:
                X = variational.NormalPosterior(model.X.mean[idx].values, model.X.variance[idx].values)
            else:
                from ...core import Param
                X = Param('latent mean', model.X[idx].values.copy())
        return X

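    # The 'L2' initialization in _init_X relies on the standard expansion of the
    # squared Euclidean distance, ||y* - y||^2 = ||y*||^2 - 2 y*.y + ||y||^2,
    # evaluated for all pairs at once. A small self-contained check of that
    # identity (illustrative only; the array names are made up):
    #
    #     import numpy as np
    #     A = np.random.randn(4, 3)   # new observations
    #     B = np.random.randn(6, 3)   # training observations
    #     d1 = -2.*A.dot(B.T) + np.square(A).sum(axis=1)[:, None] + np.square(B).sum(axis=1)[None, :]
    #     d2 = np.square(A[:, None, :] - B[None, :, :]).sum(-1)
    #     assert np.allclose(d1, d2)
    #     nn_idx = d1.argmin(axis=1)  # nearest training point for each new observation
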
    def compute_dL(self):
        # Common computation
        beta = 1. / np.fmax(self.likelihood.variance, 1e-6)
        output_dim = self.Y.shape[-1]
        wv = self.posterior.woodbury_vector
        if self.missing_data:
            wv = wv[:, self.valid_dim]
            output_dim = self.valid_dim.sum()
            if self.ninan is not None:
                self.dL_dpsi2 = beta / 2. * (self.posterior.woodbury_inv[:, :, self.valid_dim] - tdot(wv)[:, :, None]).sum(-1)
            else:
                self.dL_dpsi2 = beta / 2. * (output_dim * self.posterior.woodbury_inv - tdot(wv))
            self.dL_dpsi1 = beta * np.dot(self.Y[:, self.valid_dim], wv.T)
            self.dL_dpsi0 = -beta / 2. * np.ones(self.Y.shape[0])
        else:
            self.dL_dpsi2 = beta * (output_dim * self.posterior.woodbury_inv - tdot(wv)) / 2.  # np.einsum('md,od->mo', wv, wv)
            self.dL_dpsi1 = beta * np.dot(self.Y, wv.T)
            self.dL_dpsi0 = -beta / 2. * output_dim * np.ones(self.Y.shape[0])

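    # tdot(wv) above is GPy's helper for the symmetric product wv.dot(wv.T)
    # (GPy.util.linalg.tdot). A short check of that equivalence, with a made-up
    # array, purely for illustration:
    #
    #     import numpy as np
    #     from GPy.util.linalg import tdot
    #     W = np.random.randn(10, 4)
    #     assert np.allclose(tdot(W), W.dot(W.T))
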
    def parameters_changed(self):
        if self.uncertain_input:
            psi0 = self.kern.psi0(self.Z, self.X)
            psi1 = self.kern.psi1(self.Z, self.X)
            psi2 = self.kern.psi2(self.Z, self.X)
        else:
            psi0 = self.kern.Kdiag(self.X)
            psi1 = self.kern.K(self.X, self.Z)
            psi2 = np.dot(psi1.T, psi1)
        self._log_marginal_likelihood = (self.dL_dpsi2 * psi2).sum() + (self.dL_dpsi1 * psi1).sum() + (self.dL_dpsi0 * psi0).sum()

        if self.uncertain_input:
            X_grad = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.dL_dpsi0, dL_dpsi1=self.dL_dpsi1, dL_dpsi2=self.dL_dpsi2)
            self.X.set_gradients(X_grad)
        else:
            dL_dpsi1 = self.dL_dpsi1 + 2. * np.dot(psi1, self.dL_dpsi2)
            X_grad = self.kern.gradients_X_diag(self.dL_dpsi0, self.X)
            X_grad += self.kern.gradients_X(dL_dpsi1, self.X, self.Z)
            self.X.gradient = X_grad

        if self.uncertain_input:
            # Update the log-likelihood with the KL divergence term (the same code
            # path applies to SpikeAndSlabPrior and the other variational priors)
            KL_div = self.variational_prior.KL_divergence(self.X)
            # update the gradients for the KL divergence
            self.variational_prior.update_gradients_KL(self.X)
            self._log_marginal_likelihood += -KL_div

    def log_likelihood(self):
        return self._log_marginal_likelihood

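# Because InferenceX is a regular GPy.core.Model, the analytic gradients assembled
# in parameters_changed can be compared against numerical gradients with the
# inherited checkgrad() utility. A sketch, reusing the made-up `m_toy` and `Y_star`
# from the usage example near the top of this file:
#
#     infr_m = InferenceX(m_toy, Y_star)
#     infr_m.checkgrad(verbose=True)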