# Source code for NEDAS.job_submitters.oar

import os
import re
import stat
import subprocess
import tempfile
from time import sleep

from NEDAS.utils.conversion import seconds_to_timestr

from .hpc import HPCJobSubmitter

class OARJobSubmitter(HPCJobSubmitter):
    """JobSubmitter class customized for OAR schedulers.

    Jobs are submitted as an executable bash script carrying ``#OAR``
    directives (``oarsub -S``). Their state is then polled with ``oarstat``
    until every job reaches ``Terminated`` or ``Error``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Inside an existing allocation, each job step gets its own machinefile
        # (filled by update_node_file); create the temporary file up front.
        if self.in_job_allocation:
            with tempfile.NamedTemporaryFile(delete=False, dir=self.run_dir) as file:
                self.node_file = file.name

    @property
    def nproc_avail(self):
        """Number of available processors: ``nnode_avail * ppn``."""
        return self.nnode_avail * self.ppn

    @property
    def node_list_avail(self):
        """Host names read from the file named by ``$OAR_NODE_FILE``.

        Each line is stripped of surrounding whitespace (including the
        trailing newline) and blank lines are skipped.

        NOTE(review): OAR's node file is believed to list a host once per
        reserved core; if so, this list counts cores rather than distinct
        nodes -- confirm against the target cluster.

        Raises:
            KeyError: if ``OAR_NODE_FILE`` is not set.
            OSError: if the node file cannot be read.
        """
        try:
            with open(os.environ['OAR_NODE_FILE'], 'r') as node_file:
                node_list = [line.strip() for line in node_file if line.strip()]
        except Exception:
            print("ERROR while reading available compute nodes from OAR_NODE_FILE")
            raise  # bare raise keeps the original traceback
        return node_list

    @property
    def nnode_avail(self):
        """Number of entries in ``node_list_avail``."""
        return len(self.node_list_avail)

    @property
    def ppn_avail(self):
        """Processors per node (hard-coded placeholder)."""
        # TODO: how to obtain this info from OAR system?
        return 32

    def update_node_file(self):
        """Write the hosts of the current job step to ``self.node_file``.

        The step uses ``self.nnode`` consecutive entries of ``node_list_avail``
        starting at index ``self.offset_node``, written one host per line.

        Raises:
            RuntimeError: if fewer than ``self.nnode`` hosts are available
                from ``self.offset_node`` onward.
        """
        # Read the node file once rather than once per selected node.
        available = self.node_list_avail
        selected = available[self.offset_node:self.offset_node + self.nnode]
        if len(selected) < self.nnode:
            raise RuntimeError(
                f"Requested {self.nnode} nodes from offset {self.offset_node}, "
                f"but only {len(available)} are listed in OAR_NODE_FILE")
        with open(self.node_file, 'w') as file:
            file.writelines(f"{node}\n" for node in selected)

    @property
    def execute_command(self):
        """``mpirun`` prefix launching ``self.nproc`` processes on the chosen hosts.

        Inside an allocation the per-step machinefile is regenerated first.
        Otherwise the literal ``$OAR_NODE_FILE`` is emitted, presumably to be
        expanded by the shell running the job script.
        """
        if self.in_job_allocation:
            self.update_node_file()
            node_file = self.node_file
        else:
            node_file = '$OAR_NODE_FILE'
        return f"mpirun -np {self.nproc} --machinefile {node_file}"

    @property
    def job_array_index_name(self):
        """Shell expression giving the index of the current job-array member."""
        return '$OAR_ARRAY_INDEX'

    @property
    def in_job_allocation(self) -> bool:
        """Whether we already run inside an OAR allocation."""
        # TODO: implement this to check if a job allocation is already in use
        # for now, assume it's not
        return False

    def run_job_as_step(self, commands):
        """Run *commands* as a step of the current allocation.

        The temporary machinefile, if one was created, is removed afterwards,
        even when the step raises.
        """
        try:
            super().run_job_as_step(commands)
        finally:
            # node_file only exists when __init__ ran inside an allocation.
            node_file = getattr(self, 'node_file', None)
            if node_file is not None and os.path.exists(node_file):
                os.remove(node_file)

    def submit_job_and_monitor(self, commands):
        """Write an OAR batch script for *commands*, submit it, and wait for completion.

        Raises:
            RuntimeError: if ``oarsub`` exits with a non-zero status, or if the
                job (or any member of the job array) ends in the ``Error`` state.
        """
        self._write_job_script(commands)

        # submit the job script
        p = subprocess.run(self.job_submit_cmd(), capture_output=True, text=True)
        if p.returncode != 0:
            raise RuntimeError(f"Submission failed: {p.stderr}")
        if self.debug:
            print(p.stdout, flush=True)
        self.get_job_id(p.stdout)

        self._wait_for_job()

    def _write_job_script(self, commands):
        """Create the executable OAR job script and store its path in ``self.job_script``."""
        with tempfile.NamedTemporaryFile(mode='w+', delete=False, dir=self.run_dir,
                                         prefix=self.job_name+'.', suffix='.sh') as job_script:
            job_script.write("#!/bin/bash\n")
            # OAR headers
            job_script.write(f"#OAR -n {self.job_name}\n")
            job_script.write(f"#OAR -l /nodes={self.nnode}/core={self.ppn},walltime={seconds_to_timestr(self.walltime)}\n")
            job_script.write(f"#OAR --project {self.project}\n")
            if self.queue == 'devel':
                job_script.write(f"#OAR -t {self.queue}\n")
            # stdout and stderr share one log file; %jobid% is expected to be
            # expanded by OAR
            log_file = os.path.join(self.run_dir, f"{self.job_name}.%jobid%.stdout")
            job_script.write(f"#OAR --stdout {log_file}\n")
            job_script.write(f"#OAR --stderr {log_file}\n")
            if self.use_job_array:
                job_script.write(f"#OAR --array {self.array_size}\n")
            job_script.write(super().parse_commands(commands))
            job_script.write('\n')
            self.job_script = job_script.name
        # the job script has to be executable in the OAR system (oarsub -S)
        st = os.stat(self.job_script)
        os.chmod(self.job_script, st.st_mode | stat.S_IEXEC)

    def _wait_for_job(self):
        """Poll ``check_job_status`` every ``self.check_dt`` seconds until all jobs finish.

        Polling continues while oarstat reports no state lines, because an
        empty answer does not prove the job has finished.

        Raises:
            RuntimeError: if any job ends in the ``Error`` state. This is
                raised only after every job has reached a terminal state.
        """
        terminal_states = ('Terminated', 'Error')
        while True:
            sleep(self.check_dt)
            stdout = self.check_job_status().stdout
            lines = [line for line in stdout.replace(' ', '').split('\n') if 'state=' in line]
            if not self.use_job_array:
                lines = lines[:1]  # a single job has exactly one state line
            if not lines:
                # nothing parsable yet (e.g. transient oarstat failure): keep polling
                continue
            print(lines if self.use_job_array else lines[0], flush=True)

            states = [line.split('state=')[-1] for line in lines]
            if not all(state in terminal_states for state in states):
                continue
            if 'Error' in states:
                if self.use_job_array:
                    raise RuntimeError(f"Error job array {self.job_id}")
                raise RuntimeError(f"Error in job {self.job_id}")
            return

    def job_submit_cmd(self):
        """Argument list that submits ``self.job_script`` with ``oarsub -S``."""
        return ["oarsub", "-S", f"{self.job_script}"]

    def get_job_id(self, stdout):
        """Parse the id printed by ``oarsub`` and store it as an int in ``self.job_id``.

        ``OAR_ARRAY_ID`` is used for job arrays and ``OAR_JOB_ID`` otherwise.
        If the key occurs several times, the last occurrence wins.

        Raises:
            ValueError: if the expected key followed by an integer is not
                found in *stdout*.
        """
        key = 'OAR_ARRAY_ID' if self.use_job_array else 'OAR_JOB_ID'
        matches = re.findall(rf"{key}\s*=\s*(\d+)", stdout)
        if not matches:
            raise ValueError(f"Cannot find {key} in oarsub output: {stdout!r}")
        self.job_id = int(matches[-1])

    def check_job_status(self):
        """Run ``oarstat -f`` on the job (or job array), keeping only its state lines.

        Returns:
            subprocess.CompletedProcess: in text mode; ``stdout`` holds the
            ``state = ...`` lines selected by ``grep``.
        """
        selector = '--array' if self.use_job_array else '--job'
        # get_job_id stores job_id as an int, so interpolating it into the
        # shell pipeline cannot inject shell syntax.
        cmd = f'oarstat -f {selector} {self.job_id} | grep "state = "'
        return subprocess.run(cmd, shell=True, capture_output=True, text=True)