"""Contains the SLURM submitter class."""
import logging
import re
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
from typing import Dict, Optional, Tuple
from bout_runners.submitter.abstract_cluster_submitter import AbstractClusterSubmitter
from bout_runners.submitter.local_submitter import LocalSubmitter
from bout_runners.submitter.processor_split import ProcessorSplit
class SLURMSubmitter(AbstractClusterSubmitter):
    """
    The SLURM submitter class.

    Attributes
    ----------
    __sacct_starttime : str
        Time to search from in sacct

    Methods
    -------
    _wait_for_std_out_and_std_err()
        Wait until the process completes if a process has been started
    extract_job_id(std_out)
        Return the job_id
    get_return_code(sacct_str)
        Return the exit code if any
    get_state(sacct_str)
        Return the state from sacct
    structure_time_to_slurm_format(time_str)
        Structure the time string to a SLURM time string
    completed()
        Return the completed status
    create_submission_string(command, waiting_for)
        Return the SLURM script as a string
    get_sacct()
        Return the trace from ``sacct``
    reset()
        Reset released, waiting_for and status dict

    Examples
    --------
    >>> submitter = SLURMSubmitter(job_name, store_path)
    >>> submitter.submit_command("echo 'Hello'")
    >>> submitter.wait_until_completed()
    >>> submitter.std_out
    Hello
    """

    # Seconds to sleep between two consecutive ``sacct`` polls
    _POLL_INTERVAL_SECONDS = 5

    # States printed by ``sacct`` for jobs which are still active.
    # ``sacct`` prints an ExitCode (typically ``0:0``) for such jobs as well,
    # so the exit code must not be trusted while the job is in one of these
    # states.
    _UNFINISHED_STATES = frozenset(
        ("PENDING", "RUNNING", "REQUEUED", "RESIZING", "SUSPENDED")
    )

    # "<exit code>:<signal>" at the very end of the job line
    _RETURN_CODE_PATTERN = re.compile(r"(-?\d+):-?\d+\s?$")
    # The upper case state (optionally followed by a "+") preceding
    # "<exit code>:<signal>" at the very end of the job line
    _STATE_PATTERN = re.compile(r"([A-Z]+\+?)\s+-?\d+:-?\d+\s?$")

    def __init__(
        self,
        job_name: str,
        store_directory: Path,
        submission_dict: Optional[Dict[str, Optional[str]]] = None,
        processor_split: Optional[ProcessorSplit] = None,
    ):
        """
        Set the member data.

        Parameters
        ----------
        job_name : str or None
            Name of the job
            If None, a timestamp will be given as job_name
        store_directory : Path or None
            Directory to store the script
            If None, the caller directory will be used as the store directory
        submission_dict : None or dict of str of None or str
            Dict containing optional submission options
            One the form

            >>> {'walltime': None or str,
            ...  'account': None or str,
            ...  'queue': None or str,
            ...  'mail': None or str}

            These options will not be used if the submission_dict is None
        processor_split : ProcessorSplit or None
            Object containing the processor split
            If None, default values will be used
        """
        super().__init__(job_name, store_directory, submission_dict, processor_split)

        # Let sacct search one year back in time
        self.__sacct_starttime = (datetime.now() - timedelta(days=365)).strftime(
            r"%Y-%m-%d"
        )

        # Convert a user supplied walltime to the format used by SLURM
        walltime = self._submission_dict["walltime"]
        if walltime is not None:
            self._submission_dict["walltime"] = self.structure_time_to_slurm_format(
                walltime
            )

        # Jobs are submitted on hold (``sbatch --hold``) and started with
        # ``scontrol release``
        self._cluster_specific["cancel_str"] = "scancel"
        self._cluster_specific["release_str"] = "scontrol release"
        self._cluster_specific["submit_str"] = "sbatch --hold"

    def _wait_for_std_out_and_std_err(self) -> None:
        """
        Wait until the process completes if a process has been started.

        Populate return_code, std_out and std_err

        Notes
        -----
        ``sacct`` is polled until it reports a return code for a job which is
        no longer in an active state (see ``_get_final_return_code``).
        If the return code is already known, no polling is done.
        """
        if self._status["job_id"] is None:
            logging.warning(
                "Tried to wait for a process without job_id %s (%s). "
                "return_code, std_out, std_err not populated for SLURM job",
                self.job_id,
                self.job_name,
            )
            return

        self.release()
        while self._status["return_code"] is None:
            sacct_str = self.get_sacct()
            logging.debug("sacct is reading:\n%s", sacct_str)
            self._status["return_code"] = self._get_final_return_code(sacct_str)
            if self._status["return_code"] is None:
                # Only sleep when another poll is needed
                sleep(self._POLL_INTERVAL_SECONDS)
        self._populate_std_out_and_std_err()

    @staticmethod
    def _get_job_line(sacct_str: str) -> Optional[str]:
        """
        Return the job line of the ``sacct`` trace.

        Parameters
        ----------
        sacct_str : str
            Trace obtained from the ``sacct`` command

        Returns
        -------
        job_line : None or str
            The third line of ``sacct_str`` (the two first being headers)
            None if ``sacct_str`` has fewer than three lines
        """
        job_lines = sacct_str.split("\n")
        if len(job_lines) <= 2:
            return None
        return job_lines[2]

    @classmethod
    def _get_final_return_code(cls, sacct_str: str) -> Optional[int]:
        """
        Return the exit code only if the job is no longer active.

        Parameters
        ----------
        sacct_str : str
            Trace obtained from the ``sacct`` command

        Returns
        -------
        return_code : None or int
            The exit code of the job
            None if the state could not be read, if the state is one of
            ``_UNFINISHED_STATES`` or if no exit code could be read
        """
        state = cls.get_state(sacct_str)
        if state is None or state in cls._UNFINISHED_STATES:
            return None
        return cls.get_return_code(sacct_str)

    @staticmethod
    def get_return_code(sacct_str: str) -> Optional[int]:
        """
        Return the exit code if any.

        Parameters
        ----------
        sacct_str : str
            Trace obtained from the ``sacct`` command

        Returns
        -------
        return_code : None or int
            Return code obtained from the cluster

        Notes
        -----
        Assumes the job line is the third line of sacct_str
        (the two others being headers), and that the "ExitCode" is the last
        column in ``sacct``

        The state of the job is not considered here, so an exit code is
        returned whenever one is printed on the job line
        """
        job_line = SLURMSubmitter._get_job_line(sacct_str)
        if job_line is None:
            return None
        # Using search as match will only search the beginning of
        # the string
        # https://stackoverflow.com/a/32134461/2786884
        match = SLURMSubmitter._RETURN_CODE_PATTERN.search(job_line)
        if match is None:
            return None
        return int(match.group(1))

    @staticmethod
    def get_state(sacct_str: str) -> Optional[str]:
        """
        Return the state from sacct.

        Parameters
        ----------
        sacct_str : str
            Trace obtained from the ``sacct`` command

        Returns
        -------
        status : None or str
            Status code obtained from the cluster
            A trailing "+" following the upper case letters is included

        Notes
        -----
        Assumes the job line is the third line of sacct_str
        (the two others being headers), and that the "State" is the second last
        column before "ExitCode" in ``sacct``
        """
        job_line = SLURMSubmitter._get_job_line(sacct_str)
        if job_line is None:
            return None
        # Using search as match will only search the beginning of
        # the string
        # https://stackoverflow.com/a/32134461/2786884
        match = SLURMSubmitter._STATE_PATTERN.search(job_line)
        if match is None:
            return None
        return match.group(1)

    def completed(self) -> bool:
        """
        Return the completed status.

        Returns
        -------
        bool
            Whether the job has completed
            False if the job has no job_id, has not been released, is still
            in an active state or if ``sacct`` yields no state or exit code
        """
        if self._status["job_id"] is None or not self._released:
            return False
        if self._status["return_code"] is not None:
            return True

        return_code = self._get_final_return_code(self.get_sacct())
        if return_code is None:
            return False

        self._status["return_code"] = return_code
        # The return code is set, so this will not poll sacct
        self._wait_for_std_out_and_std_err()
        return True

    def create_submission_string(
        self, command: str, waiting_for: Tuple[str, ...]
    ) -> str:
        """
        Return the SLURM script as a string.

        Parameters
        ----------
        command : str
            The command to submit
        waiting_for : tuple of str
            Tuple of ids that this job will wait for

        Returns
        -------
        job_script : str
            The script to be submitted

        Notes
        -----
        Sets ``self._log_and_error_base``, which is the base of the paths of
        the ``.log`` and ``.err`` files
        """
        wall_time = self._submission_dict["walltime"]
        acc = self._submission_dict["account"]
        sub_queue = self._submission_dict["queue"]
        email = self._submission_dict["mail"]

        # Notice that we do not add the stem here
        self._log_and_error_base = self.store_dir.joinpath(self._job_name)

        script_lines = [
            "#!/bin/bash",
            f"#SBATCH --job-name={self._job_name}",
            f"#SBATCH --nodes={self.processor_split.number_of_nodes}",
            f"#SBATCH --tasks-per-node={self.processor_split.processors_per_node}",
        ]
        # Optional directives are only added when a value is given
        if wall_time is not None:
            script_lines.append(f"#SBATCH --time={wall_time}")
        if acc is not None:
            script_lines.append(f"#SBATCH --account={acc}")
        if sub_queue is not None:
            script_lines.append(f"#SBATCH -p {sub_queue}")
        script_lines.append(f"#SBATCH -o {self._log_and_error_base}.log")
        script_lines.append(f"#SBATCH -e {self._log_and_error_base}.err")
        if email is not None:
            script_lines.append("#SBATCH --mail-type=ALL")
            script_lines.append(f"#SBATCH --mail-user={email}")
        if waiting_for:
            script_lines.append(f"#SBATCH --dependency=afterok:{':'.join(waiting_for)}")

        # An empty line separates the directives from the commands
        script_lines.append("")
        # Change directory to the directory of this script
        script_lines.append("cd $SLURM_SUBMIT_DIR")
        script_lines.append(command)
        return "\n".join(script_lines)

    def get_sacct(self) -> str:
        """
        Return the result from ``sacct``.

        Returns
        -------
        sacct_str : str
            The string obtained from ``sacct``
            An empty string will be returned if no job_id exists or if
            ``sacct`` gave no standard output
        """
        job_id = self._status["job_id"]
        if job_id is None:
            return ""

        # Submit the command through a local submitter
        local_submitter = LocalSubmitter(run_path=self.store_dir)
        # NOTE: The full option name is used, as the abbreviation ``--j``
        #       is ambiguous in versions of sacct which also have ``--json``
        local_submitter.submit_command(
            f"sacct "
            f"--starttime {self.__sacct_starttime} "
            f"--jobs {job_id} "
            f"--brief"
        )
        local_submitter.wait_until_completed()
        std_out = local_submitter.std_out
        return std_out if std_out is not None else ""

    def reset(self) -> None:
        """Reset released, waiting_for and status dict."""
        self._released = False
        self._waiting_for = []
        self._reset_status()