Source code for bout_runners.submitter.submitter_factory

"""Contains the submitter factory."""


import configparser
import logging
from typing import Any, Dict, Optional, Tuple

from bout_runners.submitter.local_submitter import AbstractSubmitter, LocalSubmitter
from bout_runners.submitter.pbs_submitter import PBSSubmitter
from bout_runners.submitter.processor_split import ProcessorSplit
from bout_runners.submitter.slurm_submitter import SLURMSubmitter
from bout_runners.utils.paths import get_submitters_configuration


def get_submitter(
    name: Optional[str] = None,
    argument_dict: Optional[Dict[str, Any]] = None,
) -> AbstractSubmitter:
    """
    Return a Submitter object.

    Parameters
    ----------
    name : str or None
        Name of the submitter to use
        If None the submitter will be inferred
        An explicitly given name is always honored, also when
        argument_dict is None
    argument_dict : dict or None
        Dict containing positional and keyword arguments
        The dict is copied, so the caller's dict is never modified
        If name is None, the entries given here take precedence over the
        inferred arguments
        If None, and name is given, the submitter is built from default values

    Other Parameters
    ----------------
    The following parameters can be given argument_dict
    processor_split : ProcessorSplit or None
        Object containing the processor split
        Used for all submitters
        A default ProcessorSplit() is used if the key is absent
    run_path : Path or str or None
        Positional argument
        Directory to run the command from
        Used in LocalSubmitters
    job_name : str or None
        Positional argument
        Name of the job
        Used for cluster submitters
    store_directory : path or None
        Keyword argument
        Directory to store the scripts
        Used for cluster submitters
    submission_dict : None or dict of str of None or str
        Keyword argument
        Dict containing optional submission options
        On the form

            {'walltime': None or str,
             'account': None or str,
             'queue': None or str,
             'mail': None or str}

        These options will not be used if the submission_dict is None
        Used for cluster submitters

    Returns
    -------
    submitter : AbstractSubmitter
        The implemented submitter class

    Raises
    ------
    NotImplementedError
        If the name is not a supported submitter class
    ValueError
        Not raised directly by this function
        NOTE(review): presumably raised by the submitter constructors if the
        input does not match the desired submitter - verify
    """
    implemented = ("local", "pbs", "slurm")

    # Work on a copy so that filling in defaults never leaks into the
    # caller's dict
    arguments: Dict[str, Any] = (
        dict(argument_dict) if argument_dict is not None else dict()
    )

    if name is None:
        # Only infer when the caller did not ask for a specific submitter
        name, inferred_arguments = infer_submitter()
        # Explicitly given arguments win over the inferred ones
        arguments = {**inferred_arguments, **arguments}

    # Validate before doing any construction work
    if name not in implemented:
        msg = f"{name} is not a valid submitter class, choose from {implemented}"
        logging.critical(msg)
        raise NotImplementedError(msg)

    logging.debug("Choosing a %s submitter", name)

    # NOTE: Only an absent key triggers the default, an explicit None is
    #       forwarded as is to the constructor
    if "processor_split" not in arguments:
        arguments["processor_split"] = ProcessorSplit()

    if name == "local":
        return LocalSubmitter(
            run_path=arguments.get("run_path"),
            processor_split=arguments["processor_split"],
        )

    # The remaining submitters are cluster submitters sharing the same
    # constructor arguments, absent arguments are passed as None
    cluster_arguments: Dict[str, Any] = {
        "job_name": arguments.get("job_name"),
        "store_directory": arguments.get("store_directory"),
        "submission_dict": arguments.get("submission_dict"),
        "processor_split": arguments["processor_split"],
    }

    if name == "pbs":
        return PBSSubmitter(**cluster_arguments)

    return SLURMSubmitter(**cluster_arguments)
def infer_submitter() -> Tuple[str, Dict[str, Any]]:
    """
    Infer which submitter to use together with its constructor arguments.

    Both the SLURM and the PBS system are probed. SLURM is preferred if
    both are available, and the local submitter is used if none are.
    The arguments are read from the submitters configuration: the
    "cluster" section for SLURM and PBS, the "local" section otherwise.

    Returns
    -------
    name : str
        Name of the submitter type ("slurm", "pbs" or "local")
    argument_dict : dict
        Dict containing positional and keyword arguments
        Contains "submission_dict" and "processor_split" for the cluster
        submitters, and only "processor_split" for the local submitter
    """
    config = get_submitters_configuration()
    has_slurm = slurm_is_available()
    has_pbs = pbs_is_available()

    if not (has_pbs or has_slurm):
        # NOTE: We will always run one node for local submissions
        # NOTE: As the keyword argument run_path defaults to the caller dir
        #       we will not override the option here, but let the constructor
        #       give a default value
        local_arguments: Dict[str, Any] = {
            "processor_split": get_processor_split(config["local"])
        }
        return "local", local_arguments

    cluster_arguments: Dict[str, Any] = {
        "submission_dict": get_submission_dict(config["cluster"]),
        "processor_split": get_processor_split(config["cluster"]),
    }

    if has_slurm:
        logging.info("Inferred to use the SLURM submitter")
        return "slurm", cluster_arguments

    # Only PBS is available
    logging.info("Inferred to use the PBS submitter")
    return "pbs", cluster_arguments
def slurm_is_available() -> bool:
    """
    Check if the SLURM system is available.

    The check is done by running the "squeue" command through a
    LocalSubmitter. SLURM is considered available if the command could be
    started and the submitter does not report an error afterwards.

    Returns
    -------
    slurm_available : bool
        True if SLURM is available
    """
    # The probe command is executed through a local submitter
    probe = LocalSubmitter()
    try:
        probe.submit_command("squeue")
        # A failing command should be reported, not raised
        probe.wait_until_completed(raise_error=False)
        is_available = not probe.errored()
    except FileNotFoundError:
        # subprocess.Popen throws FileNotFoundError if a command is not in scope
        is_available = False

    negation = "" if is_available else " not"
    logging.debug("SLURM is%s available", negation)
    return is_available
def pbs_is_available() -> bool:
    """
    Check if the PBS system is available.

    The check is done by running the "qstat" command through a
    LocalSubmitter. PBS is considered available if the command could be
    started and the submitter does not report an error afterwards.

    Returns
    -------
    pbs_available : bool
        True if PBS is available
    """
    # The probe command is executed through a local submitter
    probe = LocalSubmitter()
    try:
        probe.submit_command("qstat")
        # A failing command should be reported, not raised
        probe.wait_until_completed(raise_error=False)
        is_available = not probe.errored()
    except FileNotFoundError:
        # subprocess.Popen throws FileNotFoundError if a command is not in scope
        is_available = False

    negation = "" if is_available else " not"
    logging.debug("PBS is%s available", negation)
    return is_available
def get_submission_dict(
    cluster_section: configparser.SectionProxy,
) -> Dict[str, Optional[str]]:
    """
    Build the submission dict from the submitter configuration file.

    Parameters
    ----------
    cluster_section : configparser.SectionProxy
        The cluster section of the submitters configuration
        Must contain the options walltime, account, queue and mail
        (a KeyError is raised by the lookup otherwise)

    Returns
    -------
    submission_dict : dict
        Dictionary containing options for the submission_dict
        An option whose text equals "none" (case insensitive) is mapped to
        None, all other options are kept as the configured string
        The submission_dict is a keyword parameter to the cluster constructors

    See Also
    --------
    AbstractClusterSubmitter : Submitter object used for clusters
    """

    def none_or_text(raw_value: str) -> Optional[str]:
        # The configuration file spells a missing value as the text "none"
        if raw_value.lower() == "none":
            return None
        return raw_value

    option_names = ("walltime", "account", "queue", "mail")
    return {option: none_or_text(cluster_section[option]) for option in option_names}
def get_processor_split(submitter_section: configparser.SectionProxy) -> ProcessorSplit:
    """
    Build the processor split from the submitter configuration file.

    Parameters
    ----------
    submitter_section : configparser.SectionProxy
        The section of the submitters configuration to use
        Must contain the option number_of_processors
        The options number_of_nodes and processors_per_node are optional,
        and both default to 1 when absent

    Returns
    -------
    ProcessorSplit
        The processor split object
    """

    def int_or_one(option: str) -> int:
        # Optional options fall back to 1 when not present in the section
        if option in submitter_section:
            return int(submitter_section[option])
        return 1

    # number_of_processors is mandatory, hence the direct lookup
    total_processors = int(submitter_section["number_of_processors"])
    nodes = int_or_one("number_of_nodes")
    per_node = int_or_one("processors_per_node")

    return ProcessorSplit(
        number_of_processors=total_processors,
        number_of_nodes=nodes,
        processors_per_node=per_node,
    )