import os
import numpy as np
from NEDAS.grid import Grid1D
from NEDAS.utils.conversion import dt1h
from NEDAS.utils.netcdf_lib import nc_read_var, nc_write_var
from NEDAS.core import Model
from NEDAS.core.types import VarDesc, IOMode
[docs]
class Lorenz96Model(Model[Grid1D]):
"""
Lorenz 1996 model with 40 variables
Args:
nx (int): dimension of the model, default is 40.
F (float): forcing parameter, default is 8
dt (float): model time step, default is 0.05
numeric_opt (str): numeric option, default is 'rk4'
"""
io_mode: IOMode = 'online' # both online and offline supported, default to online
nx: int
F: float
dt: float
restart_dt: float
numeric_opt: str
memory: dict = {}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.grid = Grid1D.regular_grid(0, self.nx, 1, cyclic=True)
self.grid.mask = np.full(self.grid.x.shape, False)
self.variables = {
'state': VarDesc(name='state', dtype='float', is_vector=False, dt=self.restart_dt, levels=np.array([0]), units='*', z_units='*'),
}
self.z = {0: np.zeros(self.nx)}
self.run_1step_funcs = {
'euler_forward': self.run_1step_euler_forward,
'rk4': self.run_1step_rk4,
}
assert self.numeric_opt in self.run_1step_funcs, f"{self.__class__.__name__}: unknown numerics '{self.numeric_opt}'."
self.run_1step = self.run_1step_funcs[self.numeric_opt]
# convention to real time for the nondimensional model
# 6 h in meteorological models is representative by t = 0.05
# so 120 hours per unit model time
self.hours_per_unit_time = 120.
[docs]
def dxdt(self, x):
return (np.roll(x, -1) - np.roll(x, 2)) * np.roll(x, 1) - x + self.F
[docs]
def run_1step_euler_forward(self, x):
return x + self.dt * self.dxdt(x)
[docs]
def run_1step_rk4(self, x):
dx1 = self.dt * self.dxdt(x)
dx2 = self.dt * self.dxdt(x + dx1/2.0)
dx3 = self.dt * self.dxdt(x + dx2/2.0)
dx4 = self.dt * self.dxdt(x + dx3)
return x + (dx1 + 2.0*dx2 + 2.0*dx3 + dx4)/6.0
[docs]
def advance_time(self, x_in, T):
"""
Nonlinear advance_time function
Args:
x (np.ndarray): the initial condition
T (float): duration of the simulation
Return:
np.ndarray: the updated model state after simulation
"""
# check if any state becomes NaN
if np.isnan(x_in).any():
raise RuntimeError('NaN detected in lorenz96 model state. Aborting...')
if np.isinf(x_in).any():
raise RuntimeError('Inf detected in lorenz96 model state. Aborting...')
# run model forward in time to reach duration T:
x = x_in.copy()
for _ in range(int(T/self.dt)):
x = self.run_1step(x)
return x
[docs]
def filename(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
mstr = self.get_mstr(kwargs['member'])
tstr = self.get_tstr(kwargs['time'])
return os.path.join(kwargs['path'], tstr+mstr+'.nc')
[docs]
def read_grid(self, **kwargs):
pass
[docs]
def read_mask(self, **kwargs):
pass
[docs]
def read_var_from_file(self, **kwargs):
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
name = kwargs['name']
var_name = self.variables[name].name
assert isinstance(var_name, str)
var = nc_read_var(fname, var_name)[0, ...]
return var
[docs]
def write_var_to_file(self, var, **kwargs):
kwargs = super().parse_kwargs(kwargs)
fname = self.filename(**kwargs)
name = kwargs['name']
var_name = self.variables[name].name
assert isinstance(var_name, str)
nc_write_var(fname, {'t':None, 'x':self.nx}, var_name, var, recno={'t':0})
[docs]
def z_coords(self, **kwargs):
return self.z[kwargs['k']]
[docs]
def generate_initial_condition(self):
state = np.random.normal(0, 1, self.nx)
return state
[docs]
def preprocess(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if self.io_mode == 'offline':
self.c.fs.make_dir(kwargs['path'])
file1 = self.filename(**{**kwargs, 'path':kwargs['restart_dir']})
file2 = self.filename(**kwargs)
self.c.fs.copy_file(file1, file2)
elif self.io_mode == 'online':
# save a copy of the current state (forecast) as prior
var = self.read_var_from_memory(**kwargs)
self.write_var_to_memory(var.copy(), **{**kwargs, 'tag':'prior'})
[docs]
def postprocess(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
# if offline mode, the current files are just posterior states
# do nothing
if self.io_mode == 'online':
# save a copy of the current state (analysis) as posterior
# don't need to copy since current state is no longer updated
var = self.read_var_from_memory(**kwargs)
self.write_var_to_memory(var, **{**kwargs, 'tag':'post'})
[docs]
def run(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
self.run_status = 'running'
state = self.read_var(**kwargs)
next_time = kwargs['time'] + kwargs['forecast_period'] * dt1h
next_state = self.advance_time(state, kwargs['forecast_period']/self.hours_per_unit_time)
self.write_var(next_state, **{**kwargs, 'time':next_time})
self.run_status = 'complete'
[docs]
def generate_truth(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if self.io_mode == 'offline':
self.c.fs.make_dir(self.truth_dir)
state = self.generate_initial_condition()
kwargs['time'] = self.c.config.time_start
kwargs['member'] = None
while kwargs['time'] <= self.c.config.time_end:
self.write_var(state, **kwargs)
state = self.advance_time(state, kwargs['forecast_period']/self.hours_per_unit_time)
kwargs['time'] += kwargs['forecast_period'] * dt1h
[docs]
def generate_init_ensemble(self, *args, **kwargs):
kwargs = super().parse_kwargs(kwargs)
if self.io_mode == 'offline':
self.c.fs.make_dir(self.ens_init_dir)
state = self.generate_initial_condition()
kwargs['time'] = self.c.config.time_start
kwargs['path'] = self.ens_init_dir
self.write_var(state, **kwargs)