Source code for NEDAS.assim_tools.transforms.scale_bandpass

import numpy as np
from NEDAS.grid import Grid, IrregularGrid, Grid1D
from NEDAS.utils.multiscale import get_scale_component, get_error_scale_factor
from NEDAS.core import Context, Transform

[docs] class ScaleBandpass(Transform): """ Subclass for scale bandpass filter to get a scale component. """ nscale: int decompose_obs: bool def __init__(self, c, decompose_obs=True, **kwargs): self.decompose_obs = decompose_obs # validate config parameters self.nscale = c.config.niter for key in ['resolution_level', 'character_length',]: value = getattr(c.config, key) assert isinstance(value, list), f"{value} is not a list" assert len(value) == self.nscale, f"{value} length != {self.nscale}"
[docs] def forward_state(self, c, rec, field): if self.nscale == 1: return field # for state on analysis grid, just get_scale_component (using Fourier method) # pad voids with zero mask = np.isnan(field) field[mask] = 0.0 field = get_scale_component(c.grid, field, c.config.character_length, c.iter) field[mask] = np.nan return field
[docs] def backward_state(self, c, rec, field): return field
[docs] def forward_obs(self, c, obs_rec, obs_seq): if self.nscale == 1: return obs_seq if not self.decompose_obs: return obs_seq # temporarily convert obs grid to the analysis grid # create the irregular obs grid if isinstance(c.grid, Grid1D): raise NotImplementedError("ScaleBandpass transform is not implemented for 1D grid.") obs_grid = IrregularGrid(c.grid.proj, obs_seq['x'], obs_seq['y']) # remove unwanted triangles in the obs grid tri_a = getattr(obs_grid.tri, 'a') tri_p = getattr(obs_grid.tri, 'p') tri_ratio = getattr(obs_grid.tri, 'ratio') max_a = np.quantile(tri_a, 0.999) max_p = np.quantile(tri_p, 0.99) msk = np.logical_or(tri_a > max_a, tri_p > max_p, tri_ratio < 0.3) # convert obs to analysis grid obs_grid = IrregularGrid(c.grid.proj, obs_seq['x'], obs_seq['y'], triangles=obs_grid.tri.triangles[~msk,:]) obs_grid.set_destination_grid(c.grid) obs_fld = obs_grid.convert(obs_seq['obs'], is_vector=obs_rec.is_vector, method='nearest', coarse_grain=False) # pad voids with zeros, for convolution later mask = np.isnan(obs_fld) obs_fld[mask] = 0.0 # get scale component on analysis grid obs_fld_new = get_scale_component(c.grid, obs_fld, c.config.character_length, c.iter) if obs_rec.is_vector: for i in range(2): obs_seq['obs'][i,...] = c.grid.interp(obs_fld_new[i,...], obs_seq['x'], obs_seq['y'], method='nearest') else: obs_seq['obs'] = c.grid.interp(obs_fld_new, obs_seq['x'], obs_seq['y'], method='nearest') # TODO: current implementation is very slow # update obs err std because some averaging happened in get_scale_component #obs_seq['err_std'] *= get_error_scale_factor(c.grid, c.character_length, c.iter) assert c.config.obs_err_scale_fac is not None obs_seq['err_std'] *= c.config.obs_err_scale_fac[c.iter] return obs_seq
[docs] def backward_obs(self, c, obs_rec, obs_seq): return obs_seq