NEDAS.utils.parallel module

class NEDAS.utils.parallel.Comm

Bases: object

Communicator class supporting both serial and MPI programs.

When the Python program is started in an MPI environment, for example:

$ mpirun -n 10 python -m mpi4py program.py

A communicator can be obtained from the mpi4py package:

>>> from mpi4py import MPI
>>> comm = MPI.COMM_WORLD

However, when the program is run in

Variables:

parallel_io (bool) – Whether netCDF4.Dataset is built with parallel I/O support.

mpi_ready: bool = False
parallel_io: bool
init_file_lock(filename)

Initialize file locks for thread-safe I/O.

Parameters:

filename (str) – Path to the file.

check_parallel_io() → bool

Check if netCDF4 is built with parallel I/O support.

Returns:

True if the netCDF4 module supports parallel I/O mode.

Return type:

bool
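The docstring does not say how the check is performed. One plausible approach, sketched below, reads the module-level flags __has_parallel4_support__ and __has_pnetcdf_support__ that netCDF4-python exposes; treat these flag names, and the helper name has_parallel_netcdf, as assumptions to verify against the installed netCDF4 version rather than as the NEDAS implementation.

```python
def has_parallel_netcdf() -> bool:
    """Hypothetical helper: True if the installed netCDF4 reports parallel I/O support."""
    try:
        import netCDF4
    except ImportError:
        return False  # netCDF4 not installed, so no parallel I/O either
    # Flag names are assumed from netCDF4-python; a missing flag counts as False
    flags = ("__has_parallel4_support__", "__has_pnetcdf_support__")
    return any(bool(getattr(netCDF4, flag, False)) for flag in flags)


print("parallel netCDF I/O available:", has_parallel_netcdf())
```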

cleanup_file_locks()
acquire_file_lock(filename)
release_file_lock(filename)
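The lock methods above are listed without details, and the source does not state what mechanism backs them. The sketch below, a hypothetical FileLocks registry, keeps one threading.Lock per filename and adds a context manager so every acquire is paired with a release. It only serializes threads inside a single process; coordinating separate MPI ranks would need an inter-process mechanism instead (for example an OS-level file lock).

```python
import threading
from contextlib import contextmanager


class FileLocks:
    """Hypothetical per-file lock registry (threads within one process only)."""

    def __init__(self):
        self._locks = {}                # filename -> threading.Lock
        self._guard = threading.Lock()  # protects the _locks dict itself

    def init_file_lock(self, filename):
        # Create the lock once; later calls for the same file are no-ops
        with self._guard:
            self._locks.setdefault(filename, threading.Lock())

    def acquire_file_lock(self, filename):
        self.init_file_lock(filename)
        self._locks[filename].acquire()  # blocks until the file is free

    def release_file_lock(self, filename):
        self._locks[filename].release()

    def cleanup_file_locks(self):
        # Call only when no lock is held; drops all per-file locks
        with self._guard:
            self._locks.clear()

    @contextmanager
    def locked(self, filename):
        # Guarantees release even if the I/O inside the block raises
        self.acquire_file_lock(filename)
        try:
            yield
        finally:
            self.release_file_lock(filename)
```

Typical use would be wrapping each write in `with locks.locked(path): ...` so that two threads never write the same file at once.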
finalize()

Release MPI resources cleanly to avoid hangs on exit.

class NEDAS.utils.parallel.DummyComm

Bases: object

Dummy communicator for running Python without MPI.

Get_size()
Get_rank()
Barrier()
Abort(code: int)
Split(color=0, key=0)
bcast(obj, root=0)
send(obj, dest, tag)
recv(source, tag)
allgather(obj)
gather(obj, root=0)
allreduce(obj)
reduce(obj, root=0)
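In a single-process run every collective operation has a trivial answer: the only rank is 0, a broadcast or reduction returns its input, and a gather returns a one-element list. The sketch below illustrates that idea with a hypothetical SerialComm class and a hypothetical get_comm helper that falls back to it when mpi4py cannot be imported. It is not the DummyComm source; in particular, the tag-keyed mailbox behind send/recv is an assumption.

```python
import sys


class SerialComm:
    """Hypothetical single-process stand-in for an mpi4py communicator."""

    def __init__(self):
        self._mailbox = {}  # tag -> list of messages sent to this same process

    def Get_size(self):
        return 1

    def Get_rank(self):
        return 0

    def Barrier(self):
        pass  # nothing to synchronize with a single process

    def Abort(self, code: int):
        sys.exit(code)  # raises SystemExit(code) to end the program

    def Split(self, color=0, key=0):
        return self  # a one-process group can only split into itself

    def bcast(self, obj, root=0):
        return obj

    def send(self, obj, dest, tag):
        # Only rank 0 exists, so the message is queued for this process
        self._mailbox.setdefault(tag, []).append(obj)

    def recv(self, source, tag):
        # Unlike MPI recv, this raises an error instead of blocking if nothing is queued
        return self._mailbox[tag].pop(0)

    def allgather(self, obj):
        return [obj]

    def gather(self, obj, root=0):
        return [obj]

    def allreduce(self, obj):
        return obj

    def reduce(self, obj, root=0):
        return obj


def get_comm():
    """Hypothetical helper: MPI.COMM_WORLD if mpi4py is importable, else SerialComm()."""
    try:
        from mpi4py import MPI
    except ImportError:
        return SerialComm()
    return MPI.COMM_WORLD
```

Because SerialComm mirrors the mpi4py method names, code written against `comm.Get_rank()`, `comm.bcast(...)` and so on runs unchanged in both modes.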
NEDAS.utils.parallel.by_rank(comm: Comm, rank: int) → Callable[[Callable[[P], T]], Callable[[P], T | None]]

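Decorator that makes func() run only on the process with the given rank in comm; on other ranks the decorated function returns None (hence the T | None return type).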
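A decorator of this shape can be written with functools.wraps. The by_rank below is a sketch re-implementation, assuming that non-selected ranks simply return None; StubComm is a hypothetical stand-in that provides only Get_rank().

```python
import functools


class StubComm:
    """Hypothetical stand-in for a communicator; only Get_rank() is needed."""

    def __init__(self, rank):
        self.rank = rank

    def Get_rank(self):
        return self.rank


def by_rank(comm, rank):
    """Run the decorated function only on `rank`; return None elsewhere."""
    def decorator(func):
        @functools.wraps(func)  # keep func's name and docstring on the wrapper
        def wrapper(*args, **kwargs):
            if comm.Get_rank() == rank:
                return func(*args, **kwargs)
            return None  # assumption: non-selected ranks just get None
        return wrapper
    return decorator


@by_rank(StubComm(rank=0), 0)
def report(msg):
    return f"rank 0 says: {msg}"


print(report("hello"))
```

Under MPI the stub would be replaced by a real communicator such as MPI.COMM_WORLD, so only the chosen rank prints or writes output.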

NEDAS.utils.parallel.bcast_by_root(comm: Comm) → Callable[[Callable[[P], T]], Callable[[P], T]]

Decorator that runs func() only on rank 0 in comm and then broadcasts its result to all other ranks.

NEDAS.utils.parallel.distribute_tasks(comm: Comm, tasks: ndarray | Sequence, load: ndarray | Sequence | None = None) → dict[int, list]

Divide a list of task indices and assign a subset to each rank in comm.

Parameters:
  • comm (Comm) – MPI communicator

  • tasks (ArrayLike) – List of task indices (to be distributed over the processors)

  • load (np.ndarray, optional) – Amount of workload for each task element. The default is None, in which case all tasks are assumed to have equal workload.

Returns:

Dictionary {rank: list}, where each list is the subset of tasks assigned to the processor with that rank.

Return type:

dict
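The load-balancing strategy used by distribute_tasks is not documented here. One simple option, sketched below, places the tasks on a cumulative-load axis and cuts that axis into nproc contiguous pieces of roughly equal total load, assigning each task by the position of its midpoint. The function name distribute_tasks_sketch and its nproc argument (used in place of comm, e.g. nproc = comm.Get_size()) are hypothetical.

```python
import numpy as np


def distribute_tasks_sketch(nproc, tasks, load=None):
    """Split tasks into nproc contiguous chunks of roughly equal total load."""
    tasks = list(tasks)
    ntask = len(tasks)
    assignment = {r: [] for r in range(nproc)}
    if ntask == 0:
        return assignment  # nothing to distribute
    if load is None:
        load = np.ones(ntask)  # equal workload per task
    load = np.asarray(load, dtype=float)
    if load.shape != (ntask,):
        raise ValueError("load must have one entry per task")
    total = load.sum()
    if total <= 0:
        load = np.ones(ntask)  # degenerate loads: fall back to equal weights
        total = float(ntask)
    # Midpoint of each task on the cumulative-load axis, scaled to [0, nproc)
    midpoints = (np.cumsum(load) - 0.5 * load) / total * nproc
    ranks = np.minimum(np.floor(midpoints).astype(int), nproc - 1)
    for task, r in zip(tasks, ranks):
        assignment[int(r)].append(task)
    return assignment


print(distribute_tasks_sketch(3, range(10)))
```

Each rank would then work on assignment[comm.Get_rank()]. With 10 equal-load tasks on 3 ranks this yields chunks of 3, 4 and 3 tasks; with loads [3, 1, 1, 1] on 2 ranks, the heavy first task gets a rank to itself. Keeping chunks contiguous preserves task order, at the cost of a coarser balance than a greedy assignment could reach.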

class NEDAS.utils.parallel.OfflineScheduler(c, nworker: int, walltime: int | None = None, check_dt: float = 0.1, debug: bool = False)

Bases: object

An offline scheduler for queuing and running multiple jobs on the available workers (groups of processors). Jobs are submitted to the scheduler by a single processor, while the job.run code calls a subprocess that runs on the assigned worker.

submit_job(name: str, job: Callable, *args, **kwargs) → None

Submit a job to the scheduler and record its information in the jobs dict.

Parameters:
  • name (str) – unique name to identify this job

  • job (Callable) – callable with is_running and kill methods

  • *args – passed into job()

  • **kwargs – passed into job()
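The job argument is described only as a callable with is_running and kill methods. A hypothetical SubprocessJob that meets that contract could wrap subprocess.Popen: calling it launches a command without blocking, is_running() polls the child process, and kill() terminates it.

```python
import subprocess
import sys


class SubprocessJob:
    """Hypothetical job: calling it launches a command without blocking."""

    def __init__(self):
        self.proc = None

    def __call__(self, cmd, **popen_kwargs):
        # Popen returns immediately; the command runs in a child process
        self.proc = subprocess.Popen(cmd, **popen_kwargs)

    def is_running(self):
        # poll() returns None while the child process is still alive
        return self.proc is not None and self.proc.poll() is None

    def kill(self):
        if self.is_running():
            self.proc.kill()
        if self.proc is not None:
            self.proc.wait()  # reap the child so it does not linger as a zombie


job = SubprocessJob()
job([sys.executable, "-c", "print('job done')"])
job.proc.wait()
```

Based on the parameter list above, it might be submitted as scheduler.submit_job("member_001", SubprocessJob(), ["./model.exe"]), where the job name and command are placeholders; the real scheduler may pass extra arguments that this sketch does not handle.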

monitor_job_queue() → None

Monitor available_workers and pending_jobs, and assign a pending job to a worker whenever one is free. Also check running_jobs for finished jobs, kill any job that exceeds walltime, and move finished jobs to completed_jobs.

start_queue()

Start the job queue and wait for the jobs to complete.

shutdown()
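The queue logic described for submit_job, monitor_job_queue and start_queue can be illustrated with a simplified polling loop. MiniScheduler and TimedJob below are hypothetical: the real OfflineScheduler also takes a c argument and a debug flag, and its actual bookkeeping may differ. Workers here are plain integer IDs, and TimedJob only pretends to run for a fixed time so the example needs no external programs.

```python
import time


class TimedJob:
    """Hypothetical stand-in job: 'runs' for a fixed time, supports is_running()/kill()."""

    def __init__(self):
        self.end = None
        self.killed = False

    def __call__(self, duration):
        # Non-blocking "launch": just record when the job should finish
        self.end = time.monotonic() + duration

    def is_running(self):
        return not self.killed and self.end is not None and time.monotonic() < self.end

    def kill(self):
        self.killed = True


class MiniScheduler:
    """Simplified polling scheduler loosely following the OfflineScheduler description."""

    def __init__(self, nworker, walltime=None, check_dt=0.1):
        self.available_workers = list(range(nworker))
        self.walltime = walltime
        self.check_dt = check_dt
        self.jobs = {}            # name -> job info
        self.pending_jobs = []    # names waiting for a worker
        self.running_jobs = []    # names currently on a worker
        self.completed_jobs = []  # names that finished or were killed

    def submit_job(self, name, job, *args, **kwargs):
        if name in self.jobs:
            raise ValueError(f"job name {name!r} already submitted")
        self.jobs[name] = {"job": job, "args": args, "kwargs": kwargs}
        self.pending_jobs.append(name)

    def monitor_job_queue(self):
        # 1. Start pending jobs while free workers remain
        while self.pending_jobs and self.available_workers:
            name = self.pending_jobs.pop(0)
            info = self.jobs[name]
            info["worker"] = self.available_workers.pop(0)
            info["start"] = time.monotonic()
            info["job"](*info["args"], **info["kwargs"])
            self.running_jobs.append(name)
        # 2. Retire finished jobs; kill any job that exceeds the walltime
        for name in list(self.running_jobs):
            info = self.jobs[name]
            if not info["job"].is_running():
                info["status"] = "finished"
            elif self.walltime is not None and time.monotonic() - info["start"] > self.walltime:
                info["job"].kill()
                info["status"] = "killed"
            else:
                continue  # still running within its walltime
            self.running_jobs.remove(name)
            self.available_workers.append(info["worker"])
            self.completed_jobs.append(name)

    def start_queue(self):
        # Poll every check_dt seconds until nothing is pending or running
        while self.pending_jobs or self.running_jobs:
            self.monitor_job_queue()
            time.sleep(self.check_dt)
```

Polling every check_dt seconds keeps the loop simple; the trade-off is that a finished job or a walltime violation may be noticed up to one check_dt late.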