Source code for NEDAS.datasets.ecmwf.era5.era5data

"""Interface to ERA5 dataset stored 6 hourly in netCDF formats"""

import os
import numpy as np
from datetime import datetime, timedelta, timezone
from pyproj import Proj
import netCDF4
from NEDAS.grid import Grid
from NEDAS.utils.conversion import units_convert
from NEDAS.core import Dataset
from NEDAS.core.types import VarDesc
from NEDAS.datasets.ecmwf import atmos_utils

[docs] class ERA5Data(Dataset): def __init__(self, **kwargs): super().__init__(**kwargs) # variable dictionary for ERA5 naming convention level_sfc = np.array([0]) self.variables = { 'atmos_surf_velocity': VarDesc(name=('10U', '10V'), dtype='float', is_vector=True, dt=6, levels=level_sfc, z_units='Pa', units='m/s'), 'atmos_surf_temp': VarDesc(name='2T', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='K'), 'atmos_surf_dewpoint': VarDesc(name='2D', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='K'), 'atmos_surf_press': VarDesc(name='MSL', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='Pa'), 'atmos_precip': VarDesc(name='TP', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='m/s'), 'atmos_down_longwave': VarDesc(name='STRD', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='W/m2'), 'atmos_down_shortwave': VarDesc(name='SSRD', dtype='float', is_vector=False, dt=6, levels=level_sfc, z_units='Pa', units='W/m2'), } self.diag_variable_getter = { 'atmos_surf_vapor_mix': self.get_vapmix, } self.grid = None # #format filename
[docs] def filename(self, **kwargs): kwargs = super().parse_kwargs(kwargs) assert kwargs['time'] is not None, 'please specify time' assert kwargs['name'] is not None, 'please specify variable name' year = '{:04d}'.format(kwargs['time'].year) rec = self.variables[kwargs['name']].asdict() if rec['is_vector']: files = [] for name in rec['name']: file = os.path.join(kwargs['path'], f"6h.{name}_{year}.nc") assert os.path.exists(file), 'file not found: '+file files.append(file) return files else: file = os.path.join(kwargs['path'], f"6h.{rec['name']}_{year}.nc") assert os.path.exists(file), 'file not found: '+file files = [file] return files
[docs] def read_grid(self, **kwargs): kwargs = super().parse_kwargs(kwargs) fname = self.filename(**kwargs)[0] with netCDF4.Dataset(fname) as f: lon = f['longitude'][:].data lat = f['latitude'][:].data x, y = np.meshgrid(lon, lat) self.grid = Grid(Proj('+proj=longlat'), x, y, cyclic_dim='x', pole_dim='y', pole_index=(0,))
# find the nearest index in data for the given t
[docs] def find_time_index(self, time_series, time): t_ = (time - datetime(1900,1,1,tzinfo=timezone.utc)) / timedelta(hours=1) ind = np.abs(time_series - t_).argmin() return ind
[docs] def read_var(self, **kwargs): kwargs = super().parse_kwargs(kwargs) name = kwargs['name'] assert name is not None, 'please specify which variable (name=?) to get' time = kwargs['time'] k = kwargs.get('k', 0) rec = self.variables[name].asdict() if time is None: t_index = 0 if name in self.diag_variable_getter: var = self.diag_variable_getter[name](**kwargs) else: files = self.filename(**kwargs) if rec['is_vector']: var = [] for i,file in enumerate(files): with netCDF4.Dataset(file) as f: t_in_file = f['time'][:].data t_index = self.find_time_index(t_in_file, time) dat = f.variables[rec['name'][i]][t_index, ...] tmp = dat.data tmp[dat.mask] = np.nan var.append(tmp) var = np.array(var) else: with netCDF4.Dataset(files[0]) as f: t_in_file = f['time'][:].data t_index = self.find_time_index(t_in_file, time) dat = f.variables[rec['name']][t_index, ...] var = dat.data var[dat.mask] = np.nan # convert units if necessary if rec['name'] in ('TP', 'STRD', 'SSRD'): # need to convert the fluxes to per second units (they are per 6 hours in the dataset) var /= 3600. * 6 var = units_convert(rec['units'], kwargs['units'], var) return var
[docs] def get_vapmix(self, **kwargs): dewpoint = self.read_var(**{**kwargs, 'name':'atmos_surf_dewpoint', 'units':'K'}) e_sat = atmos_utils.satvap(dewpoint) press = self.read_var(**{**kwargs, 'name':'atmos_surf_press', 'units':'Pa'}) vapmix = atmos_utils.vapmix(e_sat, press) return vapmix
[docs] def read_obs_from_file(self, **kwargs): raise NotImplementedError