Source code for dasi.design.optimize

from typing import Dict
from typing import Hashable
from typing import List
from typing import Tuple
from warnings import warn

import networkx as nx
import numpy as np

from dasi.exceptions import DasiDesignException
from dasi.exceptions import DASiWarning
from dasi.models import AssemblyNode
from dasi.utils import sort_with_keys
from dasi.utils.networkx import sympy_floyd_warshall
from dasi.utils.networkx import sympy_multipoint_shortest_path

# definition of how to compute path length
# c = SUM(m) / PRODUCT(e), where m and e are arrays of attributes 'material'
# and 'efficiency' for a given path
path_length_config = {
    "f": "material / efficiency ",
    "accumulators": {"efficiency": "product"},
}
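
As a worked example of this definition (hypothetical numbers, not taken from the module): a path whose edges carry material costs 10, 5, and 20 with efficiencies 0.9, 0.8, and 1.0 accumulates to sum(material) / product(efficiency) = 35 / 0.72 ≈ 48.6.

# Worked example of the path-length definition above (hypothetical values).
materials = [10, 5, 20]
efficiencies = [0.9, 0.8, 1.0]
cost = sum(materials) / np.prod(efficiencies)  # 35 / 0.72 ≈ 48.6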


def _check_paths(paths: List[List[tuple]]) -> None:
    """Validate paths by checking for consecutive duplicate nodes.

    :param paths: list of paths, each a list of node tuples
    :return: None
    :raises DasiDesignException: if any path contains consecutive duplicates
    """
    invalid_paths = []
    for path in paths:
        lastseen = path[0][2]
        for p in path[1:]:
            if p[2] == lastseen:
                invalid_paths.append(path)
                break
            lastseen = p[2]
    if invalid_paths:
        raise DasiDesignException(
            "There are {} invalid paths:\n{}\n...{} more".format(
                len(invalid_paths),
                "\n".join([str(x) for x in invalid_paths[:5]]),
                max(len(invalid_paths) - 5, 0),
            )
        )
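
A quick illustration (plain tuples standing in for assembly node tuples; only the third field of each node is compared):

# Illustrative usage only; plain tuples stand in for assembly node tuples.
_check_paths([[(0, True, "A"), (10, True, "B"), (20, True, "A")]])  # passes
try:
    _check_paths([[(0, True, "A"), (10, True, "A")]])  # consecutive duplicate third field
except DasiDesignException:
    pass  # raised, as expected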
def argmin(w):
    """Return all (row, col) indices of ``w`` ordered by increasing weight."""
    index = np.argsort(w.ravel())
    return np.unravel_index(index, w.shape)


def fill_diag(w, fillval):
    """Set the diagonal of ``w`` to ``fillval`` and return ``w``."""
    iden = np.identity(len(w))
    w[np.where(iden == 1)] = fillval
    return w


def fill_diag_inf(w):
    """Set the diagonal of ``w`` to infinity and return ``w``."""
    return fill_diag(w, np.inf)
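
Despite its name, ``argmin`` returns every index of the matrix sorted from smallest to largest weight, so callers can take the first n entries as the n best (i, j) pairs. A small illustration (hypothetical values):

# Illustrative only.
w = np.array([[np.inf, 3.0], [1.0, 2.0]])
rows, cols = argmin(w)
# rows == [1, 1, 0, 0], cols == [0, 1, 1, 0]
# i.e. (1, 0) has the smallest weight and (0, 0) the largest.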
def index_slice(indices, arr):
    """From tuple of indices, return zipped items."""
    unzipped = []
    for i in indices:
        unzipped.append([arr[_i] for _i in i])
    return list(zip(*unzipped))
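
For example, given an index tuple of the kind produced by ``numpy.unravel_index`` (hypothetical values):

# Illustrative only.
letters = ["a", "b", "c", "d"]
pairs = index_slice(((0, 2), (1, 3)), letters)
# pairs == [("a", "b"), ("c", "d")]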
# TODO: is this necessary?
def only_ab(w, nodelist):
    # keep only entries connecting a 'B'-type node with a non-'B'-type node;
    # all other entries are set to infinity
    b_array = np.array([n.type == "B" for n in nodelist]).reshape(1, -1)
    w[np.where(~np.logical_xor(b_array, b_array.T))] = np.inf
    return w


def only_long(w: np.ndarray, query_length: int, nodelist: List[Hashable]):
    # keep only entries whose node pair spans at least the full query length
    index = np.array([n.index for n in nodelist]).reshape(1, -1)
    length = index - index.T
    w[np.where(length < query_length)] = np.inf
    return w


def only_short(w: np.ndarray, query_length: int, nodelist: List[Hashable]):
    # keep only entries whose node pair spans less than the full query length
    index = np.array([n.index for n in nodelist]).reshape(1, -1)
    length = index - index.T
    w[np.where(length >= query_length)] = np.inf
    return w
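
For illustration, a minimal stand-in node type (hypothetical, exposing only the ``.index`` attribute that ``only_long`` inspects) shows how pairs that do not span the full query are masked:

# Illustrative only: `Node` is a hypothetical stand-in for an assembly node.
from collections import namedtuple

Node = namedtuple("Node", "index")
nodes = [Node(0), Node(40), Node(100)]
w = only_long(np.ones((3, 3)), query_length=100, nodelist=nodes)
# only w[0, 2] (the 0 -> 100 span, length 100) survives; every other entry
# is set to infinity.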
def _multinode_to_shortest_path(
    graph, nodes, cyclic, return_length=False
) -> List[tuple]:
    """Estimate the shortest path that touches the specified nodes."""
    path_length, path = sympy_multipoint_shortest_path(
        graph,
        nodes,
        f=path_length_config["f"],
        accumulators=path_length_config["accumulators"],
        cyclic=cyclic,
    )
    if return_length:
        return path, path_length
    return path
def _nodes_to_fullpaths(
    graph: nx.DiGraph, cycle_endpoints: Tuple, cyclic: bool, n_paths=None
) -> List[List[Tuple]]:
    """Recover full paths from cycle endpoints.

    :param graph: the assembly graph
    :param cycle_endpoints: tuples of endpoint nodes to connect
    :param cyclic: whether to treat paths as cyclic
    :param n_paths: maximum number of unique paths to recover
    :return: list of unique paths
    """
    unique_cyclic_paths = []
    for nodes in cycle_endpoints:
        if n_paths is not None and len(unique_cyclic_paths) >= n_paths:
            break
        try:
            path = _multinode_to_shortest_path(graph, nodes, cyclic)
            if path not in unique_cyclic_paths:
                unique_cyclic_paths.append(path)
        except nx.NetworkXNoPath:
            pass
    return unique_cyclic_paths
def _get_closing_edge_indices(
    nodelist: List[AssemblyNode], query_length: int, matrix_dict
) -> Tuple[np.ndarray, np.ndarray]:
    node_to_i = {n: i for i, n in enumerate(nodelist)}
    src, dest = [], []

    # closing edge indices
    for n in nodelist:
        i1 = node_to_i[n]
        n2 = AssemblyNode(n.index + query_length, *list(n)[1:])
        if n2 in node_to_i:
            i2 = node_to_i[n2]
            v = matrix_dict["material"][i1, i2]
            if not np.isinf(v):
                src.append(i1)
                dest.append(i2)
    index = (np.array(src), np.array(dest))
    return index


def _get_closure_matrix(
    mat: np.ndarray,
    eff: np.ndarray,
    nodelist: List[AssemblyNode],
    query_length: int,
    matrix_dict: dict,
) -> Tuple[np.ndarray, np.ndarray]:
    indices = _get_closing_edge_indices(nodelist, query_length, matrix_dict)
    m = np.full_like(mat, np.inf)
    e = np.full_like(eff, 0.0)
    if len(indices[0]):
        m[indices] = mat[indices]
        e[indices] = eff[indices]
    return m, e


def cyclic_matrix(
    matrix_dict: Dict[str, np.matrix], nodelist: List[AssemblyNode], query_length: int
) -> np.ndarray:
    m1 = np.array(matrix_dict["material"])
    e1 = np.array(matrix_dict["efficiency"])
    m2, e2 = _get_closure_matrix(m1, e1, nodelist, query_length, matrix_dict)
    m = m1 + m2.min(1)
    e = e1 * e2.max(1)
    w = np.divide(m, e)
    fill_diag_inf(w)
    return w


def optimize_graph(
    graph: nx.DiGraph, query_length: int, cyclic: bool, n_paths: int
) -> Tuple[List[List[tuple]], List[float]]:
    # get ordered nodelist and nodekeys
    nodelist, nodekeys = sort_with_keys(list(graph.nodes()), key=lambda x: x[0])

    # 2D matrices of 'efficiency' and 'material' costs
    weight_matrix, matrix_dict, ori_matrix_dict = sympy_floyd_warshall(
        graph,
        f=path_length_config["f"],
        accumulators=path_length_config["accumulators"],
        nodelist=nodelist,
        dtype=np.float64,
        return_all=True,
    )
    matrix_dict = {k: np.array(v) for k, v in matrix_dict.items()}

    if cyclic:
        weight_matrix = cyclic_matrix(matrix_dict, nodelist, query_length)
    else:
        raise NotImplementedError("Linear assemblies are not yet implemented.")

    min_index = tuple([i[:n_paths] for i in argmin(weight_matrix)])
    costs = weight_matrix[min_index]
    costs = [c for c in costs if c != np.inf]

    a_nodes = [nodelist[i] for i in min_index[0]]
    b_nodes = [nodelist[i] for i in min_index[1]]
    nodes = list(zip(a_nodes, b_nodes))
    nodes_and_costs = list(zip(nodes, costs))

    if nodes_and_costs:
        trimmed_nodes, trimmed_costs = zip(*nodes_and_costs)
    else:
        trimmed_nodes, trimmed_costs = [], []

    paths = _nodes_to_fullpaths(graph, trimmed_nodes, cyclic, n_paths=n_paths)

    if len(paths) < n_paths:
        warn(
            "Number of paths returned is less than the number of requested "
            "paths ({} < {})".format(len(paths), n_paths),
            DASiWarning,
        )
    _check_paths(paths)
    return paths, trimmed_costs
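
One subtlety in ``cyclic_matrix``: ``m2.min(1)`` and ``e2.max(1)`` are one-dimensional, so they broadcast across the columns of the base matrices when recombined. A minimal numpy sketch (hypothetical values):

# Illustrative only: axis-1 reductions broadcast across the columns of the
# base matrix when added back.
m1 = np.array([[1.0, 2.0], [3.0, 4.0]])
m2 = np.array([[10.0, 20.0], [30.0, 40.0]])
m2.min(1)       # array([10., 30.])
m1 + m2.min(1)  # array([[11., 32.], [13., 34.]])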