Source code for dafi.random_field.field


""" Random fields representation and manipulation.

These functions can be called directly from ``dafi.random_field``, e.g.

.. code-block:: python

   >>> dafi.random_field.calc_kl_modes(*args)
"""

# standard library imports
import warnings

# third party imports
import numpy as np
from scipy import sparse as sp
from scipy.sparse import linalg as splinalg
from scipy import interpolate
from scipy import spatial


# KL decomposition
[docs]def calc_kl_modes(cov, nmodes=None, weight_field=None, eps=1e-8, normalize=True): """ Calculate the first N Karhunen-Loève modes for a covariance field. Converts the covariance to a sparse matrix if it is not one yet. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* nmodes : int Number of KL modes to obtain. weight_field : ndarray Weight (e.g. cell volume) associated with each state. Default ones (1). *dtype=float*, *ndim=1*, *shape=(nstate)* eps : float Small quantity to add to the diagonal of the covariance matrix for numerical stability. normalize : bool Whether to normalize (norm = 1) the KL modes. Returns ------- eig_vals : ndarray Eigenvalue associated with each mode. *dtype=float*, *ndim=1*, *shape=(nmodes)* kl_modes : ndarray KL modes (eigenvectors). *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* """ # convert to sparse matrix cov = sp.csc_matrix(cov) # default values nstate = cov.shape[0] if nmodes is None: nmodes = nstate-1 weight_field = _preprocess_field(weight_field, nstate, 1.0) # add small value to diagonal cov = cov + sp.eye(cov.shape[0], format='csc')*eps weight_field = np.squeeze(weight_field) weight_vec = np.atleast_2d(weight_field) weight_mat = np.sqrt(np.dot(weight_vec.T, weight_vec)) cov_weighted = cov.multiply(weight_mat) # perform the eig-decomposition eig_vals, eig_vecs = sp.linalg.eigsh(cov_weighted, k=nmodes) # sort the eig-value and eig-vectors in a descending order ascending_order = eig_vals.argsort() descending_order = ascending_order[::-1] eig_vals = eig_vals[descending_order] eig_vecs = eig_vecs[:, descending_order] # normalized KL modes weight_diag = np.diag(np.sqrt(weight_field)) kl_modes = np.dot(np.linalg.inv(weight_diag), eig_vecs) # normalized # check if negative eigenvalues for imode in np.arange(nmodes): neg_eigv = False if eig_vals[imode] < 0: neg_eigv = True warn_message = f'Negative eigenvalue for mode {imode}.' warnings.warn(warn_message) kl_modes[:, imode] *= 0. if neg_eigv: warn_message = 'Some modes have negative eigenvalues. ' + \ 'The number of KL modes might be too large. ' + \ "Alternatively, use a larger value for 'eps'." # weight by appropriate variance if not normalize: kl_modes = scale_kl_modes(eig_vals, kl_modes) return eig_vals, kl_modes
[docs]def calc_kl_modes_coverage(cov, coverage, weight_field=None, eps=1e-8, max_modes=None, normalize=True): """ Calculate all KL modes and return only those required to achieve a certain coverage of the variance. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* coverage : float Desired percentage coverage of the variance. Value between 0-1. weight_field : ndarray Weight (e.g. cell volume) associated with each state. Default ones (1). *dtype=float*, *ndim=1*, *shape=(nstate)* eps : float Small quantity to add to the diagonal of the covariance matrix for numerical stability. normalize : bool Whether to normalize (norm = 1) the KL modes. Returns ------- eig_vals : ndarray Eigenvalue associated with each mode. For the first N modes such that the desired coverage of the variance is achieved. *dtype=float*, *ndim=1*, *shape=(N)* kl_modes : ndarray first N KL modes (eigenvectors) such that the desired coverage of the variance is achieved. *dtype=float*, *ndim=2*, *shape=(nstate, N)* """ # convert to sparse matrix cov = sp.csc_matrix(cov) # default values nstate = cov.shape[0] weight_field = _preprocess_field(weight_field, nstate, 1.0) if max_modes is None: max_modes = nstate - 1 # get the first max_modes KL modes eig_vals, kl_modes = calc_kl_modes( cov, max_modes, weight_field, eps, normalize) # return only those KL modes required for desired coverage cummalative_variance = kl_coverage(cov, eig_vals, weight_field) coverage_index = np.argmax(cummalative_variance >= coverage) if coverage_index == 0: coverage_index = max_modes return eig_vals[:coverage_index], kl_modes[:, :coverage_index]
[docs]def scale_kl_modes(eig_vals, kl_modes_norm): """ Weight the KL modes by the appropriate variance. Parameters ---------- eig_vals : ndarray Eigenvalue associated with each mode. *dtype=float*, *ndim=1*, *shape=(nmodes)* kl_modes_norm : ndarray Normalized (norm = 1) KL modes (eigenvectors). *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* Returns ------- kl_modes_weighted : ndarray KL modes with correct magnitude. *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* """ nmodes = len(eig_vals) kl_modes_weighted = kl_modes_norm.copy() for imode in np.arange(nmodes): kl_modes_weighted[:, imode] *= np.sqrt(eig_vals[imode]) return kl_modes_weighted
[docs]def kl_coverage(cov, eig_vals, weight_field=None): """ Calculate the percentage of the covariance covered by the the first N KL modes for N from 1-nmodes. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* eig_vals : ndarray Eigenvalues associated with each mode. *dtype=float*, *ndim=1*, *shape=(nmodes)* weight_field : ndarray Weight (e.g. cell volume) associated with each state. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- coverage: ndarray Cumulative variance coverage of the first N modes. Each value is 0-1 and increasing. *dtype=float*, *ndim=1*, *shape=(nmodes)* """ # make sparse if its not already cov = sp.csc_matrix(cov) # default values nstate = cov.shape[0] weight_field = _preprocess_field(weight_field, nstate, 1.0) # calculate coverage weight_vec = np.atleast_2d(weight_field) weight_mat = np.sqrt(np.dot(weight_vec.T, weight_vec)) cov_weighted = cov.multiply(weight_mat) cov_trace = np.sum(cov_weighted.diagonal()) return np.cumsum(eig_vals) / cov_trace
[docs]def reconstruct_kl(modes, coeffs, mean=None): """ Reconstruct a field using KL modes and given coefficients. Can create multiple fields by providing two dimensional array of coefficients. Parameters ---------- modes : ndarray KL modes. *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* coeffs : ndarray Array of coefficients. *dtype=float*, *ndim=2*, *shape=(nmodes, nsamples)* mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- fields : ndarray Reconstructed fields. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ # number of modes, samples, and states if len(coeffs.shape) == 1: coeffs = np.expand_dims(coeffs, 1) nmodes, nsamps = coeffs.shape nstate = modes.shape[0] # mean vector mean = _preprocess_field(mean, nstate, 0.0) mean = np.expand_dims(np.squeeze(mean), axis=1) # create samples fields = np.tile(mean, [nsamps]) for imode in range(nmodes): vec1 = np.atleast_2d(coeffs[imode, :]) vec2 = np.atleast_2d(modes[:, imode]) fields += np.dot(vec1.T, vec2).T return fields
[docs]def project_kl(field, modes, weight_field=None, mean=None): """ Project a field onto a set of modes. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* modes : ndarray KL modes. *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* weight_field : ndarray Weight (e.g. cell volume) associated with each state. *dtype=float*, *ndim=1*, *shape=(nstate)* mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- coeffs : ndarray Projection magnitude. *dtype=float*, *ndim=1*, *shape=(nmodes)* """ nstate, nmode = modes.shape mean = _preprocess_field(mean, nstate, 0.0) coeffs = [] for imode in range(nmode): mode = modes[:, imode] coeffs.append(projection_magnitude(field-mean, mode, weight_field)) return np.array(coeffs)
def _preprocess_field(field, nstate, default): """Pre-process provided weight field. """ # default value if field is None: field = np.ones(nstate)*default # constant value if len(np.atleast_1d(np.squeeze(np.array(field)))) == 1: field = np.ones(nstate)*field return field # linear algebra on scalar fields
[docs]def integral(field, weight_field): """ Calculate the integral of a field. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- field_integral : float The integral of the field over the domain. """ field = np.squeeze(field) assert field.ndim == 1 nstate = len(field) weight_field = _preprocess_field(weight_field, nstate, 1.0) return np.sum(field * weight_field)
[docs]def inner_product(field_1, field_2, weight_field): """ Calculate the inner product between two fields. The two fields share the same weights. Parameters ---------- field_1 : ndarray One scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* field_2 : ndarray Another scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- product : float The inner product between the two fields. """ return integral(field_1 * field_2, weight_field)
[docs]def norm(field, weight_field): """ Calculate the L2-norm of a field. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- field_norm : float The norm of the field. """ return np.sqrt(inner_product(field, field, weight_field))
[docs]def unit_field(field, weight_field): """ Calculate the unit field (norm = 1) in same direction. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- field_normed : ndarray Normalized (norm = 1) scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* """ return field / norm(field, weight_field)
[docs]def projection_magnitude(field_1, field_2, weight_field): """ Get magnitude of projection of field_1 onto field_2. The two fields share the same weights. Parameters ---------- field_1 : ndarray Scalar field being projected. *dtype=float*, *ndim=1*, *shape=(ncells)* field_2 : ndarray Scalar field used for projection direction. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- magnitude : float magnitude of the projected field. """ magnitude = inner_product(field_1, field_2, weight_field) / \ (norm(field_2, weight_field)**2) return magnitude
[docs]def projection(field_1, field_2, weight_field): """ Project field_1 onto field_2. The two fields share the same weights. Parameters ---------- field_1 : ndarray Scalar field being projected. *dtype=float*, *ndim=1*, *shape=(ncells)* field_2 : ndarray Scalar field used for projection direction. *dtype=float*, *ndim=1*, *shape=(ncells)* weight_field : ndarray Cell volumes. *dtype=float*, *ndim=1*, *shape=(ncells)* Returns ------- projected_field : ndarray Projected field. *dtype=float*, *ndim=1*, *shape=(ncells)* """ magnitude = projection_magnitude(field_1, field_2, weight_field) direction = unit_field(field_2, weight_field) return magnitude*direction
# interpolation
[docs]def interpolate_field_rbf(data, coords, kernel, length_scale): """ Interpolate data using a radial basis function (RBF) to create a field from sparse specifications. This is used for instance to specify a variance field based on expert knowledge. Parameters ---------- data : ndarray Sparse data to create interpolation from. For an NxM array, the number of data points is N, the number of dimensions (coordinates) is M-1, and the Mth column is the data value. *dtype=float*, *ndim=2*, *shape=(N, M)* coords : ndarray Coordinates of the cell centers of the full discretized field. The RBF will be evaluated at these points. *dtype=float*, *ndim=2*, *shape=(ncells, M-1)* kernel : str Kernel (function) of the RBF. See *'function'* input of `scipy.interpolate.Rbf`_ for list of options. length_scale : float Length scale parameter (epsilon in `scipy.interpolate.Rbf`_) in the RBF kernel. Returns ------- field : ndarray Full field. *dtype=float*, *ndim=1*, *shape=(ncells)* """ args1 = [] args2 = [] ncoord = coords.shape[1] for icoord in range(ncoord): args1.append(data[:, icoord]) args2.append(coords[:, icoord]) interp_func = interpolate.Rbf( *args1, function=kernel, epsilon=length_scale) return interp_func(*args2)
[docs]def inverse_distance_weights(coords, connectivity, points, tol=1e-6): """ Create linear interpolation matrix (observation operatror H). """ # get host cell (cell centre closest to point) tree = spatial.cKDTree(coords) distances, indexes = tree.query(list(points)) npoints = points.shape[0] ncells = coords.shape[0] # calculate weights mat = sp.lil_matrix((npoints, ncells)) for i in range(npoints): id = indexes[i] if distances[i] < tol: # if location is cell centre mat[i, id] = 1.0 else: point = np.expand_dims(np.squeeze(points[i, :]), 0) neighbours = coords[connectivity[id], :] dist = spatial.distance.cdist(point, neighbours) weight = 1 / dist wsum = np.sum(weight) + 1 / distances[i] weight /= wsum # host cell mat[i, id] = (1 / distances[i]) / wsum # neighbour cells mat[i, connectivity[id]] = weight return sp.csc_matrix(mat)
# Gaussian process: generate samples
[docs]def gp_samples_cholesky(cov, nsamples, mean=None, eps=1e-8): """ Generate samples of a Gaussian Process using Cholesky decomposition. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* nsamples : int Number of samples to generate. mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* eps : float Small quantity to add to the diagonal of the covariance matrix for numerical stability. Returns ------- samples : ndarray Matrix of samples. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ # make sparse if its not already cov = sp.csc_matrix(cov) nstate = cov.shape[0] # add small value to diagonal cov = cov + sp.eye(nstate, format='csc')*eps # mean vector mean = _preprocess_field(mean, nstate, 0.0) mean = np.expand_dims(np.squeeze(mean), axis=1) # Create samples using Cholesky Decomposition L = sparse_cholesky(cov) a = np.random.normal(size=(nstate, nsamples)) perturb = L.dot(a) return mean + perturb
[docs]def sparse_cholesky(cov): """ Compute the Cholesky decomposition for a sparse (scipy) matrix. Adapted from `gist.github.com/omitakahiro`_. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* Returns ------- lower: scipy.sparse.csc_matrix Lower triangular Cholesky factor of the covariance matrix. """ # convert to sparse matrix cov = sp.csc_matrix(cov) # LU decomposition LU = splinalg.splu(cov, diag_pivot_thresh=0) # check the matrix is positive definite. n = cov.shape[0] posd = (LU.perm_r == np.arange(n)).all() and (LU.U.diagonal() > 0).all() if not posd: raise ValueError('The matrix is not positive definite') return LU.L.dot(sp.diags(LU.U.diagonal()**0.5))
[docs]def gp_samples_kl(cov, nsamples, weight_field, nmodes=None, mean=None, eps=1e-8): """ Generate samples of a Gaussian Process using KL decomposition. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* nsamples : int Number of samples to generate. weight_field : ndarray Weight (e.g. cell volume) associated with each state. *dtype=float*, *ndim=1*, *shape=(nstate)* nmodes : int Number of modes to use when generating samples. *'None'* to use all modes. mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* eps : float Small quantity to add to the diagonal of the covariance matrix for numerical stability. Returns ------- samples : ndarray Matrix of samples. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ # KL decomposition eigv, modes = calc_kl_modes(cov, nmodes, weight_field, eps, False) if nmodes is None: nmodes = len(eigv) # create samples coeffs = np.random.normal(0, 1, [nmodes, nsamples]) return reconstruct_kl(modes, coeffs, mean)
[docs]def gp_samples_klmodes(modes, nsamples, mean=None): """ Generate samples of a Gaussian Process using the given KL modes. Parameters ---------- modes : ndarray KL modes. *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* nsamples : int Number of samples to generate. mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- samples : ndarray Matrix of samples. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ # create samples nmodes = modes.shape[1] coeffs = np.random.normal(0, 1, [nmodes, nsamples]) return reconstruct_kl(modes, coeffs, mean)
[docs]def gp_samples_kl_coverage(cov, nsamples, weight_field, coverage=0.99, max_modes=None, mean=None, eps=1e-8): """ Generate samples of a Gaussian Process using KL decomposition. Only the firs N modes required to get the desired variance coverage are used. Parameters ---------- cov : ndarray Covariance matrix. Can be ndarray, matrix, or scipy sparse matrix. *dtype=float*, *ndim=2*, *shape=(nstate, nstate)* nsamples : int Number of samples to generate. weight_field : ndarray Weight (e.g. cell volume) associated with each state. *dtype=float*, *ndim=1*, *shape=(nstate)* coverage : float Desired percentage coverage of the variance. Value between 0-1. max_modes : int Maximum number of modes used. This is the number of modes that is calculated. If less are needed to achieve the desired coverage the additional ones are discarded. mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* eps : float Small quantity to add to the diagonal of the covariance matrix for numerical stability. Returns ------- samples : ndarray Matrix of samples. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* nmodes : int Number of modes used to achieve the requested coverage. """ # KL decomposition eigv, klmodes = calc_kl_modes_coverage( cov, coverage, weight_field, eps, max_modes, False) nmodes = len(eigv) # create samples coeffs = np.random.normal(0, 1, [nmodes, nsamples]) return reconstruct_kl(klmodes, coeffs, mean), nmodes
[docs]def gp_sqrexp_samples(nsamples, coords, stddev, length_scales, mean=None, weight_field=None, max_modes=None): """ Generate samples from a Gaussian Process with square exponential correlation kernel. This is a convinience function for new users or simple cases. It create the covariance matrix, does the KL decomposition, keeps the required modes for 99% coverage, and create the samples. Parameters ---------- nsamples : int Number of samples to generate. coords : ndarray Array of coordinates. Each row correspond to a different point and the number of columns is the number of physical dimensions (e.g. 3 for (x,y,z)). *dtype=float*, *ndim=2*, *shape=(npoints, ndims)* stddev : ndarray Standard deviation of each state. Alternatively, provide a float for a constant standard deviation. *dtype=float*, *ndim=1*, *shape=(nstate)* length_scales : list Length scale for each physical dimensions. List length is ndims. Each entry is either a one dimensional ndarray of length nstate (length scale field) or a float (constant length scale). mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* weight_field : ndarray Weight (e.g. cell volume) associated with each state. *dtype=float*, *ndim=1*, *shape=(nstate)* max_modes : int Maximum number of modes used. This is the number of modes that is calculated. If less are needed to achieve 99% coverage the additional ones are discarded. """ from dafi.random_field.covariance import generate_cov cov = generate_cov( 'sqrexp', stddev, coords=coords, length_scales=length_scales) samples, _ = gp_samples_kl_coverage( cov, nsamples, weight_field, 0.99, max_modes, mean) return samples
# Random field class
[docs]class GaussianProcess(object): """ Gaussian process class. Also allows for the creation of a function of a Gaussian process. E.g. see *'Lognormal'* class. """
[docs] def __init__(self, klmodes, mean=None, weights=None, func=None, funcinv=None): """ Initialize Gaussian process class. Parameters ---------- klmodes : ndarray KL modes (eigenvectors). *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* mean : ndarray Mean vector. Default zero (0). *dtype=float*, *ndim=1*, *shape=(nstate)* weights : ndarray Weight (e.g. cell volume) associated with each state. Default ones (1). *dtype=float*, *ndim=1*, *shape=(nstate)* func: function Function to create a random process that is a function of a Gaussian process. Default is identity function (GP). funcinv: function Inverse of func. """ nstate = klmodes.shape[0] self.klmodes = klmodes self.ncell, self.nmodes = self.klmodes.shape self.mean = _preprocess_field(mean, self.ncell, 0.0) self.weights = _preprocess_field(mean, nstate, 1.0) def func_identity(x): return x if func is None: func = func_identity if funcinv is None: funcinv = func_identity self.func = func self.funcinv = funcinv
[docs] def sample_coeffs(self, nsamples): """ Create Karhunen-Loève (KL) coefficents for random samples. Parameters ---------- nsamples : int Number of samples for which to generate KL coefficients. Returns ------- coeffs : ndarray Matrix of samples KL coefficients for the Gaussian process. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ return np.random.normal(0, 1, [self.nmodes, nsamples])
[docs] def sample_gp(self, nsamples, mean=None): """ Generate samples of the Gaussian process. Parameters ---------- nsamples : int Number of samples to generate. mean : ndarray Mean vector. If *None*, self.mean is used. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- samples : ndarray Sample fields from Gaussian process. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* coeffs : ndarray Matrix of samples KL coefficients for the Gaussian process. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ if mean is None: mean = self.mean coeffs = self.sample_coeffs(nsamples) return reconstruct_kl(self.klmodes, coeffs, mean), coeffs
[docs] def sample_func(self, nsamples, mean=None): """ Generate samples of the function of the Gaussian process. Parameters ---------- nsamples : int Number of samples to generate. mean : ndarray Mean vector. If *None*, self.mean is used. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- samples : ndarray Sample fields from the function of the Gaussian process. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ if mean is None: mean = self.mean coeffs = self.sample_coeffs(nsamples) samps_gp = reconstruct_kl(self.klmodes, coeffs, mean) return self.func(samps_gp), coeffs
[docs] def reconstruct_gp(self, coeffs, mean=None): """ Reconstruct the Gaussian process field from given KL coefficients. Parameters ---------- coeffs : ndarray Array of KL coefficients. *dtype=float*, *ndim=2*, *shape=(nmodes, nsamples)* mean : ndarray Mean vector. If *None*, self.mean is used. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- fields : ndarray Reconstructed fields. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ if mean is None: mean = self.mean return reconstruct_kl(self.klmodes, coeffs, mean)
[docs] def reconstruct_func(self, coeffs, mean=None): """ Reconstruct the function of the Gaussian process field from given KL coefficients. Parameters ---------- coeffs : ndarray Array of KL coefficients. *dtype=float*, *ndim=2*, *shape=(nmodes, nsamples)* mean : ndarray Mean vector. If *None*, self.mean is used. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- fields : ndarray Reconstructed fields. *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)* """ if mean is None: mean = self.mean val_gp = reconstruct_kl(self.klmodes, coeffs, mean) return self.func(val_gp)
[docs] def pdf(self, coeffs): """ Probaility density function (PDF). PDF(x) where x is a field (point in sample space) specified by KL coeffiecients. Parameters ---------- coeffs : ndarray Array of KL coefficients. *dtype=float*, *ndim=2*, *shape=(nmodes, nsamples)* Returns ------- pdf : ndarray Value of the PDF function for the given point in the sample space. """ return np.exp(logpdf(coeffs))
[docs] def logpdf(self, coeffs): """ Logarithm of the probability density function. log(PDF(x)) where x is a field (point in sample space) specified by KL coeffiecients. Parameters ---------- coeffs : ndarray Array of KL coefficients. *dtype=float*, *ndim=2*, *shape=(nmodes, nsamples)* Returns ------- logpdf : ndarray Logarithm of the value of the PDF function for the given point in the sample space. """ if len(coeffs.shape) == 1: coeffs = np.expand_dims(coeffs, 1) norm_coeff = np.linalg.norm(coeffs, axis=0) const = np.log((2*np.pi)**(-self.ncell/2)) return const + -0.5*norm_coeff**2
[docs] def project_gp_field(self, field, mean=None): """ Project a field onto the KL modes. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- coeffs : ndarray Projection magnitude. *dtype=float*, *ndim=1*, *shape=(nmodes)* """ return project_kl(field, self.klmodes, self.weights, mean)
[docs] def project_func_field(self, field, mean=None): """ Project a field from the function of the Gaussian process onto the KL modes. Parameters ---------- field : ndarray Scalar field. *dtype=float*, *ndim=1*, *shape=(ncells)* mean : ndarray Mean vector. *dtype=float*, *ndim=1*, *shape=(nstate)* Returns ------- coeffs : ndarray Projection magnitude. *dtype=float*, *ndim=1*, *shape=(nmodes)* """ field = self.funcinv(field) mean = _preprocess_field(mean) mean = self.funcinv(mean) return project_kl(field, self.klmodes, self.weights, mean)
[docs]class LogNormal(GaussianProcess): """ Log-normal process class. """
[docs] def __init__(self, klmodes_gp, median=1.0, weights=None): """ Initialize log-normal process class. Parameters ---------- klmodes_gp : ndarray KL modes (eigenvectors) of the underlying Gaussian process. *dtype=float*, *ndim=2*, *shape=(nstate, nmodes)* median : ndarray Median vector. Default one (1). *dtype=float*, *ndim=1*, *shape=(nstate)* weights : ndarray Weight (e.g. cell volume) associated with each state. Default ones (1). *dtype=float*, *ndim=1*, *shape=(nstate)* """ nstate = klmodes_gp.shape[0] median = _preprocess_field(median, nstate, 1.0) self.median_func = np.expand_dims(np.squeeze(median), 1) def func(x): return self.median_func * np.exp(x) def funcinv(y): return np.log(y / self.median_func) mean = 0.0 super(self.__class__, self).__init__( klmodes_gp, mean, weights, func, funcinv)