# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from .kern import CombinationKernel
from paramz.caching import Cache_this
import itertools
from functools import reduce
[docs]def numpy_invalid_op_as_exception(func):
"""
A decorator that allows catching numpy invalid operations
as exceptions (the default behaviour is raising warnings).
"""
def func_wrapper(*args, **kwargs):
np.seterr(invalid='raise')
result = func(*args, **kwargs)
np.seterr(invalid='warn')
return result
return func_wrapper
[docs]class Prod(CombinationKernel):
"""
Computes the product of 2 kernels
:param k1, k2: the kernels to multiply
:type k1, k2: Kern
:rtype: kernel object
"""
def __init__(self, kernels, name='mul'):
_newkerns = []
for kern in kernels:
if isinstance(kern, Prod):
for part in kern.parts:
#kern.unlink_parameter(part)
_newkerns.append(part.copy())
else:
_newkerns.append(kern.copy())
super(Prod, self).__init__(_newkerns, name)
[docs] def to_dict(self):
"""
Convert the object into a json serializable dictionary.
Note: It uses the private method _save_to_input_dict of the parent.
:return dict: json serializable dictionary containing the needed information to instantiate the object
"""
input_dict = super(Prod, self)._save_to_input_dict()
input_dict["class"] = str("GPy.kern.Prod")
return input_dict
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def K(self, X, X2=None, which_parts=None):
if which_parts is None:
which_parts = self.parts
elif not isinstance(which_parts, (list, tuple)):
# if only one part is given
which_parts = [which_parts]
return reduce(np.multiply, (p.K(X, X2) for p in which_parts))
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def Kdiag(self, X, which_parts=None):
if which_parts is None:
which_parts = self.parts
return reduce(np.multiply, (p.Kdiag(X) for p in which_parts))
[docs] def reset_gradients(self):
for part in self.parts:
part.reset_gradients()
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK_dX(self, X, X2, dimX, which_parts=None):
"""
Compute the derivative of K with respect to:
dimension dimX of set X.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros((X.shape[0], X2.shape[0]))
for combination in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
else:
prod = np.ones(prod_sum.shape)
to_update = list(set(which_parts) - set(combination))[0]
prod_sum += prod*to_update.dK_dX(X, X2, dimX)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK_dXdiag(self, X, dimX, which_parts=None):
"""
Compute the derivative of K with respect to:
dimension dimX of set X.
Returns only diagonal elements.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros(X.shape[0])
for combination in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination])
else:
prod = np.ones(prod_sum.shape)
to_update = list(set(which_parts) - set(combination))[0]
prod_sum += prod*to_update.dK_dXdiag(X, dimX)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK_dX2(self, X, X2, dimX2, which_parts=None):
"""
Compute the derivative of K with respect to:
dimension dimX2 of set X2.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros((X.shape[0], X2.shape[0]))
for combination in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
else:
prod = np.ones(prod_sum.shape)
to_update = list(set(which_parts) - set(combination))[0]
prod_sum += prod*to_update.dK_dX2(X, X2, dimX2)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK2_dXdX2(self, X, X2, dimX, dimX2, which_parts=None):
"""
Compute the second derivative of K with respect to:
dimension dimX of set X, and
dimension dimX2 of set X2.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros((X.shape[0], X2.shape[0]))
for combination1 in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination1) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination1])
else:
prod = np.ones(prod_sum.shape)
to_update1 = list(set(which_parts) - set(combination1))[0]
prod_sum += prod*to_update1.dK2_dXdX2(X, X2, dimX, dimX2)
if len(which_parts) > 1:
for combination2 in itertools.combinations(combination1, len(combination1) - 1):
if len(combination2) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination2])
else:
prod = np.ones(prod_sum.shape)
to_update2 = list(set(combination1) - set(combination2))[0]
prod_sum += prod*to_update1.dK_dX(X, X2, dimX)*to_update2.dK_dX2(X, X2, dimX2)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK2_dXdX2diag(self, X, dimX, dimX2, which_parts=None):
"""
Compute the second derivative of K with respect to:
dimension dimX of set X, and
dimension dimX2 of set X2.
Returns only diagonal elements.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros(X.shape[0])
for combination1 in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination1) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination1])
else:
prod = np.ones(prod_sum.shape)
to_update1 = list(set(which_parts) - set(combination1))[0]
prod_sum += prod*to_update1.dK2_dXdX2diag(X, dimX, dimX2)
if len(which_parts) > 1:
for combination2 in itertools.combinations(combination1, len(combination1) - 1):
if len(combination2) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination2])
else:
prod = np.ones(prod_sum.shape)
to_update2 = list(set(combination1) - set(combination2))[0]
prod_sum += prod*to_update1.dK_dXdiag(X, dimX)*to_update2.dK_dX2diag(X, dimX)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK2_dXdX(self, X, X2, dimX_0, dimX_1, which_parts=None):
"""
Compute the second derivative of K with respect to:
dimension dimX_0 of set X, and
dimension dimX_1 of set X.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros((X.shape[0], X2.shape[0]))
for combination1 in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination1) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination1])
else:
prod = np.ones(prod_sum.shape)
to_update1 = list(set(which_parts) - set(combination1))[0]
prod_sum += prod*to_update1.dK2_dXdX(X, X2, dimX_0, dimX_1)
if len(which_parts) > 1:
for combination2 in itertools.combinations(combination1, len(combination1) - 1):
if len(combination2) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination2])
else:
prod = np.ones(prod_sum.shape)
to_update2 = list(set(combination1) - set(combination2))[0]
prod_sum += prod*to_update1.dK_dX(X, X2, dimX_0)*to_update2.dK_dX(X, X2, dimX_1)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK3_dXdXdX2(self, X, X2, dimX_0, dimX_1, dimX2, which_parts=None):
"""
Compute the third derivative of K with respect to:
dimension dimX_0 of set X,
dimension dimX_1 of set X, and
dimension dimX2 of set X2.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros((X.shape[0], X2.shape[0]))
for combination1 in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination1) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination1])
else:
prod = np.ones(prod_sum.shape)
to_update1 = list(set(which_parts) - set(combination1))[0]
prod_sum += prod*to_update1.dK3_dXdXdX2(X, X2, dimX_0, dimX_1, dimX2)
if len(which_parts) > 1:
for combination2 in itertools.combinations(combination1, len(combination1) - 1):
if len(combination2) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination2])
else:
prod = np.ones(prod_sum.shape)
to_update2 = list(set(combination1) - set(combination2))[0]
prod_sum += prod*to_update1.dK2_dXdX2(X, X2, dimX_0, dimX2)*to_update2.dK_dX(X, X2, dimX_1)
prod_sum += prod*to_update1.dK2_dXdX(X, X2, dimX_0, dimX_1)*to_update2.dK_dX2(X, X2, dimX2)
prod_sum += prod*to_update1.dK_dX(X, X2, dimX_0)*to_update2.dK2_dXdX2(X, X2, dimX_1, dimX2)
if len(which_parts) > 2:
for combination3 in itertools.combinations(combination2, len(combination2) - 1):
if len(combination3) > 0:
prod = reduce(np.multiply, [p.K(X, X2) for p in combination3])
else:
prod = np.ones(prod_sum.shape)
to_update3 = list(set(combination2) - set(combination3))[0]
prod_sum += prod*to_update1.dK_dX(X, X2, dimX_0)*to_update2.dK_dX2(X, X2, dimX2)*to_update3.dK_dX(X, X2, dimX_1)
return prod_sum
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def dK3_dXdXdX2diag(self, X, dimX_0, dimX_1, dimX2, which_parts=None):
"""
Compute the third derivative of K with respect to:
dimension dimX_0 of set X,
dimension dimX_1 of set X, and
dimension dimX2 of set X2.
Returns only diagonal elements of the covariance matrix.
"""
if which_parts is None:
which_parts = self.parts
prod_sum = np.zeros(X.shape[0])
for combination1 in itertools.combinations(which_parts, len(which_parts) - 1):
if len(combination1) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination1])
else:
prod = np.ones(prod_sum.shape)
to_update1 = list(set(which_parts) - set(combination1))[0]
prod_sum += prod*to_update1.dK3_dXdXdX2diag(X, dimX_0, dimX_1, dimX2)
if len(which_parts) > 1:
for combination2 in itertools.combinations(combination1, len(combination1) - 1):
if len(combination2) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination2])
else:
prod = np.ones(prod_sum.shape)
to_update2 = list(set(combination1) - set(combination2))[0]
prod_sum += prod*to_update1.dK2_dXdX2diag(X, dimX_0, dimX2)*to_update2.dK_dXdiag(X, dimX_1)
prod_sum += prod*to_update1.dK2_dXdXdiag(X, dimX_0, dimX_1)*to_update2.dK_dX2diag(X, dimX2)
prod_sum += prod*to_update1.dK_dXdiag(X, dimX_0)*to_update2.dK2_dXdX2diag(X, dimX_1, dimX2)
if len(which_parts) > 2:
for combination3 in itertools.combinations(combination2, len(combination2) - 1):
if len(combination3) > 0:
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination3])
else:
prod = np.ones(prod_sum.shape)
to_update3 = list(set(combination2) - set(combination3))[0]
prod_sum += prod*to_update1.dK_dXdiag(X, dimX_0)*to_update2.dK_dX2diag(X, dimX2)*to_update3.dK_dXdiag(X, dimX_1)
return prod_sum
[docs] def update_gradients_direct(self, *args):
for i, (g,p) in enumerate(zip(args, self.parts)):
p.update_gradients_direct(*g)
[docs] def dgradients_dX(self, X, X2, dimX, parts=None):
"""
Compute the hyperparameter gradients of:
the derivative of K with respect to dimension dimX of set X
("dK_dX").
"""
if parts is None:
parts = self.parts
gradients = []
for part in parts:
neq_parts = [p for p in parts if p is not part]
if len(neq_parts) > 0:
K = self.K(X, X2, which_parts=neq_parts)
K_dx = self.dK_dX(X, X2, dimX, which_parts=neq_parts)
else:
K = np.ones((X.shape[0], X2.shape[0]))
K_dx = np.zeros((X.shape[0], X2.shape[0]))
g = part.dgradients(X, X2)
g_dx = part.dgradients_dX(X, X2, dimX)
gradients += [[(g_i*K_dx + g_dx_i*K) for (g_i, g_dx_i) in zip(g, g_dx)]]
return gradients
[docs] def dgradients_dX2(self, X, X2, dimX2, parts=None):
"""
Compute the hyperparameter gradients of:
the derivative of K with respect to dimension dimX2 of set X2
("dK_dX2").
"""
if parts is None:
parts = self.parts
gradients = []
for part in parts:
neq_parts = [p for p in parts if p is not part]
if len(neq_parts) > 0:
K = self.K(X, X2, which_parts=neq_parts)
K_dx2 = self.dK_dX2(X, X2, dimX2, which_parts=neq_parts)
else:
K = np.ones((X.shape[0], X2.shape[0]))
K_dx2 = np.zeros((X.shape[0], X2.shape[0]))
g = part.dgradients(X, X2)
g_dx2 = part.dgradients_dX2(X, X2, dimX2)
gradients += [[(g_i*K_dx2 + g_dx2_i*K) for (g_i, g_dx2_i) in zip(g, g_dx2)]]
return gradients
[docs] def dgradients2_dXdX2(self, X, X2, dimX, dimX2, parts=None):
"""
Compute the hyperparameter gradients of:
the second derivative of K with respect to:
dimension dimX of set X, and
dimension dimX2 of set X2
("dK2_dXdX2").
"""
if parts is None:
parts = self.parts
gradients = []
for part in parts:
neq_parts = [p for p in parts if p is not part]
K = self.K(X, X2, which_parts=neq_parts)
K_dx = self.dK_dX(X, X2, dimX, which_parts=neq_parts)
K_dx2 = self.dK_dX2(X, X2, dimX2, which_parts=neq_parts)
K_dxdx2 = self.dK2_dXdX2(X, X2, dimX, dimX2, which_parts=neq_parts)
g = part.dgradients(X, X2)
g_dx = part.dgradients_dX(X, X2, dimX)
g_dx2 = part.dgradients_dX2(X, X2, dimX2)
g_dxdx2 = part.dgradients2_dXdX2(X, X2, dimX, dimX2)
gradients += [[(g_i*K_dxdx2 + g_dx_i*K_dx2 + g_dx2_i*K_dx + g_dxdx2_i*K) for (g_i, g_dx_i, g_dx2_i, g_dxdx2_i) in zip(g, g_dx, g_dx2, g_dxdx2)]]
return gradients
[docs] def update_gradients_full(self, dL_dK, X, X2=None):
if len(self.parts)==2:
self.parts[0].update_gradients_full(dL_dK*self.parts[1].K(X,X2), X, X2)
self.parts[1].update_gradients_full(dL_dK*self.parts[0].K(X,X2), X, X2)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
to_update.update_gradients_full(dL_dK * prod, X, X2)
[docs] def update_gradients_diag(self, dL_dKdiag, X):
if len(self.parts)==2:
self.parts[0].update_gradients_diag(dL_dKdiag*self.parts[1].Kdiag(X), X)
self.parts[1].update_gradients_diag(dL_dKdiag*self.parts[0].Kdiag(X), X)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
to_update.update_gradients_diag(dL_dKdiag * prod, X)
[docs] def gradients_X(self, dL_dK, X, X2=None):
target = np.zeros(X.shape)
if len(self.parts)==2:
target += self.parts[0].gradients_X(dL_dK*self.parts[1].K(X, X2), X, X2)
target += self.parts[1].gradients_X(dL_dK*self.parts[0].K(X, X2), X, X2)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
target += to_update.gradients_X(dL_dK * prod, X, X2)
return target
[docs] def gradients_X_diag(self, dL_dKdiag, X):
target = np.zeros(X.shape)
if len(self.parts)==2:
target += self.parts[0].gradients_X_diag(dL_dKdiag*self.parts[1].Kdiag(X), X)
target += self.parts[1].gradients_X_diag(dL_dKdiag*self.parts[0].Kdiag(X), X)
else:
k = self.Kdiag(X)*dL_dKdiag
for p in self.parts:
target += p.gradients_X_diag(k/p.Kdiag(X),X)
return target
[docs] def sde_update_gradient_full(self, gradients):
"""
Update gradient in the order in which parameters are represented in the
kernel
"""
part_start_param_index = 0
for p in self.parts:
if not p.is_fixed:
part_param_num = len(p.param_array) # number of parameters in the part
p.sde_update_gradient_full(gradients[part_start_param_index:(part_start_param_index+part_param_num)])
part_start_param_index += part_param_num
[docs] def sde(self):
"""
"""
F = np.array((0,), ndmin=2)
L = np.array((1,), ndmin=2)
Qc = np.array((1,), ndmin=2)
H = np.array((1,), ndmin=2)
Pinf = np.array((1,), ndmin=2)
P0 = np.array((1,), ndmin=2)
dF = None
dQc = None
dPinf = None
dP0 = None
# Assign models
for p in self.parts:
(Ft,Lt,Qct,Ht,P_inft, P0t, dFt,dQct,dP_inft,dP0t) = p.sde()
# check derivative dimensions ->
number_of_parameters = len(p.param_array)
assert dFt.shape[2] == number_of_parameters, "Dynamic matrix derivative shape is wrong"
assert dQct.shape[2] == number_of_parameters, "Diffusion matrix derivative shape is wrong"
assert dP_inft.shape[2] == number_of_parameters, "Infinite covariance matrix derivative shape is wrong"
# check derivative dimensions <-
# exception for periodic kernel
if (p.name == 'std_periodic'):
Qct = P_inft
dQct = dP_inft
dF = dkron(F,dF,Ft,dFt,'sum')
dQc = dkron(Qc,dQc,Qct,dQct,'prod')
dPinf = dkron(Pinf,dPinf,P_inft,dP_inft,'prod')
dP0 = dkron(P0,dP0,P0t,dP0t,'prod')
F = np.kron(F,np.eye(Ft.shape[0])) + np.kron(np.eye(F.shape[0]),Ft)
L = np.kron(L,Lt)
Qc = np.kron(Qc,Qct)
Pinf = np.kron(Pinf,P_inft)
P0 = np.kron(P0,P_inft)
H = np.kron(H,Ht)
return (F,L,Qc,H,Pinf,P0,dF,dQc,dPinf,dP0)
[docs]def dkron(A,dA,B,dB, operation='prod'):
"""
Function computes the derivative of Kronecker product A*B
(or Kronecker sum A+B).
Input:
-----------------------
A: 2D matrix
Some matrix
dA: 3D (or 2D matrix)
Derivarives of A
B: 2D matrix
Some matrix
dB: 3D (or 2D matrix)
Derivarives of B
operation: str 'prod' or 'sum'
Which operation is considered. If the operation is 'sum' it is assumed
that A and are square matrices.s
Output:
dC: 3D matrix
Derivative of Kronecker product A*B (or Kronecker sum A+B)
"""
if dA is None:
dA_param_num = 0
dA = np.zeros((A.shape[0], A.shape[1],1))
else:
dA_param_num = dA.shape[2]
if dB is None:
dB_param_num = 0
dB = np.zeros((B.shape[0], B.shape[1],1))
else:
dB_param_num = dB.shape[2]
# Space allocation for derivative matrix
dC = np.zeros((A.shape[0]*B.shape[0], A.shape[1]*B.shape[1], dA_param_num + dB_param_num))
for k in range(dA_param_num):
if operation == 'prod':
dC[:,:,k] = np.kron(dA[:,:,k],B);
else:
dC[:,:,k] = np.kron(dA[:,:,k],np.eye( B.shape[0] ))
for k in range(dB_param_num):
if operation == 'prod':
dC[:,:,dA_param_num+k] = np.kron(A,dB[:,:,k])
else:
dC[:,:,dA_param_num+k] = np.kron(np.eye( A.shape[0] ),dB[:,:,k])
return dC