# Source code for GPy.testing.state_space_main_tests

# -*- coding: utf-8 -*-
# Copyright (c) 2015, Alex Grigorevskiy
"""
Test module for state_space_main.py
"""

import unittest
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

import GPy.models.state_space_setup as ss_setup
import GPy.models.state_space_main as ssm

[docs]def generate_x_points(points_num=100, x_interval = (0, 20), random=True):
"""
Function generates (sorted) points on the x axis.

Input:
---------------------------
points_num: int
How many points to generate
x_interval: tuple (a,b)
On which interval to generate points
random: bool
Regular points or random

Output:
---------------------------
x_points: np.array
Generated points
"""

x_interval = np.asarray( x_interval )

if random:
x_points = np.random.rand(points_num) * ( x_interval[1] - x_interval[0] ) + x_interval[0]
x_points = np.sort( x_points )
else:
x_points = np.linspace(x_interval[0], x_interval[1], num=points_num )

return x_points

[docs]def generate_sine_data(x_points=None, sin_period=2.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Function generates sinusoidal data.

Input:
--------------------------------

x_points: np.array
Previously generated X points
sin_period: float
Sine period
sin_ampl: float
Sine amplitude
noise_var: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data

(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)

points_num: int

x_interval: tuple (a,b)

random: bool
"""

sin_function = lambda xx: sin_ampl * np.sin( 2*np.pi/sin_period * xx )

if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)

y_points = sin_function( x_points ) + np.random.randn( len(x_points) ) * np.sqrt(noise_var)

if plot:
pass

return x_points, y_points

plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Function generates linear data.

Input:
--------------------------------

x_points: np.array
Previously generated X points
tangent: float
Factor with which independent variable is multiplied in linear equation.
noise_var: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data

(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)

points_num: int

x_interval: tuple (a,b)

random: bool
"""

linear_function = lambda xx:  tangent*xx + add_term

if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)

y_points = linear_function( x_points ) + np.random.randn( len(x_points) ) * np.sqrt(noise_var)

if plot:
pass

return x_points, y_points

[docs]def generate_brownian_data(x_points=None, kernel_var = 2.0, noise_var = 2.0,
plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Generate brownian data - data from Brownian motion.
First point is always 0, and \Beta(0) = 0  - standard conditions for Brownian motion.

Input:
--------------------------------

x_points: np.array
Previously generated X points
variance: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data

(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)

points_num: int

x_interval: tuple (a,b)

random: bool

"""
if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)
if x_points[0] != 0:
x_points[0] = 0

y_points = np.zeros( (points_num,) )
for i in range(1, points_num):
noise = np.random.randn() * np.sqrt(kernel_var * (x_points[i] - x_points[i-1]))
y_points[i] = y_points[i-1] + noise

y_points += np.random.randn( len(x_points) ) * np.sqrt(noise_var)

return x_points, y_points

sin_period=2.0, sin_ampl=10.0, plot = False,
points_num=100, x_interval = (0, 20), random=True):
"""
Generate the sum of linear trend and the sine function.

For parameters see the 'generate_linear' and 'generate_sine'.

Comment: Gaussian noise variance is added only once (for linear function).
"""

x_points, y_linear_points = generate_linear_data(x_points, tangent, add_term, noise_var,
False, points_num, x_interval, random)

x_points, y_sine_points = generate_sine_data(x_points, sin_period, sin_ampl, 0.0,
False, points_num, x_interval, random)

y_points = y_linear_points + y_sine_points

if plot:
pass

return x_points, y_points

[docs]def generate_random_y_data(samples, dim, ts_no):
"""
Generate data:

Input:
------------------

samples - how many samples
dim - dimensionality of the data
ts_no - number of time series

Output:
--------------------------
Y: np.array((samples, dim, ts_no))
"""

Y = np.empty((samples, dim, ts_no));

for i in range(0,samples):
for j in range(0,ts_no):
sample = np.random.randn(dim)
Y[i,:,j] = sample

if (Y.shape[2] == 1): # ts_no = 1
Y.shape=(Y.shape[0], Y.shape[1])
return Y

[docs]class StateSpaceKernelsTests(np.testing.TestCase):
[docs]    def setUp(self):
pass

[docs]    def run_descr_model(self, measurements, A,Q,H,R, true_states=None,
mean_compare_decimal=8,
m_init=None, P_init=None, dA=None,dQ=None,
dH=None,dR=None, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

#import pdb; pdb.set_trace()

state_dim = 1 if not isinstance(A,np.ndarray) else A.shape[0]
ts_no = 1 if (len(measurements.shape) < 3) else measurements.shape[2]
grad_params_no = None if dA is None else dA.shape[2]

ss_setup.use_cython = use_cython
global ssm
if (ssm.cython_code_available) and (ssm.use_cython != use_cython):

(f_mean, f_var, loglikelhood, g_loglikelhood, \
dynamic_callables_smoother) = ssm.DescreteStateSpace.kalman_filter(A, Q, H, R, measurements, index=None,
m_init=m_init, P_init=P_init, p_kalman_filter_type = kalman_filter_type,
calc_log_likelihood=calc_log_likelihood,

f_mean_squeezed = np.squeeze(f_mean[1:,:]) # exclude initial value
f_var_squeezed = np.squeeze(f_var[1:,:]) # exclude initial value

if true_states is not None:
#print np.max(np.abs(f_mean_squeezed-true_states))
np.testing.assert_almost_equal(np.max(np.abs(f_mean_squeezed- \
true_states)), 0, decimal=mean_compare_decimal)

np.testing.assert_equal(f_mean.shape, (measurements.shape[0]+1,state_dim,ts_no) )
np.testing.assert_equal(f_var.shape, (measurements.shape[0]+1,state_dim,state_dim) )

(M_smooth, P_smooth) = ssm.DescreteStateSpace.rts_smoother(state_dim, dynamic_callables_smoother, f_mean,
f_var)

return f_mean, f_var

[docs]    def run_continuous_model(self, F, L, Qc, p_H, p_R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=None, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

#import pdb; pdb.set_trace()

state_dim = 1 if not isinstance(F,np.ndarray) else F.shape[0]
ts_no = 1 if (len(Y_data.shape) < 3) else Y_data.shape[2]

ss_setup.use_cython = use_cython
global ssm
if (ssm.cython_code_available) and (ssm.use_cython != use_cython):

(f_mean, f_var, loglikelhood, g_loglikelhood, \
dynamic_callables_smoother) = ssm.ContDescrStateSpace.cont_discr_kalman_filter(F, L, Qc, p_H, p_R,
P_inf, X_data, Y_data, index = None,
m_init=None, P_init=None,
p_kalman_filter_type='regular',
calc_log_likelihood=False,

f_mean_squeezed = np.squeeze(f_mean[1:,:]) # exclude initial value
f_var_squeezed = np.squeeze(f_var[1:,:]) # exclude initial value

np.testing.assert_equal(f_mean.shape, (Y_data.shape[0]+1,state_dim,ts_no))
np.testing.assert_equal(f_var.shape, (Y_data.shape[0]+1,state_dim,state_dim))

(M_smooth, P_smooth) = ssm.ContDescrStateSpace.cont_discr_rts_smoother(state_dim, f_mean, \
f_var,dynamic_callables_smoother)

return f_mean, f_var

[docs]    def test_discrete_ss_first(self,plot=False):
"""
Tests discrete State-Space model - first test.
"""
np.random.seed(235) # seed the random number generator

A = 1.0 # For cython code to run properly need float input
H = 1.0
Q = 1.0
R = 1.0

steps_num = 100

# generate data ->
true_states = np.zeros((steps_num,))
init_state = 0
measurements = np.zeros((steps_num,))

for s in range(0, steps_num):
if s== 0:
true_states[0] = init_state + np.sqrt(Q)*np.random.randn()
else:
true_states[s] = true_states[s-1] + np.sqrt(R)*np.random.randn()
measurements[s] = true_states[s] + np.sqrt(R)*np.random.randn()
# generate data <-

# descrete kalman filter ->
m_init = 0; P_init = 1
d_num = 1000
state_discr = np.linspace(-10,10,d_num)

state_trans_matrix = np.empty((d_num,d_num))
for i in range(d_num):
state_trans_matrix[:,i] = norm.pdf(state_discr, loc=A*state_discr[i], scale=np.sqrt(Q))

m_prev = norm.pdf(state_discr, loc = m_init, scale = np.sqrt(P_init)); #m_prev / np.sum(m_prev)
m = np.zeros((d_num, steps_num))
i_mean = np.zeros((steps_num,))

for s in range(0, steps_num):
# Prediction step:
if (s==0):
m[:,s] =  np.dot(state_trans_matrix, m_prev)
else:
m[:,s] =  np.dot(state_trans_matrix, m[:,s-1])
# Update step:
#meas_ind = np.argmin(np.abs(state_discr - measurements[s])
y_vec = np.zeros( (d_num,))
for i in range(d_num):
y_vec[i] = norm.pdf(measurements[s], loc=H*state_discr[i], scale=np.sqrt(R))
norm_const = np.dot( y_vec, m[:,s] )
m[:,s] =  y_vec * m[:,s] / norm_const

i_mean[s] = state_discr[ np.argmax(m[:,s]) ]
# descrete kalman filter <-

(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot( true_states, 'g.-',label='true states')
#plt.plot( measurements, 'b.-', label='measurements')
plt.plot( f_mean, 'r.-',label='Kalman filter estimates')
plt.plot( i_mean, 'k.-', label='Discretization')

plt.plot( f_mean + 2*np.sqrt(f_var), 'r.--')
plt.plot( f_mean - 2*np.sqrt(f_var), 'r.--')
plt.legend()
plt.show()
# plotting <-
return None

[docs]    def test_discrete_ss_1D(self,plot=False):
"""
This function tests Kalman filter and smoothing when the state
dimensionality is one dimensional.
"""

np.random.seed(234) # seed the random number generator

# 1D ss model
state_dim = 1;
param_num = 2 # sigma_Q, sigma_R - parameters
measurement_dim = 1 # dimensionality od measurement

A = 1.0
Q = 2.0
dA= np.zeros((state_dim,state_dim,param_num))
dQ = np.zeros((state_dim,state_dim,param_num)); dQ[0,0,0] = 1.0

# measurement related parameters (subject to change) ->
H = np.ones((measurement_dim,state_dim ))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <-

# 1D measurement, 1 ts_no ->
data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:]+H*f_var[1:]*H), 'b--')
plt.plot( np.squeeze(f_mean[1:]-H*f_var[1:]*H), 'b--')
#            plt.plot( np.squeeze(M_sm[1:]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:]+H*P_sm[1:]*H), 'r--')
#            plt.plot( np.squeeze(M_sm[1:]-H*P_sm[1:]*H), 'r--')
plt.legend()
plt.title("1D state-space, 1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurement, 1 ts_no <-

# 1D measurement, 3 ts_no ->
data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,

#import pdb; pdb.set_trace()
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,:,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.squeeze(H*f_var[1:]*H), 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.squeeze(H*f_var[1:]*H), 'b--')
#            plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,0,1])+H*np.squeeze(P_sm[1:])*H, 'r--')
#            plt.plot( np.squeeze(M_sm[1:,0,1])-H*np.squeeze(P_sm[1:])*H, 'r--')
plt.legend()
plt.title("1D state-space, 1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurement, 3 ts_no <-
measurement_dim = 2 # dimensionality of measurement

H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <

data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

#        (f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
#                          mean_compare_decimal=16,
#                          m_init=None, P_init=None, dA=dA,dQ=dQ,
#                          dH=dH,dR=dR, use_cython=True,
#                          kalman_filter_type='svd',
#                          calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,0,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D state-space, 2D measurements, 3 ts_no. 1-st measurement, 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurement, 3 ts_no <-

[docs]    def test_discrete_ss_2D(self,plot=False):
"""
This function tests Kalman filter and smoothing when the state
dimensionality is two dimensional.
"""

np.random.seed(234) # seed the random number generator

# 1D ss model
state_dim = 2;
param_num = 3 # sigma_Q, sigma_R, one parameters in A - parameters
measurement_dim = 1 # dimensionality od measurement

A = np.eye(state_dim); A[0,0] = 0.5
Q = np.ones((state_dim,state_dim));
dA = np.zeros((state_dim,state_dim,param_num)); dA[1,1,2] = 1
dQ = np.zeros((state_dim,state_dim,param_num)); dQ[:,:,1] = np.eye(measurement_dim)

# measurement related parameters (subject to change) ->
H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <-

# 1D measurement, 1 ts_no ->
data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,0]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,0])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,0])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurement, 1 ts_no <-

# 1D measurement, 3 ts_no ->
data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,:,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurement, 3 ts_no <-

# 2D measurement, 3 ts_no ->
# measurement related parameters (subject to change) ->
measurement_dim = 2 # dimensionality od measurement

H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <

data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,

#        (f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
#                          mean_compare_decimal=16,
#                          m_init=None, P_init=None, dA=dA,dQ=dQ,
#                          dH=dH,dR=dR, use_cython=True,
#                          kalman_filter_type='svd',
#                          calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,0,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 2D measurements, 3 ts_no. 1-st measurement, 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurement, 3 ts_no <-

[docs]    def test_continuous_ss(self,plot=False):
"""
This function tests the continuous state-space model.
"""

# 1D measurements, 1 ts_no ->
measurement_dim = 1 # dimensionality of measurement

X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))

try:
import GPy
except ImportError as e:
return None

periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()

state_dim = dFt.shape[0];
param_num = dFt.shape[2]

# dH matrix is None

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=True,
kalman_filter_type='rbc',
calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot( X_data, np.squeeze(Y_data[:,0]), 'g.-', label='measurements')
plt.plot( X_data, np.squeeze(f_mean[1:,15]), 'b.-',label='Kalman filter estimates')
plt.plot( X_data, np.squeeze(f_mean[1:,15])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( X_data, np.squeeze(f_mean[1:,15])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#        plt.plot( np.squeeze(M_sm[1:,15]), 'r.-',label='Smoother Estimates')
#        plt.plot( np.squeeze(M_sm[1:,15])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#        plt.plot( np.squeeze(M_sm[1:,15])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurements, 1 ts_no <-

# 1D measurements, 3 ts_no ->
measurement_dim = 1 # dimensionality od measurement

X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))

periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()

state_dim = dFt.shape[0];
param_num = dFt.shape[2]

# dH matrix is None

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=True,
kalman_filter_type='rbc',
calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot(X_data, np.squeeze(Y_data[:,0,1]), 'g.-', label='measurements')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1]), 'b.-',label='Kalman filter estimates')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,15,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,15,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,15,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurements, 3 ts_no <-

# 2D measurements, 3 ts_no ->
measurement_dim = 2 # dimensionality od measurement

X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))

periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()
H = np.vstack((H,H)) # make 2D measurements
R = 1.5 * np.eye(measurement_dim)

state_dim = dFt.shape[0];
param_num = dFt.shape[2]

# dH matrix is None

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,

(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,

#        (f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
#                          m_init=None, P_init=P0, use_cython=True,
#                          kalman_filter_type='rbc',
#                          calc_log_likelihood=True,

if plot:
# plotting ->
plt.figure()
plt.plot(X_data, np.squeeze(Y_data[:,0,1]), 'g.-', label='measurements')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1]), 'b.-',label='Kalman filter estimates')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
#            plt.plot( np.squeeze(M_sm[1:,15,1]), 'r.-',label='Smoother Estimates')
#            plt.plot( np.squeeze(M_sm[1:,15,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
#            plt.plot( np.squeeze(M_sm[1:,15,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurements, 3 ts_no <-

#    """
#    Test EM gradient calculation. This method works (the formulas are such)
#    that it works only for time invariant matrices A, Q, H, R. For the continuous
#    model it means that time intervals are the same.
#    """
#
#    np.random.seed(234) # seed the random number generator
#
#    # 1D measurements, 1 ts_no ->
#    measurement_dim = 1 # dimensionality of measurement
#
#    x_data = generate_x_points(points_num=10, x_interval = (0, 20), random=False)
#    data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))
#
#    import GPy
#    #periodic_kernel = GPy.kern.sde_Matern32(1,active_dims=[0,])
#    periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
#    (F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0t) = periodic_kernel.sde()
#
#    state_dim = dFt.shape[0];
#    param_num = dFt.shape[2]
#
#    # dH matrix is None
#
#
#    #(F,L,Qc,H,P_inf,dF,dQc,dP_inf) = ssm.balance_ss_model(F,L,Qc,H,P_inf,dF,dQc,dP_inf)
#    # Use the Kalman filter to evaluate the likelihood
#
#    #import pdb; pdb.set_trace()
#    (M_kf, P_kf, log_likelihood,
#                                  L, Qc, H, 1.5, P_inf, x_data, data, m_init=None,
#                                  P_init=P0, calc_log_likelihood=True,
#
#    if plot:
#        # plotting ->
#        plt.figure()
#        plt.plot( np.squeeze(data[:,0]), 'g.-', label='measurements')
#        plt.plot( np.squeeze(M_kf[1:,15]), 'b.-',label='Kalman filter estimates')
#        plt.plot( np.squeeze(M_kf[1:,15])+np.einsum('ij,ajk,kl', H, P_kf[1:], H.T)[:,0,0], 'b--')
#        plt.plot( np.squeeze(M_kf[1:,15])-np.einsum('ij,ajk,kl', H, P_kf[1:], H.T)[:,0,0], 'b--')
#        plt.title("1D measurements, 1 ts_no")
#        plt.show()
#        # plotting <-
#    # 1D measurements, 1 ts_no <-

if __name__ == '__main__':
print("Running state-space inference tests...")
unittest.main()

#tt = StateSpaceKernelsTests('test_discrete_ss_first')
#res = tt.test_discrete_ss_first(plot=True)
#res = tt.test_discrete_ss_1D(plot=True)
#res = tt.test_discrete_ss_2D(plot=False)
#res = tt.test_continuos_ss(plot=True)