# Source code for bout_runners.metadata.metadata_recorder
"""Module containing the MetadataRecorder class."""
import logging
from datetime import datetime
from typing import Dict, Mapping, Optional, Union
from bout_runners.database.database_connector import DatabaseConnector
from bout_runners.database.database_reader import DatabaseReader
from bout_runners.database.database_utils import get_file_modification, get_system_info
from bout_runners.database.database_writer import DatabaseWriter
from bout_runners.make.make import Make
from bout_runners.parameters.bout_paths import BoutPaths
from bout_runners.parameters.final_parameters import FinalParameters
from bout_runners.submitter.processor_split import ProcessorSplit
class MetadataRecorder:
    r"""
    Class for recording the metadata of the runs.

    Attributes
    ----------
    __db_writer : DatabaseWriter
        Getter variable for db_writer
    __db_reader : DatabaseReader
        Getter variable for db_reader
    db_writer : DatabaseWriter
        Object which writes to the database
    db_reader : DatabaseReader
        Object which reads from the database

    Methods
    -------
    capture_new_data_from_run(processor_split, restart, force)
        Capture new data from a run
    create_entry(table_name, entries_dict)
        Create a database entry and return the entry id
    _get_or_create_entry_id(table_name, entries_dict)
        Return the id of a matching entry, creating the entry if needed
    _create_parameter_tables_entry(parameters_dict)
        Insert the parameters into the parameter tables

    Examples
    --------
    Import dependencies

    >>> from pathlib import Path
    >>> from bout_runners.parameters.bout_paths import BoutPaths
    >>> from bout_runners.parameters.default_parameters import DefaultParameters
    >>> from bout_runners.parameters.final_parameters import FinalParameters
    >>> from bout_runners.database.database_connector import DatabaseConnector
    >>> from bout_runners.submitter.processor_split import ProcessorSplit

    Create the `bout_paths` object

    >>> project_path = Path().joinpath('path', 'to', 'project')
    >>> bout_inp_src_dir = Path().joinpath('path', 'to', 'source', 'BOUT.inp')
    >>> bout_inp_dst_dir = Path().joinpath('path', 'to', 'destination','BOUT.inp')
    >>> bout_paths = BoutPaths(project_path=project_path,
    ...                        bout_inp_src_dir=bout_inp_src_dir,
    ...                        bout_inp_dst_dir=bout_inp_dst_dir)

    Obtain the parameters

    >>> default_parameters = DefaultParameters(bout_paths)
    >>> final_parameters = FinalParameters(default_parameters)
    >>> final_parameters_dict = final_parameters.get_final_parameters()
    >>> final_parameters_as_sql_types = \
    ...     final_parameters.cast_to_sql_type(
    ...     final_parameters_dict)

    Create the metadata recorder object

    >>> db_connector = DatabaseConnector('name', project_path)
    >>> metadata_recorder = MetadataRecorder(db_connector,
    ...                                      bout_paths,
    ...                                      final_parameters)

    Capture the data to the database (`run_id` is None when no identical run
    was found in the database)

    >>> run_id = metadata_recorder.capture_new_data_from_run(ProcessorSplit())
    """

    def __init__(
        self,
        db_connector: DatabaseConnector,
        bout_paths: BoutPaths,
        final_parameters: FinalParameters,
    ) -> None:
        """
        Set the database to use.

        Parameters
        ----------
        db_connector : DatabaseConnector
            The database connector
        bout_paths : BoutPaths
            Object containing the paths
        final_parameters : FinalParameters
            Object containing the final parameters
        """
        # The reader and the writer share the same connector
        self.__db_writer = DatabaseWriter(db_connector)
        self.__db_reader = DatabaseReader(db_connector)
        self.__bout_paths = bout_paths
        self.__final_parameters = final_parameters
        # Used for the makefile path and the executable name when recording
        # the file modification
        self.__make = Make(self.__bout_paths.project_path)

    @property
    def db_reader(self) -> DatabaseReader:
        """
        Return the database reader.

        Returns
        -------
        self.__db_reader : DatabaseReader
            The database reader object

        Notes
        -----
        The db_reader is read only
        """
        return self.__db_reader

    @property
    def db_writer(self) -> DatabaseWriter:
        """
        Return the database writer.

        Returns
        -------
        self.__db_writer : DatabaseWriter
            The database writer object

        Notes
        -----
        The db_writer is read only
        """
        return self.__db_writer

    def capture_new_data_from_run(
        self,
        processor_split: ProcessorSplit,
        restart: bool = False,
        force: bool = False,
    ) -> Optional[int]:
        """
        Capture new data from a run.

        This function will capture all uncaptured data from a run: the
        parameters, the file modification, the processor split and the system
        info are looked up in the database and created if they are missing.
        Finally the run table is searched for an entry which refers to these
        ids, and a new run entry is created if none is found (or if `force`
        is True).

        Parameters
        ----------
        processor_split : ProcessorSplit
            The processor split object
        restart : bool
            If True, `restart` in the `global` section of the parameters is
            set to 1 before the parameters are stored, so that the data will
            be captured (even if it has been executed before)
        force : bool
            Store entry to the run table even if a entry with the same parameter
            exists
            This will typically be used if the bout_runners is forcefully
            executing a run

        Returns
        -------
        run_id : None or int
            If no previous run with the same configuration has been executed,
            this will return None, else the run_id of the previous run is
            returned
            Note that the id of the previous run is returned also when `force`
            is True (although a new entry is created in that case)
        """
        # Initiate the run_dict (will be filled with the ids)
        run_dict: Dict[str, Union[str, int, float, None]] = {
            "name": self.__bout_paths.bout_inp_dst_dir.name
        }

        # Update the parameters
        parameters_dict = self.__final_parameters.get_final_parameters()
        if restart:
            # NOTE(review): This writes into the dictionary returned by
            #               get_final_parameters. Confirm that the returned
            #               dictionary is not shared with other consumers
            parameters_dict["global"]["restart"] = 1
        run_dict["parameters_id"] = self._create_parameter_tables_entry(parameters_dict)

        # Update the file_modification
        file_modification_dict = get_file_modification(
            self.__bout_paths.project_path,
            self.__make.makefile_path,
            self.__make.exec_name,
        )
        run_dict["file_modification_id"] = self._get_or_create_entry_id(
            "file_modification", file_modification_dict
        )

        # Update the split
        split_dict = {
            "number_of_processors": processor_split.number_of_processors,
            "number_of_nodes": processor_split.number_of_nodes,
            "processors_per_node": processor_split.processors_per_node,
        }
        run_dict["split_id"] = self._get_or_create_entry_id("split", split_dict)

        # Update the system info
        system_info_dict = get_system_info()
        run_dict["system_info_id"] = self._get_or_create_entry_id(
            "system_info", system_info_dict
        )

        # Update the run
        # NOTE: If restart is True, a new run_id will be given as the
        #       run_dict["name"] will be unique
        # NOTE: The lookup must be done before the status and the submitted
        #       time are added, as these differ from run to run
        run_id = self.__db_reader.get_entry_id("run", run_dict)
        if force or run_id is None:
            run_dict["latest_status"] = "submitted"
            run_dict["submitted_time"] = datetime.now().isoformat()
            self.create_entry("run", run_dict)

        return run_id

    def create_entry(
        self, table_name: str, entries_dict: Mapping[str, Union[int, str, float, None]]
    ) -> int:
        """
        Create a database entry and return the entry id.

        Parameters
        ----------
        table_name : str
            Name of the table
        entries_dict : dict
            Dictionary containing the entries as key value pairs

        Returns
        -------
        entry_id : int
            The id of the newly created entry

        Raises
        ------
        RuntimeError
            If the newly created id could not be fetched
        """
        self.__db_writer.create_entry(table_name, entries_dict)
        # The id is obtained by reading back the entry which was just written
        entry_id = self.__db_reader.get_entry_id(table_name, entries_dict)
        if entry_id is None:
            msg = "Could not fetch the newly created id"
            logging.critical(msg)
            raise RuntimeError(msg)
        return entry_id

    def _get_or_create_entry_id(
        self, table_name: str, entries_dict: Mapping[str, Union[int, str, float, None]]
    ) -> int:
        """
        Return the id of a matching entry, creating the entry if needed.

        Parameters
        ----------
        table_name : str
            Name of the table
        entries_dict : dict
            Dictionary containing the entries as key value pairs

        Returns
        -------
        entry_id : int
            The id of the existing entry if the reader finds one, else the id
            of the newly created entry

        Raises
        ------
        RuntimeError
            If an entry had to be created and its id could not be fetched
        """
        entry_id = self.__db_reader.get_entry_id(table_name, entries_dict)
        if entry_id is None:
            entry_id = self.create_entry(table_name, entries_dict)
        return entry_id

    def _create_parameter_tables_entry(
        self, parameters_dict: Dict[str, Dict[str, Union[int, str, float]]]
    ) -> int:
        """
        Insert the parameters into the parameter tables.

        Each section is stored in (or found in) a table named after the
        section, and the resulting ids are used as the columns of the entry in
        the `parameters` table.

        Parameters
        ----------
        parameters_dict : dict
            The dictionary on the form

            >>> {'section': {'parameter': 'value'}}

        Returns
        -------
        parameters_id : int
            The id key from the `parameters` table

        Notes
        -----
        All `:` will be replaced by `_` in the section names
        """
        parameters_foreign_keys = {}
        for section, section_parameters in parameters_dict.items():
            # Replace bad characters for SQL
            section_name = section.replace(":", "_")
            parameters_foreign_keys[
                f"{section_name}_id"
            ] = self._get_or_create_entry_id(section_name, section_parameters)

        # Update the parameters table
        return self._get_or_create_entry_id("parameters", parameters_foreign_keys)