Source code for dafi.main

# Copyright 2020 Virginia Polytechnic Institute and State University.
""" DAFI: a Python package for data assimilaton and field inversion.
"""

# standard library imports
import importlib
import logging
import warnings
import os
import time as tm

import sys

# third party imports
import numpy as np

# local imports
# user-specified physics model imported later with importlib
# user-specified inverse method imported later with importlib


[docs]def run(model_file, inverse_method, nsamples, ntime=None, perturb_obs_option='iter', obs_err_multiplier=1.0, analysis_to_obs=False, convergence_option='max', max_iterations=1, convergence_residual=None, convergence_factor=1.0, save_level=None, save_dir='results_dafi', rand_seed=None, verbosity=0, inputs_inverse={}, inputs_model={}): """ Run DAFI. Accesible through ``dafi.run()``. Parameters ---------- model_file: str Name (path) of dynamic model module/package. inverse_method: str Name of inverse method from dafi.inverse_methods module. nsamples : int Number of samples in ensemble. ntime : int Number of data assimilation times. For stationary use *'1'* or *'None'*. Default *'None'*. perturb_obs_option: str Option on when to perturb observations: *'iter'* to perturb at each iteration (inner loop), *'time'* to perturb only once each data assimilaton time (outer loop), *'None'* to not perturb observations. Default *'iter'*. obs_err_multiplier: float Factor by which to multiply the observation error matrix. This is done by some authors in the literature. Default *'1.0'*. analysis_to_obs: bool Map analysis state to observation space. Default *'False'*. convergence_option: str Convergence criteria to use: *'discrepancy'* to use the discrepancy principle, *'residual'* to use the iterative residual, *'max'* to reach the maximum iterations. Default *'max'*. max_iterations: int Maximum number of iterations at a given time-step. Default *'1'*. convergence_residual: float Residual value for convergence if *reach_max* is *'False'* and *convergence_option* is *'residual'*. Default *'None'*. convergence_factor: float Factor used in the discrepancy principle convergence option. Default *'1.0'*. save_level : str Level of results to save: *'None'* to not save results, *'time'* to save results at each data assimilation time step, *'iter'* to save results at each iteration, *'debug'* to save additional intermediate quantities. Default *'None'*. save_dir: str Folder where to save results. Default *'./results_dafi'*. rand_seed : float Seed for numpy.random. If None random seed not set. Default *'None'*. verbosity: int Logging verbosity level, between -1 and 9 (currently -1-3 used). For no logging use *'-1'*. For debug-level logging use *'debug'*. Default *'0'*. inputs_inverse : dict Inverse method specific inputs. Default empty dictionary *'{}'*. inputs_model : dict Physics model specific inputs. Default empty dictionary *'{}'*. Returns ------- state_analysis_list : list List of analysis states at each DA time. Length is number of DA time steps. Each entry is an nd.array containing the ensemble analysis states (:math:`x_a`) at that time step. If only one DA time (e.g. stationary, inversion problem) is the single ndarray. Each entry is: *dtype=float*, *ndim=2*, *shape=(nstate, nsamples)*. """ # collect inputs # the inverse method and physics model are allowed to modify these inputs_dafi = { 'model_file': model_file, 'inverse_method': inverse_method, 'nsamples': nsamples, 'ntime': ntime, 'perturb_obs_option': perturb_obs_option, 'obs_err_multiplier': obs_err_multiplier, 'analysis_to_obs': analysis_to_obs, 'convergence_option': convergence_option, 'max_iterations': max_iterations, 'convergence_residual': convergence_residual, 'convergence_factor': convergence_factor, 'save_level': save_level, 'save_dir': save_dir, 'rand_seed': rand_seed, 'verbosity': verbosity, } # configure logger if verbosity == 'debug': logging.basicConfig(format='%(message)s', level=logging.DEBUG) else: logging.basicConfig(format='%(message)s', level=_log_level(verbosity)) logger = logging.getLogger(__name__) # random seed # this is done before importing local modules that use np.random if rand_seed is not None: np.random.seed(rand_seed) # stationary problem if inputs_dafi['ntime'] is None: inputs_dafi['ntime'] = 1 # create save directory if save_level != None: _create_dir(save_dir) # initialize physics model sys.path.append(os.path.dirname(os.path.abspath(model_file))) model_module = os.path.basename(os.path.splitext(model_file)[0]) Model = getattr( importlib.import_module(model_module), 'Model') model = Model(inputs_dafi, inputs_model) # initialize inverse method Inverse = getattr( importlib.import_module('dafi.inverse'), inverse_method) inverse = Inverse(inputs_dafi, inputs_inverse) # log log_message = "Solving the inverse problem" + \ f"\nPhysics Model: {model.name}" + \ f"\nInverse Method: {inverse.name}" logger.log(_log_level(0), log_message) # solve state_analysis_list = _solve(inputs_dafi, inverse, model) # log logger.log(_log_level(0), 'Done.') if inputs_dafi['ntime'] == 1: state_analysis_list = state_analysis_list[0] return state_analysis_list
def _solve(inputs_dafi, inverse, model): """ Solve the inverse problem. Implements the general inverse problem consisting of an outer (time) loop and inner (iteration) loop. """ logger = logging.getLogger(__name__ + '._solve') # time and iteration arrays time_array = np.arange(inputs_dafi['ntime'], dtype=int) iteration_array = np.arange(inputs_dafi['max_iterations'], dtype=int) # initial ensemble state_forecast = model.generate_ensemble() # outer loop - time marching state_analysis_list = [] for time in time_array: log_message = f"\nData assimilation step: {time}" logger.log(_log_level(1), log_message) # dynamic model - propagate the state ensemble to current DA time. if time != 0: state_forecast = model.forecast_to_time(state_analysis, time) # get observations at current DA time obs_vec, obs_error = model.get_obs(time) obs_error *= inputs_dafi['obs_err_multiplier'] obs = np.tile(obs_vec, (inputs_dafi['nsamples'], 1)).T if inputs_dafi['perturb_obs_option'] == 'time': obs, obs_perturbation = _perturb_vec( obs_vec, obs_error, inputs_dafi['nsamples']) # prior state for iterative methods that require it state_prior = state_forecast # inner loop - iterative analysis at fixed DA time. misfit_list = [] noise_list = [] residual_list = [] tdir = os.path.join(inputs_dafi['save_dir'], f"t_{time}") early_stop = False for iteration in iteration_array: # log log_message = f"\n Iteration: {iteration}" logger.log(_log_level(2), log_message) ts = tm.time() # map the state vector to observation space if iteration != 0: state_forecast = state_analysis.copy() state_in_obsspace = model.state_to_observation(state_forecast) print(f' Ensemble of forecast ... {tm.time()-ts:.2f}s') if iteration == 0: state_in_obsspace_prior = state_in_obsspace # perturb observations if inputs_dafi['perturb_obs_option'] == 'iter': obs, obs_perturbation = _perturb_vec( obs_vec, obs_error, inputs_dafi['nsamples']) # data assimilaton ts = tm.time() # inflation if inverse.inflation_flag: if iteration == 0: inverse.gamma = inverse.gamma elif iteration == 1: coeff = 1.0 / np.sqrt(inputs_dafi['nsamples'] - 1.0) Sd = coeff * (state_in_obsspace - state_in_obsspace.mean(axis=1,keepdims=1)) Sd_norm = np.sqrt(np.linalg.inv(obs_error)).dot(Sd) inverse.gamma *= np.trace(Sd_norm.dot(Sd_norm.T)) / len(obs_vec) elif iteration != 1: inverse.gamma = inverse.gamma * inverse.beta print(inverse.gamma) state_analysis = inverse.analysis( iteration, state_forecast, state_in_obsspace, obs, obs_error, obs_vec) print(f' Data assimilation analysis ... {tm.time()-ts:.2f}s') # save results if inputs_dafi['save_level'] in {'iter', 'debug'}: results = {'y': obs, 'Hx': state_in_obsspace, 'xa': state_analysis, 'xf': state_forecast, } for key, val in results.items(): dir = os.path.join(tdir, key) _create_dir(dir) file = key + f'_{iteration}' np.savetxt(os.path.join(dir, file), val) # check convergence diff = obs - state_in_obsspace misfit_norm = np.linalg.norm(np.mean(diff, axis=1)) misfit_list.append(misfit_norm) # inflation if inverse.inflation_flag and misfit_list[iteration] > misfit_list[iteration-1]: dir = os.path.join(tdir, 'xf') state_forecast = np.loadtxt(dir + '/xf_{}'.format(iteration-1)) for loop in range(5): # TODO: put 5 in inputfile # log print(f' Inner iteration: {loop}') inverse.gamma = inverse.gamma * inverse.alpha dir = os.path.join(tdir, 'Hx') state_in_obsspace = np.loadtxt(dir + '/Hx_{}'.format(iteration-1)) # perturb observations if inputs_dafi['perturb_obs_option'] == 'iter': obs, obs_perturbation = _perturb_vec( obs_vec, obs_error, inputs_dafi['nsamples']) diff = obs - state_in_obsspace misfit_norm = np.linalg.norm(np.mean(diff, axis=1)) if misfit_norm < misfit_list[iteration-1]: misfit_list[iteration] = misfit_norm break state_analysis = inverse.analysis( iteration, state_forecast, state_in_obsspace, obs, obs_error, obs_vec) print(f' misfit = ... {misfit_norm}s') conv, log_message, (residual, noise) = _convergence( misfit_list, obs_error, inputs_dafi['convergence_factor'], inputs_dafi['convergence_residual'], inputs_dafi['convergence_option']) residual_list.append(residual) noise_list.append(noise) if len(iteration_array) > 1: logger.log(_log_level(3), log_message) if conv and (iteration < inputs_dafi['max_iterations']): early_stop = True break # save inner loop convergence history if inputs_dafi['save_level'] in {'iter', 'debug'}: convergence = {'misfit': misfit_list, 'min_discrepancy': noise_list, 'residual': residual_list, } for key, val in convergence.items(): np.savetxt(os.path.join(tdir, key), val) key, val = ('iteration', np.array([iteration], dtype=int)) np.savetxt(os.path.join(tdir, key), val, fmt='%i') # log inner loop summary if early_stop: message = "convergence, early stop." else: message = "max iteration reached." log_message = "\n Inversion completed: " + message logger.log(_log_level(2), log_message) # map analysis state to observation space and save if inputs_dafi['analysis_to_obs']: log_message = "\n Mapping final analysis states " + \ "to observation space." logger.log(_log_level(2), log_message) state_in_obsspace = model.state_to_observation(state_analysis) if inputs_dafi['save_level'] in {'iter', 'debug'}: dir = os.path.join(tdir, 'Hx') file = 'Hxa' np.savetxt(os.path.join(dir, file), state_in_obsspace) # save outer loop if inputs_dafi['save_level'] in {'time', 'iter', 'debug'}: results = {'y': obs, 'Hx': state_in_obsspace_prior, 'R': obs_error, 'xa': state_analysis, 'xf': state_prior } if inputs_dafi['analysis_to_obs']: results['Hxa'] = state_in_obsspace for key, val in results.items(): dir = os.path.join(inputs_dafi['save_dir'], key) _create_dir(dir) file = key + f'_{time}' np.savetxt(os.path.join(dir, file), val) # model cleanup try: model.clean('iter') except AttributeError: pass # collect output state_analysis_list.append(state_analysis) # model cleanup try: model.clean('time') except AttributeError: pass return state_analysis_list def _convergence(misfit_list, obs_error, noise_factor, min_residual, option): """ Calculate convergence metrics. Also returns convergence decision and log message. """ # Convergence: discrepancy principle noise_level = np.sqrt(np.trace(obs_error)) if noise_factor is None: conv_discrepancy = False else: noise_criteria = noise_factor * noise_level conv_discrepancy = misfit_list[-1] < noise_criteria # Convergence: residual of misfit iteration = len(misfit_list) - 1 if iteration > 0: residual = abs(misfit_list[iteration] - misfit_list[iteration-1]) residual /= abs(misfit_list[0]) else: residual = np.nan if min_residual is None: conv_residual = False else: conv_residual = residual < min_residual # Convergence if option == 'max': conv = False elif option == 'discrepancy': conv = conv_discrepancy elif option == 'residual': conv = conv_residual # log log_message = f" Convergence (variance): {conv_discrepancy}" log_message += f"\n Norm of misfit: {misfit_list[-1]}" log_message += f"\n Noise level: {noise_criteria}" log_message += f"\n Convergence (residual): {conv_residual}" log_message += f"\n Relative iterative residual: {residual}" log_message += f"\n Relative convergence criterion: {min_residual}" return conv, log_message, (residual, noise_criteria) def _log_level(verbosity): """Return log level for specified verbosity level. """ message = 'Verbosity should be between -2 and 9' if verbosity > 9: warnings.warn(message, RuntimeWarning) verbosity = 9 elif verbosity < -1: warnings.warn(message, RuntimeWarning) verbosity = -1 level = 29 - verbosity return level def _perturb_vec(mean, cov, nsamps, perturb_diag=1e-10): """ Create samples of a random vector. Uses a multivariate Gaussian distribution. Returns matrix of samples where each column is a sample. """ ndim = len(mean) # check symmetric if not np.allclose(cov, cov.T): raise ValueError('Covariance matrix is not symmetric.') # add small value to diagonal for numerical stability of Cholesky decomp. cov += np.eye(ndim)*perturb_diag # Cholesky decomposition l_mat = np.linalg.cholesky(cov) # create correlated perturbations x_mat = np.random.normal(loc=0.0, scale=1.0, size=(ndim, nsamps)) perturb = np.matmul(l_mat, x_mat) return np.tile(mean, (nsamps, 1)).T + perturb, perturb def _create_dir(dir,): """ Create directory if it does not exist. """ if not os.path.exists(dir): os.makedirs(dir)