Source code for NEDAS.models.wrf.wrf_model

import os
import numpy as np
from pyproj import Proj
from NEDAS.grid import RegularGrid
from NEDAS.utils.conversion import dt1h
from NEDAS.core import Model
from NEDAS.core.types import VarDesc
# from .namelist import namelist
# from .bin_io import read_

[docs] class WRFModel(Model[RegularGrid]): map_proj: str ref_lat: float ref_lon: float truelat1: float truelat2: float max_dom: int e_we: list[int] e_sn: list[int] e_vert: list[int] dx: list[float] dy: list[float] model_code_dir: str nproc_per_run: int walltime: int|None z_units: str restart_dt: float def __init__(self, **kwargs): super().__init__(**kwargs) # derived default values self.ref_x = self.e_we[0] / 2 self.ref_y = self.e_sn[0] / 2 levels = np.arange(1, self.e_vert[0]+1, 1) # use domain 1 setting for z levels level_sfc = np.array([0]) self.variables = { 'atmos_velocity': VarDesc(name=('u_1', 'v_1'), dtype='float', is_vector=True, dt=self.restart_dt, levels=levels, units='m/s', z_units=self.z_units), 'atmos_surf_velocity': VarDesc(name=('U10', 'V10'), dtype='float', is_vector=True, dt=self.restart_dt, levels=level_sfc, units='m/s', z_units=self.z_units), 'atmos_temp': VarDesc(name='T', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='K', z_units=self.z_units), 'atmos_pres': VarDesc(name='P', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='Pa', z_units=self.z_units), 'atmos_q_vapor': VarDesc(name='QVAPOR', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='kg/kg', z_units=self.z_units), } self.read_grid()
[docs] def filename(self, **kwargs): kwargs = super().parse_kwargs(kwargs) tstr = kwargs['time'].strftime('%Y-%m-%d_%H:%M:%S') return os.path.join(kwargs['path'], 'wrfout_'+tstr+'.nc')
[docs] def read_grid(self, **kwargs): if self.map_proj == 'polar': proj = Proj(f'+proj=stere +lat_0={self.ref_lat} +lon_0={self.ref_lon} +lat_ts={self.truelat1}') elif self.map_proj == 'lambert': proj = Proj(f'+proj=lcc +lat_0={self.ref_lat} +lon_0={self.ref_lon} +lat_1={self.truelat1} +lat_2={self.truelat2}') elif self.map_proj == 'mercator': proj = Proj(f'+proj=merc +lat_0={self.ref_lat} +lon_0={self.ref_lon}') elif self.map_proj == 'lat-lon': proj = Proj(f'+proj=longlat') else: raise ValueError(f'unknown map_proj type {self.map_proj}') # staggering here? # again, use domain 1 settings for grid x_coords = (np.arange(self.e_we[0]) - self.ref_x) * self.dx y_coords = (np.arange(self.e_sn[0]) - self.ref_y) * self.dy x, y = np.meshgrid(x_coords, y_coords) self.grid = RegularGrid(proj, x, y)
[docs] def read_var(self, **kwargs): kwargs = super().parse_kwargs(kwargs) raise NotImplementedError
[docs] def write_var(self, var, **kwargs): kwargs = super().parse_kwargs(kwargs) pass
[docs] def z_coords(self, **kwargs): kwargs = super().parse_kwargs(kwargs) raise NotImplementedError
[docs] def preprocess(self, task_id=0, **kwargs): kwargs = super().parse_kwargs(kwargs) pass
[docs] def postprocess(self, task_id=0, **kwargs): pass
[docs] def run(self, task_id=0, **kwargs): kwargs = super().parse_kwargs(kwargs) self.run_status = 'running' fname = self.filename(**kwargs) run_dir = os.path.dirname(fname) print('running wrf model in '+run_dir, flush=True) wrf_src = os.path.join(self.model_code_dir, 'setup.src') wrf_exe = os.path.join(self.model_code_dir, 'main', 'wrf.exe') log_file = 'rsl.error.0000' # collect restart variables from bin and write to wrfrst # build the run command shell_cmd = ". "+wrf_src+"; " # enter wrf env shell_cmd += f"JOB_EXECUTE {wrf_exe} >& run.log" self.c.run_job(shell_cmd, job_name='wrf.run', run_dir=run_dir, nproc=self.nproc_per_run, offset=task_id*self.nproc_per_run, walltime=self.walltime, **kwargs)
# "SUCCESS COMPLETE" in log_file # wrfrst at nexttime collect to bin file