"""Library design routines (``dasi.design.library_design``)."""

from copy import deepcopy

import networkx as nx

from dasi.constants import Constants
from dasi.design.design import DesignABC
from dasi.models import AlignmentContainer
from dasi.models import AlignmentGroup
from dasi.utils import group_by
from dasi.utils import sort_with_keys


def overlapping_groups(group_list_a, group_list_b):
    """For each group in ``group_list_a``, collect the groups in
    ``group_list_b`` whose query regions right-hand overlap it.

    Returns a list of ``(group, overlapping_groups)`` tuples; a group is
    never paired with itself.
    """
    group_sort, group_keys = sort_with_keys(
        group_list_b, key=lambda x: x.query_region.a
    )
    tuples = []
    for group_a in group_list_a:
        overlapping = AlignmentContainer.filter_alignments_by_span(
            group_sort,
            group_a.query_region,
            key=lambda p: p.query_region.a,
            end_inclusive=False,
        )

        if group_a in overlapping:
            overlapping.remove(group_a)
        tuples.append((group_a, overlapping))
    return tuples


def add_clusters(design):
    """Expand SHARED_FRAGMENT alignments at their overlap junctions and add
    the resulting alignments back to the design."""
    containers = design.container_factory.containers()

    # collect all shared-fragment alignment groups across containers
    all_groups = []
    for container in containers.values():
        all_groups += container.get_groups_by_types(Constants.SHARED_FRAGMENT)

    # group the alignment groups by their query key
    grouped_by_qk = {}
    for g in all_groups:
        grouped_by_qk.setdefault(g.query_key, list())
        grouped_by_qk[g.query_key].append(g)

    # expand each group against its overlapping groups, using the container
    # that owns the query
    new_alignments = []
    for qk, groups in grouped_by_qk.items():
        container = containers[qk]
        for group_a, group_list in overlapping_groups(groups, groups):
            new_alignments += container.expand_overlaps(
                group_list + [group_a],
                include_left=False,
                atype=Constants.SHARED_FRAGMENT,
            )

    # deduplicate alignments by their equality hash
    new_alignments = list({a.eq_hash(): a for a in new_alignments}.values())
    design.container_factory.add_alignments(new_alignments)


def to_undirected(graph):
    """Convert a graph to an undirected :class:`networkx.Graph`.

    ``Graph.to_undirected`` is implemented in networkx out of the box;
    however, it suffers from occasional infinite recursion errors during the
    deepcopy phase of the method (for reasons unknown), so the conversion is
    done manually here.
    """
    undirected = nx.Graph()
    copied = deepcopy(graph)
    for n in copied.nodes:
        ndata = copied.nodes[n]
        undirected.add_node(n, **ndata)
    for n1, n2 in copied.edges:
        edata = copied.edges[n1, n2]
        undirected.add_edge(n1, n2, **edata)
    return undirected
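
# A minimal sketch, assuming only networkx (no dasi objects required): node
# and edge attributes survive the manual conversion.
#
#     >>> g = nx.DiGraph()
#     >>> g.add_node("a", color="red")
#     >>> g.add_edge("a", "b", weight=1.0)
#     >>> u = to_undirected(g)
#     >>> u.nodes["a"]["color"]
#     'red'
#     >>> u.edges["a", "b"]["weight"]
#     1.0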


def get_subgraphs(graph):
    """Get the independent (connected) subgraphs of a graph."""
    # convert once, rather than on every iteration of the loop below
    undirected = to_undirected(graph)
    node_list = list(graph.nodes)
    subgraphs = []
    while len(node_list) > 0:
        node = node_list[-1]
        subgraph = nx.bfs_tree(undirected, node)
        for n in subgraph.nodes:
            node_list.remove(n)
        subgraphs.append(graph.subgraph(subgraph.nodes))
    return subgraphs
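
# A minimal sketch, assuming only networkx: two disconnected edges yield two
# independent subgraphs.
#
#     >>> g = nx.DiGraph()
#     >>> g.add_edge(1, 2)
#     >>> g.add_edge(3, 4)
#     >>> sorted(sorted(sg.nodes) for sg in get_subgraphs(g))
#     [[1, 2], [3, 4]]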


def has_repeats(g):
    """Check whether the interaction graph contains a repeated DNA sequence,
    i.e. more than one region drawn from the same sequence key."""
    grouped_by_key = {}
    for n in g.nodes:
        grouped_by_key.setdefault(n[0], list())
        grouped_by_key[n[0]].append((n[1], n[2]))
    for v in grouped_by_key.values():
        if len(v) > 1:
            return True
    return False
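
# A minimal sketch: nodes follow the ``(sequence key, region.a, region.b,
# region.c)`` 4-tuple convention used by ``cluster_graph`` below. Two regions
# sharing the key "seq1" count as a repeat.
#
#     >>> g = nx.Graph()
#     >>> g.add_edge(("seq1", 0, 100, 100), ("seq2", 50, 150, 100))
#     >>> has_repeats(g)
#     False
#     >>> g.add_edge(("seq1", 200, 300, 100), ("seq2", 50, 150, 100))
#     >>> has_repeats(g)
#     True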


def cluster_graph(design):
    """Build an interaction graph of shared regions and return its
    repeat-free connected subgraphs, largest first.

    Nodes are ``(sequence key, region.a, region.b, region.c)`` tuples; edges
    carry the alignments connecting two regions.
    """
    interaction_graph = nx.Graph()
    all_groups = []
    for container in design.container_factory.containers().values():
        all_groups += container.get_groups_by_types(Constants.SHARED_FRAGMENT)
    for g in all_groups:
        for a in g.alignments:
            n1 = (a.query_key, a.query_region.a, a.query_region.b, a.query_region.c)
            n2 = (
                a.subject_key,
                a.subject_region.a,
                a.subject_region.b,
                a.subject_region.c,
            )
            edata = interaction_graph.get_edge_data(n1, n2)
            if edata:
                edata.setdefault("alignments", list())
                edata["alignments"].append(a)
            else:
                interaction_graph.add_edge(n1, n2, alignments=[a])
    graphs = get_subgraphs(interaction_graph)
    graphs = [g for g in graphs if not has_repeats(g)]
    graphs.sort(reverse=True, key=lambda x: x.number_of_nodes())
    return graphs
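
# A minimal sketch of the filtering ``cluster_graph`` performs, using a
# hand-built interaction graph in place of a full design:
#
#     >>> ig = nx.Graph()
#     >>> ig.add_edge(("q1", 0, 50, 50), ("q2", 10, 60, 50))    # clean cluster
#     >>> ig.add_edge(("q3", 0, 50, 50), ("q3", 100, 150, 50))  # self-repeat
#     >>> clusters = [g for g in get_subgraphs(ig) if not has_repeats(g)]
#     >>> [sorted(c.nodes) for c in clusters]
#     [[('q1', 0, 50, 50), ('q2', 10, 60, 50)]]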


class LibraryDesign(DesignABC):
    """Design class for producing assemblies for libraries."""

    # TODO: move to config
    FAVOR_SHARED_SEQUENCES = 2
    DEFAULT_N_ASSEMBLIES = DesignABC.DEFAULT_N_ASSEMBLIES
    ALGORITHM = Constants.ALGORITHM_LIBRARY
    def __init__(self, span_cost=None, seqdb=None):
        """Initialize a new library design."""
        super().__init__(span_cost=span_cost, seqdb=seqdb)
        self.shared_alignments = []
        self._edges = []
    def _expand_from_synthesized(self):
        """Expand PCR products that are adjacent to
        SHARED_SYNTHESIZED_FRAGMENT groups.

        :return: None
        """

        def is_not_shared(group_a: AlignmentGroup, group_b: AlignmentGroup):
            # pass only pairs where ``group_a`` is not a shared synthesized
            # fragment but ``group_b`` is
            if group_a.type == Constants.SHARED_SYNTHESIZED_FRAGMENT:
                return False
            if group_b.type == Constants.SHARED_SYNTHESIZED_FRAGMENT:
                return True
            return False

        for query_key, container in self.container_factory.containers().items():
            new_alignments = container.expand_overlaps(
                container.get_groups_by_types(
                    [
                        Constants.FRAGMENT,
                        Constants.PCR_PRODUCT,
                        Constants.SHARED_SYNTHESIZED_FRAGMENT,
                    ]
                ),
                Constants.PCR_PRODUCT,
                pass_condition=is_not_shared,
            )
            container.add_alignments(new_alignments, lim_size=True)
            self.logger.info(
                "{}: Expanded {} using {} and found {} new alignments.".format(
                    query_key,
                    Constants.PCR_PRODUCT,
                    Constants.SHARED_SYNTHESIZED_FRAGMENT,
                    len(new_alignments),
                )
            )
    def _expand_synthesized_fragments(self):
        """
        1. copy groups from SHARED_FRAGMENT to SHARED_SYNTHESIZED_FRAGMENT
        2. expand overlaps for the SHARED_SYNTHESIZED_FRAGMENT groups
        """
        for query_key, container in self.container_factory.containers().items():
            # expand the shared fragments using their own endpoints
            original_shared_fragments = container.get_groups_by_types(
                Constants.SHARED_FRAGMENT
            )
            copied_alignments = container.copy_groups(
                original_shared_fragments, Constants.SHARED_SYNTHESIZED_FRAGMENT
            )
            container.add_alignments(copied_alignments, lim_size=True)

            new_alignments = container.expand_overlaps(
                container.get_groups_by_types(Constants.SHARED_SYNTHESIZED_FRAGMENT),
                atype=Constants.SHARED_SYNTHESIZED_FRAGMENT,
            )
            self.logger.info(
                "{}: Expanded {} new {} alignments.".format(
                    query_key,
                    len(new_alignments),
                    Constants.SHARED_SYNTHESIZED_FRAGMENT,
                )
            )
            container.add_alignments(new_alignments, lim_size=True)
    def _check_shared_repeats(self):
        """Assert that no query shares a fragment with itself."""
        repeats = []
        for query_key, container in self.container_factory.containers().items():
            for align in container.get_alignments_by_types(Constants.SHARED_FRAGMENT):
                if align.query_key == align.subject_key:
                    repeats.append(align)
        assert not repeats
    def _share_query_blast(self):
        """Find and use shared fragments across queries.

        :return: None
        """
        self.logger.info("=== Expanding shared library fragments ===")

        # step 1: get query-on-query alignments
        blast = self.blast_factory(self.QUERIES, self.QUERIES)
        blast.blastn()
        results = blast.get_perfect()

        # step 2: eliminate self-binding results
        results = [
            entry
            for entry in results
            if entry["query"]["origin_key"] != entry["subject"]["origin_key"]
        ]
        self.logger.info(
            "Found {} shared alignments between the queries".format(len(results))
        )

        # step 3: load results into the container
        self.shared_alignments = results
        self.container_factory.seqdb.update(blast.seq_db.records)
        self.container_factory.load_blast_json(results, Constants.SHARED_FRAGMENT)
    def _precompile_library_expansion(self):
        self._share_query_blast()
        self._expand_synthesized_fragments()
        self._expand_from_synthesized()
        self._check_shared_repeats()

    def precompile(self):
        self.uncompile()
        tracker = self.logger.track(
            "INFO", desc="Precompiling library", total=5
        ).enter()
        self.graphs = {}
        tracker.update(0, "Running blast")
        self._blast()
        tracker.update(1, "Running shared fragment blast")
        self._share_query_blast()
        # tracker.update(2, "Expanding shared fragments")
        # self._expand_synthesized_fragments()
        self._expand_from_synthesized()
        tracker.update(3, "Finding shared clusters")
        self.update_library_metadata()
        tracker.exit()

    def _adjust_shared_synthetic_fragment(self):
        for qk, graph in self.graphs.items():
            for n1, n2, edata in graph.edges(data=True):
                if edata["type_def"].name == Constants.SHARED_SYNTHESIZED_FRAGMENT:
                    group = edata["group"]
                    # TODO: better way to make notes on graph edge
                    if "notes" not in edata or edata["notes"] is None:
                        edata["notes"] = {}
                    edata["notes"]["n_clusters"] = group.meta["n_clusters"]
                    # TODO: adjust n_clusters
                    edata["material"] = (
                        edata["material"]
                        / group.meta["n_clusters"]
                        / self.FAVOR_SHARED_SEQUENCES
                    )
                    edata["cost"] = edata["material"] / edata["efficiency"]

    def postcompile(self, post_compile_kwargs: dict = None):
        self.logger.info("Post-processing graphs")
        self.post_process_graphs(post_compile_kwargs)
        self._adjust_shared_synthetic_fragment()

    def update_library_metadata(self):
        add_clusters(self)
        graphs = cluster_graph(self)
        for c in self.container_list:
            c.share_group_tag = {}

        # update the metadata
        for g in graphs:
            n_clusters = g.number_of_nodes()
            for n1, n2, edata in g.edges(data=True):
                alignments = edata["alignments"]
                for qk, container_alignments in group_by(
                    alignments, key=lambda x: x.query_key
                ).items():
                    container = self.containers[qk]
                    for a in container_alignments:
                        group_key = (
                            a.query_region.a,
                            a.query_region.b,
                            Constants.SHARED_SYNTHESIZED_FRAGMENT,
                        )
                        container.group_tags.add(
                            Constants.SHARE_GROUP_TAG,
                            group_key,
                            {
                                "alignments": container_alignments,
                                "cross_container_alignments": alignments,
                                "n_clusters": n_clusters,
                            },
                        )
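
# A hedged usage sketch. The workflow below follows the documented DASI design
# API; the entry points (add_materials, compile, optimize) and the variable
# names (primers, templates, queries, fragments) are assumptions, not taken
# from this module:
#
#     >>> design = LibraryDesign()                       # doctest: +SKIP
#     >>> design.add_materials(                          # doctest: +SKIP
#     ...     primers=primers,
#     ...     templates=templates,
#     ...     queries=queries,
#     ...     fragments=fragments,
#     ... )
#     >>> design.compile()                               # doctest: +SKIP
#     >>> results = design.optimize()                    # doctest: +SKIP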