Source code for bout_runners.submitter.local_submitter

"""Contains the local submitter class."""


import logging

# NOTE: Subprocess below is safe against shell injections
# https://github.com/PyCQA/bandit/issues/280
import subprocess  # nosec
from pathlib import Path
from typing import Optional

from bout_runners.submitter.abstract_submitter import AbstractSubmitter
from bout_runners.submitter.processor_split import ProcessorSplit
from bout_runners.utils.file_operations import get_caller_dir


[docs]class LocalSubmitter(AbstractSubmitter): """ Submits a command. Attributes ---------- __process : None or Popen The Popen process if it has been created run_path : Path or str Directory to run the command from Methods ------- _wait_for_std_out_and_std_err() Wait until the process completes, populate return_code, std_out and std_err submit_command(command) Submit a subprocess completed() Return the completed status raise_error(self) Raise and error from the subprocess in a clean way Examples -------- >>> submitter = LocalSubmitter() >>> submitter.submit_command('ls') >>> submitter.wait_until_completed() >>> print(submitter.std_out) __init__.py test_local_submitter.py test_processor_split.py test_submitter_factory.py """ def __init__( self, run_path: Optional[Path] = None, processor_split: Optional[ProcessorSplit] = None, ) -> None: """ Set the path from where the calls are made from. Parameters ---------- run_path : Path or str or None Directory to run the command from If None, the calling directory will be used processor_split : ProcessorSplit or None Object containing the processor split If None, default values will be used """ AbstractSubmitter.__init__(self, processor_split) # NOTE: We are not setting the default as a keyword argument # as this would mess up the paths self.run_path = ( Path(run_path).absolute() if run_path is not None else get_caller_dir() ) self.__process: Optional[subprocess.Popen] = None def _wait_for_std_out_and_std_err(self) -> None: """ Wait until the process completes if a process has been started. Populate return_code, std_out and std_err """ if self.__process is not None: std_out, std_err = self.__process.communicate() self._status["return_code"] = self.__process.poll() self._status["std_out"] = std_out.decode("utf8").strip() self._status["std_err"] = std_err.decode("utf8").strip() else: logging.warning( "No process started, return_code, std_out, std_err not populated" )
[docs] def submit_command(self, command: str) -> None: """ Submit a subprocess. Parameters ---------- command : str The command to run """ # This starts the job anew, so we restart the instance to clear it from any # spurious member data self.reset() self.__process = subprocess.Popen( command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.run_path, # https://docs.python.org/3/library/subprocess.html#security-considerations # https://github.com/PyCQA/bandit/issues/280 shell=False, # nosec ) self._status["job_id"] = str(self.__process.pid) logging.debug( "job_id %s given to command '%s' in %s", self.job_id, command, self.run_path )
[docs] def completed(self) -> bool: """ Return the completed status. Communicate the process has completed. Returns ------- bool True if the process has completed """ if self.__process is not None: if self.return_code is not None: return True return_code = self.__process.poll() if return_code is not None: self._status["return_code"] = return_code self._wait_for_std_out_and_std_err() return True return False
[docs] def raise_error(self) -> None: """Raise and error from the subprocess in a clean way.""" if self.completed(): if isinstance(self.__process, subprocess.Popen) and isinstance( self.return_code, int ): result = subprocess.CompletedProcess( self.__process.args, self.return_code, self.std_out, self.std_err ) result.check_returncode()