# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import sys
import numpy as np
from ...core.parameterization.parameterized import Parameterized
from paramz.caching import Cache_this
from .kernel_slice_operations import KernCallsViaSlicerMeta
from functools import reduce
import six
@six.add_metaclass(KernCallsViaSlicerMeta)
class Kern(Parameterized):
    """
    Base class for kernels (covariance functions).

    Subclasses implement ``K``, ``Kdiag`` and the gradient methods; this
    class provides the bookkeeping for active input dimensions, the default
    psi-statistics computations, (de)serialisation helpers and the ``+``,
    ``*`` and ``**`` operator overloads. Do not instantiate it directly.
    """
    #===========================================================================
    # This adds input slice support. The rather ugly code for slicing can be
    # found in kernel_slice_operations.
    # __metaclass__ is ignored in Python 3 - it needs to be put in the class
    # definition instead:
    # __metaclass__ = KernCallsViaSlicerMeta
    # Here, we use the Python module six to support Py3 and Py2 simultaneously.
    #===========================================================================
    _support_GPU = False

    def __init__(self, input_dim, active_dims, name, useGPU=False, *a, **kw):
        """
        The base class for a kernel: a positive definite function
        which forms a covariance function (kernel).

        input_dim:
            the number of dimensions to work on. Make sure to give the
            tight dimensionality of inputs. You most likely want this to
            be the integer telling the number of input dimensions of the
            kernel.

        active_dims:
            the indices of the columns of the inputs X this kernel works on.
            Only positive integers are allowed in active_dims!
            If active_dims is None, it defaults to ``np.arange(input_dim)``,
            i.e. the first ``input_dim`` columns.

        :param int input_dim: the number of input dimensions to the function
        :param array-like|None active_dims: list of indices on which dimensions this kernel works on, or None for the first input_dim dimensions
        :param str name: name of the kernel, forwarded to the parent constructor
        :param bool useGPU: request GPU usage; only honoured if the class sets _support_GPU
        :raises AssertionError: if the number of active dimensions differs from input_dim

        Do not instantiate.
        """
        super(Kern, self).__init__(name=name, *a, **kw)
        self.input_dim = int(input_dim)

        if active_dims is None:
            active_dims = np.arange(input_dim, dtype=np.int_)

        self.active_dims = np.atleast_1d(np.asarray(active_dims, np.int_))
        self._all_dims_active = np.atleast_1d(self.active_dims).astype(np.int_)

        assert self.active_dims.size == self.input_dim, "input_dim={} does not match len(active_dim)={}".format(self.input_dim, self._all_dims_active.size)

        self._sliced_X = 0
        # GPU usage needs both the class-level capability flag and the request.
        self.useGPU = self._support_GPU and useGPU

        # Imported here rather than at module level (kept as in the original).
        from .psi_comp import PSICOMP_GH
        self.psicomp = PSICOMP_GH()

    def _save_to_input_dict(self):
        """
        Collect the constructor arguments of this kernel in a plain dict.

        :returns: dict with the keys "input_dim", "active_dims" (converted to
            a list when it is a numpy array), "name" and "useGPU".
        """
        input_dict = {}
        input_dict["input_dim"] = self.input_dim
        if isinstance(self.active_dims, np.ndarray):
            input_dict["active_dims"] = self.active_dims.tolist()
        else:
            input_dict["active_dims"] = self.active_dims
        input_dict["name"] = self.name
        input_dict["useGPU"] = self.useGPU
        return input_dict

    def to_dict(self):
        """
        Serialise the kernel to a dict. Must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @staticmethod
    def from_dict(input_dict):
        """
        Instantiate an object of a derived class using the information
        in input_dict (built by the to_dict method of the derived class).
        More specifically, after reading the derived class from input_dict,
        it calls the method _build_from_input_dict of the derived class.

        Note: This method should not be overridden in the derived class. In
        case it is needed, please override _build_from_input_dict instead.

        :param dict input_dict: Dictionary with all the information needed to
            instantiate the object. It is deep-copied, so the caller's dict
            is left untouched.
        """
        import copy
        input_dict = copy.deepcopy(input_dict)
        kernel_class = input_dict.pop('class')
        input_dict["name"] = str(input_dict["name"])
        # GPy has to be importable by name here, because the class string is
        # resolved with eval below.
        import GPy  # noqa: F401
        # SECURITY NOTE(review): eval executes whatever string is stored under
        # 'class'. Only call from_dict on dictionaries from a trusted source.
        kernel_class = eval(kernel_class)
        return kernel_class._build_from_input_dict(kernel_class, input_dict)

    @staticmethod
    def _build_from_input_dict(kernel_class, input_dict):
        """
        Build an instance by passing input_dict as keyword arguments.

        :param kernel_class: the class to instantiate
        :param dict input_dict: keyword arguments for the constructor
        """
        return kernel_class(**input_dict)

    def __setstate__(self, state):
        """
        Restore the kernel from a pickled state.

        _all_dims_active is rebuilt as 0..max(active_dims) before the state
        is handed to the parent class.
        """
        self._all_dims_active = np.arange(0, max(state['active_dims']) + 1)
        super(Kern, self).__setstate__(state)

    @property
    def _effective_input_dim(self):
        """Number of entries in _all_dims_active."""
        return np.size(self._all_dims_active)

    @Cache_this(limit=3)
    def _slice_X(self, X):
        """
        Select the active columns of X.

        The selection is converted to float when possible; if that fails the
        unconverted selection is returned instead.
        """
        try:
            return X[:, self._all_dims_active].astype('float')
        except Exception:
            # Best-effort fallback kept from the original code; it no longer
            # swallows KeyboardInterrupt/SystemExit like the bare except did.
            return X[:, self._all_dims_active]

    def _project_dim(self, dim):
        """
        Return the position of `dim` within _all_dims_active.

        :returns: the index of the first match, or None if `dim` is not
            among the active dimensions.
        """
        try:
            return np.where(self._all_dims_active == dim)[0][0]
        except IndexError:
            # No match: the array of matching positions is empty.
            return None

    def K(self, X, X2):
        """
        Compute the kernel function.

        .. math::
            K_{ij} = k(X_i, X_j)

        :param X: the first set of inputs to the kernel
        :param X2: (optional) the second set of arguments to the kernel. If X2
                   is None, this is passed through to the 'part' object, which
                   handles this as X2 == X.
        """
        raise NotImplementedError

    def Kdiag(self, X):
        """
        The diagonal of the kernel matrix K

        .. math::
            Kdiag_{i} = k(X_i, X_i)
        """
        raise NotImplementedError

    def psi0(self, Z, variational_posterior):
        r"""
        .. math::
            \psi_0 = \sum_{i=0}^{n}E_{q(X)}[k(X_i, X_i)]

        Returns the first element of the psicomp psi computations.
        """
        return self.psicomp.psicomputations(self, Z, variational_posterior)[0]

    def psi1(self, Z, variational_posterior):
        r"""
        .. math::
            \psi_1^{n,m} = E_{q(X)}[k(X_n, Z_m)]

        Returns the second element of the psicomp psi computations.
        """
        return self.psicomp.psicomputations(self, Z, variational_posterior)[1]

    def psi2(self, Z, variational_posterior):
        r"""
        .. math::
            \psi_2^{m,m'} = \sum_{i=0}^{n}E_{q(X)}[ k(Z_m, X_i) k(X_i, Z_{m'})]

        Returns the third element of the psicomp psi computations, requested
        with return_psi2_n=False.
        """
        return self.psicomp.psicomputations(self, Z, variational_posterior, return_psi2_n=False)[2]

    def psi2n(self, Z, variational_posterior):
        r"""
        .. math::
            \psi_2^{n,m,m'} = E_{q(X)}[ k(Z_m, X_n) k(X_n, Z_{m'})]

        Thus, we do not sum out n, compared to psi2
        """
        return self.psicomp.psicomputations(self, Z, variational_posterior, return_psi2_n=True)[2]

    def gradients_X(self, dL_dK, X, X2):
        r"""
        .. math::

            \frac{\partial L}{\partial X} = \frac{\partial L}{\partial K}\frac{\partial K}{\partial X}
        """
        raise NotImplementedError

    def gradients_X_X2(self, dL_dK, X, X2):
        """
        Gradients with respect to both inputs.

        :returns: tuple (gradients_X(dL_dK, X, X2), gradients_X(dL_dK.T, X2, X))
        """
        return self.gradients_X(dL_dK, X, X2), self.gradients_X(dL_dK.T, X2, X)

    def gradients_XX(self, dL_dK, X, X2, cov=True):
        r"""
        .. math::

            \frac{\partial^2 L}{\partial X\partial X_2} = \frac{\partial L}{\partial K}\frac{\partial^2 K}{\partial X\partial X_2}
        """
        raise NotImplementedError("This is the second derivative of K wrt X and X2, and not implemented for this kernel")

    def gradients_XX_diag(self, dL_dKdiag, X, cov=True):
        """
        The diagonal of the second derivative w.r.t. X and X2
        """
        raise NotImplementedError("This is the diagonal of the second derivative of K wrt X and X2, and not implemented for this kernel")

    def gradients_X_diag(self, dL_dKdiag, X):
        """
        The diagonal of the derivative w.r.t. X
        """
        raise NotImplementedError

    def update_gradients_diag(self, dL_dKdiag, X):
        """ update the gradients of all parameters when using only the diagonal elements of the covariance matrix"""
        raise NotImplementedError

    def update_gradients_full(self, dL_dK, X, X2):
        """Set the gradients of all parameters when doing full (N) inference."""
        raise NotImplementedError

    def reset_gradients(self):
        """Reset the gradients. Must be implemented by subclasses."""
        raise NotImplementedError

    def update_gradients_expectations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior):
        r"""
        Set the gradients of all parameters when doing inference with
        uncertain inputs, using expectations of the kernel.

        The essential maths is

        .. math::

            \frac{\partial L}{\partial \theta_i} & = \frac{\partial L}{\partial \psi_0}\frac{\partial \psi_0}{\partial \theta_i}\\
                & \quad + \frac{\partial L}{\partial \psi_1}\frac{\partial \psi_1}{\partial \theta_i}\\
                & \quad + \frac{\partial L}{\partial \psi_2}\frac{\partial \psi_2}{\partial \theta_i}

        Thus, we push the different derivatives through the gradients of the psi
        statistics. Be sure to set the gradients for all kernel
        parameters here.
        """
        dtheta = self.psicomp.psiDerivativecomputations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior)[0]
        self.gradient[:] = dtheta

    def gradients_Z_expectations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior,
                                 psi0=None, psi1=None, psi2=None):
        """
        Returns the derivative of the objective wrt Z, using the chain rule
        through the expectation variables.

        The psi0, psi1 and psi2 arguments are accepted but not used here.
        """
        return self.psicomp.psiDerivativecomputations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior)[1]

    def gradients_qX_expectations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior):
        """
        Compute the gradients wrt the parameters of the variational
        distribution q(X), chain-ruling via the expectations of the kernel
        """
        return self.psicomp.psiDerivativecomputations(self, dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, variational_posterior)[2:]

    def plot(self, x=None, fignum=None, ax=None, title=None, plot_limits=None, resolution=None, **mpl_kwargs):
        """
        plot this kernel.

        :param x: the value to use for the other kernel argument (kernels are a function of two variables!)
        :param fignum: figure number of the plot
        :param ax: matplotlib axis to plot on
        :param title: the matplotlib title
        :param plot_limits: the range over which to plot the kernel
        :param resolution: the resolution of the lines used in plotting
        :param mpl_kwargs: valid keyword arguments to pass through to matplotlib (e.g. lw=7)
        :raises AssertionError: if matplotlib has not been imported yet
        """
        assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
        from ...plotting.matplot_dep import kernel_plots
        kernel_plots.plot(self, x, fignum, ax, title, plot_limits, resolution, **mpl_kwargs)

    def __add__(self, other):
        """ Overloading of the '+' operator. for more control, see self.add """
        return self.add(other)

    def __iadd__(self, other):
        """ Overloading of the '+=' operator; returns a new kernel from self.add """
        return self.add(other)

    def add(self, other, name='sum'):
        """
        Add another kernel to this one.

        :param other: the other kernel to be added
        :type other: GPy.kern
        :param str name: name of the resulting kernel
        :raises AssertionError: if other is not a Kern
        """
        assert isinstance(other, Kern), "only kernels can be added to kernels..."
        from .add import Add
        return Add([self, other], name=name)

    def __mul__(self, other):
        """ Here we overload the '*' operator. See self.prod for more information"""
        return self.prod(other)

    def __imul__(self, other):
        """ Here we overload the '*=' operator. See self.prod for more information"""
        return self.prod(other)

    def __pow__(self, other):
        """
        Shortcut for tensor `prod`.

        Both kernels must have their active dimensions defined from 0.
        NOTE: this modifies `other` in place: its _all_dims_active is shifted
        by self.input_dim before the product is built.
        """
        assert np.all(self._all_dims_active == range(self.input_dim)), "Can only use kernels, which have their input_dims defined from 0"
        assert np.all(other._all_dims_active == range(other.input_dim)), "Can only use kernels, which have their input_dims defined from 0"
        other._all_dims_active += self.input_dim
        return self.prod(other)

    def prod(self, other, name='mul'):
        """
        Multiply two kernels (either on the same space, or on the tensor
        product of the input space).

        :param other: the other kernel to be multiplied
        :type other: GPy.kern
        :param str name: name of the resulting kernel
        :raises AssertionError: if other is not a Kern
        """
        assert isinstance(other, Kern), "only kernels can be multiplied to kernels..."
        from .prod import Prod
        return Prod([self, other], name)

    def _check_input_dim(self, X):
        """Assert that X has exactly input_dim columns."""
        assert X.shape[1] == self.input_dim, "{} did not specify active_dims and X has wrong shape: X_dim={}, whereas input_dim={}".format(self.name, X.shape[1], self.input_dim)

    def _check_active_dims(self, X):
        """Assert that X has at least as many columns as there are active dimensions."""
        assert X.shape[1] >= len(self._all_dims_active), "At least {} dimensional X needed, X.shape={!s}".format(len(self._all_dims_active), X.shape)
class CombinationKernel(Kern):
    """
    Abstract super class for combination kernels.
    A combination kernel combines (a list of) kernels and works on those.
    Examples are the HierarchicalKernel or Add and Prod kernels.
    """

    def __init__(self, kernels, name, extra_dims=None, link_parameters=True):
        """
        Abstract super class for combination kernels.
        A combination kernel combines (a list of) kernels and works on those.
        Examples are the HierarchicalKernel or Add and Prod kernels.

        :param list kernels: List of kernels to combine (can be only one element)
        :param str name: name of the combination kernel
        :param array-like extra_dims: if needed extra dimensions for the combination kernel to work on; None (the default) means no extra dimensions
        :param bool link_parameters: whether to link the given kernels as parameters of this kernel
        :raises AssertionError: if any element of kernels is not a Kern
        """
        assert all(isinstance(k, Kern) for k in kernels)
        # None replaces the former mutable default [] and means "no extra dims".
        extra_dims = np.asarray([] if extra_dims is None else extra_dims, dtype=int)

        # The active dims are the union of all parts' active dims and extra_dims.
        active_dims = reduce(np.union1d, (np.r_[x.active_dims] for x in kernels), extra_dims)
        input_dim = active_dims.size

        # initialize the kernel with the full input_dim
        super(CombinationKernel, self).__init__(input_dim, active_dims, name)

        # Override what the base constructor set: cover every dimension up to
        # the largest one used by any part, followed by the extra dimensions.
        effective_input_dim = reduce(max, (k._all_dims_active.max() for k in kernels)) + 1
        self._all_dims_active = np.array(np.concatenate((np.arange(effective_input_dim), extra_dims)), dtype=int)

        self.extra_dims = extra_dims
        if link_parameters:
            self.link_parameters(*kernels)

    def _save_to_input_dict(self):
        """
        Extend the base dict with a "parts" entry.

        :returns: the dict of the parent class plus "parts", a dict mapping
            the position of each part to the result of its to_dict().
        """
        input_dict = super(CombinationKernel, self)._save_to_input_dict()
        input_dict["parts"] = {ii: part.to_dict() for ii, part in enumerate(self.parts)}
        return input_dict

    @staticmethod
    def _build_from_input_dict(kernel_class, input_dict):
        """
        Rebuild a combination kernel from its serialised parts.

        Each entry of input_dict['parts'] is turned back into a kernel with
        Kern.from_dict, in the iteration order of that dict, and kernel_class
        is called with the resulting list as its only argument. The remaining
        entries of input_dict are not used.
        """
        parts = input_dict.pop('parts', None)
        subkerns = [Kern.from_dict(parts[pp]) for pp in parts]
        return kernel_class(subkerns)

    @property
    def parts(self):
        """The combined kernels, i.e. the parameters of this kernel."""
        return self.parameters

    def _set_all_dims_ative(self):
        """Reset _all_dims_active to the (integer) active_dims."""
        # The misspelled name is kept on purpose: callers outside this block
        # may refer to it.
        self._all_dims_active = np.atleast_1d(self.active_dims).astype(int)

    def _check_active_dims(self, X):
        """No-op: no check is performed at the combination level."""
        return

    def _check_input_dim(self, X):
        """No-op for combination kernels."""
        # As combination kernels cannot always know, what their inner kernels have as input dims, the check will be done inside them, respectively
        return