Source code for NEDAS.job_submitters.hpc
from abc import abstractmethod
import os
import sys
import subprocess
from NEDAS.core import JobSubmitter
[docs]
class HPCJobSubmitter(JobSubmitter):
    """
    JobSubmitter Class customized for a HPC with a job scheduler.

    Subclasses must implement ``nnode_avail``, ``ppn_avail``,
    ``job_array_index_name``, ``in_job_allocation`` and
    ``submit_job_and_monitor`` for the specific scheduler.

    Args:
        project (str, optional): Project account name for billing/allocation. Defaults to None.
        queue (str, optional): The submission queue name. Defaults to None.
        ppn (int, optional): Processors per node. If not given (or falsy, e.g. 0),
            defaults to ``self.ppn_avail`` when already inside a job allocation,
            otherwise to ``os.cpu_count()`` of the host machine.
        walltime (int, optional): Maximum execution time in seconds. Defaults to 3600.
        check_dt (int, optional): Time interval (sec) between status checks. Defaults to 20.
        use_job_array (bool, optional): Whether to utilize scheduler job arrays. Defaults to False.
        array_size (int, optional): Number of elements in the job array. Defaults to 1.
        **kwargs: Forwarded unchanged to ``JobSubmitter.__init__``.

    Raises:
        RuntimeError: If ``ppn`` is not given, no job allocation is active, and the
            processor count of the host machine cannot be determined.
    """
    def __init__(self,
                 project: str|None=None,
                 queue: str|None=None,
                 ppn: int|None=None,
                 walltime: int=3600,
                 check_dt: int=20,
                 use_job_array: bool=False,
                 array_size: int=1,
                 **kwargs):
        super().__init__(**kwargs)
        # Processors per node. A falsy ppn (None or 0) triggers auto-detection.
        # NOTE(review): this assigns self._ppn directly and therefore bypasses the
        # validation done in the ``ppn`` setter -- confirm this is intentional.
        if ppn:
            self._ppn = ppn
        elif self.in_job_allocation:
            self._ppn = self.ppn_avail
        else:
            cpu_count = os.cpu_count()  # may return None if undeterminable
            if cpu_count:
                self._ppn = cpu_count
            else:
                raise RuntimeError(f"{self.__class__.__name__}: cannot determine ppn on host machine.")
        # other HPC specific settings
        self.project = project
        self.queue = queue
        self.walltime = walltime
        self.check_dt = check_dt
        self.use_job_array = use_job_array
        self.array_size = array_size

    @property
    def ppn(self) -> int:
        """
        Number of processors per compute node requested for the job
        """
        return self._ppn

    @ppn.setter
    def ppn(self, value: int) -> None:
        """
        Set the processors per node after validating it.

        Raises:
            ValueError: If ``value`` is not a positive int, is > 1 while
                ``parallel_mode`` is 'serial', or exceeds ``ppn_avail``.
        """
        if not isinstance(value, int) or value <= 0:
            raise ValueError(f"invalid ppn specified: {value}")
        if self.parallel_mode == 'serial' and value > 1:
            raise ValueError(f"cannot set ppn = {value} in serial mode")
        if value > self.ppn_avail:
            raise ValueError(f"requested ppn {value} exceeds available ppn={self.ppn_avail}")
        self._ppn = value

    @property
    def nnode(self) -> int:
        """
        Number of compute nodes for the job, i.e. ceil(nproc / ppn)
        """
        # integer ceiling division
        return (self._nproc + self._ppn - 1) // self._ppn

    @property
    def offset_node(self) -> int:
        """
        Number of compute nodes to skip from the beginning, i.e. floor(offset / ppn)
        """
        # NOTE(review): floor division means an offset that is not a multiple of
        # ppn maps to a partially used node; check_resources does not account for
        # the extra node such a job may span -- confirm this is acceptable.
        return self._offset // self._ppn

    @property
    @abstractmethod
    def nnode_avail(self) -> int:
        """
        Number of available compute nodes on a host machine
        """
        ...

    @property
    @abstractmethod
    def ppn_avail(self) -> int:
        """
        Number of available processors per compute node
        """
        ...

    @property
    @abstractmethod
    def job_array_index_name(self) -> str:
        """
        Job array index variable name for the host machine, replacing 'JOB_ARRAY_INDEX' in 'commands'
        """
        ...

    def parse_commands(self, commands: str) -> str:
        """
        Apply the base-class command parsing, then substitute the placeholder
        'JOB_ARRAY_INDEX' with the scheduler-specific ``job_array_index_name``.

        Args:
            commands: Command string to parse.

        Returns:
            The parsed command string.
        """
        commands = super().parse_commands(commands)
        commands = commands.replace('JOB_ARRAY_INDEX', self.job_array_index_name)
        return commands

    @property
    @abstractmethod
    def in_job_allocation(self) -> bool:
        """
        Determines if a job allocation is already available on the HPC.
        If so, the job can be run as a sub step directly, otherwise will need to submit it to the queue.
        """
        ...

    def check_resources(self) -> None:
        """
        Checks if the requested resources are available.

        Raises:
            AssertionError: If nproc+offset exceeds nproc_avail, or
                nnode+offset_node exceeds nnode_avail.
        """
        # Explicit raises instead of ``assert`` statements so the checks are not
        # stripped when Python runs with -O. AssertionError is kept as the
        # exception type for compatibility with existing callers.
        if self.nproc + self.offset > self.nproc_avail:
            raise AssertionError(f"Requested nproc={self.nproc} and offset={self.offset} exceeds nproc_avail={self.nproc_avail}")
        if self.nnode + self.offset_node > self.nnode_avail:
            raise AssertionError(f"Requested nnode={self.nnode} and offset_node={self.offset_node} exceeds nnode_avail={self.nnode_avail}")

    def run(self, commands: str) -> None:
        """
        Run 'commands' either directly or through the scheduler.

        When already inside a job allocation, or when only one processor is
        requested, the commands run directly via ``run_job_as_step``; otherwise
        they are handed to ``submit_job_and_monitor``.
        """
        if self.in_job_allocation or self.nproc == 1:
            self.run_job_as_step(commands)
        else:
            self.submit_job_and_monitor(commands)

    def run_job_as_step(self, commands: str) -> None:
        """
        Run 'commands' from within a job allocation.
        Use nproc processors starting from the offset+1 processor of the allocation.

        Raises:
            AssertionError: If the requested resources are not available
                (see ``check_resources``).
            RuntimeError: If the command exits with a non-zero status.
        """
        self.check_resources()
        commands = self.parse_commands(commands)
        if self.debug:
            print("JobSubmitter run command as step: ", commands, flush=True)
        # The command string is executed through the shell (it may contain
        # pipes, env vars, launcher options). Output is inherited from this
        # process. subprocess.run waits for completion and kills and reaps the
        # child if waiting is interrupted (e.g. KeyboardInterrupt).
        result = subprocess.run(commands, shell=True, check=False)
        # handle error
        if result.returncode != 0:
            raise RuntimeError(f"JobSubmitter: job '{self.job_name}' exited with error")

    @abstractmethod
    def submit_job_and_monitor(self, commands: str) -> None:
        """
        Submit 'commands' as a job script to the scheduler on HPC and monitor for its completion.
        """
        ...