NEDAS.core.obs module
- class NEDAS.core.obs.Obs(c: Context)[source]
Bases: object
Class for handling observations.
The observation has dimensions: variable, time, z, y, x. Since the observation network is typically irregular, we store the obs record for each variable in a 1D sequence of size nobs, with coordinates (t, z, y, x).
To parallelize the workload, we distribute each obs record over all the processors:
In batch assimilation mode, each pid stores the list of local obs within the hroi of its tiles, with size nlobs (number of local obs).
In serial mode, each pid stores a non-overlapping subset of the obs list. Here a ‘local’ obs (in the storage sense) is broadcast to all pids before computing its update to the state/obs near that obs.
The hroi is defined separately for each obs record. For very large hroi, the serial mode is the more parallel-efficient option, since in batch mode the same obs may need to be stored on multiple pids.
To compare with the observations, the obs_prior simulated by the model needs to be computed. It has dimensions [nens, nlobs], indexed by (mem_id, obs_id).
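As a rough illustration of the containers described above, the sketch below builds a toy obs_seq and obs_prior from plain NumPy arrays. The sizes (nens, nobs), the single record id 0, the zero-valued placeholders for z and t, and the random values are all assumptions made for illustration; in NEDAS these objects are filled by the Obs methods.

```python
import numpy as np

nens, nobs = 4, 6                      # hypothetical ensemble size and obs count
rng = np.random.default_rng(0)

# obs_seq: {obs_rec_id: {key: 1-d array of size nobs}}
obs_seq = {
    0: {
        'obs': rng.normal(size=nobs),          # observed values
        'x': rng.uniform(0, 100, nobs),        # coordinates of each measurement
        'y': rng.uniform(0, 100, nobs),
        'z': np.zeros(nobs),                   # placeholder values
        't': np.zeros(nobs),                   # placeholder values
        'err_std': np.full(nobs, 0.5),         # measurement uncertainties
    }
}

# obs_prior: {(mem_id, obs_rec_id): 1-d array of size nobs}
obs_prior = {(m, 0): rng.normal(size=nobs) for m in range(nens)}

# Stacking the members gives an array indexed by (mem_id, obs_id)
prior_array = np.stack([obs_prior[m, 0] for m in range(nens)])

# Example use: difference between the obs and the ensemble-mean prior
innovation = obs_seq[0]['obs'] - prior_array.mean(axis=0)
```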
- obs_rec_list: dict[Annotated[int, 'process id in comm_rec'], list[Annotated[int, 'obs record id']]]
- obs_inds: dict
- obs_seq: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']
- obs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']
- lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence']
- lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']
- lobs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']
- obs_post: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']
- data: dict
- distribute_obs_tasks(c: Context)[source]
Distribute obs_rec_id across processors
- Parameters:
c (Context) – the runtime context object.
- Returns:
Dictionary {pid_rec (int): list[obs_rec_id (int)]}
- Return type:
dict
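The returned mapping can be pictured with a minimal sketch. The round-robin assignment and the helper name distribute_round_robin are assumptions for illustration only; the scheme NEDAS actually uses (for example, one that balances workload) may differ.

```python
def distribute_round_robin(obs_rec_ids, nproc_rec):
    """Assign obs record ids to process ids in round-robin order (hypothetical scheme)."""
    obs_rec_list = {pid_rec: [] for pid_rec in range(nproc_rec)}
    for i, obs_rec_id in enumerate(obs_rec_ids):
        obs_rec_list[i % nproc_rec].append(obs_rec_id)
    return obs_rec_list

# Five obs records shared between two processes
obs_rec_list = distribute_round_robin(range(5), 2)
```

The result has the same shape as the documented return value, {pid_rec: list[obs_rec_id]}.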
- get_ref_z(c: Context, model_name: str, time: datetime) dict[Annotated[int, 'z level index'], ndarray][source]
Get the reference z coords at level k on the analysis grid from a model, to be used in dataset modules by generate_obs_network or for superobing/thinning in read_obs.
- Parameters:
c (Context) – the runtime context
model_name (str) – the model name
time (datetime) – the time of the model state
- Returns:
the z coords field at each level
- Return type:
dict[LevelID, np.ndarray]
- state_to_obs(c: Context, tag: str, **kwargs) ndarray[source]
Compute the obs value corresponding to the given state variable(s), namely the “obs_prior”. This function includes two ways to compute the obs_prior:
1. If obs_name is one of the variables provided by the model_src module, then model_src.read_var can provide the obs field defined on the model native grid. We then convert the obs field to the analysis grid and do vertical interpolation.
2. If obs_name is one of the variables provided by obs.obs_operator, we call it to obtain the obs seq. Typically the obs_operator performs more complex computation, such as path integration or a radiative transfer model (slowest).
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’, or ‘truth’ if generating synthetic obs
**kwargs – Additional parameters:
  - member: int, member index; or None if dealing with synthetic obs
  - name: str, obs variable name
  - time: datetime obj, time of the obs window
  - is_vector: bool, if True the obs is a vector measurement
  - dataset_src: str, dataset source module name providing the obs
  - model_src: str, model source module name providing the state
  - x, y, z, t: np.array, coordinates from obs_seq
- Returns:
Values corresponding to the obs_seq but from the state identified by kwargs
- Return type:
np.ndarray
- get_model_fld_z_on_grid(c: Context, tag: str, **kwargs) tuple[ndarray, ndarray][source]
Get obs variable field and z coords at level k and convert to c.grid
- horizontal_interp(c: Context, fld: ndarray, zfld: ndarray, is_vector: bool, obs_x: ndarray, obs_y: ndarray) tuple[ndarray, ndarray][source]
Interpolate fld and zfld horizontally to the obs_x,obs_y locations
- vertical_interp(seq: ndarray, k: int, levels: Annotated[ndarray | Sequence, 'z levels'], f: ndarray, fp: ndarray | None, z: ndarray, zp: ndarray | None, dzp: ndarray | None, obs_z: ndarray) tuple[source]
Interpolate f(k) with z(k) coords vertically to the obs z locations.
Vertical interp to obs_z, take ocean depth as example:
    :    -  -  -  -    :
    ------------ z[k-2] ------------------------
    k-1  -  -  -  -  f[k-1], fp    }dzp previous layer
    ------------ z[k-1], zp --------------------
    k    -  -  -  -  v[k], f       }dz current layer
    ------------ z[k], z -----------------------
    k+1  -  -  -  -  v[k+1]
In the diagram, dz and dzp mark the layer thickness of the current level k and of the previous level, while z and zp are the corresponding z coordinates. The variable f is treated as a layer average, so it is defined at layer centers.
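The idea of interpolating layer-averaged values between layer centers can be sketched as follows. This is not the NEDAS implementation: the helper interp_between_centers is hypothetical, it uses scalar layer values instead of per-obs arrays, and it assumes that z and zp are the lower-interface coordinates and dz and dzp the thicknesses as drawn in the diagram, with depth positive downward.

```python
import numpy as np

def interp_between_centers(fp, f, zcp, zc, obs_z):
    """Linearly interpolate between the previous layer center (zcp, fp)
    and the current layer center (zc, f). Assumes zcp != zc.
    Returns NaN where obs_z lies outside the two centers."""
    obs_z = np.asarray(obs_z, dtype=float)
    out = np.full(obs_z.shape, np.nan)
    lo, hi = min(zcp, zc), max(zcp, zc)
    inside = (obs_z >= lo) & (obs_z <= hi)
    w = (obs_z[inside] - zcp) / (zc - zcp)   # 0 at zcp, 1 at zc
    out[inside] = (1.0 - w) * fp + w * f
    return out

# Previous layer spans 0-10 m, current layer spans 10-30 m (illustrative values)
zp, dzp = 10.0, 10.0
z, dz = 30.0, 20.0
zcp = zp - dzp / 2    # center of previous layer: 5 m
zc = z - dz / 2       # center of current layer: 20 m
vals = interp_between_centers(2.0, 5.0, zcp, zc, [5.0, 12.5, 20.0, 25.0])
```

Obs that fall outside the two centers are left as NaN here, on the assumption that they would be handled when another level k is processed.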
- validate_seq_shape(seq: ndarray, is_vector: bool) None[source]
Validate the shape of an observation sequence. Allowed shape: (nobs,) for scalar obs seq; (2, nobs) for vector obs seq.
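A shape check of this kind might look like the sketch below. The function name check_seq_shape and the choice of ValueError are assumptions; the source only states which shapes are allowed.

```python
import numpy as np

def check_seq_shape(seq, is_vector):
    """Raise ValueError unless seq is (nobs,) for scalar obs or (2, nobs) for vector obs."""
    seq = np.asarray(seq)
    if is_vector:
        if seq.ndim != 2 or seq.shape[0] != 2:
            raise ValueError(f"vector obs seq must have shape (2, nobs), got {seq.shape}")
    elif seq.ndim != 1:
        raise ValueError(f"scalar obs seq must have shape (nobs,), got {seq.shape}")

check_seq_shape(np.zeros(5), is_vector=False)      # passes
check_seq_shape(np.zeros((2, 5)), is_vector=True)  # passes
```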
- collect_obs_seq(c: Context) Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence'][source]
Process the obs in parallel: read the dataset files and convert them to obs_seq, which contains the obs values, coordinates and other info.
Since this is the actual obs (one copy), only one processor needs to do the work.
- Parameters:
c (Context) – the runtime context object.
- Returns:
the observation sequence, a dictionary {obs_rec_id (int): record}, where each record is a dictionary {key: np.ndarray}. The mandatory keys are:
  - ‘obs’: the observed values (measurements)
  - ‘x’, ‘y’, ‘z’, ‘t’: the coordinates for each measurement
  - ‘err_std’: the uncertainties for each measurement
There can be other optional keys provided by read_obs(), but they are not used.
- Return type:
ObsSeq
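When writing a dataset module, it can help to check a record against the mandatory keys listed above. The helpers below are a hypothetical sketch and not part of NEDAS; they assume every mandatory entry is an array whose last axis has size nobs.

```python
import numpy as np

MANDATORY_KEYS = ('obs', 'x', 'y', 'z', 't', 'err_std')

def missing_keys(record):
    """Return the mandatory keys that are absent from an obs record."""
    return [key for key in MANDATORY_KEYS if key not in record]

def has_consistent_length(record):
    """True if all mandatory entries share the same last-axis size (nobs)."""
    sizes = {np.asarray(record[key]).shape[-1] for key in MANDATORY_KEYS}
    return len(sizes) == 1

# A toy record with three measurements (values are illustrative)
record = {key: np.zeros(3) for key in MANDATORY_KEYS}
ok = not missing_keys(record) and has_consistent_length(record)
```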
- prepare_obs_from_state(c: Context, tag: str) None[source]
Compute the obs priors in parallel by running state_to_obs to obtain obs_prior_seq.
- Parameters:
c (Context) – the runtime context object
tag (str) – ‘prior’ or ‘post’ ensemble model states
- global_obs_list(c: Context) list[tuple[Annotated[int, 'obs record id'], int | None, Annotated[int, 'process id in comm'], int]][source]
- transpose_obs_seq(c: Context, input_obs: Annotated[dict[Annotated[int, 'obs record id'], dict[str, ndarray]], 'obs sequence']) Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'][source]
Transpose the obs sequence from field-complete to ensemble-complete
- Parameters:
c (Context) – the runtime context
input_obs (ObsSeq) – obs_seq from process_all_obs(), dict[obs_rec_id, dict[key, np.array]]
- Returns:
the lobs dict[obs_rec_id, dict[par_id, dict[key, np.array]]], key = ‘obs’, ‘x’, ‘y’, ‘z’, ‘t’…
- Return type:
LocalObsSeq
- transpose_to_ensemble_complete(c: Context, input_obs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'][source]
Transpose obs from field-complete to ensemble-complete
Step 1, Within comm_mem, send the subset of input_obs with mem_id and par_id from the source proc (src_pid) to the destination proc (dst_pid), store the result in tmp_obs with all the mem_id (ensemble-complete)
Step 2, Gather all obs_rec_id within comm_rec, so that each pid_rec will have the entire obs record for assimilation
- Parameters:
c (Context) – the runtime context
input_obs (ObsEns) – obs_prior from process_all_obs_priors(), dict[(mem_id, obs_rec_id), np.array];
- Returns:
the lobs_prior dict[(mem_id, obs_rec_id), dict[par_id, np.array]]
- Return type:
LocalObsEns
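The change of data layout can be illustrated on a single process, without any MPI communication. In the sketch below, the helper split_by_partition and the index table par_obs_inds are hypothetical; the real method moves data between processes in comm_mem and comm_rec, which is not reproduced here.

```python
import numpy as np

def split_by_partition(obs_ens, par_obs_inds):
    """Convert {(mem_id, rec_id): array(nobs)} into
    {(mem_id, rec_id): {par_id: array(nlobs)}} using per-partition obs indices."""
    lobs_ens = {}
    for (mem_id, rec_id), seq in obs_ens.items():
        lobs_ens[mem_id, rec_id] = {
            par_id: seq[..., inds]
            for par_id, inds in par_obs_inds[rec_id].items()
        }
    return lobs_ens

nens, nobs = 3, 5
obs_ens = {(m, 0): np.arange(nobs) + 10.0 * m for m in range(nens)}

# Obs 2 lies within reach of both partitions, so it is stored twice
par_obs_inds = {0: {0: np.array([0, 1, 2]), 1: np.array([2, 3, 4])}}

lobs_ens = split_by_partition(obs_ens, par_obs_inds)
```

The overlapping index in the example mirrors the batch-mode situation where the same obs is stored in more than one partition.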
- transpose_to_field_complete(c: Context, lobs: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], ndarray], 'obs ensemble data'][source]
Transpose obs from ensemble-complete to field-complete
- Parameters:
c (Context) – the runtime context
lobs (LocalObsEns) – ensemble-complete local obs
- Returns:
field-complete obs_seq ensemble
- Return type:
ObsEns
- pack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data']) dict[source]
Pack lobs and lobs_prior into arrays for the jitted functions.
- unpack_local_obs_data(c: Context, par_id: Annotated[int, 'partition id'], lobs: Annotated[dict[Annotated[int, 'obs record id'], dict[Annotated[int, 'partition id'], dict[str, ndarray]]], 'local obs sequence'], lobs_prior: Annotated[dict[tuple[Annotated[int, 'member id'], Annotated[int, 'obs record id']], dict[Annotated[int, 'partition id'], ndarray]], 'local obs ensemble data'], data: dict) None[source]
Unpack data and write the results back to the original lobs_prior dict.
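A minimal sketch of the pack/unpack round trip is given below, assuming scalar obs only and a simple concatenation of records along the obs axis. The helper names pack_prior and unpack_prior and the array layout are assumptions; the contents of the dict returned by pack_local_obs_data are not documented here.

```python
import numpy as np

def pack_prior(par_id, lobs_prior, nens, rec_ids):
    """Stack the local obs priors of one partition into an array [nens, nlobs]."""
    return np.stack([
        np.concatenate([lobs_prior[m, r][par_id] for r in rec_ids])
        for m in range(nens)
    ])

def unpack_prior(par_id, lobs_prior, packed, rec_ids):
    """Write the packed array back into the nested lobs_prior dict, in place."""
    for m in range(packed.shape[0]):
        start = 0
        for r in rec_ids:
            n = lobs_prior[m, r][par_id].size
            lobs_prior[m, r][par_id] = packed[m, start:start + n].copy()
            start += n

nens, rec_ids, par_id = 2, [0, 1], 0
lobs_prior = {}
for m in range(nens):
    lobs_prior[m, 0] = {par_id: np.array([m, m + 1.0])}   # record 0: two local obs
    lobs_prior[m, 1] = {par_id: np.array([10.0 + m])}     # record 1: one local obs

packed = pack_prior(par_id, lobs_prior, nens, rec_ids)    # shape (2, 3)
unpack_prior(par_id, lobs_prior, packed * 2, rec_ids)     # write back updated values
```

Working on one contiguous array per partition is what makes the data usable by compiled (jitted) routines, which generally cannot operate on nested Python dicts.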