r"""
Utilities (:mod:`dasi.utils`)
=============================
This module provides various utility functions.
.. currentmodule:: dasi.utils
Utility modules
---------------
.. autosummary::
:toctree: generated/
npdf
region
sequence_design
Networkx utilities
------------------
.. currentmodule:: dasi.utils.networkx
Specialized networkx algorithms for path and cycle finding.
.. autosummary::
:toctree: generated/
algorithms
exceptions
shortest_path
utils
"""
import bisect
import inspect
from copy import deepcopy
from datetime import datetime
from functools import wraps
from itertools import tee
from typing import Any
from typing import Callable
from typing import Generator
from typing import Iterable
from typing import List
from typing import Tuple
from typing import TypeVar
from typing import Union
from .networkx.shortest_path import multipoint_shortest_path
from .networkx.utils import sort_cycle
from .npdf import NumpyDataFrame
from .npdf import NumpyDataFrameException
from .region import Region
def sort_with_keys(a: Iterable[Any], key: Callable) -> Tuple[List, List]:
    """Sort an iterable and also report the key of every sorted element.

    :param a: the iterable to sort
    :param key: function mapping an element to its sort key
    :return: tuple of (elements sorted by ``key``, keys of those elements in
        the same order)
    """
    ordered = list(a)
    ordered.sort(key=key)
    # The key function is applied again after sorting so that the keys line
    # up index-for-index with the sorted elements.
    ordered_keys = list(map(key, ordered))
    return ordered, ordered_keys
def bisect_between(a: Iterable, low: Any, high: Any) -> Tuple[int, int]:
    """Return the index range of a sorted array covering ``[low, high]``.

    :param a: sorted, indexable array (sortedness is not checked)
    :param low: low key (inclusive)
    :param high: high key (inclusive)
    :return: tuple of start (inclusive) and end (exclusive) indices, so that
        ``a[start:end]`` holds every element ``x`` with ``low <= x <= high``
    """
    start = bisect.bisect_left(a, low)
    # Resume the search at ``start`` through the ``lo`` argument instead of
    # bisecting a sliced copy of the tail: same indices, no O(n) copy.
    end = bisect.bisect_right(a, high, start)
    return start, end
def bisect_slice_between(a: Iterable, keys: Iterable, low: Any, high: Any) -> Iterable:
    """Return the part of ``a`` whose keys fall between ``low`` and ``high``.

    ``keys`` is bisected with both bounds inclusive, and the index range found
    there is applied directly to ``a``. Both ``a`` and ``keys`` are assumed to
    be sorted already.

    :param a: pre-sorted iterable to slice
    :param keys: pre-sorted keys to bisect
    :param low: low key
    :param high: high key
    :return: sliced iterable
    """
    bounds = bisect_between(keys, low, high)
    return a[slice(*bounds)]
def perfect_subject(data):
    """Determine whether a blast result consumes the entire subject.

    :param data: mapping with the keys ``"strand"``, ``"start"``,
        ``"raw_end"`` and ``"length"``
    :return: True if the result runs from position 1 to ``length`` on
        strand 1, or from ``length`` down to 1 on strand -1; False otherwise
    """
    strand = data["strand"]
    if strand == 1:
        return bool(data["start"] == 1 and data["raw_end"] == data["length"])
    if strand == -1:
        return bool(data["raw_end"] == 1 and data["start"] == data["length"])
    # Always answer with a real boolean rather than falling off the end and
    # returning None for a non-matching result.
    return False
def prep_df(df):
    """Rename the columns of a design dataframe and sort its rows.

    The frame is modified in place and the same object is returned.

    :param df: dataframe (presumably a pandas ``DataFrame``) whose columns
        are, in order, the twelve names assigned below
    :return: ``df`` with renamed columns and rows sorted by ``TYPE``,
        ``DESIGN_ID``, ``REACTION_ID``, ``ASSEMBLY_ID`` and ``ROLE``
    """
    df.columns = [
        "DESIGN_ID",
        "DESIGN_KEY",
        "ASSEMBLY_ID",
        "REACTION_ID",
        "NAME",
        "TYPE",
        "KEY",
        "ROLE",
        "REGION",
        "SEQUENCE",
        "LENGTH",
        "META",
    ]
    # ``sort_values`` returns a new frame by default, so sort in place to
    # make the ordering visible on the (same) object that is returned.
    df.sort_values(
        by=["TYPE", "DESIGN_ID", "REACTION_ID", "ASSEMBLY_ID", "ROLE"],
        inplace=True,
    )
    return df
def group_by(arr: List[Any], key: Callable):
    """Bucket the items of a list by the value ``key`` returns for each.

    :param arr: items to group
    :param key: function computing the (hashable) group key of an item
    :return: dict mapping each group key to the list of items that produced
        it, with items kept in their input order
    """
    buckets = {}
    for item in arr:
        buckets.setdefault(key(item), []).append(item)
    return buckets
def now():
    """Return the current local date and time as a naive ``datetime``."""
    current = datetime.now()
    return current
def log_times(key: str = None, class_attribute: str = "_method_trace"):
    """Decorator factory that records when a method call started and ended.

    After the decorated method returns, a ``(start, end)`` tuple of
    timestamps is stored in the dict held by the instance attribute named
    ``class_attribute``. Nothing is recorded if the method raises.

    :param key: dict key to store the timestamps under; defaults to the name
        of the decorated method
    :param class_attribute: name of the instance attribute holding the dict
    :return: decorator to apply to a method
    :raises ValueError: when the decorated method is called and the instance
        lacks the attribute, or the attribute is not a dict
    """

    def decorator(f):
        @wraps(f)
        def timed(self, *args, **kwargs):
            if not hasattr(self, class_attribute):
                raise ValueError(
                    "Instance {} must have attribute '{}'".format(self, class_attribute)
                )
            if not isinstance(getattr(self, class_attribute), dict):
                raise ValueError(
                    "Attribute {} of {} must be a {}".format(
                        class_attribute, self, dict
                    )
                )
            started = now()
            result = f(self, *args, **kwargs)
            finished = now()
            label = f.__name__ if key is None else key
            # Look the attribute up again here: the method may have rebound
            # it while it was running.
            getattr(self, class_attribute)[label] = (started, finished)
            return result

        return timed

    return decorator
def fmt_datetime(t):
    """Render a timestamp as text using its ``str()`` form."""
    text = str(t)
    return text
# Generic element type used in the annotations of the helpers below.
T = TypeVar("T")
def argsorted(
    arr: Iterable[T], key: Callable, return_items: bool = False
) -> Union[List[Tuple[int, T]], List[int]]:
    """Return the positions that would sort ``arr`` by ``key``.

    :param arr: items to rank; iterated exactly once, so a one-shot iterator
        is consumed
    :param key: function mapping an item to its sort key
    :param return_items: if True, return ``(index, item)`` pairs instead of
        bare indices
    :return: original indices in sorted order (ties keep their input order),
        or the matching ``(index, item)`` pairs when ``return_items`` is True
    """
    # Iterate ``arr`` directly: wrapping it in ``tee`` and using one branch
    # neither copies nor protects the underlying iterator.
    ranked = sorted(enumerate(arr), key=lambda pair: key(pair[1]))
    if return_items:
        return ranked
    return [index for index, _ in ranked]
def lexsorted(keys: Iterable, target: Iterable[T], key: Callable) -> List[T]:
    """Reorder ``target`` according to the sort order of ``keys``.

    NOTE(review): ``keys`` is enumerated here and enumerated again inside
    ``argsorted``, so ``key`` is called with ``(index, key_item)`` tuples
    rather than with the bare entries of ``keys`` -- confirm this is intended.

    :param keys: values that determine the ordering; iterated once
    :param target: collection to reorder; it is indexed by position, so it
        must support ``target[i]`` for every index of ``keys``
    :param key: function applied to each ``(index, key_item)`` tuple to
        obtain its sort key
    :return: list of ``target`` elements in the resulting order
    """
    indexed_keys = enumerate(keys)
    order = argsorted(indexed_keys, key=key)
    return [target[position] for position in order]
def chunkify(arr: Iterable[T], chunk_size: int) -> Generator[List[T], None, None]:
    """Yield consecutive lists of up to ``chunk_size`` items from ``arr``.

    Every chunk holds exactly ``chunk_size`` items except possibly the last,
    which holds the remainder. If ``chunk_size`` is less than one, the length
    check never matches and all items are yielded as a single chunk.

    :param arr: items to split; iterated once
    :param chunk_size: number of items per chunk
    :return: generator of lists
    """
    pending = []
    for item in arr:
        pending.append(item)
        if len(pending) != chunk_size:
            continue
        yield pending
        pending = []
    # Flush whatever is left over after the last full chunk.
    if pending:
        yield pending