"""Contains the RunGroup class."""
import logging
import re
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from bout_runners.database.database_connector import DatabaseConnector
from bout_runners.parameters.bout_paths import BoutPaths
from bout_runners.parameters.bout_run_setup import BoutRunSetup
from bout_runners.runner.run_graph import RunGraph
from bout_runners.submitter.abstract_cluster_submitter import AbstractClusterSubmitter
from bout_runners.submitter.abstract_submitter import AbstractSubmitter
from bout_runners.submitter.submitter_factory import get_submitter
[docs]class RunGroup:
"""
Class for building a run group.
A run group contains one recipe for executing the project (called bout_run_setup).
The run group may consist of pre-processors (functions that will run prior to the
bout_run_setup execution), and it may consist of post-processors (functions that
will run after the bout_run_setup execution).
Attributes
----------
__counter : int
Counter used if no name is given in the constructor
__names : list of str
List of the run group names, makes sure there will be no name collision
__dst_dir : Path
The path to the dump directory
__name : str
Name of the RunGroup
__run_graph : RunGraph
The getter variable of run_graph
__bout_run_setup : BoutRunSetup
The setup of the BOUT++ run
__bout_run_node_name : str
Getter variable for bout_run_node_name
__pre_processors : list of str
Getter variable for pre_processors
__post_processors : list of str
Getter variable for post_processors
bout_run_node_name : str
Name of the BOUT++ run part of the run group
bout_paths : BoutPaths
The BoutPaths of the BOUT++ run
db_connector : DatabaseConnector
The DatabaseConnector of the BOUT++ run
run_graph : RunGraph
The RunGraph which the RunGroup will be attached to
pre_processors : list of str
List of pre-processors (which will run before the BOUT++ run)
post_processors : list of str
List of post-processors (which will run after the BOUT++ run)
Methods
-------
__increment_name()
Increment the name with a number to avoid name collision
add_pre_processor(function_dict, directory, submitter, waiting_for)
Add a function which will run prior to the BOUT++ run
add_post_processor(function_dict, directory, submitter, waiting_for)
Add a function which will run after the BOUT++ run
Examples
--------
The RunGroup contains the a bout run and it's pre- and post-processors
>>> bout_run_setup = BoutRunSetup(executor, db_connector, final_parameters)
>>> run_graph = RunGraph()
>>> # Attach a RunGroups to the run_graph
>>> run_group_1 = RunGroup(run_graph, bout_run_setup_1)
>>> run_group_2 = RunGroup(run_graph, bout_run_setup_2)
>>> # Add the function `foo` as a post-processor to run_group_1
>>> post_processor_node_name = run_group_1.add_post_processor(
... {'function': foo, 'args': (foo_1,), 'kwargs':None})
>>> # Add the function `bar` as a pre-processor to run_group_2,
... # which waits for the post-processor of run_group_1
>>> run_group_2.add_pre_processor(
... {'function': bar, 'args': None, 'kwargs':None}),
... waiting_for=post_processor_node_name)
>>> runner = BoutRunner(run_graph)
>>> runner.run()
See Also
--------
RunGraph : Class to create a run graph
"""
__counter = 0
__names: List[str] = list()
def __init__(
self,
run_graph: RunGraph,
bout_run_setup: BoutRunSetup,
name: Optional[str] = None,
waiting_for: Optional[Union[str, Iterable[str]]] = None,
):
"""
Set the member data.
If you want to connect nodes to this RunGroup after creation, you can use
RunGraph.add_function_node
Parameters
----------
run_graph : RunGraph
The RunGraph which the RunGroup will be attached to
bout_run_setup : BoutRunSetup
The setup of the BOUT++ run
name : None or str
Name of the RunGroup
If None, the class counter will be used
waiting_for : None or str or iterable
Name of nodes the name_of_waiting_node will wait for
"""
logging.info("Start: Making a RunGroup object")
self.__run_graph = run_graph
self.__name = name
self.__bout_run_setup = bout_run_setup
self.__dst_dir = self.__bout_run_setup.bout_paths.bout_inp_dst_dir
self.__pre_processors: List[str] = list()
self.__post_processors: List[str] = list()
if self.__name is None:
self.__name = str(RunGroup.__counter)
RunGroup.__counter += 1
if self.__name in RunGroup.__names:
self.__increment_name()
RunGroup.__names.append(self.__name)
# Assign a node to bout_run_setup
self.__bout_run_node_name = f"bout_run_{self.__name}"
if isinstance(self.__bout_run_setup.submitter, AbstractClusterSubmitter):
self.__bout_run_setup.submitter.job_name = self.__bout_run_node_name
self.__run_graph.add_bout_run_node(
self.bout_run_node_name, self.__bout_run_setup
)
# Add edges to the nodes
self.__run_graph.add_waiting_for(self.bout_run_node_name, waiting_for)
logging.info("Done: Making a RunGroup object")
def __increment_name(self) -> None:
"""Increment the name of the RunGroup."""
old_name = self.__name if self.__name is not None else ""
pattern = old_name + r"_(\d)+$"
numbers = list()
for name in RunGroup.__names:
match = re.search(pattern, name)
if match is not None:
# NOTE: THe zeroth group is the matching string
numbers.append(int(match.group(1)))
if len(numbers) == 0:
self.__name = f"{old_name}_1"
else:
self.__name = f"{old_name}_{max(numbers) + 1}"
logging.warning(
"%s is already registered as a RunGroup name. Changing the name to %s",
old_name,
self.__name,
)
@property
def bout_run_node_name(self) -> str:
"""
Return the name of the BOUT++ run node.
Returns
-------
str
The name of the BOUT++ run node
"""
return self.__bout_run_node_name
@property
def bout_paths(self) -> BoutPaths:
"""
Return the BoutPaths.
Returns
-------
BoutPaths
The BoutPaths
"""
return self.__bout_run_setup.bout_paths
@property
def db_connector(self) -> DatabaseConnector:
"""
Return the DatabaseConnector.
Returns
-------
BoutPaths
The DatabaseConnector
"""
return self.__bout_run_setup.db_connector
@property
def run_graph(self) -> RunGraph:
"""
Return the run graph.
Returns
-------
RunGraph
The run graph
"""
return self.__run_graph
@property
def pre_processors(self) -> Tuple[str, ...]:
"""
Return the pre_processors.
Returns
-------
tuple
The tuple of pre_processors
"""
return tuple(self.__pre_processors)
@property
def post_processors(self) -> Tuple[str, ...]:
"""
Return the post_processors.
Returns
-------
tuple
The tuple of post_processors
"""
return tuple(self.__post_processors)
[docs] def add_pre_processor(
self,
function_dict: Dict[
str, Optional[Union[Callable, Tuple[Any, ...], Dict[str, Any]]]
],
directory: Optional[Path] = None,
submitter: Optional[AbstractSubmitter] = None,
waiting_for: Optional[Union[str, Iterable[str]]] = None,
) -> str:
"""
Add a pre-processor to the BOUT++ run.
The function and the parameters will be saved to a python script which will
be submitted
Parameters
----------
function_dict : dict
Dict with the function to call
On the form
>>> {'function': callable,
... 'args': None or tuple,
... 'kwargs': None or dict}
directory : None or Path
Absolute path to directory to store the python script
If None, the destination directory of BoutRun will be used
submitter : AbstractSubmitter
Submitter to submit the function with
If None, the default LocalSubmitter will be used
waiting_for : None or str or iterable
Name of nodes this node will wait for to finish before executing
Returns
-------
pre_processor_node_name : str
The node name of the pre-processor
Raises
------
ValueError
If the function in the function_dict is not callable
"""
if directory is None:
directory = self.__dst_dir
if "function" not in function_dict.keys() or not callable(
function_dict["function"]
):
msg = 'function_dict["function"] must be callable'
logging.error(msg)
raise ValueError(msg)
pre_processor_node_name = (
f"pre_processor_{self.__name}_{len(self.__pre_processors)}"
)
path = directory.joinpath(
f"{function_dict['function'].__name__}_{pre_processor_node_name}.py"
)
if submitter is None:
submitter = get_submitter()
if isinstance(submitter, AbstractClusterSubmitter):
submitter.job_name = pre_processor_node_name
submitter.store_dir = self.__bout_run_setup.bout_paths.bout_inp_dst_dir
self.__run_graph.add_function_node(
pre_processor_node_name,
function_dict=function_dict,
path=path,
submitter=submitter,
)
self.__run_graph.add_edge(pre_processor_node_name, self.bout_run_node_name)
self.__run_graph.add_waiting_for(pre_processor_node_name, waiting_for)
self.__pre_processors.append(pre_processor_node_name)
return pre_processor_node_name
[docs] def add_post_processor(
self,
function_dict: Dict[
str, Optional[Union[Callable, Tuple[Any, ...], Dict[str, Any]]]
],
directory: Optional[Path] = None,
submitter: Optional[AbstractSubmitter] = None,
waiting_for: Optional[Union[str, Iterable[str]]] = None,
) -> str:
"""
Add a post-processor to the BOUT++ run.
The function and the parameters will be saved to a python script which will
be submitted
Parameters
----------
function_dict : dict
Dict with the function to call
On the form
>>> {'function': callable,
... 'args': None or tuple,
... 'kwargs': None or dict}
directory : None or Path
Absolute path to directory to store the python script
If None, the destination directory of BoutRun will be used
waiting_for : None or str or iterable
Name of nodes this node will wait for to finish before executing
submitter : None or AbstractSubmitter
Submitter to submit the function with
If None, the default LocalSubmitter will be used
Returns
-------
post_processor_node_name : str
The node name of the pre-processor
Raises
------
ValueError
If the function in the function_dict is not callable
"""
if directory is None:
directory = self.__dst_dir
if "function" not in function_dict.keys() or not callable(
function_dict["function"]
):
msg = 'function_dict["function"] must be callable'
logging.error(msg)
raise ValueError(msg)
post_processor_node_name = (
f"post_processor_{self.__name}_{len(self.__post_processors)}"
)
path = directory.joinpath(
f"{function_dict['function'].__name__}_{post_processor_node_name}.py"
)
if submitter is None:
submitter = get_submitter()
if isinstance(submitter, AbstractClusterSubmitter):
submitter.job_name = post_processor_node_name
submitter.store_dir = self.__bout_run_setup.bout_paths.bout_inp_dst_dir
self.__run_graph.add_function_node(
post_processor_node_name,
function_dict=function_dict,
path=path,
submitter=submitter,
)
self.__run_graph.add_edge(self.bout_run_node_name, post_processor_node_name)
self.__run_graph.add_waiting_for(post_processor_node_name, waiting_for)
self.__post_processors.append(post_processor_node_name)
return post_processor_node_name