import os
import yaml
import cftime
import numpy as np
from pyproj import Proj
from NEDAS.grid import Grid
from NEDAS.utils.netcdf_lib import nc_write_var
from NEDAS.utils.conversion import ensure_list, proj2dict, s2t, dt1h
from NEDAS.core.context import Context
[docs]
def get_task_list(c: Context, **kwargs):
"""get a list of tasks to be done, as unique kwargs to be passed to run()"""
# parse kwargs
# load the default config first
default_config_file = os.path.join(os.path.dirname(__file__), 'default.yml')
kwargs_from_default: dict = {}
with open(default_config_file, 'r') as f:
loaded_default = yaml.safe_load(f)
if isinstance(loaded_default, dict):
kwargs_from_default = loaded_default
kwargs_from_config_file: dict = {}
if 'config_file' in kwargs and kwargs['config_file'] is not None:
with open(kwargs['config_file'], 'r') as f:
loaded_config = yaml.safe_load(f)
if isinstance(loaded_config, dict):
kwargs_from_config_file = loaded_config
# overwrite with kwargs
kwargs = {**kwargs_from_default, **kwargs_from_config_file, **kwargs}
# generate task list
tasks = []
for member in range(c.nens):
for vname in ensure_list(kwargs['variables']):
tasks.append({**kwargs, 'variable': vname, 'member': member})
return tasks
[docs]
def get_file_list(c: Context, **kwargs):
"""Return a list of files to be opened for collective i/o"""
if 'time' in kwargs:
time_start = s2t(kwargs['time'])
else:
time_start = c.time
files = []
for member in range(c.nens):
file = kwargs['file'].format(time=time_start, member=member+1)
if file not in files:
files.append(file)
return files
[docs]
def run(c: Context, **kwargs):
"""
Run diagnostics: convert model restart variables into netcdf files, given formatting options in kwargs
"""
vname = kwargs['variable']
member = kwargs['member']
model_name = kwargs['model_src']
model = c.models[model_name]
grid_def = kwargs['grid_def']
if grid_def:
proj = Proj(grid_def['proj'])
xmin = grid_def['xmin']
xmax = grid_def['xmax']
ymin = grid_def['ymin']
ymax = grid_def['ymax']
dx = grid_def['dx']
centered = grid_def.get('centered', False)
grid = Grid.regular_grid(proj, xmin, xmax, ymin, ymax, dx, centered)
else:
grid = model.grid
proj_params = proj2dict(grid.proj)
x = grid.x[0, :] / 1e5
y = grid.y[:, 0] / 1e5 # convert to 100km units
lon, lat = grid.proj(grid.x, grid.y, inverse=True)
if 'time' in kwargs:
time_start = s2t(kwargs['time'])
else:
time_start = c.time
dt_hours = kwargs.get("dt_hours", getattr(model, 'output_dt'))
forecast_hours = kwargs.get("forecast_hours", c.config.cycle_period)
time_units = kwargs.get('time_units', 'seconds since 1970-01-01T00:00:00+00:00')
time_calendar = kwargs.get('time_calendar', 'standard')
t_steps = range(0, forecast_hours, dt_hours)
path = c.fs.forecast_dir(time_start, model_name)
for n_step, t_step in enumerate(t_steps):
t = time_start + t_step * dt1h
c.debug_message = f"convert_output on variable '{vname:20}' for {model_name:10} member {member+1:3} at {t}"
# read the variable from the model restart file
rec = model.variables[vname]
file = kwargs['file'].format(time=time_start, member=member+1)
c.fs.make_dir(os.path.dirname(file))
lon_name: str = kwargs.get('lon_name', 'lon')
lat_name: str = kwargs.get('lat_name', 'lat')
x_name: str = kwargs.get('x_name', 'x')
y_name: str = kwargs.get('y_name', 'y')
time_name: str = kwargs.get('time_name', 'time')
recno = {}
recno[time_name] = n_step
levels = rec.levels
is_vector = rec.is_vector
for k in levels:
# read the field from model restart file
model.read_grid(path=path, name=vname, time=t, member=member, k=k)
fld = model.read_var(path=path, name=vname, time=t, member=member, k=k)
model.grid.set_destination_grid(grid)
# convert to output grid
fld_ = model.grid.convert(fld, is_vector=is_vector)
# build dimension records
dims = {}
dims[time_name] = None # make time dimension unlimited in nc file
k_name = kwargs.get('k_name')
if len(levels) > 1:
dims[k_name] = None # add level dimension (unlimited) if there are multiple levels
dims[y_name] = grid.ny
dims[x_name] = grid.nx
# output the variable
if len(levels) > 1:
recno[k_name] = list(levels).index(k)
# variable attr
attr = {'standard_name':vname,
'units':rec.units,
'grid_mapping': proj_params['projection'],
'coordinates': f"{lon_name} {lat_name}",
}
if is_vector:
for i in range(2):
if isinstance(rec.name, tuple):
rec_name = rec.name[i]
else:
rec_name = rec.name+'_'+(x_name, y_name)[i]
nc_write_var(file, dims, rec_name, fld_[i,...], recno=recno, attr=attr, comm=c.comm)
else:
assert isinstance(rec.name, str)
nc_write_var(file, dims, rec.name, fld_, recno=recno, attr=attr, comm=c.comm)
# output the dimension variables
time = cftime.date2num(t, units=time_units)
time_attr = {'long_name': 'forecast time', 'units': time_units, 'calendar': time_calendar}
nc_write_var(file, {time_name:None}, time_name, time, dtype='float', recno=recno, attr=time_attr, comm=c.comm)
nc_write_var(file, {x_name:grid.nx}, x_name, x, attr={'standard_name':'projection_x_coordinate', 'units':'100 km'}, comm=c.comm)
nc_write_var(file, {y_name:grid.ny}, y_name, y, attr={'standard_name':'projection_y_coordinate', 'units':'100 km'}, comm=c.comm)
nc_write_var(file, {y_name:grid.ny, x_name:grid.nx}, lon_name, lon, attr={'standard_name':'longitude', 'units':'degrees_east'}, comm=c.comm)
nc_write_var(file, {y_name:grid.ny, x_name:grid.nx}, lat_name, lat, attr={'standard_name':'latitude', 'units':'degrees_north'}, comm=c.comm)
# output projection info
nc_write_var(file, {}, proj_params['projection'], np.array([1]), attr=proj_params, comm=c.comm)