# Source code for bout_runners.metadata.status_checker

"""Module containing the StatusChecker class."""


import logging
import time
from pathlib import Path
from typing import Optional, Union

import psutil
from pandas import DataFrame

from bout_runners.database.database_connector import DatabaseConnector
from bout_runners.database.database_reader import DatabaseReader
from bout_runners.log.log_reader import LogReader
from bout_runners.metadata.metadata_updater import MetadataUpdater


class StatusChecker:
    r"""
    Class to check and update the status of runs.

    Attributes
    ----------
    __db_connector : DatabaseConnector
        Connection to the database under consideration
    __db_reader : DatabaseReader
        Object to read the database with
    __project_path : Path
        Path to the project

    Methods
    -------
    check_and_update_status()
        Check and update the status for the schema
    get_query_string_for_non_errored_runs()
        Return the query string for non errored results
    check_and_update_until_complete(seconds_between_update)
        Check and update the status until all runs are stopped
    __check_submitted(metadata_updater, submitted_to_check)
        Check the status of all runs which has status `submitted`
        or `created`
    __check_running(metadata_updater, running_to_check)
        Check the status of all runs which has status `running`
    __check_if_stopped(log_reader, metadata_updater)
        Check if a run has stopped
    check_if_running_or_errored(log_reader)
        Check if a run is still running or has errored

    Examples
    --------
    >>> from pathlib import Path
    >>> from bout_runners.database.database_connector import \
    ...     DatabaseConnector
    >>> db_connector = DatabaseConnector('name_of_db',
    ...                                  Path().joinpath('path', 'to', 'db'))
    >>> project_path = Path('path').joinpath('to', 'project')
    >>> status_checker = StatusChecker(db_connector, project_path)
    >>> status_checker.check_and_update_status()

    Any updates to the runs will be written to the database.
    Alternatively, one can run the program until all jobs have stopped by
    calling

    >>> status_checker.check_and_update_until_complete()
    """

    def __init__(
        self,
        db_connector: DatabaseConnector,
        project_path: Optional[Union[Path, str]] = None,
    ) -> None:
        """
        Set connector, reader and a project path.

        Notes
        -----
        The StatusChecker instance only checks the project belonging to
        the same database schema grouped together by the `db_connector`

        Parameters
        ----------
        db_connector : DatabaseConnector
            Connection to the database
        project_path : Path or str or None
            Path to the project (the root directory with which usually
            contains the makefile and the executable)
            If None, `Path()` (the current directory) is used
        """
        self.__db_connector = db_connector
        self.__db_reader = DatabaseReader(self.__db_connector)
        self.__project_path = (
            Path(project_path) if project_path is not None else Path()
        )

    def check_and_update_status(self) -> None:
        """
        Check and update the status for the schema.

        Runs with status `submitted` or `created` are checked first,
        thereafter runs with status `running`.

        Raises
        ------
        RuntimeError
            If the schema does not exist
        """
        # Check that run table exist
        if not self.__db_reader.check_tables_created():
            logging.error(
                "No tables found in %s",
                self.__db_reader.db_connector.db_path,
            )
            message = "Can not check the status of schemas that does not exist"
            logging.critical(message)
            raise RuntimeError(message)

        # Create place holder metadata_updater
        # (the run_id is overwritten for each run which is checked)
        metadata_updater = MetadataUpdater(self.__db_connector, run_id=-1)

        # Check runs with status 'submitted' or 'created'
        query = (
            "SELECT name, id AS run_id FROM run WHERE\n"
            "latest_status = 'submitted' OR\n"
            "latest_status = 'created'"
        )
        submitted_to_check = self.__db_reader.query(query)
        self.__check_submitted(metadata_updater, submitted_to_check)

        # Check runs with status 'running'
        # NOTE: The string literal is single quoted as double quotes denote
        #       identifiers in SQL (only accepted as strings through a
        #       legacy fallback in SQLite)
        query = (
            "SELECT name, id AS run_id FROM run WHERE\n"
            "latest_status = 'running'"
        )
        running_to_check = self.__db_reader.query(query)
        self.__check_running(metadata_updater, running_to_check)

    @staticmethod
    def get_query_string_for_non_errored_runs() -> str:
        """
        Return the query string for non errored results.

        Returns
        -------
        str
            Query string selecting `name` and `run_id` of all runs with
            status `submitted`, `created` or `running`
        """
        return (
            "SELECT name, id AS run_id FROM run WHERE\n"
            "latest_status = 'submitted' OR\n"
            "latest_status = 'created' OR\n"
            "latest_status = 'running'"
        )

    def check_and_update_until_complete(self, seconds_between_update: int = 5) -> None:
        """
        Check and update the status until all runs are stopped.

        Parameters
        ----------
        seconds_between_update : int
            Number of seconds before a new status check is performed
        """
        query = self.get_query_string_for_non_errored_runs()
        # Loop for as long as the database reports unfinished runs
        while len(self.__db_reader.query(query).index) != 0:
            self.check_and_update_status()
            time.sleep(seconds_between_update)

    def __check_submitted(
        self, metadata_updater: MetadataUpdater, submitted_to_check: DataFrame
    ) -> None:
        """
        Check the status of all runs which has status `submitted` or `created`.

        Parameters
        ----------
        metadata_updater : MetadataUpdater
            Object which updates the database
        submitted_to_check : DataFrame
            DataFrame containing the `name` and `id` (in that order) of
            the runs to check

        Raises
        ------
        RuntimeError
            In case log_reader.started() is True and
            log_reader.start_time is None
        """
        for name, run_id in submitted_to_check.itertuples(index=False):
            metadata_updater.run_id = run_id
            log_path = self.__project_path.joinpath(name, "BOUT.log.0")

            if log_path.is_file():
                log_reader = LogReader(log_path)
                if log_reader.started():
                    start_time = log_reader.start_time
                    # Explicit check to narrow the Optional type
                    if start_time is None:
                        msg = (
                            "log_reader.start_time is None although "
                            "log_reader.started is True"
                        )
                        logging.critical(msg)
                        raise RuntimeError(msg)
                    metadata_updater.update_start_time(start_time)
                    latest_status = self.__check_if_stopped(
                        log_reader, metadata_updater
                    )
                else:
                    # No started time is found in the log
                    latest_status = self.check_if_running_or_errored(log_reader)
            else:
                # No log file exists
                # NOTE: This means that the execution is either in a
                #       queue or has failed the submission.
                #       For now, we still consider this as submitted
                #       This can maybe be decided by checking either the
                #       pid or the status from the submitter
                latest_status = "submitted"

            metadata_updater.update_latest_status(latest_status)

    def __check_running(
        self, metadata_updater: MetadataUpdater, running_to_check: DataFrame
    ) -> None:
        """
        Check the status of all runs which has status `running`.

        Parameters
        ----------
        metadata_updater : MetadataUpdater
            Object which updates the database
        running_to_check : DataFrame
            DataFrame containing the `name` and `id` (in that order) of
            the runs with status `running`

        Raises
        ------
        RuntimeError
            In case log_reader.ended() is True and
            log_reader.end_time is None
        """
        for name, run_id in running_to_check.itertuples(index=False):
            metadata_updater.run_id = run_id
            log_path = self.__project_path.joinpath(name, "BOUT.log.0")
            log_reader = LogReader(log_path)
            # A run which has ended no longer has a live process.
            # Checking the pid alone would therefore flag a successfully
            # finished run as `error`, so we check for the end of the run
            # first (this also stores the stop time)
            latest_status = self.__check_if_stopped(log_reader, metadata_updater)
            metadata_updater.update_latest_status(latest_status)

    def __check_if_stopped(
        self, log_reader: LogReader, metadata_updater: MetadataUpdater
    ) -> str:
        """
        Check if a run has stopped.

        If the log reports an end, the stop time is written through the
        `metadata_updater`.

        Parameters
        ----------
        log_reader : LogReader
            The object which reads log files
        metadata_updater : MetadataUpdater
            Object which updates the database

        Returns
        -------
        latest_status : str
            The latest status: `complete` if the log has ended, else the
            result of `check_if_running_or_errored`

        Raises
        ------
        RuntimeError
            In case log_reader.ended() is True and
            log_reader.end_time is None
        """
        if not log_reader.ended():
            return self.check_if_running_or_errored(log_reader)

        end_time = log_reader.end_time
        # Explicit check to narrow the Optional type
        if end_time is None:
            msg = "log_reader.end_time is None although log_reader.ended() is True"
            logging.critical(msg)
            raise RuntimeError(msg)
        metadata_updater.update_stop_time(end_time)
        return "complete"

    @staticmethod
    def check_if_running_or_errored(log_reader: LogReader) -> str:
        """
        Check if a run is still running or has errored.

        Parameters
        ----------
        log_reader : LogReader
            The object which reads log files

        Returns
        -------
        latest_status : str
            The latest status:
            `created` if the log contains no pid,
            `running` if a process with the pid exists,
            `error` otherwise
        """
        pid = log_reader.pid
        if pid is None:
            return "created"
        if psutil.pid_exists(pid):
            return "running"
        return "error"