from .subfactory import SubFactory
from ..utils import parse_key_val_file
import pandas as pd
from pandas import DataFrame
from .constants import (
BRACKEN,
KRAKEN,
KRAKENHLL,
METAPHLAN2,
)
from .toxonomy_long_form import longform_taxa
def get_top_n(vec, n):
    """Return the `n` highest-valued entries of `vec`.

    Args:
        vec: Mapping of key -> numeric value.
        n: Number of entries to keep. Zero or a negative number disables
            the filter.

    Returns:
        `vec` itself (the same object, not a copy) when `n <= 0`;
        otherwise a new dict holding at most `n` entries, ordered from the
        largest value to the smallest. Entries with equal values keep
        their original relative order.
    """
    if n <= 0:
        return vec
    # Sorting on the negated value gives descending order while keeping the
    # sort stable for ties.
    ranked = sorted(vec.items(), key=lambda pair: -pair[1])
    return dict(ranked[:n])
def is_rank(key, rank):
    """Tell whether the taxon string `key` sits exactly at `rank`.

    Only the last `|`-separated token of `key` is inspected. It must
    contain the prefix of the requested rank (e.g. `s__` for species) and
    must not contain the prefix of the rank listed after it in the table
    below (e.g. `t__` for species).

    Args:
        key: Taxon string whose levels are separated by `|`.
        rank: One of 'species', 'genus', 'phylum' or 'kingdom'.

    Returns:
        True if the final token matches `rank`, False otherwise.

    Raises:
        AssertionError: If `rank` is not one of the supported names.
    """
    # rank name -> (prefix letter of that rank, prefix letter that must be
    # absent from the final token)
    rank_map = {
        'species': ('s', 't'),
        'genus': ('g', 's'),
        'phylum': ('p', 'c'),
        'kingdom': ('k', 'p'),
    }
    final_key = key.split('|')[-1]
    try:
        code, next_code = rank_map[rank]
    except KeyError:
        # Raised explicitly rather than via `assert False`, which is removed
        # under `python -O` and would make this function return None.
        raise AssertionError(f'Rank {rank} not supported.') from None
    return (code + '__' in final_key) and (next_code + '__' not in final_key)
def is_top_taxa(key, top_taxa):
    """Tell whether the taxon string `key` belongs to the group `top_taxa`.

    Matching is case-insensitive and purely textual:

    * 'all'       -- always matches.
    * 'bacteria'  -- 'bacteria' appears in the first `|`-separated token.
    * 'virus'     -- 'virus' or 'viridae' appears anywhere in `key`.
    * 'fungi'     -- 'fungi' appears anywhere in `key`.
    * 'eukaryote' -- the stem 'eukaryot' appears in the first token.

    Args:
        key: Taxon string whose levels are separated by `|`.
        top_taxa: One of 'all', 'bacteria', 'virus', 'eukaryote', 'fungi'.

    Returns:
        True if `key` matches the requested group, False otherwise.

    Raises:
        AssertionError: If `top_taxa` is not one of the supported names.
    """
    supported = ('all', 'bacteria', 'virus', 'eukaryote', 'fungi')
    if top_taxa not in supported:
        # Explicit raise: a plain `assert` is removed under `python -O`.
        raise AssertionError(f'Top taxa {top_taxa} not supported.')

    lowered = key.lower()
    root = lowered.split('|')[0]

    if top_taxa == 'all':
        return True
    if top_taxa == 'bacteria':
        return 'bacteria' in root
    if top_taxa == 'virus':
        return 'virus' in lowered or 'viridae' in lowered
    if top_taxa == 'fungi':
        return 'fungi' in lowered
    # Remaining case is 'eukaryote'. Match on the stem so that both
    # 'Eukaryota' and 'eukaryote' are recognised; the literal 'eukaryote'
    # is not a substring of 'eukaryota'.
    return 'eukaryot' in root
def clean_taxa(taxa):
    """Reduce a full taxon string to its bare final name.

    Takes the last `|`-separated token of `taxa` and returns whatever
    follows the last `__` separator in it (the whole token if there is no
    `__`).

    Args:
        taxa: Taxon string such as 'k__Bacteria|p__Firmicutes'.

    Returns:
        The trailing name, e.g. 'Firmicutes' for the example above.
    """
    last_level = taxa.split('|')[-1]
    return last_level.split('__')[-1]
class TaxonomyFactory(SubFactory):
    """Build taxonomy tables from the result files of profiling modules.

    The table-producing methods (`generic`, `parse_krakenhll_report`,
    `kraken`, `krakenhll`, `bracken`) return a pandas DataFrame with one
    row per sample and one column per taxon; a taxon missing from a sample
    is filled with 0.

    Result files are obtained from `self.factory.get_results(...)`, which
    is iterated as `(sample_name, file_name)` pairs.
    """

    @staticmethod
    def _to_proportions(vec, tot, cutoff):
        """Scale `vec` by `tot` and drop entries whose share is below `cutoff`.

        When `tot` is zero every share is treated as 0.0 instead of
        dividing by zero.
        """
        shares = {k: (v / tot if tot else 0.0) for k, v in vec.items()}
        return {k: share for k, share in shares.items() if share >= cutoff}

    @staticmethod
    def _as_table(per_sample):
        """Turn {sample: {taxon: value}} into a sample-by-taxon DataFrame."""
        return DataFrame(per_sample).fillna(0).transpose()

    def generic(
            self, mod_name,
            top_n=0, cutoff=0, rank='species', top_taxa='all', proportions=False, rname='mpa'
    ):
        """Build a taxon table from key/value result files of `mod_name`.

        Args:
            mod_name: Module whose results are requested from the factory.
            top_n: Keep only the `top_n` largest taxa per sample; values
                of zero or less keep everything.
            cutoff: Minimum share a taxon needs to be kept. Only applied
                when `proportions` is true.
            rank: Rank filter passed to `is_rank`.
            top_taxa: Group filter passed to `is_top_taxa`.
            proportions: If true, divide each value by the per-sample total
                of all entries that passed the rank and group filters.
            rname: Name of the result to request from the factory.

        Returns:
            DataFrame with samples as rows and taxa as columns.
        """
        taxafs = list(self.factory.get_results(module=mod_name, result=rname))

        def parse(fname):
            vec = {}
            tot = 0
            # NOTE(review): parse_key_val_file is assumed to return a
            # {taxon string: float} mapping -- confirm against its definition.
            for k, v in parse_key_val_file(fname, kind=float).items():
                if is_rank(k, rank) and is_top_taxa(k, top_taxa):
                    tot += v
                    # Keys that share a cleaned name overwrite each other
                    # here while `tot` still counts every one of them.
                    vec[clean_taxa(k)] = v
            if proportions:
                vec = self._to_proportions(vec, tot, cutoff)
            return get_top_n(vec, top_n)

        return self._as_table({sname: parse(fname) for sname, fname in taxafs})

    def parse_krakenhll_report(
            self, mod_name,
            top_n=0, cutoff=0, rank='species', top_taxa='all', proportions=False, rname='mpa'
    ):
        """Build a taxon table from tab-separated report files of `mod_name`.

        Each file is read with pandas using its second column as the index
        and the values of its 'Reads' column. Arguments and return value
        are the same as for `generic`.
        """
        taxafs = list(self.factory.get_results(module=mod_name, result=rname))

        def parse(fname):
            # NOTE(review): the index values are fed to is_rank/is_top_taxa,
            # so the second column presumably holds taxon strings -- confirm.
            reads = pd.read_csv(fname, sep='\t', index_col=1)['Reads'].to_dict()
            vec = {
                clean_taxa(k): v
                for k, v in reads.items()
                if is_rank(k, rank) and is_top_taxa(k, top_taxa)
            }
            if proportions:
                vec = self._to_proportions(vec, sum(vec.values()), cutoff)
            return get_top_n(vec, top_n)

        return self._as_table({sname: parse(fname) for sname, fname in taxafs})

    def kraken(self, top_n=0, cutoff=0, rank='species', top_taxa='all', proportions=False, level=None):
        """Return the taxon table for the KRAKEN module via `generic`.

        `level` is accepted but not used by this method.
        """
        return self.generic(
            KRAKEN,
            top_n=top_n,
            cutoff=cutoff,
            rank=rank,
            top_taxa=top_taxa,
            proportions=proportions,
        )

    def krakenhll(self, top_n=0, cutoff=0, rank='species', top_taxa='all', proportions=False, level=None):
        """Return the taxon table for the KRAKENHLL module.

        Args:
            level: Selects which result is read. A value containing 'm'
                selects 'report_medium'; otherwise a value containing 's'
                selects 'report_strict'; anything else (including None)
                selects 'report'.

        The remaining arguments are forwarded unchanged; see `generic`.
        The files are first parsed with `parse_krakenhll_report`; if that
        raises, they are parsed with `generic` instead.
        """
        rname = 'report'
        if level:
            if 'm' in level:
                rname = 'report_medium'
            elif 's' in level:
                rname = 'report_strict'
        options = dict(
            top_n=top_n,
            cutoff=cutoff,
            rank=rank,
            top_taxa=top_taxa,
            proportions=proportions,
            rname=rname,
        )
        try:
            return self.parse_krakenhll_report(KRAKENHLL, **options)
        except Exception:
            # Best-effort fallback to the key/value parser. `Exception`
            # rather than a bare `except` so KeyboardInterrupt and
            # SystemExit are not swallowed.
            return self.generic(KRAKENHLL, **options)

    def krakenhll_long(self):
        """Return `longform_taxa` applied to the KRAKENHLL 'read_assignments' results."""
        taxafs = self.factory.get_results(module=KRAKENHLL, result='read_assignments')
        return longform_taxa(taxafs)

    def krakenhll_angular(self, min_reads=3, min_kmers=64, slope=(100 / 250)):
        """Placeholder: not implemented, ignores its arguments and returns None."""
        return None

    def bracken(self, rank='species'):
        """Return the taxon table for the BRACKEN module at `rank`.

        Args:
            rank: One of 'species', 'genus' or 'phylum'; the result named
                `<rank>_report` is read.

        Returns:
            DataFrame with samples as rows and taxa as columns.

        Raises:
            AssertionError: If `rank` is not one of the supported names.
        """
        if rank not in ('species', 'genus', 'phylum'):
            # Explicit raise: a plain `assert` is removed under `python -O`.
            raise AssertionError(f'Rank {rank} not supported.')
        taxafs = self.factory.get_results(module=BRACKEN, result=rank + '_report')
        # NOTE(review): skip=1 / val_column=6 presumably skip a header line
        # and pick the seventh column -- confirm against parse_key_val_file.
        return self._as_table({
            sname: parse_key_val_file(fname, skip=1, val_column=6)
            for sname, fname in taxafs
        })