# Copyright (c) 2012-2014, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
"""
Gaussian Processes regression examples
"""
MPL_AVAILABLE = True
try:
import matplotlib.pyplot as plt
except ImportError:
MPL_AVAILABLE = False
import numpy as np
import GPy
[docs]def olympic_marathon_men(optimize=True, plot=True):
"""Run a standard Gaussian process regression on the Olympic marathon data."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.olympic_marathon_men()
# create simple GP Model
m = GPy.models.GPRegression(data["X"], data["Y"])
# set the lengthscale to be something sensible (defaults to 1)
m.kern.lengthscale = 10.0
if optimize:
m.optimize("bfgs", max_iters=200)
if MPL_AVAILABLE and plot:
m.plot(plot_limits=(1850, 2050))
return m
[docs]def coregionalization_toy(optimize=True, plot=True):
"""
A simple demonstration of coregionalization on two sinusoidal functions.
"""
# build a design matrix with a column of integers indicating the output
X1 = np.random.rand(50, 1) * 8
X2 = np.random.rand(30, 1) * 5
# build a suitable set of observed variables
Y1 = np.sin(X1) + np.random.randn(*X1.shape) * 0.05
Y2 = np.sin(X2) + np.random.randn(*X2.shape) * 0.05 + 2.0
m = GPy.models.GPCoregionalizedRegression(X_list=[X1, X2], Y_list=[Y1, Y2])
if optimize:
m.optimize("bfgs", max_iters=100)
if MPL_AVAILABLE and plot:
slices = GPy.util.multioutput.get_slices([X1, X2])
m.plot(
fixed_inputs=[(1, 0)],
which_data_rows=slices[0],
Y_metadata={"output_index": 0},
)
m.plot(
fixed_inputs=[(1, 1)],
which_data_rows=slices[1],
Y_metadata={"output_index": 1},
ax=plt.gca(),
)
return m
[docs]def coregionalization_sparse(optimize=True, plot=True):
"""A simple demonstration of coregionalization on two sinusoidal
functions using sparse approximations. """
# build a design matrix with a column of integers indicating the output
X1 = np.random.rand(50, 1) * 8
X2 = np.random.rand(30, 1) * 5
# build a suitable set of observed variables
Y1 = np.sin(X1) + np.random.randn(*X1.shape) * 0.05
Y2 = np.sin(X2) + np.random.randn(*X2.shape) * 0.05 + 2.0
m = GPy.models.SparseGPCoregionalizedRegression(X_list=[X1, X2], Y_list=[Y1, Y2])
if optimize:
m.optimize("bfgs", max_iters=100)
if MPL_AVAILABLE and plot:
slices = GPy.util.multioutput.get_slices([X1, X2])
m.plot(
fixed_inputs=[(1, 0)],
which_data_rows=slices[0],
Y_metadata={"output_index": 0},
)
m.plot(
fixed_inputs=[(1, 1)],
which_data_rows=slices[1],
Y_metadata={"output_index": 1},
ax=plt.gca(),
)
plt.ylim(-3,)
return m
[docs]def epomeo_gpx(max_iters=200, optimize=True, plot=True):
"""
Perform Gaussian process regression on the latitude and longitude data
from the Mount Epomeo runs. Requires gpxpy to be installed on your system
to load in the data.
"""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.epomeo_gpx()
num_data_list = []
for Xpart in data["X"]:
num_data_list.append(Xpart.shape[0])
num_data_array = np.array(num_data_list)
num_data = num_data_array.sum()
Y = np.zeros((num_data, 2))
t = np.zeros((num_data, 2))
start = 0
for Xpart, index in zip(data["X"], range(len(data["X"]))):
end = start + Xpart.shape[0]
t[start:end, :] = np.hstack(
(Xpart[:, 0:1], index * np.ones((Xpart.shape[0], 1)))
)
Y[start:end, :] = Xpart[:, 1:3]
num_inducing = 200
Z = np.hstack(
(
np.linspace(t[:, 0].min(), t[:, 0].max(), num_inducing)[:, None],
np.random.randint(0, 4, num_inducing)[:, None],
)
)
k1 = GPy.kern.RBF(1)
k2 = GPy.kern.Coregionalize(output_dim=5, rank=5)
k = k1 ** k2
m = GPy.models.SparseGPRegression(t, Y, kernel=k, Z=Z, normalize_Y=True)
m.constrain_fixed(".*variance", 1.0)
m.inducing_inputs.constrain_fixed()
m.Gaussian_noise.variance.constrain_bounded(1e-3, 1e-1)
m.optimize(max_iters=max_iters, messages=True)
return m
[docs]def multiple_optima(
gene_number=937,
resolution=80,
model_restarts=10,
seed=10000,
max_iters=300,
optimize=True,
plot=True,
):
"""
Show an example of a multimodal error surface for Gaussian process
regression. Gene 939 has bimodal behaviour where the noisy mode is
higher.
"""
# Contour over a range of length scales and signal/noise ratios.
length_scales = np.linspace(0.1, 60.0, resolution)
log_SNRs = np.linspace(-3.0, 4.0, resolution)
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.della_gatta_TRP63_gene_expression(
data_set="della_gatta", gene_number=gene_number
)
# data['Y'] = data['Y'][0::2, :]
# data['X'] = data['X'][0::2, :]
data["Y"] = data["Y"] - np.mean(data["Y"])
lls = GPy.examples.regression._contour_data(
data, length_scales, log_SNRs, GPy.kern.RBF
)
if MPL_AVAILABLE and plot:
plt.contour(length_scales, log_SNRs, np.exp(lls), 20, cmap=plt.cm.jet)
ax = plt.gca()
plt.xlabel("length scale")
plt.ylabel("log_10 SNR")
xlim = ax.get_xlim()
ylim = ax.get_ylim()
# Now run a few optimizations
models = []
optim_point_x = np.empty(2)
optim_point_y = np.empty(2)
np.random.seed(seed=seed)
for i in range(0, model_restarts):
# kern = GPy.kern.RBF(
# 1, variance=np.random.exponential(1.), lengthscale=np.random.exponential(50.)
# )
kern = GPy.kern.RBF(
1, variance=np.random.uniform(1e-3, 1), lengthscale=np.random.uniform(5, 50)
)
m = GPy.models.GPRegression(data["X"], data["Y"], kernel=kern)
m.likelihood.variance = np.random.uniform(1e-3, 1)
optim_point_x[0] = m.rbf.lengthscale
optim_point_y[0] = np.log10(m.rbf.variance) - np.log10(m.likelihood.variance)
# optimize
if optimize:
m.optimize("scg", xtol=1e-6, ftol=1e-6, max_iters=max_iters)
optim_point_x[1] = m.rbf.lengthscale
optim_point_y[1] = np.log10(m.rbf.variance) - np.log10(m.likelihood.variance)
if MPL_AVAILABLE and plot:
plt.arrow(
optim_point_x[0],
optim_point_y[0],
optim_point_x[1] - optim_point_x[0],
optim_point_y[1] - optim_point_y[0],
label=str(i),
head_length=1,
head_width=0.5,
fc="k",
ec="k",
)
models.append(m)
if MPL_AVAILABLE and plot:
ax.set_xlim(xlim)
ax.set_ylim(ylim)
return m # (models, lls)
def _contour_data(data, length_scales, log_SNRs, kernel_call=GPy.kern.RBF):
"""
Evaluate the GP objective function for a given data set for a range of
signal to noise ratios and a range of lengthscales.
:data_set: A data set from the utils.datasets director.
:length_scales: a list of length scales to explore for the contour plot.
:log_SNRs: a list of base 10 logarithm signal to noise ratios to explore for the contour plot.
:kernel: a kernel to use for the 'signal' portion of the data.
"""
lls = []
total_var = np.var(data["Y"])
kernel = kernel_call(1, variance=1.0, lengthscale=1.0)
model = GPy.models.GPRegression(data["X"], data["Y"], kernel=kernel)
for log_SNR in log_SNRs:
SNR = 10.0 ** log_SNR
noise_var = total_var / (1.0 + SNR)
signal_var = total_var - noise_var
model.kern[".*variance"] = signal_var
model.likelihood.variance = noise_var
length_scale_lls = []
for length_scale in length_scales:
model[".*lengthscale"] = length_scale
length_scale_lls.append(model.log_likelihood())
lls.append(length_scale_lls)
return np.array(lls)
[docs]def olympic_100m_men(optimize=True, plot=True):
"""Run a standard Gaussian process regression on the Rogers and Girolami olympics data."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.olympic_100m_men()
# create simple GP Model
m = GPy.models.GPRegression(data["X"], data["Y"])
# set the lengthscale to be something sensible (defaults to 1)
m.rbf.lengthscale = 10
if optimize:
m.optimize("bfgs", max_iters=200)
if MPL_AVAILABLE and plot:
m.plot(plot_limits=(1850, 2050))
return m
[docs]def toy_rbf_1d(optimize=True, plot=True):
"""Run a simple demonstration of a standard Gaussian process fitting it to data sampled from an RBF covariance."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.toy_rbf_1d()
# create simple GP Model
m = GPy.models.GPRegression(data["X"], data["Y"])
if optimize:
m.optimize("bfgs")
if MPL_AVAILABLE and plot:
m.plot()
return m
[docs]def toy_rbf_1d_50(optimize=True, plot=True):
"""Run a simple demonstration of a standard Gaussian process fitting
it to data sampled from an RBF covariance."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.toy_rbf_1d_50()
# create simple GP Model
m = GPy.models.GPRegression(data["X"], data["Y"])
if optimize:
m.optimize("bfgs")
if MPL_AVAILABLE and plot:
m.plot()
return m
[docs]def toy_poisson_rbf_1d_laplace(optimize=True, plot=True):
"""Run a simple demonstration of a standard Gaussian process fitting it to data sampled from an RBF covariance."""
optimizer = "scg"
x_len = 100
X = np.linspace(0, 10, x_len)[:, None]
f_true = np.random.multivariate_normal(np.zeros(x_len), GPy.kern.RBF(1).K(X))
Y = np.array([np.random.poisson(np.exp(f)) for f in f_true])[:, None]
kern = GPy.kern.RBF(1)
poisson_lik = GPy.likelihoods.Poisson()
laplace_inf = GPy.inference.latent_function_inference.Laplace()
# create simple GP Model
m = GPy.core.GP(
X, Y, kernel=kern, likelihood=poisson_lik, inference_method=laplace_inf
)
if optimize:
m.optimize(optimizer)
if MPL_AVAILABLE and plot:
m.plot()
# plot the real underlying rate function
plt.plot(X, np.exp(f_true), "--k", linewidth=2)
return m
[docs]def toy_ARD(
max_iters=1000, kernel_type="linear", num_samples=300, D=4, optimize=True, plot=True
):
# Create an artificial dataset where the values in the targets (Y)
# only depend in dimensions 1 and 3 of the inputs (X). Run ARD to
# see if this dependency can be recovered
X1 = np.sin(np.sort(np.random.rand(num_samples, 1) * 10, 0))
X2 = np.cos(np.sort(np.random.rand(num_samples, 1) * 10, 0))
X3 = np.exp(np.sort(np.random.rand(num_samples, 1), 0))
X4 = np.log(np.sort(np.random.rand(num_samples, 1), 0))
X = np.hstack((X1, X2, X3, X4))
Y1 = np.asarray(2 * X[:, 0] + 3).reshape(-1, 1)
Y2 = np.asarray(4 * (X[:, 2] - 1.5 * X[:, 0])).reshape(-1, 1)
Y = np.hstack((Y1, Y2))
Y = np.dot(Y, np.random.rand(2, D))
Y = Y + 0.2 * np.random.randn(Y.shape[0], Y.shape[1])
Y -= Y.mean()
Y /= Y.std()
if kernel_type == "linear":
kernel = GPy.kern.Linear(X.shape[1], ARD=1)
elif kernel_type == "rbf_inv":
kernel = GPy.kern.RBF_inv(X.shape[1], ARD=1)
else:
kernel = GPy.kern.RBF(X.shape[1], ARD=1)
kernel += GPy.kern.White(X.shape[1]) + GPy.kern.Bias(X.shape[1])
m = GPy.models.GPRegression(X, Y, kernel)
# len_prior = GPy.priors.inverse_gamma(1,18) # 1, 25
# m.set_prior('.*lengthscale',len_prior)
if optimize:
m.optimize(optimizer="scg", max_iters=max_iters)
if MPL_AVAILABLE and plot:
m.kern.plot_ARD()
return m
[docs]def toy_ARD_sparse(
max_iters=1000, kernel_type="linear", num_samples=300, D=4, optimize=True, plot=True
):
# Create an artificial dataset where the values in the targets (Y)
# only depend in dimensions 1 and 3 of the inputs (X). Run ARD to
# see if this dependency can be recovered
X1 = np.sin(np.sort(np.random.rand(num_samples, 1) * 10, 0))
X2 = np.cos(np.sort(np.random.rand(num_samples, 1) * 10, 0))
X3 = np.exp(np.sort(np.random.rand(num_samples, 1), 0))
X4 = np.log(np.sort(np.random.rand(num_samples, 1), 0))
X = np.hstack((X1, X2, X3, X4))
Y1 = np.asarray(2 * X[:, 0] + 3)[:, None]
Y2 = np.asarray(4 * (X[:, 2] - 1.5 * X[:, 0]))[:, None]
Y = np.hstack((Y1, Y2))
Y = np.dot(Y, np.random.rand(2, D))
Y = Y + 0.2 * np.random.randn(Y.shape[0], Y.shape[1])
Y -= Y.mean()
Y /= Y.std()
if kernel_type == "linear":
kernel = GPy.kern.Linear(X.shape[1], ARD=1)
elif kernel_type == "rbf_inv":
kernel = GPy.kern.RBF_inv(X.shape[1], ARD=1)
else:
kernel = GPy.kern.RBF(X.shape[1], ARD=1)
# kernel += GPy.kern.Bias(X.shape[1])
X_variance = np.ones(X.shape) * 0.5
m = GPy.models.SparseGPRegression(X, Y, kernel, X_variance=X_variance)
# len_prior = GPy.priors.inverse_gamma(1,18) # 1, 25
# m.set_prior('.*lengthscale',len_prior)
if optimize:
m.optimize(optimizer="scg", max_iters=max_iters)
if MPL_AVAILABLE and plot:
m.kern.plot_ARD()
return m
[docs]def robot_wireless(max_iters=100, kernel=None, optimize=True, plot=True):
"""Predict the location of a robot given wirelss signal strength readings."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.robot_wireless()
# create simple GP Model
m = GPy.models.GPRegression(data["Y"], data["X"], kernel=kernel)
# optimize
if optimize:
m.optimize(max_iters=max_iters)
Xpredict = m.predict(data["Ytest"])[0]
if MPL_AVAILABLE and plot:
plt.plot(data["Xtest"][:, 0], data["Xtest"][:, 1], "r-")
plt.plot(Xpredict[:, 0], Xpredict[:, 1], "b-")
plt.axis("equal")
plt.title("WiFi Localization with Gaussian Processes")
plt.legend(("True Location", "Predicted Location"))
sse = ((data["Xtest"] - Xpredict) ** 2).sum()
print(("Sum of squares error on test data: " + str(sse)))
return m
[docs]def silhouette(max_iters=100, optimize=True, plot=True):
"""Predict the pose of a figure given a silhouette. This is a task from Agarwal and Triggs 2004 ICML paper."""
try:
import pods
except ImportError:
print("pods unavailable, see https://github.com/sods/ods for example datasets")
return
data = pods.datasets.silhouette()
# create simple GP Model
m = GPy.models.GPRegression(data["X"], data["Y"])
# optimize
if optimize:
m.optimize(messages=True, max_iters=max_iters)
print(m)
return m
[docs]def sparse_GP_regression_1D(
num_samples=400,
num_inducing=5,
max_iters=100,
optimize=True,
plot=True,
checkgrad=False,
):
"""Run a 1D example of a sparse GP regression."""
# sample inputs and outputs
X = np.random.uniform(-3.0, 3.0, (num_samples, 1))
Y = np.sin(X) + np.random.randn(num_samples, 1) * 0.05
# construct kernel
rbf = GPy.kern.RBF(1)
# create simple GP Model
m = GPy.models.SparseGPRegression(X, Y, kernel=rbf, num_inducing=num_inducing)
if checkgrad:
m.checkgrad()
if optimize:
m.optimize("tnc", max_iters=max_iters)
if MPL_AVAILABLE and plot:
m.plot()
return m
[docs]def sparse_GP_regression_2D(
num_samples=400, num_inducing=50, max_iters=100, optimize=True, plot=True, nan=False
):
"""Run a 2D example of a sparse GP regression."""
np.random.seed(1234)
X = np.random.uniform(-3.0, 3.0, (num_samples, 2))
Y = np.sin(X[:, 0:1]) * np.sin(X[:, 1:2]) + np.random.randn(num_samples, 1) * 0.05
if nan:
inan = np.random.binomial(1, 0.2, size=Y.shape)
Y[inan] = np.nan
# construct kernel
rbf = GPy.kern.RBF(2)
# create simple GP Model
m = GPy.models.SparseGPRegression(X, Y, kernel=rbf, num_inducing=num_inducing)
# contrain all parameters to be positive (but not inducing inputs)
m[".*len"] = 2.0
m.checkgrad()
# optimize
if optimize:
m.optimize("tnc", messages=1, max_iters=max_iters)
# plot
if MPL_AVAILABLE and plot:
m.plot()
print(m)
return m
[docs]def simple_mean_function(max_iters=100, optimize=True, plot=True):
"""
The simplest possible mean function. No parameters, just a simple Sinusoid.
"""
# create simple mean function
mf = GPy.core.Mapping(1, 1)
mf.f = np.sin
mf.update_gradients = lambda a, b: None
X = np.linspace(0, 10, 50).reshape(-1, 1)
Y = np.sin(X) + 0.5 * np.cos(3 * X) + 0.1 * np.random.randn(*X.shape)
k = GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()
m = GPy.core.GP(X, Y, kernel=k, likelihood=lik, mean_function=mf)
if optimize:
m.optimize(max_iters=max_iters)
if MPL_AVAILABLE and plot:
m.plot(plot_limits=(-10, 15))
return m
[docs]def parametric_mean_function(max_iters=100, optimize=True, plot=True):
"""
A linear mean function with parameters that we'll learn alongside the kernel
"""
# create simple mean function
mf = GPy.core.Mapping(1, 1)
mf.f = np.sin
X = np.linspace(0, 10, 50).reshape(-1, 1)
Y = np.sin(X) + 0.5 * np.cos(3 * X) + 0.1 * np.random.randn(*X.shape) + 3 * X
mf = GPy.mappings.Linear(1, 1)
k = GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()
m = GPy.core.GP(X, Y, kernel=k, likelihood=lik, mean_function=mf)
if optimize:
m.optimize(max_iters=max_iters)
if MPL_AVAILABLE and plot:
m.plot()
return m
[docs]def warped_gp_cubic_sine(max_iters=100, plot=True):
"""
A test replicating the cubic sine regression problem from
Snelson's paper.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0, 0.2, 151)
Y = np.array([np.power(abs(y), float(1) / 3) * (1, -1)[y < 0] for y in Y])
X = X[:, None]
Y = Y[:, None]
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhFunction(n_terms=2)
warp_m = GPy.models.WarpedGP(X, Y, kernel=warp_k, warping_function=warp_f)
warp_m[".*\\.d"].constrain_fixed(1.0)
m = GPy.models.GPRegression(X, Y)
m.optimize_restarts(
parallel=False, robust=True, num_restarts=5, max_iters=max_iters
)
warp_m.optimize_restarts(
parallel=False, robust=True, num_restarts=5, max_iters=max_iters
)
# m.optimize(max_iters=max_iters)
# warp_m.optimize(max_iters=max_iters)
print(warp_m)
print(warp_m[".*warp.*"])
if MPL_AVAILABLE and plot:
warp_m.predict_in_warped_space = False
warp_m.plot(title="Warped GP - Latent space")
warp_m.predict_in_warped_space = True
warp_m.plot(title="Warped GP - Warped space")
m.plot(title="Standard GP")
warp_m.plot_warping()
plt.show()
return warp_m
[docs]def multioutput_gp_with_derivative_observations(plot=True):
f = lambda x: np.sin(x) + 0.1 * (x - 2.0) ** 2 - 0.005 * x ** 3
fd = lambda x: np.cos(x) + 0.2 * (x - 2.0) - 0.015 * x ** 2
N = 10 # Number of observations
M = 10 # Number of derivative observations
Npred = 100 # Number of prediction points
sigma = 0.05 # Noise of observations
sigma_der = 0.05 # Noise of derivative observations
x = np.array([np.linspace(1, 10, N)]).T
y = f(x) + np.array(sigma * np.random.normal(0, 1, (N, 1)))
xd = np.array([np.linspace(2, 8, M)]).T
yd = fd(xd) + np.array(sigma_der * np.random.normal(0, 1, (M, 1)))
xpred = np.array([np.linspace(0, 11, Npred)]).T
ypred_true = f(xpred)
ydpred_true = fd(xpred)
# squared exponential kernel:
se = GPy.kern.RBF(input_dim=1, lengthscale=1.5, variance=0.2)
# We need to generate separate kernel for the derivative observations and give the created kernel as an input:
se_der = GPy.kern.DiffKern(se, 0)
# Then
gauss = GPy.likelihoods.Gaussian(variance=sigma ** 2)
gauss_der = GPy.likelihoods.Gaussian(variance=sigma_der ** 2)
# Then create the model, we give everything in lists, the order of the inputs indicates the order of the outputs
# Now we have the regular observations first and derivative observations second, meaning that the kernels and
# the likelihoods must follow the same order. Crosscovariances are automatically taken care of
m = GPy.models.MultioutputGP(
X_list=[x, xd],
Y_list=[y, yd],
kernel_list=[se, se_der],
likelihood_list=[gauss, gauss_der],
)
# Optimize the model
m.optimize(messages=0, ipython_notebook=False)
if MPL_AVAILABLE and plot:
def plot_gp_vs_real(
m, x, yreal, size_inputs, title, fixed_input=1, xlim=[0, 11], ylim=[-1.5, 3]
):
fig, ax = plt.subplots()
ax.set_title(title)
plt.plot(x, yreal, "r", label="Real function")
rows = (
slice(0, size_inputs[0])
if fixed_input == 0
else slice(size_inputs[0], size_inputs[0] + size_inputs[1])
)
m.plot(
fixed_inputs=[(1, fixed_input)],
which_data_rows=rows,
xlim=xlim,
ylim=ylim,
ax=ax,
)
# Plot the model, the syntax is same as for multioutput models:
plot_gp_vs_real(
m,
xpred,
ydpred_true,
[x.shape[0], xd.shape[0]],
title="Latent function derivatives",
fixed_input=1,
xlim=[0, 11],
ylim=[-1.5, 3],
)
plot_gp_vs_real(
m,
xpred,
ypred_true,
[x.shape[0], xd.shape[0]],
title="Latent function",
fixed_input=0,
xlim=[0, 11],
ylim=[-1.5, 3],
)
# making predictions for the values:
mu, var = m.predict_noiseless(Xnew=[xpred, np.empty((0, 1))])
return m