Source code for snipgenie.trees

    Tree methods for bacterial phylogenetics, mostly using ete3.
    Created Nov 2019
    Copyright (C) Damien Farrell

    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License
    as published by the Free Software Foundation; either version 3
    of the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import sys,os,subprocess,glob,shutil,re,random
import platform
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from Bio import SeqIO
from Bio import Phylo, AlignIO
import numpy as np
import pandas as pd
from  . import tools

qcolors = ['blue','green','crimson','blueviolet','orange','cadetblue','chartreuse','chocolate',

[docs]def set_tiplabels(t, labelmap): for l in t.iter_leaves(): #print ( if in labelmap: = labelmap[] return
[docs]def delete_nodes(t, names): for l in t.iter_leaves(): if in names: l.delete()
[docs]def format_nodes(t): from ete3 import NodeStyle for n in t.traverse(): ns = NodeStyle() ns["size"] = 0 n.set_style(ns)
[docs]def remove_tiplabels(t): for l in t.iter_leaves(): = None
[docs]def set_nodesize(t, size=12): """Change the node size""" from ete3 import NodeStyle for l in t.iter_leaves(): clr = l._img_style['fgcolor'] ns = NodeStyle() ns["size"] = size #keep color ns["fgcolor"] = clr l.set_style(ns)
[docs]def color_leaves(t, colors, color_bg=False): from ete3 import NodeStyle for l in t.iter_leaves(): if in colors: #print (, colors[]) clr = colors[] else: clr='black' #print (clr) # create a new label with a color attribute #N = AttrFace("name", fgcolor=clr) #l.add_face(N, 1, position='aligned') ns = NodeStyle() ns["size"] = 12 ns["fgcolor"] = clr if color_bg == True: ns["bgcolor"] = clr l.set_style(ns)
[docs]def get_colormap(values): import pylab as plt labels = values.unique() cmap ='Set1') colors = [cmap(i) for i in range(len(labels))] #colors=qcolors #clrs = {labels[i]:cmap(float(i)/(len(labels))) for i in range(len(labels))} clrs = dict(list(zip(labels,colors))) return clrs
[docs]def run_fasttree(infile, outpath, bootstraps=100): """Run fasttree""" fc = tools.get_cmd('fasttree') out = os.path.join(outpath,'tree.newick') cmd = '{fc} -nt {i} > {o}'.format(fc=fc,b=bootstraps,i=infile,o=out) tmp = subprocess.check_output(cmd, shell=True) return out
[docs]def run_RAXML(infile, name='variants', threads=8, bootstraps=100, outpath='.'): """Run Raxml pthreads. Returns: name of .tree file. """ outpath = os.path.abspath(outpath) if not os.path.exists(outpath): os.makedirs(outpath, exist_ok=True) model = 'GTRCAT' s1 = random.randint(0,1e8) s2 = random.randint(0,1e8) files = glob.glob(os.path.join(outpath,'RAxML_*')) for f in files: os.remove(f) if platform.system() == 'Windows': cmd = tools.get_cmd('RAxML') else: cmd = 'raxmlHPC-PTHREADS' cmd = '{c} -f a -N {nb} -T {t} -m {m} -V -p {s1} -x {s2} -n {n} -w {w} -s {i}'\ .format(c=cmd,t=threads,nb=bootstraps,n=name,i=infile,s1=s1,s2=s2,m=model,w=outpath) print (cmd) try: tmp = subprocess.check_output(cmd, shell=True) except Exception as e: print ('Error building tree. Is RAxML installed?') return None out = os.path.join(outpath,'RAxML_bipartitions.variants') return out
[docs]def convert_branch_lengths(treefile, outfile, snps): tree =, "newick") for parent in tree.find_clades(terminal=False, order="level"): for child in parent.clades: if child.branch_length: child.branch_length *= snps #Phylo.draw(tree) Phylo.write(tree, outfile, "newick") return
[docs]def biopython_draw_tree(filename): from Bio import Phylo tree =,'newick') Phylo.draw(tree) return
[docs]def toytree_draw(tre, meta, labelcol,colorcol): """Draw colored tree with toytree""" tipnames = tre.get_tip_labels() mapping = dict(zip(meta['sample'],meta[labelcol])) mapping['ref'] = 'AF2122/97' tiplabels = [mapping[i] if i in mapping else 'NA' for i in tipnames] mapping = dict(zip(meta['sample'],meta[colorcol])) colormap = colors_from_labels(meta,'sample',colorcol) tip_colors = [colormap[mapping[i]] if i in mapping else 'Black' for i in tipnames] node_sizes=[0 if i else 8 for i in tre.get_node_values(None, 1, 0)] node_colors = [colormap[mapping[n]] if n in mapping else 'black' for n in tre.get_node_values('name', True, True)] canvas,t,r=tre.draw(layout='r',width=600,height=800,tip_labels=tiplabels,node_markers="o",node_hover=True,edge_widths=1, tip_labels_colors=tip_colors,node_sizes=node_sizes,scalebar=True,node_colors=node_colors)#tip_labels_align=True); #canvas.legend(leg,corner=("top", 100, 100, 70)); return
[docs]def create_tree(filename=None, tree=None, ref=None, labelmap=None, colormap=None, color_bg=False, format=1): """Draw a tree """ from ete3 import Tree, PhyloTree, TreeStyle, TextFace if filename != None: t = Tree(filename, format=format) else: t = tree if ref != None: t.set_outgroup(ref) if colormap != None: color_leaves(t, colormap, color_bg) if labelmap != None: set_tiplabels(t,labelmap) #format_nodes(t) ts = TreeStyle() return t, ts
[docs]def colors_from_labels(df,name,group): """Colors from dataframe columns for use with an ete3 tree drawing""" labels = df[group].unique() colors={} i=0 for l in labels: if i>=len(qcolors): i=0 colors[l] = qcolors[i] i+=1 df['color'] = df[group].apply(lambda x: colors[x],1) #colormap = dict(zip(df[name],df.color)) return colors
[docs]def remove_nodes(tree, names): for n in names: node = tree.search_nodes(name=n)[0] node.delete()
[docs]def run_treecluster(f, threshold, method='max_clade'): """Run treecluster on a newick tree. Clustering Method (options: avg_clade, length, length_clade, max, max_clade, med_clade, root_dist, single_linkage_clade) (default: max_clade) see """ import io cmd = ' -i {f} -t {t} -m {m}'.format(f=f,t=threshold,m=method) print (cmd) cl=subprocess.check_output(cmd, shell=True) cl=pd.read_csv(io.BytesIO(cl),sep='\t') return cl
[docs]def get_clusters(tree): """Get snp clusters from newick tree using""" dists = [3,5,7,10,12,20,50,100] c=[] for d in dists: clust = run_treecluster(tree, threshold=d, method='max_clade') #print (clust.ClusterNumber.value_counts()[:10]) clust['d']='snp'+str(d) c.append(clust) clusts = pd.pivot_table(pd.concat(c),index='SequenceName',columns='d',values='ClusterNumber').reset_index() return clusts