Source code for GPy.kern.src.multidimensional_integral_limits

# Written by Mike Smith

from __future__ import division
import numpy as np
from .kern import Kern
from ...core.parameterization import Param
from paramz.transformations import Logexp
import math

class Multidimensional_Integral_Limits(Kern): #todo do I need to inherit from Stationary
    """
    Integral kernel, which can include limits on each integral value. This kernel allows
    an n-dimensional histogram or binned data to be modelled. The outputs are the counts
    in each bin. The inputs are the start and end points of each bin: pairs of inputs act
    as the limits on each bin, so inputs 4 and 5 provide the start and end values of each
    bin in the 3rd dimension. The kernel's predictions are the latent function which
    might have generated those binned results.
    """

    def __init__(self, input_dim, variances=None, lengthscale=None, ARD=False, active_dims=None, name='integral'):
        super(Multidimensional_Integral_Limits, self).__init__(input_dim, active_dims, name)

        if lengthscale is None:
            lengthscale = np.ones(1)
        else:
            lengthscale = np.asarray(lengthscale)

        if variances is None: #(added) default to unit variance, mirroring the lengthscale handling, so Param gets a concrete value
            variances = np.ones(1)

        self.lengthscale = Param('lengthscale', lengthscale, Logexp()) #Logexp - transforms to allow positive only values...
        self.variances = Param('variances', variances, Logexp()) #and here.
        self.link_parameters(self.variances, self.lengthscale) #this just takes a list of parameters we need to optimise.
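    # (Added illustration, not part of the original module.) Input layout: each pair of
    # columns in X holds one dimension's bin limits, unpacked by k_xx as t = x[idx]
    # (upper limit) and s = x[idx+1] (lower limit). For a 2-D histogram with two
    # (made-up) bins, the inputs might look like:
    #
    #   X = np.array([[1., 0., 1., 0.],    # bin covering [0,1] x [0,1]
    #                 [2., 1., 1., 0.]])   # bin covering [1,2] x [0,1]
    #   Y = np.array([[12.], [7.]])        # counts observed in each bin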
    def h(self, z):
        return 0.5 * z * np.sqrt(math.pi) * math.erf(z) + np.exp(-(z**2))
    def dk_dl(self, t, tprime, s, sprime, l): #derivative of the kernel wrt lengthscale
        return l * ( self.h((t-sprime)/l) - self.h((t - tprime)/l) + self.h((tprime-s)/l) - self.h((s-sprime)/l))
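    # (Added note.) One way to see where h comes from: writing z = u/l, each term of
    # k_xx has the form 0.5*l^2*g(u/l), with g as defined below, and
    #   d/dl [ 0.5*l^2*g(u/l) ] = l*( g(z) - 0.5*z*g'(z) ) = l*h(z),
    # since g'(z) = sqrt(pi)*erf(z). Summing the four terms gives dk_dl above.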
    def update_gradients_full(self, dL_dK, X, X2=None):
        if X2 is None: #we're finding dK_xx/dTheta
            dK_dl_term = np.zeros([X.shape[0], X.shape[0], self.lengthscale.shape[0]])
            k_term = np.zeros([X.shape[0], X.shape[0], self.lengthscale.shape[0]])
            for il, l in enumerate(self.lengthscale):
                idx = il*2
                for i, x in enumerate(X):
                    for j, x2 in enumerate(X):
                        dK_dl_term[i, j, il] = self.dk_dl(x[idx], x2[idx], x[idx+1], x2[idx+1], l)
                        k_term[i, j, il] = self.k_xx(x[idx], x2[idx], x[idx+1], x2[idx+1], l)
            for il, l in enumerate(self.lengthscale):
                dK_dl = self.variances[0] * dK_dl_term[:, :, il]
                for jl, _ in enumerate(self.lengthscale):
                    if jl != il:
                        dK_dl *= k_term[:, :, jl]
                self.lengthscale.gradient[il] = np.sum(dK_dl * dL_dK)
            dK_dv = self.calc_K_xx_wo_variance(X) #the gradient wrt the variance is k_xx.
            self.variances.gradient = np.sum(dK_dv * dL_dK)
        else: #we're finding dK_xf/dTheta
            raise NotImplementedError("Currently this function only handles finding the gradient of a single vector of inputs (X) not a pair of vectors (X and X2)")
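    # (Added note.) K_xx is a product over dimensions, K_xx = v * prod_d k_xx^(d), so by
    # the product rule dK_xx/dl_d = v * (dk_xx^(d)/dl_d) * prod_{j != d} k_xx^(j), which
    # is what the second loop above assembles; dK_xx/dv is simply K_xx without the
    # variance term.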
    #useful little function to help calculate the covariances.
    def g(self, z):
        return 1.0 * z * np.sqrt(math.pi) * math.erf(z) + np.exp(-(z**2))
    def k_xx(self, t, tprime, s, sprime, l):
        """Covariance between observed values.

        s and t are one domain of the integral (i.e. the integral between s and t)
        sprime and tprime are another domain of the integral (i.e. the integral between sprime and tprime)

        We're interested in how correlated these two integrals are.

        Note: We've not multiplied by the variance, this is done in K."""
        return 0.5 * (l**2) * ( self.g((t-sprime)/l) + self.g((tprime-s)/l) - self.g((t - tprime)/l) - self.g((s-sprime)/l))
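    # (Added note.) k_xx is the closed form of the double integral of the RBF base
    # kernel k_ff over the two bins:
    #   int_s^t int_{s'}^{t'} exp(-(u-v)^2/l^2) dv du
    #     = 0.5*l^2*( g((t-s')/l) + g((t'-s)/l) - g((t-t')/l) - g((s-s')/l) )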
    def k_ff(self, t, tprime, l):
        """Doesn't need s or sprime as we're looking at the 'derivatives', so no domains over which to integrate are required."""
        return np.exp(-((t-tprime)**2)/(l**2)) #rbf
    def k_xf(self, t, tprime, s, l):
        """Covariance between the gradient (latent value) and the actual (observed) value.

        Note that sprime isn't actually used in this expression, presumably because the
        'primes' are the gradient (latent) values which don't involve an integration, and
        thus there is no domain over which they're integrated, just a single value that
        we want."""
        return 0.5 * np.sqrt(math.pi) * l * (math.erf((t-tprime)/l) + math.erf((tprime-s)/l))
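    # (Added note.) k_xf is the corresponding single integral of the RBF base kernel,
    # integrating out only the observed (binned) input:
    #   int_s^t exp(-(u-tprime)^2/l^2) du
    #     = 0.5*sqrt(pi)*l*( erf((t-tprime)/l) + erf((tprime-s)/l) )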
    def calc_K_xx_wo_variance(self, X):
        """Calculates K_xx without the variance term."""
        K_xx = np.ones([X.shape[0], X.shape[0]]) #ones now as a product occurs over each dimension
        for i, x in enumerate(X):
            for j, x2 in enumerate(X):
                for il, l in enumerate(self.lengthscale):
                    idx = il*2 #each pair of input dimensions describes the limits on one actual dimension in the data
                    K_xx[i, j] *= self.k_xx(x[idx], x2[idx], x[idx+1], x2[idx+1], l)
        return K_xx
    def K(self, X, X2=None):
        if X2 is None: #X vs X
            K_xx = self.calc_K_xx_wo_variance(X)
            return K_xx * self.variances[0]
        else: #X vs X2
            K_xf = np.ones([X.shape[0], X2.shape[0]])
            for i, x in enumerate(X):
                for j, x2 in enumerate(X2):
                    for il, l in enumerate(self.lengthscale):
                        idx = il*2
                        K_xf[i, j] *= self.k_xf(x[idx], x2[idx], x[idx+1], l)
            return K_xf * self.variances[0]
    def Kdiag(self, X):
        """I've used the fact that we call this method for K_ff when finding the
        covariance as a hack, so I know if I should return K_ff or K_xx. In this case
        we're returning K_ff!!

        $K_{ff}^{post} = K_{ff} - K_{fx} K_{xx}^{-1} K_{xf}$"""
        K_ff = np.ones(X.shape[0])
        for i, x in enumerate(X):
            for il, l in enumerate(self.lengthscale):
                idx = il*2
                K_ff[i] *= self.k_ff(x[idx], x[idx], l)
        return K_ff * self.variances[0]
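
# Example usage (an added sketch, assuming the kernel is exported under GPy.kern as in
# recent GPy releases; the bin limits and counts below are made up):
#
#   import GPy
#   import numpy as np
#
#   X = np.array([[1., 0., 1., 0.],   # two 2-D bins, laid out as
#                 [2., 1., 1., 0.]])  # [end_1, start_1, end_2, start_2]
#   Y = np.array([[12.], [7.]])       # counts in each bin
#
#   kern = GPy.kern.Multidimensional_Integral_Limits(input_dim=4,
#                                                    variances=1.0,
#                                                    lengthscale=[1., 1.])
#   m = GPy.models.GPRegression(X, Y, kern)
#   m.optimize()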