Source code for NEDAS.models.topaz.v5.topaz5model

import os
import glob
from functools import lru_cache
from datetime import datetime, timedelta, timezone
import numpy as np

from NEDAS.utils.conversion import units_convert, t2s, dt1h
from NEDAS.utils.netcdf_lib import nc_read_var, nc_write_var
from NEDAS.utils.progress import watch_log, find_keyword_in_file, watch_files
from NEDAS.grid import RegularGrid
from NEDAS.core import Model
from NEDAS.core.types import VarDesc, IOMode

from ..time_format import dayfor
from ..abfile import ABFileRestart, ABFileArchv, ABFileForcing
from ..model_grid import get_topaz_grid, get_depth, get_mean_ssh, stagger, destagger
from .namelist import namelist
from .postproc import adjust_dp, stmt_fns_sigma, stmt_fns_kappaf
from .cice_utils import thickness_upper_limit, adjust_ice_variables, fix_zsin_profile

[docs] class Topaz5Model(Model[RegularGrid]): """ TOPAZ5 model class. """ io_mode: IOMode = 'offline' nhc_root: str basedir: str model_env: str|None reanalysis_code: str V: str X: str T: str R: str E: str idm: int jdm: int kdm: int nproc: int nproc_per_run: int nproc_per_util: int use_job_array: bool walltime: int|None stagnant_log_timeout: int meanssh_file: str forcing_file: str restart_dt: int output_dt: int forcing_dt: int z_units: str thflag: int thref: float thbase: float kapref: int yrflag: int ONEM: float MIN_SEAICE_CONC: float MAX_OCEAN_TEMP: float MIN_OCEAN_SALN: float MAX_OCEAN_SALN: float ncat: int Nilayer: int saltmax: float min_salin: float depressT: float nsal: float msal: float aice_thresh: float fice_thresh: float hice_impact: float def __init__(self, **kwargs): super().__init__(**kwargs) levels = np.arange(self.kdm) + 1 # ocean levels, from top to bottom, k=1..kdm level_sfc = np.array([0]) # some variables are only defined on surface level k=0 self.restart_variables = { 'ocean_velocity': VarDesc(name=('u', 'v'), dtype='float', is_vector=True, dt=self.restart_dt, levels=levels, units='m/s', z_units=self.z_units), 'ocean_layer_thick': VarDesc(name='dp', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='Pa', z_units=self.z_units), 'ocean_temp': VarDesc(name='temp', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='C', z_units=self.z_units), 'ocean_saln': VarDesc(name='saln', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='psu', z_units=self.z_units), 'ocean_b_velocity': VarDesc(name=('ubavg', 'vbavg'), dtype='float', is_vector=True, dt=self.restart_dt, levels=level_sfc, units='m/s', z_units=self.z_units), 'ocean_b_press': VarDesc(name='pbavg', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units), 'ocean_mixl_depth': VarDesc(name='dpmixl', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units), 'ocean_bot_press': VarDesc(name='pbot', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units), 'ocean_bot_dense': VarDesc(name='thkk', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='?', z_units=self.z_units), 'ocean_bot_montg_pot': VarDesc(name='psikk', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='?', z_units=self.z_units), } self.archive_variables = { 'ocean_velocity_daily': VarDesc(name=('u-vel.', 'v-vel.'), dtype='float', is_vector=True, dt=self.output_dt, levels=levels, units='m/s', z_units=self.z_units), 'ocean_layer_thick_daily': VarDesc(name='thknss', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='Pa', z_units=self.z_units), 'ocean_temp_daily': VarDesc(name='temp', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='C', z_units=self.z_units), 'ocean_saln_daily': VarDesc(name='salin', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='psu', z_units=self.z_units), 'ocean_mixl_depth_daily': VarDesc(name='mix_dpth', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='Pa', z_units=self.z_units), 'ocean_dense_daily': VarDesc(name='dense', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='?', z_units=self.z_units), 'ocean_surf_height_daily': VarDesc(name='srfhgt', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units), } self.iced_variables = { 'seaice_velocity': VarDesc(name=('uvel', 'vvel'), dtype='float', is_vector=True, dt=self.restart_dt, levels=level_sfc, units='m/s', z_units=self.z_units), } for n in range(self.ncat): self.iced_variables[f"seaice_conc_cat{n}"] = VarDesc(name='aicen', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units=1, z_units=self.z_units) self.iced_variables[f"seaice_volume_cat{n}"] = VarDesc(name='vicen', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units) self.iced_variables[f"snow_volume_cat{n}"] = VarDesc(name='vsnon', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units) self.iced_variables[f"seaice_age_cat{n}"] = VarDesc(name='iage', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='days', z_units=self.z_units) self.iceh_variables = { 'seaice_velocity_daily': VarDesc(name=('uvel_d', 'vvel_d'), dtype='float', is_vector=True, dt=self.output_dt, levels=level_sfc, units='m/s', z_units=self.z_units), 'seaice_conc_daily': VarDesc(name='aice_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units=1, z_units=self.z_units), 'seaice_thick_daily': VarDesc(name='hi_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units), 'seaice_surf_temp_daily': VarDesc(name='Tsfc_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='C', z_units=self.z_units), 'seaice_saln_daily': VarDesc(name='sice_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='psu', z_units=self.z_units), 'snow_thick_daily': VarDesc(name='hs_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units), 'seaice_age_daily': VarDesc(name='iage_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='days', z_units=self.z_units), } self.atmos_forcing_variables = { 'atmos_surf_velocity': VarDesc(name=('wndewd', 'wndnwd'), dtype='float', is_vector=True, dt=self.forcing_dt, levels=level_sfc, units='m/s', z_units=self.z_units), 'atmos_surf_temp': VarDesc(name='airtmp', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='C', z_units=self.z_units), 'atmos_surf_dewpoint': VarDesc(name='dewpt', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='K', z_units=self.z_units), 'atmos_surf_press': VarDesc(name='mslprs', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='Pa', z_units=self.z_units), 'atmos_precip': VarDesc(name='precip', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='m/s', z_units=self.z_units), 'atmos_down_longwave': VarDesc(name='radflx', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='W/m2', z_units=self.z_units), 'atmos_down_shortwave': VarDesc(name='shwflx', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='W/m2', z_units=self.z_units), 'atmos_surf_vapor_mix': VarDesc(name='vapmix', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/kg', z_units=self.z_units), 'atmos_column_vapor': VarDesc(name='tcwv', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/m2', z_units=self.z_units), 'atmos_column_liquid': VarDesc(name='tclw', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/m2', z_units=self.z_units), } self.force_synoptic_names = [name for r in self.atmos_forcing_variables.values() for name in (r.name if isinstance(r.name, tuple) else [r.name])] self.diag_variables = { 'ocean_surf_height': VarDesc(name='ssh', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units), 'ocean_surf_height_anomaly': VarDesc(name='sla', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units), 'ocean_surf_temp': VarDesc(name='sst', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='C', z_units=self.z_units), 'seaice_conc': VarDesc(name='sic', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units=1, z_units=self.z_units), 'seaice_thick': VarDesc(name='sit', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units), 'snow_thick': VarDesc(name='snwt', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units), } self.operator = { 'ocean_surf_height': self.get_ocean_surf_height, 'ocean_surf_height_anomaly': self.get_ocean_surf_height_anomaly, 'ocean_surf_temp': self.get_ocean_surf_temp, 'seaice_conc': self.get_seaice_conc, 'seaice_thick': self.get_seaice_thick, 'snow_thick': self.get_snow_thick, } self.variables = {**self.restart_variables, **self.iced_variables, **self.iceh_variables, **self.atmos_forcing_variables, **self.diag_variables, **self.archive_variables} grid_info_file = os.path.join(self.basedir, 'topo', 'grid.info') if self.basedir and os.path.exists(grid_info_file): self.grid = get_topaz_grid(grid_info_file) self.depthfile = os.path.join(self.basedir, 'topo', f'depth_{self.R}_{self.T}.a') if self.depthfile and os.path.exists(self.depthfile): self.depth, self.grid.mask = get_depth(self.depthfile, self.grid) self.meanssh = None if self.meanssh_file and os.path.exists(self.meanssh_file): self.meanssh = get_mean_ssh(self.meanssh_file, self.grid) #TODO: z bank cache can be implemented (similar to grid bank for nextsim)
[docs] def is_ncat(self, name): return (name in self.iced_variables) and (name.split('_')[-1][:3] == 'cat')
[docs] def get_cat_id(self, name): if self.is_ncat(name): return int(name.split('_')[-1][3:]) else: raise ValueError(f"get_cat_id: variable {name} is not a ncat variable")
[docs] def filename(self, **kwargs): kwargs = super().parse_kwargs(kwargs) if kwargs['member'] is not None: mstr = '_mem{:03d}'.format(kwargs['member']+1) else: mstr = '' # filename for each model component if kwargs['name'] in self.restart_variables: tstr = kwargs['time'].strftime('%Y_%j_%H_%M%S') file = 'restart.'+tstr+mstr+'.a' elif kwargs['name'] in self.iced_variables: t = kwargs['time'] tstr = f"{t:%Y-%m-%d}-{t.hour*3600:05}" file = 'iced.'+tstr+mstr+'.nc' elif kwargs['name'] in self.iceh_variables: tstr = kwargs['time'].strftime('%Y-%m-%d') file = os.path.join(mstr[1:], 'SCRATCH', 'cice', 'iceh.'+tstr+'.nc') elif kwargs['name'] in self.atmos_forcing_variables: file = os.path.join(mstr[1:], 'SCRATCH', 'forcing') return os.path.join(kwargs['path'], file) elif kwargs['name'] in self.archive_variables: tstr = kwargs['time'].strftime('%Y_%j_??') # archive variables are daily means file = os.path.join(mstr[1:], 'SCRATCH', 'archm.'+tstr+'.a') elif kwargs['name'] in self.diag_variables: kstr = f"_k{kwargs['k']}_" tstr = t2s(kwargs['time']) file = os.path.join(mstr[1:], 'SCRATCH', kwargs['name']+kstr+tstr+'.npy') return os.path.join(kwargs['path'], file) else: raise ValueError(f"filename: ERROR: unknown variable name '{kwargs['name']}'") path = kwargs['path'] fname = os.path.join(path, file) files = glob.glob(fname) if len(files) > 0: return files[0] # if no corresponding files found under the given path # try to find two layers above for the other cycle time dirs = path.split(os.sep) if dirs[-1] == 'topaz.v5' and len(dirs[-2])==12: root = os.sep.join(dirs[:-2]) for tdir in os.listdir(root): fname = os.path.join(root, tdir, 'topaz.v5', file) files = glob.glob(fname) if len(files) > 0: return files[0] raise FileNotFoundError(f"filename: ERROR: could not find {file} in {path} or its parent directories")
[docs] def read_grid(self, **kwargs): pass
[docs] def read_mask(self, **kwargs): pass
[docs] def read_var(self, **kwargs): if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") kwargs = super().parse_kwargs(kwargs) fname = self.filename(**kwargs) name = kwargs['name'] rec = self.variables[name].asdict() # get the variable from restart files if name in self.restart_variables: f = ABFileRestart(fname, 'r', idm=self.grid.nx, jdm=self.grid.ny) if rec['is_vector']: var1 = f.read_field(rec['name'][0], level=kwargs['k'], tlevel=1, mask=self.grid.mask) var2 = f.read_field(rec['name'][1], level=kwargs['k'], tlevel=1, mask=self.grid.mask) var = np.array([var1, var2]) else: var = f.read_field(rec['name'], level=kwargs['k'], tlevel=1, mask=self.grid.mask) f.close() elif name in self.iced_variables: if rec['is_vector']: if self.is_ncat(name): # ncat variable cat_id = self.get_cat_id(name) var1 = nc_read_var(fname, rec['name'][0])[cat_id,...] var2 = nc_read_var(fname, rec['name'][1])[cat_id,...] else: var1 = nc_read_var(fname, rec['name'][0]) var2 = nc_read_var(fname, rec['name'][1]) var = np.array([var1, var2]) else: if self.is_ncat(name): # ncat variable var = nc_read_var(fname, rec['name'])[self.get_cat_id(name),...] else: var = nc_read_var(fname, rec['name']) elif name in self.iceh_variables: if rec['is_vector']: var1 = nc_read_var(fname, rec['name'][0])[0, ...] var2 = nc_read_var(fname, rec['name'][1])[0, ...] var = np.array([var1, var2]) else: var = nc_read_var(fname, rec['name'])[0, ...] elif name in self.atmos_forcing_variables: if kwargs['time'].tzinfo is None: kwargs['time'] = kwargs['time'].replace(tzinfo=timezone.utc) dtime = (kwargs['time'] - datetime(1900,12,31,tzinfo=timezone.utc)) / (24*dt1h) if rec['is_vector']: f1 = ABFileForcing(fname+'.'+rec['name'][0], 'r') var1 = f1.read_field(rec['name'][0], dtime) f2 = ABFileForcing(fname+'.'+rec['name'][1], 'r') var2 = f2.read_field(rec['name'][1], dtime) var = np.array([var1, var2]) f1.close() f2.close() else: f = ABFileForcing(fname+'.'+rec['name'], 'r') var = f.read_field(rec['name'], dtime) f.close() elif name in self.diag_variables: # if the npy file exists, one could just read it to get the variable. # but here we always calculate the variable from the model state, and refresh to the npy file, to be safe if not os.path.exists(fname): var = self.operator[name](**kwargs) np.save(fname, var) else: var = np.load(fname) elif name in self.archive_variables: f = ABFileArchv(fname, 'r', mask=True) if rec['is_vector']: var1 = f.read_field(rec['name'][0], level=kwargs['k']) var2 = f.read_field(rec['name'][1], level=kwargs['k']) var = np.array([var1, var2]) else: var = f.read_field(rec['name'], level=kwargs['k']) f.close() else: raise ValueError(f"read_var: ERROR: unknown variable name '{name}'") # convert units if necessary var = units_convert(rec['units'], kwargs['units'], var) return var
[docs] def write_var(self, var, **kwargs): if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") kwargs = super().parse_kwargs(kwargs) fname = self.filename(**kwargs) name = kwargs['name'] rec = self.variables[name].asdict() # convert back to old units var = units_convert(kwargs['units'], rec['units'], var) if name in self.restart_variables: # open the restart file for over-writing # the 'r+' mode and a new overwrite_field method were added in the ABFileRestart in .abfile f = ABFileRestart(fname, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True) if rec['is_vector']: for i in range(2): f.overwrite_field(var[i,...], self.grid.mask, rec['name'][i], level=kwargs['k'], tlevel=1) else: f.overwrite_field(var, self.grid.mask, rec['name'], level=kwargs['k'], tlevel=1) f.close() elif name in self.iced_variables: if rec['is_vector']: for i in range(2): if self.is_ncat(name): dims = {'ncat':None, 'nj':self.grid.ny, 'ni':self.grid.nx} recno = {'ncat':self.get_cat_id(name)} else: dims = {'nj':self.grid.ny, 'ni':self.grid.nx} recno = None nc_write_var(fname, dims, rec['name'][i], var[i,...], recno=recno, comm=kwargs['comm']) else: if self.is_ncat(name): dims = {'ncat':None, 'nj':self.grid.ny, 'ni':self.grid.nx} recno = {'ncat':self.get_cat_id(name)} else: dims = {'nj':self.grid.ny, 'ni':self.grid.nx} recno = None nc_write_var(fname, dims, rec['name'], var, recno=recno, comm=kwargs['comm']) elif name in self.iceh_variables: dims = {'time':None, 'nj':self.grid.ny, 'ni':self.grid.nx} recno = {'time':0} if rec['is_vector']: for i in range(2): nc_write_var(fname, dims, rec['name'][i], var[i,...], recno=recno, comm=kwargs['comm']) else: nc_write_var(fname, dims, rec['name'], var, recno=recno, comm=kwargs['comm']) elif name in self.atmos_forcing_variables: if kwargs['time'].tzinfo is None: kwargs['time'] = kwargs['time'].replace(tzinfo=timezone.utc) dtime = (kwargs['time'] - datetime(1900,12,31,tzinfo=timezone.utc)) / timedelta(days=1) if rec['is_vector']: for i in range(2): f = ABFileForcing(fname+'.'+rec['name'][i], 'r+') f.overwrite_field(var[i,...], None, rec['name'][i], dtime) f.close() else: f = ABFileForcing(fname+'.'+rec['name'], 'r+') f.overwrite_field(var, None, rec['name'], dtime) f.close() elif name in self.diag_variables: np.save(fname, var) else: print(f"WARNING: write_var not implemented for variable {name}, skipping...")
[docs] def z_coords(self, **kwargs): return self._z_coords_cached(**kwargs)
@lru_cache(maxsize=3) def _z_coords_cached(self, **kwargs): """ Calculate vertical coordinates given the 3D model state Return: - z: np.array The corresponding z field """ # some defaults if not set in kwargs if 'k' not in kwargs: kwargs['k'] = 0 z = np.zeros((self.jdm, self.idm)) if kwargs['k'] == 0: # if level index is 0, this is the surface, so just return zeros return z else: # get layer thickness and convert to units rec = kwargs.copy() rec['name'] = 'ocean_layer_thick' rec['units'] = self.variables['ocean_layer_thick'].units # should be Pa if self.z_units == 'm': dz = - self.read_var(**rec) / self.ONEM # in meters, negative relative to surface elif self.z_units == 'Pa': dz = self.read_var(**rec) else: raise ValueError('do not know how to calculate z_coords for z_units = '+self.z_units) # use recursive func, get previous layer z and add dz kwargs['k'] -= 1 z_prev = self.z_coords(**kwargs) return z_prev + dz
[docs] def get_ocean_surf_height(self, **kwargs): """Get ocean surface height from restart variables Adapted from p_ssh_from_state.F90 in ReanalysisTP5/SSHFromState_HYCOMICE""" if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") tbaric = (self.kapref == self.thflag) restart_file = self.filename(**{**kwargs, 'name':'ocean_bot_montg_pot'}) f = ABFileRestart(restart_file, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True) psikk = f.read_field('psikk', level=0, tlevel=1) thkk = f.read_field('thkk', level=0, tlevel=1) pbavg = f.read_field('pbavg', level=0, tlevel=1) ind = (self.depth < -0.1) & ~(np.isnan(self.depth)) # valid points to calculate ssh on levels = list(self.variables['ocean_layer_thick'].levels) idm, jdm, kdm = self.grid.nx, self.grid.ny, len(levels) pres = np.zeros((kdm+1, jdm, idm)) # cumulative pressure thstar = np.zeros((kdm, jdm, idm)) montg = np.zeros((jdm, idm)) oneta = np.zeros((jdm, idm)) ssh = np.zeros((jdm, idm)) for k in range(kdm): saln = f.read_field('saln', level=levels[k], tlevel=1) temp = f.read_field('temp', level=levels[k], tlevel=1) dp = f.read_field('dp', level=levels[k], tlevel=1) # use upper interface pressure in converting sigma to sigma-star # this is to avoid density variations in layers intersected by bottom th = stmt_fns_sigma(self.thflag, temp, saln) kapf = stmt_fns_kappaf(self.thflag, temp, saln, pres[k, ...], self.thref) if tbaric: thstar[k, ...][ind] = th[ind] + kapf[ind] else: thstar[k, ...][ind] = th[ind] pres[k+1, ...][ind] = pres[k, ...][ind] + dp[ind] oneta[ind] = 1. + pbavg[ind] / pres[-1, ...][ind] # m_prime in lowest layer montg[ind] = psikk[ind] + (pres[-1, ...][ind] * (thkk[ind] + self.thbase - thstar[-1, ...][ind]) - pbavg[ind] * (thstar[-1, ...][ind])) * self.thref**2 # m_prime in remaining layers for k in range(kdm-2, -1, -1): montg[ind] += pres[k+1, ...][ind] * oneta[ind] * (thstar[k+1, ...][ind] - thstar[k, ...][ind]) * self.thref**2 ssh[ind] = (montg[ind] / self.thref + pbavg[ind]) / self.ONEM ssh[~ind] = np.nan ssh[self.grid.mask] = np.nan return ssh
[docs] def get_ocean_surf_height_anomaly(self, **kwargs): if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") self.meanssh_file = os.path.join(self.basedir, 'topo', 'meanssh.uf') assert self.meanssh is not None, f"SLA: cannot find meanssh file {self.meanssh_file}" ssh = self.get_ocean_surf_height(**kwargs) sla = ssh - self.meanssh sla[self.grid.mask] = np.nan return sla
[docs] def get_ocean_surf_temp(self, **kwargs): #just return first level ocean_temp return self.read_var(**{**kwargs, 'name':'ocean_temp', 'k':1})
[docs] def get_seaice_conc(self, **kwargs): """ Get total seaice concentration from multicategory ice concentration (aicen) adapted from ReanalysisTP5/SSHFromState_HYCOMICE/mod_read_icednc by J. Xie """ if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") seaice_conc = np.zeros(self.grid.x.shape) rec = kwargs.copy() # try to read it from iced file try: for cat_id in range(self.ncat): rec['name'] = f'seaice_conc_cat{cat_id}' # can use iceh or iced files rec['units'] = self.variables[rec['name']].units seaice_conc += self.read_var(**rec) except FileNotFoundError: # if failed, try to read from iceh file rec['name'] = 'seaice_conc_daily' rec['units'] = self.variables[rec['name']].units seaice_conc = self.read_var(**rec) seaice_conc[np.where(seaice_conc<self.MIN_SEAICE_CONC)] = 0.0 # discard below threadshold seaice_conc[self.grid.mask] = np.nan return seaice_conc
[docs] def get_seaice_thick(self, **kwargs): """ Get total seaice thickness from the multicategory ice volume (vicen) """ if self.grid is None: raise AttributeError("topaz5model: grid not yet defined before calling get_seaice_thick") seaice_conc = self.get_seaice_conc(**kwargs) seaice_volume = np.zeros(self.grid.x.shape) rec = kwargs.copy() for cat_id in range(self.ncat): rec['name'] = f'seaice_volume_cat{cat_id}' rec['units'] = self.variables[rec['name']].units seaice_volume += self.read_var(**rec) seaice_thick = np.zeros(self.grid.x.shape) ind = np.where(seaice_conc>=self.MIN_SEAICE_CONC) upper_limit = thickness_upper_limit(seaice_conc[ind], 'seaice') seaice_thick[ind] = np.minimum(seaice_volume[ind] / seaice_conc[ind], upper_limit) seaice_thick[self.grid.mask] = np.nan return seaice_thick
[docs] def get_snow_thick(self, **kwargs): """ Get total snow thickness from the multi-category snow volume (vsnon) """ if self.grid is None: raise AttributeError("topaz5model: grid not yet defined before calling get_snow_thick") seaice_conc = self.get_seaice_conc(**kwargs) snow_volume = np.zeros(self.grid.x.shape) rec = kwargs.copy() for cat_id in range(self.ncat): rec['name'] = f'snow_volume_cat{cat_id}' rec['units'] = self.variables[rec['name']].units snow_volume += self.read_var(**rec) snow_thick = np.zeros(self.grid.x.shape) ind = np.where(seaice_conc>=self.MIN_SEAICE_CONC) upper_limit = thickness_upper_limit(seaice_conc[ind], 'snow') snow_thick[ind] = np.minimum(snow_volume[ind] / seaice_conc[ind], upper_limit) return snow_thick
[docs] def preprocess(self, *args, **kwargs): kwargs = super().parse_kwargs(kwargs) # task_id = kwargs.get('worker_id', 0) # offset = task_id * self.nproc_per_util time = kwargs['time'] forecast_period = kwargs['forecast_period'] next_time = time + forecast_period * dt1h if kwargs['member'] is not None: mstr = '_mem{:03d}'.format(kwargs['member']+1) else: mstr = '' run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH') # make sure model run directory exists self.c.fs.make_dir(run_dir) # generate namelists, blkdat, ice_in, etc. namelist(self, time, forecast_period, run_dir) # copy synoptic forcing fields from a long record in basedir, will be perturbed later for varname in self.force_synoptic_names: forcing_file = self.forcing_file.format(member=kwargs['member']+1, time=time, name=varname) forcing_file_out = os.path.join(run_dir, 'forcing.'+varname) try: f = ABFileForcing(forcing_file, 'r') except FileNotFoundError: #print(f"WARNING: {forcing_file} not found, skipping...") if varname in ['tcwv', 'tclw']: # these two variables are optional continue else: raise FileNotFoundError(f"preprocess: ERROR: forcing file {forcing_file} not found") fo = ABFileForcing(forcing_file_out, 'w', idm=f.idm, jdm=f.jdm, cline1=f._cline1, cline2=f._cline2) t = time dt = self.forcing_dt rdtime = dt / 24 while t <= next_time: dtime1 = dayfor(self.yrflag, t.year, int(t.strftime('%j')), t.hour) fld = f.read_field(varname, dtime1) fo.write_field(fld, None, varname, dtime1, rdtime) t += dt * dt1h f.close() fo.close() # link necessary files for model run shell_cmd = f"cd {run_dir}; " # partition setting partit_file = os.path.join(self.basedir, 'topo', 'partit', f'depth_{self.R}_{self.T}.{self.nproc:04d}') shell_cmd += f"ln -fs {partit_file} patch.input; " # topo files for ext in ['.a', '.b']: file = os.path.join(self.basedir, 'topo', 'regional.grid'+ext) shell_cmd += f"ln -fs {file} .; " file = os.path.join(self.basedir, 'topo', f'depth_{self.R}_{self.T}'+ext) shell_cmd += f"ln -fs {file} regional.depth{ext}; " file = os.path.join(self.basedir, 'topo', f'kmt_{self.R}_{self.T}.nc') shell_cmd += f"ln -fs {file} cice_kmt.nc; " file = os.path.join(self.basedir, 'topo', 'cice_grid.nc') shell_cmd += f"ln -fs {file} .; " # nest files nest_dir = os.path.join(self.basedir, 'nest', self.E) shell_cmd += f"ln -fs {nest_dir} nest; " # TODO: there is extra logic in nhc_root/bin/expt_preprocess.sh to be added here # relax files for ext in ['.a', '.b']: for varname in ['intf', 'saln', 'temp']: file = os.path.join(self.basedir, 'relax', self.E, 'relax_'+varname[:3]+ext) shell_cmd += f"ln -fs {file} {'relax.'+varname+ext}; " for varname in ['thkdf4', 'veldf4']: file = os.path.join(self.basedir, 'relax', self.E, varname+ext) shell_cmd += f"ln -fs {file} {varname+ext}; " # other forcing files for ext in ['.a', '.b']: # rivers file = os.path.join(self.basedir, 'force', 'rivers', self.E, 'rivers'+ext) shell_cmd += f"ln -fs {file} {'forcing.rivers'+ext}; " # seawifs file = os.path.join(self.basedir, 'force', 'seawifs', 'kpar'+ext) shell_cmd += f"ln -fs {file} {'forcing.kpar'+ext}; " self.c.run_job(shell_cmd, nproc=1) # copy restart files from restart_dir restart_dir = kwargs['restart_dir'] # job_submit_cmd = kwargs['job_submit_cmd'] tstr = time.strftime('%Y_%j_%H_%M%S') for ext in ['.a', '.b']: file = os.path.join(restart_dir, 'restart.'+tstr+mstr+ext) file1 = os.path.join(kwargs['path'], 'restart.'+tstr+mstr+ext) self.c.run_job(f"cp -fL {file} {file1}", nproc=1) self.c.run_job(f"ln -fs {file1} {os.path.join(run_dir, 'restart.'+tstr+ext)}", nproc=1) self.c.fs.make_dir(os.path.join(run_dir, 'cice')) tstr = f"{time:%Y-%m-%d}-{time.hour*3600:05}" file = os.path.join(restart_dir, 'iced.'+tstr+mstr+'.nc') file1 = os.path.join(kwargs['path'], 'iced.'+tstr+mstr+'.nc') # TODO: use runtime file manipulation instead self.c.run_job(f"cp -fL {file} {file1}", nproc=1) self.c.run_job(f"ln -fs {file1} {os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc')}", nproc=1) self.c.run_job(f"echo {os.path.join('.', 'cice', 'iced.'+tstr+'.nc')} > {os.path.join(run_dir, 'cice', 'ice.restart_file')}", nproc=1)
[docs] def postprocess(self, *args, **kwargs): kwargs = super().parse_kwargs(kwargs) if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") time = kwargs['time'] member = kwargs['member'] if member is not None: mstr = '_mem{:03d}'.format(member+1) else: mstr = '' run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH') self.c.fs.make_dir(run_dir) # link files commands = "" if self.model_env: commands += f". {self.model_env}; " commands += f"cd {run_dir}; " for ext in ['.a', '.b']: file1 = os.path.join(kwargs['restart_dir'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}') file2 = f'forecast{member+1:03}{ext}' commands += f"ln -fs {file1} {file2}; " file1 = os.path.join(kwargs['restart_dir'], f'iced.{time:%Y-%m-%d}-{time.hour*3600:05}{mstr}.nc') file2 = f'ice_forecast{member+1:03}.nc' commands += f"ln -fs {file1} {file2}; " commands += f"ln -fs {self.reanalysis_code}/FILES/depths{self.idm}x{self.jdm}.uf .; " self.c.run_job(commands, nproc=1) # restart2nc on forecast files commands = "" if self.model_env: commands += f". {self.model_env}; " commands += f"cd {run_dir}; " commands += f"{os.path.join(self.reanalysis_code, 'ASSIM', 'BIN', 'restart2nc')} forecast{member+1:03}.a ice_forecast{member+1:03}.nc" self.c.run_job(commands, nproc=1) # add posterior ice variables in analysis abfile for ext in ['.a', '.b']: file1 = os.path.join(kwargs['path'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}') file2 = os.path.join(run_dir, f'analysis{member+1:03}{ext}') self.c.run_job(f"cp -L {file1} {file2}", nproc=1) f = ABFileRestart(file2, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True) nfld = len(f.fields.keys()) # fld = np.load(self.filename(path=kwargs['path'], name='seaice_conc', member=member, time=time)) fld = self.read_var(**{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1}) f.write_field(fld, None, 'ficem', 0, 1, nfld) # fld = np.load(self.filename(path=kwargs['path'], name='seaice_thick', member=member, time=time)) fld = self.read_var(**{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'}) f.write_field(fld, None, 'hicem', 0, 1, nfld+1) f.close() # run fixhycom and update restart file commands = "" if self.model_env: commands += f". {self.model_env}; " commands += f"cd {run_dir}; " commands += f"{os.path.join(self.reanalysis_code, 'ASSIM', 'BIN', 'fixhycom')} analysis{member+1:03}.a {member+1} forecast{member+1:03}.nc ice_forecast{member+1:03}.nc {time:%j} 0; " commands += f"cat fixanalysis{member+1:03}.b >> tmp{member+1:03}.b; mv tmp{member+1:03}.b fixanalysis{member+1:03}.b; " for ext in ['.a', '.b']: file1 = os.path.join(run_dir, f'fixanalysis{member+1:03}{ext}') file2 = os.path.join(kwargs['path'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}') commands += f"mv {file1} {file2}; " file1 = os.path.join(run_dir, f'fix_ice_forecast{member+1:03}.nc') file2 = os.path.join(kwargs['path'], f'iced.{time:%Y-%m-%d}-{time.hour*3600:05}{mstr}.nc') commands += f"mv {file1} {file2}; " self.c.run_job(commands, nproc=1)
[docs] def postprocess_native(self, *args, **kwargs): """Post processing the restart variables for next forecast""" # routines adapted from the EnKF-MPI-TOPAZ/Tools/fixhycom.F90 code kwargs = super().parse_kwargs(kwargs) if self.grid is None: raise AttributeError("topaz5model: grid not yet defined") # adjust ocean layer thickness dp rec = kwargs.copy() rec['name'] = 'ocean_layer_thick' rec['units'] = self.variables['ocean_layer_thick'].units # should be Pa levels = list(self.variables['ocean_layer_thick'].levels) dp = np.zeros((len(levels), self.grid.ny, self.grid.nx)) for ilev, k in enumerate(levels): dp[ilev, ...] = self.read_var(**{**rec, 'k':k}) dp = adjust_dp(dp, -self.depth, self.ONEM) for ilev, k in enumerate(levels): self.write_var(dp[ilev,...], **{**rec, 'k':k}) # loop over fields in restart file restart_file = self.filename(**{**kwargs, 'name':'ocean_temp'}) f = ABFileRestart(restart_file, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True) for i, rec in f.fields.items(): name = rec['field'] tlevel = rec['tlevel'] k = rec['k'] fld = f.read_field(name, tlevel=tlevel, level=k) # reset variables out of their normal range if name == 'temp': saln = f.read_field('saln', tlevel=tlevel, level=k) temp_min = -0.057 * saln ind = np.where(fld < temp_min) fld[ind] = temp_min[ind] fld[np.where(fld > self.MAX_OCEAN_TEMP)] = self.MAX_OCEAN_TEMP elif name == 'saln': fld[np.where(fld > self.MAX_OCEAN_SALN)] = self.MAX_OCEAN_SALN fld[np.where(fld < self.MIN_OCEAN_SALN)] = self.MIN_OCEAN_SALN elif name == 'dp': fld = dp[levels.index(k), ...] # set dp to the adjusted value # write the field back f.overwrite_field(fld, None, name, tlevel=tlevel, level=k) f.close() # fix sea ice variables, from enkf-topaz/Tools/m_put_mod_fld_nc: fix_cice restart_dir = kwargs['restart_dir'] prior_ice_file = self.filename(**{**kwargs, 'path':restart_dir, 'name':'seaice_conc_cat1'}) post_ice_file = self.filename(**{**kwargs, 'name':'seaice_conc_cat1'}) fice = self.read_var(**{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1}) hice = self.read_var(**{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'}) zSin, Tmlt = fix_zsin_profile(self.Nilayer+1, self.saltmax, self.depressT, self.nsal, self.msal) adjust_ice_variables(prior_ice_file, post_ice_file, fice, hice, self.grid.mask, self.aice_thresh, self.fice_thresh, self.hice_impact, zSin, Tmlt) # update the diagnostic ice variables self.write_var(fice, **{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1}) self.write_var(hice, **{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'})
[docs] def run(self, *args, **kwargs): assert self.ens_run_strategy=='scheduler', f"{self.__class__.__name__}: unsupported run_strategy '{self.ens_run_strategy}'" kwargs = super().parse_kwargs(kwargs) task_id = kwargs.get('worker_id', 0) self.run_status = 'running' time = kwargs['time'] forecast_period = kwargs['forecast_period'] next_time = time + forecast_period * dt1h member = kwargs['member'] if member is not None: mstr = '_mem{:03d}'.format(member+1) else: mstr = '' run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH') self.c.fs.make_dir(run_dir) log_file = os.path.join(run_dir, "run.log") self.c.run_job("touch "+log_file, nproc=1) run_success = False # early exit if the run is already finished if find_keyword_in_file(log_file, 'Exiting hycom_cice'): run_success = True else: # check if input file exists input_files = [] tstr = time.strftime('%Y_%j_%H_%M%S') for ext in ['.a', '.b']: input_files.append(os.path.join(run_dir, 'restart.'+tstr+ext)) tstr = f"{time:%Y-%m-%d}-{time.hour*3600:05}" input_files.append(os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc')) for file in input_files: if not os.path.exists(file): raise RuntimeError(f"topaz.v5.model.run: input file missing: {file}") # clean up some files from previous runs self.c.run_job(f"cd {run_dir}; rm -f archm.* ovrtn_out summary_out", nproc=1) self.c.run_job(f"echo > {log_file}", nproc=1) # build the shell command line model_exe = os.path.join(self.basedir, f'expt_{self.X}', 'build', f'src_{self.V}ZA-07Tsig0-i-sm-sse_relo_mpi', 'hycom_cice') shell_cmd = "" if self.model_env: shell_cmd = ". "+self.model_env+"; " # enter topaz5 env shell_cmd += "cd "+run_dir+"; " # enter run directory shell_cmd += 'JOB_EXECUTE '+model_exe+" >& run.log" # run the model, give it 3 attempts for i in range(3): try: job_opts = { **kwargs, 'job_name': 'topaz5', 'run_dir': run_dir, 'parallel_mode': 'mpi', 'log_file': log_file, 'nproc': self.nproc, 'offset': task_id * self.nproc_per_run, } self.c.run_job(shell_cmd, **job_opts) except RuntimeError as e: print(f"{e}, retrying ({2-i} attempts remain)") self.c.run_job(f"cp {log_file} {log_file}.attempt{i}", nproc=1) continue # check output if find_keyword_in_file(log_file, 'Exiting hycom_cice'): run_success = True break assert run_success, f"model run failed after 3 attempts, check in {run_dir}" # move the output restart files to forecast_dir tstr = next_time.strftime('%Y_%j_%H_%M%S') for ext in ['.a', '.b']: file1 = os.path.join(run_dir, 'restart.'+tstr+ext) file2 = os.path.join(kwargs['path'], 'restart.'+tstr+mstr+ext) if os.path.exists(file1): self.c.run_job(f"mv {file1} {file2}", nproc=1) else: assert os.path.exists(file2), f"error moving output file {file1} to {file2}" tstr = f"{next_time:%Y-%m-%d}-{next_time.hour*3600:05}" file1 = os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc') file2 = os.path.join(kwargs['path'], 'iced.'+tstr+mstr+'.nc') if os.path.exists(file1): self.c.run_job(f"mv {file1} {file2}", nproc=1) else: assert os.path.exists(file2), f"error moving output file {file1} to {file2}"
[docs] def generate_truth(self, *args, **kwargs): self.c.message = "not yet implemented, skipping..." pass
[docs] def generate_init_ensemble(self, *args, **kwargs): # TODO: check validity of self.ens_init_dir and restart files within # some application may need to skip restart file checks, add an option for that? self.c.message = "not yet implemented, skipping..." pass