Source code for NEDAS.datasets.rgps.rgps_obs

import os
import glob
import numpy as np
from datetime import datetime, timedelta, timezone
import pyproj
from scipy.spatial import KDTree
from NEDAS.grid import IrregularGrid
from NEDAS.core import Dataset
from NEDAS.core.types import VarDesc

from .utils import get_data_traj_pairs, get_triangulation, get_velocity, get_velocity_gradients, get_deform_div, get_deform_shear, get_deform_vort

[docs] class RgpsObs(Dataset): dataset_dir: str proj4: str DAYS_SEARCH_TOLERANCE: int DRIFT_MAX: float DRIFT_ERR_STD: float DEFORM_MAX: float DEFORM_ERR_STD: float def __init__(self, **kwargs): super().__init__(**kwargs) # variable dictionary for RGPS naming convention self.variables = { 'seaice_drift': VarDesc(name='null', dtype='float', is_vector=True, dt=24, levels=np.array([0]), z_units='m', units='km/day'), 'seaice_deform_shear': VarDesc(name='null', dtype='float', is_vector=False, dt=24, levels=np.array([0]), z_units='m', units='1/day'), 'seaice_deform_div': VarDesc(name='null', dtype='float', is_vector=False, dt=24, levels=np.array([0]), z_units='m', units='1/day'), 'seaice_deform_vort': VarDesc(name='null', dtype='float', is_vector=False, dt=24, levels=np.array([0]), z_units='m', units='1/day'), } # RGPS trajectory data (x,y) is in NorthPolarStereo projection: self.proj = pyproj.Proj(self.proj4) self.obs_operator = {} # TODO the get_traj_pairs functions sometimes fail, near the tree.query part, returns wrong index? # for now use the model.diag_variables for deform variables instead # 'seaice_drift': self.get_seaice_drift, # 'seaice_deform_shear': self.get_seaice_deform_shear, # 'seaice_deform_div': self.get_seaice_deform_div, # 'seaice_deform_vort': self.get_seaice_deform_vort, #}
[docs] def parse_kwargs(self, kwargs): kwargs = super().parse_kwargs(kwargs) assert kwargs['time'] is not None, 'rgps.read_obs: time is not defined' assert kwargs['obs_window_max'] is not None, 'rgps.read_obs: obs_window_max is not defined' assert kwargs['obs_window_min'] is not None, 'rgps.read_obs: obs_window_min is not defined' return kwargs
[docs] def filename(self, **kwargs): kwargs = super().parse_kwargs(kwargs) t = kwargs['time'] if t.tzinfo is None: t = t.replace(tzinfo=timezone.utc) if 'obs_window_min' in kwargs and 'obs_window_max' in kwargs: d_range = [kwargs['obs_window_min'], kwargs['obs_window_max']] else: d_range = [0] file_list = [] search = os.path.join(kwargs['path'], 'RGPS*traj.mat') for result in glob.glob(search): ss = result.split('_') t1 = datetime.strptime(ss[1], '%Y-%m-%d') t2 = datetime.strptime(ss[2], '%Y-%m-%d') t1 = t1.replace(tzinfo=timezone.utc) t2 = t2.replace(tzinfo=timezone.utc) for d in d_range: t_ = t + d * timedelta(hours=1) if t_ >= t1 and t_ <= t2 and result not in file_list: file_list.append(result) assert len(file_list)>0, 'no matching files found' return file_list
[docs] def read_obs_from_file(self, **kwargs): """read obs from rgps dataset""" kwargs = super().parse_kwargs(kwargs) obs_name = kwargs['name'] d0_out = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_min'] d1_out = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_max'] # target grid for obs_seq grid = kwargs['grid'] # note: x,y are obs location on grid.proj (in meters) # x0,y0,x1,y1 are position on rgps_proj (in kilometers), # triangles: velocity is defined on nodes and deform on elements # record: trajectory id, we process one record at a time obs_seq = {'obs':[], 'err_std':[], 't':[], 'z':[], 'y':[], 'x':[], 'x0':[], 'y0':[], 'triangles':[], 'record':[], 'index':[]} rec = 0 for file_name in self.filename(**kwargs): pairs = get_data_traj_pairs(file_name, d0_out, d1_out, self.DAYS_SEARCH_TOLERANCE) for x0, y0, t0, x1, y1, t1 in pairs: tri = get_triangulation(x0, y0) u, v = get_velocity(x0, y0, t0, x1, y1, t1) # convert x0,y0 from rgps_proj to grid.proj in meters x, y = grid.proj(*self.proj(x0*1000, y0*1000, inverse=True)) if obs_name == 'seaice_drift': for p in range(len(x)): # quality check if np.hypot(u[p], v[p]) > self.DRIFT_MAX or np.isnan(u[p]) or np.isnan(v[p]): continue # add the obs to sequence obs_seq['obs'].append([u[p], v[p]]) obs_seq['err_std'].append(self.DRIFT_ERR_STD) obs_seq['t'].append(t0[p]) obs_seq['z'].append(0.) obs_seq['y'].append(y[p]) obs_seq['x'].append(x[p]) obs_seq['record'].append(rec) obs_seq['index'].append(p) elif obs_name in ['seaice_deform_shear', 'seaice_deform_div', 'seaice_deform_vort']: x_elem = np.mean(x[tri.triangles], axis=1) y_elem = np.mean(y[tri.triangles], axis=1) t_elem = t0[tri.triangles[:, 0]] ux, uy, vx, vy = get_velocity_gradients(x0, y0, u, v) if obs_name == 'seaice_deform_shear': obs_values = np.hypot(ux - vy, uy + vx) elif obs_name == 'seaice_deform_div': obs_values = ux + vy elif obs_name == 'seaice_deform_vort': obs_values = vx - uy else: raise ValueError(f"unknown obs_name {obs_name}") for p in range(len(x_elem)): # quality check tri_mask = getattr(tri, 'mask') if tri_mask[p]: continue if np.abs(obs_values[p]) > self.DEFORM_MAX or np.isnan(obs_values[p]): continue # add the obs to sequence obs_seq['obs'].append(obs_values[p]) obs_seq['err_std'].append(self.DEFORM_ERR_STD) obs_seq['t'].append(t_elem[p]) obs_seq['z'].append(0.) obs_seq['y'].append(y_elem[p]) obs_seq['x'].append(x_elem[p]) obs_seq['record'].append(rec) obs_seq['index'].append(p) # save raw data for state_to_obs to use obs_seq['x0'].append(x0) obs_seq['y0'].append(y0) obs_seq['triangles'].append(tri.triangles) rec += 1 # convert from list to np.array # raw data are kept in list format obs_seq_arr = {} for key in ('obs', 'err_std', 't', 'y', 'x', 'z'): obs_seq_arr[key] = np.array(obs_seq[key]) if self.variables[obs_name].is_vector: obs_seq_arr['obs'] = obs_seq_arr['obs'].T # superob to target grid, coarsen and keep the valid grid points # make a mesh grid for the obs points obs_x, obs_y = obs_seq['x'], obs_seq['y'] obs_grid = IrregularGrid(grid.proj, obs_x, obs_y) # remove unwanted triangles in the mesh tri_a = getattr(obs_grid.tri, 'a') tri_p = getattr(obs_grid.tri, 'p') tri_ratio = getattr(obs_grid.tri, 'ratio') msk = np.logical_or(tri_a > 2e8, tri_p > 1e5, tri_ratio < 0.3) obs_grid = IrregularGrid(grid.proj, obs_x, obs_y, triangles=obs_grid.tri.triangles[~msk,:]) # convert to target grid with coarse graining (superobing) obs_grid.set_destination_grid(grid) obs_on_grid = obs_grid.convert(obs_seq['obs'], is_vector=self.variables[obs_name].is_vector, coarse_grain=True) # overwrite the obs info with superobs if self.variables[obs_name].is_vector: msk = np.isnan(obs_on_grid[0,...]) obs_seq_arr['obs'] = np.array([obs_on_grid[0,~msk].flatten(), obs_on_grid[1,~msk].flatten()]) else: msk = np.isnan(obs_on_grid) obs_seq_arr['obs'] = obs_on_grid[~msk].flatten() obs_seq_arr['x'] = grid.x[~msk].flatten() obs_seq_arr['y'] = grid.y[~msk].flatten() # other parameters for key in ('z', 't', 'err_std'): obs_seq_arr[key] = np.full(obs_seq_arr['x'].size, obs_seq_arr[key][0]) return obs_seq_arr
[docs] def get_model_files(self, **kwargs): kwargs = super().parse_kwargs(kwargs) # start and end time of the trajectories t0 = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_min'] t1 = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_max'] if kwargs['member'] is not None: mstr = '{:03d}'.format(kwargs['member']+1) else: mstr = '' file_list = [] t_list = [] search = os.path.join(kwargs['path'], '..', '..', '*', kwargs['model_src'], mstr, 'mesh*.bin') for result in glob.glob(search): if '00Z' not in result: # discard files without time string continue tstr = result.split('.')[-2].split('_')[-1] if 'post_regrid' in result: tstr = tstr.replace('0Z', '1Z') t = datetime.strptime(tstr, '%Y%m%dT%H%M%SZ') if t<t0 or t>t1: continue file_list.append(result) t_list.append(t) file_list = [file_list[n] for n in np.argsort(t_list)] assert len(file_list)>0, 'no matching files found' return file_list
[docs] def get_traj_pairs(self, **kwargs): """get nextsim simulated trajectory pairs, corresponding to rgps""" model = kwargs['model'] # rgps start positions on rgps_proj, in km units x0, y0 = kwargs['x0'], kwargs['y0'] # make a copy of start position to be updated to final position nrec = len(x0) x, y, i, dx, dy = [], [], [], [], [] for r in range(nrec): x.append(x0[r].copy()*1000) y.append(y0[r].copy()*1000) i.append(np.zeros(x0[r].shape, dtype='int')) dx.append(np.zeros(x0[r].shape)) dy.append(np.zeros(x0[r].shape)) # time for the start/end position t0 = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_min'] t1 = kwargs['time'] + timedelta(hours=1) * kwargs['obs_window_max'] file_list = self.get_model_files(**kwargs) for n in range(len(file_list)): fname = file_list[n] # get model mesh points in native proj, in meters model.read_grid(meshfile=fname, **kwargs) mx = model.grid.x my = model.grid.y # convert to rgps_proj, in km units x_, y_ = self.proj(*model.grid.proj(mx, my, inverse=True)) if n==0 or 'post_regrid' in fname: # build kdtree for spatial search if model mesh changed tree = KDTree(np.vstack([x_, y_]).T) # search for traj position in mesh for r in range(nrec): d, i[r] = tree.query(np.vstack([x[r], y[r]]).T) dx[r], dy[r] = x[r] - x_[i[r]], y[r] - y_[i[r]] if 'post_regrid' not in fname: # move to next position for r in range(nrec): x[r], y[r] = x_[i[r]] + dx[r], y_[i[r]] + dy[r] pairs = [] for r in range(nrec): pairs.append([x0[r], y0[r], np.full(x0[r].shape, t0), x[r]/1000, y[r]/1000, np.full(x0[r].shape, t1)]) return pairs
def _get_seaice_property(self, obs_name, compute_func, **kwargs): kwargs = super().parse_kwargs(kwargs) pairs = self.get_traj_pairs(**kwargs) xo, yo, tri, r, i = kwargs['x'], kwargs['y'], kwargs['triangles'], kwargs['record'], kwargs['index'] nobs = xo.size if self.variables[obs_name].is_vector: obs_seq = np.full((2, nobs), np.nan) else: obs_seq = np.full(nobs, np.nan) rec = 0 for x0, y0, t0, x1, y1, t1 in pairs: rind = np.where(r==rec) # indices in xo for record rec ind = i[rind] # indices in x0 that forms the record rec part of xo obs_value = compute_func(x0, y0, t0, x1, y1, t1) if self.variables[obs_name].is_vector: obs_seq[:, rind] = obs_value[:, ind] else: obs_seq[rind] = obs_value[ind] rec += 1 return obs_seq
[docs] def get_seaice_drift(self, **kwargs): return self._get_seaice_property('seaice_drift', get_velocity, **kwargs)
[docs] def get_seaice_deform_shear(self, **kwargs): return self._get_seaice_property('seaice_deform_shear', get_deform_shear, **kwargs)
[docs] def get_seaice_deform_div(self, **kwargs): return self._get_seaice_property('seaice_deform_div', get_deform_div, **kwargs)
[docs] def get_seaice_deform_vort(self, **kwargs): return self._get_seaice_property('seaice_deform_vort', get_deform_vort, **kwargs)