# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from .kern import CombinationKernel
from paramz.caching import Cache_this
import itertools
from functools import reduce
[docs]def numpy_invalid_op_as_exception(func):
"""
A decorator that allows catching numpy invalid operations
as exceptions (the default behaviour is raising warnings).
"""
def func_wrapper(*args, **kwargs):
np.seterr(invalid='raise')
result = func(*args, **kwargs)
np.seterr(invalid='warn')
return result
return func_wrapper
[docs]class Prod(CombinationKernel):
"""
Computes the product of 2 kernels
:param k1, k2: the kernels to multiply
:type k1, k2: Kern
:rtype: kernel object
"""
def __init__(self, kernels, name='mul'):
_newkerns = []
for kern in kernels:
if isinstance(kern, Prod):
for part in kern.parts:
#kern.unlink_parameter(part)
_newkerns.append(part.copy())
else:
_newkerns.append(kern.copy())
super(Prod, self).__init__(_newkerns, name)
[docs] def to_dict(self):
"""
Convert the object into a json serializable dictionary.
Note: It uses the private method _save_to_input_dict of the parent.
:return dict: json serializable dictionary containing the needed information to instantiate the object
"""
input_dict = super(Prod, self)._save_to_input_dict()
input_dict["class"] = str("GPy.kern.Prod")
return input_dict
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def K(self, X, X2=None, which_parts=None):
if which_parts is None:
which_parts = self.parts
elif not isinstance(which_parts, (list, tuple)):
# if only one part is given
which_parts = [which_parts]
return reduce(np.multiply, (p.K(X, X2) for p in which_parts))
[docs] @Cache_this(limit=3, force_kwargs=['which_parts'])
def Kdiag(self, X, which_parts=None):
if which_parts is None:
which_parts = self.parts
return reduce(np.multiply, (p.Kdiag(X) for p in which_parts))
[docs] def update_gradients_full(self, dL_dK, X, X2=None):
if len(self.parts)==2:
self.parts[0].update_gradients_full(dL_dK*self.parts[1].K(X,X2), X, X2)
self.parts[1].update_gradients_full(dL_dK*self.parts[0].K(X,X2), X, X2)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
to_update.update_gradients_full(dL_dK * prod, X, X2)
[docs] def update_gradients_diag(self, dL_dKdiag, X):
if len(self.parts)==2:
self.parts[0].update_gradients_diag(dL_dKdiag*self.parts[1].Kdiag(X), X)
self.parts[1].update_gradients_diag(dL_dKdiag*self.parts[0].Kdiag(X), X)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.Kdiag(X) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
to_update.update_gradients_diag(dL_dKdiag * prod, X)
[docs] def gradients_X(self, dL_dK, X, X2=None):
target = np.zeros(X.shape)
if len(self.parts)==2:
target += self.parts[0].gradients_X(dL_dK*self.parts[1].K(X, X2), X, X2)
target += self.parts[1].gradients_X(dL_dK*self.parts[0].K(X, X2), X, X2)
else:
for combination in itertools.combinations(self.parts, len(self.parts) - 1):
prod = reduce(np.multiply, [p.K(X, X2) for p in combination])
to_update = list(set(self.parts) - set(combination))[0]
target += to_update.gradients_X(dL_dK * prod, X, X2)
return target
[docs] def gradients_X_diag(self, dL_dKdiag, X):
target = np.zeros(X.shape)
if len(self.parts)==2:
target += self.parts[0].gradients_X_diag(dL_dKdiag*self.parts[1].Kdiag(X), X)
target += self.parts[1].gradients_X_diag(dL_dKdiag*self.parts[0].Kdiag(X), X)
else:
k = self.Kdiag(X)*dL_dKdiag
for p in self.parts:
target += p.gradients_X_diag(k/p.Kdiag(X),X)
return target
[docs] def sde_update_gradient_full(self, gradients):
"""
Update gradient in the order in which parameters are represented in the
kernel
"""
part_start_param_index = 0
for p in self.parts:
if not p.is_fixed:
part_param_num = len(p.param_array) # number of parameters in the part
p.sde_update_gradient_full(gradients[part_start_param_index:(part_start_param_index+part_param_num)])
part_start_param_index += part_param_num
[docs] def sde(self):
"""
"""
F = np.array((0,), ndmin=2)
L = np.array((1,), ndmin=2)
Qc = np.array((1,), ndmin=2)
H = np.array((1,), ndmin=2)
Pinf = np.array((1,), ndmin=2)
P0 = np.array((1,), ndmin=2)
dF = None
dQc = None
dPinf = None
dP0 = None
# Assign models
for p in self.parts:
(Ft,Lt,Qct,Ht,P_inft, P0t, dFt,dQct,dP_inft,dP0t) = p.sde()
# check derivative dimensions ->
number_of_parameters = len(p.param_array)
assert dFt.shape[2] == number_of_parameters, "Dynamic matrix derivative shape is wrong"
assert dQct.shape[2] == number_of_parameters, "Diffusion matrix derivative shape is wrong"
assert dP_inft.shape[2] == number_of_parameters, "Infinite covariance matrix derivative shape is wrong"
# check derivative dimensions <-
# exception for periodic kernel
if (p.name == 'std_periodic'):
Qct = P_inft
dQct = dP_inft
dF = dkron(F,dF,Ft,dFt,'sum')
dQc = dkron(Qc,dQc,Qct,dQct,'prod')
dPinf = dkron(Pinf,dPinf,P_inft,dP_inft,'prod')
dP0 = dkron(P0,dP0,P0t,dP0t,'prod')
F = np.kron(F,np.eye(Ft.shape[0])) + np.kron(np.eye(F.shape[0]),Ft)
L = np.kron(L,Lt)
Qc = np.kron(Qc,Qct)
Pinf = np.kron(Pinf,P_inft)
P0 = np.kron(P0,P_inft)
H = np.kron(H,Ht)
return (F,L,Qc,H,Pinf,P0,dF,dQc,dPinf,dP0)
[docs]def dkron(A,dA,B,dB, operation='prod'):
"""
Function computes the derivative of Kronecker product A*B
(or Kronecker sum A+B).
Input:
-----------------------
A: 2D matrix
Some matrix
dA: 3D (or 2D matrix)
Derivarives of A
B: 2D matrix
Some matrix
dB: 3D (or 2D matrix)
Derivarives of B
operation: str 'prod' or 'sum'
Which operation is considered. If the operation is 'sum' it is assumed
that A and are square matrices.s
Output:
dC: 3D matrix
Derivative of Kronecker product A*B (or Kronecker sum A+B)
"""
if dA is None:
dA_param_num = 0
dA = np.zeros((A.shape[0], A.shape[1],1))
else:
dA_param_num = dA.shape[2]
if dB is None:
dB_param_num = 0
dB = np.zeros((B.shape[0], B.shape[1],1))
else:
dB_param_num = dB.shape[2]
# Space allocation for derivative matrix
dC = np.zeros((A.shape[0]*B.shape[0], A.shape[1]*B.shape[1], dA_param_num + dB_param_num))
for k in range(dA_param_num):
if operation == 'prod':
dC[:,:,k] = np.kron(dA[:,:,k],B);
else:
dC[:,:,k] = np.kron(dA[:,:,k],np.eye( B.shape[0] ))
for k in range(dB_param_num):
if operation == 'prod':
dC[:,:,dA_param_num+k] = np.kron(A,dB[:,:,k])
else:
dC[:,:,dA_param_num+k] = np.kron(np.eye( A.shape[0] ),dB[:,:,k])
return dC