import os
import stat
import subprocess
import tempfile
from time import sleep
from NEDAS.utils.conversion import seconds_to_timestr
from .hpc import HPCJobSubmitter
class OARJobSubmitter(HPCJobSubmitter):
    """JobSubmitter class customized for OAR schedulers.

    Outside a job allocation, jobs are written to a batch script, submitted
    with ``oarsub -S`` and polled with ``oarstat`` until they finish. Inside an
    allocation, commands run as job steps through ``mpirun`` using a machine
    file built from ``$OAR_NODE_FILE``.
    """

    # Number of consecutive polls in which ``oarstat`` may report no job state
    # (e.g. a transient server hiccup) before monitoring gives up with an error.
    status_retry_limit = 5

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Inside a job allocation, create an (initially empty) temporary node
        # file in run_dir; update_node_file() fills it before each job step.
        if self.in_job_allocation:
            with tempfile.NamedTemporaryFile(delete=False, dir=self.run_dir) as file:
                self.node_file = file.name

    @property
    def nproc_avail(self):
        """Number of processors available: ``nnode_avail * ppn``."""
        return self.nnode_avail * self.ppn

    @property
    def node_list_avail(self):
        """Host names listed in ``$OAR_NODE_FILE``, one per non-empty line.

        Surrounding whitespace (including the trailing newline) is stripped
        from each entry and blank lines are skipped.

        NOTE(review): OAR is believed to list each host once per allocated
        core in this file; if so, entries repeat and nnode_avail counts cores
        rather than nodes -- confirm against the target OAR installation.

        Raises:
            KeyError: if ``OAR_NODE_FILE`` is not set in the environment.
            OSError: if the node file cannot be opened or read.
        """
        try:
            with open(os.environ['OAR_NODE_FILE'], 'r') as node_file:
                node_list = [line.strip() for line in node_file if line.strip()]
        except (KeyError, OSError):
            print("ERROR while reading available compute nodes from OAR_NODE_FILE")
            raise
        return node_list

    @property
    def nnode_avail(self):
        """Number of entries in node_list_avail."""
        return len(self.node_list_avail)

    @property
    def ppn_avail(self):
        """Processors per node available (currently hard-coded to 32)."""
        # TODO: how to obtain this info from OAR system?
        return 32

    def update_node_file(self):
        """Write the nodes assigned to the current job step into self.node_file.

        The nodes are ``self.nnode`` consecutive entries of node_list_avail,
        starting at index ``self.offset_node``, written one host per line.

        Raises:
            IndexError: if fewer than ``offset_node + nnode`` nodes are available.
        """
        # Read $OAR_NODE_FILE once, not once per node written.
        nodes = self.node_list_avail
        start = self.offset_node
        selected = nodes[start:start + self.nnode]
        if len(selected) < self.nnode:
            raise IndexError(
                f"Requested {self.nnode} nodes from offset {start}, "
                f"but only {len(nodes)} nodes are available")
        with open(self.node_file, 'w') as file:
            file.writelines(node + '\n' for node in selected)

    @property
    def execute_command(self):
        """``mpirun`` launch prefix for the current job (step)."""
        if self.in_job_allocation:
            self.update_node_file()
            node_file = self.node_file
        else:
            # Literal variable reference, left for the shell to expand at run
            # time (presumably inside the OAR batch script).
            node_file = '$OAR_NODE_FILE'
        return f"mpirun -np {self.nproc} --machinefile {node_file}"

    @property
    def job_array_index_name(self):
        """Shell variable holding the index of the current job in an OAR array."""
        return '$OAR_ARRAY_INDEX'

    @property
    def in_job_allocation(self) -> bool:
        """Whether execution already happens inside an OAR job allocation."""
        # TODO: implement this to check if a job allocation is already in use
        # for now, assume it's not
        return False

    def run_job_as_step(self, commands):
        """Run *commands* as a step of the current allocation, then remove the node file.

        The temporary node file is removed even when the step raises; nothing
        is removed if no node file was ever created.
        """
        try:
            super().run_job_as_step(commands)
        finally:
            node_file = getattr(self, 'node_file', None)
            if node_file is not None and os.path.exists(node_file):
                os.remove(node_file)

    def submit_job_and_monitor(self, commands):
        """Submit *commands* as an OAR batch job and block until it finishes.

        Raises:
            RuntimeError: if ``oarsub`` fails, if the job (or any job of the
                array) ends in the ``Error`` state, or if ``oarstat`` keeps
                returning no job state for ``status_retry_limit`` polls.
        """
        self.job_script = self._write_job_script(commands)
        self._submit_job_script()
        self._wait_for_job()

    def _write_job_script(self, commands):
        """Write the OAR batch script for *commands*, make it executable, return its path."""
        walltime = seconds_to_timestr(self.walltime)
        log_file = os.path.join(self.run_dir, f"{self.job_name}.%jobid%.stdout")
        lines = [
            "#!/bin/bash",
            # OAR headers
            f"#OAR -n {self.job_name}",
            f"#OAR -l /nodes={self.nnode}/core={self.ppn},walltime={walltime}",
            f"#OAR --project {self.project}",
        ]
        if self.queue == 'devel':
            lines.append(f"#OAR -t {self.queue}")
        lines.append(f"#OAR --stdout {log_file}")
        lines.append(f"#OAR --stderr {log_file}")
        if self.use_job_array:
            lines.append(f"#OAR --array {self.array_size}")
        lines.append(super().parse_commands(commands))
        content = '\n'.join(lines) + '\n'

        with tempfile.NamedTemporaryFile(mode='w+', delete=False,
                                         dir=self.run_dir,
                                         prefix=self.job_name + '.',
                                         suffix='.sh') as job_script:
            job_script.write(content)
        path = job_script.name
        # The job script has to be executable in the OAR system.
        os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)
        return path

    def _submit_job_script(self):
        """Submit self.job_script with oarsub and record the resulting job id."""
        p = subprocess.run(self.job_submit_cmd(), capture_output=True, text=True)
        if p.returncode != 0:
            raise RuntimeError(f"Submission failed: {p.stderr}")
        if self.debug:
            print(p.stdout, flush=True)
        self.get_job_id(p.stdout)

    def _wait_for_job(self):
        """Poll oarstat every check_dt seconds until every job reaches a final state."""
        final_states = ('Terminated', 'Error')
        empty_polls = 0
        while True:
            sleep(self.check_dt)
            p = self.check_job_status()
            if self.debug:
                print(p.stdout, flush=True)
            states = self._parse_job_states(p.stdout)
            if not states:
                # No state reported (transient failure, or job not listed yet):
                # retry a bounded number of times instead of assuming completion.
                empty_polls += 1
                if empty_polls >= self.status_retry_limit:
                    raise RuntimeError(
                        f"Unable to query the status of job {self.job_id}: {p.stderr}")
                continue
            empty_polls = 0
            print(f"job {self.job_id} state: {' '.join(states)}", flush=True)
            if not all(state in final_states for state in states):
                continue
            # Every job has stopped; fail if any of them ended in error.
            if 'Error' in states:
                if self.use_job_array:
                    raise RuntimeError(f"Error job array {self.job_id}")
                raise RuntimeError(f"Error in job {self.job_id}")
            return

    @staticmethod
    def _parse_job_states(stdout):
        """Return the values of ``state = X`` lines found in oarstat output."""
        states = []
        for line in stdout.splitlines():
            compact = ''.join(line.split())
            if 'state=' in compact:
                states.append(compact.split('state=')[-1])
        return states

    def job_submit_cmd(self):
        """Argument list that submits self.job_script, reading its #OAR directives (``-S``)."""
        return ["oarsub", "-S", f"{self.job_script}"]

    def get_job_id(self, stdout):
        """Parse the job id (or array id for job arrays) from oarsub output into self.job_id.

        The value of the last line containing ``OAR_ARRAY_ID=`` (job arrays)
        or ``OAR_JOB_ID=`` is used; spaces are ignored.

        Raises:
            ValueError: if the id is missing or not an integer.
        """
        key = 'OAR_ARRAY_ID=' if self.use_job_array else 'OAR_JOB_ID='
        value = None
        for line in stdout.splitlines():
            compact = line.replace(' ', '')
            if key in compact:
                value = compact.split(key)[-1]
        if value is None:
            raise ValueError(f"Cannot find {key[:-1]} in oarsub output: {stdout!r}")
        self.job_id = int(value)

    def check_job_status(self):
        """Query oarstat for the job (or job array) and keep only its ``state = `` lines.

        Returns the CompletedProcess of the shell pipeline
        ``oarstat -f ... | grep "state = "``; its returncode is grep's.
        """
        if self.use_job_array:
            cmd = f'oarstat -f --array {self.job_id} | grep "state = "'
        else:
            cmd = f'oarstat -f --job {self.job_id} | grep "state = "'
        p = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        return p