"""
Plotting methods for snpgenie
Created Jan 2020
Copyright (C) Damien Farrell
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 3
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
import os, sys, io, random, subprocess, time
import string
import numpy as np
import pandas as pd
pd.set_option('display.width', 200)
from importlib import reload
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Align import MultipleSeqAlignment
from Bio import AlignIO, SeqIO
from pyfaidx import Fasta
import pylab as plt
import matplotlib as mpl
import matplotlib.colors as colors
from . import tools
[docs]def make_legend(fig, colormap, loc=(1.05, .6), title='',fontsize=12):
"""Make a figure legend wth provided color mapping"""
import matplotlib.patches as mpatches
pts=[]
for c in colormap:
pts.append(mpatches.Patch(color=colormap[c],label=c))
fig.legend(handles=pts,bbox_to_anchor=loc,fontsize=fontsize,title=title)
return pts
[docs]def heatmap(df, cmap='gist_gray_r', w=15, h=5, ax=None):
"""Plot dataframe matrix"""
if ax == None:
fig, ax = plt.subplots(figsize=(w,h))
im = ax.pcolor(df, cmap=cmap)
ax.set_xticks(np.arange(len(df.columns))+0.5)
ax.set_yticks(np.arange(len(df))+0.5)
ax.set_xticklabels(df.columns)
ax.set_yticklabels(df.index)
plt.setp(ax.get_xticklabels(), rotation=80, ha="right",
rotation_mode="anchor")
plt.tight_layout()
return
[docs]def random_colors(n=10, seed=1):
"""Generate random hex colors as list of length n."""
import random
random.seed(seed)
clrs=[]
for i in range(n):
r = lambda: random.randint(0,255)
c='#%02X%02X%02X' % (r(),r(),r())
clrs.append(c)
return clrs
[docs]def gen_colors(cmap,n,reverse=False):
'''Generates n distinct color from a given colormap.
Args:
cmap(str): The name of the colormap you want to use.
Refer https://matplotlib.org/stable/tutorials/colors/colormaps.html to choose
Suggestions:
For Metallicity in Astrophysics: Use coolwarm, bwr, seismic in reverse
For distinct objects: Use gnuplot, brg, jet,turbo.
n(int): Number of colors you want from the cmap you entered.
reverse(bool): False by default. Set it to True if you want the cmap result to be reversed.
Returns:
colorlist(list): A list with hex values of colors.
Taken from the mycolorpy package by binodbhttr
see also https://matplotlib.org/stable/tutorials/colors/colormaps.html
'''
c_map = plt.cm.get_cmap(str(cmap)) # select the desired cmap
arr=np.linspace(0,1,n) #create a list with numbers from 0 to 1 with n items
colorlist=list()
for c in arr:
rgba=c_map(c) #select the rgba value of the cmap at point c which is a number between 0 to 1
clr=colors.rgb2hex(rgba) #convert to hex
colorlist.append(str(clr)) # create a list of these colors
if reverse==True:
colorlist.reverse()
return colorlist
[docs]def show_colors(colors):
"""display a list of colors"""
plt.figure(figsize=(6,1))
plt.bar(range(len(colors)),height=1,color=colors,width=1)
plt.axis('off')
return
[docs]def get_color_mapping(df, col, cmap=None, seed=1):
"""Get random color map for categorcical dataframe column"""
c = df[col].unique()
if cmap == None:
rcolors = random_colors(len(c),seed)
else:
cmap = mpl.cm.get_cmap(cmap)
rcolors = [cmap(i) for i in range(len(c))]
colormap = dict(zip(c, rcolors))
newcolors = [colormap[i] if i in colormap else 'Black' for i in df[col]]
return newcolors, colormap
[docs]def draw_pie(vals, xpos, ypos, colors, size=500, ax=None):
"""Draw a pie at a specific position on an mpl axis.
Used to draw spatial pie charts on maps.
Args:
vals: values for pie
xpos: x coord
ypos: y coord
colors: colors of values
size: size of pie chart
"""
cumsum = np.cumsum(vals)
cumsum = cumsum / cumsum[-1]
pie = [0] + cumsum.tolist()
#colors = ["blue", "red", "yellow"]
for i, (r1, r2) in enumerate(zip(pie[:-1], pie[1:])):
angles = np.linspace(2 * np.pi * r1, 2 * np.pi * r2)
x = [0] + np.cos(angles).tolist()
y = [0] + np.sin(angles).tolist()
xy = np.column_stack([x, y])
ax.scatter([xpos], [ypos], marker=xy, s=size, color=colors[i], alpha=.9)
return ax
[docs]def create_grid(gdf=None, bounds=None, n_cells=10, overlap=False, crs="EPSG:29902"):
"""Create square grid that covers a geodataframe area
or a fixed boundary with x-y coords
returns: a GeoDataFrame of grid polygons
"""
import geopandas as gpd
import shapely
if bounds != None:
xmin, ymin, xmax, ymax= bounds
else:
xmin, ymin, xmax, ymax= gdf.total_bounds
# get cell size
cell_size = (xmax-xmin)/n_cells
# create the cells in a loop
grid_cells = []
for x0 in np.arange(xmin, xmax+cell_size, cell_size ):
for y0 in np.arange(ymin, ymax+cell_size, cell_size):
x1 = x0-cell_size
y1 = y0+cell_size
poly = shapely.geometry.box(x0, y0, x1, y1)
#print (gdf.overlay(poly, how='intersection'))
grid_cells.append( poly )
cells = gpd.GeoDataFrame(grid_cells, columns=['geometry'],
crs=crs)
if overlap == True:
cols = ['grid_id','geometry','grid_area']
cells = cells.sjoin(gdf, how='inner').drop_duplicates('geometry')
return cells
[docs]def create_hex_grid(gdf=None, bounds=None, n_cells=10, overlap=False, crs="EPSG:29902"):
"""Hexagonal grid over geometry.
See https://sabrinadchan.github.io/data-blog/building-a-hexagonal-cartogram.html
"""
from shapely.geometry import Polygon
import geopandas as gpd
if bounds != None:
xmin, ymin, xmax, ymax= bounds
else:
xmin, ymin, xmax, ymax= gdf.total_bounds
unit = (xmax-xmin)/n_cells
a = np.sin(np.pi / 3)
cols = np.arange(np.floor(xmin), np.ceil(xmax), 3 * unit)
rows = np.arange(np.floor(ymin) / a, np.ceil(ymax) / a, unit)
#print (len(cols))
hexagons = []
for x in cols:
for i, y in enumerate(rows):
if (i % 2 == 0):
x0 = x
else:
x0 = x + 1.5 * unit
hexagons.append(Polygon([
(x0, y * a),
(x0 + unit, y * a),
(x0 + (1.5 * unit), (y + unit) * a),
(x0 + unit, (y + (2 * unit)) * a),
(x0, (y + (2 * unit)) * a),
(x0 - (0.5 * unit), (y + unit) * a),
]))
grid = gpd.GeoDataFrame({'geometry': hexagons},crs=crs)
grid["grid_area"] = grid.area
grid = grid.reset_index().rename(columns={"index": "grid_id"})
if overlap == True:
cols = ['grid_id','geometry','grid_area']
grid = grid.sjoin(gdf, how='inner').drop_duplicates('geometry')[cols]
return grid
# below are sequence plotting related functions
[docs]def get_fasta_length(filename, key=None):
"""Get length of reference sequence"""
refseq = Fasta(filename)
if key==None:
key = list(refseq.keys())[0]
l = len(refseq[key])
return l
[docs]def get_fasta_names(filename):
"""Get names of fasta sequences"""
refseq = Fasta(filename)
return list(refseq.keys())
[docs]def get_fasta_sequence(filename, start, end, key=0):
"""Get chunk of indexed fasta sequence at start/end points"""
from pyfaidx import Fasta
refseq = Fasta(filename)
if type(key) is int:
chrom = list(refseq.keys())[key]
print (chrom)
seq = refseq[chrom][start:end].seq
return seq
[docs]def get_chrom_from_bam(bam_file):
"""Get first sequence name in a bam file"""
import pysam
samfile = pysam.AlignmentFile(bam_file, "r")
iter=samfile.fetch(start=0,end=10)
for read in iter:
if read.reference_name:
return read.reference_name
[docs]def get_coverage(bam_file, chr, start, end):
"""Get coverage from bam file at specified region"""
import pysam
if bam_file is None or not os.path.exists(bam_file):
return
samfile = pysam.AlignmentFile(bam_file, "r")
vals = [(pileupcolumn.pos, pileupcolumn.n) for pileupcolumn in samfile.pileup(chr, start, end)]
df = pd.DataFrame(vals,columns=['pos','coverage'])
df = df[(df.pos>=start) & (df.pos<=end)]
#fill with zeroes if there is no data at ends
if df.pos.max() < end:
new = pd.DataFrame({'pos':range(df.pos.max(), end)})
new['coverage'] = 0
df = df.append(new).reset_index(drop=True)
return df
[docs]def get_bam_aln(bam_file, chr, start, end, group=False):
"""Get all aligned reads from a sorted bam file for within the given coords"""
import pysam
if not os.path.exists(bam_file):
return
if chr is None:
return
if start<1:
start=0
samfile = pysam.AlignmentFile(bam_file, "r")
iter = samfile.fetch(chr, start, end)
d=[]
for read in iter:
st = read.reference_start
d.append([read.reference_start, read.reference_end, read.cigarstring,
read.query_name,read.query_length,read.mapping_quality])
df = pd.DataFrame(d,columns=['start','end','cigar','name','length','mapq'])
if len(df) == 0:
return pd.DataFrame()
if group == True:
df['counts'] = df.groupby(['start','end']).name.transform('count')
df = df.drop_duplicates(['start','end'])
df['y'] = 1
bins = (end-start)/150
if bins < 1:
bins = 1
xbins = pd.cut(df.start,bins=bins)
df['y'] = df.groupby(xbins)['y'].transform(lambda x: x.cumsum())
return df
[docs]def plot_coverage(df, plot_width=800, plot_height=60, xaxis=True, ax=None):
"""Plot a bam coverage dataframe returned from get_coverage
Args:
df: dataframe of coverage data (from get_coverage)
plot_width: width of plot
xaxis: plot the x-axis ticks and labels
"""
#if df is None or len(df)==0:
# return plot_empty(plot_width=plot_width,plot_height=plot_height)
df['y'] = df.coverage/2
x_range = (df.pos.min(),df.pos.max())
top = df.coverage.max()
if ax==None:
fig,ax = plt.subplots(1,1,figsize=(15,1))
ax.fill_between(df.pos,df.y,color='gray')
ax.set_xlim(x_range)
if xaxis==False:
ax.get_xaxis().set_visible(False)
return
[docs]def plot_bam_alignment(bam_file, chr, xstart, xend, ystart=0, yend=100,
rect_height=.6, fill_color='gray', ax=None):
"""bam alignments plotter.
Args:
bam_file: name of a sorted bam file
start: start of range to show
end: end of range
"""
h = rect_height
#cover the visible range from start-end
o = (xend-xstart)/2
#get reads in range into a dataframe
df = get_bam_aln(bam_file, chr, xstart-o, xend+o)
#print (df[:4])
df['x'] = df.start+df.length/2
df['y'] = df.y*(h+1)
#set colors by quality
df['color'] = df.apply(lambda x: 'red' if x.mapq==0 else fill_color ,1)
df['span'] = df.apply(lambda x: str(x.start)+':'+str(x.end),1)
if ax==None:
fig,ax = plt.subplots(1,1,figsize=(15,3))
from matplotlib.collections import PatchCollection
patches=[]
for i,r in df.iterrows():
rect = plt.Rectangle((r.x, r.y), r.length, h,
alpha=.6, linewidth=.5,
edgecolor='black', facecolor=r.color)
patches.append(rect)
#cmap = ListedColormap(list(df.color))
ax.add_collection(PatchCollection(patches, match_original=True))
ax.set_ylim(ystart,yend)
ax.set_xlim(xstart, xend)
plt.yticks([])
plt.tight_layout()
return
[docs]def plot_features(rec, ax, rows=3, xstart=0, xend=30000):
h=1
df = tools.records_to_dataframe([rec])
df = df[(df.feat_type!='region') & (df['feat_type']!='source')]
df = df[(df.start>xstart) & (df.end<xend)]
df['length'] = df.end-df.start
y = list(range(1,rows)) * len(df)
df['y'] = y[:len(df)]
df['color'] = 'blue'
df = df.fillna('')
#print (df)
from matplotlib.collections import PatchCollection
import matplotlib.patches as mpatches
patches=[]
for i,r in df.iterrows():
if r.strand == 1:
x = r.start
dx = r.length
else:
x = r.end
dx = -r.length
arrow = mpatches.Arrow(x, r.y, dx, 0, alpha=.7, width=.3,
edgecolor='black')
txt = ax.text(r.start, r.y-h/2, r.gene, size=16)
patches.append(arrow)
ax.add_collection(PatchCollection(patches, match_original=True))
ax.set_xlim(xstart, xend)
ax.set_ylim(.4,rows-.5)
plt.yticks([])
plt.tight_layout()
def onclick(event):
print('%s click: button=%d, x=%d, y=%d, xdata=%f, ydata=%f' %
('double' if event.dblclick else 'single', event.button,
event.x, event.y, event.xdata, event.ydata))
ax.text(event.x, event.y, 'HI!')
ax.figure.canvas.draw()
#cid = ax.figure.canvas.mpl_connect('button_press_event', onclick)
return
[docs]def display_igv(url='http://localhost:8888/files/', ref_fasta='', bams=[],
gff_file=None, vcf_file=None):
"""
Display IGV tracks in jupyter, requires the igv_jupyterlab package.
Example usage:
bams = {'24':'results/mapped/24-MBovis.bam'}
igv=display_igv(url='http://localhost:8888/files/', ref_fasta='Mbovis_AF212297.fa',
gff_file='results/Mbovis_AF212297.gb.gff',
vcf_file='results/filtered.vcf.gz', bams=bams)
"""
from igv_jupyterlab import IGV
track_list = []
if gff_file != None:
track_list += [{"name": "Mbovis",
"url": url+gff_file,
"format": "gff",
"type": "annotation",
"height":120,
"indexed": False }
]
colors = random_colors(len(bams))
i=0
for b in bams:
d = {"name": b,
"url":url+bams[b],
"type": "alignment",
"displayMode":"SQUISHED",
"height":110,
"removable":True,
"color":colors[i],
"indexed": True }
track_list.append(d)
i+=1
if vcf_file != None:
v = IGV.create_track(
name="VCF",
url=url+vcf_file,
#format="vcf",
type="variant",
indexed=True
)
track_list.append(v)
genome = IGV.create_genome(
name="Mbovis",
fasta_url=url+ref_fasta,
index_url=url+ref_fasta+'.fai',
tracks=track_list
)
igv = IGV(genome=genome)
#igv.locus="LT708304.1:2283938-2285249"
return igv
bams={'cat':'wicklow_results/mapped/cat-003488.bam',
'cat2018':'wicklow_results/mapped/TB18-001924.bam',
'cat2020':'wicklow_results/mapped/TB20-003486.bam',
'24':'wicklow_results/mapped/24-MBovis.bam'
}