Source code for capalyzer.packet_parser.taxa_tree


import pandas as pd

from os import environ
from os.path import join, dirname
import gzip


NCBI_DELIM = '\t|'  # really...
NAMES_ENV_VAR = 'CAPALYZER_NCBI_NAMES'
NODES_ENV_VAR = 'CAPALYZER_NCBI_NODES'
NAMES_DEF = join(dirname(__file__), 'ncbi_tree/names.dmp.gz')
NODES_DEF = join(dirname(__file__), 'ncbi_tree/nodes.dmp.gz')

RANKS = ['species', 'genus', 'family', 'order', 'class', 'phylum', 'superkingdom']


[docs]class NCBITaxaTree: def __init__(self, parent_map, names_to_nodes, nodes_to_name): self.parent_map = parent_map self.names_to_nodes = names_to_nodes self.nodes_to_name = nodes_to_name def _node(self, taxon_name): return self.names_to_nodes[taxon_name] def _name(self, node_num): return self.nodes_to_name[node_num]['name']
[docs] def ancestors(self, taxon): """Return a list of all ancestors of the taxon starting with the taxon itself.""" parents = [taxon] parent_num = self.parent_map[self._node(taxon)] while parent_num: parents.append(self.nodes_to_name[parent_num]['name']) parent_num = self.parent_map[parent_num] return parents
[docs] def ranked_ancestors(self, taxon): """Return a dict of all ancestors of the taxon starting with the taxon itself. Keys of the dict are taxon ranks """ parents = {self.rank(taxon): taxon} parent_num = self.parent_map[self._node(taxon)] while parent_num: parent_node = self.nodes_to_name[parent_num] parents[parent_node['rank']] = parent_node['name'] parent_num = self.parent_map[parent_num] return parents
[docs] def parent(self, taxon): """Return the name of the parent taxon.""" return self._name(self.parent_map[self._node(taxon)])
[docs] def ancestor_rank(self, rank, taxon, default=None): """Return the ancestor of taxon at the given rank.""" parent_num = self.parent_map[self._node(taxon)] while parent_num: if rank == self.nodes_to_name[parent_num]['rank']: return self.nodes_to_name[parent_num]['name'] parent_num = self.parent_map[parent_num] if not default: raise KeyError(f'{rank} for taxa {taxon} not found.') return default
[docs] def rank(self, taxon): """Return the rank of the given taxon.""" node = self._node(taxon) return self.nodes_to_name[node]['rank']
[docs] def phyla(self, taxon, default=None): """Return the phyla for the given taxon.""" return self.ancestor_rank('phylum', taxon, default=default)
[docs] def genus(self, taxon, default=None): """Return the genus for the given taxon.""" return self.ancestor_rank('genus', taxon, default=default)
def _tree(self, taxa): queue, tree = {self._node(el) for el in taxa}, {} root = None while len(queue): cur_node = queue.pop() parent_node = self.parent_map[cur_node] if cur_node not in tree: tree[cur_node] = {'parent': parent_node, 'children': set()} if not parent_node: root = cur_node continue try: tree[parent_node]['children'].add(cur_node) except KeyError: tree[parent_node] = { 'parent': self.parent_map[parent_node], 'children': set([cur_node]) } queue.add(parent_node) return root, tree
[docs] def taxa_sort(self, taxa): """Return a list with all elements of taxa in DFS order.""" taxa, sort = set(taxa), [] root, tree = self._tree(taxa) def dfs(node): for child_node in tree[node]['children']: child = self._name(child_node) if child in taxa: sort.append(child) dfs(child_node) dfs(root) # typically 1 is the root node return sort
[docs] @classmethod def parse_files(cls, names_filename=None, nodes_filename=None): """Return a tree parsed from the given files.""" names_filename = names_filename if names_filename else environ.get(NAMES_ENV_VAR, NAMES_DEF) nodes_filename = nodes_filename if nodes_filename else environ.get(NODES_ENV_VAR, NODES_DEF) with gzip.open(names_filename) as names_file, gzip.open(nodes_filename) as nodes_file: names_to_nodes, nodes_to_name = {}, {} for line in names_file: line = line.decode('utf-8') tkns = [tkn.strip() for tkn in line.strip().split(NCBI_DELIM)] if tkns[3] != 'scientific name': continue node, name = tkns[:2] names_to_nodes[name] = node nodes_to_name[node] = {'name': name, 'rank': None} parent_map = {} for line in nodes_file: line = line.decode('utf-8') tkns = [tkn.strip() for tkn in line.strip().split(NCBI_DELIM)] node, parent, rank = tkns[:3] nodes_to_name[node]['rank'] = rank if node == parent: # NCBI has a self loop at the root parent = None parent_map[node] = parent return cls(parent_map, names_to_nodes, nodes_to_name)
[docs]class TaxaTree: """Represent a taxa tree for a specific set of taxa. Uses the NCBI taxonomy. """ def __init__(self, taxa, ncbi_tree=None): self.top_node = { 'name': 'root', 'rank': 'no_rank', 'children': {} } if not ncbi_tree: ncbi_tree = NCBITaxaTree.parse_files() self._ancestry = self._make_ancestry_table(taxa, ncbi_tree) self._make_internal_tree(taxa) def _make_ancestry_table(self, taxa, ncbi_tree): """Return a pandas dataframe with the canonical ranks for each taxa.""" taxonomy, missing = [], [] for taxon in taxa: try: taxonomy.append(ncbi_tree.ranked_ancestors(taxon)) except KeyError: missing.append(taxon) # TODO: some sort of error checking here ancestry = pd.DataFrame(taxonomy)[RANKS] ancestry.index = ancestry['species'] return ancestry def _make_internal_tree(self, taxa): """Construct a tree starting from the top node.""" for taxon, row in self._ancestry.iterrows(): node = self.top_node if taxon not in taxa: continue for rank in RANKS[::-1]: if row[rank] in node['children']: node = node['children'][row[rank]] else: node['children'][row[rank]] = { 'name': str(row[rank]), 'rank': rank, 'children': {} } node = node['children'][row[rank]]
[docs] def to_newick(self): """Return a newick string corresponding to this tree.""" def newick(node): if len(node['children']) > 0: child_str = '(' for child_node in node['children'].values(): child_str += newick(child_node) + ',' name = '_'.join(node["name"].split()) child_str = child_str[:-1] + f'){name}' return child_str else: return '_'.join(node["name"].split()) newick_str = newick(self.top_node) + ';' return newick_str