from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Callable, get_args, TYPE_CHECKING
import os
import numpy as np
from .types import IOMode, IOTag
if TYPE_CHECKING:
from .context import Context
[docs]
class IOBackend(ABC):
"""
Base class for handling runtime input/output for state and observation variables
and OS-level operation such as file manipulation and running commands
Attributes:
io_mode (IOMode): 'offline' for file I/O and 'online' for persistent memory I/O
tags (list[str]): List of names for copies of state/obs data
IOTags:
'current': Mutable buffer for the data, being updated by assimilation and outer-loop iterations
'prior': read-only snapshot, also known as background/forecast, kept for O-B statistics.
'post': final state after the assimilation, known as the (re)analysis.
'truth': truth, as reference state in synthetic OSSE experiments
'raw': original information. For obs it is the actual obs
"""
io_mode: IOMode = 'offline'
tags: list[str] = list(get_args(IOTag))
[docs]
def validate_tag(self, tag: str):
if tag not in self.tags:
raise ValueError(f"IOBackend: unknown tag '{tag}', supported: {self.tags}")
[docs]
def prepare_fields_storage(self, c: Context, tag: str) -> None:
"""
Prepare for storage of fields data.
Only needed for offline io modes: initialize the binary file that stores fields
and write its metadata.
"""
pass
[docs]
@abstractmethod
def read_field(self, c: Context, tag: str, rec_id: int, mem_id: int) -> np.ndarray:
"""
Read a 2D field data from the state
Args:
c (Context): the runtime context
tag (str): which copy of the state to read from
rec_id (int): field record index
mem_id (int): ensemble member index from 0 to nens-1
Returns:
np.ndarray: the 2D field data
"""
...
[docs]
@abstractmethod
def write_field(self, fld: np.ndarray, c: Context, tag: str, rec_id: int, mem_id: int) -> None:
"""
Write a 2D field data to the state
Args:
fld (np.ndarray): the 2D field data
c (Context): the runtime context
tag (str): which copy of the state to write to
rec_id (int): field record index
mem_id (int): ensemble member index
"""
...
[docs]
@abstractmethod
def call_method(self, c: Context, tag: str, method: Callable, *args, **kwargs) -> Any:
"""
Call a method to perform some tasks.
Args:
c (Context): the runtime context
tag (str): which copy of the model state to request io from: "prior", "post" or "truth"
method (Callable): method name
*args, **kwargs: will be passed to the method
Returns:
Any: whatever the method(**kwargs) returns
"""
...
[docs]
def save_ndarray(self, c: Context, name: str, data: np.ndarray, path: str | None=None) -> None:
"""
Save ndarray data
Args:
c (Context): the runtime context
name (str): the name of the data
data (np.ndarray): the data
path (str, optional): system path to save the data to.
"""
if path is None:
path = c.config.work_dir # default path
file = os.path.join(path, f"{name}.npy")
# make sure directory exists
os.makedirs(os.path.dirname(file), exist_ok=True)
# save the data to file
np.save(file, data)
[docs]
def load_ndarray(self, c: Context, name: str, path: str | None=None) -> np.ndarray | None:
"""
Load ndarray from saved data
Args:
c (Context): the runtime context
name (str): the name of the data
path (str, optional): system path to the saved data.
Returns:
np.ndarray: the data
"""
if path is None:
path = c.config.work_dir # default path
file = os.path.join(path, f"{name}.npy")
# load data from file
if os.path.exists(file):
return np.load(file)
else:
return None
[docs]
def save_debug_data(self, c: Context, name: str, data: dict, path: str | None=None) -> None:
"""
Save debug data in npy format
Args:
c (Context): the runtime context
name (str): the name of the data
data (dict): the data
path (str, optional): system path to save the data to.
To recover the data, use np.load(file, allow_pickle=True).item()
"""
if path is None:
path = c.config.work_dir # default path
file = os.path.join(path, f"{name}.npy")
# make sure directory exists
c.fs.make_dir(os.path.dirname(file))
# save the data to file
wrapped_data = np.array(data, dtype=object)
np.save(file, wrapped_data)
c.debug_message = f"Saved debug data '{file}'"