# Source code for bout_runners.runner.run_graph

"""Contains the RunGraph class."""


import logging
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union

import networkx as nx

from bout_runners.parameters.bout_run_setup import BoutRunSetup
from bout_runners.submitter.abstract_submitter import AbstractSubmitter
from bout_runners.submitter.local_submitter import LocalSubmitter


class RunGraph:
    """
    A directed acyclic graph where the nodes contain instructions for execution.

    Attributes
    ----------
    __graph : nx.DiGraph
        The run graph
    __node_set : set
        The set of nodes belonging to the graph
    nodes : nx.classes.reportviews.NodeView
        Return the nodes

    Methods
    -------
    __iter__()
        Make the class iterable
    __next__()
        Return the next order nodes from graph (ordered by the breadth)
    __len__()
        Return the number of nodes with status ready
    __getitem__(nodename)
        Return the content of a node
    get_node_orders(reverse)
        Return nodes sorted after order
    predecessors(node_name)
        Return the predecessors of the node
    successors(node_name)
        Return the successors of the node
    reset()
        Reset the nodes by setting the status to 'ready'
    add_bout_run_node(name, bout_run_setup)
        Add a node where the setup of a BOUT++ run is attached
    add_function_node(name, function_dict=None, path=None, submitter=None)
        Add a node with an optionally attached callable to the graph
    add_edge(start_node, end_node)
        Connect two nodes through a directed edge
    remove_edge(start_node, end_node)
        Remove edge between two nodes
    add_waiting_for(name_of_waiting_node, nodes_to_wait_for)
        Make a node wait for the completion of one or more nodes
    get_waiting_for_tuple(start_node_name)
        Return the tuple of nodes waiting for a given node
    change_status_node_and_dependencies(start_node_name, status="errored")
        Change the status of a node and all nodes waiting for it
    get_dot_string()
        Return the graph as a string in the dot format

    Examples
    --------
    The RunGraph contains the graph executed by BoutRunners

    >>> bout_run_setup = BoutRunSetup(executor, db_connector, final_parameters)
    >>> run_graph = RunGraph()
    >>> # Attach a RunGroup to the run_graph
    >>> _ = RunGroup(run_graph, bout_run_setup)
    >>> runner = BoutRunner(run_graph)
    >>> runner.run()

    See Also
    --------
    RunGroup : Class for building a run group
    """

    def __init__(self) -> None:
        """Instantiate the graph."""
        logging.info("Start: Making a RunGraph object")
        self.__graph = nx.DiGraph()
        self.__node_set = set(self.__graph.nodes)
        logging.info("Done: Making a RunGraph object")

        # Loop variables
        self.__node_orders: Optional[Tuple[Tuple[str, ...], ...]] = None
        self.__index = -1

    def __iter__(self) -> "RunGraph":
        """
        Make the class iterable.

        The iteration state is stored on the instance, so it is reset here.
        Otherwise a loop which was left early (for example through ``break``)
        would make the next loop continue from stale node orders.

        Returns
        -------
        self : RunGraph
            The class as an iterable
        """
        self.__node_orders = None
        self.__index = -1
        return self

    def __next__(self) -> Tuple[str, ...]:
        """
        Return the next order nodes from graph (ordered by the breadth).

        Raises
        ------
        StopIteration
            When the iteration is exhausted

        Returns
        -------
        order : tuple of str
            A tuple consisting of the current order
        """
        # The orders are computed lazily on the first call of an iteration
        if self.__node_orders is None:
            self.__node_orders = self.get_node_orders()

        self.__index += 1
        if self.__index >= len(self.__node_orders):
            # Reset the loop variables so that the next iteration starts afresh
            self.__index = -1
            self.__node_orders = None
            raise StopIteration

        return self.__node_orders[self.__index]

    def __len__(self) -> int:
        """
        Return the number of nodes with status ready.

        Returns
        -------
        length : int
            Number of nodes with status ready.
        """
        return sum(
            1
            for node_name in self.__graph
            if self.__graph.nodes[node_name]["status"] == "ready"
        )

    def __getitem__(self, node_name: str) -> Dict[str, Any]:
        """
        Return the content of a node.

        Parameters
        ----------
        node_name : str
            The name of the node

        Returns
        -------
        dict
            The node content
        """
        # It seems like this is producing a false positive
        return self.nodes[node_name]  # type: ignore

    @property
    def nodes(self) -> nx.classes.reportviews.NodeView:
        """Return the nodes."""
        # NOTE: The set of nodes only contain the name of the nodes, not their
        #       attributes
        return self.__graph.nodes

    def get_node_orders(self, reverse: bool = False) -> Tuple[Tuple[str, ...], ...]:
        """
        Return nodes sorted after order.

        One order is considered as the nodes without any in edges
        To find the next order remove the first order from the graph and
        repeat the first step

        Warnings
        --------
        As we are counting an order from the nodes with no in edges the
        result of

        >>> self.get_node_orders(reverse=True) != self.get_node_orders()[::-1]

        Parameters
        ----------
        reverse : bool
            Whether or not to reverse the graph before finding the orders

        Returns
        -------
        orders : tuple of tuple of str
            A tuple of tuple where the innermost tuple constitutes an order

        Raises
        ------
        ValueError
            If the graph contains a cycle, in which case no order can be found
        """
        graph_copy = self.__graph.copy()
        if reverse:
            # NOTE: Not possible to make a deepcopy as some of the attributes
            #       of the nodes are not pickable (example sqlite3.connection)
            #       Thus we set copy=False
            # https://networkx.org/documentation/stable/reference/classes/generated/networkx.DiGraph.reverse.html
            graph_copy = graph_copy.reverse(copy=False)
            # NOTE: As copy=False, the resulting graph is frozen
            #       To unfreeze we make a new object
            # https://networkx.org/documentation/stable/reference/generated/networkx.classes.function.freeze.html
            graph_copy = nx.DiGraph(graph_copy)

        orders = []
        while len(graph_copy.nodes) != 0:
            current_roots = tuple(
                node for node, degree in graph_copy.in_degree() if degree == 0
            )
            if not current_roots:
                # Every remaining node has an in edge, i.e. they are part of
                # (or downstream of) a cycle. Without this guard the loop
                # would never terminate
                raise ValueError(
                    "Cannot find the node orders as the graph contains a cycle"
                )
            orders.append(current_roots)
            graph_copy.remove_nodes_from(current_roots)
        return tuple(orders)

    def predecessors(self, node_name: str) -> Tuple[str, ...]:
        """
        Return the predecessors of the node.

        Parameters
        ----------
        node_name : str
            Name of the node to get the predecessors from

        Returns
        -------
        predecessor_names : tuple of str
            Names of predecessors
        """
        # NOTE: The set of nodes only contain the name of the nodes, not their
        #       attributes
        return tuple(self.__graph.predecessors(node_name))

    def successors(self, node_name: str) -> Tuple[str, ...]:
        """
        Return the successors of the node.

        Parameters
        ----------
        node_name : str
            Name of the node to get the successors from

        Returns
        -------
        successors_names : tuple of str
            Names of successors
        """
        # NOTE: The set of nodes only contain the name of the nodes, not their
        #       attributes
        return tuple(self.__graph.successors(node_name))

    def reset(self) -> None:
        """Reset the nodes by setting status to 'ready' and resetting the submitter."""
        logging.debug("Resetting the nodes")
        for node_name in self.__graph:
            node = self.__graph.nodes[node_name]
            node["status"] = "ready"
            node["submitter"].reset()

    def add_bout_run_node(
        self,
        name: str,
        bout_run_setup: BoutRunSetup,
    ) -> None:
        """
        Add a node where the setup of a BOUT++ run is attached.

        Parameters
        ----------
        name : str
            Name of the node
        bout_run_setup : BoutRunSetup
            The setup of the BOUT++ run

        Raises
        ------
        ValueError
            If the node is already present in the graph
        """
        if name in self.__node_set:
            raise ValueError(f"'{name}' is already present in the graph")

        self.__graph.add_node(
            name,
            bout_run_setup=bout_run_setup,
            submitter=bout_run_setup.submitter,
            status="ready",
        )
        self.__node_set = set(self.__graph.nodes)

    def add_function_node(
        self,
        name: str,
        function_dict: Optional[
            Dict[str, Optional[Union[Callable, Tuple[Any, ...], Dict[str, Any]]]]
        ] = None,
        path: Optional[Path] = None,
        submitter: Optional[AbstractSubmitter] = None,
    ) -> None:
        """
        Add a node with an optionally attached callable to the graph.

        Parameters
        ----------
        name : str
            Name of the node
        function_dict : None or dict
            Dict with the function to call
            On the form

            >>> {'function': None or callable,
            ...  'args': None or tuple,
            ...  'kwargs': None or dict}
        path : None or Path
            Absolute path to store the python file which holds the function
            and its arguments
        submitter : AbstractSubmitter
            Submitter to submit the function with
            If None, the default LocalSubmitter will be used

        Raises
        ------
        ValueError
            If the node is already present in the graph
        """
        if name in self.__node_set:
            raise ValueError(f"'{name}' is already present in the graph")

        if function_dict is None:
            function_dict = {"function": None, "args": None, "kwargs": None}

        submitter = submitter if submitter is not None else LocalSubmitter()

        logging.debug(
            "Adding node=%s with function_dict=%s, path=%s and submitter=%s",
            name,
            function_dict,
            path,
            submitter,
        )
        self.__graph.add_node(
            name,
            function=function_dict["function"],
            args=function_dict["args"],
            kwargs=function_dict["kwargs"],
            path=path,
            submitter=submitter,
            status="ready",
        )
        self.__node_set = set(self.__graph.nodes)

    def add_edge(self, start_node: str, end_node: str) -> None:
        """
        Connect two nodes through a directed edge.

        If the new edge would make the graph cyclic, the edge is rolled back
        before the error is raised, so the graph is left as it was.

        Parameters
        ----------
        start_node : str
            Name of the start node
        end_node : str
            Name of the end node

        Raises
        ------
        ValueError
            If the graph after adding the edge becomes cyclic
        """
        # Record the state prior to the insertion so that it can be restored
        edge_already_present = self.__graph.has_edge(start_node, end_node)
        implicitly_added_nodes = tuple(
            node for node in (start_node, end_node) if node not in self.__graph
        )

        self.__graph.add_edge(start_node, end_node)
        logging.debug("Adding edge from %s to %s", start_node, end_node)

        if not nx.is_directed_acyclic_graph(self.__graph):
            # Leaving the edge in place would keep the graph cyclic, which
            # in turn makes it impossible to find the node orders
            if not edge_already_present:
                self.__graph.remove_edge(start_node, end_node)
            # NOTE: remove_nodes_from silently ignores nodes not in the graph
            self.__graph.remove_nodes_from(implicitly_added_nodes)
            raise ValueError(
                f"The node connection from {start_node} to {end_node} "
                f"resulted in a cyclic graph"
            )

    def remove_edge(self, start_node: str, end_node: str) -> None:
        """
        Remove edge between two nodes.

        Parameters
        ----------
        start_node : str
            Name of the start node
        end_node : str
            Name of the end node

        Raises
        ------
        networkx.NetworkXError
            If there is no edge from start_node to end_node
        """
        self.__graph.remove_edge(start_node, end_node)
        logging.debug("Removing edge from %s to %s", start_node, end_node)

    def add_waiting_for(
        self,
        name_of_waiting_node: str,
        nodes_to_wait_for: Optional[Union[str, Iterable[str]]],
    ) -> None:
        """
        Make a node wait for the completion of one or more nodes.

        In other words we will let one or more nodes point to
        name_of_waiting_node.

        Parameters
        ----------
        name_of_waiting_node : str
            Name of the node which will wait for the node(s) in
            nodes_to_wait_for to finish
        nodes_to_wait_for : None or str or iterable
            Name of nodes the name_of_waiting_node will wait for
            If None, nothing is done
        """
        if nodes_to_wait_for is None:
            return

        if isinstance(nodes_to_wait_for, str):
            # A str is iterable, but must be treated as a single node name
            nodes_to_wait_for = (nodes_to_wait_for,)
        elif not hasattr(nodes_to_wait_for, "__iter__"):
            # Input which is neither a str nor an iterable is ignored
            return

        for waiting_for_node in nodes_to_wait_for:
            self.add_edge(waiting_for_node, name_of_waiting_node)

    def get_waiting_for_tuple(self, start_node_name: str) -> Tuple[str, ...]:
        """
        Return the tuple of nodes waiting for a given node.

        The nodes are found through a depth-first search starting at
        start_node_name.

        Parameters
        ----------
        start_node_name : str
            Name of the node other nodes are waiting for

        Returns
        -------
        tuple
            Tuple of the nodes in the depth-first search tree rooted at
            start_node_name
        """
        return tuple(nx.dfs_tree(self.__graph, start_node_name))

    def change_status_node_and_dependencies(
        self, start_node_name: str, status: str = "errored"
    ) -> None:
        """
        Change the status of all nodes returned by get_waiting_for_tuple.

        Parameters
        ----------
        start_node_name : str
            Name of the node to start from
            The status is set on every node returned by
            get_waiting_for_tuple(start_node_name)
        status : str
            Status to set on the nodes
        """
        for node_name in self.get_waiting_for_tuple(start_node_name):
            self.__graph.nodes[node_name]["status"] = status
            logging.debug("Changed status of %s to %s", node_name, status)

    def get_dot_string(self) -> str:
        """
        Return the graph as a string in the dot format.

        This can be visualized through GraphViz or online at
        http://www.webgraphviz.com/

        Returns
        -------
        str
            The graph written in the dot format
        """
        return str(nx.nx_pydot.to_pydot(self.__graph))