import os
import numpy as np
from NEDAS.utils.conversion import dt1h
from NEDAS.grid import Grid
from NEDAS.core import Model
from NEDAS.core.types import VarDesc
from .namelist import namelist
from .util import read_data_bin, write_data_bin, grid2spec, spec2grid
from .util import psi2zeta, psi2u, psi2v, psi2temp, uv2zeta, zeta2psi, temp2psi
[docs]
class QGFortranModel(Model):
"""
Class for configuring and running the qg model
"""
kmax: int
nz: int
restart_dt: float
nproc_per_run: int
psi_init_type: str
model_code_dir: str
model_env: str
spinup_hours: int
def __init__(self, **kwargs):
super().__init__(**kwargs)
assert self.io_mode == 'offline', f"{self.__class__.__name__} only support offline io mode"
self.dx = kwargs['dx'] if 'dx' in kwargs else 1.0
n = 2*(self.kmax+1)
self.ny, self.nx = n, n
x, y = np.meshgrid(np.arange(n), np.arange(n))
self.grid = Grid(None, x, y, cyclic_dim='xy')
self.grid.mask = np.full(self.grid.x.shape, False) # no grid points are masked
self.dz = kwargs['dz'] if 'dz' in kwargs else 1.0
levels = np.arange(0, self.nz, self.dz)
self.variables = {
'velocity': VarDesc(name=('u', 'v'), dtype='float', is_vector=True, dt=self.restart_dt, levels=levels, units=1, z_units=1),
'streamfunc': VarDesc(name='psi', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units=1, z_units=1),
'vorticity': VarDesc(name='zeta', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units=1, z_units=1),
'temperature': VarDesc(name='temp', dtype='float', is_vector=False, dt=self.restart_dt, levels=levels, units=1, z_units=1),
}
assert self.nproc_per_run==1, f'qg model only support serial runs (got task_nproc={self.nproc_per_run})'
[docs]
def filename(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if kwargs['member'] is not None:
mstr = '{:04d}'.format(kwargs['member']+1)
else:
mstr = ''
assert kwargs['time'] is not None, 'missing time in kwargs'
tstr = kwargs['time'].strftime('%Y%m%d_%H')
return os.path.join(kwargs['path'], mstr, 'output_'+tstr+'.bin')
[docs]
def read_grid(self, **kwargs):
pass
[docs]
def read_mask(self, **kwargs):
pass
[docs]
def read_var(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
k = kwargs['k']
k1 = int(k)
if k1 < self.nz-1:
k2 = k1+1
else:
k2 = k1
psik1 = read_data_bin(fname, self.kmax, self.nz, k1)
psik2 = read_data_bin(fname, self.kmax, self.nz, k2)
if kwargs['name'] == 'streamfunc':
var1 = spec2grid(psik1).T
var2 = spec2grid(psik2).T
elif kwargs['name'] == 'velocity':
uk1 = psi2u(psik1)
vk1 = psi2v(psik1)
u1 = spec2grid(uk1).T
v1 = spec2grid(vk1).T
var1 = np.array([u1, v1])
uk2 = psi2u(psik2)
vk2 = psi2v(psik2)
u2 = spec2grid(uk2).T
v2 = spec2grid(vk2).T
var2 = np.array([u2, v2])
elif kwargs['name'] == 'vorticity':
zetak1 = psi2zeta(psik1)
var1 = spec2grid(zetak1).T
zetak2 = psi2zeta(psik2)
var2 = spec2grid(zetak2).T
elif kwargs['name'] == 'temperature':
tempk1 = psi2temp(psik1)
var1 = spec2grid(tempk1).T
tempk2 = psi2temp(psik2)
var2 = spec2grid(tempk2).T
else:
raise ValueError('unknown variable name '+kwargs['name'])
# vertical interp between var1 and var2
if k1 < self.nz-1:
return (var1*(k2-k) + var2*(k-k1)) / (k2-k1)
else:
return var1
[docs]
def write_var(self, var, **kwargs):
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
k = kwargs['k']
if k==int(k):
if kwargs['name'] == 'streamfunc':
psik = grid2spec(var.T)
elif kwargs['name'] == 'velocity':
uk = grid2spec(var[0,...].T)
vk = grid2spec(var[1,...].T)
psik = zeta2psi(uv2zeta(uk, vk))
elif kwargs['name'] == 'vorticity':
zetak = grid2spec(var.T)
psik = zeta2psi(zetak)
elif kwargs['name'] == 'temperature':
tempk = grid2spec(var.T)
psik = temp2psi(tempk)
else:
raise ValueError('unknown variable name '+kwargs['name'])
write_data_bin(fname, psik, self.kmax, self.nz, int(k))
[docs]
def z_coords(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
z = np.full(self.grid.x.shape, kwargs['k'])
return z
[docs]
def preprocess(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
restart_dir = kwargs['restart_dir']
restart_file = self.filename(**{**kwargs, 'path':restart_dir})
# restart file to be used in this experiment, in work_dir/cycle/...
input_file = self.filename(**kwargs)
input_dir = os.path.dirname(input_file)
# just cp the prepared files to the work_dir location
self.c.fs.make_dir(input_dir)
self.c.fs.copy_file(restart_file, input_file)
[docs]
def postprocess(self, *args, **kwargs):
pass
[docs]
def run(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
task_id = kwargs.get('worker_id', 0)
if kwargs['member'] is not None:
mstr = '_mem{:04d}'.format(kwargs['member']+1)
else:
mstr = ''
job_name = f"qg_run{mstr}"
self.run_status = 'running'
input_file = self.filename(**kwargs)
run_dir = os.path.dirname(input_file)
self.c.fs.make_dir(run_dir)
time = kwargs['time']
forecast_period = kwargs['forecast_period']
next_time = time + forecast_period * dt1h
output_file = self.filename(**{**kwargs, 'time':next_time})
if time >= self.c.config.time_start:
#this is during cycling
psi_init_type = 'read'
prep_input_cmd = 'ln -fs '+input_file+' input.bin; '
else:
# this is initial run for spin up
psi_init_type = self.psi_init_type
prep_input_cmd = ''
qg_exe = os.path.join(self.model_code_dir, 'src', 'qg.exe')
# build the shell command line
shell_cmd = ". "+self.model_env+"; " # enter the qg model env
shell_cmd += "cd "+run_dir+"; " # enter the run dir
shell_cmd += "rm -f restart.nml; " # clean up before run
shell_cmd += prep_input_cmd # prepare input file
shell_cmd += "JOB_EXECUTE "+qg_exe+" . " # the qg model exe
shell_cmd += "> run.log 2>&1" # output to log
log_file = os.path.join(run_dir, 'run.log')
# give it several tries, each time decreasing time step
for dt_ratio in [1, 0.6, 0.2]:
namelist(vars(self), time, forecast_period, psi_init_type, kwargs['member'], dt_ratio, run_dir)
self.c.run_job(shell_cmd, job_name=job_name, offset=task_id*self.nproc_per_run, **kwargs)
# check output
with open(log_file, 'rt') as f:
if 'Calculation done' in f.read():
break
# check output
with open(log_file, 'rt') as f:
if 'Calculation done' not in f.read():
raise RuntimeError('errors in '+log_file)
if not os.path.exists(os.path.join(run_dir, 'output.bin')):
raise RuntimeError('output.bin file not found')
shell_cmd = "cd "+run_dir+"; "
shell_cmd += "mv output.bin "+output_file
self.c.run_job(shell_cmd, offset=task_id*self.nproc_per_run, **kwargs)
[docs]
def generate_truth(self, *args, **kwargs) -> None:
assert self.truth_dir is not None
kwargs = super().parse_kwargs(kwargs)
kwargs['member'] = None
self.c.fs.make_dir(self.truth_dir)
# create the truth files
self.c.debug_message = f"Creating truth run for qg model in {self.truth_dir}"
run_dir = os.path.join(self.truth_dir, 'run')
init_file = f"output_{self.c.config.time_start:%Y%m%d_%H}.bin"
self.c.debug_message = f"Running the model for spinup period to get initial condition: {init_file}"
kwargs['time'] = self.c.config.time_start - self.spinup_hours * dt1h
self.run(**{**kwargs, 'path':run_dir, 'member':0, 'forecast_period':self.spinup_hours})
kwargs['time'] = self.c.config.time_start
while kwargs['time'] < self.c.config.time_end:
current_file = f"output_{kwargs['time']:%Y%m%d_%H}.bin"
next_time = kwargs['time'] + kwargs['forecast_period'] * dt1h
next_file = f"output_{next_time:%Y%m%d_%H}.bin"
self.c.debug_message = f"Running the model from condition {current_file} to reach {next_file}"
self.run(**{**kwargs, 'path':run_dir, 'member':0})
kwargs['time'] = next_time
# clean up
self.c.fs.move_files_to_dir(os.path.join(run_dir, '*', 'output*.bin'), self.truth_dir)
self.c.debug_message = f"removing temporary run directory: {run_dir}"
self.c.fs.remove_dir(run_dir)
[docs]
def generate_init_ensemble(self, *args, **kwargs) -> None:
assert self.ens_init_dir is not None
kwargs = super().parse_kwargs(kwargs)
debug = kwargs.get('debug', False)
basename = f"output_{kwargs['time']:%Y%m%d_%H}.bin"
mstr = f"{kwargs['member']+1:04d}"
init_file = os.path.join(self.ens_init_dir, mstr, basename)
# check if restart file can be found in model.ens_init_dir already
if os.path.exists(init_file):
if debug:
print(f"Init file {init_file} already exists, skipping")
return
# create the initial ensemble members
if debug:
print(f"Creating initial condition for qg modeli member {kwargs['member']+1}:")
init_time = kwargs['time'] - self.spinup_hours * dt1h
next_time = kwargs['time']
run_dir = self.c.fs.forecast_dir(init_time, 'qg.fortran')
if debug:
print(f"initial condition type: {self.psi_init_type}")
print(f"spinup period: {self.spinup_hours} hours")
if debug:
print(f"Spinning up member {kwargs['member']+1} from {init_time} to {next_time}")
self.run(**{**kwargs, 'path':run_dir, 'time':init_time, 'forecast_period':self.spinup_hours})
if debug:
print("Moving output files")
src_file = os.path.join(run_dir, mstr, basename)
self.c.fs.make_dir(os.path.dirname(init_file))
self.c.fs.move_file(src_file, init_file)