Source code for hetgpy.crnGP

'''
command random number GP
'''

from __future__ import annotations
import numpy as np
from itertools import chain
import warnings
from time import time
from scipy.linalg.lapack import dtrtri
from scipy import optimize
from scipy.stats import norm
from hetgpy.covariance_functions import cov_gen, partial_cov_gen, euclidean_dist
from hetgpy.utils import fast_tUY2, rho_AN
from hetgpy.find_reps import find_reps
from hetgpy.auto_bounds import auto_bounds
from hetgpy.find_reps import find_reps
from hetgpy.utils import duplicated
from hetgpy.update_covar import update_Ki, update_Ki_rep
from hetgpy.plot import plot_diagnostics, plot_optimization_iterates
from copy import deepcopy
import contextlib
from itertools import combinations, product
import math
from numpy.typing import ArrayLike, NDArray
NDArrayInt = NDArray[np.int_]
MACHINE_DOUBLE_EPS = np.sqrt(np.finfo(float).eps)

[docs] class crnGP(): def __init__(self): self.mle = self.mlecrnGP self.ids = None return def __getitem__(self, key): return self.__dict__[key] def __setitem__(self,item,value): self.__dict__[item] = value
[docs] def get(self,key): r''' General `get` item (retrives key from self.__dict__) ''' return self.__dict__.get(key)
def pairwise_rho(self,S0r, S0c = None,rho = None): rho = np.atleast_1d(rho) if S0c is None: S0c = S0r.copy() nr, nc = S0r.shape[0], S0c.shape[0] rho_indices = np.array(list(combinations(np.unique(S0r),2))) # iterate over rho indices and make pointer to rho and its reverse rho_mapper = {} for i, idx in enumerate(rho_indices): s = tuple(sorted(idx)) s_rev = tuple(reversed(s)) rho_mapper[s] = rho[i] rho_mapper[s_rev] = rho[i] seed_pairs = np.array(list(product(S0r,S0c))) seed_pairs = np.concatenate([seed_pairs,np.arange(len(seed_pairs)).reshape(-1,1)],axis=1) out = [] default_rho = rho.mean() for row1 in seed_pairs: s1, s2, idx = tuple(row1.astype(int)) if s1 != s2: r = rho_mapper.get((s1,s2),default_rho) out.append([s1,s2,idx,r]) out = np.array(out) Cs = np.zeros(shape = (nr,nc),dtype=float).flatten() Cs[out[:,-2].astype(int)] = out[:,-1] Cs = Cs.reshape(nr,nc) mask = S0r[:,None]==S0c Cs[mask] = 1.0 return Cs
[docs] def loglik(self,X0, S0, Z, theta, g, rho, stype, beta0 = None, covtype = "Gaussian", eps = MACHINE_DOUBLE_EPS): r''' Log Likelihood under correlated noise Parameters ---------- X0: ndarray_like unique design matrix (of size nxd) S0: ndarray_like vector of seed information (size nx1) Z: ndarray_like observation (output) vector (size N) theta: ndarray_like lengthscale g: float noise variance rho: float seed correlation strength stype: str type of kronecker structure of the data beta0: float trend covtype: str covariance kernel to use eps: float amount of jitter on diagonal of covariance matrix Returns ------- loglik: float log-likelihood ''' n = X0.shape[0] d = X0.shape[1] Cx = cov_gen(X1 = X0, theta = theta, type = covtype) if self.ids is None: # mimics R's outer(S0,S0,'==') self.ids = S0 == S0[:,None] if isinstance(rho,float) or rho.size==1: Cs = np.full(shape=(n,n),fill_value=rho,dtype=float) else: Cs = self.pairwise_rho(S0c=S0,S0r=S0,rho=rho) Cs[self.ids] = 1.0 C = Cx * Cs self.C = C self.Cx = Cx self.Cs = Cs jitter = (eps+g)*np.eye(n) Ki = np.linalg.cholesky(C + jitter).T ldetKi = - 2.0 * np.sum(np.log(np.diag(Ki))) # to mirror R's chol2inv: do the following: # expose dtrtri from lapack (for fast cholesky inversion of a triangular matrix) # use result to compute Ki (should match chol2inv) Ki = dtrtri(Ki)[0] # -- equivalent of chol2inv -- see https://stackoverflow.com/questions/6042308/numpy-inverting-an-upper-triangular-matrix Ki = Ki @ Ki.T # -- equivalent of chol2inv self.Ki = Ki if beta0 is None: beta0 = Ki.sum(axis=1) @ Z / Ki.sum() self.beta0 = beta0 psi = (((Z - beta0).T @ Ki) @ (Z - beta0)) / n loglik = -n/2 * np.log(2*np.pi) - n/2 * np.log(psi) + 1/2 * ldetKi - n/2 return loglik
[docs] def dloglik(self,X0, S0, Z, theta, g, rho, stype, beta0 = None, covtype = "Gaussian", eps = MACHINE_DOUBLE_EPS, components = ("theta", "g", "rho")): r''' Gradient of log-likelihood under correlated noise Parameters ---------- X0: ndarray_like matrix of designs (one point per row) S0: ndarray_like integer vector containing seed information Z: ndarray_like vector of outputs theta: ndarray_like scalar (isotropic) or vector (anisotropic) lengthscale values g: float noise variance rho: float seed correlation stype: str type of kronecker structure of the data beta0: float trend covtype: str covariance kernel to use eps: float amount of jitter on diagonal of covariance matrix components: tuple directions for partial derivatives, one or several of 'theta' (lengthscales) or 'g' (for noise) or 'rho' (seed correlation strength) Returns ------- gradient with respect to hyperparameters (specified via ``components``) ''' n, d = X0.shape C = self.C Cx = self.Cx Cs = self.Cs Ki = self.Ki if beta0 is None: beta0 = Ki.sum(axis=1) @ Z / Ki.sum() Z = (Z - beta0).copy() KiZ = Ki @ Z psi = (Z @ KiZ).squeeze() / n tmp1 = np.array([]) tmp2 = np.array([]) tmp3 = np.array([]) if 'theta' in components: tmp1 = np.full(shape=len(theta),fill_value=np.nan) for i in range(len(theta)): dC_dthetak = partial_cov_gen(X1=X0,theta=theta[i],type=covtype,arg="theta_k") * C tmp1[i] = 0.5 * (KiZ.T @ dC_dthetak) @ KiZ / psi - 0.5 * np.trace(Ki @ dC_dthetak) if 'g' in components: tmp2 = 0.5 * np.sum(KiZ**2) / psi - 0.5 * np.sum(np.diag(Ki)) tmp2 = np.atleast_1d(tmp2) if 'rho' in components: dC_drho = np.ones(shape=(n,n)) ids = self.ids dC_drho[ids] = 0 dC_drho *= Cx tmp3 = 0.5 * (KiZ.T @ dC_drho) @ KiZ / psi - 0.5 * np.trace(Ki @ dC_drho) tmp3 = np.atleast_1d(tmp3) return np.concatenate([tmp1,tmp2,tmp3]).squeeze()
def loglikT(self,X0, S0, T0, Z, theta, g, rho, beta0 = None, covtype = "Gaussian", eps = MACHINE_DOUBLE_EPS): n = X0.shape[0] N = Z.size d = X0.shape[1] Cx = cov_gen(X1 = X0, theta = theta[0:d], type = covtype) if self.ids is None: # mimics R's outer(S0,S0,'==') self.ids = S0 == S0[:,None] Cs = np.full(shape=(n,n),fill_value=rho,dtype=float) Cs[self.ids] = 1.0 Ct = cov_gen(X1 = T0, theta = np.atleast_1d(theta[d]), type = covtype) self.Cx = Cx self.Cs = Cs self.Ct = Ct SCxs = np.linalg.svd(Cx * Cs) SCt = np.linalg.svd(Ct) term1 = np.kron(SCt.U,SCxs.U) * np.repeat(1.0 / (np.kron(SCt.S,SCxs.S) + g),N).reshape(N,N).T term2 = np.kron(SCt.U,SCxs.U) Ki = term1 @ term2.T ldetKi = -1.0 * np.sum(np.log(np.kron(SCt.S,SCxs.S) + g)) self.Ki = Ki Z = Z.flatten(order='F') if beta0 is None: beta0 = Ki.sum(axis=1) @ Z / Ki.sum() psi = (((Z - beta0).T @ Ki) @ (Z - beta0)) / N loglik = -N/2 * np.log(2*np.pi) - N/2 * np.log(psi) + 1/2 * ldetKi - N/2 return loglik def dloglikT(self,X0, T0, S0, Z, theta, g, rho, beta0 = None, covtype = "Gaussian", eps = MACHINE_DOUBLE_EPS, components = ("theta", "g", "rho")): n = X0.shape[0] N = Z.size d = X0.shape[1] Cx = self.Cx Cs = self.Cs Ct = self.Ct Ki = self.Ki Z = Z.flatten(order='F').copy() if beta0 is None: beta0 = Ki.sum(axis=1) @ Z / Ki.sum() Z = (Z - beta0).copy() KiZ = Ki.T @ Z psi = (Z.T @ KiZ).squeeze() tmp1, tmp2, tmp3 = np.array([]), np.array([]), np.array([]) if 'theta' in components: tmp1 = np.full(shape=len(theta)-1,fill_value=np.nan) for i in range(len(theta)-1): dC_dthetak = partial_cov_gen(X1=X0,theta=theta[i],type=covtype,arg="theta_k") * Cx * Cs dC_dthetak = np.kron(Ct,dC_dthetak) tmp1[i] = 0.5 * (KiZ.T @ dC_dthetak) @ KiZ / psi - 0.5 * np.trace(Ki @ dC_dthetak) # now for time dC_dthetak = partial_cov_gen(X1=T0,theta=theta[d],type=covtype,arg="theta_k") * Ct dC_dthetak = np.kron(dC_dthetak,Cx * Cs) tmp1 = np.concatenate([tmp1, np.atleast_1d(N/2 * (KiZ.T @ dC_dthetak) @ KiZ / psi - 0.5 * np.trace(Ki @ dC_dthetak))]) if 'g' in components: tmp2 = (N/2) * np.sum(KiZ**2) / psi - 0.5 * np.sum(np.diag(Ki)) tmp2 = np.atleast_1d(tmp2) if 'rho' in components: dC_drho = np.ones(shape=(n,n)) ids = self.ids dC_drho[ids] = 0 dC_drho *= Cx dC_drho = np.kron(Ct,dC_drho) tmp3 = (N/2) * (KiZ.T @ dC_drho) @ KiZ / psi - 0.5 * np.trace(Ki @ dC_drho) tmp3 = np.atleast_1d(tmp3) return np.concatenate([tmp1,tmp2,tmp3])
[docs] def mlecrnGP(self,X, Z, T0 = None, rhotype = "simple", stype = "none", lower = None, upper = None, known = dict(), noiseControl = dict(g_bounds = np.array((10*MACHINE_DOUBLE_EPS, 1e2)), rho_bounds = None), init = dict(), covtype = "Gaussian", maxit = 100, eps = MACHINE_DOUBLE_EPS, settings = dict(return_Ki = True, factr = 1e7)): r''' Gaussian process modeling with correlated noise. You may also call this function as `crnGP.mle` Gaussian process regression under correlated noise based on maximum likelihood estimation of the hyperparameters. Parameters ---------- X : ndarray_like matrix of all designs, one per row, or list with elements. The last column is assumed to contain the integer seed value. Z : ndarray_like Z vector of all observations. T0 : ndarray_like: T0 optional vector of times (same for all ``X's``). Not currently supported. lower,upper : ndarray_like optional bounds for the ``theta`` parameter (see :func: covariance_functions.cov_gen for the exact parameterization). In the multivariate case, it is possible to give vectors for bounds (resp. scalars) for anisotropy (resp. isotropy) noiseControl : dict dict with element: - ``g_bounds`` vector providing minimal and maximal noise to signal ratio (default to ``(sqrt(MACHINE_DOUBLE_EPS), 100)``). settings : dict dict for options about the general modeling procedure, with elements: - ``return_Ki`` boolean to include the inverse covariance matrix in the object for further use (e.g., prediction). - ``factr`` (default to 1e7) and ``pgtol`` are available to be passed to `options` for L-BFGS-B in :func: ``scipy.optimize.minimize``. eps : float jitter used in the inversion of the covariance matrix for numerical stability known : dict optional dict of known parameters (e.g. ``beta0``, ``theta``, ``g``) init : dict optional lists of starting values for mle optimization: - ``theta_init`` initial value of the theta parameters to be optimized over (default to 10% of the range determined with ``lower`` and ``upper``) - ``g_init`` vector of nugget parameter to be optimized over covtype : str covariance kernel type, either ``'Gaussian'``, ``'Matern5_2'`` or ``'Matern3_2'``, see :func: ``~covariance_functions.cov_gen`` maxit : int maximum number of iterations for `L-BFGS-B` of :func: ``scipy.optimize.minimize`` dedicated to maximum likelihood optimization Notes ------- The global covariance matrix of the model is parameterized as ``nu_hat * (Cx + g Id) * Cs = nu_hat * K``, with ``Cx`` the spatial correlation matrix between unique designs, depending on the family of kernel used (see ``~covariance_functions.cov_gen`` for available choices) and values of lengthscale parameters. Cs is the correlation matrix between seed values, equal to 1 if the seeds are equal, ``rho`` otherwise. ``nu_hat`` is the plugin estimator of the variance of the process. It is generally recommended to rescale the inputs to the unit cube and to normalize the outputs. Returns ------- self, with the following attributes: - ``theta``: unless given, maximum likelihood estimate (mle) of the lengthscale parameter(s), - ``nu_hat``: plugin estimator of the variance, - ``g``: unless given, mle of the nugget of the noise/log-noise process, - ``trendtype``: either ``"SK"`` if ``beta0`` is provided, else ``"OK"``, - ``beta0``: constant trend of the mean process, plugin-estimator unless given, - ``ll``: log-likelihood value, - ``nit_opt``, ``msg``: counts and message returned by :func:``scipy.optimize.minimize`` - ``used_args``: list with arguments provided in the call to the function, - ``Ki``, ``Kgi``: inverse of the covariance matrices of the mean and noise processes (not scaled by ``nu_hat`` and ``nu_hat_var``), - ``X0``, ``Z0``, ``Z``, ``eps``, ``logN``, ``covtype``: values given in input, - ``time``: time to train the model, in seconds. References ---------- A Fadikar, M Binois, N Collier, A Stevens, KB Toh, J Ozik. Trajectory-oriented optimization of stochastic epidemiological models. 2023 Winter Simulation Conference (WSC), San Antonio, TX, USA, 2023, pp. 1244-1255, doi: 10.1109/WSC60868.2023.10408258 Examples -------- >>> from hetgpy import crnGP >>> import numpy as np >>> rng = np.random.default_rng(1) >>> pps = 10 # points per seed >>> x = np.linspace(0,2*np.pi,pps).reshape(-1,1) >>> X = np.vstack([x,x]) >>> seeds = ([1] * pps) + ([5] * pps) >>> X = np.hstack([X,np.array(seeds).reshape(-1,1)]) >>> # amplitude and phase shift >>> Z = np.sin(X[:,0] + (np.pi/2)*(X[:,-1]==5)) + X[:,-1] >>> GP = crnGP() >>> GP.mle(X,Z,covtype="Matern5_2") ''' # copy on instantiation init = init.copy() known = known.copy() noiseControl = noiseControl.copy() if len(X.shape)==1: X = X.reshape(-1,1) if T0 is None and X.shape[0] != Z.shape[0]: raise ValueError(f"Dimension mismatch between Z and X: {Z.shape=}, {X.shape=}") if T0 is not None and X.shape[0] != Z.shape[0]: raise ValueError(f"Dimension mismatch between Z and X: {Z.shape=}, {X.shape=}") stypes = ("none", "XS") if stype not in stypes: raise ValueError(f"stype must be one of {stypes}") covtypes = ("Gaussian", "Matern5_2", "Matern3_2") if covtype not in covtypes: raise ValueError(f"covtype must be one of {covtypes}") X0 = X[:,0:-1] S0 = X[:,-1] if T0 is not None and len(T0.shape)==1: T0 = T0.reshape(-1,1) d = X0.shape[1] if np.abs(S0 - S0.astype(int)).sum() > 0: raise ValueError(f"Last col of X is assumed to contain integer-valued seed information.") S0 = S0.astype(int) if rhotype not in ['simple']: raise ValueError("rhotype must be one of ['simple']") if lower is None or upper is None: auto_thetas = auto_bounds(X = X0, covtype = covtype) if lower is None: lower = auto_thetas['lower'] if upper is None: upper = auto_thetas['upper'] if init.get('theta') is None or len(init['theta']) < len(lower): init['theta'] = np.sqrt(upper * lower) if T0 is not None: auto_thetasT = auto_bounds(X = T0, covtype = covtype) lower = np.concatenate([lower,auto_thetasT['lower']]) upper = np.concatenate([upper,auto_thetasT['upper']]) if init.get('theta') is None or len(init['theta']) < len(lower): init['theta'] = np.sqrt(upper * lower) tic = time() if settings.get('return_Ki') is None: settings['return_Ki'] = True if noiseControl.get('g_bounds') is None: noiseControl['g_bounds'] = np.array([MACHINE_DOUBLE_EPS, 1e2]) if noiseControl.get('rho_bounds') is None: if rhotype == 'simple': noiseControl['rho_bounds'] = np.array([0.001, 0.9]) g_min, g_max = noiseControl['g_bounds'] beta0 = known.get('beta0') n = X0.shape[0] if known.get('theta') is None and init.get('theta') is None: init['theta'] = 0.9*lower + 0.1 * upper if known.get('g') is None and init.get('g') is None: init['g'] = 0.1 if known.get('rho') is None and init.get('rho') is None and rhotype == "simple": init['rho'] = 0.1 trendtype = 'OK' if beta0 is not None: trendtype = 'SK' self.max_loglik = -1.0 * float('inf') self.arg_max = np.nan def fn(par, X0, S0, T0, Z, beta0, theta, g, rho): idx = 0 if theta is None: theta = par[0:len(init['theta'])] idx = idx + len(init['theta']) if g is None: g = par[idx] idx = idx + 1 if rho is None: rho = par[idx:].squeeze() if T0 is None: loglik = self.loglik(X0 = X0, S0 = S0, Z = Z, theta = theta, g = g, rho = rho, stype = stype, beta0 = beta0, covtype = covtype,eps = eps) else: loglik = self.loglikT(X0 = X0, S0 = S0, T0 = T0, Z = Z, theta = theta, g = g, rho = rho, beta0 = beta0, covtype = covtype, eps = eps) if not np.isnan(loglik): if loglik > self.max_loglik: self.max_loglik = loglik self.arg_max = par return -1.0 * loglik def gr(par, X0, S0, T0, Z, beta0, theta, g, rho): idx = 0 components = [] if theta is None: theta = par[0:len(init['theta'])] idx = idx + len(init['theta']) components.append('theta') if g is None: g = par[idx] idx = idx + 1 components.append('g') if rho is None: rho = par[idx:].squeeze() components.append('rho') if T0 is None: return -1.0 * self.dloglik(X0 = X0,S0 = S0, Z = Z, theta = theta, g = g, rho = rho, stype = stype, beta0 = beta0, covtype = covtype, eps = eps, components = components) else: return -1.0 * self.dloglikT(X0 = X0, T0 = T0, S0 = S0, Z = Z, theta = theta, g = g, rho = rho, beta0 = beta0, covtype = covtype, eps = eps, components = components) if known.get('g') is not None and known.get('theta') is not None and known.get('rho') is not None: theta_out = known['theta'] g_out = known['g'] rho_out = known['rho'] if T0 is None: out = dict( value = -1.0 * self.loglik(X0 = X0, S0 = S0, Z = Z, theta = theta_out, g = g_out, rho = rho_out, stype = stype, beta0 = beta0, covtype = covtype,eps = eps), message = 'All hyperparameters given', counts = 0, time = time() - tic ) else: out = dict( value = -1.0 * self.loglikT(X0 = X0, S0 = S0, T0 = T0, Z = Z, theta = theta_out, g = g_out, rho = rho_out, beta0 = beta0, covtype = covtype, eps = eps), message = 'All hyperparameters given', counts = 0, time = time() - tic ) else: parinit, lowerOpt, upperOpt = np.array([]), np.array([]), np.array([]) if known.get('theta') is None: parinit = init['theta'] lowerOpt = np.concatenate([lowerOpt, lower]) upperOpt = np.concatenate([upperOpt, upper]) if known.get('g') is None: parinit = np.concatenate([parinit, np.array([init['g']])]) g_min = np.atleast_1d(g_min) g_max = np.atleast_1d(g_max) lowerOpt = np.concatenate([lowerOpt, g_min]) upperOpt = np.concatenate([upperOpt, g_max]) if known.get('rho') is None: parinit = np.concatenate([parinit, np.atleast_1d(init['rho'])]) lowerOpt = np.concatenate([ lowerOpt, np.atleast_1d(noiseControl['rho_bounds'][0]) ]) upperOpt = np.concatenate([ upperOpt, np.atleast_1d(noiseControl['rho_bounds'][1]) ]) bounds = [(l,u) for l,u in zip(lowerOpt,upperOpt)] out = optimize.minimize( fun=fn, # for maximization args = (X0, S0, T0, Z, beta0, known.get('theta'), known.get('g'), known.get('rho')), x0 = parinit, jac=gr, method="L-BFGS-B", bounds = bounds, options=dict(maxiter=maxit, ftol = settings.get('factr',10) * np.finfo(float).eps,#, gtol = settings.get('pgtol',0) # should map to pgtol ) ) python_kws_2_R_kws = { 'x':'par', 'fun': 'value', 'nit': 'counts' } out['counts'] = dict(nfev=out['nfev'],njev=out['njev']) for key, val in python_kws_2_R_kws.items(): out[val] = out[key] if out.success == False: out = dict(par = self.arg_max, value = -1.0 * self.max_loglik, counts = np.nan, message = "Optimization stopped due to NAs, use best value so far") idx = 0 if known.get('theta') is None: idx += len(init['theta']) theta_out = out['par'][0:idx] else: theta_out = known['theta'] g_out = out['par'][idx] if known.get('g') is None else known.get('g') rho_out = out['par'][(idx+1):] if known.get('rho') is None else known.get('rho') Cx = cov_gen(X1 = X0, theta = np.atleast_1d(theta_out), type = covtype) if self.ids is None: # mimics R's outer(S0,S0,'==') self.ids = S0 == S0[:,None] if rhotype == 'simple': Cs = np.full(shape=(n,n),fill_value=rho_out,dtype=float) Cs[self.ids] = 1.0 C = Cx * Cs self.C = C self.Cx = Cx self.Cs = Cs if T0 is not None: Ct = cov_gen(X1 = T0, theta = np.atleast_1d(theta_out[d]), type = covtype) SCxs = np.linalg.svd(Cx*Cs) SCt = np.linalg.svd(Ct) term1 = np.kron(SCt.U,SCxs.U) * (1.0 / np.kron(SCt.S,SCxs.S) + g_out) term2 = np.kron(SCt.U,SCxs.U) Ki = term1 @ term2.T else: jitter = (eps+g_out)*np.eye(n) Ki = np.linalg.cholesky(C + jitter).T # to mirror R's chol2inv: do the following: # expose dtrtri from lapack (for fast cholesky inversion of a triangular matrix) # use result to compute Ki (should match chol2inv) Ki = dtrtri(Ki)[0] # -- equivalent of chol2inv -- see https://stackoverflow.com/questions/6042308/numpy-inverting-an-upper-triangular-matrix Ki = Ki @ Ki.T # -- equivalent of chol2inv self.Ki = Ki Zf = Z.flatten(order='F') if beta0 is None: beta0 = Ki.sum(axis=1) @ Zf / Ki.sum() self.X0 = X0 self.Z = Z self.covtype = covtype self.S0 = S0 self.trendtype = trendtype self.nu_hat = (Zf - beta0).T @ self.Ki @ (Zf - beta0) / Zf.shape[0] self.ll = -1.0 * out['value'] self.theta = theta_out self.g = g_out self.rho = rho_out self.time = time() - tic return
[docs] def predict(self,x,xprime = None,t0 = None, interval: str | None = None, interval_lower: float | None = None, interval_upper: float | None = None,**kw): r''' Prediction under correlated noise Parameters ---------- x : ndarray_like matrix of designs locations to predict at (one point per row) xprime : ndarray_like optional second matrix of predictive locations to obtain the predictive covariance matrix between ``x`` and ``xprime`` t0: ndarray_like vector of times (not currently supported) interval: str one of 'confidence' or 'predictive' which is a convenience method to return confidence/predictive intervals corresponding to `interval_lower` and `interval_upper` interval_lower: float lower of confidence/predictive interval interval_upper: float upper of confidence/predictive interval nugs_only : bool (default False) if ``True``, only return noise variance prediction kwargs : dict optional additional elements (only used for nugs_only) Returns ------- dict with elements: - ``mean``: kriging mean; - ``sd2``: kriging variance (filtered, e.g. without the nugget values) - ``nugs``: noise variance prediction - ``cov``: (returned if ``xprime`` is given) predictive covariance matrix between ``x`` and ``xprime`` - ``confidence_interval``: prediction with kriging variance only - ``predictive_interval``: prediction with kriging and noise variance ''' if len(x.shape)==1: x = x.reshape(-1,1) if (x.shape[1]-1) != self.X0.shape[1]: raise ValueError(f"Problem with x format") s = x[:,-1] x = x[:,0:-1] if xprime is not None: if len(xprime.shape)==1: xprime = xprime.reshape(-1,1) if (xprime.shape[1]-1) != self.X0.shape[1]: raise ValueError(f"Problem with xprime format") sp = xprime[:,-1] xprime = xprime[:,0:-1] interval_types = [None,'confidence','predictive'] return_interval = False if interval is not None: list_interval = [interval] if type(interval)==str else interval if 'confidence' not in list_interval and 'predictive' not in list_interval: raise ValueError(f"interval must be one of 'confidence' or 'predictive' not {interval}") return_interval = True if "nugs_only" in kw and kw["nugs_only"]: return dict(nugs = np.repeat(self['nu_hat'] * self['g'], x.shape[0])) d = x.shape[1] self['Ki'] /= self['nu_hat'] kx = self['nu_hat'] * cov_gen(X1=x,X2 = self['X0'],theta = self['theta'], type=self['covtype']) tmp = s[:,None] == self['S0'] if isinstance(self['rho'],float) or self['rho'].size==1: ks = np.full(shape = (x.shape[0],self['X0'].shape[0]),fill_value=self['rho']) else: ks = self.pairwise_rho(S0r=s,S0c=self['S0'],rho=self['rho']) ks[tmp] = 1 kx = kx * ks nugs = np.repeat(self['nu_hat'] * self['g'],x.shape[0]) mean = self['beta0'] + kx @ (self['Ki'] @ (self['Z'] - self['beta0'])) if self['trendtype'] == 'SK': sd2 = self['nu_hat'] - np.diag(kx @ (self['Ki'] @ kx.T)) else: sd2 = self['nu_hat'] - np.diag(kx @ ((self['Ki'] @ kx.T))) + (1- (self['Ki'].sum(axis=0))@ kx.T)**2/self['Ki'].sum() if (sd2 < 0).any(): warnings.warn('Numerical errors caused some negative predictive variances to be thresholded to zero. Consider using np.inv via rebuild.CRNGP') sd2[sd2 < 0] = 0 if xprime is not None: kxprime = self['nu_hat'] * cov_gen(X1 = self['X0'],X2=xprime,theta = self['theta'],type = self['covtype']) tmp = self['S0'][:,None] == s ksprime = np.full(shape = (self['X0'].shape[0],xprime.shape[0]),fill_value=self['rho']) ksprime[tmp] = 1 kxprime = kxprime * ksprime kxxprime = self['nu_hat'] * cov_gen(X1 = x,X2=xprime,theta = self['theta'],type = self['covtype']) ksxxprime = np.full(shape = (x.shape[0],xprime.shape[0]),fill_value=self['rho']) tmp = s == sp[:,None] ksxxprime[tmp] = 1 kxxprime = kxxprime * ksxxprime if self['trendtype'] == 'SK': if x.shape[0] < xprime.shape[0]: cov = kxxprime - kx @ self['Ki'] @ kxprime else: cov = kxxprime - kx @ (self['Ki'] @ kxprime) else: if x.shape[0] < xprime.shape[0]: cov = kxxprime - kx @ self['Ki'] @ kxprime + ((1-(self['Ki'].sum(axis=0,keepdims=True))@ kx.T).T @ (1-self['Ki'].sum(axis=0,keepdims=True) @ kxprime))/self['Ki'].sum() else: cov = kxxprime - kx @ (self['Ki'] @ kxprime) + ((1-(self['Ki'].sum(axis=0,keepdims=True))@ kx.T).T @ (1-self['Ki'].sum(axis=0,keepdims=True) @ kxprime))/self['Ki'].sum() else: cov = None # rescale Ki self['Ki'] *= self['nu_hat'] preds = dict( mean = mean, sd2 = sd2, nugs = nugs, cov = cov ) if return_interval: preds['confidence_interval'] = {} if 'confidence' in list_interval: preds['confidence_interval']['lower'] = norm.ppf(interval_lower, loc = preds['mean'], scale = np.sqrt(preds['sd2'])).squeeze() preds['confidence_interval']['upper'] = norm.ppf(interval_upper, loc = preds['mean'], scale = np.sqrt(preds['sd2'])).squeeze() preds['predictive_interval'] = {} if 'predictive' in list_interval: preds['predictive_interval']['lower'] = norm.ppf(interval_lower, loc = preds['mean'], scale = np.sqrt(preds['sd2'] + preds['nugs'])).squeeze() preds['predictive_interval']['upper'] = norm.ppf(interval_upper, loc = preds['mean'], scale = np.sqrt(preds['sd2'] + preds['nugs'])).squeeze() return preds