from __future__ import annotations
import numpy as np
from itertools import chain
import warnings
from time import time
from scipy.linalg.lapack import dtrtri
from scipy import optimize
from scipy.stats import norm
from hetgpy.covariance_functions import cov_gen, partial_cov_gen, euclidean_dist
from hetgpy.utils import fast_tUY2, rho_AN
from hetgpy.find_reps import find_reps
from hetgpy.auto_bounds import auto_bounds
from hetgpy.find_reps import find_reps
from hetgpy.utils import duplicated
from hetgpy.update_covar import update_Ki, update_Ki_rep
from hetgpy.plot import plot_diagnostics, plot_optimization_iterates
from copy import deepcopy
import contextlib
from numpy.typing import ArrayLike, NDArray
NDArrayInt = NDArray[np.int_]
MACHINE_DOUBLE_EPS = np.sqrt(np.finfo(float).eps)
[docs]
class homGP():
def __init__(self):
self.mle = self.mleHomGP
return
def __getitem__(self, key):
return self.__dict__[key]
def __setitem__(self,item,value):
self.__dict__[item] = value
[docs]
def get(self,key):
r'''
General `get` item (retrives key from self.__dict__)
'''
return self.__dict__.get(key)
[docs]
def logLikHom(self,X0: ArrayLike, Z0: ArrayLike, Z: ArrayLike, mult: NDArrayInt, theta: ArrayLike, g: float, beta0: float | None = None, covtype: str = "Gaussian", eps: float = MACHINE_DOUBLE_EPS) -> float:
r'''
Log Likelihood under homoskedastic noise
Parameters
----------
X0: ndarray_like
unique design matrix (of size nxd)
Z0: ndarray_like
averaged responses (size nx1)
Z: ndarray_like
observation (output) vector (size N)
mult: ndarray_like
number of replicates as each design location. Note that mult.sum()==N
theta: ndarray_like
lengthscale
g: float
noise variance
beta0: float
trend
covtype: str
covariance kernel to use
eps: float
amount of jitter on diagonal of covariance matrix
Returns
-------
loglik: float
log-likelihood
'''
n = X0.shape[0]
N = Z.shape[0]
C = cov_gen(X1 = X0, theta = theta, type = covtype)
self.C = C
jitter = (eps + g / mult) * np.eye(n)
Ki = np.linalg.cholesky(C + jitter).T
ldetKi = - 2.0 * np.sum(np.log(np.diag(Ki)))
# to mirror R's chol2inv: do the following:
# expose dtrtri from lapack (for fast cholesky inversion of a triangular matrix)
# use result to compute Ki (should match chol2inv)
Ki = dtrtri(Ki)[0] # -- equivalent of chol2inv -- see https://stackoverflow.com/questions/6042308/numpy-inverting-an-upper-triangular-matrix
Ki = Ki @ Ki.T # -- equivalent of chol2inv
self.Ki = Ki
if beta0 is None:
beta0 = Ki.sum(axis=1) @ Z0 / Ki.sum()
self.beta0 = beta0
psi_0 = (Z0 - beta0).T @ Ki @ (Z0 - beta0)
psi = (1.0 / N) * ((((Z-beta0).T @ (Z-beta0) - ((Z0-beta0)*mult).T @ (Z0-beta0)) / g) + psi_0)
# check for divide by zero warnings
if psi <=0: return np.nan
if g <= 0: return np.nan
loglik = (-N / 2.0) * np.log(2*np.pi) - (N / 2.0) * np.log(psi) + (1.0 / 2.0) * ldetKi - (N - n)/2.0 * np.log(g) - (1.0 / 2.0) * np.log(mult).sum() - (N / 2.0)
#print('loglik: ', loglik,'\n')
return loglik
[docs]
def dlogLikHom(self,X0: ArrayLike, Z0: ArrayLike, Z: ArrayLike, mult: NDArrayInt, theta: ArrayLike, g: float, beta0: float | None = None, covtype: str = "Gaussian",
eps: float = MACHINE_DOUBLE_EPS, components: list = ("theta", "g")) -> ArrayLike:
r'''
Log likelihood gradient under homoskedastic noise
Parameters
----------
X0: ndarray_like
unique design matrix (of size nxd)
Z0: ndarray_like
averaged responses (size nx1)
Z: ndarray_like
observation (output) vector (size N)
mult: ndarray_like
number of replicates as each design location. Note that mult.sum()==N
theta: ndarray_like
lengthscale
g: float
noise variance
beta0: float
trend
covtype: str
covariance kernel to use
eps: float
amount of jitter on diagonal of covariance matrix
components: list
directions for partial derivatives, one or both of 'theta' (lengthscales) or 'g' (for noise)
Returns
-------
out: ndarray_like
gradient with respect to hyperparameters
'''
k = len(Z)
n = X0.shape[0]
C = self.C # assumes these have been instantiated by a call to `logLikHom` first
Ki = self.Ki
beta0 = self.beta0
Z0 = Z0 - beta0
Z = Z - beta0
KiZ0 = Ki @ Z0 ## to avoid recomputing
psi = Z0.T @ KiZ0
tmp1 = None
tmp2 = None
# First component, derivative with respect to theta
if "theta" in components:
tmp1 = np.repeat(np.nan, len(theta))
if len(theta)==1:
dC_dthetak = partial_cov_gen(X1 = X0, theta = theta, type = covtype, arg = "theta_k") * C
tmp1 = k/2 * (KiZ0.T @ dC_dthetak) @ KiZ0 /(((Z.T @ Z) - (Z0 * mult).T @ Z0)/g + psi) - 1/2 * np.trace(Ki @ dC_dthetak) # replaces trace_sym
tmp1 = np.array(tmp1).squeeze()
else:
for i in range(len(theta)):
# use i:i+1 to preserve vector structure -- see "An integer, i, returns the same values as i:i+1 except the dimensionality of the returned self is reduced by 1"
## at: https://numpy.org/doc/stable/user/basics.indexing.html
# tmp1[i] <- k/2 * crossprod(KiZ0, dC_dthetak) %*% KiZ0 /((crossprod(Z) - crossprod(Z0 * mult, Z0))/g + psi) - 1/2 * trace_sym(Ki, dC_dthetak)
dC_dthetak = partial_cov_gen(X1 = X0[:,i:i+1], theta = theta[i], type = covtype, arg = "theta_k") * C
tmp1[i] = (k/2 * (KiZ0.T @ dC_dthetak) @ KiZ0 /(((Z.T @ Z) - (Z0 * mult).T @ Z0)/g + psi) - 1/2 * np.trace(Ki @ dC_dthetak)).squeeze() # replaces trace_sym
# Second component derivative with respect to g
if "g" in components:
tmp2 = k/2 * ((Z.T @ Z - (Z0 * mult).T @ Z0)/g**2 + np.sum(KiZ0**2/mult)) / ((Z.T @ Z - (Z0 * mult).T @ Z0)/g + psi) - (k - n)/ (2*g) - 1/2 * np.sum(np.diag(Ki)/mult)
tmp2 = np.array(tmp2).squeeze()
out = np.hstack((tmp1, tmp2)).squeeze()
out = out[~(out==None)].astype(float).reshape(-1,1)
#print('dll', out, '\n')
return out
[docs]
def mleHomGP(self,X: ArrayLike, Z: ArrayLike, lower: ArrayLike | None = None, upper: ArrayLike | None = None, known: dict = dict(),
noiseControl: dict = dict(g_bounds = (MACHINE_DOUBLE_EPS, 1e2)),
init: dict = {},
covtype: str = "Gaussian",
maxit: int = 100, eps: float = MACHINE_DOUBLE_EPS, settings: dict = dict(returnKi = True, factr = 1e7)) -> None:
r'''
Gaussian process modeling with homoskedastic noise.
You may also call this function as `model.mle`
Gaussian process regression under homoskedastic noise based on maximum likelihood estimation of the hyperparameters. This function is enhanced to deal with replicated observations.
Parameters
----------
X : ndarray_like
matrix of all designs, one per row, or list with elements:
- ``X0`` matrix of unique design locations, one point per row
- ``Z0`` vector of averaged observations, of length ``len(X0)``
- ``mult`` number of replicates at designs in ``X0``, of length ``len(X0)``
Z : ndarray_like
Z vector of all observations. If using a list with ``X``, ``Z`` has to be ordered with respect to ``X0``, and of length ``sum(mult)``
lower,upper : ndarray_like
optional bounds for the ``theta`` parameter (see :func: covariance_functions.cov_gen for the exact parameterization).
In the multivariate case, it is possible to give vectors for bounds (resp. scalars) for anisotropy (resp. isotropy)
noiseControl : dict
dict with element:
- ``g_bounds`` vector providing minimal and maximal noise to signal ratio (default to ``(sqrt(MACHINE_DOUBLE_EPS), 100)``).
settings : dict
dict for options about the general modeling procedure, with elements:
- ``return_Ki`` boolean to include the inverse covariance matrix in the object for further use (e.g., prediction).
- ``factr`` (default to 1e7) and ``pgtol`` are available to be passed to `options` for L-BFGS-B in :func: ``scipy.optimize.minimize``.
eps : float
jitter used in the inversion of the covariance matrix for numerical stability
known : dict
optional dict of known parameters (e.g. ``beta0``, ``theta``, ``g``)
init : dict
optional lists of starting values for mle optimization:
- ``theta_init`` initial value of the theta parameters to be optimized over (default to 10% of the range determined with ``lower`` and ``upper``)
- ``g_init`` vector of nugget parameter to be optimized over
covtype : str
covariance kernel type, either ``'Gaussian'``, ``'Matern5_2'`` or ``'Matern3_2'``, see :func: ``~covariance_functions.cov_gen``
maxit : int
maximum number of iterations for `L-BFGS-B` of :func: ``scipy.optimize.minimize`` dedicated to maximum likelihood optimization
Notes
-------
The global covariance matrix of the model is parameterized as ``nu_hat * (C + g * diag(1/mult)) = nu_hat * K``,
with ``C`` the correlation matrix between unique designs, depending on the family of kernel used (see :func: `~hetgpy.covariance_functions.cov_gen` for available choices) and values of lengthscale parameters.
``nu_hat`` is the plugin estimator of the variance of the process.
It is generally recommended to use :func: ``~find_reps.find_reps`` to pre-process the data, to rescale the inputs to the unit cube and to normalize the outputs.
Returns
-------
self, with the following attributes:
- ``theta``: unless given, maximum likelihood estimate (mle) of the lengthscale parameter(s),
- ``nu_hat``: plugin estimator of the variance,
- ``g``: unless given, mle of the nugget of the noise/log-noise process,
- ``trendtype``: either ``"SK"`` if ``beta0`` is provided, else ``"OK"``,
- ``beta0``: constant trend of the mean process, plugin-estimator unless given,
- ``ll``: log-likelihood value, (``ll_non_pen``) is the value without the penalty,
- ``nit_opt``, ``msg``: counts and message returned by :func:``scipy.optimize.minimize``
- ``used_args``: list with arguments provided in the call to the function,
- ``Ki``, ``Kgi``: inverse of the covariance matrices of the mean and noise processes (not scaled by ``nu_hat`` and ``nu_hat_var``),
- ``X0``, ``Z0``, ``Z``, ``eps``, ``logN``, ``covtype``: values given in input,
- ``time``: time to train the model, in seconds.
See also `~hetgpy.homGP.homGP.predict` for predictions, `~hetgpy.homGP.update` for updating an existing model.
``summary`` and ``plot`` functions are available as well.
`~homTP.mleHomTP` provide a Student-t equivalent.
Examples
--------
>>> from hetgpy import homGP
>>> from hetgpy.example_data import mcycle
>>> m = mcycle()
>>> model = homGP()
>>> model.mle(m['times'],m['accel'],lower=[1.0],upper=[10.0],covtype="Matern5_2")
References
----------
M. Binois, Robert B. Gramacy, M. Ludkovski (2018), Practical heteroskedastic Gaussian process modeling for large simulation experiments,
Journal of Computational and Graphical Statistics, 27(4), 808--821.
Preprint available on arXiv:1611.05902.
'''
known = known.copy()
init = init.copy()
if type(X) == dict:
X0 = X['X0']
Z0 = X['Z0']
mult = X['mult']
if sum(mult) != len(Z): raise ValueError(f"Length(Z) should be equal to sum(mult): they are {len(Z)} \n and {sum(mult)}")
if len(X0.shape) == 1: warnings.warn(f"Coercing X0 to shape {len(X0)} x 1"); X0 = X0.reshape(-1,1)
if len(Z0) != X0.shape[0]: raise ValueError("Dimension mismatch between Z0 and X0")
else:
if len(X.shape) == 1: warnings.warn(f"Coercing X to shape {len(X)} x 1"); X = X.reshape(-1,1)
if X.shape[0] != len(Z): raise ValueError("Dimension mismatch between Z and X")
elem = find_reps(X, Z, return_Zlist = False)
X0 = elem['X0']
Z0 = elem['Z0']
Z = elem['Z']
mult = elem['mult']
covtypes = ("Gaussian", "Matern5_2", "Matern3_2")
covtype = [c for c in covtypes if c==covtype][0]
if lower is None or upper is None:
auto_thetas = auto_bounds(X = X0, covtype = covtype)
if lower is None: lower = auto_thetas['lower']
if upper is None: upper = auto_thetas['upper']
if known.get("theta") is None and init.get('theta') is None: init['theta'] = np.sqrt(upper * lower)
lower = np.array(lower).reshape(-1)
upper = np.array(upper).reshape(-1)
if len(lower) != len(upper): raise ValueError("upper and lower should have the same size")
tic = time()
if settings.get('return_Ki') is None: settings['return_Ki'] = True
if noiseControl.get('g_bounds') is None: noiseControl['g_bounds'] = (MACHINE_DOUBLE_EPS, 1e2)
g_min = noiseControl['g_bounds'][0]
g_max = noiseControl['g_bounds'][1]
beta0 = known.get('beta0')
N = len(Z)
n = X0.shape[0]
if len(X0.shape) == 1: raise ValueError("X0 should be a matrix. \n")
if known.get("theta") is None and init.get("theta") is None: init['theta'] = 0.9 * lower + 0.1 * upper # useful for mleHetGP
if known.get('g') is None and init.get('g') is None:
if any(mult > 2):
#t1 = mult.T
#t2 = (Z.squeeze() - np.repeat(Z0,mult))**2
init['g'] = np.mean(
(
(fast_tUY2(mult.T,(Z.squeeze() - np.repeat(Z0,mult))**2)/mult)[np.where(mult > 2)]
))/np.var(Z0,ddof=1)
else:
init['g'] = 0.1
trendtype = 'OK'
if beta0 is not None:
trendtype = 'SK'
## General definition of fn and gr
self.max_loglik = float('-inf')
self.arg_max = np.array([np.nan]).reshape(-1)
def fn(par, X0, Z0, Z, mult, beta0, theta, g):
idx = 0 # to store the first non used element of par
if theta is None:
theta = par[0:len(init['theta'])]
idx = idx + len(init['theta'])
if g is None:
g = par[idx]
loglik = self.logLikHom(X0 = X0, Z0 = Z0, Z = Z, mult = mult, theta = theta, g = g, beta0 = beta0, covtype = covtype, eps = eps)
if np.isnan(loglik) == False:
if loglik > self.max_loglik:
self.max_loglik = loglik
self.arg_max = par
return -1.0 * loglik # for maximization
def gr(par,X0, Z0, Z, mult, beta0, theta, g):
idx = 0
components = []
if theta is None:
theta = par[0:len(init['theta'])]
idx = idx + len(init['theta'])
components.append('theta')
if g is None:
g = par[idx]
components.append('g')
dll = self.dlogLikHom(X0 = X0, Z0 = Z0, Z = Z, mult = mult, theta = theta, g = g, beta0 = beta0, covtype = covtype, eps = eps,
components = components)
return -1.0 * dll # for maximization
## Both known
if known.get('g') is not None and known.get("theta") is not None:
theta_out = known["theta"]
g_out = known['g']
out = dict(value = self.logLikHom(X0 = X0, Z0 = Z0, Z = Z, mult = mult, theta = theta_out, g = g_out, beta0 = beta0, covtype = covtype, eps = eps),
message = "All hyperparameters given", counts = 0, time = time() - tic)
else:
parinit = lowerOpt = upperOpt = []
if known.get("theta") is None:
parinit = init['theta']
lowerOpt = np.array(lower)
upperOpt = np.array(upper)
if known.get('g') is None:
parinit = np.hstack((parinit,init.get('g')))
lowerOpt = np.append(lowerOpt,g_min)
upperOpt = np.append(upperOpt,g_max)
bounds = [(l,u) for l,u in zip(lowerOpt,upperOpt)]
out = optimize.minimize(
fun=fn, # for maximization
args = (X0, Z0, Z, mult, beta0, known.get('theta'), known.get('g')),
x0 = parinit,
jac=gr,
method="L-BFGS-B",
bounds = bounds,
#tol=1e-8,
options=dict(maxiter=maxit, #,
ftol = settings.get('factr',10) * np.finfo(float).eps,#,
gtol = settings.get('pgtol',0) # should map to pgtol
)
)
python_kws_2_R_kws = {
'x':'par',
'fun': 'value',
'nit': 'counts'
}
out['counts'] = dict(nfev=out['nfev'],njev=out['njev'])
for key, val in python_kws_2_R_kws.items():
out[val] = out[key]
if out.success == False:
out = dict(par = self.arg_max, value = -1.0 * self.max_loglik, counts = np.nan,
message = "Optimization stopped due to NAs, use best value so far")
g_out = out['par'][-1] if known.get('g') is None else known.get('g')
theta_out = out['par'][0:len(init['theta'])] if known.get('theta') is None else known['theta']
ki = np.linalg.cholesky(
cov_gen(X1 = X0, theta = theta_out, type = covtype) + np.diag(eps + g_out / mult)
).T
ki = dtrtri(ki)[0]
Ki = ki @ ki.T
self.Ki = Ki
if beta0 is None:
beta0 = Ki.sum(axis=1) @ Z0 / Ki.sum()
psi_0 = ((Z0 - beta0).T @ Ki) @ (Z0 - beta0)
nu = (1.0 / N) * ((((Z-beta0).T @ (Z-beta0) - ((Z0-beta0)*mult).T @ (Z0-beta0)) / g_out) + psi_0)
self.theta = theta_out
self.g = g_out
self.nu_hat = nu
self.ll = -1.0 * out['value']
self.nit_opt = out['counts']
self.beta0 = beta0
self.trendtype = trendtype
self.covtype = covtype
self.msg = out['message']
self.eps = eps
self.X0 = X0
self.Z0 = Z0
self.Z = Z
self.mult = mult
self.used_args = dict(lower = lower, upper = upper, known = known, noiseControl = noiseControl)
self.time = time() - tic
if settings["return_Ki"]: self.Ki = Ki
return self
[docs]
def predict(self, x: ArrayLike, xprime: ArrayLike | None = None,interval: str | None = None, interval_lower: float | None = None, interval_upper: float | None = None,**kw):
r'''
Prediction under homoskedastic noise
Parameters
----------
x : ndarray_like
matrix of designs locations to predict at (one point per row)
xprime : ndarray_like
optional second matrix of predictive locations to obtain the predictive covariance matrix between ``x`` and ``xprime``
interval: str
one of 'confidence' or 'predictive' which is a convenience method to return confidence/predictive intervals corresponding to `interval_lower` and `interval_upper`
interval_lower: float
lower of confidence/predictive interval
interval_upper: float
upper of confidence/predictive interval
nugs_only : bool (default False)
if ``True``, only return noise variance prediction
kwargs : dict
optional additional elements (only used for nugs_only)
Returns
-------
dict with elements:
- ``mean``: kriging mean;
- ``sd2``: kriging variance (filtered, e.g. without the nugget values)
- ``nugs``: noise variance prediction
- ``cov``: (returned if ``xprime`` is given) predictive covariance matrix between ``x`` and ``xprime``
- ``confidence_interval``: prediction with kriging variance only
- ``predictive_interval``: prediction with kriging and noise variance
'''
if len(x.shape) == 1:
x = x.reshape(-1,1)
if x.shape[1] != self['X0'].shape[1]: raise ValueError("x is not a matrix")
if xprime is not None and len(xprime.shape)==1:
xprime = xprime.reshape(-1,1)
if xprime.shape[1] != self['X0'].shape[1]: raise ValueError("xprime is not a matrix")
interval_types = [None,'confidence','predictive']
return_interval = False
if interval is not None:
list_interval = [interval] if type(interval)==str else interval
if 'confidence' not in list_interval and 'predictive' not in list_interval:
raise ValueError(f"interval must be one of 'confidence' or 'predictive' not {interval}")
return_interval = True
if "nugs_only" in kw and kw["nugs_only"]:
return dict(nugs = np.repeat(self['nu_hat'] * self['g'], x.shape[0]))
if self.get('Ki') is None:
# these should be replaced with calls to self instead of self
ki = np.linalg.cholesky(
cov_gen(X1 = self['X0'], theta = self['theta'], type = self['covtype']) + np.diag(self['eps'] + self['g'] / self['mult'])
).T
ki = dtrtri(ki)[0]
self['Ki'] = ki @ ki.T
self['Ki'] /= self['nu_hat'] # this is a subtle difference between R and Python.
kx = self['nu_hat'] * cov_gen(X1 = x, X2 = self['X0'], theta = self['theta'], type = self['covtype'])
nugs = np.repeat(self['nu_hat'] * self['g'], x.shape[0])
mean = self['beta0'] + kx @ (self['Ki'] @ (self['Z0'] - self['beta0']))
if self['trendtype'] == 'SK':
sd2 = self['nu_hat'] - np.diag(kx @ (self['Ki'] @ kx.T))
else:
sd2 = self['nu_hat'] - np.diag(kx @ ((self['Ki'] @ kx.T))) + (1- (self['Ki'].sum(axis=0))@ kx.T)**2/self['Ki'].sum()
if (sd2<0).any():
sd2[sd2<0] = 0
warnings.warn("Numerical errors caused some negative predictive variances to be thresholded to zero. Consider using ginv via homGP.rebuild(robust=True)")
if xprime is not None:
kxprime = self['nu_hat'] * cov_gen(X1 = self['X0'], X2 = xprime, theta = self['theta'], type = self['covtype'])
if self['trendtype'] == 'SK':
if x.shape[0] < xprime.shape[0]:
cov = self['nu_hat'] * cov_gen(X1 = x, X2 = xprime, theta = self['theta'], type = self['covtype']) - kx @ self['Ki'] @ kxprime
else:
cov = self['nu_hat'] * cov_gen(X1 = x, X2 = xprime, theta = self['theta'], type = self['covtype']) - kx @ (self['Ki'] @ kxprime)
else:
if x.shape[0] < xprime.shape[0]:
cov = self['nu_hat'] * cov_gen(X1 = x, X2 = xprime, theta = self['theta'], type = self['covtype']) - kx @ self['Ki'] @ kxprime + ((1-(self['Ki'].sum(axis=0,keepdims=True))@ kx.T).T @ (1-self['Ki'].sum(axis=0,keepdims=True) @ kxprime))/self['Ki'].sum()
else:
cov = self['nu_hat'] * cov_gen(X1 = x, X2 = xprime, theta = self['theta'], type = self['covtype']) - kx @ (self['Ki'] @ kxprime) + ((1-(self['Ki'].sum(axis=0,keepdims=True))@ kx.T).T @ (1-self['Ki'].sum(axis=0,keepdims=True) @ kxprime))/self['Ki'].sum()
else:
cov = None
# re-modify self so Ki is preserved (because R does not modify lists in place)
self['Ki']*=self['nu_hat']
preds = dict(mean = mean, sd2 = sd2, nugs = nugs, cov = cov)
if return_interval:
preds['confidence_interval'] = {}
if 'confidence' in list_interval:
preds['confidence_interval']['lower'] = norm.ppf(interval_lower, loc = preds['mean'], scale = np.sqrt(preds['sd2'])).squeeze()
preds['confidence_interval']['upper'] = norm.ppf(interval_upper, loc = preds['mean'], scale = np.sqrt(preds['sd2'])).squeeze()
preds['predictive_interval'] = {}
if 'predictive' in list_interval:
preds['predictive_interval']['lower'] = norm.ppf(interval_lower, loc = preds['mean'], scale = np.sqrt(preds['sd2'] + preds['nugs'])).squeeze()
preds['predictive_interval']['upper'] = norm.ppf(interval_upper, loc = preds['mean'], scale = np.sqrt(preds['sd2'] + preds['nugs'])).squeeze()
return preds
[docs]
def rebuild(self, robust = False):
r'''
Rebuilds inverse covariance matrix in homGP object (usually after saving). Works in tandem with `strip`
Parameters
----------
robust: bool
use `np.linalg.pinv` for covariance matrix inversion. Otherwise use cholesky
Returns
-------
self with rebuilt inverse covariance matrix
'''
if robust :
self['Ki'] <- np.linalg.pinv(
cov_gen(X1 = self['X0'], theta = self['theta'], type = self['covtype']) + np.diag(self['eps'] + self['g'] / self['mult'])
).T
self['Ki'] /= self['nu_hat']
else:
ki = np.linalg.cholesky(
cov_gen(X1 = self['X0'], theta = self['theta'], type = self['covtype']) + np.diag(self['eps'] + self['g'] / self['mult'])
).T
ki = dtrtri(ki)[0]
self['Ki'] = ki @ ki.T
return self
[docs]
def strip(self):
r'''
Removes larger model objects
Can be rebuilt with `rebuild`
'''
keys = ('Ki','Kgi','modHom','modNugs')
for key in keys:
if key in self.__dict__.keys():
self.__dict__.pop(key,None)
return self
[docs]
def update(self,Xnew: ArrayLike, Znew: ArrayLike, ginit: float = 1e-2,
lower: ArrayLike | None = None, upper: ArrayLike | None = None,
noiseControl: dict | None = None, settings: dict | None = None,
known: dict = {}, maxit: int = 100):
r'''
Update model object with new observations
Parameters
----------
Xnew: ndarray_like
new inputs (one observation per row)
Znew: ndarray_like
new observations
lower: ndarray_like
lower bound for lengthscales. If not provided extracted from `self`
upper: ndarray_like
upper bound for lengthscales. If not provided extracted from `self`
noiseControl: dict
noise bounds. If not provided extracted from `self`
settings: dict
options for optimization. If not provided extracted from `self`
known: dict
known hyperparameters to fix at values
maxit: int
maximum number of iterations for hyperparameter optimization
Returns
-------
self, potentially with updated hyperparameters
Notes
-----
If hyperparameters do not need to be updated, maxit can be set to 0. In this case it is possible to pass NAs in Znew, then the model can still be used to provide updated variance predictions
Example
-------
>>> import numpy as np, from hetgpy import homGP
>>> X = np.array([1, 2, 3, 4, 12]).reshape(-1,1)
>>> Y = np.array([0, -1.75, -2, -0.5, 5])
>>> model = homGP().mle(X,Y,known={'theta':np.array([10.0]),'g':1e-8})
>>> Xnew = np.array([2.5]).reshape(-1,1)
>>> Znew = model.predict(Xnew)['mean']
>>> model.update(Xnew,Znew)
'''
# first reduce Xnew/Znew in case of potential replicates
newdata = find_reps(Xnew, Znew, normalize = False, rescale = False)
if duplicated(np.vstack([self.X0, newdata['X0']])).any():
id_exists = []
for i in range(newdata['X0'].shape[0]):
tmp = duplicated(np.vstack([newdata['X0'][i,:], self.X0]))
if tmp.any():
id_exists.append(i)
id_X0 = tmp.nonzero()[0] - 1
self.Z0[id_X0] = (self.mult[id_X0] * self.Z0[id_X0] + newdata['Z0'][i] * newdata['mult'][i])/(self.mult[id_X0] + newdata['mult'][i])
idZ = np.cumsum(self.mult)+1
self.Z = np.insert(self.Z, values = newdata['Zlist'][i], obj = min(idZ[id_X0],len(idZ)))
## Inverse matrices are updated if MLE is not performed
if maxit == 0:
self.Ki = update_Ki_rep(id_X0, self, nrep = newdata['mult'][i])
self.nit_opt = 0
self.msg = "Not optimized \n"
self.mult[id_X0] = self.mult[id_X0] + newdata['mult'][i]
# remove duplicates now
idxs = np.delete(np.arange(newdata['X0'].shape[0]),id_exists)
newdata['X0'] = newdata['X0'][idxs,:]
newdata['Z0'] = newdata['Z0'][idxs]
newdata['mult'] = newdata['mult'][idxs]
if type(newdata['Zlist'])==dict:
newdata['Zlist'] = {k:v for k,v in newdata['Zlist'].items() if k in idxs}
# decrement key indices
Zlist = {}
for i, val in enumerate(newdata['Zlist'].values()):
Zlist[i] = val
newdata['Zlist'] = Zlist.copy()
foo=1
else:
newdata['Zlist'] = newdata['Zlist'][idxs]
if newdata['X0'].shape[0] > 0 and maxit==0:
for i in np.arange(newdata['X0'].shape[0]):
self.Ki = update_Ki(newdata['X0'][i:i+1,:], self, nrep = newdata['mult'][i], new_lambda = None)
self.X0 = np.vstack([self.X0, newdata['X0']])
self.Z0 = np.hstack([self.Z0, newdata['Z0']])
self.mult = np.hstack([self.mult, newdata['mult']])
if type(newdata['Zlist'])==dict:
self.Z = np.hstack([self.Z, np.hstack(list(newdata['Zlist'].values()))])
else:
self.Z = np.hstack([self.Z, newdata['Zlist']])
self.nit_opt = 0
self.msg = "Not optimized \n"
if maxit > 0:
if upper is None: upper = self.used_args['upper']
if lower is None: lower = self.used_args['lower']
if noiseControl is None:
noiseControl = self.used_args['noiseControl']
init = {}
if settings is None: settings = self.used_args.get('settings')
if known == {}: known = self.used_args['known'].copy()
if known.get('theta') is None: init['theta'] = self.theta
if known.get('g') is None: init['g'] = self.g
self.mleHomGP(X = dict(X0 = np.vstack([self.X0, newdata['X0']]),
Z0 = np.hstack([self.Z0, newdata['Z0']]),
mult = np.hstack([self.mult, newdata['mult']])),
Z = np.hstack([self.Z,
np.array(list(chain(*newdata['Zlist'].values())))]
),
lower = lower,
upper = upper,
noiseControl = noiseControl,
covtype = self.covtype,
init = init,
known = known,
eps = self.eps,
maxit = maxit
)
return self
[docs]
def copy(self):
'''
Make a copy of the model, which is useful in tandem with the update function
Parameters
----------
None
Returns
-------
newmodel: (deep) copy of model
'''
newmodel = deepcopy(self)
return newmodel
[docs]
def plot(self,type='diagnostics'):
r'''
Plot model behavior. See hetgpy.plot for more details on types of plots
Parameters
----------
type: str
type of plot: currently supported are "diagnostics" and "iterates"
Returns
-------
fig, ax: figure and axis object
'''
ptypes = ('diagnostics','iterates')
if type not in ptypes:
raise ValueError(f"{type} not found, select one of {ptypes}")
if type=='diagnostics':
return plot_diagnostics(model=self)
if type=='iterates':
return plot_optimization_iterates(model=self)
[docs]
def summary(self):
r'''
Print a summary of the model object and the MLE routine
'''
print("N = ", len(self.Z), " n = ", len(self.Z0), " d = ", self.X0.shape[1], "\n")
print("Homoskedastic nugget value: ", self.g, "\n")
print(self.covtype, " covariance lengthscale values: ", self.theta, "\n")
print("Variance/scale hyperparameter: ", self.nu_hat, "\n")
if self.trendtype == "SK":
print("Given constant trend value: ", self.beta0, "\n")
else:
print("Estimated constant trend value: ", self.beta0, "\n")
print("MLE optimization: \n", "Log-likelihood = ", self.ll, "; Nb of evaluations (obj, gradient) by L-BFGS-B: ", self.nit_opt, "; message: ", self.msg, "\n")
class homTP():
pass