Source code for hetgpy.find_reps

"""
docstring for find_reps
"""

from __future__ import annotations
import numpy as np
[docs] def find_reps(X,Z, return_Zlist = True, rescale = False, normalize = False, inputBounds = None, use_torch=False): """Prepare data for use with ``mleHomGP`` and ``mleHetGP`` In particular to find replicated observations Parameters ---------- X : array_like matrix of design locations, one point per row Z : array_like vector of observations at ``X`` return_Zlist: bool to return `Zlist`, see below rescale: bool if ``True``, the inputs are rescaled to the unit hypercube normalize: bool if ``True``, the outputs are centered and normalized inputBounds: array_like optional matrix of known boundaries in original input space, of size 2 times `X.shape[1]`. If not provided, and ``rescale == True``, it is estimated from the data. Returns ------- dict dictionary of outputs type explain types out dictionary of - ``X0`` matrix with unique designs locations, one point per row - ``Z0`` vector of averaged observations at ``X0`` - ``mult`` number of replicates at ``X0`` - ``Z`` vector with all observations, sorted according to ``X0`` - ``Zlist`` optional list, each element corresponds to observations at a design in ``X0`` - ``inputBounds`` optional matrix, to rescale back to the original input space - ``outputStats`` optional vector, with mean and variance of the original outputs. References ---------- Binois et. al (2018) Examples -------- >>> from hetgpy.data import mcycle >>> from hetgpy.hetGP import hetGP >>> X = mcycle['times'] >>> Z = mcycle['accel'] >>> model = hetGP() >>> out = model.find_reps(X, Z) >>> print(out) """ if use_torch: raise NotImplementedError("torch implementation is not yet available.") if type(X) != np.ndarray: raise ValueError(f"X must be a numpy array, is currently: {type(X)}") if type(Z) != np.ndarray: raise ValueError(f"X must be a numpy array, is currently: {type(Z)}") if len(Z.shape)>1: if Z.shape[1]>1: raise ValueError(f"Z must be 1D") Z = Z.reshape(-1) if X.shape[0] == 1: if return_Zlist: return dict(X0=X,Z0=Z,mult = np.array([1]), Z = Z, Zlist = {0:Z}) return(dict(X0 = X, Z0 = Z, mult = 1, Z = Z)) if len(X.shape) == 1: # if x is a 1D series raise ValueError(f"X appears to be a 1D array. Suggest reshaping with X.reshape(-1,1)") if rescale: if inputBounds is None: inputBounds = np.array([X.min(axis=0), X.max(axis=0)]) X = (X - inputBounds[0,:]) @ np.diag(1/(inputBounds[1,:] - inputBounds[0,:])) outputStats = None if normalize: outputStats = np.array([Z.mean(), Z.var()]) Z = (Z - outputStats[0])/np.sqrt(outputStats[1]) #X0 = np.unique(X, axis = 0) _, indices, corresp = np.unique(X, axis = 0, return_index=True, return_inverse=True) # np.unique sorts by default -- explicit sorting of the *indices* preserves the order of observations in X indices.sort() X0 = X[indices,:] if X0.shape[0] == X.shape[0]: if return_Zlist: return dict(X0 = X, Z0 = Z, mult = np.repeat(1, len(Z)), Z = Z, Zlist = {k:Z[k].reshape(-1) for k in range(len(Z))}, inputBounds = inputBounds, outputStats = outputStats) return dict(X0 = X, Z0 = Z, mult = np.repeat(1, len(Z)), Z = Z, inputBounds = inputBounds, outputStats = outputStats) # TODO: consider numba-ing this part. Replicating *split* in R is a bit tricky Zlist = {} Z0 = np.zeros(X0.shape[0], dtype=X0.dtype) mult = np.zeros(X0.shape[0], dtype=int) for idx, val in enumerate(corresp[indices]): out = Z[(val==corresp).nonzero()[0]] Zlist[idx] = out.reshape(-1) Z0[idx] = out.mean() mult[idx] = len(out) if return_Zlist: return dict(X0 = X0, Z0 = Z0, mult = mult, Z = np.concatenate(list(Zlist.values())), Zlist = Zlist, inputBounds = inputBounds, outputStats = outputStats) return dict(X0 = X0, Z0 = Z0, mult = mult, Z = np.concatenate(list(Zlist.values())), inputBounds = inputBounds, outputStats = outputStats)