Source code for bout_runners.metadata.metadata_reader

"""Module containing the MetadataReader class."""


import logging
import re
from typing import Callable, Dict, List, Optional, Sequence, Tuple

from pandas import DataFrame

from bout_runners.database.database_connector import DatabaseConnector
from bout_runners.database.database_reader import DatabaseReader


[docs]def drop_ids(func: Callable) -> Callable: """ Return a function which remove excessive ids. Parameters ---------- func : function A function returning a DataFrame Returns ------- drop : function The function dropping the ids """ def drop(self, *args, **kwargs) -> DataFrame: """ Drop columns inplace. Parameters ---------- self : object Self reference to the instance the function is belonging to Must contain self.drop_id args : tuple Arguments belonging to the input function kwargs : dict Keyword arguments to the input function Returns ------- data_frame : DataFrame The DataFrame where the ids has been dropped """ data_frame = func(self, *args, **kwargs) columns = tuple(data_frame.columns) drop_columns = list() if self.drop_id == "parameters": drop_columns = [col for col in columns if col.startswith("parameters.")] elif self.drop_id == "keep_run_id": drop_columns = [ col for col in columns if (col.endswith(".id") and not col == "run.id") or col.endswith("_id") ] elif self.drop_id == "all_id": drop_columns = [ col for col in columns if col.endswith(".id") or col.endswith("_id") ] if self.drop_id is not None: data_frame.drop(drop_columns, axis=1, inplace=True) return data_frame return drop
[docs]class MetadataReader: r""" Class for reading the metadata from the database. Attributes ---------- __db_reader : DatabaseConnector The connection to the database __table_names : tuple Getter variable for table_names __table_column_dict : dict of tuple Getter variable for table_column_dict __table_connections : dict of tuple Getter variable for table_connections __sorted_columns : tuple Getter variable for sorted_columns table_names : tuple A tuple containing all names of the tables table_column_dict : dict of tuple A dict where the keys are table names, and the values are corresponding column names table_connections : dict of tuple A dict where the keys are tables, and the values are tuples of tables connected to the key table sorted_columns : tuple A tuple of the column names as they will be sorted in the all_metadata DataFrame date_columns : tuple Columns containing dates drop_id : None or str Specifies what id columns should be dropped when obtaining the metadata Methods ------- get_all_metadata() Return all of the run metadata get_parameters_metadata() Return only the parameter part of the run metadata get_join_query(from_statement, columns, alias_columns, table_connections) Return the query string of a `SELECT` query with `INNER JOIN` __get_parameters_query() Return the parameters query string __get_sorted_columns() Return all columns sorted __get_table_connections() Return a dict containing the table connections __get_all_table_names() Return all the table names in the schema __get_table_column_dict() Return all the column names of the specified tables Examples -------- >>> from pathlib import Path >>> from bout_runners.database.database_connector import DatabaseConnector >>> db_connector = DatabaseConnector('test', Path()) >>> metadata_reader = MetadataReader(db_connector) >>> metadata_reader.get_parameters_metadata() bar.id bar.foo ... parameters.baz_id parameters.foo_id 0 1 1 ... 1 1 1 2 10 ... 1 2 2 2 10 ... 1 1 [3 rows x 16 columns] >>> metadata_reader.get_all_metadata() run.id ... system_info.version 0 1 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 1 2 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 2 3 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 3 4 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 4 5 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 5 6 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 6 7 ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 [7 rows x 43 columns] >>> metadata_reader.drop_id = 'all_id' >>> metadata_reader.get_all_metadata() run.latest_status ... system_info.version 0 complete ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 1 complete ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 2 complete ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 3 complete ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 4 error ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 5 running ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 6 submitted ... #1 SMP Thu Oct 17 19:31:58 UTC 2019 [7 rows x 28 columns] """ date_columns = ( "run.start_time", "run.stop_time", "run.submitted_time", "file_modification.bout_lib_modified", "file_modification.project_executable_modified", "file_modification.project_makefile_modified", ) def __init__( self, db_connector: DatabaseConnector, drop_id: Optional[str] = "keep_run_id", ) -> None: """ Set the database to use. Parameters ---------- db_connector : DatabaseConnector The connection to the database drop_id : None or str Specifies what id columns should be dropped when obtaining the metadata - None : No columns will be dropped - 'parameters' : All columns containing parameters ids will be dropped - 'keep_run_id' : Only the run.id of the id columns will be kept - 'all_id' : All id columns will be removed """ self.drop_id = drop_id self.__db_reader = DatabaseReader(db_connector) self.__table_names = self.__get_all_table_names() self.__table_column_dict = self.__get_table_column_dict() self.__table_connections = self.__get_table_connections() self.__sorted_columns = self.__get_sorted_columns() parameters_connections = {"parameters": self.__table_connections["parameters"]} parameters_tables = ("parameters", *parameters_connections["parameters"]) self.__parameters_columns = tuple( str(col) for col in self.__sorted_columns if col.split(".")[0] in parameters_tables ) @property def table_names(self) -> Tuple[str, ...]: """ Set the properties of self.table_names. Returns ------- self.__table_names : tuple A tuple containing all names of the tables """ return self.__table_names @property def table_column_dict( self, ) -> Dict[str, Tuple[str, ...]]: """ Set the properties of self.table_column_dict. Returns ------- self.__table_column_dict : dict of tuple A dict where the keys are table names, and the values are corresponding column names """ return self.__table_column_dict @property def table_connection( self, ) -> Dict[str, Tuple[str, ...]]: """ Set the properties of self.table_connections. Returns ------- self.__table_connections : dict of tuple A dict where the keys are tables, and the values are tuples of tables connected to the key table """ return self.__table_connections @property def sorted_columns(self) -> Tuple[str, ...]: """ Set the properties of self.sorted_columns. Returns ------- self.__sorted_columns : tuple A tuple of the column names as they will be sorted in the all_metadata DataFrame """ return self.__sorted_columns
[docs] @drop_ids def get_all_metadata(self): """ Return all of the run metadata. Returns ------- DataFrame The DataFrame of the run metadata """ parameters_query = self.__get_parameters_query() # Adding spaces and parenthesis parameter_sub_query = "\n".join( [f'{" " * 6}{line}' for line in parameters_query.split("\n")] ) parameter_sub_query = ( f"{parameter_sub_query[:5]}({parameter_sub_query[6:-1]}) " f"AS subquery" ) # NOTE: The subquery names are the names of the columns after # the query. We would like to rename them to # sorted_columns. Hence the `columns` field and # `alias_columns` field appears swapped subquery_columns = [ f'subquery."{col}"' if col in self.__parameters_columns else col for col in self.sorted_columns ] # Remove the parameters from the table_connection to avoid # double joining table_connections = self.__table_connections.copy() table_connections.pop("parameters") unfinished_all_metadata_query = self.get_join_query( "run", subquery_columns, self.sorted_columns, table_connections ) # Update the parameters columns all_metadata_query = unfinished_all_metadata_query.replace( " parameters ", f"\n{parameter_sub_query}\n" ).replace("= parameters.id", '= subquery."parameters.id"') return self.__db_reader.query(all_metadata_query, parse_dates=self.date_columns)
[docs] @drop_ids def get_parameters_metadata(self): """ Return only the parameter part of the run metadata. Returns ------- DataFrame The DataFrame of the parameter metadata """ parameters_query = self.__get_parameters_query() return self.__db_reader.query(parameters_query)
[docs] @staticmethod def get_join_query( from_statement: str, columns: Sequence[str], alias_columns: Sequence[str], table_connections: Dict[str, Tuple[str, ...]], ) -> str: """ Return the query string of a `SELECT` query with `INNER JOIN`. Notes ----- The tables in `table_connection` is assumed to be joined by `id`s. I.e. `table_a` is connected to `table_b` by `table_b` having a column named `table_a_id` which corresponds to the `id` column of `table_a` Parameters ---------- from_statement : str The statement after the `FROM` keyword in the query I.e. >>> f'SELECT * FROM {from_statement}' columns : array_like The columns to select from the tables I.e. >>> f'SELECT {columns} FROM *' alias_columns : array_like The name of the columns in the resulting table I.e. >>> f'SELECT {columns[0]} AS {alias_columns[0]} FROM *' table_connections : dict A dict where the keys are the table names, and the values are tuples containing table names connected to the key table as described in the note above Returns ------- query : str The SQL-string which can be used to query where table in databases are joined through `INNER JOIN` operations """ query = "SELECT\n" for column, alias in zip(columns, alias_columns): query += f'{" " * 7}{column} AS "{alias}",\n' # Remove last comma query = f"{query[:-2]}\n" query += f"FROM {from_statement}\n" for left_table in table_connections.keys(): for right_table in table_connections[left_table]: query += ( f'{" " * 4}INNER JOIN {right_table} ON ' f"{left_table}." f"{right_table}_id = {right_table}.id\n" ) return query
def __get_parameters_query(self) -> str: """ Return the parameters query string. Returns ------- parameters_query : str The SQL-string which can be used to query where table in databases are joined through `INNER JOIN` operations """ parameter_connections = {"parameters": self.__table_connections["parameters"]} parameters_query = self.get_join_query( "parameters", self.__parameters_columns, self.__parameters_columns, parameter_connections, ) return parameters_query def __get_sorted_columns(self) -> Tuple[str, ...]: """ Return all columns sorted. The columns will be sorted alphabetically first by table name, then alphabetically by column name, with the following exceptions: 1. The columns from the run table is presented first 2. The id column is the first column in the table Returns ------- tuple Dict containing the column names On the form >>> ('run.id', ... 'run.column_name_1', ... 'run.column_name_2', ... ... ... 'table_name_1.column_name_1', ... 'table_name_1.column_name_2', ...) """ sorted_columns: List[str] = list() table_names = sorted(self.table_column_dict.keys()) table_names.pop(table_names.index("run")) table_names.insert(0, "run") for table_name in table_names: table_columns = list() for column_name in sorted(self.table_column_dict[table_name]): table_columns.append(f"{table_name}.{column_name}") table_columns.pop(table_columns.index(f"{table_name}.id")) table_columns.insert(0, f"{table_name}.id") sorted_columns = [*sorted_columns, *table_columns] return tuple(sorted_columns) def __get_table_connections(self) -> Dict[str, Tuple[str, ...]]: """ Return a dict containing the table connections. Returns ------- table_connection_dict : dict A dict telling which tables are connected to each other, where the key is the table under consideration and the value is a tuple containing the tables which have a key connection to the table under consideration On the form >>> {'table_1': ('table_2', 'table_3'), ... 'table_4': ('table_5',), ...} Raises ------ RuntimeError If match is None """ table_connection_dict = dict() pattern = re.compile("(.*)_id") for table, columns in self.table_column_dict.items(): ids: List[str] = list() for column in columns: if "_id" in column: match = pattern.match(column) if match is None: msg = f"match is None for '(.*)_id' for input '{column}'" logging.critical(msg) raise RuntimeError(msg) ids.append(match[1]) if len(ids) > 0: table_connection_dict[table] = tuple(ids) return table_connection_dict def __get_all_table_names(self) -> Tuple[str, ...]: """ Return all the table names in the schema. Returns ------- tuple A tuple containing all names of the tables """ query = ( "SELECT name FROM sqlite_master\n" "WHERE\n" " type ='table' AND\n" " name NOT LIKE 'sqlite_%'" ) # pylint: disable=no-member return tuple(self.__db_reader.query(query).loc[:, "name"]) def __get_table_column_dict(self) -> Dict[str, Tuple[str, ...]]: """ Return all the column names of the specified tables. Returns ------- table_column_dict : dict of tuple Dict containing the column names On the form >>> {'table_1': ('table_1_column_1', ...), ... 'table_2': ('table_2_column_1', ...), ...} """ table_column_dict = dict() query = "SELECT name FROM pragma_table_info('{}')" for table_name in self.table_names: # pylint: disable=no-member table_column_dict[table_name] = tuple( self.__db_reader.query(query.format(table_name)).loc[:, "name"] ) return table_column_dict