NEDAS.core package
Submodules
NEDAS.core.assimilator module
- class NEDAS.core.assimilator.Assimilator(c: Context)[source]
Bases: ABC
- partition_grid(c: Context) None[source]
Partition the analysis grid into several parts and distribute the workload over the mpi ranks.
- abstractmethod init_partitions(c: Context) list[source]
Generate spatial partitioning of the domain
- abstractmethod assign_obs(c: Context) dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], ndarray]][source]
Assign the observation sequence to each partition par_id
- Parameters:
c (Context) – the runtime context object
- Returns:
Indices in the full obs_seq for the subset of obs that belongs to partition par_id
- Return type:
dict[ObsRecordID, dict[PartitionID, np.ndarray]]
- abstractmethod distribute_partitions(c: Context) dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]][source]
Distribute partitions across processors
- transpose_to_ensemble_complete(c: Context) None[source]
Communicate among mpi ranks and transpose the locally-stored state/obs chunks to ensemble-complete
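The split between init_partitions and distribute_partitions can be pictured with a minimal sketch. It is not the NEDAS implementation: the rectangular tile layout, the round-robin assignment and the helper names make_tiles and round_robin are assumptions made for illustration only.

```python
def make_tiles(nx: int, ny: int, tile: int) -> list[tuple[int, int, int, int]]:
    """Split an nx-by-ny grid into rectangular tiles (ist, ied, jst, jed)."""
    tiles = []
    for jst in range(0, ny, tile):
        for ist in range(0, nx, tile):
            tiles.append((ist, min(ist + tile, nx), jst, min(jst + tile, ny)))
    return tiles


def round_robin(npar: int, nproc: int) -> dict[int, list[int]]:
    """Assign partition ids to process ids in round-robin order (assumed policy)."""
    par_list = {pid: [] for pid in range(nproc)}
    for par_id in range(npar):
        par_list[par_id % nproc].append(par_id)
    return par_list


partitions = make_tiles(nx=8, ny=8, tile=4)       # four 4x4 tiles
par_list = round_robin(len(partitions), nproc=3)  # pid 0 receives two tiles
```

A concrete assimilator would likely balance the load by the number of observations or unmasked grid points per tile rather than by tile count alone.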
NEDAS.core.context module
- class NEDAS.core.context.Context(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: object
Runtime context manages the generation and interaction of dynamic objects at runtime
- comm: parallel.Comm
- comm_rec: parallel.Comm
- comm_mem: parallel.Comm
- progress: progress.Progress
- grid: grid.GridType
- grid_orig: grid.GridType
- assimilator: Assimilator
- localization_funcs: dict[str, Callable]
- debug: bool
- interactive: bool
- is_notebook: bool
- time: datetime
- iter: int
- pid_show: int
- nens: int
- mem_list: dict[ProcIDMem, list[MemID]]
- fs: FileSystem
- jsub: JobSubmitter
- check_interactive() bool[source]
Check whether the runtime environment supports interactive output (with ANSI escape codes).
- update_assim_tools()[source]
Update the assimilation tool components based on runtime configuration
- property prev_time: datetime
Previous analysis time. Automatically updated when self.time changes.
- Returns:
Previous analysis time.
- Return type:
datetime
- property next_time: datetime
Next analysis time. Automatically updated when self.time changes.
- Returns:
Next analysis time.
- Return type:
datetime
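A minimal sketch of how prev_time and next_time can follow time automatically: both are properties recomputed on every access. The Clock class and the cycle_period_hours argument are hypothetical names; how NEDAS actually derives the cycle length is not stated here.

```python
from datetime import datetime, timedelta


class Clock:
    """Toy stand-in for the time-related part of a runtime context."""

    def __init__(self, time: datetime, cycle_period_hours: float):
        self.time = time
        # Assumed setting: a fixed interval between analysis times
        self.cycle_period = timedelta(hours=cycle_period_hours)

    @property
    def prev_time(self) -> datetime:
        # Recomputed on each access, so it always follows self.time
        return self.time - self.cycle_period

    @property
    def next_time(self) -> datetime:
        return self.time + self.cycle_period


clock = Clock(datetime(2021, 1, 1, 0), cycle_period_hours=6)
clock.time = datetime(2021, 1, 1, 6)  # prev_time and next_time shift with it
```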
- set_comm() None[source]
Initialize the MPI communicator, split the communicator if necessary.
For a serial program, use a dummy communicator and set nproc to the number of available processors on the machine; for an MPI program, use MPI.COMM_WORLD and check that its size matches nproc.
Split the communicator into member and record groups according to nproc and nproc_mem. See the NEDAS.utils.parallel module for more details.
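The member/record split can be illustrated without MPI by computing, for each global rank, its index in the two groups. The layout below (member index varying fastest) and the function name split_ranks are assumptions; the actual layout is defined in NEDAS.utils.parallel.

```python
def split_ranks(nproc: int, nproc_mem: int) -> dict[int, tuple[int, int]]:
    """Map each global rank to (pid_mem, pid_rec).

    Assumed layout: ranks sharing a pid_rec form one member group (comm_mem),
    ranks sharing a pid_mem form one record group (comm_rec).
    """
    if nproc % nproc_mem != 0:
        raise ValueError("nproc must be divisible by nproc_mem")
    return {pid: (pid % nproc_mem, pid // nproc_mem) for pid in range(nproc)}


layout = split_ranks(nproc=6, nproc_mem=3)  # 3 ranks per member group, 2 groups
```

With an MPI library, such index pairs would typically serve as the color and key arguments of a communicator split.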
- set_grid() None[source]
Initialize the analysis grid based on the configuration.
If grid_def['type'] is ‘custom’, an analysis grid is created from the provided parameters. If grid_def['type'] is a model name, the grid is loaded from the specified model class.
- set_models() None[source]
Initialize model instances based on model_def[model_name] settings. Store the model instances in models[model_name].
- set_datasets() None[source]
Initialize dataset instances based on dataset_def[dataset_name] settings. Store the dataset instances in datasets[dataset_name].
- property total_tasks
- property current_task
- property message
- property debug_message
- logger(func_name: str)[source]
Decorator to register the func in call stack and show runtime messages.
- print_1p(msg: str)[source]
Customized print function for showing runtime message.
Only the processor with PID = self.pid_show shows the message; this avoids redundant output when all processors would show the same message.
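The single-printer idea can be sketched as a standalone function. The explicit pid and file arguments are assumptions added so the example runs outside a Context; the method itself takes only msg.

```python
import io


def print_1p(msg: str, pid: int, pid_show: int = 0, file=None) -> bool:
    """Write msg only on the process whose id equals pid_show."""
    if pid != pid_show:
        return False  # every other process stays silent
    print(msg, end="", flush=True, file=file)
    return True


# Simulate four processes calling the function with the same message
buffer = io.StringIO()
shown = [print_1p("reading obs\n", pid, pid_show=0, file=buffer) for pid in range(4)]
```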
- dump_config(config_file: str) None[source]
Dumps a snapshot of the current state to a yaml config file. The original config object remains unchanged in memory.
- run_job(commands: str, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', nproc: int = 1, offset: int = 0, **kwargs) None[source]
The user-facing method for running commands on a computer. It reconfigures the existing job submitter with runtime arguments and executes the commands.
- Parameters:
commands (str) – Shell commands to be dispatched by job submitter
parallel_mode (ParallelMode, optional) – parallel mode (‘serial’, ‘mpi’, ‘openmp’), default is ‘serial’
nproc (int, optional) – number of processors (default is 1)
offset (int, optional) – offset in full list of processors (default is 0)
**kwargs – other keyword arguments to update the job submitter configuration
NEDAS.core.dataset module
- class NEDAS.core.dataset.Dataset(context: Context | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: ABC
Dataset class (template for specific dataset sources)
- obs_operator: dict[Annotated[str, 'variable name'], Callable] = {}
- memory: dict = {}
- dataset_name: str
- parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]
Parse the input kwargs to pinpoint a specific file/variable…
- generate_obs_network(**kwargs) dict[str, ndarray][source]
Generate a random observing network for use in synthetic observation experiments.
- Parameters:
**kwargs
NEDAS.core.inflation module
- class NEDAS.core.inflation.Inflation(coef: float = 1.0, adaptive: bool = False, prior: bool = False, post: bool = False)[source]
Bases: ABC
Class for inflating the ensemble members (covariance inflation)
NEDAS.core.io_backend module
- class NEDAS.core.io_backend.IOBackend[source]
Bases: ABC
Base class for handling runtime input/output for state and observation variables, and OS-level operations such as file manipulation and running commands
- io_mode
‘offline’ for file I/O and ‘online’ for persistent memory I/O
- Type:
IOMode
- tags
List of names for copies of state/obs data
- Type:
list[str]
- IOTags:
‘current’: mutable buffer for the data, updated by assimilation and outer-loop iterations.
‘prior’: read-only snapshot, also known as background/forecast, kept for O-B statistics.
‘post’: final state after the assimilation, known as the (re)analysis.
‘truth’: truth, used as the reference state in synthetic OSSE experiments.
‘raw’: original information. For obs it is the actual obs
- io_mode: Literal['online', 'offline'] = 'offline'
- tags: list[str] = ['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean']
- prepare_fields_storage(c: Context, tag: str) None[source]
Prepare for storage of fields data. Only needed for offline io modes: initialize the binary file that stores fields and write its metadata.
- abstractmethod read_field(c: Context, tag: str, rec_id: int, mem_id: int) np.ndarray[source]
Read a 2D field from the state
- Parameters:
c (Context) – the runtime context
tag (str) – which copy of the state to read from
rec_id (int) – field record index
mem_id (int) – ensemble member index from 0 to nens-1
- Returns:
the 2D field data
- Return type:
np.ndarray
- abstractmethod write_field(fld: np.ndarray, c: Context, tag: str, rec_id: int, mem_id: int) None[source]
Write a 2D field to the state
- Parameters:
fld (np.ndarray) – the 2D field data
c (Context) – the runtime context
tag (str) – which copy of the state to write to
rec_id (int) – field record index
mem_id (int) – ensemble member index
- abstractmethod call_method(c: Context, tag: str, method: Callable, *args, **kwargs) Any[source]
Call a method to perform some tasks.
- Parameters:
c (Context) – the runtime context
tag (str) – which copy of the model state to request io from: “prior”, “post” or “truth”
method (Callable) – method name
*args – will be passed to the method
**kwargs – will be passed to the method
- Returns:
whatever the method(**kwargs) returns
- Return type:
Any
- save_ndarray(c: Context, name: str, data: np.ndarray, path: str | None = None) None[source]
Save ndarray data
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
data (np.ndarray) – the data
path (str, optional) – system path to save the data to.
- load_ndarray(c: Context, name: str, path: str | None = None) np.ndarray | None[source]
Load ndarray from saved data
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
path (str, optional) – system path to the saved data.
- Returns:
the data
- Return type:
np.ndarray
- save_debug_data(c: Context, name: str, data: dict, path: str | None = None) None[source]
Save debug data in npy format
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
data (dict) – the data
path (str, optional) – system path to save the data to.
To recover the data, use np.load(file, allow_pickle=True).item()
NEDAS.core.model module
- class NEDAS.core.model.Model(context: Context | None = None, io_mode: Literal['online', 'offline'] | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: Generic[GridT], ABC
Class for configuring and running a model
- grid: GridT
- z: dict[Annotated[int, 'z level index'], ndarray]
- mask: ndarray
- ens_init_dir: str | None
- truth_dir: str | None
- ens_run_strategy: Literal['scheduler', 'batch']
- nproc_per_run: int = 1
- nproc_per_util: int = 1
- walltime: int | None = None
- run_process = None
- run_status: str = 'pending'
- restart_dir: str
- forecast_period: int
- memory: dict
- io_mode: Literal['online', 'offline']
- model_name: str
- abstractmethod read_grid(**kwargs) None[source]
Read the grid information from the model output.
- Parameters:
**kwargs – Keyword arguments for reading the grid.
- read_var(**kwargs) ndarray[source]
Read a variable from the model output.
- Parameters:
**kwargs – Keyword arguments for reading the variable.
- Returns:
The read variable.
- Return type:
np.ndarray
- write_var(var, **kwargs) None[source]
Write a variable to the model output.
- Parameters:
var (np.ndarray) – The variable to write.
**kwargs – Keyword arguments for writing the variable.
- abstractmethod z_coords(**kwargs) ndarray[source]
Get the vertical coordinates of the model.
- Parameters:
**kwargs – Keyword arguments for getting the vertical coordinates.
- Returns:
The vertical coordinates.
- Return type:
np.ndarray
- abstractmethod preprocess(*args, **kwargs) None[source]
Preprocess the model data.
- Parameters:
**kwargs – Keyword arguments for preprocessing.
- abstractmethod postprocess(*args, **kwargs) None[source]
Postprocess the model data.
- Parameters:
**kwargs – Keyword arguments for postprocessing.
- abstractmethod run(*args, **kwargs) None[source]
Run the model forward in time.
- Parameters:
*args – Arguments
**kwargs – Keyword arguments
- Keyword Arguments:
time (datetime) – current time when forecast starts
restart_dir (str) – directory where restart files are located
forecast_period (int) – forecast period in hours
If self.ens_run_strategy == ‘batch’, the method runs all ensemble members in one go and expects an additional kwargs[‘nens’] giving the ensemble size. If self.ens_run_strategy == ‘scheduler’, the method runs a single member indexed by kwargs[‘member’], and kwargs[‘worker_id’] is the pid assigned by the scheduler to run this method.
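The two strategies differ only in which keyword arguments run() consumes. The sketch below uses a hypothetical ToyModel that records what it would run instead of launching anything; it is not a subclass of the NEDAS Model.

```python
class ToyModel:
    """Hypothetical model showing the two ensemble run strategies."""

    def __init__(self, ens_run_strategy: str):
        self.ens_run_strategy = ens_run_strategy
        self.log = []  # records (who ran it, member index)

    def run(self, **kwargs) -> None:
        if self.ens_run_strategy == "batch":
            # One call advances every member; the ensemble size is required
            for member in range(kwargs["nens"]):
                self.log.append(("batch", member))
        elif self.ens_run_strategy == "scheduler":
            # One call advances one member on the worker picked by the scheduler
            self.log.append(("worker %d" % kwargs["worker_id"], kwargs["member"]))
        else:
            raise ValueError("unknown strategy: " + self.ens_run_strategy)


batch_model = ToyModel("batch")
batch_model.run(nens=3)

sched_model = ToyModel("scheduler")
sched_model.run(member=2, worker_id=0)
```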
NEDAS.core.obs module
- class NEDAS.core.obs.Obs(c: Context)[source]
Bases: object
Class for handling observations.
The observation has dimensions: variable, time, z, y, x. Since the observation network is typically irregular, we store the obs record for each variable in a 1D sequence, with coordinates (t, z, y, x) and size nobs.
To parallelize the workload, we distribute each obs record over all the processors:
For batch assimilation mode, each pid stores the list of local obs within the hroi of its tiles, with size nlobs (number of local obs).
For serial mode, each pid stores a non-overlapping subset of the obs list; here a ‘local’ obs (in the storage sense) is broadcast to all pids before computing its update to the state/obs near that obs.
The hroi is defined separately for each obs record. For very large hroi, serial mode is the more parallel-efficient option, since in batch mode the same obs may need to be stored on multiple pids.
To compare with the observations, the obs_prior simulated by the model needs to be computed; it has dimensions [nens, nlobs], indexed by (mem_id, obs_id).
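The storage difference between the two modes can be shown with a one-dimensional toy example. The interleaved split for serial mode, the 1D tiles and the function names are assumptions for illustration; they only demonstrate why a large hroi duplicates obs in batch mode.

```python
def split_serial(nobs: int, nproc: int) -> dict[int, list[int]]:
    """Serial mode: each pid keeps a non-overlapping subset of obs indices."""
    return {pid: list(range(pid, nobs, nproc)) for pid in range(nproc)}


def select_batch(obs_x: list[float], tiles: dict[int, tuple[float, float]],
                 hroi: float) -> dict[int, list[int]]:
    """Batch mode: each tile keeps every obs within hroi of its x-range."""
    local = {}
    for par_id, (xmin, xmax) in tiles.items():
        local[par_id] = [i for i, x in enumerate(obs_x)
                         if xmin - hroi <= x <= xmax + hroi]
    return local


obs_x = [1.0, 4.5, 5.5, 9.0]
serial = split_serial(len(obs_x), nproc=2)
batch = select_batch(obs_x, tiles={0: (0.0, 5.0), 1: (5.0, 10.0)}, hroi=1.0)
```

Here obs 1 and 2 lie within hroi of both tiles, so batch mode stores them twice, while serial mode stores every obs exactly once.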
- obs_inds: dict = {}
- obs_seq: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'] = {}
- obs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
- lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'] = {}
- lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
- lobs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
- obs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
- data: dict = {}
- obs_rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'obs record id']]] = {}
- distribute_obs_tasks(c: Context)[source]
Distribute obs_rec_id across processors
- Parameters:
c (Context) – the runtime context object.
- Returns:
Dictionary {pid_rec (int): list[obs_rec_id (int)]}
- Return type:
dict
- get_ref_z(c: Context, model_name: str, time: datetime) dict[Annotated[int, 'z level index'], ndarray][source]
Get the reference z coords at level k on the analysis grid from a model, for use in dataset modules by generate_obs_network or for superobbing/thinning in read_obs.
- Parameters:
c (Context) – the runtime context
model_name (str) – the model name
time (datetime) – the time of the model state
- Returns:
the z coords field at each level
- Return type:
dict[LevelID, np.ndarray]
- state_to_obs(c: Context, tag: str, **kwargs) ndarray[source]
Compute the corresponding obs value given the state variable(s), namely the “obs_prior”. This function includes several ways to compute the obs_prior:
1. If obs_name is one of the variables provided by the model_src module, then model_src.read_var shall be able to provide the obs field defined on the model native grid. Then we convert the obs field to the analysis grid and do vertical interpolation.
2. If obs_name is one of the variables provided by obs.obs_operator, we call it to obtain the obs seq. Typically the obs_operator performs more complex computation, such as path integration, radiative transfer model, etc. (slowest)
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’, or ‘truth’ if generating synthetic obs
**kwargs – Additional parameters:
member (int) – member index, or None if dealing with synthetic obs
name (str) – obs variable name
time (datetime) – time of the obs window
is_vector (bool) – if True, the obs is a vector measurement
dataset_src (str) – dataset source module name providing the obs
model_src (str) – model source module name providing the state
x, y, z, t (np.array) – coordinates from obs_seq
- Returns:
Values corresponding to the obs_seq but from the state identified by kwargs
- Return type:
np.ndarray
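The choice between the two pathways can be sketched as a small dispatch function. The function name, its arguments and the order of precedence (model variable first) are assumptions; the sketch only returns a label and does not compute an obs prior.

```python
from typing import Callable


def choose_obs_pathway(obs_name: str, model_variables: set[str],
                       obs_operator: dict[str, Callable]) -> str:
    """Return which of the two pathways would produce the obs prior."""
    if obs_name in model_variables:
        # Pathway 1: read the field, regrid it, interpolate to obs locations
        return "model_variable"
    if obs_name in obs_operator:
        # Pathway 2: defer to the dataset's own observation operator
        return "obs_operator"
    raise KeyError("no way to compute obs prior for " + obs_name)


model_variables = {"temperature", "salinity"}
obs_operator = {"brightness_temp": lambda **kwargs: None}  # placeholder operator
```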
- get_model_fld_z_on_grid(c: Context, tag: str, **kwargs) tuple[ndarray, ndarray][source]
Get obs variable field and z coords at level k and convert to c.grid
- horizontal_interp(c: Context, fld: ndarray, zfld: ndarray, is_vector: bool, obs_x: ndarray, obs_y: ndarray) tuple[ndarray, ndarray][source]
Interpolate fld and zfld horizontally to the obs_x,obs_y locations
- vertical_interp(seq: ndarray, k: int, levels: Annotated[ndarray | Sequence, 'z levels'], f: ndarray, fp: ndarray | None, z: ndarray, zp: ndarray | None, dzp: ndarray | None, obs_z: ndarray) tuple[source]
Interpolate f(k) with z(k) coords vertically to the obs z locations.
Vertical interpolation to obs_z, taking ocean depth as an example:
------------ z[k-2] ------------
k-1 - - - - f[k-1], fp } dzp previous layer
------------ z[k-1], zp ------------
k - - - - v[k], f } dz current layer
------------ z[k], z ------------
k+1 - - - - v[k+1]
The layer thickness of the current level k is denoted as dz, and that of the previous level as dzp; the variable f is considered a layer average, so it is defined at layer centers.
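As a worked illustration of interpolating between layer averages, the sketch below assumes linear interpolation between the centers of the previous and current layers, with z increasing downward as in the ocean-depth picture. Whether vertical_interp uses exactly this weighting is not stated above, and the function name is hypothetical.

```python
def interp_between_layers(fp: float, f: float, zp: float, z: float,
                          dzp: float, obs_z: float) -> float:
    """Linearly interpolate between two layer-center values.

    zp and z are the lower interfaces of the previous and current layers,
    dzp is the previous layer thickness; the current thickness is z - zp.
    """
    dz = z - zp
    zc_prev = zp - 0.5 * dzp  # center of the previous layer
    zc_curr = zp + 0.5 * dz   # center of the current layer
    if not (min(zc_prev, zc_curr) <= obs_z <= max(zc_prev, zc_curr)):
        raise ValueError("obs_z lies outside the two layer centers")
    w = (obs_z - zc_prev) / (zc_curr - zc_prev)
    return (1.0 - w) * fp + w * f


# Layers 0-10 and 10-20 (centers at 5 and 15); an obs on the interface
# at depth 10 gets the average of the two layer values
value = interp_between_layers(fp=1.0, f=3.0, zp=10.0, z=20.0, dzp=10.0, obs_z=10.0)
```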
- validate_seq_shape(seq: ndarray, is_vector: bool) None[source]
Validate the shape of an observation sequence. Allowed shape: (nobs,) for scalar obs seq; (2, nobs) for vector obs seq.
- collect_obs_seq(c: Context) Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'][source]
Process the obs in parallel, read dataset files and convert to obs_seq which contains obs value, coordinates and other info
Since this is the actual obs (1 copy), only 1 processor needs to do the work
- Parameters:
c (Context) – The runtime context object.
- Returns:
The observation sequence, a dictionary {obs_rec_id (int): record} where each record is a dictionary {key: np.ndarray}. The mandatory keys are:
‘obs’: the observed values (measurements)
‘x’, ‘y’, ‘z’, ‘t’: the coordinates for each measurement
‘err_std’: the uncertainties for each measurement
Other optional keys may be provided by read_obs(), but they are not used.
- Return type:
ObsSeq
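The record layout described in the return value can be checked with a few lines. This is an illustrative sketch using plain lists for scalar obs; check_record is a hypothetical helper, and vector obs with shape (2, nobs) would need a different length check.

```python
MANDATORY_KEYS = ("obs", "x", "y", "z", "t", "err_std")


def check_record(record: dict[str, list]) -> int:
    """Check mandatory keys and equal lengths; return the number of obs."""
    missing = [key for key in MANDATORY_KEYS if key not in record]
    if missing:
        raise KeyError("missing mandatory keys: " + ", ".join(missing))
    lengths = {len(record[key]) for key in MANDATORY_KEYS}
    if len(lengths) != 1:
        raise ValueError("mandatory entries differ in length")
    return lengths.pop()


# One obs record (obs_rec_id = 0) holding two scalar measurements
obs_seq = {
    0: {"obs": [271.3, 272.0], "x": [10.0, 12.5], "y": [3.0, 4.0],
        "z": [0.0, 0.0], "t": [0.0, 0.0], "err_std": [0.5, 0.5]},
}
nobs = {rec_id: check_record(rec) for rec_id, rec in obs_seq.items()}
```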
- prepare_obs_from_state(c: Context, tag: str) None[source]
Compute the obs priors in parallel, run state_to_obs to obtain obs_prior_seq
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’ ensemble model states
- global_obs_list(c: Context) list[tuple[Annotated[int, 'obs record id'], int | None, Annotated[int, 'process id in comm'], int]][source]
- transpose_obs_seq(c: Context, input_obs: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']) Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'][source]
Transpose the obs sequence from field-complete to ensemble-complete
- Parameters:
c (Context) – the runtime context
input_obs (ObsSeq) – obs_seq from process_all_obs(), dict[obs_rec_id, dict[key, np.array]]
- Returns:
the lobs dict[obs_rec_id, dict[par_id, dict[key, np.array]]], key = ‘obs’, ‘x’, ‘y’, ‘z’, ‘t’…
- Return type:
LocalObsSeq
- transpose_to_ensemble_complete(c: Context, input_obs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'][source]
Transpose obs from field-complete to ensemble-complete
Step 1, Within comm_mem, send the subset of input_obs with mem_id and par_id from the source proc (src_pid) to the destination proc (dst_pid), store the result in tmp_obs with all the mem_id (ensemble-complete)
Step 2, Gather all obs_rec_id within comm_rec, so that each pid_rec will have the entire obs record for assimilation
- Parameters:
c (Context) – the runtime context
input_obs (ObsEns) – obs_prior from process_all_obs_priors(), dict[(mem_id, obs_rec_id), np.array]
- Returns:
the lobs_prior dict[(mem_id, obs_rec_id), dict[par_id, np.array]]
- Return type:
LocalObsEns
- transpose_to_field_complete(c: Context, lobs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'][source]
Transpose obs from ensemble-complete to field-complete
- Parameters:
c (Context) – the runtime context
lobs (LocalObsEns) – ensemble-complete local obs
- Returns:
field-complete obs_seq ensemble
- Return type:
ObsEns
- pack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) dict[source]
pack lobs and lobs_prior into arrays for the jitted functions
- unpack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'], data: dict) None[source]
unpack data and write back to the original lobs_prior dict
NEDAS.core.obs_info module
- class NEDAS.core.obs_info.ObsInfo(c: Context)[source]
Bases: object
Manages the metadata, indexing and memory allocation for the observation sequences
- records
dictionary containing obs_rec_id and the corresponding obs record
- Type:
dict[int, ObsRecord]
- variables
set of unique variables in the observations
- Type:
set[str]
- err_types
set of unique error models used in the observations
- Type:
set[str]
- variables: list[str]
- err_types: list[str]
NEDAS.core.scheme module
- class NEDAS.core.scheme.Scheme(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: ABC
Runtime scheme base class.
The Scheme coordinates all runtime generation and manipulation of objects.
- steps_need_mpi: dict[str, bool] = {}
- scheduler: OfflineScheduler | None = None
- online_mode: bool
- use_synthetic_obs: bool = False
- property c
The runtime context, with lazy initialization
- abstractmethod run_all()[source]
A scheme must implement a run_all method to describe the workflow.
NEDAS.core.state module
- class NEDAS.core.state.State(c: Context)[source]
Bases: object
The State class manages the state variables for the assimilation system.
The analysis is performed on a regular grid.
The entire state has dimensions: member, variable, time, z, y, x
indexed by: mem_id, v, t, k, j, i
with size: nens, nv, nt, nz, ny, nx
To parallelize workload, we group the dimensions into 3 indices:
mem_id indexes the ensemble members
rec_id indexes the unique 2D fields with (v, t, k); since nz and nt may vary for different variables, we stack these dimensions in the ‘record’ dimension with size nrec
par_id indexes the spatial partitions, which are subsets of the 2D grid given by (ist, ied, di, jst, jed, dj); for a complete field fld[j,i], the processor with par_id stores fld[jst:jed:dj, ist:ied:di] locally.
The entire state is distributed across the memory of many processors. At any moment, a processor stores only a subset of the state in its memory: either all the mem_id,rec_id but only a subset of par_id (we call this ensemble-complete), or all the par_id but only a subset of mem_id,rec_id (we call this field-complete). It is easier to perform i/o and pre/post processing on the field-complete state, and easier to run assimilation algorithms on the ensemble-complete state.
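The two layouts can be made concrete with a single-process sketch that cuts field-complete data into the chunks one processor would hold in the ensemble-complete layout. The helper names are hypothetical, fields are plain nested lists indexed as fld[j][i], and no communication between processors is modeled.

```python
def cut_chunk(fld: list[list[float]], part: tuple[int, int, int, int, int, int]):
    """Extract one partition from a 2D field stored as fld[j][i]."""
    ist, ied, di, jst, jed, dj = part
    return [row[ist:ied:di] for row in fld[jst:jed:dj]]


def to_ensemble_complete(fields: dict, partitions: list, my_par_ids: list[int]) -> dict:
    """Keep every (mem_id, rec_id) key, but only the chunks in my_par_ids."""
    return {key: {p: cut_chunk(fld, partitions[p]) for p in my_par_ids}
            for key, fld in fields.items()}


# Two members, one record, on a grid with ny=2, nx=4
fields = {(0, 0): [[0, 1, 2, 3], [4, 5, 6, 7]],
          (1, 0): [[10, 11, 12, 13], [14, 15, 16, 17]]}
partitions = [(0, 2, 1, 0, 2, 1), (2, 4, 1, 0, 2, 1)]  # left and right halves
state = to_ensemble_complete(fields, partitions, my_par_ids=[1])
```

The resulting dictionary has the shape dict[(mem_id, rec_id), dict[par_id, chunk]]: all members are present, but only for the right half of the grid.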
- partitions: list = []
- par_list: dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]] = {}
- fields_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- fields_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- state_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- fields_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- data = {}
- rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'field record id']]]
- distribute_state_tasks(c: Context) dict[int, list[int]][source]
Distribute rec_id across processors
- prepare_state(c: Context) None[source]
Main method to collect fields from model to form the complete state (field-complete distributed)
- collect_prior_fields(c: Context) None[source]
Collect fields from the prior model state, convert them to the analysis grid, preprocess (coarse-graining etc.), and save to fields[mem_id, rec_id] pointing to the unique fields
- Parameters:
c (Context) – context object
- Returns:
fields dictionary [(mem_id, rec_id), fld], where fld is an np.array defined on c.grid; it is one of the state variable fields
fields_z dictionary [(mem_id, rec_id), zfld], where zfld has the same shape as fld; it holds the z coordinates corresponding to each field
- Return type:
dict
- output_state(c: Context, tag: str, mem_id_out: int | None = None, rec_id_out: int | None = None) None[source]
Parallel output the fields to the binary state_file
- Parameters:
c (Context) – the runtime context obj
tag (str) – which version of state this is: ‘prior’, ‘post’ or ‘z’ coords?
mem_id_out (int, optional) – member id to be output, if None all available ids will output.
rec_id_out (int, optional) – record id to be output, if None all available ids will output.
- output_ens_mean(c: Context, tag: str) None[source]
Compute the ensemble mean of a field stored distributively on all pid_mem, collect the means on pid_mem=0, and output to mean_file
- Parameters:
c (Context) – the runtime context obj
tag (str) – which version of state this is: ‘prior_mean’, ‘post_mean’, or ‘z’
mean_file (str) – path to the output binary file for the ensemble mean
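The reduction behind the ensemble mean can be sketched on one process: each pid_mem contributes the members it holds, the contributions are summed, and the total is divided by the ensemble size. The function names and the nested-list fields are assumptions; the real method works across MPI ranks and writes the result to file.

```python
def add_fields(a: list[list[float]], b: list[list[float]]) -> list[list[float]]:
    """Element-wise sum of two 2D fields of the same shape."""
    return [[x + y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]


def ensemble_mean(members_by_pid: dict[int, list[list[list[float]]]]) -> list[list[float]]:
    """Sum the members held by each pid_mem, then divide by the ensemble size."""
    total, nens = None, 0
    for pid in sorted(members_by_pid):
        for fld in members_by_pid[pid]:
            total = fld if total is None else add_fields(total, fld)
            nens += 1
    return [[x / nens for x in row] for row in total]


# pid_mem 0 holds two members, pid_mem 1 holds one; each field is 1 x 2
members_by_pid = {0: [[[1.0, 2.0]], [[3.0, 4.0]]], 1: [[[5.0, 6.0]]]}
mean = ensemble_mean(members_by_pid)
```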
- transpose_to_ensemble_complete(c: Context, fields: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'][source]
Send chunks of field owned by a pid to other pid so that the field-complete fields get transposed into ensemble-complete state with keys (mem_id, rec_id) pointing to the partition in par_list
- Parameters:
c (Context) – the runtime context
fields (FieldEns) – The locally stored field-complete fields with subset of mem_id,rec_id
- Returns:
The locally stored ensemble-complete field chunks on partitions, dict[(mem_id, rec_id), dict[par_id, fld_chk]]
- Return type:
StateEns
- transpose_to_field_complete(c: Context, state: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'][source]
Transposes back the state to field-complete fields
- Parameters:
c (Context) – the runtime context
state (StateEns) – the locally stored ensemble-complete field chunks for subset of par_id
- Returns:
the locally stored field-complete fields for subset of mem_id,rec_id.
- Return type:
FieldEns
- pack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) dict[source]
pack state dict into arrays to be more easily handled by jitted funcs
- unpack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], data: dict) None[source]
unpack data and write back to the state dict
NEDAS.core.state_info module
- class NEDAS.core.state_info.StateInfo(c: Context)[source]
Bases: object
Manages the metadata, indexing, and memory offsets for the model state.
- shape
domain dimension(s) for the fields
- Type:
tuple
- fields
dictionary containing field ids and the corresponding field records
- Type:
dict[int, FieldRecord]
- size
total size of the complete state (bytes), for one member
- Type:
int
- variables
set of unique variables in the state
- Type:
set[str]
- err_types
set of unique error models in the state
- Type:
set[str]
- shape: tuple
- mask: ndarray
- fields: dict[int, FieldRecord]
- size: int
- variables: list[str]
- err_types: list[str]
- add_fields_for_variable(c: Context, vrec: dict) None[source]
Add fields for a variable in the state. The state variable has dimensions t, z, y, x while the ‘field’ is the 2D part with y, x dimensions.
- Parameters:
c (Context) – the runtime context object
vrec (dict) – the variable record defining its properties
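A minimal sketch of how a variable with time and z dimensions expands into 2D field records with byte offsets. ToyFieldRecord, add_fields and the 4-byte item size are assumptions; the sketch also ignores masks and any file header, which a real state file may account for.

```python
from dataclasses import dataclass


@dataclass
class ToyFieldRecord:
    """Simplified stand-in for a field record (not the NEDAS FieldRecord)."""
    name: str
    t: int
    k: int
    pos: int  # byte offset of this field in the binary file


def add_fields(fields: dict, next_pos: int, name: str, is_vector: bool,
               times: list[int], levels: list[int],
               ny: int, nx: int, itemsize: int = 4) -> int:
    """Append one record per (time, level); return the updated total size."""
    ncomp = 2 if is_vector else 1  # a vector field stores two components
    fld_bytes = ncomp * ny * nx * itemsize
    for t in times:
        for k in levels:
            fields[len(fields)] = ToyFieldRecord(name, t, k, next_pos)
            next_pos += fld_bytes
    return next_pos


fields = {}
size = add_fields(fields, 0, "temperature", False, times=[0], levels=[0, 1], ny=2, nx=3)
size = add_fields(fields, size, "velocity", True, times=[0], levels=[0], ny=2, nx=3)
```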
NEDAS.core.transform module
- class NEDAS.core.transform.Transform(c: Context, **kwargs)[source]
Bases: ABC
Base class for miscellaneous transform functions
- abstractmethod forward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]
Forward transform for the model state variables.
- Parameters:
c (Context) – the runtime context.
rec (FieldRecord) – State information record for the variable.
field (np.ndarray) – The state variable.
- Returns:
The transformed state variable.
- Return type:
np.ndarray
- abstractmethod backward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]
Backward (inverse) transform for the model state variables.
- Parameters:
c (Context) – the runtime context.
rec (FieldRecord) – State information record for the variable.
field (np.ndarray) – The transformed state variable.
- Returns:
The state variable transformed back to the original space.
- Return type:
np.ndarray
- abstractmethod forward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]
Forward transform for the observation sequence.
- Parameters:
- Returns:
The transformed observation sequence.
- Return type:
dict[str, np.ndarray]
- abstractmethod backward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]
Backward (inverse) transform for the observation sequence.
- Parameters:
c (Context) – the runtime context.
obs_rec (ObsRecord) – Observation information record.
obs_seq (dict) – The transformed observation sequence, with keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.
- Returns:
The observation sequence transformed back to the original space.
- Return type:
dict
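The forward/backward contract can be illustrated with a simplified, self-contained pair of classes. ToyTransform drops the c and rec arguments and the obs methods for brevity, and the logarithm is just one example of an invertible transform; neither class is part of NEDAS.

```python
import math
from abc import ABC, abstractmethod


class ToyTransform(ABC):
    """Hypothetical, simplified interface: a forward/backward pair on a field."""

    @abstractmethod
    def forward_state(self, field: list[float]) -> list[float]:
        ...

    @abstractmethod
    def backward_state(self, field: list[float]) -> list[float]:
        ...


class LogTransform(ToyTransform):
    """Map a strictly positive variable to log space and back."""

    def forward_state(self, field: list[float]) -> list[float]:
        return [math.log(v) for v in field]

    def backward_state(self, field: list[float]) -> list[float]:
        return [math.exp(v) for v in field]


transform = LogTransform()
original = [0.5, 1.0, 2.0]
# Applying backward after forward should recover the input up to rounding
recovered = transform.backward_state(transform.forward_state(original))
```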
NEDAS.core.types module
- class NEDAS.core.types.VarDesc(name: str | tuple[str, str], is_vector: bool, dtype: str = 'float', dt: float = 1, levels: Annotated[Union[numpy.ndarray, Sequence], 'z levels'] = <factory>, units: Annotated[str | float, 'physical unit'] = 1, z_units: Annotated[str | float, 'physical unit'] = 1)[source]
Bases: object
- name: str | tuple[str, str]
- is_vector: bool
- dtype: str = 'float'
- dt: float = 1
- levels: Annotated[ndarray | Sequence, 'z levels']
- units: Annotated[str | float, 'physical unit'] = 1
- z_units: Annotated[str | float, 'physical unit'] = 1
- class NEDAS.core.types.FieldRecord(name: str, model_src: str, dtype: str, is_vector: bool, units: Annotated[str | float, 'physical unit'], err_type: str, time: datetime, dt: float, k: Annotated[int, 'z level index'], pos: int)[source]
Bases: object
Represents a single 2D slice in the state vector.
- name
name of the state variable
- Type:
str
- model_src
name of the model source module for this variable
- Type:
str
- dtype
data type
- Type:
str
- is_vector
if this variable is a vector
- Type:
bool
- units
physical units of this variable
- Type:
str|float
- err_type
type of error model to use for this variable
- Type:
str
- time
time coordinate for this field
- Type:
datetime
- dt
representative time interval (hours) for this field
- Type:
float
- k
vertical z coordinate index for this field
- Type:
int
- pos
seek position (number of bytes) for the start of this field in the binary file
- Type:
int
- name: str
- model_src: str
- dtype: str
- is_vector: bool
- units: Annotated[str | float, 'physical unit']
- err_type: str
- time: datetime
- dt: float
- k: Annotated[int, 'z level index']
- pos: int
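Because pos is a byte offset, a reader can seek straight to one 2D field without scanning the rest of the file. The following is a minimal sketch of that idea, assuming fields are stored back to back as float32 arrays of a known shape; the storage layout and the helper names write_fields and read_field_at are hypothetical and are not part of the NEDAS API.

```python
import io
import numpy as np

def write_fields(buf, fields):
    """Write 2D fields back to back; return the byte offset of each one."""
    positions = []
    for fld in fields:
        positions.append(buf.tell())  # plays the role of FieldRecord.pos
        buf.write(np.ascontiguousarray(fld, dtype=np.float32).tobytes())
    return positions

def read_field_at(buf, pos, shape):
    """Seek to byte offset `pos` and read one float32 field of `shape`."""
    nbytes = int(np.prod(shape)) * np.dtype(np.float32).itemsize
    buf.seek(pos)
    return np.frombuffer(buf.read(nbytes), dtype=np.float32).reshape(shape)

# Hypothetical grid size and three constant fields (assumptions for the demo)
ny, nx = 3, 4
fields = [np.full((ny, nx), k, dtype=np.float32) for k in range(3)]

buf = io.BytesIO()  # stands in for the binary state file
positions = write_fields(buf, fields)
second = read_field_at(buf, positions[1], (ny, nx))
```

An in-memory buffer is used here so the sketch runs anywhere; a real file opened in binary mode supports the same tell/seek/read calls.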
- class NEDAS.core.types.ErrorModel(type: str, std: float, hcorr: float, vcorr: float, tcorr: float, cross_corr: dict[str, float])[source]
Bases: object
Parameters defining an error model
- type
type of error distribution
- Type:
str
- std
error standard deviation, in variable units
- Type:
float
- hcorr
horizontal correlation scale, in grid.x units
- Type:
float
- vcorr
vertical correlation scale, in z units
- Type:
float
- tcorr
temporal correlation scale, in hours
- Type:
float
- cross_corr
cross-variable correlation dictionary
- Type:
dict[str, float]
- type: str
- std: float
- hcorr: float
- vcorr: float
- tcorr: float
- cross_corr: dict[str, float]
- class NEDAS.core.types.ObsRecord(name: str, dataset_src: str, model_src: str, nobs: int, obs_window_min: int, obs_window_max: int, dtype: str, is_vector: bool, units: Annotated[str | float, 'physical unit'], z_units: Annotated[str | float, 'physical unit'], time: datetime, dt: float, err: ErrorModel, hroi: float, vroi: float, troi: float, impact_on_state: dict)[source]
Bases: object
Represents a single observation record in the full list of observations
- name
name of the observation record
- Type:
str
- dataset_src
name of dataset source module for this observation
- Type:
str
- model_src
name of the model source module for this observation
- Type:
str
- nobs
number of individual observations in this record
- Type:
int
- obs_window_min
offset from analysis time for the start of the observation window (hours)
- Type:
int
- obs_window_max
offset from analysis time for the end of the observation window (hours)
- Type:
int
- err
error model used for this observation
- Type:
ErrorModel
- dtype
data type
- Type:
str
- is_vector
if this variable is a vector
- Type:
bool
- units
physical units of this observation
- Type:
str
- z_units
vertical coordinate units
- Type:
str
- time
time coordinate for this observation
- Type:
datetime
- dt
representative time interval (hours) for this observation
- Type:
float
- name: str
- dataset_src: str
- model_src: str
- nobs: int
- obs_window_min: int
- obs_window_max: int
- dtype: str
- is_vector: bool
- units: Annotated[str | float, 'physical unit']
- z_units: Annotated[str | float, 'physical unit']
- time: datetime
- dt: float
- err: ErrorModel
- hroi: float
- vroi: float
- troi: float
- impact_on_state: dict
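The obs_window_min and obs_window_max attributes are offsets in hours from the analysis time. A minimal sketch of how they could be turned into an absolute time window follows; the helper names and the choice of inclusive bounds are assumptions made for illustration, not documented NEDAS behavior.

```python
from datetime import datetime, timedelta

def obs_window(analysis_time, obs_window_min, obs_window_max):
    """Return (start, end) of the observation window as datetimes."""
    start = analysis_time + timedelta(hours=obs_window_min)
    end = analysis_time + timedelta(hours=obs_window_max)
    return start, end

def in_window(t, analysis_time, obs_window_min, obs_window_max):
    """True if time t falls inside the window (inclusive bounds assumed)."""
    start, end = obs_window(analysis_time, obs_window_min, obs_window_max)
    return start <= t <= end

# Hypothetical analysis time with a window from -6 h to +6 h
analysis_time = datetime(2021, 1, 1, 0)
start, end = obs_window(analysis_time, -6, 6)
inside = in_window(datetime(2021, 1, 1, 3), analysis_time, -6, 6)
outside = in_window(datetime(2021, 1, 1, 7), analysis_time, -6, 6)
```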
NEDAS.core.updator module
- class NEDAS.core.updator.Updator(c: Context)[source]
Bases: ABC
Base class for updators of the model restart files
- increment: dict = {}
- update(c: Context) None[source]
Top-level routine to apply the analysis increments to the original model restart files (as initial conditions for the next forecast)
Module contents
- class NEDAS.core.Context(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: object
Runtime context manages the generation and interaction of dynamic objects in runtime
- comm: parallel.Comm
- comm_rec: parallel.Comm
- comm_mem: parallel.Comm
- progress: progress.Progress
- grid: grid.GridType
- grid_orig: grid.GridType
- assimilator: Assimilator
- localization_funcs: dict[str, Callable]
- debug: bool
- interactive: bool
- is_notebook: bool
- time: datetime
- iter: int
- pid_show: int
- nens: int
- mem_list: dict[ProcIDMem, list[MemID]]
- fs: FileSystem
- jsub: JobSubmitter
- check_interactive() bool[source]
If the runtime environment supports interactive output (with ANSI escape code).
- update_assim_tools()[source]
Update the assimilation tool components based on runtime configuration
- property prev_time: datetime
Previous analysis time. Automatically updated when self.time changes.
- Returns:
Previous analysis time.
- Return type:
datetime
- property next_time: datetime
Next analysis time. Automatically updated when self.time changes.
- Returns:
Next analysis time.
- Return type:
datetime
- set_comm() None[source]
Initialize the MPI communicator, split the communicator if necessary.
For a serial program, use a dummy communicator and set nproc to the number of available processors on the machine; for an MPI program, use MPI.COMM_WORLD and check that its size matches nproc.
Split the communicator into member and record groups according to nproc and nproc_mem. See the NEDAS.utils.parallel module for more details.
- set_grid() None[source]
Initialize the analysis grid based on the configuration.
If grid_def['type'] is ‘custom’, an analysis grid is created from the provided parameters. If grid_def['type'] is a model name, the grid is loaded from the specified model class.
- set_models() None[source]
Initialize model instances based on model_def[model_name] settings. Store the model instances in models[model_name].
- set_datasets() None[source]
Initialize dataset instances based on dataset_def[dataset_name] settings. Store the dataset instances in datasets[dataset_name].
- property total_tasks
- property current_task
- property message
- property debug_message
- logger(func_name: str)[source]
Decorator to register the func in call stack and show runtime messages.
- print_1p(msg: str)[source]
Customized print function for showing runtime message.
Only the processor with PID = self.pid_show shows the message, which avoids redundant output when all processors would print the same message.
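The single-printer behavior can be sketched as a standalone function. This is an illustration only: the real method is bound to the Context and reads self.pid_show, whereas here the process id, the pid_show value and the output stream are passed explicitly, and the boolean return value is an addition for the demo.

```python
import io
import sys

def print_1p(msg, pid, pid_show=0, stream=None):
    """Write msg only on the process whose id equals pid_show.

    Returns True if the message was written, False otherwise.
    """
    stream = sys.stdout if stream is None else stream
    if pid != pid_show:
        return False
    stream.write(msg)
    stream.flush()
    return True

# Simulate four processes calling the same print statement
log = io.StringIO()
shown = [print_1p("reading obs\n", pid, pid_show=0, stream=log) for pid in range(4)]
```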
- dump_config(config_file: str) None[source]
Dumps a snapshot of the current state to a yaml config file. The original config object remains unchanged in memory.
- run_job(commands: str, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', nproc: int = 1, offset: int = 0, **kwargs) None[source]
The user-facing method for running commands on a computer. It re-configures the existing job submitter with runtime arguments and executes the commands.
- Parameters:
commands (str) – Shell commands to be dispatched by job submitter
parallel_mode (ParallelMode, optional) – parallel mode (‘serial’, ‘mpi’, ‘openmp’), default is ‘serial’
nproc (int, optional) – number of processors (default is 1)
offset (int, optional) – offset in full list of processors (default is 0)
**kwargs – other keyword arguments to update the job submitter configuration
- class NEDAS.core.Assimilator(c: Context)[source]
Bases: ABC
- partition_grid(c: Context) None[source]
Partition the analysis grid into several parts and distribute the workload over the mpi ranks.
- abstractmethod init_partitions(c: Context) list[source]
Generate spatial partitioning of the domain
- abstractmethod assign_obs(c: Context) dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], ndarray]][source]
Assign the observation sequence to each partition par_id
- Parameters:
c (Context) – the runtime context object
- Returns:
Indices in the full obs_seq for the subset of obs that belongs to partition par_id
- Return type:
dict[ObsRecordID, dict[PartitionID, np.ndarray]]
- abstractmethod distribute_partitions(c: Context) dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]][source]
Distribute partitions across processors
- transpose_to_ensemble_complete(c: Context) None[source]
Communicate among mpi ranks and transpose the locally-stored state/obs chunks to ensemble-complete
- class NEDAS.core.Updator(c: Context)[source]
Bases: ABC
Base class for updators of the model restart files
- increment: dict = {}
- update(c: Context) None[source]
Top-level routine to apply the analysis increments to the original model restart files (as initial conditions for the next forecast)
- class NEDAS.core.Inflation(coef: float = 1.0, adaptive: bool = False, prior: bool = False, post: bool = False)[source]
Bases: ABC
Class for inflating the ensemble members (covariance inflation)
- class NEDAS.core.Transform(c: Context, **kwargs)[source]
Bases: ABC
Base class for miscellaneous transform functions
- abstractmethod forward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]
Forward transform for the model state variables.
- Parameters:
c (Context) – the runtime context.
rec (FieldRecord) – State information record for the variable.
field (np.ndarray) – The state variable.
- Returns:
The transformed state variable.
- Return type:
np.ndarray
- abstractmethod backward_state(c: Context, rec: FieldRecord, field: ndarray) ndarray[source]
Backward (inverse) transform for the model state variables.
- Parameters:
c (Context) – the runtime context.
rec (FieldRecord) – State information record for the variable.
field (np.ndarray) – The transformed state variable.
- Returns:
The state variable transformed back to the original space.
- Return type:
np.ndarray
- abstractmethod forward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]
Forward transform for the observation sequence.
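- Parameters:
c (Context) – the runtime context.
obs_rec (ObsRecord) – Observation information record.
obs_seq (dict) – The observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.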
- Returns:
The transformed observation sequence.
- Return type:
dict[str, np.ndarray]
- abstractmethod backward_obs(c: Context, obs_rec: ObsRecord, obs_seq: dict[str, ndarray]) dict[str, ndarray][source]
Backward (inverse) transform for the observation sequence.
- Parameters:
c (Context) – the runtime context.
obs_rec (ObsRecord) – Observation information record.
obs_seq (dict) – The transformed observation sequence. With keys 'obs' the observation; 'x', 'y', 'z', 't' the space/time coordinates; and 'err_std' the observation errors.
- Returns:
The observation sequence transformed back to the original space.
- Return type:
dict
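The four abstract methods form two forward/backward pairs, one for state fields and one for observation sequences. The sketch below mirrors those signatures with a trivial invertible transform (multiplication by a constant). It is a standalone class that does not subclass NEDAS.core.Transform, the name ScaleTransform is hypothetical, and scaling 'err_std' together with 'obs' is an assumption about how a linear transform would treat errors.

```python
import numpy as np

class ScaleTransform:
    """Hypothetical transform: multiply values by a constant factor."""

    def __init__(self, factor=2.0):
        self.factor = factor

    def forward_state(self, c, rec, field):
        return field * self.factor

    def backward_state(self, c, rec, field):
        return field / self.factor

    def forward_obs(self, c, obs_rec, obs_seq):
        out = dict(obs_seq)  # shallow copy; coordinate arrays are left untouched
        out["obs"] = obs_seq["obs"] * self.factor
        out["err_std"] = obs_seq["err_std"] * self.factor
        return out

    def backward_obs(self, c, obs_rec, obs_seq):
        out = dict(obs_seq)
        out["obs"] = obs_seq["obs"] / self.factor
        out["err_std"] = obs_seq["err_std"] / self.factor
        return out

# The context and record arguments are unused by this toy transform,
# so None is passed in their place.
tr = ScaleTransform(factor=2.0)
field = np.arange(6.0).reshape(2, 3)
obs_seq = {
    "obs": np.array([1.0, 2.0]),
    "x": np.array([0.5, 1.5]), "y": np.array([0.0, 1.0]),
    "z": np.array([0.0, 0.0]), "t": np.array([0.0, 0.0]),
    "err_std": np.array([0.1, 0.2]),
}
field_back = tr.backward_state(None, None, tr.forward_state(None, None, field))
obs_back = tr.backward_obs(None, None, tr.forward_obs(None, None, obs_seq))
```

A round trip through the forward and backward methods should return the original values, which is a convenient check for any concrete transform.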
- class NEDAS.core.Model(context: Context | None = None, io_mode: Literal['online', 'offline'] | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: Generic[GridT], ABC
Class for configuring and running a model
- grid: GridT
- z: dict[Annotated[int, 'z level index'], ndarray]
- mask: ndarray
- ens_init_dir: str | None
- truth_dir: str | None
- ens_run_strategy: Literal['scheduler', 'batch']
- nproc_per_run: int = 1
- nproc_per_util: int = 1
- walltime: int | None = None
- run_process = None
- run_status: str = 'pending'
- restart_dir: str
- forecast_period: int
- memory: dict
- io_mode: Literal['online', 'offline']
- model_name: str
- abstractmethod read_grid(**kwargs) None[source]
Read the grid information from the model output.
- Parameters:
**kwargs – Keyword arguments for reading the grid.
- read_var(**kwargs) ndarray[source]
Read a variable from the model output.
- Parameters:
**kwargs – Keyword arguments for reading the variable.
- Returns:
The read variable.
- Return type:
np.ndarray
- write_var(var, **kwargs) None[source]
Write a variable to the model output.
- Parameters:
var (np.ndarray) – The variable to write.
**kwargs – Keyword arguments for writing the variable.
- abstractmethod z_coords(**kwargs) ndarray[source]
Get the vertical coordinates of the model.
- Parameters:
**kwargs – Keyword arguments for getting the vertical coordinates.
- Returns:
The vertical coordinates.
- Return type:
np.ndarray
- abstractmethod preprocess(*args, **kwargs) None[source]
Preprocess the model data.
- Parameters:
**kwargs – Keyword arguments for preprocessing.
- abstractmethod postprocess(*args, **kwargs) None[source]
Postprocess the model data.
- Parameters:
**kwargs – Keyword arguments for postprocessing.
- abstractmethod run(*args, **kwargs) None[source]
Run the model forward in time.
- Parameters:
*args – Arguments
**kwargs – Keyword arguments
- Keyword Arguments:
time (datetime) – current time when forecast starts
restart_dir (str) – directory where restart files are located
forecast_period (int) – forecast period in hours
If self.ens_run_strategy == ‘batch’, the method runs all ensemble members in one go and expects an additional kwargs[‘nens’] giving the ensemble size. If self.ens_run_strategy == ‘scheduler’, the method runs a single member indexed by kwargs[‘member’], and kwargs[‘worker_id’] is the pid assigned by the scheduler to run this method.
- class NEDAS.core.Dataset(context: Context | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: ABC
Dataset class (template for specific dataset sources)
- obs_operator: dict[Annotated[str, 'variable name'], Callable] = {}
- memory: dict = {}
- dataset_name: str
- parse_kwargs(kwargs: dict[str, Any]) dict[str, Any][source]
Parse the input kwargs to pinpoint a specific file/variable…
- generate_obs_network(**kwargs) dict[str, ndarray][source]
Generate a random observing network for use in synthetic observation experiments.
- Parameters:
**kwargs
- class NEDAS.core.FileSystem(config: Config | None = None)[source]
Bases: object
Manages runtime file system paths, name of files and directories
- cycle_dir(time: datetime) str[source]
Directory path for an analysis cycle.
- Parameters:
time (datetime) – Time of the analysis cycle.
- Returns:
Directory path for the analysis cycle.
- Return type:
str
- forecast_dir(time: datetime, model_name: str) str[source]
Directory path for a model forecast step.
- Parameters:
time (datetime) – Time of the analysis cycle.
model_name (str) – Name of the model.
- Returns:
Directory path for the model forecast.
- Return type:
str
- analysis_dir(time: datetime, iter: int = 0) str[source]
Directory path for an analysis step.
- Parameters:
time (datetime) – Time of the analysis cycle.
iter (int) – If niter > 1, an outer iteration loop exists and iter is the index in that loop.
- Returns:
Directory path for the analysis step.
- Return type:
str
- class NEDAS.core.IOBackend[source]
Bases: ABC
Base class for handling runtime input/output for state and observation variables and OS-level operations such as file manipulation and running commands
- io_mode
‘offline’ for file I/O and ‘online’ for persistent memory I/O
- Type:
IOMode
- tags
List of names for copies of state/obs data
- Type:
list[str]
- IOTags:
‘current’: mutable buffer for the data, updated by assimilation and outer-loop iterations.
‘prior’: read-only snapshot, also known as background/forecast, kept for O-B statistics.
‘post’: final state after the assimilation, known as the (re)analysis.
‘truth’: truth, used as the reference state in synthetic OSSE experiments.
‘raw’: original information. For obs it is the actual obs
- io_mode: Literal['online', 'offline'] = 'offline'
- tags: list[str] = ['current', 'prior', 'prior_mean', 'post', 'post_mean', 'truth', 'raw', 'z', 'z_mean']
- prepare_fields_storage(c: Context, tag: str) None[source]
Prepare for storage of fields data. Only needed for offline io modes: initialize the binary file that stores fields and write its metadata.
- abstractmethod read_field(c: Context, tag: str, rec_id: int, mem_id: int) np.ndarray[source]
Read a 2D field from the state
- Parameters:
c (Context) – the runtime context
tag (str) – which copy of the state to read from
rec_id (int) – field record index
mem_id (int) – ensemble member index from 0 to nens-1
- Returns:
the 2D field data
- Return type:
np.ndarray
- abstractmethod write_field(fld: np.ndarray, c: Context, tag: str, rec_id: int, mem_id: int) None[source]
Write a 2D field to the state
- Parameters:
fld (np.ndarray) – the 2D field data
c (Context) – the runtime context
tag (str) – which copy of the state to write to
rec_id (int) – field record index
mem_id (int) – ensemble member index
- abstractmethod call_method(c: Context, tag: str, method: Callable, *args, **kwargs) Any[source]
Call a method to perform some tasks.
- Parameters:
c (Context) – the runtime context
tag (str) – which copy of the model state to request io from: “prior”, “post” or “truth”
method (Callable) – method name
*args – will be passed to the method
**kwargs – will be passed to the method
- Returns:
whatever the method(**kwargs) returns
- Return type:
Any
- save_ndarray(c: Context, name: str, data: np.ndarray, path: str | None = None) None[source]
Save ndarray data
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
data (np.ndarray) – the data
path (str, optional) – system path to save the data to.
- load_ndarray(c: Context, name: str, path: str | None = None) np.ndarray | None[source]
Load ndarray from saved data
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
path (str, optional) – system path to the saved data.
- Returns:
the data
- Return type:
np.ndarray
- save_debug_data(c: Context, name: str, data: dict, path: str | None = None) None[source]
Save debug data in npy format
- Parameters:
c (Context) – the runtime context
name (str) – the name of the data
data (dict) – the data
path (str, optional) – system path to save the data to.
To recover the data, use np.load(file, allow_pickle=True).item()
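The recovery call works because NumPy stores a dict as a zero-dimensional object array, and .item() unwraps it again. A minimal round trip with plain NumPy is sketched below; the file name and dictionary contents are made up for the demo, and the code does not call save_debug_data itself.

```python
import os
import tempfile
import numpy as np

# Hypothetical debug payload
data = {"obs": np.arange(3.0), "note": "debug"}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "debug_data.npy")
    # A dict is wrapped into a 0-d object array and pickled inside the .npy file
    np.save(path, data, allow_pickle=True)
    # allow_pickle=True is required on load; .item() unwraps the 0-d array
    recovered = np.load(path, allow_pickle=True).item()
```

Loading pickled data executes arbitrary code from the file, so allow_pickle=True should only be used on files from a trusted source.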
- class NEDAS.core.JobSubmitter(job_name: str = 'run', run_dir: str = '.', nproc: int = 1, offset: int = 0, parallel_mode: Literal['serial', 'mpi', 'openmp'] = 'serial', debug: bool = False, **kwargs)[source]
Bases: ABC
Run a command (shell script) on a specific computer. For a local computer this is as easy as a subprocess call to execute the command. But for large-scale HPC, this may involve submitting the job script to a scheduler and waiting for its completion.
- Parameters:
job_name (str, optional) – Name of the job. Defaults to ‘run’.
run_dir (str, optional) – Directory where the job will execute. Defaults to ‘.’.
nproc (int, optional) – Number of processors requested. Defaults to 1.
offset (int, optional) – Number of processors to offset from the start. Defaults to 0.
parallel_mode (ParallelMode, optional) – Parallelization strategy (e.g., ‘serial’, ‘mpi’, ‘openmp’). Defaults to ‘serial’.
debug (bool, optional) – Enables verbose logging for troubleshooting. Defaults to False.
**kwargs – Other arbitrary keyword arguments.
- property nproc: int
Number of requested processors for the job
- property offset: int
Number of processors to skip from the beginning for the job. This allows different jobs to span the total available nproc in the allocation.
- abstract property nproc_avail: int
Number of available processors on a host machine. This should be redefined in subclasses to give machine-specific behavior.
- abstract property execute_command: str
Execute command for running the job on the host machine, replacing ‘JOB_EXECUTE’ in ‘commands’
- parse_commands(commands: str) str[source]
Parse shell command to replace ‘JOB_EXECUTE’ with machine-specific strings.
- abstractmethod run(commands: str) None[source]
Top level run method for a job submitter.
- Parameters:
commands (str) – shell commands to be run by the job submitter. In the commands string, ‘JOB_EXECUTE’ will be replaced by the correct execute_command, and ‘JOB_ARRAY_INDEX’ will be replaced by the scheduler’s index variable name to perform array jobs.
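The placeholder substitution described for parse_commands and run can be sketched as plain string replacement. This is an illustration under assumptions: the standalone function below takes the replacement strings as arguments, whereas the real class derives them from execute_command and the scheduler, and the example launcher string and command line are hypothetical.

```python
def parse_commands(commands, execute_command, array_index_var=None):
    """Replace the 'JOB_EXECUTE' and 'JOB_ARRAY_INDEX' placeholders."""
    out = commands.replace("JOB_EXECUTE", execute_command)
    if array_index_var is not None:
        out = out.replace("JOB_ARRAY_INDEX", array_index_var)
    return out

# Hypothetical command string, launcher, and scheduler index variable
commands = "cd run_dir; JOB_EXECUTE ./model.exe member_JOB_ARRAY_INDEX"
parsed = parse_commands(commands, "mpirun -np 4", "$SLURM_ARRAY_TASK_ID")
serial = parse_commands("JOB_EXECUTE ./util.exe", "")
```

Keeping the placeholders in the command string lets the same script be dispatched on a laptop or through a scheduler without editing it.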
- class NEDAS.core.State(c: Context)[source]
Bases: object
The State class manages the state variables for the assimilation system.
The analysis is performed on a regular grid.
The entire state has
dimensions: member, variable, time, z, y, x
indexed by: mem_id, v, t, k, j, i
with size: nens, nv, nt, nz, ny, nx
To parallelize workload, we group the dimensions into 3 indices:
mem_id indexes the ensemble members
rec_id indexes the unique 2D fields with (v, t, k); since nz and nt may vary for different variables, we stack these dimensions in the ‘record’ dimension with size nrec
par_id indexes the spatial partitions, which are subsets of the 2D grid given by (ist, ied, di, jst, jed, dj); for a complete field fld[j,i], the processor with par_id stores fld[ist:ied:di, jst:jed:dj] locally.
The entire state is distributed across the memory of many processors. At any moment, a processor stores only a subset of the state in its memory: either all the mem_id,rec_id but only a subset of par_id (we call this ensemble-complete), or all the par_id but only a subset of mem_id,rec_id (we call this field-complete). It is easier to perform i/o and pre/post processing on the field-complete state, and easier to run assimilation algorithms on the ensemble-complete state.
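The partition tuples can be illustrated with NumPy slicing. The sketch below builds rectangular tiles and extracts the chunk each par_id would hold. Several things here are assumptions: the tiling helper is hypothetical, the strides di and dj are fixed to 1, and the array is indexed as fld[j, i], so the j-slice is applied to the first axis.

```python
import numpy as np

def tile_partitions(ny, nx, tile):
    """Hypothetical tiling: list of (ist, ied, di, jst, jed, dj) tuples."""
    parts = []
    for jst in range(0, ny, tile):
        for ist in range(0, nx, tile):
            parts.append((ist, min(ist + tile, nx), 1, jst, min(jst + tile, ny), 1))
    return parts

def chunk(fld, part):
    """Return the sub-array of fld[j, i] that belongs to one partition."""
    ist, ied, di, jst, jed, dj = part
    return fld[jst:jed:dj, ist:ied:di]

ny, nx = 4, 6
fld = np.arange(ny * nx, dtype=float).reshape(ny, nx)
partitions = tile_partitions(ny, nx, tile=2)
chunks = {par_id: chunk(fld, p) for par_id, p in enumerate(partitions)}
```

Every grid point falls in exactly one tile here, so the chunks can be reassembled into the full field without overlap.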
- partitions: list = []
- par_list: dict[Annotated[int, 'process id in comm_mem'], list[Annotated[int, 'partition id']]] = {}
- fields_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- fields_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- state_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'] = {}
- fields_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'] = {}
- data = {}
- rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'field record id']]]
- distribute_state_tasks(c: Context) dict[int, list[int]][source]
Distribute rec_id across processors
- prepare_state(c: Context) None[source]
Main method to collect fields from model to form the complete state (field-complete distributed)
- collect_prior_fields(c: Context) None[source]
Collect fields from the prior model state, convert them to the analysis grid, preprocess (coarse-graining etc.), and save to fields[mem_id, rec_id] pointing to the unique fields
- Parameters:
c (Context) – context object
- Returns:
- fields dictionary [(mem_id, rec_id), fld]
where fld is an np.array defined on c.grid; it is one of the state variable fields
- dict: fields_z dictionary [(mem_id, rec_id), zfld]
where zfld has the same shape as fld; it holds the z coordinates corresponding to each field
- Return type:
dict
- output_state(c: Context, tag: str, mem_id_out: int | None = None, rec_id_out: int | None = None) None[source]
Parallel output the fields to the binary state_file
- Parameters:
c (Context) – the runtime context obj
tag (str) – which version of state this is: ‘prior’, ‘post’ or ‘z’ coords?
mem_id_out (int, optional) – member id to be output, if None all available ids will output.
rec_id_out (int, optional) – record id to be output, if None all available ids will output.
- output_ens_mean(c: Context, tag: str) None[source]
Compute the ensemble mean of a field stored distributively on all pid_mem, collect the means on pid_mem=0, and output to mean_file
- Parameters:
c (Context) – the runtime context obj
tag (str) – which version of state this is: ‘prior_mean’, ‘post_mean’, or ‘z’
mean_file (str) – path to the output binary file for the ensemble mean
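When members are spread over several processes, an ensemble mean can be formed from per-process partial sums. The sketch below imitates that reduction serially in one process. It is an assumption about the general approach, not a description of the actual implementation, which communicates over MPI; the mem_list layout and random fields are made up for the demo.

```python
import numpy as np

nens = 5
# Hypothetical distribution of member ids over two processes in comm_mem
mem_list = {0: [0, 1, 2], 1: [3, 4]}

rng = np.random.default_rng(0)
members = {m: rng.normal(size=(3, 4)) for m in range(nens)}

# Step 1: each process sums the members it owns
local_sums = {pid: sum(members[m] for m in mems) for pid, mems in mem_list.items()}

# Step 2: the partial sums are combined on one process and divided by nens
ens_mean = sum(local_sums.values()) / nens
```

Summing first and dividing once by the full ensemble size gives the correct mean even when processes hold different numbers of members.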
- transpose_to_ensemble_complete(c: Context, fields: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'][source]
Send chunks of field owned by a pid to other pid so that the field-complete fields get transposed into ensemble-complete state with keys (mem_id, rec_id) pointing to the partition in par_list
- Parameters:
c (Context) – the runtime context
fields (FieldEns) – The locally stored field-complete fields with subset of mem_id,rec_id
- Returns:
The locally stored ensemble-complete field chunks on partitions, dict[(mem_id, rec_id), dict[par_id, fld_chk]]
- Return type:
StateEns
- transpose_to_field_complete(c: Context, state: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], ndarray], 'field-complete ensemble data'][source]
Transposes back the state to field-complete fields
- Parameters:
c (Context) – the runtime context
state (StateEns) – the locally stored ensemble-complete field chunks for subset of par_id
- Returns:
the locally stored field-complete fields for subset of mem_id,rec_id.
- Return type:
FieldEns
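The two transposes are inverses of each other, which can be shown on a single process with plain dictionaries. The sketch below only rearranges data locally and involves no communication, so it illustrates the data structures (FieldEns-like and StateEns-like dicts) rather than the MPI exchange. The function names, the partition list and the fld[j, i] axis order are assumptions.

```python
import numpy as np

def to_ensemble_complete(fields, partitions):
    """dict[(mem_id, rec_id), fld] -> dict[(mem_id, rec_id), dict[par_id, chunk]]"""
    state = {}
    for key, fld in fields.items():
        state[key] = {
            par_id: fld[jst:jed:dj, ist:ied:di].copy()
            for par_id, (ist, ied, di, jst, jed, dj) in enumerate(partitions)
        }
    return state

def to_field_complete(state, partitions, shape):
    """Reassemble full 2D fields from their per-partition chunks."""
    fields = {}
    for key, chunks in state.items():
        fld = np.full(shape, np.nan)
        for par_id, chk in chunks.items():
            ist, ied, di, jst, jed, dj = partitions[par_id]
            fld[jst:jed:dj, ist:ied:di] = chk
        fields[key] = fld
    return fields

shape = (4, 6)
# Two hypothetical partitions splitting the grid into left and right halves
partitions = [(0, 3, 1, 0, 4, 1), (3, 6, 1, 0, 4, 1)]
fields = {(m, r): np.random.default_rng(10 * m + r).normal(size=shape)
          for m in range(2) for r in range(3)}

state = to_ensemble_complete(fields, partitions)
fields_back = to_field_complete(state, partitions, shape)
```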
- pack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], state_z: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data']) dict[source]
pack state dict into arrays to be more easily handled by jitted funcs
- unpack_local_state_data(c: Context, par_id: Annotated[int, 'partition id'], state_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'field record id']], dict[Annotated[int, 'partition id'], ndarray]], 'state-complete ensemble data'], data: dict) None[source]
unpack data and write back to the state dict
- class NEDAS.core.Obs(c: Context)[source]
Bases: object
Class for handling observations.
The observation has dimensions: variable, time, z, y, x. Since the observation network is typically irregular, we store the obs record for each variable in a 1d sequence, with coordinates (t,z,y,x), and size nobs
To parallelize workload, we distribute each obs record over all the processors
for batch assimilation mode, each pid stores the list of local obs within the hroi of its tiles, with size nlobs (number of local obs)
for serial mode, each pid stores a non-overlapping subset of the obs list, here ‘local’ obs (in storage sense) is broadcast to all pid before computing its update to the state/obs near that obs.
The hroi is separately defined for each obs record. For very large hroi, the serial mode is the more parallel-efficient option, since in batch mode the same obs may need to be stored in multiple pids
To compare to the observation, obs_prior simulated by the model needs to be computed, they have dimension [nens, nlobs], indexed by (mem_id, obs_id)
- obs_inds: dict = {}
- obs_seq: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'] = {}
- obs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
- lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'] = {}
- lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
- lobs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'] = {}
- obs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'] = {}
- data: dict = {}
- obs_rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'obs record id']]] = {}
- distribute_obs_tasks(c: Context)[source]
Distribute obs_rec_id across processors
- Parameters:
c (Context) – the runtime context object.
- Returns:
Dictionary {pid_rec (int): list[obs_rec_id (int)]}
- Return type:
dict
- get_ref_z(c: Context, model_name: str, time: datetime) dict[Annotated[int, 'z level index'], ndarray][source]
Get the reference z coords at level k on the analysis grid from a model, to be used in dataset modules for generate_obs_network or for superobing/thinning in read_obs.
- Parameters:
c (Context) – the runtime context
model_name (str) – the model name
time (datetime) – the time of the model state
- Returns:
the z coords field at each level
- Return type:
dict[LevelID, np.ndarray]
- state_to_obs(c: Context, tag: str, **kwargs) ndarray[source]
Compute the corresponding obs value given the state variable(s), namely the “obs_prior”. This function includes several ways to compute the obs_prior:
1, If obs_name is one of the variables provided by the model_src module, then model_src.read_var shall be able to provide the obs field defined on model native grid. Then we convert the obs field to the analysis grid and do vertical interpolation.
2, If obs_name is one of the variables provided by obs.obs_operator, we call it to obtain the obs seq. Typically the obs_operator performs more complex computation, such as path integration, radiative transfer model, etc. (slowest)
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’, or ‘truth’ if generating synthetic obs
**kwargs – Additional parameters:
- member: int, member index; or None if dealing with synthetic obs
- name: str, obs variable name
- time: datetime obj, time of the obs window
- is_vector: bool, if True the obs is a vector measurement
- dataset_src: str, dataset source module name providing the obs
- model_src: str, model source module name providing the state
- x, y, z, t: np.array, coordinates from obs_seq
- Returns:
Values corresponding to the obs_seq but from the state identified by kwargs
- Return type:
np.ndarray
- get_model_fld_z_on_grid(c: Context, tag: str, **kwargs) tuple[ndarray, ndarray][source]
Get obs variable field and z coords at level k and convert to c.grid
- horizontal_interp(c: Context, fld: ndarray, zfld: ndarray, is_vector: bool, obs_x: ndarray, obs_y: ndarray) tuple[ndarray, ndarray][source]
Interpolate fld and zfld horizontally to the obs_x,obs_y locations
- vertical_interp(seq: ndarray, k: int, levels: Annotated[ndarray | Sequence, 'z levels'], f: ndarray, fp: ndarray | None, z: ndarray, zp: ndarray | None, dzp: ndarray | None, obs_z: ndarray) tuple[source]
Interpolate f(k) with z(k) coords vertically to the obs z locations.
Vertical interp to obs_z, taking ocean depth as an example:
---------- z[k-2] ----------
k-1   - - - -   f[k-1], fp   } dzp, previous layer
---------- z[k-1], zp ----------
k     - - - -   v[k], f      } dz, current layer
---------- z[k], z ----------
k+1   - - - -   v[k+1]
layer thickness of the current level k is denoted as z, for the previous level as zp; the variable f are considered layer averages, so they are defined at layer centers.
- validate_seq_shape(seq: ndarray, is_vector: bool) None[source]
Validate the shape of an observation sequence. Allowed shape: (nobs,) for scalar obs seq; (2, nobs) for vector obs seq.
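The two allowed shapes translate into a short check. The sketch below is a standalone function written from the description above; raising ValueError on a mismatch is an assumption, since the documentation does not say how the real method reports an invalid shape.

```python
import numpy as np

def validate_seq_shape(seq, is_vector):
    """Accept (nobs,) for scalar obs and (2, nobs) for vector obs."""
    if is_vector:
        ok = seq.ndim == 2 and seq.shape[0] == 2
    else:
        ok = seq.ndim == 1
    if not ok:
        kind = "vector" if is_vector else "scalar"
        raise ValueError(f"unexpected shape {seq.shape} for {kind} obs sequence")

# Both calls pass silently
validate_seq_shape(np.zeros(5), is_vector=False)
validate_seq_shape(np.zeros((2, 5)), is_vector=True)
```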
- collect_obs_seq(c: Context) Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'][source]
Process the obs in parallel, read dataset files and convert to obs_seq which contains obs value, coordinates and other info
Since this is the actual obs (1 copy), only 1 processor needs to do the work
- Parameters:
c (Context) – The runtime context object.
- Returns:
- observation sequence. Dictionary {obs_rec_id (int): record}
where each record is a dictionary {key: np.ndarray}. The mandatory keys are:
‘obs’: the observed values (measurements)
‘x’, ‘y’, ‘z’, ‘t’: the coordinates for each measurement
‘err_std’: the uncertainties for each measurement
There can be other optional keys provided by read_obs(), but they are not used.
- Return type:
ObsSeq
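The structure of the returned observation sequence can be made concrete with a small hand-built example. The values, the helper missing_keys and the representation of ‘t’ as plain floats are assumptions for illustration; only the set of mandatory keys comes from the description above.

```python
import numpy as np

MANDATORY_KEYS = ("obs", "x", "y", "z", "t", "err_std")

def missing_keys(record):
    """Return the mandatory keys that are absent from one obs record."""
    return [k for k in MANDATORY_KEYS if k not in record]

nobs = 4
record = {
    "obs": np.array([271.3, 272.1, 270.8, 273.0]),  # made-up measurements
    "x": np.array([10.0, 20.0, 30.0, 40.0]),
    "y": np.array([5.0, 5.0, 15.0, 15.0]),
    "z": np.zeros(nobs),
    "t": np.zeros(nobs),          # time representation is an assumption
    "err_std": np.full(nobs, 0.5),
}

# obs_seq maps obs_rec_id to its record
obs_seq = {0: record}
```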
- prepare_obs_from_state(c: Context, tag: str) None[source]
Compute the obs priors in parallel, run state_to_obs to obtain obs_prior_seq
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’ ensemble model states
- global_obs_list(c: Context) list[tuple[Annotated[int, 'obs record id'], int | None, Annotated[int, 'process id in comm'], int]][source]
- transpose_obs_seq(c: Context, input_obs: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']) Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'][source]
Transpose the obs sequence from field-complete to ensemble-complete
- Parameters:
c (Context) – the runtime context
input_obs (ObsSeq) – obs_seq from process_all_obs(), dict[obs_rec_id, dict[key, np.array]]
- Returns:
the lobs dict[obs_rec_id, dict[par_id, dict[key, np.array]]], key = ‘obs’, ‘x’, ‘y’, ‘z’, ‘t’…
- Return type:
LocalObsSeq
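The change of layout from an obs sequence to a local obs sequence can be sketched on a single process, leaving out the MPI communication entirely. The `obs_inds` dictionary below is a made-up stand-in for the per-partition indices that Assimilator.assign_obs is documented to return; how NEDAS actually uses them is not shown in the source.

```python
import numpy as np

# Obs sequence: dict[obs_rec_id, dict[key, array]]
obs_seq = {0: {"obs": np.array([1.0, 2.0, 3.0, 4.0]),
               "x": np.array([0.0, 1.0, 2.0, 3.0])}}

# Hypothetical indices into the full obs_seq for each partition:
# dict[obs_rec_id, dict[par_id, array]]
obs_inds = {0: {0: np.array([0, 1]), 1: np.array([2, 3])}}

# Local obs sequence: dict[obs_rec_id, dict[par_id, dict[key, array]]]
lobs = {}
for rec_id, rec in obs_seq.items():
    lobs[rec_id] = {}
    for par_id, inds in obs_inds[rec_id].items():
        # Index the last axis so that (nobs,) and (2, nobs) arrays both work.
        lobs[rec_id][par_id] = {key: arr[..., inds] for key, arr in rec.items()}
```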
- transpose_to_ensemble_complete(c: Context, input_obs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'][source]
Transpose obs from field-complete to ensemble-complete
Step 1, Within comm_mem, send the subset of input_obs with mem_id and par_id from the source proc (src_pid) to the destination proc (dst_pid), store the result in tmp_obs with all the mem_id (ensemble-complete)
Step 2, Gather all obs_rec_id within comm_rec, so that each pid_rec will have the entire obs record for assimilation
- Parameters:
c (Context) – the runtime context
input_obs (ObsEns) – obs_prior from process_all_obs_priors(), dict[(mem_id, obs_rec_id), np.array]
- Returns:
the lobs_prior dict[(mem_id, obs_rec_id), dict[par_id, np.array]]
- Return type:
LocalObsEns
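The effect of this transpose can be emulated without MPI by treating each process as a dictionary entry. In the sketch below, the member and partition layouts (`mem_on_proc`, `par_on_proc`, `obs_inds`) are made up for illustration, and the nested loops stand in for the send/receive within comm_mem; the gather within comm_rec is omitted because only one record is used.

```python
import numpy as np

nobs, rec_id = 6, 0

# Field-complete: each hypothetical process holds whole records for a few members.
mem_on_proc = {0: [0, 1], 1: [2, 3]}
field_complete = {}
for pid, mems in mem_on_proc.items():
    field_complete[pid] = {(m, rec_id): np.arange(nobs) + 10.0 * m for m in mems}

# Hypothetical partition layout: obs indices per partition, partitions per process.
obs_inds = {0: np.array([0, 1, 2]), 1: np.array([3, 4, 5])}
par_on_proc = {0: [0], 1: [1]}

# "Transpose": each destination process collects its partitions from every member.
ens_complete = {pid: {} for pid in par_on_proc}
for src_pid, held in field_complete.items():
    for (m, r), data in held.items():
        for dst_pid, pars in par_on_proc.items():
            ens_complete[dst_pid][(m, r)] = {p: data[obs_inds[p]] for p in pars}
```

After the loops, every process holds all four members, but only for the observations in its own partitions.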
- transpose_to_field_complete(c: Context, lobs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'][source]
Transpose obs from ensemble-complete to field-complete
- Parameters:
c (Context) – the runtime context
lobs (LocalObsEns) – ensemble-complete local obs
- Returns:
field-complete obs_seq ensemble
- Return type:
ObsEns
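The reverse direction amounts to scattering each partition's values back into a full-length array. The sketch below does this on one process for a single (mem_id, obs_rec_id) key; `obs_inds` is again a made-up stand-in for the per-partition obs indices, and no MPI communication is shown.

```python
import numpy as np

nobs = 5
# Hypothetical obs indices belonging to each partition.
obs_inds = {0: np.array([0, 3]), 1: np.array([1, 2, 4])}

# Ensemble-complete local obs: dict[(mem_id, obs_rec_id), dict[par_id, array]]
lobs = {(0, 0): {0: np.array([1.0, 4.0]), 1: np.array([2.0, 3.0, 5.0])}}

# Field-complete obs: dict[(mem_id, obs_rec_id), array]
obs_ens = {}
for key, parts in lobs.items():
    full = np.full(nobs, np.nan)          # NaN marks entries not yet filled
    for par_id, values in parts.items():
        full[obs_inds[par_id]] = values   # scatter back to the original positions
    obs_ens[key] = full
```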
- pack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) dict[source]
Pack lobs and lobs_prior into arrays for the jitted functions
- unpack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'], data: dict) None[source]
Unpack data and write it back to the original lobs_prior dict
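A plausible reason for packing is that compiled kernels work on contiguous arrays rather than nested dictionaries. The sketch below shows one way such a round trip could look for a single partition and record; the (nens, nlobs) array layout and the in-place increment standing in for the jitted update are assumptions, not the documented behavior.

```python
import numpy as np

par_id, rec_id, nens = 0, 0, 3

# lobs_prior: dict[(mem_id, obs_rec_id), dict[par_id, array]]
lobs_prior = {(m, rec_id): {par_id: np.array([1.0, 2.0]) + m} for m in range(nens)}

# Pack: stack the members into one contiguous (nens, nlobs) array.
packed = np.stack([lobs_prior[(m, rec_id)][par_id] for m in range(nens)])

# Stand-in for the update that a jitted function would apply in place.
packed += 1.0

# Unpack: write each member's row back into the original dictionary.
for m in range(nens):
    lobs_prior[(m, rec_id)][par_id] = packed[m]
```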
- class NEDAS.core.Perturbation(c: Context)[source]
Bases: object
Perturbation top-level manager
- nfld: int = 0
- perturb: dict[str, Any] = {}
- task_list: dict[int, list[dict]] = {}
- prepare_perturb_dir(c)[source]
Prepare and clear the directory where perturbation data will be stored (offline mode)
- save_perturb_data(c: Context, **rec)[source]
Save a copy of perturbation data, for use by the next analysis cycle
- class NEDAS.core.Diagnostics(c: Context)[source]
Bases: object
This class manages diagnostics functions
- task_list: dict[Annotated[int, 'process id in comm'], list]
- class NEDAS.core.Scheme(config: Config | None = None, config_file: str | None = None, parse_args: bool = False, **kwargs)[source]
Bases: ABC
Runtime scheme base class.
The Scheme coordinates all runtime generation and manipulation of objects.
- steps_need_mpi: dict[str, bool] = {}
- scheduler: OfflineScheduler | None = None
- online_mode: bool
- use_synthetic_obs: bool = False
- property c
The runtime context, with lazy initialization
- abstractmethod run_all()[source]
A scheme must implement a run_all method to describe the workflow.
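The pattern of an abstract run_all combined with a lazily initialized context can be illustrated with a stand-in base class. `SchemeSketch`, `TwoStepScheme`, the step names and the use of a plain dict in place of a Context are all hypothetical; the real Scheme takes a Config and builds a NEDAS Context.

```python
from abc import ABC, abstractmethod

class SchemeSketch(ABC):
    """Hypothetical stand-in mirroring the shape of the Scheme base class."""

    def __init__(self, **kwargs):
        self._kwargs = kwargs
        self._c = None

    @property
    def c(self):
        # Build the context only on first access (lazy initialization).
        if self._c is None:
            self._c = dict(self._kwargs)   # placeholder for a real Context
        return self._c

    @abstractmethod
    def run_all(self):
        """Subclasses describe their workflow here."""

class TwoStepScheme(SchemeSketch):
    def run_all(self):
        # Made-up workflow steps, for illustration only.
        return [f"{step} (nens={self.c['nens']})" for step in ("filter", "forecast")]

steps = TwoStepScheme(nens=20).run_all()
```

Instantiating `SchemeSketch` directly raises `TypeError`, which is how the ABC machinery enforces that a subclass provides run_all.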