# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..core.parameterization import Parameterized, Param
from paramz.transformations import Logexp
import sys
[docs]class WarpingFunction(Parameterized):
"""
abstract function for warping
z = f(y)
"""
def __init__(self, name):
super(WarpingFunction, self).__init__(name=name)
self.rate = 0.1
[docs] def f(self, y, psi):
"""function transformation
y is a list of values (GP training data) of shape [N, 1]
"""
raise NotImplementedError
[docs] def fgrad_y(self, y, psi):
"""gradient of f w.r.t to y"""
raise NotImplementedError
[docs] def fgrad_y_psi(self, y, psi):
"""gradient of f w.r.t to y"""
raise NotImplementedError
[docs] def f_inv(self, z, max_iterations=250, y=None):
"""
Calculate the numerical inverse of f. This should be
overwritten for specific warping functions where the
inverse can be found in closed form.
:param max_iterations: maximum number of N.R. iterations
"""
z = z.copy()
y = np.ones_like(z)
it = 0
update = np.inf
while np.abs(update).sum() > 1e-10 and it < max_iterations:
fy = self.f(y)
fgrady = self.fgrad_y(y)
update = (fy - z) / fgrady
y -= self.rate * update
it += 1
#if it == max_iterations:
# print("WARNING!!! Maximum number of iterations reached in f_inv ")
# print("Sum of roots: %.4f" % np.sum(fy - z))
return y
[docs] def plot(self, xmin, xmax):
y = np.arange(xmin, xmax, 0.01)
f_y = self.f(y)
from matplotlib import pyplot as plt
plt.figure()
plt.plot(y, f_y)
plt.xlabel('y')
plt.ylabel('f(y)')
plt.title('warping function')
plt.show()
[docs]class TanhFunction(WarpingFunction):
"""
This is the function proposed in Snelson et al.:
A sum of tanh functions with linear trends outside
the range. Notice the term 'd', which scales the
linear trend.
"""
def __init__(self, n_terms=3, initial_y=None):
"""
n_terms specifies the number of tanh terms to be used
"""
self.n_terms = n_terms
self.num_parameters = 3 * self.n_terms + 1
self.psi = np.ones((self.n_terms, 3))
super(TanhFunction, self).__init__(name='warp_tanh')
self.psi = Param('psi', self.psi)
self.psi[:, :2].constrain_positive()
self.d = Param('%s' % ('d'), 1.0, Logexp())
self.link_parameter(self.psi)
self.link_parameter(self.d)
self.initial_y = initial_y
[docs] def f(self, y):
"""
Transform y with f using parameter vector psi
psi = [[a,b,c]]
:math:`f = (y * d) + \\sum_{terms} a * tanh(b *(y + c))`
"""
d = self.d
mpsi = self.psi
z = d * y.copy()
for i in range(len(mpsi)):
a, b, c = mpsi[i]
z += a * np.tanh(b * (y + c))
return z
[docs] def fgrad_y(self, y, return_precalc=False):
"""
gradient of f w.r.t to y ([N x 1])
:returns: Nx1 vector of derivatives, unless return_precalc is true,
then it also returns the precomputed stuff
"""
d = self.d
mpsi = self.psi
# vectorized version
S = (mpsi[:,1] * (y[:,:,None] + mpsi[:,2])).T
R = np.tanh(S)
D = 1 - (R ** 2)
GRAD = (d + (mpsi[:,0:1][:,:,None] * mpsi[:,1:2][:,:,None] * D).sum(axis=0)).T
if return_precalc:
return GRAD, S, R, D
return GRAD
[docs] def fgrad_y_psi(self, y, return_covar_chain=False):
"""
gradient of f w.r.t to y and psi
:returns: NxIx4 tensor of partial derivatives
"""
mpsi = self.psi
w, s, r, d = self.fgrad_y(y, return_precalc=True)
gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
gradients[:, :, i, 0] = (b * (1.0/np.cosh(s[i])) ** 2).T
gradients[:, :, i, 1] = a * (d[i] - 2.0 * s[i] * r[i] * (1.0/np.cosh(s[i])) ** 2).T
gradients[:, :, i, 2] = (-2.0 * a * (b ** 2) * r[i] * ((1.0 / np.cosh(s[i])) ** 2)).T
gradients[:, :, 0, 3] = 1.0
if return_covar_chain:
covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
covar_grad_chain[:, :, i, 0] = (r[i]).T
covar_grad_chain[:, :, i, 1] = (a * (y + c) * ((1.0 / np.cosh(s[i])) ** 2).T)
covar_grad_chain[:, :, i, 2] = a * b * ((1.0 / np.cosh(s[i])) ** 2).T
covar_grad_chain[:, :, 0, 3] = y
return gradients, covar_grad_chain
return gradients
[docs] def update_grads(self, Y_untransformed, Kiy):
grad_y = self.fgrad_y(Y_untransformed)
grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
return_covar_chain=True)
djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
warping_grads = -dquad_dpsi + djac_dpsi
self.psi.gradient[:] = warping_grads[:, :-1]
self.d.gradient[:] = warping_grads[0, -1]
[docs]class LogFunction(WarpingFunction):
"""
Easy wrapper for applying a fixed log warping function to
positive-only values.
The closed_inverse flag should only be set to False for
debugging and testing purposes.
"""
def __init__(self, closed_inverse=True):
self.num_parameters = 0
super(LogFunction, self).__init__(name='log')
if closed_inverse:
self.f_inv = self._f_inv
[docs] def f(self, y):
return np.log(y)
[docs] def fgrad_y(self, y):
return 1. / y
[docs] def update_grads(self, Y_untransformed, Kiy):
pass
[docs] def fgrad_y_psi(self, y, return_covar_chain=False):
if return_covar_chain:
return 0, 0
return 0
def _f_inv(self, z, y=None):
return np.exp(z)
[docs]class IdentityFunction(WarpingFunction):
"""
Identity warping function. This is for testing and sanity check purposes
and should not be used in practice.
The closed_inverse flag should only be set to False for
debugging and testing purposes.
"""
def __init__(self, closed_inverse=True):
self.num_parameters = 0
super(IdentityFunction, self).__init__(name='identity')
if closed_inverse:
self.f_inv = self._f_inv
[docs] def f(self, y):
return y
[docs] def fgrad_y(self, y):
return np.ones(y.shape)
[docs] def update_grads(self, Y_untransformed, Kiy):
pass
[docs] def fgrad_y_psi(self, y, return_covar_chain=False):
if return_covar_chain:
return 0, 0
return 0
def _f_inv(self, z, y=None):
return z