NEDAS.utils.parallel module
- class NEDAS.utils.parallel.Comm
Bases: object
Communicator class supporting both serial and MPI programs.
When a Python program is started in an MPI environment, for example:
$ mpirun -n 10 python -m mpi4py program.py
a communicator can be obtained from the mpi4py package:
>>> from mpi4py import MPI
>>> comm = MPI.COMM_WORLD
However, when the program is run in
- Variables:
parallel_io (bool) – Whether netCDF4.Dataset is built with parallel I/O support.
- mpi_ready: bool = False
- parallel_io: bool
- init_file_lock(filename)
Initialize file locks for thread-safe I/O.
- Parameters:
filename (str) – Path to the file.
- class NEDAS.utils.parallel.DummyComm
Bases: object
Dummy communicator for Python programs running without MPI.
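The serial fallback described above might be sketched as follows. This is an illustration only, not the NEDAS implementation: `SerialComm` and `get_comm` are hypothetical names, and the stand-in mimics only a few mpi4py communicator methods.

```python
class SerialComm:
    """Hypothetical single-process stand-in for an MPI communicator."""

    def Get_rank(self):
        return 0  # the only process is always rank 0

    def Get_size(self):
        return 1  # exactly one process

    def bcast(self, obj, root=0):
        return obj  # nothing to send, hand the object back

    def Barrier(self):
        pass  # no other process to wait for


def get_comm():
    """Return MPI.COMM_WORLD if mpi4py is importable, else a serial stand-in."""
    try:
        from mpi4py import MPI
        return MPI.COMM_WORLD
    except ImportError:
        return SerialComm()


comm = get_comm()
print(f"rank {comm.Get_rank()} of {comm.Get_size()}")
```

Code written against `comm.Get_rank()`, `comm.Get_size()` and `comm.bcast()` then runs unchanged whether or not mpi4py is available.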
- NEDAS.utils.parallel.by_rank(comm: Comm, rank: int) → Callable[[Callable[[P], T]], Callable[[P], T | None]]
Decorator for func() to be run only by the processor with the given rank in comm; on all other ranks the wrapped call returns None.
- NEDAS.utils.parallel.bcast_by_root(comm: Comm) → Callable[[Callable[[P], T]], Callable[[P], T]]
Decorator for func() to be run only by rank 0 in comm; the result of func() is then broadcast to all other ranks.
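A minimal sketch of how decorators with these two signatures could be written is given below. It is an assumption about the general pattern, not the NEDAS source; the `comm` object here is a hypothetical one-process stand-in exposing `Get_rank` and `bcast`, which are the mpi4py method names.

```python
from functools import wraps
from types import SimpleNamespace

# Hypothetical one-process communicator so the sketch runs without MPI.
comm = SimpleNamespace(Get_rank=lambda: 0, bcast=lambda obj, root=0: obj)


def by_rank(comm, rank):
    """Run the wrapped function only on `rank`; other ranks get None."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if comm.Get_rank() == rank:
                return func(*args, **kwargs)
            return None
        return wrapper
    return decorator


def bcast_by_root(comm):
    """Run the wrapped function on rank 0 and broadcast its result."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs) if comm.Get_rank() == 0 else None
            return comm.bcast(result, root=0)
        return wrapper
    return decorator


@by_rank(comm, 1)
def only_on_rank_one():
    return "ran"


@bcast_by_root(comm)
def read_config():
    return {"nens": 20}
```

With the single-process stand-in, `only_on_rank_one()` returns None because the only rank is 0, while `read_config()` returns its dictionary on every rank.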
- NEDAS.utils.parallel.distribute_tasks(comm: Comm, tasks: ndarray | Sequence, load: ndarray | Sequence | None = None) → dict[int, list]
Divide a list of task indices and assign a subset to each rank in comm
- Parameters:
comm (Comm) – MPI communicator
tasks (ArrayLike) – List of task indices (to be distributed over the processors)
load (np.ndarray, optional) – Amount of workload for each task element. The default is None, in which case all tasks are given equal workload.
- Returns:
Dictionary {rank: list}, where list is the subset of tasks assigned to the processor with that rank.
- Return type:
dict
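One way to produce such a {rank: list} dictionary is to cut the task list into contiguous chunks of roughly equal cumulative load. The sketch below illustrates that idea only and is not necessarily the algorithm NEDAS uses; `split_tasks` is a hypothetical helper that takes the number of processors directly instead of a communicator.

```python
import numpy as np


def split_tasks(nproc, tasks, load=None):
    """Assign contiguous chunks of `tasks` to ranks 0..nproc-1 by cumulative load."""
    tasks = np.asarray(tasks)
    if tasks.size == 0:
        return {rank: [] for rank in range(nproc)}
    if load is None:
        load = np.ones(tasks.size)  # equal workload for every task
    cum = np.cumsum(np.asarray(load, dtype=float))
    # Ideal cumulative load at each boundary between consecutive ranks.
    targets = cum[-1] * np.arange(1, nproc) / nproc
    # Index of the first task that belongs to the next rank.
    cuts = np.searchsorted(cum, targets, side="right")
    return {rank: chunk.tolist()
            for rank, chunk in enumerate(np.split(tasks, cuts))}


print(split_tasks(3, range(10)))
print(split_tasks(2, [0, 1, 2, 3], load=[4, 1, 1, 2]))
```

In the second call the heavy first task is given a rank of its own, so both ranks carry a total load of 4.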
- class NEDAS.utils.parallel.OfflineScheduler(c, nworker: int, walltime: int | None = None, check_dt: float = 0.1, debug: bool = False)
Bases: object
An offline scheduler class for queuing and running multiple jobs on available workers (groups of processors). Jobs are submitted by one processor through the scheduler, while the job.run code calls a subprocess that runs on the worker.
- submit_job(name: str, job: Callable, *args, **kwargs) → None
Submit a job to the scheduler, hold info in jobs dict.
- Parameters:
name (str) – unique name to identify this job
job (Callable) – callable with is_running and kill methods
*args – passed into job()
**kwargs – passed into job()
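The exact interface the scheduler expects from a job is not spelled out here. As a rough illustration, assuming that job(*args, **kwargs) builds an object exposing run, is_running and kill, a subprocess-backed job might look like the sketch below; `SubprocessJob` is a hypothetical class, not part of NEDAS.

```python
import subprocess
import sys


class SubprocessJob:
    """Hypothetical job wrapping an external command in a subprocess."""

    def __init__(self, *cmd):
        self.cmd = list(cmd)
        self.proc = None

    def run(self):
        # Start the command without blocking the caller.
        self.proc = subprocess.Popen(self.cmd)

    def is_running(self):
        # poll() returns None while the child process is still alive.
        return self.proc is not None and self.proc.poll() is None

    def kill(self):
        if self.is_running():
            self.proc.kill()
            self.proc.wait()  # reap the child so it does not linger


# Start a long-running child process, check on it, then stop it.
job = SubprocessJob(sys.executable, "-c", "import time; time.sleep(30)")
job.run()
was_running = job.is_running()
job.kill()
```

A scheduler could poll is_running at a fixed interval and call kill once a walltime limit is exceeded, which is how the check_dt and walltime parameters of the class signature would plausibly be used.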