import os
import glob
from functools import lru_cache
from datetime import datetime, timedelta, timezone
import numpy as np
from NEDAS.utils.conversion import units_convert, t2s, dt1h
from NEDAS.utils.netcdf_lib import nc_read_var, nc_write_var
from NEDAS.utils.progress import watch_log, find_keyword_in_file, watch_files
from NEDAS.grid import RegularGrid
from NEDAS.core import Model
from NEDAS.core.types import VarDesc, IOMode
from ..time_format import dayfor
from ..abfile import ABFileRestart, ABFileArchv, ABFileForcing
from ..model_grid import get_topaz_grid, get_depth, get_mean_ssh, stagger, destagger
from .namelist import namelist
from .postproc import adjust_dp, stmt_fns_sigma, stmt_fns_kappaf
from .cice_utils import thickness_upper_limit, adjust_ice_variables, fix_zsin_profile
[docs]
class Topaz5Model(Model[RegularGrid]):
"""
TOPAZ5 model class.
"""
io_mode: IOMode = 'offline'
nhc_root: str
basedir: str
model_env: str|None
reanalysis_code: str
V: str
X: str
T: str
R: str
E: str
idm: int
jdm: int
kdm: int
nproc: int
nproc_per_run: int
nproc_per_util: int
use_job_array: bool
walltime: int|None
stagnant_log_timeout: int
meanssh_file: str
forcing_file: str
restart_dt: int
output_dt: int
forcing_dt: int
z_units: str
thflag: int
thref: float
thbase: float
kapref: int
yrflag: int
ONEM: float
MIN_SEAICE_CONC: float
MAX_OCEAN_TEMP: float
MIN_OCEAN_SALN: float
MAX_OCEAN_SALN: float
ncat: int
Nilayer: int
saltmax: float
min_salin: float
depressT: float
nsal: float
msal: float
aice_thresh: float
fice_thresh: float
hice_impact: float
def __init__(self, **kwargs):
super().__init__(**kwargs)
levels = np.arange(self.kdm) + 1 # ocean levels, from top to bottom, k=1..kdm
level_sfc = np.array([0]) # some variables are only defined on surface level k=0
self.restart_variables = {
'ocean_velocity': VarDesc(name=('u', 'v'), dtype='float', is_vector=True, dt=self.restart_dt, levels=levels, units='m/s', z_units=self.z_units),
'ocean_layer_thick': VarDesc(name='dp', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='Pa', z_units=self.z_units),
'ocean_temp': VarDesc(name='temp', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='C', z_units=self.z_units),
'ocean_saln': VarDesc(name='saln', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units='psu', z_units=self.z_units),
'ocean_b_velocity': VarDesc(name=('ubavg', 'vbavg'), dtype='float', is_vector=True, dt=self.restart_dt, levels=level_sfc, units='m/s', z_units=self.z_units),
'ocean_b_press': VarDesc(name='pbavg', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units),
'ocean_mixl_depth': VarDesc(name='dpmixl', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units),
'ocean_bot_press': VarDesc(name='pbot', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='Pa', z_units=self.z_units),
'ocean_bot_dense': VarDesc(name='thkk', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='?', z_units=self.z_units),
'ocean_bot_montg_pot': VarDesc(name='psikk', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='?', z_units=self.z_units),
}
self.archive_variables = {
'ocean_velocity_daily': VarDesc(name=('u-vel.', 'v-vel.'), dtype='float', is_vector=True, dt=self.output_dt, levels=levels, units='m/s', z_units=self.z_units),
'ocean_layer_thick_daily': VarDesc(name='thknss', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='Pa', z_units=self.z_units),
'ocean_temp_daily': VarDesc(name='temp', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='C', z_units=self.z_units),
'ocean_saln_daily': VarDesc(name='salin', dtype='float', is_vector=False, dt=self.output_dt, levels=levels, units='psu', z_units=self.z_units),
'ocean_mixl_depth_daily': VarDesc(name='mix_dpth', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='Pa', z_units=self.z_units),
'ocean_dense_daily': VarDesc(name='dense', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='?', z_units=self.z_units),
'ocean_surf_height_daily': VarDesc(name='srfhgt', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units),
}
self.iced_variables = {
'seaice_velocity': VarDesc(name=('uvel', 'vvel'), dtype='float', is_vector=True, dt=self.restart_dt, levels=level_sfc, units='m/s', z_units=self.z_units),
}
for n in range(self.ncat):
self.iced_variables[f"seaice_conc_cat{n}"] = VarDesc(name='aicen', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units=1, z_units=self.z_units)
self.iced_variables[f"seaice_volume_cat{n}"] = VarDesc(name='vicen', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units)
self.iced_variables[f"snow_volume_cat{n}"] = VarDesc(name='vsnon', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units)
self.iced_variables[f"seaice_age_cat{n}"] = VarDesc(name='iage', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='days', z_units=self.z_units)
self.iceh_variables = {
'seaice_velocity_daily': VarDesc(name=('uvel_d', 'vvel_d'), dtype='float', is_vector=True, dt=self.output_dt, levels=level_sfc, units='m/s', z_units=self.z_units),
'seaice_conc_daily': VarDesc(name='aice_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units=1, z_units=self.z_units),
'seaice_thick_daily': VarDesc(name='hi_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units),
'seaice_surf_temp_daily': VarDesc(name='Tsfc_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='C', z_units=self.z_units),
'seaice_saln_daily': VarDesc(name='sice_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='psu', z_units=self.z_units),
'snow_thick_daily': VarDesc(name='hs_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='m', z_units=self.z_units),
'seaice_age_daily': VarDesc(name='iage_d', dtype='float', is_vector=False, dt=self.output_dt, levels=level_sfc, units='days', z_units=self.z_units),
}
self.atmos_forcing_variables = {
'atmos_surf_velocity': VarDesc(name=('wndewd', 'wndnwd'), dtype='float', is_vector=True, dt=self.forcing_dt, levels=level_sfc, units='m/s', z_units=self.z_units),
'atmos_surf_temp': VarDesc(name='airtmp', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='C', z_units=self.z_units),
'atmos_surf_dewpoint': VarDesc(name='dewpt', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='K', z_units=self.z_units),
'atmos_surf_press': VarDesc(name='mslprs', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='Pa', z_units=self.z_units),
'atmos_precip': VarDesc(name='precip', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='m/s', z_units=self.z_units),
'atmos_down_longwave': VarDesc(name='radflx', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='W/m2', z_units=self.z_units),
'atmos_down_shortwave': VarDesc(name='shwflx', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='W/m2', z_units=self.z_units),
'atmos_surf_vapor_mix': VarDesc(name='vapmix', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/kg', z_units=self.z_units),
'atmos_column_vapor': VarDesc(name='tcwv', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/m2', z_units=self.z_units),
'atmos_column_liquid': VarDesc(name='tclw', dtype='float', is_vector=False, dt=self.forcing_dt, levels=level_sfc, units='kg/m2', z_units=self.z_units),
}
self.force_synoptic_names = [name for r in self.atmos_forcing_variables.values() for name in (r.name if isinstance(r.name, tuple) else [r.name])]
self.diag_variables = {
'ocean_surf_height': VarDesc(name='ssh', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units),
'ocean_surf_height_anomaly': VarDesc(name='sla', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units),
'ocean_surf_temp': VarDesc(name='sst', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='C', z_units=self.z_units),
'seaice_conc': VarDesc(name='sic', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units=1, z_units=self.z_units),
'seaice_thick': VarDesc(name='sit', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units),
'snow_thick': VarDesc(name='snwt', dtype='float', is_vector=False, dt=self.restart_dt, levels=level_sfc, units='m', z_units=self.z_units),
}
self.operator = {
'ocean_surf_height': self.get_ocean_surf_height,
'ocean_surf_height_anomaly': self.get_ocean_surf_height_anomaly,
'ocean_surf_temp': self.get_ocean_surf_temp,
'seaice_conc': self.get_seaice_conc,
'seaice_thick': self.get_seaice_thick,
'snow_thick': self.get_snow_thick,
}
self.variables = {**self.restart_variables,
**self.iced_variables,
**self.iceh_variables,
**self.atmos_forcing_variables,
**self.diag_variables,
**self.archive_variables}
grid_info_file = os.path.join(self.basedir, 'topo', 'grid.info')
if self.basedir and os.path.exists(grid_info_file):
self.grid = get_topaz_grid(grid_info_file)
self.depthfile = os.path.join(self.basedir, 'topo', f'depth_{self.R}_{self.T}.a')
if self.depthfile and os.path.exists(self.depthfile):
self.depth, self.grid.mask = get_depth(self.depthfile, self.grid)
self.meanssh = None
if self.meanssh_file and os.path.exists(self.meanssh_file):
self.meanssh = get_mean_ssh(self.meanssh_file, self.grid)
#TODO: z bank cache can be implemented (similar to grid bank for nextsim)
[docs]
def is_ncat(self, name):
return (name in self.iced_variables) and (name.split('_')[-1][:3] == 'cat')
[docs]
def get_cat_id(self, name):
if self.is_ncat(name):
return int(name.split('_')[-1][3:])
else:
raise ValueError(f"get_cat_id: variable {name} is not a ncat variable")
[docs]
def filename(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if kwargs['member'] is not None:
mstr = '_mem{:03d}'.format(kwargs['member']+1)
else:
mstr = ''
# filename for each model component
if kwargs['name'] in self.restart_variables:
tstr = kwargs['time'].strftime('%Y_%j_%H_%M%S')
file = 'restart.'+tstr+mstr+'.a'
elif kwargs['name'] in self.iced_variables:
t = kwargs['time']
tstr = f"{t:%Y-%m-%d}-{t.hour*3600:05}"
file = 'iced.'+tstr+mstr+'.nc'
elif kwargs['name'] in self.iceh_variables:
tstr = kwargs['time'].strftime('%Y-%m-%d')
file = os.path.join(mstr[1:], 'SCRATCH', 'cice', 'iceh.'+tstr+'.nc')
elif kwargs['name'] in self.atmos_forcing_variables:
file = os.path.join(mstr[1:], 'SCRATCH', 'forcing')
return os.path.join(kwargs['path'], file)
elif kwargs['name'] in self.archive_variables:
tstr = kwargs['time'].strftime('%Y_%j_??') # archive variables are daily means
file = os.path.join(mstr[1:], 'SCRATCH', 'archm.'+tstr+'.a')
elif kwargs['name'] in self.diag_variables:
kstr = f"_k{kwargs['k']}_"
tstr = t2s(kwargs['time'])
file = os.path.join(mstr[1:], 'SCRATCH', kwargs['name']+kstr+tstr+'.npy')
return os.path.join(kwargs['path'], file)
else:
raise ValueError(f"filename: ERROR: unknown variable name '{kwargs['name']}'")
path = kwargs['path']
fname = os.path.join(path, file)
files = glob.glob(fname)
if len(files) > 0:
return files[0]
# if no corresponding files found under the given path
# try to find two layers above for the other cycle time
dirs = path.split(os.sep)
if dirs[-1] == 'topaz.v5' and len(dirs[-2])==12:
root = os.sep.join(dirs[:-2])
for tdir in os.listdir(root):
fname = os.path.join(root, tdir, 'topaz.v5', file)
files = glob.glob(fname)
if len(files) > 0:
return files[0]
raise FileNotFoundError(f"filename: ERROR: could not find {file} in {path} or its parent directories")
[docs]
def read_grid(self, **kwargs):
pass
[docs]
def read_mask(self, **kwargs):
pass
[docs]
def read_var(self, **kwargs):
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
name = kwargs['name']
rec = self.variables[name].asdict()
# get the variable from restart files
if name in self.restart_variables:
f = ABFileRestart(fname, 'r', idm=self.grid.nx, jdm=self.grid.ny)
if rec['is_vector']:
var1 = f.read_field(rec['name'][0], level=kwargs['k'], tlevel=1, mask=self.grid.mask)
var2 = f.read_field(rec['name'][1], level=kwargs['k'], tlevel=1, mask=self.grid.mask)
var = np.array([var1, var2])
else:
var = f.read_field(rec['name'], level=kwargs['k'], tlevel=1, mask=self.grid.mask)
f.close()
elif name in self.iced_variables:
if rec['is_vector']:
if self.is_ncat(name): # ncat variable
cat_id = self.get_cat_id(name)
var1 = nc_read_var(fname, rec['name'][0])[cat_id,...]
var2 = nc_read_var(fname, rec['name'][1])[cat_id,...]
else:
var1 = nc_read_var(fname, rec['name'][0])
var2 = nc_read_var(fname, rec['name'][1])
var = np.array([var1, var2])
else:
if self.is_ncat(name): # ncat variable
var = nc_read_var(fname, rec['name'])[self.get_cat_id(name),...]
else:
var = nc_read_var(fname, rec['name'])
elif name in self.iceh_variables:
if rec['is_vector']:
var1 = nc_read_var(fname, rec['name'][0])[0, ...]
var2 = nc_read_var(fname, rec['name'][1])[0, ...]
var = np.array([var1, var2])
else:
var = nc_read_var(fname, rec['name'])[0, ...]
elif name in self.atmos_forcing_variables:
if kwargs['time'].tzinfo is None:
kwargs['time'] = kwargs['time'].replace(tzinfo=timezone.utc)
dtime = (kwargs['time'] - datetime(1900,12,31,tzinfo=timezone.utc)) / (24*dt1h)
if rec['is_vector']:
f1 = ABFileForcing(fname+'.'+rec['name'][0], 'r')
var1 = f1.read_field(rec['name'][0], dtime)
f2 = ABFileForcing(fname+'.'+rec['name'][1], 'r')
var2 = f2.read_field(rec['name'][1], dtime)
var = np.array([var1, var2])
f1.close()
f2.close()
else:
f = ABFileForcing(fname+'.'+rec['name'], 'r')
var = f.read_field(rec['name'], dtime)
f.close()
elif name in self.diag_variables:
# if the npy file exists, one could just read it to get the variable.
# but here we always calculate the variable from the model state, and refresh to the npy file, to be safe
if not os.path.exists(fname):
var = self.operator[name](**kwargs)
np.save(fname, var)
else:
var = np.load(fname)
elif name in self.archive_variables:
f = ABFileArchv(fname, 'r', mask=True)
if rec['is_vector']:
var1 = f.read_field(rec['name'][0], level=kwargs['k'])
var2 = f.read_field(rec['name'][1], level=kwargs['k'])
var = np.array([var1, var2])
else:
var = f.read_field(rec['name'], level=kwargs['k'])
f.close()
else:
raise ValueError(f"read_var: ERROR: unknown variable name '{name}'")
# convert units if necessary
var = units_convert(rec['units'], kwargs['units'], var)
return var
[docs]
def write_var(self, var, **kwargs):
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
name = kwargs['name']
rec = self.variables[name].asdict()
# convert back to old units
var = units_convert(kwargs['units'], rec['units'], var)
if name in self.restart_variables:
# open the restart file for over-writing
# the 'r+' mode and a new overwrite_field method were added in the ABFileRestart in .abfile
f = ABFileRestart(fname, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True)
if rec['is_vector']:
for i in range(2):
f.overwrite_field(var[i,...], self.grid.mask, rec['name'][i], level=kwargs['k'], tlevel=1)
else:
f.overwrite_field(var, self.grid.mask, rec['name'], level=kwargs['k'], tlevel=1)
f.close()
elif name in self.iced_variables:
if rec['is_vector']:
for i in range(2):
if self.is_ncat(name):
dims = {'ncat':None, 'nj':self.grid.ny, 'ni':self.grid.nx}
recno = {'ncat':self.get_cat_id(name)}
else:
dims = {'nj':self.grid.ny, 'ni':self.grid.nx}
recno = None
nc_write_var(fname, dims, rec['name'][i], var[i,...], recno=recno, comm=kwargs['comm'])
else:
if self.is_ncat(name):
dims = {'ncat':None, 'nj':self.grid.ny, 'ni':self.grid.nx}
recno = {'ncat':self.get_cat_id(name)}
else:
dims = {'nj':self.grid.ny, 'ni':self.grid.nx}
recno = None
nc_write_var(fname, dims, rec['name'], var, recno=recno, comm=kwargs['comm'])
elif name in self.iceh_variables:
dims = {'time':None, 'nj':self.grid.ny, 'ni':self.grid.nx}
recno = {'time':0}
if rec['is_vector']:
for i in range(2):
nc_write_var(fname, dims, rec['name'][i], var[i,...], recno=recno, comm=kwargs['comm'])
else:
nc_write_var(fname, dims, rec['name'], var, recno=recno, comm=kwargs['comm'])
elif name in self.atmos_forcing_variables:
if kwargs['time'].tzinfo is None:
kwargs['time'] = kwargs['time'].replace(tzinfo=timezone.utc)
dtime = (kwargs['time'] - datetime(1900,12,31,tzinfo=timezone.utc)) / timedelta(days=1)
if rec['is_vector']:
for i in range(2):
f = ABFileForcing(fname+'.'+rec['name'][i], 'r+')
f.overwrite_field(var[i,...], None, rec['name'][i], dtime)
f.close()
else:
f = ABFileForcing(fname+'.'+rec['name'], 'r+')
f.overwrite_field(var, None, rec['name'], dtime)
f.close()
elif name in self.diag_variables:
np.save(fname, var)
else:
print(f"WARNING: write_var not implemented for variable {name}, skipping...")
[docs]
def z_coords(self, **kwargs):
return self._z_coords_cached(**kwargs)
@lru_cache(maxsize=3)
def _z_coords_cached(self, **kwargs):
"""
Calculate vertical coordinates given the 3D model state
Return:
- z: np.array
The corresponding z field
"""
# some defaults if not set in kwargs
if 'k' not in kwargs:
kwargs['k'] = 0
z = np.zeros((self.jdm, self.idm))
if kwargs['k'] == 0:
# if level index is 0, this is the surface, so just return zeros
return z
else:
# get layer thickness and convert to units
rec = kwargs.copy()
rec['name'] = 'ocean_layer_thick'
rec['units'] = self.variables['ocean_layer_thick'].units # should be Pa
if self.z_units == 'm':
dz = - self.read_var(**rec) / self.ONEM # in meters, negative relative to surface
elif self.z_units == 'Pa':
dz = self.read_var(**rec)
else:
raise ValueError('do not know how to calculate z_coords for z_units = '+self.z_units)
# use recursive func, get previous layer z and add dz
kwargs['k'] -= 1
z_prev = self.z_coords(**kwargs)
return z_prev + dz
[docs]
def get_ocean_surf_height(self, **kwargs):
"""Get ocean surface height from restart variables
Adapted from p_ssh_from_state.F90 in ReanalysisTP5/SSHFromState_HYCOMICE"""
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
tbaric = (self.kapref == self.thflag)
restart_file = self.filename(**{**kwargs, 'name':'ocean_bot_montg_pot'})
f = ABFileRestart(restart_file, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True)
psikk = f.read_field('psikk', level=0, tlevel=1)
thkk = f.read_field('thkk', level=0, tlevel=1)
pbavg = f.read_field('pbavg', level=0, tlevel=1)
ind = (self.depth < -0.1) & ~(np.isnan(self.depth)) # valid points to calculate ssh on
levels = list(self.variables['ocean_layer_thick'].levels)
idm, jdm, kdm = self.grid.nx, self.grid.ny, len(levels)
pres = np.zeros((kdm+1, jdm, idm)) # cumulative pressure
thstar = np.zeros((kdm, jdm, idm))
montg = np.zeros((jdm, idm))
oneta = np.zeros((jdm, idm))
ssh = np.zeros((jdm, idm))
for k in range(kdm):
saln = f.read_field('saln', level=levels[k], tlevel=1)
temp = f.read_field('temp', level=levels[k], tlevel=1)
dp = f.read_field('dp', level=levels[k], tlevel=1)
# use upper interface pressure in converting sigma to sigma-star
# this is to avoid density variations in layers intersected by bottom
th = stmt_fns_sigma(self.thflag, temp, saln)
kapf = stmt_fns_kappaf(self.thflag, temp, saln, pres[k, ...], self.thref)
if tbaric:
thstar[k, ...][ind] = th[ind] + kapf[ind]
else:
thstar[k, ...][ind] = th[ind]
pres[k+1, ...][ind] = pres[k, ...][ind] + dp[ind]
oneta[ind] = 1. + pbavg[ind] / pres[-1, ...][ind]
# m_prime in lowest layer
montg[ind] = psikk[ind] + (pres[-1, ...][ind] * (thkk[ind] + self.thbase - thstar[-1, ...][ind]) - pbavg[ind] * (thstar[-1, ...][ind])) * self.thref**2
# m_prime in remaining layers
for k in range(kdm-2, -1, -1):
montg[ind] += pres[k+1, ...][ind] * oneta[ind] * (thstar[k+1, ...][ind] - thstar[k, ...][ind]) * self.thref**2
ssh[ind] = (montg[ind] / self.thref + pbavg[ind]) / self.ONEM
ssh[~ind] = np.nan
ssh[self.grid.mask] = np.nan
return ssh
[docs]
def get_ocean_surf_height_anomaly(self, **kwargs):
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
self.meanssh_file = os.path.join(self.basedir, 'topo', 'meanssh.uf')
assert self.meanssh is not None, f"SLA: cannot find meanssh file {self.meanssh_file}"
ssh = self.get_ocean_surf_height(**kwargs)
sla = ssh - self.meanssh
sla[self.grid.mask] = np.nan
return sla
[docs]
def get_ocean_surf_temp(self, **kwargs):
#just return first level ocean_temp
return self.read_var(**{**kwargs, 'name':'ocean_temp', 'k':1})
[docs]
def get_seaice_conc(self, **kwargs):
"""
Get total seaice concentration from multicategory ice concentration (aicen)
adapted from ReanalysisTP5/SSHFromState_HYCOMICE/mod_read_icednc by J. Xie
"""
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
seaice_conc = np.zeros(self.grid.x.shape)
rec = kwargs.copy()
# try to read it from iced file
try:
for cat_id in range(self.ncat):
rec['name'] = f'seaice_conc_cat{cat_id}' # can use iceh or iced files
rec['units'] = self.variables[rec['name']].units
seaice_conc += self.read_var(**rec)
except FileNotFoundError:
# if failed, try to read from iceh file
rec['name'] = 'seaice_conc_daily'
rec['units'] = self.variables[rec['name']].units
seaice_conc = self.read_var(**rec)
seaice_conc[np.where(seaice_conc<self.MIN_SEAICE_CONC)] = 0.0 # discard below threadshold
seaice_conc[self.grid.mask] = np.nan
return seaice_conc
[docs]
def get_seaice_thick(self, **kwargs):
"""
Get total seaice thickness from the multicategory ice volume (vicen)
"""
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined before calling get_seaice_thick")
seaice_conc = self.get_seaice_conc(**kwargs)
seaice_volume = np.zeros(self.grid.x.shape)
rec = kwargs.copy()
for cat_id in range(self.ncat):
rec['name'] = f'seaice_volume_cat{cat_id}'
rec['units'] = self.variables[rec['name']].units
seaice_volume += self.read_var(**rec)
seaice_thick = np.zeros(self.grid.x.shape)
ind = np.where(seaice_conc>=self.MIN_SEAICE_CONC)
upper_limit = thickness_upper_limit(seaice_conc[ind], 'seaice')
seaice_thick[ind] = np.minimum(seaice_volume[ind] / seaice_conc[ind], upper_limit)
seaice_thick[self.grid.mask] = np.nan
return seaice_thick
[docs]
def get_snow_thick(self, **kwargs):
"""
Get total snow thickness from the multi-category snow volume (vsnon)
"""
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined before calling get_snow_thick")
seaice_conc = self.get_seaice_conc(**kwargs)
snow_volume = np.zeros(self.grid.x.shape)
rec = kwargs.copy()
for cat_id in range(self.ncat):
rec['name'] = f'snow_volume_cat{cat_id}'
rec['units'] = self.variables[rec['name']].units
snow_volume += self.read_var(**rec)
snow_thick = np.zeros(self.grid.x.shape)
ind = np.where(seaice_conc>=self.MIN_SEAICE_CONC)
upper_limit = thickness_upper_limit(seaice_conc[ind], 'snow')
snow_thick[ind] = np.minimum(snow_volume[ind] / seaice_conc[ind], upper_limit)
return snow_thick
[docs]
def preprocess(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
# task_id = kwargs.get('worker_id', 0)
# offset = task_id * self.nproc_per_util
time = kwargs['time']
forecast_period = kwargs['forecast_period']
next_time = time + forecast_period * dt1h
if kwargs['member'] is not None:
mstr = '_mem{:03d}'.format(kwargs['member']+1)
else:
mstr = ''
run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH')
# make sure model run directory exists
self.c.fs.make_dir(run_dir)
# generate namelists, blkdat, ice_in, etc.
namelist(self, time, forecast_period, run_dir)
# copy synoptic forcing fields from a long record in basedir, will be perturbed later
for varname in self.force_synoptic_names:
forcing_file = self.forcing_file.format(member=kwargs['member']+1, time=time, name=varname)
forcing_file_out = os.path.join(run_dir, 'forcing.'+varname)
try:
f = ABFileForcing(forcing_file, 'r')
except FileNotFoundError:
#print(f"WARNING: {forcing_file} not found, skipping...")
if varname in ['tcwv', 'tclw']: # these two variables are optional
continue
else:
raise FileNotFoundError(f"preprocess: ERROR: forcing file {forcing_file} not found")
fo = ABFileForcing(forcing_file_out, 'w', idm=f.idm, jdm=f.jdm, cline1=f._cline1, cline2=f._cline2)
t = time
dt = self.forcing_dt
rdtime = dt / 24
while t <= next_time:
dtime1 = dayfor(self.yrflag, t.year, int(t.strftime('%j')), t.hour)
fld = f.read_field(varname, dtime1)
fo.write_field(fld, None, varname, dtime1, rdtime)
t += dt * dt1h
f.close()
fo.close()
# link necessary files for model run
shell_cmd = f"cd {run_dir}; "
# partition setting
partit_file = os.path.join(self.basedir, 'topo', 'partit', f'depth_{self.R}_{self.T}.{self.nproc:04d}')
shell_cmd += f"ln -fs {partit_file} patch.input; "
# topo files
for ext in ['.a', '.b']:
file = os.path.join(self.basedir, 'topo', 'regional.grid'+ext)
shell_cmd += f"ln -fs {file} .; "
file = os.path.join(self.basedir, 'topo', f'depth_{self.R}_{self.T}'+ext)
shell_cmd += f"ln -fs {file} regional.depth{ext}; "
file = os.path.join(self.basedir, 'topo', f'kmt_{self.R}_{self.T}.nc')
shell_cmd += f"ln -fs {file} cice_kmt.nc; "
file = os.path.join(self.basedir, 'topo', 'cice_grid.nc')
shell_cmd += f"ln -fs {file} .; "
# nest files
nest_dir = os.path.join(self.basedir, 'nest', self.E)
shell_cmd += f"ln -fs {nest_dir} nest; "
# TODO: there is extra logic in nhc_root/bin/expt_preprocess.sh to be added here
# relax files
for ext in ['.a', '.b']:
for varname in ['intf', 'saln', 'temp']:
file = os.path.join(self.basedir, 'relax', self.E, 'relax_'+varname[:3]+ext)
shell_cmd += f"ln -fs {file} {'relax.'+varname+ext}; "
for varname in ['thkdf4', 'veldf4']:
file = os.path.join(self.basedir, 'relax', self.E, varname+ext)
shell_cmd += f"ln -fs {file} {varname+ext}; "
# other forcing files
for ext in ['.a', '.b']:
# rivers
file = os.path.join(self.basedir, 'force', 'rivers', self.E, 'rivers'+ext)
shell_cmd += f"ln -fs {file} {'forcing.rivers'+ext}; "
# seawifs
file = os.path.join(self.basedir, 'force', 'seawifs', 'kpar'+ext)
shell_cmd += f"ln -fs {file} {'forcing.kpar'+ext}; "
self.c.run_job(shell_cmd, nproc=1)
# copy restart files from restart_dir
restart_dir = kwargs['restart_dir']
# job_submit_cmd = kwargs['job_submit_cmd']
tstr = time.strftime('%Y_%j_%H_%M%S')
for ext in ['.a', '.b']:
file = os.path.join(restart_dir, 'restart.'+tstr+mstr+ext)
file1 = os.path.join(kwargs['path'], 'restart.'+tstr+mstr+ext)
self.c.run_job(f"cp -fL {file} {file1}", nproc=1)
self.c.run_job(f"ln -fs {file1} {os.path.join(run_dir, 'restart.'+tstr+ext)}", nproc=1)
self.c.fs.make_dir(os.path.join(run_dir, 'cice'))
tstr = f"{time:%Y-%m-%d}-{time.hour*3600:05}"
file = os.path.join(restart_dir, 'iced.'+tstr+mstr+'.nc')
file1 = os.path.join(kwargs['path'], 'iced.'+tstr+mstr+'.nc')
# TODO: use runtime file manipulation instead
self.c.run_job(f"cp -fL {file} {file1}", nproc=1)
self.c.run_job(f"ln -fs {file1} {os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc')}", nproc=1)
self.c.run_job(f"echo {os.path.join('.', 'cice', 'iced.'+tstr+'.nc')} > {os.path.join(run_dir, 'cice', 'ice.restart_file')}", nproc=1)
[docs]
def postprocess(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
time = kwargs['time']
member = kwargs['member']
if member is not None:
mstr = '_mem{:03d}'.format(member+1)
else:
mstr = ''
run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH')
self.c.fs.make_dir(run_dir)
# link files
commands = ""
if self.model_env:
commands += f". {self.model_env}; "
commands += f"cd {run_dir}; "
for ext in ['.a', '.b']:
file1 = os.path.join(kwargs['restart_dir'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}')
file2 = f'forecast{member+1:03}{ext}'
commands += f"ln -fs {file1} {file2}; "
file1 = os.path.join(kwargs['restart_dir'], f'iced.{time:%Y-%m-%d}-{time.hour*3600:05}{mstr}.nc')
file2 = f'ice_forecast{member+1:03}.nc'
commands += f"ln -fs {file1} {file2}; "
commands += f"ln -fs {self.reanalysis_code}/FILES/depths{self.idm}x{self.jdm}.uf .; "
self.c.run_job(commands, nproc=1)
# restart2nc on forecast files
commands = ""
if self.model_env:
commands += f". {self.model_env}; "
commands += f"cd {run_dir}; "
commands += f"{os.path.join(self.reanalysis_code, 'ASSIM', 'BIN', 'restart2nc')} forecast{member+1:03}.a ice_forecast{member+1:03}.nc"
self.c.run_job(commands, nproc=1)
# add posterior ice variables in analysis abfile
for ext in ['.a', '.b']:
file1 = os.path.join(kwargs['path'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}')
file2 = os.path.join(run_dir, f'analysis{member+1:03}{ext}')
self.c.run_job(f"cp -L {file1} {file2}", nproc=1)
f = ABFileRestart(file2, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True)
nfld = len(f.fields.keys())
# fld = np.load(self.filename(path=kwargs['path'], name='seaice_conc', member=member, time=time))
fld = self.read_var(**{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1})
f.write_field(fld, None, 'ficem', 0, 1, nfld)
# fld = np.load(self.filename(path=kwargs['path'], name='seaice_thick', member=member, time=time))
fld = self.read_var(**{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'})
f.write_field(fld, None, 'hicem', 0, 1, nfld+1)
f.close()
# run fixhycom and update restart file
commands = ""
if self.model_env:
commands += f". {self.model_env}; "
commands += f"cd {run_dir}; "
commands += f"{os.path.join(self.reanalysis_code, 'ASSIM', 'BIN', 'fixhycom')} analysis{member+1:03}.a {member+1} forecast{member+1:03}.nc ice_forecast{member+1:03}.nc {time:%j} 0; "
commands += f"cat fixanalysis{member+1:03}.b >> tmp{member+1:03}.b; mv tmp{member+1:03}.b fixanalysis{member+1:03}.b; "
for ext in ['.a', '.b']:
file1 = os.path.join(run_dir, f'fixanalysis{member+1:03}{ext}')
file2 = os.path.join(kwargs['path'], f'restart.{time:%Y_%j_%H_%M%S}{mstr}{ext}')
commands += f"mv {file1} {file2}; "
file1 = os.path.join(run_dir, f'fix_ice_forecast{member+1:03}.nc')
file2 = os.path.join(kwargs['path'], f'iced.{time:%Y-%m-%d}-{time.hour*3600:05}{mstr}.nc')
commands += f"mv {file1} {file2}; "
self.c.run_job(commands, nproc=1)
[docs]
def postprocess_native(self, *args, **kwargs):
"""Post processing the restart variables for next forecast"""
# routines adapted from the EnKF-MPI-TOPAZ/Tools/fixhycom.F90 code
kwargs = super().parse_kwargs(kwargs)
if self.grid is None:
raise AttributeError("topaz5model: grid not yet defined")
# adjust ocean layer thickness dp
rec = kwargs.copy()
rec['name'] = 'ocean_layer_thick'
rec['units'] = self.variables['ocean_layer_thick'].units # should be Pa
levels = list(self.variables['ocean_layer_thick'].levels)
dp = np.zeros((len(levels), self.grid.ny, self.grid.nx))
for ilev, k in enumerate(levels):
dp[ilev, ...] = self.read_var(**{**rec, 'k':k})
dp = adjust_dp(dp, -self.depth, self.ONEM)
for ilev, k in enumerate(levels):
self.write_var(dp[ilev,...], **{**rec, 'k':k})
# loop over fields in restart file
restart_file = self.filename(**{**kwargs, 'name':'ocean_temp'})
f = ABFileRestart(restart_file, 'r+', idm=self.grid.nx, jdm=self.grid.ny, mask=True)
for i, rec in f.fields.items():
name = rec['field']
tlevel = rec['tlevel']
k = rec['k']
fld = f.read_field(name, tlevel=tlevel, level=k)
# reset variables out of their normal range
if name == 'temp':
saln = f.read_field('saln', tlevel=tlevel, level=k)
temp_min = -0.057 * saln
ind = np.where(fld < temp_min)
fld[ind] = temp_min[ind]
fld[np.where(fld > self.MAX_OCEAN_TEMP)] = self.MAX_OCEAN_TEMP
elif name == 'saln':
fld[np.where(fld > self.MAX_OCEAN_SALN)] = self.MAX_OCEAN_SALN
fld[np.where(fld < self.MIN_OCEAN_SALN)] = self.MIN_OCEAN_SALN
elif name == 'dp':
fld = dp[levels.index(k), ...] # set dp to the adjusted value
# write the field back
f.overwrite_field(fld, None, name, tlevel=tlevel, level=k)
f.close()
# fix sea ice variables, from enkf-topaz/Tools/m_put_mod_fld_nc: fix_cice
restart_dir = kwargs['restart_dir']
prior_ice_file = self.filename(**{**kwargs, 'path':restart_dir, 'name':'seaice_conc_cat1'})
post_ice_file = self.filename(**{**kwargs, 'name':'seaice_conc_cat1'})
fice = self.read_var(**{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1})
hice = self.read_var(**{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'})
zSin, Tmlt = fix_zsin_profile(self.Nilayer+1, self.saltmax, self.depressT, self.nsal, self.msal)
adjust_ice_variables(prior_ice_file, post_ice_file, fice, hice, self.grid.mask,
self.aice_thresh, self.fice_thresh, self.hice_impact, zSin, Tmlt)
# update the diagnostic ice variables
self.write_var(fice, **{**kwargs, 'name':'seaice_conc', 'k':0, 'units':1})
self.write_var(hice, **{**kwargs, 'name':'seaice_thick', 'k':0, 'units':'m'})
[docs]
def run(self, *args, **kwargs):
assert self.ens_run_strategy=='scheduler', f"{self.__class__.__name__}: unsupported run_strategy '{self.ens_run_strategy}'"
kwargs = super().parse_kwargs(kwargs)
task_id = kwargs.get('worker_id', 0)
self.run_status = 'running'
time = kwargs['time']
forecast_period = kwargs['forecast_period']
next_time = time + forecast_period * dt1h
member = kwargs['member']
if member is not None:
mstr = '_mem{:03d}'.format(member+1)
else:
mstr = ''
run_dir = os.path.join(kwargs['path'], mstr[1:], 'SCRATCH')
self.c.fs.make_dir(run_dir)
log_file = os.path.join(run_dir, "run.log")
self.c.run_job("touch "+log_file, nproc=1)
run_success = False
# early exit if the run is already finished
if find_keyword_in_file(log_file, 'Exiting hycom_cice'):
run_success = True
else:
# check if input file exists
input_files = []
tstr = time.strftime('%Y_%j_%H_%M%S')
for ext in ['.a', '.b']:
input_files.append(os.path.join(run_dir, 'restart.'+tstr+ext))
tstr = f"{time:%Y-%m-%d}-{time.hour*3600:05}"
input_files.append(os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc'))
for file in input_files:
if not os.path.exists(file):
raise RuntimeError(f"topaz.v5.model.run: input file missing: {file}")
# clean up some files from previous runs
self.c.run_job(f"cd {run_dir}; rm -f archm.* ovrtn_out summary_out", nproc=1)
self.c.run_job(f"echo > {log_file}", nproc=1)
# build the shell command line
model_exe = os.path.join(self.basedir, f'expt_{self.X}', 'build', f'src_{self.V}ZA-07Tsig0-i-sm-sse_relo_mpi', 'hycom_cice')
shell_cmd = ""
if self.model_env:
shell_cmd = ". "+self.model_env+"; " # enter topaz5 env
shell_cmd += "cd "+run_dir+"; " # enter run directory
shell_cmd += 'JOB_EXECUTE '+model_exe+" >& run.log"
# run the model, give it 3 attempts
for i in range(3):
try:
job_opts = {
**kwargs,
'job_name': 'topaz5',
'run_dir': run_dir,
'parallel_mode': 'mpi',
'log_file': log_file,
'nproc': self.nproc,
'offset': task_id * self.nproc_per_run,
}
self.c.run_job(shell_cmd, **job_opts)
except RuntimeError as e:
print(f"{e}, retrying ({2-i} attempts remain)")
self.c.run_job(f"cp {log_file} {log_file}.attempt{i}", nproc=1)
continue
# check output
if find_keyword_in_file(log_file, 'Exiting hycom_cice'):
run_success = True
break
assert run_success, f"model run failed after 3 attempts, check in {run_dir}"
# move the output restart files to forecast_dir
tstr = next_time.strftime('%Y_%j_%H_%M%S')
for ext in ['.a', '.b']:
file1 = os.path.join(run_dir, 'restart.'+tstr+ext)
file2 = os.path.join(kwargs['path'], 'restart.'+tstr+mstr+ext)
if os.path.exists(file1):
self.c.run_job(f"mv {file1} {file2}", nproc=1)
else:
assert os.path.exists(file2), f"error moving output file {file1} to {file2}"
tstr = f"{next_time:%Y-%m-%d}-{next_time.hour*3600:05}"
file1 = os.path.join(run_dir, 'cice', 'iced.'+tstr+'.nc')
file2 = os.path.join(kwargs['path'], 'iced.'+tstr+mstr+'.nc')
if os.path.exists(file1):
self.c.run_job(f"mv {file1} {file2}", nproc=1)
else:
assert os.path.exists(file2), f"error moving output file {file1} to {file2}"
[docs]
def generate_truth(self, *args, **kwargs):
self.c.message = "not yet implemented, skipping..."
pass
[docs]
def generate_init_ensemble(self, *args, **kwargs):
# TODO: check validity of self.ens_init_dir and restart files within
# some application may need to skip restart file checks, add an option for that?
self.c.message = "not yet implemented, skipping..."
pass