jdna

Source code for jdna.reaction

"""Simulate molecular reactions."""
import itertools
from collections import namedtuple
from decimal import Decimal

import networkx as nx
import primer3

from jdna.sequence import Sequence
from jdna.sequence import SequenceFlags
from jdna.viewer import SequenceViewer


[docs]class ReactionException(Exception): """Generic reaction exception."""
[docs]class PCRException(ReactionException): """Exception with pcr."""
BindingEvent = namedtuple("BindingEvent", ["template", "primer", "position"])
[docs]class Assembly: """Stores a DNA assembly. Assembly can be printed to display relevant assembly information: .. code:: assembly.print() :: > "Unnamed" (320bp) Cyclic: True Num Fragments: 4 Overhang Tms (°C): 53.65, 53.79, 54.96, 56.37 Junction Lengths (bp): 20, 20, 20, 20 Junction ΔG: -1.91e+4, -1.93e+4, -1.98e+4, -2.05e+4 Junction ΔG (hairpin): (0.0, 232.50050218543765), (203.8205043739763, -84.3344956260189), (0.0, -60.91449781456504), (-1026.7134934374844, -622.0224934343823) Competing ΔG: -2.18e+4, -2.11e+4, -1.92e+4, -2.01e+4 Product length (bp): 300 -----Tm: 53.7°C----- (0) 0 TTGACGACAGTGGCTATCCCCTGTTGCTAGGCACGGTGATATATCAGCCC (1) 0 -------------------------------------------------- (2) 0 -------------------------------------------------- (3) 0 -------------------------------------------------- -----Tm: 53.8°C----- (0) 50 ATAGGGCCGGATCAACGTAGTTTGATCGTAGCTGTTCCGTCAGTC----- (1) 50 -------------------------TCGTAGCTGTTCCGTCAGTCAGTGC (2) 50 -------------------------------------------------- (3) 50 -------------------------------------------------- (0) 100 -------------------------------------------------- (1) 100 CGATGCCAGTCTACTGCTTTTCGCCCAGGGGGACACCTGACACTATGTTA (2) 100 -------------------------------------------------- (3) 100 -------------------------------------------------- -----Tm: 55.0°C----- (0) 150 -------------------------------------------------- (1) 150 TGCCAAGCTCACGTTTCACA------------------------------ (2) 150 TGCCAAGCTCACGTTTCACAAGGCAGATTAAATAAGACTGAAACATTTGG (3) 150 -------------------------------------------------- -----Tm: 56.4°C----- (0) 200 -------------------------------------------------- (1) 200 -------------------------------------------------- (2) 200 TGGAGGCGCGAGTCTTGCCCTTAGGCAGACCTGTTCGAGGGCGAA----- (3) 200 -------------------------CAGACCTGTTCGAGGGCGAAAAACG (0) 250 -------------------------------------------------- (1) 250 -------------------------------------------------- (2) 250 -------------------------------------------------- (3) 250 CCGAAGTCACATAGCGGCATGTTAGGTAGCTGTACACCCGTACTGCTACA """ def __init__(self, templates, junctions, cyclic): """Assembly constructor. :param templates: list of templates :type templates: list :param junctions: list of junctions :type junctions: list :param cyclic: whether assembly is cyclic :type cyclic: bool """ self._templates = tuple([t.copy() for t in templates]) self._junctions = tuple([o.copy() for o in junctions]) self._cyclic = cyclic @property def templates(self): return tuple(t.copy() for t in self._templates) @property def junctions(self): return tuple(j.copy() for j in self._junctions) @property def cyclic(self): return self._cyclic
[docs] def tms(self): """Return the tms of the junctions.""" return [round(o.tm(), 2) for o in self.junctions]
@property def product(self): product = Sequence() product.name = "Unnamed assembly" if not self.cyclic: zipped = zip(self.templates, self.junctions) else: zipped = zip(self.junctions, self.templates) for s1, s2 in zipped: product += s1 + s2 if self.cyclic: product.circularize() return product def view(self, *args, **kwargs): seqs = [] positions = [0] pos = 0 junctions = list(self.junctions) for jxn in junctions: jxn.annotate(None, None, "Tm: {}°C".format(round(jxn.tm(), 1))) if self.cyclic: junctions.append(junctions[0]) else: junctions = [Sequence()] + list(junctions) for i, t in enumerate(self.templates): seq = junctions[i] + t + junctions[i + 1] seqs.append(seq) pos += len(seq) - len(junctions[i + 1]) positions.append(pos) aligned_seqs = [] for p, s in zip(positions, seqs): aligned_seqs.append(Sequence("-" * p) + s) mx = max([len(s) for s in aligned_seqs]) for i, s in enumerate(aligned_seqs): diff = mx - len(s) aligned_seqs[i] = aligned_seqs[i] + Sequence("-" * diff) labels = [ "({i}) {index}".format(i=i, index="{index}") for i in range(len(aligned_seqs)) ] viewer = SequenceViewer( aligned_seqs, *args, sequence_labels=labels, foreground_colors="RANDOM", **kwargs ) for seq in aligned_seqs: Sequence._apply_features_to_view(seq, viewer) viewer.metadata["Cyclic"] = self.cyclic viewer.metadata["Num Fragments"] = len(self.templates) viewer.metadata["Fragment Names"] = "\n\t" + "\n\t".join( ["{} - {}".format(i, t.name) for i, t in enumerate(self.templates)] ) viewer.metadata["Overhang Tms (°C)"] = ", ".join([str(x) for x in self.tms()]) viewer.metadata["Junction Lengths (bp)"] = ", ".join( [str(len(x)) for x in self.junctions] ) viewer.metadata["Junction ΔG"] = self.format_float_array(self.deltaGs()) viewer.metadata["Junction ΔG (hairpin)"] = self.format_array( self.deltaG_hairpins() ) viewer.metadata["Competing ΔG"] = self.format_float_array( self.competing_deltaGs() ) viewer.metadata["Product length (bp)"] = "{}".format(len(self.product)) return viewer @staticmethod def format_array(arr): return ", ".join([str(x) for x in arr]) @staticmethod def format_float_array(arr): return ", ".join(["{:.2e}".format(Decimal(float(x))) for x in arr]) def deltaG_hairpins(self): gs = [] for jxn in self.junctions: fwd = self._hairpin(jxn).dg rev = self._hairpin(jxn.copy().rc()).dg gs.append((fwd, rev)) return gs def _heterodimer(self, s1, s2): return primer3.calcHeterodimer(str(s1[:60]).upper(), str(s2[:60]).upper()) def _hairpin(self, s): return primer3.calcHairpin(str(s[:60]).upper()) def deltaGs(self): overhangs = [o[:60] for o in self.junctions] return [self._heterodimer(o, o.copy().rc()).dg for o in overhangs] def competing_deltaGs(self): gs = [] jxns = list(self.junctions) for jxn in jxns: total = 0 total += self._hairpin(jxn).dg total += self._hairpin(jxn.copy().rc()).dg others = jxns[:] others.remove(jxn) for other in others: dg1 = self._heterodimer(jxn, other.copy().rc()).dg dg2 = self._heterodimer(jxn, other.copy()).dg total += dg1 total += dg2 gs.append(total) return gs
[docs] def print(self, *args, **kwargs): """ Print the assembly. :: > "Unnamed" (320bp) Num Fragments: 4 Overhang Tms (°C): 61.49, 58.37, 54.72, 45.91 Junction Lengths (bp): 20, 20, 20, 20 Junction ΔG: -2.30e+4, -2.18e+4, -1.97e+4, -1.54e+4 Junction ΔG (hairpin): (-1542.5069956260231, -1787.4319956260224), (-1832.7404978145642, -708.4919956260237), (0.0, 0.0), (0.0, 0.0) Competing ΔG: -3.39e+4, -2.43e+4, -2.19e+4, -1.33e+4 Length: 300bp |<Tm: 58.4°C -----Tm: 61.5°C----- ---------- (0) 0 TGCTCGGCGAAGGGCCTGACAAGACCACTTCGTACCCTTTGTAACGTGACTTCTGTGAATCGATGGCGGATGTTGCTTGTGACGC (1) 0 ---------------------------------------------------------------------------CTTGTGACGC (2) 0 ------------------------------------------------------------------------------------- (3) 0 ------------------------------------------------------------------------------------- |<Tm: 58.4°C ---------- -----Tm: 54.7°C----- (0) 85 ACCCGTAGCG--------------------------------------------------------------------------- (1) 85 ACCCGTAGCGACATAACCGCCTTATTCCCACTTGTTTGGGGAGGCAAGTTTTGTAGTAGCCATTCTAATCCCGTTTTCTCCGCCG (2) 85 -----------------------------------------------------------------TAATCCCGTTTTCTCCGCCG (3) 85 ------------------------------------------------------------------------------------- -----Tm: 45.9°C----- (0) 170 ------------------------------------------------------------------------------------- (1) 170 ------------------------------------------------------------------------------------- (2) 170 CTTGTTCAGCTTGATGTGTTGGAACAGCAAGTTTATTTTAGGGTGTCAGGAGGGGGAGGTCCTCTAGTATTAAGC---------- (3) 170 -------------------------------------------------------GAGGTCCTCTAGTATTAAGCACCGGCCAAG -----Tm: 61.5°C----- (0) 255 ----------------------------------------------------------------- (1) 255 ----------------------------------------------------------------- (2) 255 ----------------------------------------------------------------- (3) 255 GCGGCACGGACGCCACTAGAAGGGAGGCGTTCATAGCGAGTCCTTTGCTCGGCGAAGGGCCTGAC """ self.view(*args, **kwargs).print()
def __str__(self): return str(self.view())
class Reaction: MIN_BASES = 13 @classmethod def pcr(cls, template, primers, min_bases=MIN_BASES): """Make a pcr product from a template and primers. :param template: template :type template: Sequence :param primers: primers :type primers: list | tuple | iterable :param min_bases: minimum number of bases for primer annealing :type min_bases: int :return: list of Sequence instances :rtype: list """ bindings = [] for p in primers: bindings += template.anneal(p, min_bases=min_bases) forward_bindings = [] reverse_bindings = [] for bind in bindings: if bind.direction == SequenceFlags.FORWARD: forward_bindings.append(bind) elif bind.direction == SequenceFlags.REVERSE: reverse_bindings.append(bind) else: raise Exception("Direction {} not recognized".format(bind.direction)) if not forward_bindings or not reverse_bindings: raise PCRException( "Some primers did not bind. Number of forward bindings: {}. Number of " "rev bindings: {}".format(len(forward_bindings), len(reverse_bindings)) ) products = [] for fwd, rev in itertools.product(forward_bindings, reverse_bindings): overhang1 = fwd.five_prime_overhang overhang2 = rev.five_prime_overhang amplified = template.new_slice(fwd.start, rev.end) products.append( overhang1 + amplified + overhang2.copy().reverse_complement() ) return products @classmethod def interaction_graph( cls, sequences, min_bases=Sequence.DEFAULTS.MIN_ANNEAL_BASES, bind_reverse_complement=False, max_bases=None, only_ends=False, ): """Make an interaction graph from a list of sequences. :param sequences: list of sequences :type sequences: list :param min_bases: minimum number of bases to bind :type min_bases: int :param bind_reverse_complement: whether to reverse complement each sequence and check interactions :type bind_reverse_complement: int :param max_bases: max number of bases to search :type max_bases: int :param only_ends: whether to only consider end-to-end interactions :type only_ends: bool :return: :rtype: """ G = nx.DiGraph() sequences = [s.copy() for s in sequences] if bind_reverse_complement: reverse_complement = [] for s in sequences: new = s.copy().reverse_complement() new.name = s.name + "(-1)" reverse_complement.append(new) sequences += reverse_complement for s in sequences: G.add_node(s.global_id, sequence=s) seq_pairs = itertools.product(sequences, repeat=2) for s1, s2 in seq_pairs: for binding in s1.anneal_forward(s2, min_bases=min_bases, depth=max_bases): if only_ends: if binding.span[0] != 0 or binding.query_span[-1] != len(s2) - 1: break if not (binding.span[0] == 0 and binding.span[-1] == len(s1) - 1): G.add_edge( s2.global_id, s1.global_id, template=s1, primer=s2, binding=binding, ) return G @classmethod def interaction_report(cls, G): rows = [] for n1, n2 in G.edges: edge = G.edges[n1, n2] binding = edge["binding"] template = edge["template"] primer = edge["primer"] template_info = "{name} (id={gid} ({start},{end}/{length})".format( name=template.name, gid=template.global_id, start=binding.span[0], end=binding.span[1], length=len(template) - 1, ) primer_info = "{name} (id={gid} ({start},{end}/{length})".format( name=primer.name, gid=primer.global_id, start=binding.query_span[0], end=binding.query_span[1], length=len(primer) - 1, ) rows.append("{} -> {}".format(template_info, primer_info)) print("format: primer -> template") print("\n".join(rows)) @classmethod def linear_paths(cls, G): """Find linear paths from an interaction graph.""" paths = [] subgraph = G.subgraph(G.nodes).copy() for _ in nx.simple_cycles(G): return [] while subgraph.nodes: path = nx.dag_longest_path(subgraph) if len(path) < 2: break subgraph.remove_nodes_from(path) paths.append(path) return paths @classmethod def cyclic_paths(cls, G): """Find cyclic paths from an interaction graph.""" paths = [] for path in nx.simple_cycles(G.copy()): paths.append(path) return paths @classmethod def _path_to_edge_data(cls, G, path, cyclic): if cyclic: path_pairs = zip(path, path[1:] + [path[0]]) else: path_pairs = zip(path[:-1], path[1:]) edges = [] for n1, n2 in path_pairs: edges.append(G.edges[n1, n2]) return edges @classmethod def _path_to_assembly(self, path, G, cyclic=False): prev_template_end = None node_pairs = [] overhangs = [] for i, data in enumerate(self._path_to_edge_data(G, path, cyclic)): binding = data["binding"] # use the previous template and this query should refer to the same sequence node_pairs.append( (prev_template_end, binding.query_start.prev(), data["primer"].name) ) # the next segment start is the template start prev_template_end = binding.end.next() overhangs.append(binding.template_anneal) if not cyclic: node_pairs.append((prev_template_end, None, data["template"].name)) overhangs.append(Sequence()) else: pass node_pairs[0] = (prev_template_end, node_pairs[0][1], node_pairs[0][2]) overhangs = [overhangs[-1]] + overhangs[:-1] amplified_sequences = [] for n1, n2, name in node_pairs: sequence = Sequence.new_slice(n1, n2) sequence.name = name amplified_sequences.append(sequence) return Assembly(amplified_sequences, overhangs, cyclic) @classmethod def linear_assemblies( cls, sequences, min_bases=Sequence.DEFAULTS.MIN_ANNEAL_BASES, max_bases=None ): """Finds all unique, longest linear assemblies. If cyclic assemblies are found, returns empty list. :param sequences: list of sequences :type sequences: list :param min_bases: minimum junction length :type min_bases: int :param max_bases: maximum junction length. Smaller values will result in faster search, but may miss valid assemblies. :return: list of Assembly instances. Sequence can be accessed using `assembly.product` :rtype: list """ G = cls.interaction_graph( sequences, bind_reverse_complement=True, min_bases=min_bases, max_bases=max_bases, only_ends=True, ) linear_paths = cls.linear_paths(G) if not linear_paths: return [] assemblies = [] for path in linear_paths: assembly = cls._path_to_assembly(path, G) assemblies.append(assembly) return assemblies @classmethod def cyclic_assemblies( cls, sequences, min_bases=Sequence.DEFAULTS.MIN_ANNEAL_BASES, max_bases=None ): """Finds all unique, longest cyclic assemblies. If no cyclic assemblies found, returns empty list. :param sequences: list of sequences :type sequences: list :param min_bases: minimum junction length :type min_bases: int :param max_bases: maximum junction length. Smaller values will result in faster search, but may miss valid assemblies. :type max_bases: int :return: list of Assembly instances. Sequence can be accessed using `assembly.product` :rtype: list """ G = cls.interaction_graph( sequences, bind_reverse_complement=True, min_bases=min_bases, max_bases=max_bases, only_ends=True, ) nodes = list(G.nodes) fwd = nodes[: len(sequences)] rev = nodes[len(sequences) :] fpriority = list(range(len(fwd))) rpriority = list(range(len(fwd), len(fwd) * 2)) priority_rank = fpriority + rpriority[1:] + [rpriority[0]] node_priority = dict(zip(fwd + rev, priority_rank)) cyclic_paths = cls.cyclic_paths(G) if not cyclic_paths: return [] assemblies = [] cyclic_paths = sorted( cyclic_paths, key=lambda x: min([node_priority[n] for n in x]) ) for path in cyclic_paths: ranks = [node_priority[n] for n in path] mn = ranks.index(min(ranks)) cyclic_path = path[mn:] + path[:mn] assembly = cls._path_to_assembly(cyclic_path, G, cyclic=True) assemblies.append(assembly) return assemblies