import numpy as np
from NEDAS.models.vort2d import Vort2DModel
from NEDAS.datasets.synthetic import SyntheticObs
from NEDAS.core.types import VarDesc
[docs]
class Vort2DObs(SyntheticObs):
network_type: str
obs_range: float
def __init__(self, **kwargs):
super().__init__(**kwargs)
restart_dt = 6
self.variables = {
'velocity': VarDesc(name='null', dtype='float', is_vector=True, dt=restart_dt, levels=np.array([0]), z_units='m', units='m/s'),
'vortex_position': VarDesc(name='null', dtype='float', is_vector=True, dt=restart_dt, levels=np.array([0]), z_units='m', units='m'),
'vortex_intensity': VarDesc(name='null', dtype='float', is_vector=False, dt=restart_dt, levels=np.array([0]), z_units='m', units='m/s'),
'vortex_size': VarDesc(name='null', dtype='float', is_vector=False, dt=restart_dt, levels=np.array([0]), z_units='m', units='m'),
}
self.obs_operator = {
'vortex_position': self.get_vortex_position,
'vortex_intensity': self.get_vortex_intensity,
'vortex_size': self.get_vortex_size,
}
[docs]
def generate_obs_network(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
name = kwargs['name']
model = kwargs['model']
assert isinstance(model, Vort2DModel)
grid = kwargs['grid']
# get truth vortex position, some network is vortex-following
velocity = self.get_velocity(**{**kwargs, 'path': model.truth_dir})
# diagnose the vortex position on grid
i, j = self.vortex_position(velocity[0,...], velocity[1,...])
true_center_x, true_center_y = grid.x[j,i], grid.y[j,i]
if name == 'velocity':
nobs = kwargs['nobs']
if self.network_type == 'global':
if nobs is None:
nobs = 1000
y = np.random.uniform(grid.ymin, grid.ymax, nobs)
x = np.random.uniform(grid.xmin, grid.xmax, nobs)
elif self.network_type == 'targeted':
if nobs is None:
nobs = 800
x, y = [], []
while len(x) < nobs:
x1 = np.random.uniform(true_center_x - self.obs_range, true_center_x + self.obs_range)
y1 = np.random.uniform(true_center_y - self.obs_range, true_center_y + self.obs_range)
dist = np.hypot(x1 - true_center_x, y1 - true_center_y)
if dist <= self.obs_range:
x.append(x1)
y.append(y1)
x = np.array(x)
y = np.array(y)
else:
raise ValueError('unknown network type: '+self.network_type)
obs_seq = {'obs': np.full(nobs, np.nan),
't': np.full(nobs, kwargs['time']),
'z': np.zeros(nobs),
'y': y,
'x': x,
'err_std': np.ones(nobs) * kwargs['err']['std']
}
elif name == 'vortex_position':
obs_seq = {'obs': np.array([[np.nan, np.nan]]),
't': np.array([kwargs['time']]),
'z': np.array([0]),
'y': np.array([true_center_y]),
'x': np.array([true_center_x]),
'err_std': np.array([kwargs['err']['std']])
}
elif name in ['vortex_intensity', 'vortex_size']:
obs_seq = {'obs': np.array([np.nan]),
't': np.array([kwargs['time']]),
'z': np.array([0]),
'y': np.array([true_center_y]),
'x': np.array([true_center_x]),
'err_std': np.array([kwargs['err']['std']])
}
else:
raise ValueError('unknown obs variable: '+name)
return obs_seq
# #utility functions for obs diagnostics
[docs]
def vortex_position(self, u, v):
ny, nx = u.shape
# compute vorticity
zeta = (np.roll(v, -1, axis=1) - np.roll(v, 1, axis=1) - np.roll(u, -1, axis=0) + np.roll(u, 1, axis=0)) / 2.0
# search for max vorticity
zmax = -999
center_x, center_y = -1, -1
buff = 6
center_i, center_j = None, None
for j in range(buff, ny-buff):
for i in range(buff, nx-buff):
z = np.sum(zeta[j-buff:j+buff, i-buff:i+buff])
if z > zmax:
zmax = z
center_i, center_j = i, j
return center_i, center_j
[docs]
def vortex_intensity(self, u, v):
return np.max(np.hypot(u, v))
[docs]
def vortex_size(self, u, v, center_i, center_j):
wind = np.hypot(u, v)
ny, nx = wind.shape
nr = 30
wind_min = 15
wind_rad = np.zeros(nr)
count_rad = np.zeros(nr)
for j in range(-nr, nr+1):
for i in range(-nr, nr+1):
r = int(np.sqrt(i**2+j**2))
if r < nr:
wind_rad[r] += wind[int(center_j+j)%ny, int(center_i+i)%nx]
count_rad[r] += 1
wind_rad = wind_rad/count_rad
if np.max(wind_rad)<wind_min or np.where(wind_rad>=wind_min)[0].size==0:
Rsize = -1
else:
i1 = np.where(wind_rad>=wind_min)[0][-1] # #last point with wind > 35knot
if i1==nr-1:
Rsize = i1
else:
Rsize = i1 + (wind_rad[i1] - wind_min) / (wind_rad[i1] - wind_rad[i1+1])
return Rsize
[docs]
def get_velocity(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
model = kwargs['model']
assert isinstance(model, Vort2DModel), 'get_velocity: ERROR: model must be an instance of Vort2DModel'
grid = kwargs['grid']
# read the velocity field from truth
model_velocity = model.read_var(**{**kwargs, 'name':'velocity'})
# convert velocity to target grid
model.grid.set_destination_grid(grid)
velocity = model.grid.convert(model_velocity, is_vector=True)
return velocity
[docs]
def get_vortex_position(self, **kwargs):
velocity = self.get_velocity(**kwargs)
grid = kwargs['model'].grid
center_i, center_j = self.vortex_position(velocity[0,...], velocity[1,...])
obs_seq = np.zeros((2, 1), dtype='float')
obs_seq[0,0] = grid.x[center_j, center_i]
obs_seq[1,0] = grid.y[center_j, center_i]
return obs_seq
[docs]
def get_vortex_intensity(self, **kwargs):
velocity = self.get_velocity(**kwargs)
Vmax = self.vortex_intensity(velocity[0,...], velocity[1,...])
return np.array([Vmax])
[docs]
def get_vortex_size(self, **kwargs):
velocity = self.get_velocity(**kwargs)
dx = kwargs['model'].grid.dx
center_i, center_j = self.vortex_position(velocity[0,...], velocity[1,...])
Rsize = self.vortex_size(velocity[0,...], velocity[1,...], center_i, center_j)
Rsize = Rsize * dx
return np.array([Rsize])