import numpy as np
from NEDAS.grid import Grid, IrregularGrid, Grid1D
from NEDAS.utils.multiscale import get_scale_component, get_error_scale_factor
from NEDAS.core import Context, Transform
[docs]
class ScaleBandpass(Transform):
"""
Subclass for scale bandpass filter to get a scale component.
"""
nscale: int
decompose_obs: bool
def __init__(self, c, decompose_obs=True, **kwargs):
self.decompose_obs = decompose_obs
# validate config parameters
self.nscale = c.config.niter
for key in ['resolution_level', 'character_length',]:
value = getattr(c.config, key)
assert isinstance(value, list), f"{value} is not a list"
assert len(value) == self.nscale, f"{value} length != {self.nscale}"
[docs]
def forward_state(self, c, rec, field):
if self.nscale == 1:
return field
# for state on analysis grid, just get_scale_component (using Fourier method)
# pad voids with zero
mask = np.isnan(field)
field[mask] = 0.0
field = get_scale_component(c.grid, field, c.config.character_length, c.iter)
field[mask] = np.nan
return field
[docs]
def backward_state(self, c, rec, field):
return field
[docs]
def forward_obs(self, c, obs_rec, obs_seq):
if self.nscale == 1:
return obs_seq
if not self.decompose_obs:
return obs_seq
# temporarily convert obs grid to the analysis grid
# create the irregular obs grid
if isinstance(c.grid, Grid1D):
raise NotImplementedError("ScaleBandpass transform is not implemented for 1D grid.")
obs_grid = IrregularGrid(c.grid.proj, obs_seq['x'], obs_seq['y'])
# remove unwanted triangles in the obs grid
tri_a = getattr(obs_grid.tri, 'a')
tri_p = getattr(obs_grid.tri, 'p')
tri_ratio = getattr(obs_grid.tri, 'ratio')
max_a = np.quantile(tri_a, 0.999)
max_p = np.quantile(tri_p, 0.99)
msk = np.logical_or(tri_a > max_a, tri_p > max_p, tri_ratio < 0.3)
# convert obs to analysis grid
obs_grid = IrregularGrid(c.grid.proj, obs_seq['x'], obs_seq['y'], triangles=obs_grid.tri.triangles[~msk,:])
obs_grid.set_destination_grid(c.grid)
obs_fld = obs_grid.convert(obs_seq['obs'], is_vector=obs_rec.is_vector, method='nearest', coarse_grain=False)
# pad voids with zeros, for convolution later
mask = np.isnan(obs_fld)
obs_fld[mask] = 0.0
# get scale component on analysis grid
obs_fld_new = get_scale_component(c.grid, obs_fld, c.config.character_length, c.iter)
if obs_rec.is_vector:
for i in range(2):
obs_seq['obs'][i,...] = c.grid.interp(obs_fld_new[i,...], obs_seq['x'], obs_seq['y'], method='nearest')
else:
obs_seq['obs'] = c.grid.interp(obs_fld_new, obs_seq['x'], obs_seq['y'], method='nearest')
# TODO: current implementation is very slow
# update obs err std because some averaging happened in get_scale_component
#obs_seq['err_std'] *= get_error_scale_factor(c.grid, c.character_length, c.iter)
assert c.config.obs_err_scale_fac is not None
obs_seq['err_std'] *= c.config.obs_err_scale_fac[c.iter]
return obs_seq
[docs]
def backward_obs(self, c, obs_rec, obs_seq):
return obs_seq