NEDAS.core package

Submodules

NEDAS.core.assimilator module

class NEDAS.core.assimilator.Assimilator(c: Context)[source]

Bases: ABC

assimilate(c: Context)[source]

Main method to run the batch assimilation algorithm

partition_grid(c: Context) None[source]

Partition the analysis grid into several parts and distribute the workload over the mpi ranks.

abstractmethod init_partitions(c: Context) list[source]

Generate spatial partitioning of the domain

abstractmethod assign_obs(c: Context) dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], ndarray]][source]

Assign the observation sequence to each partition par_id

Parameters:

c (Context) – the runtime context object

Returns:

Indices in the full obs_seq for the subset of obs that belongs to partition par_id

Return type:

dict[ObsRecordID, dict[PartitionID, np.ndarray]]
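The nested return structure can be illustrated with a minimal sketch. It assumes, purely for illustration, that each partition is a rectangular box (xmin, xmax, ymin, ymax) and that an observation belongs to a partition when it lies within a horizontal radius of influence (hroi) of that box. The helper name and the box layout are hypothetical and are not part of the NEDAS API.

```python
import numpy as np

def assign_obs_to_partitions(obs_x, obs_y, partitions, hroi):
    """Return {par_id: indices into the full obs sequence} for one obs record.

    partitions: list of (xmin, xmax, ymin, ymax) boxes (hypothetical layout).
    hroi: radius of influence, in the same units as obs_x and obs_y.
    """
    inds = {}
    for par_id, (xmin, xmax, ymin, ymax) in enumerate(partitions):
        # An obs is kept if it falls inside the box expanded by hroi.
        near = (
            (obs_x >= xmin - hroi) & (obs_x <= xmax + hroi)
            & (obs_y >= ymin - hroi) & (obs_y <= ymax + hroi)
        )
        inds[par_id] = np.flatnonzero(near)
    return inds

obs_x = np.array([1.0, 4.5, 9.0])
obs_y = np.array([1.0, 1.0, 1.0])
partitions = [(0.0, 5.0, 0.0, 5.0), (5.0, 10.0, 0.0, 5.0)]

# Outer key: obs record id; inner key: partition id.
obs_inds = {0: assign_obs_to_partitions(obs_x, obs_y, partitions, hroi=1.0)}
```

In this sketch the observation at x = 4.5 is assigned to both partitions, because it lies within hroi of the shared boundary.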

abstractmethod distribute_partitions(c: Context) dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]][source]

Distribute partitions across processors

transpose_to_ensemble_complete(c: Context) None[source]

Communicate among mpi ranks and transpose the locally-stored state/obs chunks to ensemble-complete

transpose_to_field_complete(c: Context)[source]

Communicate among mpi ranks and transpose the locally-stored state/obs chunks back to field-complete

abstractmethod assimilation_algorithm(c: Context) None[source]

The main assimilation algorithm will be implemented by subclasses

NEDAS.core.context module

class NEDAS.core.context.Context(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: object

Runtime context manages the generation and interaction of dynamic objects in runtime

comm: parallel.Comm
comm_rec: parallel.Comm
comm_mem: parallel.Comm
progress: progress.Progress
grid: grid.GridType
grid_orig: grid.GridType
models: dict[str, Model]
datasets: dict[str, Dataset]
assimilator: Assimilator
updator: Updator
transform_funcs: list[Transform]
localization_funcs: dict[str, Callable]
inflation_func: Inflation
state: State
obs: Obs
debug: bool
interactive: bool
is_notebook: bool
time: datetime
iter: int
pid_show: int
nens: int
mem_list: dict[ProcIDMem, list[MemID]]
fs: FileSystem
io: IOBackend
jsub: JobSubmitter
check_interactive() bool[source]

Check whether the runtime environment supports interactive output (with ANSI escape codes).

check_notebook() bool[source]

Check whether the runtime environment is a Jupyter notebook.

get_cols() int[source]
set_logging() None[source]
distribute_mem_tasks() dict[int, list[int]][source]

Distribute mem_id across processors
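One simple way to produce a dict[int, list[int]] of this shape is a round-robin assignment. The sketch below is an assumption about a possible distribution rule, not the rule NEDAS actually uses; the function name is hypothetical.

```python
def distribute_round_robin(n_items, nproc):
    """Assign item ids 0..n_items-1 to processor ids 0..nproc-1 in turn."""
    tasks = {pid: [] for pid in range(nproc)}
    for item in range(n_items):
        tasks[item % nproc].append(item)
    return tasks

# Five ensemble members shared by two processors.
mem_list = distribute_round_robin(n_items=5, nproc=2)
```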

update_assim_tools()[source]

Update the assimilation tool components based on runtime configuration

property prev_time: datetime

Previous analysis time. Automatically updated when self.time changes.

Returns:

Previous analysis time.

Return type:

datetime

property next_time: datetime

Next analysis time. Automatically updated when self.time changes.

Returns:

Next analysis time.

Return type:

datetime
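A property that is derived from self.time on every access stays in sync automatically. The sketch below shows that pattern for both prev_time and next_time. The class name and the cycle_period_hours argument are hypothetical; how Context actually computes these times is not stated here.

```python
from datetime import datetime, timedelta

class TimeKeeper:
    """Hypothetical stand-in for the time-related part of a context."""

    def __init__(self, time, cycle_period_hours):
        self.time = time
        self.cycle_period = timedelta(hours=cycle_period_hours)

    @property
    def prev_time(self):
        # Recomputed from self.time on each access.
        return self.time - self.cycle_period

    @property
    def next_time(self):
        return self.time + self.cycle_period

keeper = TimeKeeper(datetime(2021, 1, 1, 0), cycle_period_hours=6)
keeper.time = datetime(2021, 1, 1, 6)  # advancing time moves both properties
```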

set_comm() None[source]

Initialize the MPI communicator, split the communicator if necessary.

For a serial program, use a dummy communicator and set nproc to the number of available processors on the machine; for an MPI program, use MPI.COMM_WORLD and check that its size matches nproc.

Split the communicator into member and record groups, according to nproc and nproc_mem. See NEDAS.utils.parallel module for more details.
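The split into member and record groups can be pictured with plain rank arithmetic. The sketch below assumes that consecutive ranks share a record group and that nproc is a multiple of nproc_mem; the function name and the exact mapping are assumptions, so consult NEDAS.utils.parallel for the real layout.

```python
def split_ranks(pid, nproc, nproc_mem):
    """Map a global rank to (pid_mem, pid_rec) under the assumed layout."""
    if nproc % nproc_mem != 0:
        raise ValueError("nproc must be a multiple of nproc_mem")
    pid_mem = pid % nproc_mem   # position within the member group
    pid_rec = pid // nproc_mem  # position within the record group
    return pid_mem, pid_rec

# Six ranks, three per member group, hence two record groups.
layout = [split_ranks(pid, nproc=6, nproc_mem=3) for pid in range(6)]
```

In an MPI program, groups like these are commonly formed by splitting a communicator with a color and a key (for example mpi4py's Comm.Split); whether NEDAS does it this way is not stated in this section.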

set_grid() None[source]

Initialize the analysis grid based on the configuration.

If grid_def['type'] is ‘custom’, an analysis grid is created from the provided parameters. If grid_def['type'] is a model name, the grid is loaded from the specified model class.

set_models() None[source]

Initialize model instances based on model_def[model_name] settings. Store the model instances in models[model_name].

set_datasets() None[source]

Initialize dataset instances based on dataset_def[dataset_name] settings. Store the dataset instances in datasets[dataset_name].

property total_tasks
property current_task
property message
property debug_message
timer(func: Callable)[source]

Decorator to count the elapsed time for a function at runtime.
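A generic timing decorator can be sketched as follows. This is a standalone illustration of the pattern, not the NEDAS implementation: storing the result in an elapsed attribute is an assumption made so the value can be inspected.

```python
import functools
import time

def timer(func):
    """Record the wall-clock time of the most recent call in wrapper.elapsed."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        wrapper.elapsed = time.perf_counter() - t0
        return result
    wrapper.elapsed = None  # no call has been timed yet
    return wrapper

@timer
def work(n):
    return sum(range(n))

total = work(1000)
```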

logger(func_name: str)[source]

Decorator to register the func in call stack and show runtime messages.

print_1p(msg: str)[source]

Customized print function for showing runtime message.

Only the processor with PID = self.pid_show shows the message. This avoids redundant output when all processors would otherwise print the same message.
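The idea reduces to a guard on the processor id. The sketch below is a free function with hypothetical pid and pid_show arguments; the boolean return value is added only to make the behaviour easy to check.

```python
def print_1p(msg, pid, pid_show=0):
    """Print msg only on the designated processor; report whether it printed."""
    if pid == pid_show:
        print(msg, end="", flush=True)
        return True
    return False

# Every rank calls the function, but only rank 0 produces output.
shown = [print_1p("assimilation started\n", pid) for pid in range(4)]
```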

log_event(msg: str, flag='')[source]
show_greeting() None[source]
show_summary() None[source]
dump_config(config_file: str) None[source]

Dumps a snapshot of the current state to a yaml config file. The original config object remains unchanged in memory.

run_job(commands: str, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', nproc: int = 1, offset: int = 0, **kwargs) None[source]

The user-facing method for running commands on a computer. It reconfigures the existing job submitter with runtime arguments and executes the commands.

Parameters:
  • commands (str) – Shell commands to be dispatched by job submitter

  • parallel_mode (ParallelMode, optional) – parallel mode (‘serial’, ‘mpi’, ‘openmp’), default is ‘serial’

  • nproc (int, optional) – number of processors (default is 1)

  • offset (int, optional) – offset in full list of processors (default is 0)

  • **kwargs – other keyword arguments to update the job submitter configuration

NEDAS.core.dataset module

class NEDAS.core.dataset.Dataset(context: Context | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: ABC

Dataset class (template for specific dataset sources)

variables: dict[Annotated[str, 'variable name'], VarDesc] = {}
obs_operator: dict[Annotated[str, 'variable name'], Callable] = {}
memory: dict = {}
dataset_name: str
property c: Context
parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]

Parse the input kwargs to pinpoint a specific file/variable…

get_mstr(member)[source]
get_tstr(time)[source]
generate_obs_network(**kwargs) dict[str, ndarray][source]

Generate a random observing network for use in synthetic observation experiments.

Parameters:

**kwargs

read_obs(**kwargs) dict[str, ndarray][source]
abstractmethod read_obs_from_file(**kwargs) dict[str, ndarray][source]

Return observation sequence matching the given kwargs

read_obs_from_memory(**kwargs) dict[str, ndarray][source]
write_obs(seq: dict, **kwargs) None[source]
write_obs_to_file(seq: dict, **kwargs)[source]
write_obs_to_memory(seq: dict, **kwargs)[source]
save_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
load_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]

NEDAS.core.inflation module

class NEDAS.core.inflation.Inflation(coef: float = 1.0, adaptive: bool = False, prior: bool = False, post: bool = False)[source]

Bases: ABC

Class for inflating the ensemble members (covariance inflation)

validate_obs_ens(c: Context, obs_ens: dict) bool[source]

Check if obs_ens has all members and records

obs_space_stats(c: Context)[source]

observation-space statistics

abstractmethod adaptive_prior_inflation(c: Context)[source]
abstractmethod adaptive_post_inflation(c: Context)[source]
abstractmethod apply_inflation(c: Context, flag: Literal['prior', 'post'])[source]

NEDAS.core.io_backend module

class NEDAS.core.io_backend.IOBackend[source]

Bases: ABC

Base class for handling runtime input/output for state and observation variables and OS-level operation such as file manipulation and running commands

io_mode

‘offline’ for file I/O and ‘online’ for persistent memory I/O

Type:

IOMode

tags

List of names for copies of state/obs data

Type:

list[str]

IOTags:

  • ‘current’: mutable buffer for the data, updated by assimilation and outer-loop iterations.

  • ‘prior’: read-only snapshot, also known as background/forecast, kept for O-B statistics.

  • ‘post’: final state after the assimilation, known as the (re)analysis.

  • ‘truth’: truth, used as the reference state in synthetic OSSE experiments.

  • ‘raw’: original information. For obs it is the actual obs

io_mode: Literal['online', 'offline'] = 'offline'
tags: list[str] = ['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean']
validate_tag(tag: str)[source]
prepare_fields_storage(c: Context, tag: str) None[source]

Prepare for storage of fields data. Only needed for offline io modes: initialize the binary file that stores fields and write its metadata.

abstractmethod read_field(c: Context, tag: str, rec_id: int, mem_id: int) np.ndarray[source]

Read 2D field data from the state

Parameters:
  • c (Context) – the runtime context

  • tag (str) – which copy of the state to read from

  • rec_id (int) – field record index

  • mem_id (int) – ensemble member index from 0 to nens-1

Returns:

the 2D field data

Return type:

np.ndarray

abstractmethod write_field(fld: np.ndarray, c: Context, tag: str, rec_id: int, mem_id: int) None[source]

Write 2D field data to the state

Parameters:
  • fld (np.ndarray) – the 2D field data

  • c (Context) – the runtime context

  • tag (str) – which copy of the state to write to

  • rec_id (int) – field record index

  • mem_id (int) – ensemble member index

abstractmethod call_method(c: Context, tag: str, method: Callable, *args, **kwargs) Any[source]

Call a method to perform some tasks.

Parameters:
  • c (Context) – the runtime context

  • tag (str) – which copy of the model state to request io from: “prior”, “post” or “truth”

  • method (Callable) – method name

  • *args – will be passed to the method

  • **kwargs

    will be passed to the method

Returns:

whatever the method(**kwargs) returns

Return type:

Any

save_ndarray(c: Context, name: str, data: np.ndarray, path: str | None = None) None[source]

Save ndarray data

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • data (np.ndarray) – the data

  • path (str, optional) – system path to save the data to.

load_ndarray(c: Context, name: str, path: str | None = None) np.ndarray | None[source]

Load ndarray from saved data

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • path (str, optional) – system path to the saved data.

Returns:

the data

Return type:

np.ndarray

save_debug_data(c: Context, name: str, data: dict, path: str | None = None) None[source]

Save debug data in npy format

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • data (dict) – the data

  • path (str, optional) – system path to save the data to.

To recover the data, use np.load(file, allow_pickle=True).item()
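The recovery call works because np.save wraps a dict in a zero-dimensional object array. The round trip below illustrates this with a temporary file; the file name and dictionary contents are made up for the example, and how save_debug_data names its files is not shown here.

```python
import os
import tempfile

import numpy as np

debug_data = {"innovation": np.array([0.1, -0.2]), "step": 3}

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "debug_example.npy")
    # The dict is pickled inside a 0-d object array.
    np.save(path, debug_data)
    # allow_pickle=True is required to read object arrays; .item() unwraps the dict.
    recovered = np.load(path, allow_pickle=True).item()
```

Since allow_pickle=True can execute arbitrary code during loading, it should only be used on files from a trusted source.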

NEDAS.core.model module

class NEDAS.core.model.Model(context: Context | None = None, io_mode: Literal['online', 'offline'] | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: Generic[GridT], ABC

Class for configuring and running a model

variables: dict[Annotated[str, 'variable name'], VarDesc]
grid: GridT
z: dict[Annotated[int, 'z level index'], ndarray]
mask: ndarray
ens_init_dir: str | None
truth_dir: str | None
ens_run_strategy: Literal['scheduler', 'batch']
nproc_per_run: int = 1
nproc_per_util: int = 1
walltime: int | None = None
run_process = None
run_status: str = 'pending'
restart_dir: str
forecast_period: int
memory: dict
io_mode: Literal['online', 'offline']
model_name: str
property c: Context
parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]
get_mstr(member)[source]
get_tstr(time)[source]
abstractmethod read_grid(**kwargs) None[source]

Read the grid information from the model output.

Parameters:

**kwargs – Keyword arguments for reading the grid.

read_var(**kwargs) ndarray[source]

Read a variable from the model output.

Parameters:

**kwargs – Keyword arguments for reading the variable.

Returns:

The read variable.

Return type:

np.ndarray

read_var_from_file(**kwargs) ndarray[source]
read_var_from_memory(**kwargs)[source]
write_var(var, **kwargs) None[source]

Write a variable to the model output.

Parameters:
  • var (np.ndarray) – The variable to write.

  • **kwargs – Keyword arguments for writing the variable.

write_var_to_file(var, **kwargs)[source]
write_var_to_memory(var, **kwargs)[source]
save_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
load_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
abstractmethod z_coords(**kwargs) ndarray[source]

Get the vertical coordinates of the model.

Parameters:

**kwargs – Keyword arguments for getting the vertical coordinates.

Returns:

The vertical coordinates.

Return type:

np.ndarray

abstractmethod preprocess(*args, **kwargs) None[source]

Preprocess the model data.

Parameters:

**kwargs – Keyword arguments for preprocessing.

abstractmethod postprocess(*args, **kwargs) None[source]

Postprocess the model data.

Parameters:

**kwargs – Keyword arguments for postprocessing.

abstractmethod run(*args, **kwargs) None[source]

Run the model forward in time.

Parameters:
  • *args – Arguments

  • **kwargs – Keyword arguments

Keyword Arguments:
  • time (datetime) – current time when forecast starts

  • restart_dir (str) – directory where restart files are located

  • forecast_period (int) – forecast period in hours

If self.ens_run_strategy == ‘batch’, the method runs all ensemble members in one go and expects an additional kwargs[‘nens’] giving the ensemble size. If self.ens_run_strategy == ‘scheduler’, the method runs a single member indexed by kwargs[‘member’], and kwargs[‘worker_id’] is the pid assigned by the scheduler to run this method.
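The difference between the two strategies can be seen in the keyword arguments a caller would prepare. The helper below is hypothetical and only builds the per-call kwargs described above; it omits worker_id, which is assigned by the scheduler at run time.

```python
def plan_model_runs(ens_run_strategy, nens):
    """Return one kwargs dict per expected call to Model.run (sketch only)."""
    if ens_run_strategy == "batch":
        # A single call handles the whole ensemble.
        return [{"nens": nens}]
    if ens_run_strategy == "scheduler":
        # One call per member; worker_id would be added by the scheduler.
        return [{"member": m} for m in range(nens)]
    raise ValueError(f"unknown ens_run_strategy: {ens_run_strategy!r}")

batch_calls = plan_model_runs("batch", nens=3)
scheduler_calls = plan_model_runs("scheduler", nens=3)
```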

generate_truth(*args, **kwargs) None[source]

Generate truth (nature run) model states. Use for running synthetic observation experiments.

generate_init_ensemble(*args, **kwargs) None[source]

Generate initial perturbed model states for ensemble forecasts.

Parameters:
  • nens (int) – ensemble size

  • **kwargs

NEDAS.core.obs module

class NEDAS.core.obs.Obs(c: Context)[source]

Bases: object

Class for handling observations.

The observation has dimensions: variable, time, z, y, x. Since the observation network is typically irregular, we store the obs record for each variable in a 1D sequence, with coordinates (t, z, y, x) and size nobs.

To parallelize the workload, we distribute each obs record over all the processors:

For batch assimilation mode, each pid stores the list of local obs within the hroi of its tiles, with size nlobs (number of local obs).

For serial mode, each pid stores a non-overlapping subset of the obs list; here a ‘local’ obs (in the storage sense) is broadcast to all pids before computing its update to the state/obs near that obs.

The hroi is defined separately for each obs record. For very large hroi, serial mode is the more parallel-efficient option, since in batch mode the same obs may need to be stored on multiple pids.

To compare with the observations, the obs_prior simulated by the model needs to be computed. It has dimensions [nens, nlobs], indexed by (mem_id, obs_id).

obs_inds: dict = {}
obs_seq: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'] = {}
obs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'] = {}
lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
lobs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
obs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
data: dict = {}
obs_rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'obs record id']]] = {}
distribute_obs_tasks(c: Context)[source]

Distribute obs_rec_id across processors

Parameters:

c (Context) – the runtime context object.

Returns:

Dictionary {pid_rec (int): list[obs_rec_id (int)]}

Return type:

dict

get_ref_z(c: Context, model_name: str, time: datetime) dict[Annotated[int, 'z level index'], ndarray][source]

Get the reference z coords at level k on the analysis grid from a model, to be used in dataset modules by generate_obs_network or for superobing/thinning in read_obs.

Parameters:
  • c (Context) – the runtime context

  • model_name (str) – the model name

  • time (datetime) – the time of the model state

Returns:

the z coords field at each level

Return type:

dict[LevelID, np.ndarray]

state_to_obs(c: Context, tag: str, **kwargs) ndarray[source]

Compute the obs values corresponding to the given state variable(s), namely the “obs_prior”. This function includes several ways to compute the obs_prior:

1. If obs_name is one of the variables provided by the model_src module, then model_src.read_var should be able to provide the obs field defined on the model native grid. The obs field is then converted to the analysis grid and interpolated vertically.

2. If obs_name is one of the variables provided by obs.obs_operator, it is called to obtain the obs seq. Typically the obs_operator performs more complex computation, such as path integration or a radiative transfer model (slowest).

Parameters:
  • c (Context) – the runtime context object

  • tag (str) – ‘prior’ or ‘post’, or ‘truth’ if generating synthetic obs

  • **kwargs – Additional parameters:

    - member: int, member index; or None if dealing with synthetic obs

    - name: str, obs variable name

    - time: datetime obj, time of the obs window

    - is_vector: bool, if True the obs is a vector measurement

    - dataset_src: str, dataset source module name providing the obs

    - model_src: str, model source module name providing the state

    - x, y, z, t: np.array, coordinates from obs_seq

Returns:

Values corresponding to the obs_seq but from the state identified by kwargs

Return type:

np.ndarray

get_model_fld_z_on_grid(c: Context, tag: str, **kwargs) tuple[ndarray, ndarray][source]

Get obs variable field and z coords at level k and convert to c.grid

horizontal_interp(c: Context, fld: ndarray, zfld: ndarray, is_vector: bool, obs_x: ndarray, obs_y: ndarray) tuple[ndarray, ndarray][source]

Interpolate fld and zfld horizontally to the obs_x,obs_y locations

vertical_interp(seq: ndarray, k: int, levels: Annotated[ndarray | Sequence, 'z levels'], f: ndarray, fp: ndarray | None, z: ndarray, zp: ndarray | None, dzp: ndarray | None, obs_z: ndarray) tuple[source]

Interpolate f(k) with z(k) coords vertically to the obs z locations.

Vertical interp to obs_z, take ocean depth as example:

        -------------- z[k-2] ----------------------
    k-1 -  -  -  -  -  f[k-1], fp   } dzp  previous layer
        -------------- z[k-1], zp ------------------
    k   -  -  -  -  -  v[k], f      } dz   current layer
        -------------- z[k], z ---------------------
    k+1 -  -  -  -  -  v[k+1]

The layer thickness of the current level k is denoted dz, and that of the previous level dzp; the variable f is considered a layer average, so it is defined at layer centers.
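Interpolating between two layer-averaged values amounts to a linear weighting between the two layer centers. The sketch below is a simplified illustration of that step only. It assumes the layer-center depths are already known, returns NaN for observations outside the two centers, and does not reproduce the bookkeeping of the actual method; all names are hypothetical.

```python
import numpy as np

def interp_between_layer_centers(fp, f, zc_prev, zc, obs_z):
    """Linearly interpolate between values at two layer centers.

    fp, f: values at the previous and current layer centers.
    zc_prev, zc: depths of those centers (must differ).
    obs_z: observation depths; outside [zc_prev, zc] the result is NaN.
    """
    w = (obs_z - zc_prev) / (zc - zc_prev)  # 0 at previous center, 1 at current
    inside = (w >= 0.0) & (w <= 1.0)
    return np.where(inside, (1.0 - w) * fp + w * f, np.nan)

fp = np.array([10.0, 10.0])
f = np.array([20.0, 20.0])
zc_prev = np.array([5.0, 5.0])
zc = np.array([15.0, 15.0])
obs_z = np.array([10.0, 30.0])

values = interp_between_layer_centers(fp, f, zc_prev, zc, obs_z)
```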

validate_seq_shape(seq: ndarray, is_vector: bool) None[source]

Validate the shape of an observation sequence. Allowed shape: (nobs,) for scalar obs seq; (2, nobs) for vector obs seq.
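The two allowed shapes can be checked in a few lines. The sketch below is a free-function version with an assumed error type (ValueError); how the actual method reports a violation is not stated here.

```python
import numpy as np

def validate_seq_shape(seq, is_vector):
    """Raise ValueError unless seq is (nobs,) for scalars or (2, nobs) for vectors."""
    seq = np.asarray(seq)
    if is_vector:
        ok = seq.ndim == 2 and seq.shape[0] == 2
    else:
        ok = seq.ndim == 1
    if not ok:
        raise ValueError(
            f"unexpected obs seq shape {seq.shape} for is_vector={is_vector}"
        )

validate_seq_shape(np.zeros(5), is_vector=False)      # scalar obs: (nobs,)
validate_seq_shape(np.zeros((2, 5)), is_vector=True)  # vector obs: (2, nobs)
```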

collect_obs_seq(c: Context) Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'][source]

Process the obs in parallel, read dataset files and convert to obs_seq which contains obs value, coordinates and other info

Since this is the actual obs (1 copy), only 1 processor needs to do the work

Parameters:

c (Context) – The runtime context object.

Returns:

observation sequence. Dictionary {obs_rec_id (int): record}, where each record is a dictionary {key: np.ndarray}. The mandatory keys are:

  • ‘obs’ – the observed values (measurements)

  • ‘x’, ‘y’, ‘z’, ‘t’ – the coordinates for each measurement

  • ‘err_std’ – the uncertainties for each measurement

There can be other optional keys provided by read_obs(), but they are not used.

Return type:

ObsSeq
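A record with the mandatory keys can be built and checked as in the sketch below. The values are invented, and the check that every coordinate array has length nobs is an assumption based on the 1D sequence layout described for the Obs class; check_record is a hypothetical helper.

```python
import numpy as np

MANDATORY_KEYS = ("obs", "x", "y", "z", "t", "err_std")

def check_record(rec):
    """Verify the mandatory keys exist and share the same nobs; return nobs."""
    missing = [key for key in MANDATORY_KEYS if key not in rec]
    if missing:
        raise KeyError(f"missing mandatory keys: {missing}")
    nobs = rec["obs"].shape[-1]  # works for (nobs,) and (2, nobs)
    for key in ("x", "y", "z", "t", "err_std"):
        if rec[key].shape != (nobs,):
            raise ValueError(f"{key} has shape {rec[key].shape}, expected ({nobs},)")
    return nobs

record = {
    "obs": np.array([271.3, 272.0, 270.8]),
    "x": np.array([10.0, 20.0, 30.0]),
    "y": np.array([5.0, 5.0, 6.0]),
    "z": np.array([0.0, 0.0, -10.0]),
    "t": np.array(["2021-01-01T00", "2021-01-01T01", "2021-01-01T02"],
                  dtype="datetime64[h]"),
    "err_std": np.array([0.5, 0.5, 0.5]),
}
obs_seq = {0: record}  # keyed by obs_rec_id
nobs = check_record(obs_seq[0])
```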

prepare_obs(c: Context) None[source]
prepare_obs_from_state(c: Context, tag: str) None[source]

Compute the obs priors in parallel, run state_to_obs to obtain obs_prior_seq

Parameters:
  • c (Context) – the runtime context object

  • tag (str) – ‘prior’ or ‘post’ ensemble model states

global_obs_list(c: Context) list[tuple[Annotated[int, 'obs record id'], int | None, Annotated[int, 'process id in comm'], int]][source]
transpose_obs_seq(c: Context, input_obs: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']) Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'][source]

Transpose the obs sequence from field-complete to ensemble-complete

Parameters:
  • c (Context) – the runtime context

  • input_obs (ObsSeq) – obs_seq from process_all_obs(), dict[obs_rec_id, dict[key, np.array]]

Returns:

the lobs dict[obs_rec_id, dict[par_id, dict[key, np.array]]], key = ‘obs’, ‘x’, ‘y’, ‘z’, ‘t’…

Return type:

LocalObsSeq

transpose_to_ensemble_complete(c: Context, input_obs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'][source]

Transpose obs from field-complete to ensemble-complete

Step 1, Within comm_mem, send the subset of input_obs with mem_id and par_id from the source proc (src_pid) to the destination proc (dst_pid), store the result in tmp_obs with all the mem_id (ensemble-complete)

Step 2, Gather all obs_rec_id within comm_rec, so that each pid_rec will have the entire obs record for assimilation

Parameters:
  • c (Context) – the runtime context

  • input_obs (ObsEns) – obs_prior from process_all_obs_priors(), dict[(mem_id, obs_rec_id), np.array];

Returns:

the lobs_prior dict[(mem_id, obs_rec_id), dict[par_id, np.array]]

Return type:

LocalObsEns

transpose_to_field_complete(c: Context, lobs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'][source]

Transpose obs from ensemble-complete to field-complete

Parameters:
  • c (Context) – the runtime context

  • lobs (LocalObsEns) – ensemble-complete local obs

Returns:

field-complete obs_seq ensemble

Return type:

ObsEns

pack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) dict[source]

pack lobs and lobs_prior into arrays for the jitted functions

unpack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'], data: dict) None[source]

unpack data and write back to the original lobs_prior dict

NEDAS.core.obs_info module

class NEDAS.core.obs_info.ObsInfo(c: Context)[source]

Bases: object

Manages the metadata, indexing and memory allocation for the observation sequences

records

dictionary containing obs_rec_id and the corresponding obs record

Type:

dict[int, ObsRecord]

variables

set of unique variables in the observations

Type:

set[str]

err_types

set of unique error models used in the observations

Type:

set[str]

records: dict[int, ObsRecord]
variables: list[str]
err_types: list[str]
add_obs_record(c: Context, vrec: dict)[source]

Add observation record

Parameters:
  • c (Context) – the runtime context object

  • vrec (dict) – the observation record defining its properties

complete_err_cross_corr_matrix()[source]

Go through the obs error cross correlation matrix again to fill in the default values

NEDAS.core.scheme module

class NEDAS.core.scheme.Scheme(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: ABC

Runtime scheme base class.

The Scheme coordinates all runtime generation and manipulation of objects.

steps_need_mpi: dict[str, bool] = {}
scheduler: OfflineScheduler | None = None
config: Config
online_mode: bool
use_synthetic_obs: bool = False
property c

The runtime context, with lazy initialization

abstractmethod run_all()[source]

A scheme must implement a run_all method to describe the workflow.

external_call(step: str | None = None, **kwargs)[source]

Run the scheme from an external call. Saving the current context to a temporary config file, then run a subprocess to

run_step(step: str) None[source]

Manages how to run a specified step in the workflow.

run_ensemble_tasks(strategy: Literal['scheduler', 'batch'], tag: Literal['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean'], task_name: str, func: Callable, **opts) None[source]

NEDAS.core.state module

class NEDAS.core.state.State(c: Context)[source]

Bases: object

The State class manages the state variables for the assimilation system.

The analysis is performed on a regular grid.

The entire state has

    dimensions:  member, variable, time, z,  y,  x
    indexed by:  mem_id, v,        t,    k,  j,  i
    with size:   nens,   nv,       nt,   nz, ny, nx

To parallelize workload, we group the dimensions into 3 indices:

mem_id indexes the ensemble members

rec_id indexes the unique 2D fields with (v, t, k). Since nz and nt may vary for different variables, we stack these dimensions in the ‘record’ dimension with size nrec.

par_id indexes the spatial partitions, which are subsets of the 2D grid given by (ist, ied, di, jst, jed, dj). For a complete field fld[j,i], the processor with par_id stores fld[ist:ied:di, jst:jed:dj] locally.

The entire state is distributed across the memory of many processors. At any moment, a processor stores only a subset of the state in its memory: either all the mem_id,rec_id but only a subset of par_id (we call this ensemble-complete), or all the par_id but only a subset of mem_id,rec_id (we call this field-complete). I/O and pre/post-processing are easier on a field-complete state, while assimilation algorithms are easier to run on an ensemble-complete state.
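The two layouts can be pictured with a serial stand-in that cuts each field into partition chunks and reassembles them. The sketch below involves no MPI communication, so it shows only the change of data structure, not the exchange between processors. It assumes j is the leading array axis, as in fld[j,i]; the function names are hypothetical.

```python
import numpy as np

ny, nx = 4, 6
# Each partition is (ist, ied, di, jst, jed, dj); here two halves along x.
partitions = [(0, 3, 1, 0, 4, 1), (3, 6, 1, 0, 4, 1)]

# Field-complete: (mem_id, rec_id) -> full 2D field.
fields = {
    (m, r): np.full((ny, nx), 10.0 * m + r)
    for m in range(2) for r in range(3)
}

def to_ensemble_complete(fields, partitions):
    """(mem_id, rec_id) -> {par_id: chunk}, cutting each field by partition."""
    state = {}
    for key, fld in fields.items():
        state[key] = {
            par_id: fld[jst:jed:dj, ist:ied:di].copy()
            for par_id, (ist, ied, di, jst, jed, dj) in enumerate(partitions)
        }
    return state

def to_field_complete(state, partitions, shape):
    """Reassemble full fields from their partition chunks."""
    out = {}
    for key, chunks in state.items():
        fld = np.full(shape, np.nan)
        for par_id, chk in chunks.items():
            ist, ied, di, jst, jed, dj = partitions[par_id]
            fld[jst:jed:dj, ist:ied:di] = chk
        out[key] = fld
    return out

state = to_ensemble_complete(fields, partitions)
restored = to_field_complete(state, partitions, (ny, nx))
```

In the distributed setting each processor would hold only some of the keys (field-complete) or only some of the par_id entries (ensemble-complete), rather than everything as in this sketch.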

partitions: list = []
par_list: dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]] = {}
fields_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
fields_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
state_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
fields_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
data = {}
info: StateInfo
rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'field record id']]]
distribute_state_tasks(c: Context) dict[int, list[int]][source]

Distribute rec_id across processors

prepare_state(c: Context) None[source]

Main method to collect fields from model to form the complete state (field-complete distributed)

collect_prior_fields(c: Context) None[source]

Collect fields from the prior model state, convert them to the analysis grid, preprocess (coarse-graining etc.), and save to fields[mem_id, rec_id] pointing to the unique fields

Parameters:

c (Context) – context object

Returns:

  • dict – fields dictionary [(mem_id, rec_id), fld], where fld is an np.array defined on c.grid holding one of the state variable fields

  • dict – fields_z dictionary [(mem_id, rec_id), zfld], where zfld has the same shape as fld and holds the z coordinates corresponding to each field

Return type:

dict

collect_scalar_variables(c)[source]
output_state(c: Context, tag: str, mem_id_out: int | None = None, rec_id_out: int | None = None) None[source]

Parallel output the fields to the binary state_file

Parameters:
  • c (Context) – the runtime context obj

  • tag (str) – which version of the state this is: ‘prior’, ‘post’, or ‘z’ coords

  • mem_id_out (int, optional) – member id to be output, if None all available ids will output.

  • rec_id_out (int, optional) – record id to be output, if None all available ids will output.

output_ens_mean(c: Context, tag: str) None[source]

Compute the ensemble mean of a field stored distributively on all pid_mem, collect the means on pid_mem=0, and output to mean_file

Parameters:
  • c (Context) – the runtime context obj

  • tag (str) – which version of state this is: ‘prior_mean’, ‘post_mean’, or ‘z’

  • mean_file (str) – path to the output binary file for the ensemble mean

output_ref_z(c: Context)[source]
pack_field_chunk(c: Context, fld, is_vector, dst_pid)[source]
unpack_field_chunk(c, fld, fld_chk, src_pid)[source]
transpose_to_ensemble_complete(c: Context, fields: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'][source]

Send chunks of field owned by a pid to other pid so that the field-complete fields get transposed into ensemble-complete state with keys (mem_id, rec_id) pointing to the partition in par_list

Parameters:
  • c (Context) – the runtime context

  • fields (FieldEns) – The locally stored field-complete fields with subset of mem_id,rec_id

Returns:

The locally stored ensemble-complete field chunks on partitions, dict[(mem_id, rec_id), dict[par_id, fld_chk]]

Return type:

StateEns

transpose_to_field_complete(c: Context, state: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'][source]

Transposes back the state to field-complete fields

Parameters:
  • c (Context) – the runtime context

  • state (StateEns) – the locally stored ensemble-complete field chunks for subset of par_id

Returns:

the locally stored field-complete fields for subset of mem_id,rec_id.

Return type:

FieldEns

pack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) dict[source]

pack state dict into arrays to be more easily handled by jitted funcs

unpack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], data: dict) None[source]

unpack data and write back to the state dict

NEDAS.core.state_info module

class NEDAS.core.state_info.StateInfo(c: Context)[source]

Bases: object

Manages the metadata, indexing, and memory offsets for the model state.

shape

domain dimension(s) for the fields

Type:

tuple

fields

dictionary containing field ids and the corresponding field records

Type:

dict[int, FieldRecord]

size

total size of the complete state (bytes), for one member

Type:

int

variables

set of unique variables in the state

Type:

set[str]

err_types

set of unique error models in the state

Type:

set[str]

shape: tuple
mask: ndarray
fields: dict[int, FieldRecord]
size: int
variables: list[str]
err_types: list[str]
add_fields_for_variable(c: Context, vrec: dict) None[source]

Add fields for a variable in the state. The state variable has dimensions t, z, y, x while the ‘field’ is the 2D part with y, x dimensions.

Parameters:
  • c (Context) – the runtime context object

  • vrec (dict) – the variable record defining its properties

write_to_file(binfile: str)[source]

Write the info to a .dat file accompanying the .bin file

Parameters:

binfile (str) – File path for the .bin file

read_from_file(binfile: str)[source]

Read the .dat file accompanying the .bin file and update state_info

Parameters:

binfile (str) – File path for the .bin file

NEDAS.core.transform module

class NEDAS.core.transform.Transform(c: Context, **kwargs)[source]

Bases: ABC

Base class for miscellaneous transform functions

abstractmethod forward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]

Forward transform for the model state variables.

Parameters:
  • c (Context) – the runtime context.

  • rec (FieldRecord) – State information record for the variable.

  • field (np.ndarray) – The state variable.

Returns:

The transformed state variable.

Return type:

np.ndarray

abstractmethod backward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]

Backward (inverse) transform for the model state variables.

Parameters:
  • c (Context) – the runtime context.

  • rec (FieldRecord) – State information record for the variable.

  • field (np.ndarray) – The transformed state variable.

Returns:

The state variable transformed back to the original space.

Return type:

np.ndarray

abstractmethod forward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]

Forward transform for the observation sequence.

Parameters:
  • c (Context) – the runtime context.

  • obs_rec (ObsRecord) – Observation information record.

  • obs_seq (dict[str, np.ndarray]) – The observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.

Returns:

The transformed observation sequence.

Return type:

dict[str, np.ndarray]

abstractmethod backward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]

Backward (inverse) transform for the observation sequence.

Parameters:
  • c (Context) – the runtime context.

  • obs_rec (ObsRecord) – Observation information record.

  • obs_seq (dict) – The transformed observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.

Returns:

The observation sequence transformed back to the original space.

Return type:

dict
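As an illustration of the forward/backward contract above, here is a minimal sketch of an invertible transform. `ScaleTransformSketch` is a hypothetical stand-in: it does not inherit from NEDAS.core.transform.Transform and it drops the `c`, `rec` and `obs_rec` arguments so that it runs on its own. The observation sequence is assumed to be a dictionary with the keys listed above.

```python
import numpy as np

class ScaleTransformSketch:
    """Hypothetical linear transform: multiply by a constant factor."""

    def __init__(self, factor: float):
        self.factor = factor

    def forward_state(self, field: np.ndarray) -> np.ndarray:
        return field * self.factor

    def backward_state(self, field: np.ndarray) -> np.ndarray:
        return field / self.factor

    def forward_obs(self, obs_seq: dict) -> dict:
        out = dict(obs_seq)  # shallow copy; coordinates x, y, z, t are left untouched
        out["obs"] = obs_seq["obs"] * self.factor
        out["err_std"] = obs_seq["err_std"] * self.factor
        return out

    def backward_obs(self, obs_seq: dict) -> dict:
        out = dict(obs_seq)
        out["obs"] = obs_seq["obs"] / self.factor
        out["err_std"] = obs_seq["err_std"] / self.factor
        return out

transform = ScaleTransformSketch(factor=100.0)
field = np.array([[0.1, 0.2], [0.3, 0.4]])
obs_seq = {"obs": np.array([0.5, 0.7]), "err_std": np.array([0.05, 0.05]),
           "x": np.array([1.0, 2.0]), "y": np.array([3.0, 4.0]),
           "z": np.zeros(2), "t": np.zeros(2)}

field_back = transform.backward_state(transform.forward_state(field))
obs_back = transform.backward_obs(transform.forward_obs(obs_seq))
```

For a linear scaling the observation error scales by the same factor as the observation; a nonlinear transform would need its own treatment of err_std.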

NEDAS.core.types module

class NEDAS.core.types.VarDesc(name: str | tuple[str, str], is_vector: bool, dtype: str = 'float', dt: float = 1, levels: Annotated[Union[numpy.ndarray, Sequence], 'z levels'] = <factory>, units: Annotated[str | float, 'physical unit'] = 1, z_units: Annotated[str | float, 'physical unit'] = 1)[source]

Bases: object

name: str | tuple[str, str]
is_vector: bool
dtype: str = 'float'
dt: float = 1
levels: Annotated[ndarray | Sequence, 'z levels']
units: Annotated[str | float, 'physical unit'] = 1
z_units: Annotated[str | float, 'physical unit'] = 1
asdict() dict[source]
class NEDAS.core.types.FieldRecord(name: str, model_src: str, dtype: str, is_vector: bool, units: Annotated[str | float, 'physical unit'], err_type: str, time: datetime, dt: float, k: Annotated[int, 'z level index'], pos: int)[source]

Bases: object

Represents a single 2D slice in the state vector.

name

name of the state variable

Type:

str

model_src

name of the model source module for this variable

Type:

str

dtype

data type

Type:

str

is_vector

if this variable is a vector

Type:

bool

units

physical units of this variable

Type:

str|float

err_type

type of error model to use for this variable

Type:

str

time

time coordinate for this field

Type:

datetime

dt

representative time interval (hours) for this field

Type:

float

k

vertical z coordinate index for this field

Type:

int

pos

seek position (number of bytes) for the start of this field in the binary file

Type:

int

name: str
model_src: str
dtype: str
is_vector: bool
units: Annotated[str | float, 'physical unit']
err_type: str
time: datetime
dt: float
k: Annotated[int, 'z level index']
pos: int
asdict() dict[source]
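The pos attribute above is a byte offset into the binary file. A minimal sketch of how such an offset can be used to read one field is given below. It assumes, purely for illustration, that every field is stored as a contiguous float64 array of the full grid shape; the actual NEDAS file layout may differ (for example if masked points are skipped). The helper `read_field_at` is hypothetical.

```python
import os
import tempfile
import numpy as np

shape = (3, 4)                 # hypothetical (ny, nx)
dtype = np.dtype("float64")    # assumed storage type
fld_bytes = shape[0] * shape[1] * dtype.itemsize

# Three fields stored back to back; the seek position of field i is i * fld_bytes.
fields = [np.full(shape, float(i)) for i in range(3)]
positions = [i * fld_bytes for i in range(3)]

def read_field_at(path, pos, shape, dtype):
    """Seek to byte offset pos and read one 2D field."""
    with open(path, "rb") as f:
        f.seek(pos)
        data = np.fromfile(f, dtype=dtype, count=shape[0] * shape[1])
    return data.reshape(shape)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.bin")
    with open(path, "wb") as f:
        for fld in fields:
            f.write(fld.tobytes())
    second = read_field_at(path, positions[1], shape, dtype)
```

Storing the offset in each record lets a process read only the fields it owns without scanning the whole file.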
class NEDAS.core.types.ErrorModel(type: str, std: float, hcorr: float, vcorr: float, tcorr: float, cross_corr: dict[str, float])[source]

Bases: object

Parameters defining an error model

type

type of error distribution

Type:

str

std

error standard deviation, in variable units

Type:

float

hcorr

horizontal correlation scale, in grid.x units

Type:

float

vcorr

vertical correlation scale, in z units

Type:

float

tcorr

temporal correlation scale, in hours

Type:

float

cross_corr

cross-variable correlation dictionary

Type:

dict[str, float]

type: str
std: float
hcorr: float
vcorr: float
tcorr: float
cross_corr: dict[str, float]
asdict() dict[source]
class NEDAS.core.types.ObsRecord(name: str, dataset_src: str, model_src: str, nobs: int, obs_window_min: int, obs_window_max: int, dtype: str, is_vector: bool, units: Annotated[str | float, 'physical unit'], z_units: Annotated[str | float, 'physical unit'], time: datetime, dt: float, err: ErrorModel, hroi: float, vroi: float, troi: float, impact_on_state: dict)[source]

Bases: object

Represents a single observation record in the full list of observations

name

name of the observation record

Type:

str

dataset_src

name of dataset source module for this observation

Type:

str

model_src

name of the model source module for this observation

Type:

str

nobs

number of individual observations in this record

Type:

int

obs_window_min

offset from analysis time for the start of the observation window (hours)

Type:

int

obs_window_max

offset from analysis time for the end of the observation window (hours)

Type:

int

err

error model used for this observation

Type:

ErrorModel

dtype

data type

Type:

str

is_vector

if this variable is a vector

Type:

bool

units

physical units of this observation

Type:

str

z_units

vertical coordinate units

Type:

str

time

time coordinate for this observation

Type:

datetime

dt

representative time interval (hours) for this observation

Type:

float

name: str
dataset_src: str
model_src: str
nobs: int
obs_window_min: int
obs_window_max: int
dtype: str
is_vector: bool
units: Annotated[str | float, 'physical unit']
z_units: Annotated[str | float, 'physical unit']
time: datetime
dt: float
err: ErrorModel
hroi: float
vroi: float
troi: float
impact_on_state: dict
asdict() dict[source]

NEDAS.core.updator module

class NEDAS.core.updator.Updator(c: Context)[source]

Bases: ABC

Base class for updators of the model restart files

increment: dict = {}
update(c: Context) None[source]

Top-level routine to apply the analysis increments to the original model restart files (as initial conditions for the next forecast)

init_all_file_locks(c: Context) None[source]

Prepare file locks for asynchronous io, needed for blocking write (e.g. in netcdf without parallel support)

abstractmethod compute_increment(c: Context) None[source]
abstractmethod update_files(c: Context, mem_id: Annotated[int, 'member id'], rec_id: Annotated[int, 'field record id']) None[source]

Module contents

class NEDAS.core.Context(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: object

Runtime context manages the generation and interaction of dynamic objects in runtime

comm: parallel.Comm
comm_rec: parallel.Comm
comm_mem: parallel.Comm
progress: progress.Progress
grid: grid.GridType
grid_orig: grid.GridType
models: dict[str, Model]
datasets: dict[str, Dataset]
assimilator: Assimilator
updator: Updator
transform_funcs: list[Transform]
localization_funcs: dict[str, Callable]
inflation_func: Inflation
state: State
obs: Obs
debug: bool
interactive: bool
is_notebook: bool
time: datetime
iter: int
pid_show: int
nens: int
mem_list: dict[ProcIDMem, list[MemID]]
fs: FileSystem
io: IOBackend
jsub: JobSubmitter
check_interactive() bool[source]

Check whether the runtime environment supports interactive output (with ANSI escape codes).

check_notebook() bool[source]

Check whether the runtime environment is a Jupyter notebook

get_cols() int[source]
set_logging() None[source]
distribute_mem_tasks() dict[int, list[int]][source]

Distribute mem_id across processors
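One simple way to produce a dict[int, list[int]] of this kind is to split the task list into contiguous, nearly equal chunks. The sketch below shows that scheme; it is an assumption for illustration, and the function name `distribute_tasks_sketch` is hypothetical. The distribution actually used by NEDAS may differ.

```python
def distribute_tasks_sketch(nproc: int, tasks: list) -> dict:
    """Split tasks into nproc contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(tasks), nproc)
    out = {}
    start = 0
    for pid in range(nproc):
        n = base + (1 if pid < extra else 0)  # the first `extra` pids take one more task
        out[pid] = list(tasks[start:start + n])
        start += n
    return out

# 8 ensemble members shared by 3 processes
mem_list = distribute_tasks_sketch(3, list(range(8)))
```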

update_assim_tools()[source]

Update the assimilation tool components based on runtime configuration

property prev_time: datetime

Previous analysis time. Automatically updated when self.time changes.

Returns:

Previous analysis time.

Return type:

datetime

property next_time: datetime

Next analysis time. Automatically updated when self.time changes.

Returns:

Next analysis time.

Return type:

datetime

set_comm() None[source]

Initialize the MPI communicator, split the communicator if necessary.

For a serial program, use a dummy communicator and set nproc to the number of available processors on the machine; for an MPI program, use MPI.COMM_WORLD and check that its size matches nproc.

Split the communicator into member and record groups, according to nproc and nproc_mem. See NEDAS.utils.parallel module for more details.

set_grid() None[source]

Initialize the analysis grid based on the configuration.

If grid_def['type'] is ‘custom’, an analysis grid is created from the provided parameters. If grid_def['type'] is a model name, the grid is loaded from the specified model class.

set_models() None[source]

Initialize model instances based on model_def[model_name] settings. Store the model instances in models[model_name].

set_datasets() None[source]

Initialize dataset instances based on dataset_def[dataset_name] settings. Store the dataset instances in datasets[dataset_name].

property total_tasks
property current_task
property message
property debug_message
timer(func: Callable)[source]

Decorator to count the elapsed time for a function at runtime.
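A generic timing decorator of this kind can be sketched as follows. This is a standalone illustration, not the NEDAS implementation: `timer_sketch` is a hypothetical name, and it prints the elapsed time directly instead of going through the context's messaging.

```python
import functools
import time

def timer_sketch(func):
    """Wrap func so that its elapsed wall-clock time is printed after each call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - t0
            print(f"{func.__name__} took {elapsed:.3f} s")
    return wrapper

@timer_sketch
def add(a, b):
    return a + b

result = add(1, 2)
```

Using try/finally means the elapsed time is reported even when the wrapped function raises.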

logger(func_name: str)[source]

Decorator to register the func in call stack and show runtime messages.

print_1p(msg: str)[source]

Customized print function for showing runtime message.

Only the processor with PID = self.pid_show will show the message. This avoids redundant output when all processors would show the same message.
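The idea can be sketched in a few lines. `print_1p_sketch` is a hypothetical standalone function that takes the process id explicitly; in NEDAS the pid comes from the context's communicator.

```python
def print_1p_sketch(msg: str, pid: int, pid_show: int = 0, out=print) -> None:
    """Emit msg only on the process whose id equals pid_show."""
    if pid == pid_show:
        out(msg)

# Simulate four processes calling the function; messages are collected in a list.
shown = []
for pid in range(4):
    print_1p_sketch("reading observations", pid, pid_show=0, out=shown.append)
```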

log_event(msg: str, flag='')[source]
show_greeting() None[source]
show_summary() None[source]
dump_config(config_file: str) None[source]

Dumps a snapshot of the current state to a yaml config file. The original config object remains unchanged in memory.

run_job(commands: str, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', nproc: int = 1, offset: int = 0, **kwargs) None[source]

The user-facing method for running commands on a computer. It re-configures the existing job submitter with runtime arguments and executes the commands.

Parameters:
  • commands (str) – Shell commands to be dispatched by job submitter

  • parallel_mode (ParallelMode, optional) – parallel mode (‘serial’, ‘mpi’, ‘openmp’), default is ‘serial’

  • nproc (int, optional) – number of processors (default is 1)

  • offset (int, optional) – offset in full list of processors (default is 0)

  • **kwargs – other keyword arguments to update the job submitter configuration

class NEDAS.core.Assimilator(c: Context)[source]

Bases: ABC

assimilate(c: Context)[source]

Main method to run the batch assimilation algorithm

partition_grid(c: Context) None[source]

Partition the analysis grid into several parts and distribute the workload over the mpi ranks.

abstractmethod init_partitions(c: Context) list[source]

Generate spatial partitioning of the domain

abstractmethod assign_obs(c: Context) dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], ndarray]][source]

Assign the observation sequence to each partition par_id

Parameters:

c (Context) – the runtime context object

Returns:

Indices in the full obs_seq for the subset of obs that belongs to partition par_id

Return type:

dict[ObsRecordID, dict[PartitionID, np.ndarray]]

abstractmethod distribute_partitions(c: Context) dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]][source]

Distribute partitions across processors

transpose_to_ensemble_complete(c: Context) None[source]

Communicate among mpi ranks and transpose the locally-stored state/obs chunks to ensemble-complete

transpose_to_field_complete(c: Context)[source]

Communicate among mpi ranks and transpose the locally-stored state/obs chunks back to field-complete

abstractmethod assimilation_algorithm(c: Context) None[source]

The main assimilation algorithm will be implemented by subclasses

class NEDAS.core.Updator(c: Context)[source]

Bases: ABC

Base class for updators of the model restart files

increment: dict = {}
update(c: Context) None[source]

Top-level routine to apply the analysis increments to the original model restart files (as initial conditions for the next forecast)

init_all_file_locks(c: Context) None[source]

Prepare file locks for asynchronous io, needed for blocking write (e.g. in netcdf without parallel support)

abstractmethod compute_increment(c: Context) None[source]
abstractmethod update_files(c: Context, mem_id: Annotated[int, 'member id'], rec_id: Annotated[int, 'field record id']) None[source]
class NEDAS.core.Inflation(coef: float = 1.0, adaptive: bool = False, prior: bool = False, post: bool = False)[source]

Bases: ABC

Class for inflating the ensemble members (covariance inflation)

validate_obs_ens(c: Context, obs_ens: dict) bool[source]

Check if the obs_ens has all members and records

obs_space_stats(c: Context)[source]

observation-space statistics

abstractmethod adaptive_prior_inflation(c: Context)[source]
abstractmethod adaptive_post_inflation(c: Context)[source]
abstractmethod apply_inflation(c: Context, flag: Literal['prior', 'post'])[source]
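As background for the coef parameter, the following sketch shows plain multiplicative covariance inflation, where each member's deviation from the ensemble mean is scaled by a constant. This is the textbook form of the technique, not necessarily what a particular NEDAS Inflation subclass does; `inflate_ensemble` is a hypothetical helper and the member axis is assumed to be the first array axis.

```python
import numpy as np

def inflate_ensemble(ens: np.ndarray, coef: float) -> np.ndarray:
    """Scale the deviations from the ensemble mean by coef.

    ens has shape (nens, ...); the mean is unchanged and the
    ensemble variance is multiplied by coef**2.
    """
    mean = ens.mean(axis=0, keepdims=True)
    return mean + coef * (ens - mean)

rng = np.random.default_rng(0)
ens = rng.normal(size=(20, 5))      # 20 members, 5 state elements
inflated = inflate_ensemble(ens, coef=1.1)
```

With coef = 1.0 the ensemble is returned unchanged, which matches the default value in the constructor signature.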
class NEDAS.core.Transform(c: Context, **kwargs)[source]

Bases: ABC

Base class for miscellaneous transform functions

abstractmethod forward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]

Forward transform for the model state variables.

Parameters:
  • c (Context) – the runtime context.

  • rec (FieldRecord) – State information record for the variable.

  • field (np.ndarray) – The state variable.

Returns:

The transformed state variable.

Return type:

np.ndarray

abstractmethod backward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]

Backward (inverse) transform for the model state variables.

Parameters:
  • c (Context) – the runtime context.

  • rec (FieldRecord) – State information record for the variable.

  • field (np.ndarray) – The transformed state variable.

Returns:

The state variable transformed back to the original space.

Return type:

np.ndarray

abstractmethod forward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]

Forward transform for the observation sequence.

Parameters:
  • c (Context) – the runtime context.

  • obs_rec (ObsRecord) – Observation information record.

  • obs_seq (dict[str, np.ndarray]) – The observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.

Returns:

The transformed observation sequence.

Return type:

dict[str, np.ndarray]

abstractmethod backward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]

Backward (inverse) transform for the observation sequence.

Parameters:
  • c (Context) – the runtime context.

  • obs_rFieldRecord)ict) – Observation information record.

  • obs_seq (dict) – The transformed observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.

Returns:

The observation sequence transformed back to the original space.

Return type:

dict

class NEDAS.core.Model(context: Context | None = None, io_mode: Literal['online', 'offline'] | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: Generic[GridT], ABC

Class for configuring and running a model

variables: dict[Annotated[str, 'variable name'], VarDesc]
grid: GridT
z: dict[Annotated[int, 'z level index'], ndarray]
mask: ndarray
ens_init_dir: str | None
truth_dir: str | None
ens_run_strategy: Literal['scheduler', 'batch']
nproc_per_run: int = 1
nproc_per_util: int = 1
walltime: int | None = None
run_process = None
run_status: str = 'pending'
restart_dir: str
forecast_period: int
memory: dict
io_mode: Literal['online', 'offline']
model_name: str
property c: Context
parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]
get_mstr(member)[source]
get_tstr(time)[source]
abstractmethod read_grid(**kwargs) None[source]

Read the grid information from the model output.

Parameters:

**kwargs – Keyword arguments for reading the grid.

read_var(**kwargs) ndarray[source]

Read a variable from the model output.

Parameters:

**kwargs – Keyword arguments for reading the variable.

Returns:

The read variable.

Return type:

np.ndarray

read_var_from_file(**kwargs) ndarray[source]
read_var_from_memory(**kwargs)[source]
write_var(var, **kwargs) None[source]

Write a variable to the model output.

Parameters:
  • var (np.ndarray) – The variable to write.

  • **kwargs – Keyword arguments for writing the variable.

write_var_to_file(var, **kwargs)[source]
write_var_to_memory(var, **kwargs)[source]
save_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
load_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
abstractmethod z_coords(**kwargs) ndarray[source]

Get the vertical coordinates of the model.

Parameters:

**kwargs – Keyword arguments for getting the vertical coordinates.

Returns:

The vertical coordinates.

Return type:

np.ndarray

abstractmethod preprocess(*args, **kwargs) None[source]

Preprocess the model data.

Parameters:

**kwargs – Keyword arguments for preprocessing.

abstractmethod postprocess(*args, **kwargs) None[source]

Postprocess the model data.

Parameters:

**kwargs – Keyword arguments for postprocessing.

abstractmethod run(*args, **kwargs) None[source]

Run the model forward in time.

Parameters:
  • *args – Arguments

  • **kwargs – Keyword arguments

Keyword Arguments:
  • time (datetime) – current time when forecast starts

  • restart_dir (str) – directory where restart files are located

  • forecast_period (int) – forecast period in hours

If self.ens_run_strategy == ‘batch’, the method runs all ensemble members in one go and expects an additional kwargs[‘nens’] giving the ensemble size. If self.ens_run_strategy == ‘scheduler’, the method runs a single member indexed by kwargs[‘member’], and kwargs[‘worker_id’] is the pid assigned by the scheduler to run this method.

generate_truth(*args, **kwargs) None[source]

Generate truth (nature run) model states. Used for running synthetic observation experiments.

generate_init_ensemble(*args, **kwargs) None[source]

Generate initial perturbed model states for ensemble forecasts.

Parameters:
  • nens (int) – ensemble size

  • **kwargs

class NEDAS.core.Dataset(context: Context | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: ABC

Dataset class (template for specific dataset sources)

variables: dict[Annotated[str, 'variable name'], VarDesc] = {}
obs_operator: dict[Annotated[str, 'variable name'], Callable] = {}
memory: dict = {}
dataset_name: str
property c: Context
parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]

Parse the input kwargs to pinpoint a specific file/variable…

get_mstr(member)[source]
get_tstr(time)[source]
generate_obs_network(**kwargs) dict[str, ndarray][source]

Generate a random observing network for use in synthetic observation experiments.

Parameters:

**kwargs

read_obs(**kwargs) dict[str, ndarray][source]
abstractmethod read_obs_from_file(**kwargs) dict[str, ndarray][source]

Return observation sequence matching the given kwargs

read_obs_from_memory(**kwargs) dict[str, ndarray][source]
write_obs(seq: dict, **kwargs) None[source]
write_obs_to_file(seq: dict, **kwargs)[source]
write_obs_to_memory(seq: dict, **kwargs)[source]
save_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
load_memory(tag: str, time: datetime | None = None, path: str | None = None) None[source]
class NEDAS.core.FileSystem(config: Config | None = None)[source]

Bases: object

Manages runtime file system paths, name of files and directories

config: Config
cycle_dir(time: datetime) str[source]

Directory path for an analysis cycle.

Parameters:

time (datetime) – Time of the analysis cycle.

Returns:

Directory path for the analysis cycle.

Return type:

str

forecast_dir(time: datetime, model_name: str) str[source]

Directory path for a model forecast step.

Parameters:
  • time (datetime) – Time of the analysis cycle.

  • model_name (str) – Name of the model.

Returns:

Directory path for the model forecast.

Return type:

str

analysis_dir(time: datetime, iter: int = 0) str[source]

Directory path for an analysis step.

Parameters:
  • time (datetime) – Time of the analysis cycle.

  • iter (int) – If niter > 1, an outer iteration loop exists and iter is the index in the loop.

Returns:

Directory path for the analysis step.

Return type:

str

make_dir(dirname: str | None) None[source]

Create a directory if it does not exist.

FileExistsError can happen if multiple processors are trying to make the same directory. This function will ignore this error and continue.

Parameters:

dirname (str|None) – Directory name to be created.
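The race described above (several processes creating the same directory at once) can be handled as in the following sketch. `make_dir_sketch` is a hypothetical standalone function; the standard library call os.makedirs(dirname, exist_ok=True) has a similar effect.

```python
import os
import tempfile

def make_dir_sketch(dirname):
    """Create dirname (including parents), tolerating concurrent creation."""
    if not dirname:
        return  # nothing to do for None or an empty string
    try:
        os.makedirs(dirname)
    except FileExistsError:
        # Another process created it first (or it already existed); carry on.
        pass

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "cycle", "analysis")
    make_dir_sketch(target)
    make_dir_sketch(target)   # second call is a no-op
    created = os.path.isdir(target)
```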

copy_file(file1: str, file2: str) None[source]
move_file(file1: str, file2: str) None[source]
move_files_to_dir(files: str, dirname: str) None[source]
remove_files(files: str) None[source]
remove_dir(dirname: str) None[source]

Tries to create a symbolic link. If the filesystem doesn’t support it (like exFAT), falls back to a file copy.

class NEDAS.core.IOBackend[source]

Bases: ABC

Base class for handling runtime input/output for state and observation variables and OS-level operation such as file manipulation and running commands

io_mode

‘offline’ for file I/O and ‘online’ for persistent memory I/O

Type:

IOMode

tags

List of names for copies of state/obs data

Type:

list[str]

IOTags:

  • ‘current’: Mutable buffer for the data, being updated by assimilation and outer-loop iterations

  • ‘prior’: read-only snapshot, also known as background/forecast, kept for O-B statistics.

  • ‘post’: final state after the assimilation, known as the (re)analysis.

  • ‘truth’: truth, as reference state in synthetic OSSE experiments

  • ‘raw’: original information. For obs it is the actual obs

io_mode: Literal['online', 'offline'] = 'offline'
tags: list[str] = ['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean']
validate_tag(tag: str)[source]
prepare_fields_storage(c: Context, tag: str) None[source]

Prepare for storage of fields data. Only needed for offline io modes: initialize the binary file that stores fields and write its metadata.

abstractmethod read_field(c: Context, tag: str, rec_id: int, mem_id: int) np.ndarray[source]

Read a 2D field data from the state

Parameters:
  • c (Context) – the runtime context

  • tag (str) – which copy of the state to read from

  • rec_id (int) – field record index

  • mem_id (int) – ensemble member index from 0 to nens-1

Returns:

the 2D field data

Return type:

np.ndarray

abstractmethod write_field(fld: np.ndarray, c: Context, tag: str, rec_id: int, mem_id: int) None[source]

Write a 2D field data to the state

Parameters:
  • fld (np.ndarray) – the 2D field data

  • c (Context) – the runtime context

  • tag (str) – which copy of the state to write to

  • rec_id (int) – field record index

  • mem_id (int) – ensemble member index

abstractmethod call_method(c: Context, tag: str, method: Callable, *args, **kwargs) Any[source]

Call a method to perform some tasks.

Parameters:
  • c (Context) – the runtime context

  • tag (str) – which copy of the model state to request io from: “prior”, “post” or “truth”

  • method (Callable) – method name

  • *args – will be passed to the method

  • **kwargs

    will be passed to the method

Returns:

whatever the method(**kwargs) returns

Return type:

Any

save_ndarray(c: Context, name: str, data: np.ndarray, path: str | None = None) None[source]

Save ndarray data

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • data (np.ndarray) – the data

  • path (str, optional) – system path to save the data to.

load_ndarray(c: Context, name: str, path: str | None = None) np.ndarray | None[source]

Load ndarray from saved data

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • path (str, optional) – system path to the saved data.

Returns:

the data

Return type:

np.ndarray

save_debug_data(c: Context, name: str, data: dict, path: str | None = None) None[source]

Save debug data in npy format

Parameters:
  • c (Context) – the runtime context

  • name (str) – the name of the data

  • data (dict) – the data

  • path (str, optional) – system path to save the data to.

To recover the data, use np.load(file, allow_pickle=True).item()
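The save/recover round trip mentioned above works as in this sketch, which uses only NumPy calls and a temporary directory. The dictionary contents and the file name are made up for the illustration.

```python
import os
import tempfile
import numpy as np

debug = {"obs": np.array([1.0, 2.0, 3.0]), "note": "hypothetical payload"}

with tempfile.TemporaryDirectory() as tmp:
    file = os.path.join(tmp, "debug.npy")
    # np.save wraps the dict in a 0-d object array, which is pickled on disk.
    np.save(file, debug, allow_pickle=True)
    # .item() unwraps the 0-d object array back into the original dict.
    recovered = np.load(file, allow_pickle=True).item()
```

Because loading relies on pickle, such files should only be loaded from trusted sources.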

class NEDAS.core.JobSubmitter(job_name: str = 'run', run_dir: str = '.', nproc: int = 1, offset: int = 0, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', debug: bool = False, **kwargs)[source]

Bases: ABC

Run a command (shell script) on a specific computer. For a local computer this is as easy as a subprocess call to execute the command. But for large-scale HPC, this may involve submitting the job script to a scheduler and waiting for its completion.

Parameters:
  • job_name (str, optional) – Name of the job. Defaults to ‘run’.

  • run_dir (str, optional) – Directory where the job will execute. Defaults to ‘.’.

  • nproc (int, optional) – Number of processors requested. Defaults to 1.

  • offset (int, optional) – Number of processors to offset from the start. Defaults to 0.

  • parallel_mode (ParallelMode, optional) – Parallelization strategy (e.g., ‘serial’, ‘mpi’, ‘openmp’). Defaults to ‘serial’.

  • debug (bool, optional) – Enables verbose logging for troubleshooting. Defaults to False.

  • **kwargs – Other arbitrary keyword arguments.

property nproc: int

Number of requested processors for the job

property offset: int

Number of processors to skip from the beginning for the job. This allows different jobs to span the total available nproc in the allocation.

abstract property nproc_avail: int

Number of available processors on a host machine. This should be redefined in subclasses to implement machine-specific behavior.

abstract property execute_command: str

Execute command for running the job on the host machine, replacing ‘JOB_EXECUTE’ in ‘commands’

parse_commands(commands: str) str[source]

Parse shell command to replace ‘JOB_EXECUTE’ with machine-specific strings.

abstractmethod run(commands: str) None[source]

Top level run method for a job submitter.

Parameters:

commands (str) – shell commands to be run by the job submitter. In the commands string, ‘JOB_EXECUTE’ will be replaced by the correct execute_command, and ‘JOB_ARRAY_INDEX’ will be replaced by the scheduler’s index variable name to perform array jobs.
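The placeholder substitution described above amounts to a string replacement, as in the sketch below. `parse_commands_sketch` is a hypothetical standalone function, and the execute command and array index variable passed in are example values, not what any particular NEDAS job submitter produces.

```python
def parse_commands_sketch(commands: str, execute_command: str, array_index_var=None) -> str:
    """Replace the JOB_EXECUTE and JOB_ARRAY_INDEX placeholders in a command string."""
    out = commands.replace("JOB_EXECUTE", execute_command)
    if array_index_var is not None:
        out = out.replace("JOB_ARRAY_INDEX", array_index_var)
    return out

cmd = parse_commands_sketch("cd run; JOB_EXECUTE ./model.exe", "mpirun -np 4")
array_cmd = parse_commands_sketch(
    "JOB_EXECUTE ./model.exe --member JOB_ARRAY_INDEX",
    "srun -n 4",
    array_index_var="$SLURM_ARRAY_TASK_ID",
)
```

Keeping the placeholders in the command string lets the same model script run unchanged on a laptop and on a scheduler-managed cluster.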

class NEDAS.core.State(c: Context)[source]

Bases: object

The State class manages the state variables for the assimilation system.

The analysis is performed on a regular grid.

The entire state has

dimensions: member, variable, time, z, y, x

indexed by: mem_id, v, t, k, j, i

with size: nens, nv, nt, nz, ny, nx

To parallelize workload, we group the dimensions into 3 indices:

mem_id indexes the ensemble members

rec_id indexes the unique 2D fields with (v, t, k). Since nz and nt may vary for different variables, we stack these dimensions in the ‘record’ dimension with size nrec

par_id indexes the spatial partitions, which are subsets of the 2D grid given by (ist, ied, di, jst, jed, dj). For a complete field fld[j,i], the processor with par_id stores fld[ist:ied:di, jst:jed:dj] locally.

The entire state is distributed across the memory of many processors. At any moment, a processor stores only a subset of the state in its memory: either all the mem_id,rec_id but only a subset of par_id (we call this ensemble-complete), or all the par_id but a subset of mem_id,rec_id (we call this field-complete). It is easier to perform i/o and pre/post processing on the field-complete state, and easier to run assimilation algorithms on the ensemble-complete state.
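The stacking of (v, t, k) into a single record index can be sketched as follows. The variable table and the function `build_record_index` are hypothetical; they only illustrate why a flat rec_id is convenient when variables have different numbers of time steps and levels.

```python
from itertools import product

# Hypothetical state definition: each variable has its own time steps and z levels.
variables = {
    "temp": {"times": [0, 6], "levels": [0, 1, 2]},
    "ice_conc": {"times": [0], "levels": [0]},
}

def build_record_index(variables: dict) -> dict:
    """Assign a running rec_id to every (variable, time, level) combination."""
    records = {}
    rec_id = 0
    for name, v in variables.items():
        for t, k in product(v["times"], v["levels"]):
            records[rec_id] = (name, t, k)
            rec_id += 1
    return records

records = build_record_index(variables)
nrec = len(records)   # 2*3 records for temp plus 1*1 for ice_conc
```

With a flat rec_id, the workload can be divided among processes by record without padding the shorter variables to a common nt and nz.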

partitions: list = []
par_list: dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]] = {}
fields_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
fields_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
state_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
fields_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
data = {}
info: StateInfo
rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'field record id']]]
distribute_state_tasks(c: Context) dict[int, list[int]][source]

Distribute rec_id across processors

prepare_state(c: Context) None[source]

Main method to collect fields from model to form the complete state (field-complete distributed)

collect_prior_fields(c: Context) None[source]

Collect fields from the prior model state, convert them to the analysis grid, preprocess (coarse-graining, etc.), and save to fields[mem_id, rec_id] pointing to the unique fields

Parameters:

c (Context) – context object

Returns:

fields dictionary [(mem_id, rec_id), fld], where fld is an np.array defined on c.grid holding one of the state variable fields

fields_z dictionary [(mem_id, rec_id), zfld], where zfld has the same shape as fld and holds the z coordinates corresponding to each field

Return type:

dict

collect_scalar_variables(c)[source]
output_state(c: Context, tag: str, mem_id_out: int | None = None, rec_id_out: int | None = None) None[source]

Parallel output the fields to the binary state_file

Parameters:
  • c (Context) – the runtime context obj

  • tag (str) – which version of state this is: ‘prior’, ‘post’ or ‘z’ coords?

  • mem_id_out (int, optional) – member id to be output; if None, all available ids are output.

  • rec_id_out (int, optional) – record id to be output; if None, all available ids are output.

output_ens_mean(c: Context, tag: str) None[source]

Compute the ensemble mean of a field stored distributively on all pid_mem, collect the means on pid_mem=0, and output to mean_file

Parameters:
  • c (Context) – the runtime context obj

  • tag (str) – which version of state this is: ‘prior_mean’, ‘post_mean’, or ‘z’

  • mean_file (str) – path to the output binary file for the ensemble mean
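The idea of a distributed ensemble mean can be sketched serially: each processor sums the members it owns, and the partial sums are then combined and divided by the ensemble size. The ownership rule and variable names below are assumptions for illustration; in an MPI run the final sum would be a reduction onto pid_mem=0, and the exact communication call used by NEDAS is not stated here.

```python
import numpy as np

nens, nproc = 5, 2                       # hypothetical sizes
rng = np.random.default_rng(0)
members = {m: rng.normal(size=(3, 4)) for m in range(nens)}

# Assumed ownership: member m is stored on processor m % nproc
owned = {pid: [m for m in range(nens) if m % nproc == pid] for pid in range(nproc)}

# Each processor sums its own members (local work)
partial = {pid: sum(members[m] for m in owned[pid]) for pid in range(nproc)}

# Combining the partial sums stands in for the reduction onto pid_mem=0
mean = sum(partial.values()) / nens
```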

output_ref_z(c: Context)[source]
pack_field_chunk(c: Context, fld, is_vector, dst_pid)[source]
unpack_field_chunk(c, fld, fld_chk, src_pid)[source]
transpose_to_ensemble_complete(c: Context, fields: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'][source]

Send chunks of field owned by a pid to other pid so that the field-complete fields get transposed into ensemble-complete state with keys (mem_id, rec_id) pointing to the partition in par_list

Parameters:
  • c (Context) – the runtime context

  • fields (FieldEns) – The locally stored field-complete fields with subset of mem_id,rec_id

Returns:

The locally stored ensemble-complete field chunks on partitions, dict[(mem_id, rec_id), dict[par_id, fld_chk]]

Return type:

StateEns

transpose_to_field_complete(c: Context, state: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'][source]

Transposes back the state to field-complete fields

Parameters:
  • c (Context) – the runtime context

  • state (StateEns) – the locally stored ensemble-complete field chunks for subset of par_id

Returns:

the locally stored field-complete fields for subset of mem_id,rec_id.

Return type:

FieldEns
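The round trip between the two transposes can be sketched serially, without any MPI communication. The sketch assumes rectangular partitions given as (y0, y1, x0, x1) index ranges, and the helpers `split_field` and `merge_chunks` are hypothetical names, not NEDAS functions.

```python
import numpy as np

def split_field(fld, partitions):
    """Cut a 2D field into chunks, one per partition (y0, y1, x0, x1)."""
    return {par_id: fld[y0:y1, x0:x1].copy()
            for par_id, (y0, y1, x0, x1) in enumerate(partitions)}

def merge_chunks(chunks, partitions, shape):
    """Reassemble the full field from its per-partition chunks."""
    fld = np.full(shape, np.nan)
    for par_id, (y0, y1, x0, x1) in enumerate(partitions):
        fld[y0:y1, x0:x1] = chunks[par_id]
    return fld

# Four assumed tiles covering a 4 x 6 grid
partitions = [(0, 2, 0, 3), (0, 2, 3, 6), (2, 4, 0, 3), (2, 4, 3, 6)]

# fields[(mem_id, rec_id)] -> full field (field-complete form)
fields = {(mem_id, 0): np.arange(24.0).reshape(4, 6) + mem_id for mem_id in range(3)}

# state[(mem_id, rec_id)][par_id] -> chunk (ensemble-complete form)
state = {key: split_field(fld, partitions) for key, fld in fields.items()}
restored = {key: merge_chunks(chk, partitions, (4, 6)) for key, chk in state.items()}
```

In the parallel setting each chunk would be sent to the processor that owns its par_id rather than kept in one dictionary.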

pack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) dict[source]

pack state dict into arrays to be more easily handled by jitted funcs

unpack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], data: dict) None[source]

unpack data and write back to the state dict
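A minimal sketch of the pack/unpack idea follows: the nested dict for one partition is stacked into a single contiguous array that array-based (for example jitted) code can process, and the result is written back afterwards. The array layout (nens, nrec, nloc) and the helper names are assumptions; the documented methods return and accept a dict whose contents are not specified here.

```python
import numpy as np

def pack_state(state, par_id, nens, nrec):
    """Stack state[(mem_id, rec_id)][par_id] into a (nens, nrec, nloc) array."""
    nloc = next(iter(state.values()))[par_id].size
    arr = np.empty((nens, nrec, nloc))
    for (mem_id, rec_id), chunks in state.items():
        arr[mem_id, rec_id, :] = chunks[par_id].ravel()
    return arr

def unpack_state(state, par_id, arr):
    """Write the array back into the dict, chunk by chunk, in place."""
    for (mem_id, rec_id), chunks in state.items():
        chunks[par_id][...] = arr[mem_id, rec_id, :].reshape(chunks[par_id].shape)

nens, nrec = 3, 2
state = {(m, r): {0: np.full((2, 2), 10.0 * m + r)}
         for m in range(nens) for r in range(nrec)}

packed = pack_state(state, 0, nens, nrec)
packed -= packed.mean(axis=0)      # stand-in for an update: remove the ensemble mean
unpack_state(state, 0, packed)
```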

class NEDAS.core.Obs(c: Context)[source]

Bases: object

Class for handling observations.

The observation has dimensions: variable, time, z, y, x. Since the observation network is typically irregular, we store the obs record for each variable in a 1d sequence, with coordinates (t,z,y,x), and size nobs.

To parallelize workload, we distribute each obs record over all the processors

For batch assimilation mode, each pid stores the list of local obs within the hroi of its tiles, with size nlobs (number of local obs).

For serial mode, each pid stores a non-overlapping subset of the obs list; here a ‘local’ obs (in the storage sense) is broadcast to all pid before computing its update to the state/obs near that obs.

The hroi is separately defined for each obs record. For very large hroi, the serial mode is the more parallel-efficient option, since in batch mode the same obs may need to be stored in multiple pids.
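The duplication of obs in batch mode can be seen in a small sketch that selects, for each tile, the obs lying within hroi of it. The rectangular tile format (x0, x1, y0, y1), the Euclidean distance and the function name `local_obs_indices` are assumptions for illustration only.

```python
import numpy as np

def local_obs_indices(obs_x, obs_y, tile, hroi):
    """Indices of obs within distance hroi of a rectangular tile (x0, x1, y0, y1)."""
    x0, x1, y0, y1 = tile
    # Distance from each obs to the tile along x and y (zero inside the tile)
    dx = np.maximum(np.maximum(x0 - obs_x, obs_x - x1), 0.0)
    dy = np.maximum(np.maximum(y0 - obs_y, obs_y - y1), 0.0)
    return np.flatnonzero(np.hypot(dx, dy) <= hroi)

obs_x = np.array([1.0, 4.5, 5.5, 9.0])
obs_y = np.array([1.0, 1.0, 1.0, 1.0])
tiles = {0: (0.0, 5.0, 0.0, 2.0), 1: (5.0, 10.0, 0.0, 2.0)}

lobs = {par_id: local_obs_indices(obs_x, obs_y, tile, hroi=1.0)
        for par_id, tile in tiles.items()}
```

Here the two obs near the tile boundary (indices 1 and 2) are selected for both tiles, so they would be stored twice; the overlap grows with hroi.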

To compare with the observations, the obs_prior simulated by the model needs to be computed; it has dimensions [nens, nlobs], indexed by (mem_id, obs_id)

obs_inds: dict = {}
obs_seq: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'] = {}
obs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'] = {}
lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
lobs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
obs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
data: dict = {}
obs_rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'obs record id']]] = {}
distribute_obs_tasks(c: Context)[source]

Distribute obs_rec_id across processors

Parameters:

c (Context) – the runtime context object.

Returns:

Dictionary {pid_rec (int): list[obs_rec_id (int)]}

Return type:

dict

get_ref_z(c: Context, model_name: str, time: datetime) dict[Annotated[int, 'z level index'], ndarray][source]

Get the reference z coords at level k on the analysis grid from a model, to be used in dataset modules by generate_obs_network or for superobing/thinning in read_obs.

Parameters:
  • c (Context) – the runtime context

  • model_name (str) – the model name

  • time (datetime) – the time of the model state

Returns:

the z coords field at each level

Return type:

dict[LevelID, np.ndarray]

state_to_obs(c: Context, tag: str, **kwargs) ndarray[source]

Compute the corresponding obs value given the state variable(s), namely the “obs_prior”. This function includes several ways to compute the obs_prior:

1. If obs_name is one of the variables provided by the model_src module, then model_src.read_var shall be able to provide the obs field defined on the model native grid. Then we convert the obs field to the analysis grid and do vertical interpolation.

2. If obs_name is one of the variables provided by obs.obs_operator, we call it to obtain the obs seq. Typically the obs_operator performs more complex computation, such as path integration, radiative transfer model, etc. (slowest)

Parameters:
  • c (Context) – the runtime context object

  • tag (str) – ‘prior’ or ‘post’, or ‘truth’ if generating synthetic obs

  • **kwargs – Additional parameters:

      - member: int, member index; or None if dealing with synthetic obs
      - name: str, obs variable name
      - time: datetime obj, time of the obs window
      - is_vector: bool, if True the obs is a vector measurement
      - dataset_src: str, dataset source module name providing the obs
      - model_src: str, model source module name providing the state
      - x, y, z, t: np.array, coordinates from obs_seq

Returns:

Values corresponding to the obs_seq but from the state identified by kwargs

Return type:

np.ndarray

get_model_fld_z_on_grid(c: Context, tag: str, **kwargs) tuple[ndarray, ndarray][source]

Get obs variable field and z coords at level k and convert to c.grid

horizontal_interp(c: Context, fld: ndarray, zfld: ndarray, is_vector: bool, obs_x: ndarray, obs_y: ndarray) tuple[ndarray, ndarray][source]

Interpolate fld and zfld horizontally to the obs_x,obs_y locations

vertical_interp(seq: ndarray, k: int, levels: Annotated[ndarray | Sequence, 'z levels'], f: ndarray, fp: ndarray | None, z: ndarray, zp: ndarray | None, dzp: ndarray | None, obs_z: ndarray) tuple[source]

Interpolate f(k) with z(k) coords vertically to the obs z locations.

Vertical interp to obs_z, taking ocean depth as an example:

          :   - - - -   :
    ----------- z[k-2] -------------
    k-1   - - - -   f[k-1], fp      } dzp  previous layer
    ----------- z[k-1], zp ---------
    k     - - - -   v[k], f         } dz   current layer
    ----------- z[k], z ------------
    k+1   - - - -   v[k+1]

The layer thickness of the current level k is denoted as z, and that of the previous level as zp; the variable f is considered a layer average, so it is defined at layer centers.
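As a rough illustration of interpolating layer averages to obs depths, the sketch below computes layer centers from interface depths and interpolates linearly between neighbouring centers. It treats a whole column at once with `numpy.interp`, whereas the documented method works level by level with f, fp, z, zp and dzp; the column values are made up, and details such as the treatment above the first layer center may differ.

```python
import numpy as np

# Hypothetical ocean column: interface depths (m, positive down) and layer averages
z_interface = np.array([0.0, 10.0, 30.0, 70.0])   # bounds of 3 layers
f_layer = np.array([20.0, 18.0, 12.0])            # e.g. temperature in each layer

# Layer averages are defined at layer centers: 5, 20 and 50 m
z_center = 0.5 * (z_interface[:-1] + z_interface[1:])

obs_z = np.array([5.0, 12.5, 35.0])

# Linear interpolation between neighbouring layer centers
f_at_obs = np.interp(obs_z, z_center, f_layer)
```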

validate_seq_shape(seq: ndarray, is_vector: bool) None[source]

Validate the shape of an observation sequence. Allowed shape: (nobs,) for scalar obs seq; (2, nobs) for vector obs seq.
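The shape rule can be sketched as follows. The function name `check_seq_shape` and the choice of raising ValueError are assumptions; the behaviour of the documented method on an invalid shape is not stated here.

```python
import numpy as np

def check_seq_shape(seq, is_vector):
    """Raise ValueError unless seq is (nobs,) for scalars or (2, nobs) for vectors."""
    if is_vector:
        ok = seq.ndim == 2 and seq.shape[0] == 2
    else:
        ok = seq.ndim == 1
    if not ok:
        raise ValueError(
            f"unexpected obs seq shape {seq.shape} for is_vector={is_vector}")

check_seq_shape(np.zeros(5), is_vector=False)       # scalar obs seq: passes
check_seq_shape(np.zeros((2, 5)), is_vector=True)   # vector obs seq: passes
```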

collect_obs_seq(c: Context) Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'][source]

Process the obs in parallel, read dataset files and convert to obs_seq which contains obs value, coordinates and other info

Since this is the actual obs (1 copy), only 1 processor needs to do the work

Parameters:

c (Context) – the runtime context object

Returns:

observation sequence, a dictionary {obs_rec_id (int): record}, where each record is a dictionary {key: np.ndarray}. The mandatory keys are:

  • ‘obs’ – the observed values (measurements)

  • ‘x’, ‘y’, ‘z’, ‘t’ – the coordinates for each measurement

  • ‘err_std’ – the uncertainties for each measurement

Other optional keys may be provided by read_obs(), but they are not used.

Return type:

ObsSeq
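A record with the mandatory keys might look like the sketch below. The values, the obs_rec_id of 0 and the use of datetime objects for ‘t’ are made up for illustration; the actual dtypes produced by read_obs() are not specified here.

```python
import numpy as np
from datetime import datetime

nobs = 3
record = {
    'obs': np.array([1.2, 0.8, 1.5]),                 # observed values
    'x': np.array([10.0, 20.0, 30.0]),                # coordinates of each measurement
    'y': np.array([5.0, 5.0, 6.0]),
    'z': np.array([0.0, -10.0, -50.0]),
    't': np.array([datetime(2021, 1, 1)] * nobs),     # assumed time representation
    'err_std': np.full(nobs, 0.1),                    # measurement uncertainties
}
obs_seq = {0: record}   # {obs_rec_id: record}

mandatory = ('obs', 'x', 'y', 'z', 't', 'err_std')
missing = [key for key in mandatory if key not in record]
```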

prepare_obs(c: Context) None[source]
prepare_obs_from_state(c: Context, tag: str) None[source]

Compute the obs priors in parallel, run state_to_obs to obtain obs_prior_seq

Parameters:
  • c (Context) – the runtime context object

  • tag (str) – ‘prior’ or ‘post’ ensemble model states

global_obs_list(c: Context) list[tuple[Annotated[int, 'obs record id'], int | None, Annotated[int, 'process id in comm'], int]][source]
transpose_obs_seq(c: Context, input_obs: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']) Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'][source]

Transpose the obs sequence from field-complete to ensemble-complete

Parameters:
  • c (Context) – the runtime context

  • input_obs (ObsSeq) – obs_seq from process_all_obs(), dict[obs_rec_id, dict[key, np.array]]

Returns:

the lobs dict[obs_rec_id, dict[par_id, dict[key, np.array]]], key = ‘obs’,’x’,’y’,’z’,’t’…

Return type:

LocalObsSeq

transpose_to_ensemble_complete(c: Context, input_obs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'][source]

Transpose obs from field-complete to ensemble-complete

Step 1, Within comm_mem, send the subset of input_obs with mem_id and par_id from the source proc (src_pid) to the destination proc (dst_pid), store the result in tmp_obs with all the mem_id (ensemble-complete)

Step 2, Gather all obs_rec_id within comm_rec, so that each pid_rec will have the entire obs record for assimilation

Parameters:
  • c (Context) – the runtime context

  • input_obs (ObsEns) – obs_prior from process_all_obs_priors(), dict[(mem_id, obs_rec_id), np.array]

Returns:

the lobs_prior dict[(mem_id, obs_rec_id), dict[par_id, np.array]]

Return type:

LocalObsEns

transpose_to_field_complete(c: Context, lobs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'][source]

Transpose obs from ensemble-complete to field-complete

Parameters:
  • c (Context) – the runtime context

  • lobs (LocalObsEns) – ensemble-complete local obs

Returns:

field-complete obs_seq ensemble

Return type:

ObsEns

pack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) dict[source]

pack lobs and lobs_prior into arrays for the jitted functions

unpack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'], data: dict) None[source]

unpack data and write back to the original lobs_prior dict

class NEDAS.core.Perturbation(c: Context)[source]

Bases: object

Perturbation top-level manager

nfld: int = 0
perturb: dict[str, Any] = {}
task_list: dict[int, list[dict]] = {}
distribute_perturb_tasks(c: Context) dict[int, list[dict]][source]
count_num_fields(c: Context)[source]
prepare_perturb_dir(c)[source]

Prepare and clear the directory where perturbation data will be stored (offline mode)

save_perturb_data(c: Context, **rec)[source]

Save a copy of perturbation data, for use by the next analysis cycle

load_perturb_data(c: Context, **rec)[source]

Load the perturbation data

collect_fields(c: Context, t: datetime, k: int, **rec) dict[str, ndarray][source]

Collect all model fields to be perturbed

output_perturbed_fields(c: Context, fields: dict[str, ndarray], t: datetime, k: int, **rec) None[source]
class NEDAS.core.Diagnostics(c: Context)[source]

Bases: object

This class manages diagnostics functions

task_list: dict[Annotated[int, 'process id in comm'], list]
distribute_diag_tasks(c: Context)[source]

Build the full task list and distribute among mpi ranks

init_file_locks(c: Context)[source]

Build the full task list for the diagnostics part of the config

class NEDAS.core.Scheme(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]

Bases: ABC

Runtime scheme base class.

The Scheme coordinates all runtime generation and manipulation of objects.

steps_need_mpi: dict[str, bool] = {}
scheduler: OfflineScheduler | None = None
config: Config
online_mode: bool
use_synthetic_obs: bool = False
property c

The runtime context, with lazy initialization
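Lazy initialization of a property can be sketched as below. The class `LazyScheme`, the private attribute `_c` and the dict standing in for the context are all hypothetical; how Scheme actually builds and caches its Context is not shown here.

```python
class LazyScheme:
    """Toy class with a lazily created context (all names are hypothetical)."""

    def __init__(self, config):
        self.config = config
        self._c = None        # the context is not built yet
        self.n_builds = 0     # counts how often the context was constructed

    @property
    def c(self):
        if self._c is None:   # build on first access only
            self._c = {'config': self.config}   # stand-in for a real context object
            self.n_builds += 1
        return self._c

scheme = LazyScheme({'nens': 4})
first = scheme.c      # triggers construction
second = scheme.c     # reuses the cached object
```

The benefit of this pattern is that creating the scheme stays cheap, and the cost of building the context is paid only when a step actually needs it.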

abstractmethod run_all()[source]

A scheme must implement a run_all method to describe the workflow.

external_call(step: str | None = None, **kwargs)[source]

Run the scheme from an external call. Saving the current context to a temporary config file, then run a subprocess to

run_step(step: str) None[source]

Manages how to run a specified step in the workflow.

run_ensemble_tasks(strategy: Literal['scheduler', 'batch'], tag: Literal['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean'], task_name: str, func: Callable, **opts) None[source]