Source code for NEDAS.job_submitters.local
import subprocess
import sys
import os
from NEDAS.core.job_submitter import JobSubmitter
[docs]
class LocalJobSubmitter(JobSubmitter):
"""
The LocalJobSubmitter class assumes a generic GNU/Linux environment on a single PC.
For 'serial' parallel mode: the command is directly executed in a subprocess.
For 'mpi' or 'openmp' parallel modes: the command will be parsed accordingly to be run in an mpi environment.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# check for available processors on the node
self._cpu_count = os.cpu_count()
assert self._cpu_count is not None, "cannot query available nproc: os.cpu_count returns None"
@property
def nproc_avail(self):
if self._cpu_count:
return self._cpu_count
return 1
@property
def execute_command(self) -> str:
"""
Vanila JobSubmitter will just run "mpirun -np nproc ...", and discard the ppn and offset settings
"""
if self.parallel_mode == 'serial':
return ""
elif self.parallel_mode == 'mpi':
return f"mpirun -np {self.nproc}"
elif self.parallel_mode == 'openmp':
return f"export OMP_NUM_THREADS={self.nproc};"
else:
raise ValueError(f"unknown parallel_mode '{self.parallel_mode}'")
[docs]
def run(self, commands: str) -> None:
"""
Runs 'commands' on the local computer.
Use nproc processors starting from the offset+1 processor of the allocation
"""
commands = self.parse_commands(commands)
if self.debug:
print("JobSubmitter run command as step: ", commands, flush=True)
p = subprocess.Popen(commands, shell=True, text=True, bufsize=1)
p.wait()
# handle error
if p.returncode != 0:
raise RuntimeError(f"JobSubmitter: job '{self.job_name}' exited with error")