"""
This module contains the utilities for Orca-based applications,
including a class for structural variants and plotting utilities.
"""
import os
import pathlib
import uuid
from copy import deepcopy
from collections import OrderedDict, namedtuple
from bisect import bisect
import matplotlib
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import numpy as np
import pygenometracks.plotTracks
from colormaps import hnh_cmap_ext5, bwcmap
matplotlib.rcParams["pdf.fonttype"] = 42
matplotlib.rcParams["ps.fonttype"] = 42
ORCA_PATH = str(pathlib.Path(__file__).parent.absolute())
def _draw_region(ax, linestart, lineend, color, matlen):
ax.plot(
[-matlen / 50, -matlen / 50],
[matlen * linestart - 0.5 - 0.1, (matlen) * lineend - 0.5 + 0.1],
solid_capstyle="butt",
color=color,
linewidth=8,
zorder=10,
clip_on=False,
)
def _draw_site(ax, linepos, mode, matlen, color="black"):
if mode == "double":
ax.plot(
[-(matlen) / 20, -0.5],
[(matlen) * linepos - (matlen) / 100.0 - 0.5, (matlen) * linepos - 0.5],
color=color,
linewidth=0.2,
zorder=10,
clip_on=False,
)
ax.plot(
[-(matlen) / 20, -0.5],
[(matlen) * linepos + (matlen) / 100.0 - 0.5, (matlen) * linepos - 0.5],
color=color,
linewidth=0.2,
zorder=10,
clip_on=False,
)
elif mode == "single":
ax.plot(
[-(matlen) / 20, -0.5],
[(matlen) * linepos - 0.5, (matlen) * linepos - 0.5],
color=color,
linewidth=0.2,
zorder=10,
clip_on=False,
)
[docs]def genomeplot(
output,
show_genes=False,
show_tracks=False,
show_coordinates=True,
unscaled=False,
file=None,
cmap=None,
unscaled_cmap=None,
colorbar=True,
maskpred=False,
vmin=-1,
vmax=2,
model_labels = ["H1-ESC", "HFF"]
):
"""
Plot the multiscale prediction outputs for 32Mb output.
Parameters
----------
output : dict
The result dictionary to plot as returned by `genomepredict_256Mb`.
show_genes : bool, optional
Default is False. If True, plot the retrieved
gene annotations corresponding to all windows used for the multiscale prediction.
show_tracks : bool, optional
Default is False. If True, plot the retrieved
chromatin tracks for CTCF, chromatin accessibility and histone marks
for all windows used for the multiscale prediction.
show_coordinates : bool, optional
Default is True. If True, annotate the generated plot with the
genome coordinates.
unscaled : bool, optional
Default is False. If True, plot the predictions and observations
without normalizing by distance-based expectation.
file : str or None, optional
Default is None. The output file prefix. No output file is generated
if set to None.
cmap : str or None, optional
Default is None. The colormap for plotting scaled interactions (log
fold over distance-based background). If None, use colormaps.hnh_cmap_ext5.
unscaled_cmap : str or None, optional
Default is None. The colormap for plotting unscaled interactions (log
balanced contact score). If None, use colormaps.hnh_cmap_ext5.
colorbar : bool, optional
Default is True. Whether to plot the colorbar.
maskpred : bool, optional
Default is True. If True, the prediction heatmaps are masked at positions
where the observed data have missing values when observed data are provided
in output dict.
vmin : int, optional
Default is -1. The lowerbound value for heatmap colormap.
vmax : int, optional
Default is 2. The upperbound value for heatmap colormap.
model_labels : list(str), optional
Model labels for plotting. Default is ["H1-ESC", "HFF"].
Returns
-------
None
"""
if cmap is None:
cmap = hnh_cmap_ext5
if unscaled_cmap is None:
unscaled_cmap = hnh_cmap_ext5
n_axes = len(output["predictions"])
try:
assert output["predictions"][0][0].ndim == 2
except:
raise ValueError("Plotting predictions with more than two-dimensions are not supported.")
if output["experiments"] is not None:
n_axes += len(output["predictions"])
fig, all_axes = plt.subplots(figsize=(36, 6 * n_axes), nrows=n_axes, ncols=6)
for row_axes in all_axes:
for ax in row_axes:
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
for i, xlabel in enumerate(["1Mb", "2Mb", "4Mb", "8Mb", "16Mb", "32Mb"]):
all_axes[-1, i].set_xlabel(xlabel, labelpad=20, fontsize=20, weight="black")
if output["experiments"] is not None:
current_axis = 0
for label in model_labels:
for suffix in [" Pred", " Obs"]:
all_axes[current_axis, 0].set_ylabel(
label + suffix,
labelpad=20,
fontsize=20,
weight="black",
rotation="horizontal",
ha="right",
va="center",
)
current_axis += 1
else:
current_axis = 0
for label in model_labels:
for suffix in [" Pred"]:
all_axes[current_axis, 0].set_ylabel(
label + suffix,
labelpad=20,
fontsize=20,
weight="black",
rotation="horizontal",
ha="right",
va="center",
)
current_axis += 1
current_row = 0
for model_i in range(len(output["predictions"])):
for ii, ax in enumerate(reversed(all_axes[current_row])):
s = int(output["start_coords"][ii])
e = int(output["end_coords"][ii])
regionstr = output["chr"] + ":" + str(s) + "-" + str(e)
if show_coordinates:
ax.set_title(regionstr, fontsize=14, pad=4)
if unscaled:
plotmat = output["predictions"][model_i][ii] + np.log(output["normmats"][model_i][ii])
im = ax.imshow(
plotmat,
interpolation="none",
cmap=unscaled_cmap,
vmax=np.max(np.diag(plotmat, k=1)),
)
else:
plotmat = output["predictions"][model_i][ii]
im = ax.imshow(plotmat, interpolation="none", cmap=cmap, vmin=vmin, vmax=vmax)
if output["annos"]:
for r in output["annos"][ii]:
if len(r) == 3:
_draw_region(ax, r[0], r[1], r[2], plotmat.shape[1])
elif len(r) == 2:
_draw_site(ax, r[0], r[1], plotmat.shape[1])
ax.axis([-0.5, plotmat.shape[1] - 0.5, -0.5, plotmat.shape[1] - 0.5])
ax.invert_yaxis()
if maskpred:
if output["experiments"]:
ax.imshow(
np.isnan(output["experiments"][0][ii]), interpolation="none", cmap=bwcmap,
)
current_row += 1
if output["experiments"]:
for ii, ax in enumerate(reversed(all_axes[current_row])):
s = int(output["start_coords"][ii])
e = int(output["end_coords"][ii])
regionstr = output["chr"] + ":" + str(s) + "-" + str(e)
if show_coordinates:
ax.set_title(regionstr, fontsize=14, pad=4)
if unscaled:
plotmat = output["experiments"][model_i][ii] + np.log(output["normmats"][model_i][ii])
im = ax.imshow(
plotmat,
interpolation="none",
cmap=unscaled_cmap,
vmax=np.max(np.diag(plotmat, k=1)),
)
else:
plotmat = output["experiments"][model_i][ii]
im = ax.imshow(plotmat, interpolation="none", cmap=cmap, vmin=vmin, vmax=vmax)
if output["annos"]:
for r in output["annos"][ii]:
if len(r) == 3:
_draw_region(ax, r[0], r[1], r[2], plotmat.shape[1])
elif len(r) == 2:
_draw_site(ax, r[0], r[1], plotmat.shape[1])
ax.axis([-0.5, plotmat.shape[1] - 0.5, -0.5, plotmat.shape[1] - 0.5])
ax.invert_yaxis()
current_row += 1
if colorbar:
fig.colorbar(im, ax=all_axes.ravel().tolist(), fraction=0.02, shrink=0.1, pad=0.005)
if show_genes:
for p in [
ORCA_PATH + "/resources/hg38.refGeneSelectMANE.bed.gz",
ORCA_PATH + "/resources/hg38.refGeneSelectMANE.bed.gz.tbi",
]:
if not os.path.exists(p):
show_genes = False
print(
"`show_genes` is turned off because resource file " + p + " is not available."
)
break
if show_tracks:
for p in [
ORCA_PATH + "/extra/H1_CTCF_ENCFF473IZV.bigWig",
ORCA_PATH + "/extra/H1_RAD21_ENCFF913JGA.bigWig",
ORCA_PATH + "/extra/H1_DNase_ENCFF131HMO.bigWig",
ORCA_PATH + "/extra/H1_H3K4me3_ENCFF623ZAW.bigWig",
ORCA_PATH + "/extra/H1_POLR2A_ENCFF379IRQ.bigWig",
ORCA_PATH + "/extra/H1_H3K27ac_ENCFF423TVA.bigWig",
ORCA_PATH + "/extra/H1_H3K4me1_ENCFF584AVI.bigWig",
ORCA_PATH + "/extra/H1_H3K36me3_ENCFF141YAA.bigWig",
ORCA_PATH + "/extra/H1_H3K27me3_ENCFF912ZUR.bigWig",
ORCA_PATH + "/extra/H1_H3K9me3_ENCFF752UGN.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_CTCF_ENCFF761RHS.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_DNase_ENCFF113YFF.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K4me3_ENCFF442WNT.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K27ac_ENCFF078JZB.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K4me1_ENCFF449DEA.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K36me3_ENCFF954UKB.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K27me3_ENCFF027GWJ.bigWig",
ORCA_PATH + "/extra/foreskin_fibroblast_H3K9me3_ENCFF946TXL.bigWig",
]:
if not os.path.exists(p):
show_tracks = False
print(
"`show_tracks` is turned off because resource file " + p + " is not available."
)
break
if show_genes or show_tracks:
browser_tracks = (
"""
[spacer]
height = 0.5
[x-axis]
where = top
fontsize = 12
[spacer]
height = 0.05
"""
+ (
"""
[test gtf collapsed]
file = {ORCA_PATH}/resources/hg38.refGeneSelectMANE.bed.gz
height = 25
merge_transcripts = true
prefered_name = gene_name
max_labels = 10000
fontsize = 9
file_type = bed
gene_rows = 40
display = stacked
""".format(
ORCA_PATH=ORCA_PATH
)
if show_genes
else ""
)
+ (
"""
[bigwig file test]
file = {ORCA_PATH}/extra/H1_CTCF_ENCFF473IZV.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-CTCF
summary_method = mean
file_type = bigwig
[bigwig file test]
file = {ORCA_PATH}/extra/H1_RAD21_ENCFF913JGA.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-RAD21
summary_method = mean
file_type = bigwig
[bigwig file test]
file = {ORCA_PATH}/extra/H1_DNase_ENCFF131HMO.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-DNase
summary_method = mean
file_type = bigwig
color = #2A6D8F
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K4me3_ENCFF623ZAW.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K4me3
summary_method = mean
file_type = bigwig
color = #E76F51
[bigwig file test]
file = {ORCA_PATH}/extra/H1_POLR2A_ENCFF379IRQ.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-POL2
summary_method = mean
file_type = bigwig
color = #E76F51
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K27ac_ENCFF423TVA.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K27ac
summary_method = mean
file_type = bigwig
color = #F4A261
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K4me1_ENCFF584AVI.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K4me1
summary_method = mean
file_type = bigwig
color = #F4A261
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K36me3_ENCFF141YAA.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K36me3
summary_method = mean
file_type = bigwig
color = #E9C46A
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K27me3_ENCFF912ZUR.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K27me3
summary_method = mean
file_type = bigwig
color = #264653
[bigwig file test]
file = {ORCA_PATH}/extra/H1_H3K9me3_ENCFF752UGN.bigWig
# height of the track in cm (optional value)
height = 2
title = H1-H3K9me3
summary_method = mean
file_type = bigwig
color = #264653
[spacer]
height = 2
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_CTCF_ENCFF761RHS.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-CTCF
summary_method = mean
file_type = bigwig
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_DNase_ENCFF113YFF.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-DNase
summary_method = mean
file_type = bigwig
color = #2A6D8F
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K4me3_ENCFF442WNT.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K4me3
summary_method = mean
file_type = bigwig
color = #E76F51
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K27ac_ENCFF078JZB.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K27ac
summary_method = mean
file_type = bigwig
color = #F4A261
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K4me1_ENCFF449DEA.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K4me1
summary_method = mean
file_type = bigwig
color = #F4A261
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K36me3_ENCFF954UKB.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K36me3
summary_method = mean
file_type = bigwig
color = #E9C46A
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K27me3_ENCFF027GWJ.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K27me3
summary_method = mean
file_type = bigwig
color = #264653
[bigwig file test]
file = {ORCA_PATH}/extra/foreskin_fibroblast_H3K9me3_ENCFF946TXL.bigWig
# height of the track in cm (optional value)
height = 2
title = HFF-H3K9me3
summary_method = mean
file_type = bigwig
color = #264653
""".format(
ORCA_PATH=ORCA_PATH
)
if show_tracks
else ""
)
)
filename = str(uuid.uuid4())
with open(f"/dev/shm/{filename}.ini", "w") as fh:
fh.write(browser_tracks)
gbfigs = []
for ii, label in enumerate(["32Mb", "16Mb", "8Mb", "4Mb", "2Mb", "1Mb"]):
regionstr = (
output["chr"]
+ ":"
+ str(int(output["start_coords"][ii]))
+ "-"
+ str(int(output["end_coords"][ii]))
)
args = (
f"--tracks /dev/shm/{filename}.ini --region {regionstr} "
"--trackLabelFraction 0.03 --width 40 --dpi 10 "
f"--outFileName /dev/shm/{filename}.png --title {label}".split()
)
_ = pygenometracks.plotTracks.main(args)
gbfigs.append(plt.gcf())
os.remove(f"/dev/shm/{filename}.png")
os.remove(f"/dev/shm/{filename}.ini")
if file is not None:
with PdfPages(file) as pdf:
pdf.savefig(fig, dpi=300)
plt.show()
with PdfPages((".").join(file.split(".")[:-1]) + ".anno.pdf") as pdf:
for fig in reversed(gbfigs):
pdf.savefig(fig)
else:
if file is not None:
with PdfPages(file) as pdf:
pdf.savefig(fig, dpi=300)
plt.close("all")
[docs]def genomeplot_256Mb(
output,
show_coordinates=True,
unscaled=False,
file=None,
cmap=None,
unscaled_cmap=None,
colorbar=True,
maskpred=True,
vmin=-1,
vmax=2,
model_labels=["H1-ESC", "HFF"]
):
"""
Plot the multiscale prediction outputs for 256Mb output.
Parameters
----------
output : dict
The result dictionary to plot as returned by `genomepredict_256Mb`.
show_coordinates : bool, optional
Default is True. If True, annotate the generated plot with the
genome coordinates.
unscaled : bool, optional
Default is False. If True, plot the predictions and observations
without normalizing by distance-based expectation.
file : str or None, optional
Default is None. The output file prefix. No output file is generated
if set to None.
cmap : str or None, optional
Default is None. The colormap for plotting scaled interactions (log
fold over distance-based background). If None, use colormaps.hnh_cmap_ext5.
unscaled_cmap : str or None, optional
Default is None. The colormap for plotting unscaled interactions (log
balanced contact score). If None, use colormaps.hnh_cmap_ext5.
colorbar : bool, optional
Default is True. Whether to plot the colorbar.
maskpred : bool, optional
Default is True. If True, the prediction heatmaps are masked at positions
where the observed data have missing values when observed data are provided
in output dict.
vmin : int, optional
Default is -1. The lowerbound value for heatmap colormap.
vmax : int, optional
Default is 2. The upperbound value for heatmap colormap.
model_labels : list(str), optional
Model labels for plotting. Default is ["H1-ESC", "HFF"].
Returns
-------
None
"""
if cmap is None:
cmap = hnh_cmap_ext5
if unscaled_cmap is None:
unscaled_cmap = hnh_cmap_ext5
n_axes = len(output["predictions"])
try:
assert output["predictions"][0][0].ndim == 2
except:
raise ValueError("Plotting predictions with more than two-dimensions are not supported.")
if output["experiments"] is not None:
n_axes += len(output["predictions"])
fig, all_axes = plt.subplots(figsize=(24, 6 * n_axes), nrows=n_axes, ncols=4)
for row_axes in all_axes:
for ax in row_axes:
ax.get_xaxis().set_ticks([])
ax.get_yaxis().set_ticks([])
for i, xlabel in enumerate(["32Mb", "64Mb", "128Mb", "256Mb"]):
all_axes[-1, i].set_xlabel(xlabel, labelpad=20, fontsize=20, weight="black")
if output["experiments"] is not None:
current_axis = 0
for label in model_labels:
for suffix in [" Pred", " Obs"]:
all_axes[current_axis, 0].set_ylabel(
label + suffix,
labelpad=20,
fontsize=20,
weight="black",
rotation="horizontal",
ha="right",
va="center",
)
current_axis += 1
else:
current_axis = 0
for label in model_labels:
for suffix in [" Pred"]:
all_axes[current_axis, 0].set_ylabel(
label + suffix,
labelpad=20,
fontsize=20,
weight="black",
rotation="horizontal",
ha="right",
va="center",
)
current_axis += 1
def _plot_pred(ii, ax, model_i, key="predictions", maskpred=False):
s = int(output["start_coords"][ii])
e = int(output["end_coords"][ii])
padlen = int(output["start_coords"][ii] + 256000000 / 2 ** (ii)) - e
if padlen > 0 and output["padding_chr"] is not None:
regionstr = (
output["chr"]
+ ":"
+ str(s)
+ "-"
+ str(e)
+ "; "
+ output["padding_chr"]
+ ":0-"
+ str(padlen)
)
else:
regionstr = output["chr"] + ":" + str(s) + "-" + str(e)
if show_coordinates:
ax.set_title(regionstr, fontsize=14, pad=4)
if unscaled:
plotmat = output[key][model_i][ii] + np.log(output["normmats"][model_i][ii])
im = ax.imshow(
plotmat,
interpolation="none",
cmap=unscaled_cmap,
vmax=np.max(np.diag(plotmat, k=1)),
)
else:
plotmat = output[key][model_i][ii]
im = ax.imshow(plotmat, interpolation="none", cmap=cmap, vmin=vmin, vmax=vmax)
if padlen > 0:
# draw chr boundary
chrlen_ratio = 1 - padlen / (256000000 / 2 ** (ii))
ax.plot(
[chrlen_ratio * plotmat.shape[0] - 0.5, chrlen_ratio * plotmat.shape[0] - 0.5],
[-0.5, plotmat.shape[0] - 0.5],
color="black",
linewidth=0.2,
zorder=10,
)
ax.plot(
[-0.5, plotmat.shape[0] - 0.5],
[chrlen_ratio * plotmat.shape[0] - 0.5, chrlen_ratio * plotmat.shape[0] - 0.5],
color="black",
linewidth=0.2,
zorder=10,
)
if output["annos"]:
for r in output["annos"][ii]:
if len(r) == 3:
_draw_region(ax, r[0], r[1], r[2], plotmat.shape[1])
elif len(r) == 2:
_draw_site(ax, r[0], r[1], plotmat.shape[1])
ax.axis([-0.5, plotmat.shape[1] - 0.5, -0.5, plotmat.shape[1] - 0.5])
ax.invert_yaxis()
if maskpred:
if output["experiments"]:
ax.imshow(
np.isnan(output["experiments"][0][ii]), interpolation="none", cmap=bwcmap,
)
return im
current_row = 0
for model_i in range(len(output["predictions"])):
for ii, ax in enumerate(reversed(all_axes[current_row])):
im = _plot_pred(ii, ax, model_i, key="predictions", maskpred=maskpred)
current_row += 1
if output["experiments"]:
for ii, ax in enumerate(reversed(all_axes[current_row])):
im = _plot_pred(ii, ax, model_i, key="experiments")
current_row += 1
if colorbar:
fig.colorbar(im, ax=all_axes.ravel().tolist(), fraction=0.02, shrink=0.1, pad=0.005)
if file is not None:
with PdfPages(file) as pdf:
pdf.savefig(fig, dpi=300)
plt.close("all")
GRange = namedtuple("GRange", ["chr", "start", "end", "strand"])
LGRange = namedtuple("LGRange", ["len", "ref"])
[docs]class StructuralChange2(object):
"""
This class stores and manupulating structural changes for a single
chromosome and allow querying the mutated chromosome by coordinates by providing
utilities for retrieving the corresponding reference genome segments.
The basic operations that StructuralChange2 supports are duplication, deletion,
inversion, insertion, and concatenation. StructuralChange2 objects can be concatenated with '+'
operator, this operation allows concatenating two chromosomes. '+' can be combined with
other basic operations to create fused chromosomes.
These operations can be used sequentially to introduce arbitrarily complex
structural changes. However, note that the coordinates are dynamically updated
after each operation reflecting the current state of the chromosome, thus coordinates
specified in later operation must take into account of the effects of all previous
operations.
Parameters
----------
chr_name : str
Name of the reference chromosome.
length : int
The length of the reference chromosome.
Attributes
-------
segments : list(LGRange)
List of reference genome segments that constitute the (mutated)
chromosome. Each element is a LGRange namedtuple (length and a
GRange namedtuple (chr: str, start: int, end: int, strand: str)).
chr_name : str
Name of the chromosome
coord_points : list(int)
Stores `N+1` key coordinates where `N` is the number of segments. The
key coordinates are 0, segment junction positions, and chromosome end
coordinate. `coord_points` reflects the current state of the chromosome.
"""
def __init__(self, chr_name, length):
self.segments = [LGRange(length, GRange(chr_name, 0, length, "+"))]
self.chr_name = chr_name
self.coord_points = [0, length]
def _coord_sync(self):
self.coord_points = [0]
for seg in self.segments:
self.coord_points.append(self.coord_points[-1] + seg.len)
def _split(self, pos):
segind = bisect(self.coord_points, pos) - 1
segstart = self.coord_points[segind]
if pos != segstart:
# split segment
ref_chr, ref_s, ref_e, ref_strand = self.segments[segind].ref
if ref_strand == "+":
ref_1 = GRange(ref_chr, ref_s, ref_s + pos - segstart, "+")
ref_2 = GRange(ref_chr, ref_s + pos - segstart, ref_e, "+")
else:
ref_1 = GRange(ref_chr, ref_e - (pos - segstart), ref_e, "-")
ref_2 = GRange(ref_chr, ref_s, ref_e - (pos - segstart), "-")
self.segments[segind] = LGRange(ref_1.end - ref_1.start, ref_1)
self.segments.insert(segind + 1, LGRange(ref_2.end - ref_2.start, ref_2))
self._coord_sync()
def __add__(self, b):
a = deepcopy(self)
a.segments = a.segments + b.segments
alen = len(a.coord_points)
a.coord_points = a.coord_points + b.coord_points[1:]
for i in range(alen, len(a.coord_points)):
a.coord_points[i] += a.coord_points[alen - 1]
return a
[docs] def duplicate(self, start, end):
"""
Duplicate a genomic region.
"""
# start and end in cgenome coordinates
self._split(start)
self._split(end)
ind_s = bisect(self.coord_points, start) - 1
ind_e = bisect(self.coord_points, end) - 1
for i, seg in enumerate(self.segments[ind_s:ind_e]):
self.segments.insert(ind_e + i, deepcopy(seg))
self._coord_sync()
[docs] def insert(self, start, length, strand="+", name=None):
"""
Insert a genomic sequence with given length.
"""
self._split(start)
ind_s = bisect(self.coord_points, start) - 1
if not name:
name = "ins" + str(start) + "_" + str(length)
self.segments.insert(ind_s, LGRange(length, GRange(name, 0, length, strand)))
self._coord_sync()
[docs] def delete(self, start, end):
"""
Delete a genomic region.
"""
# start and end in cgenome coordinates
self._split(start)
self._split(end)
ind_s = bisect(self.coord_points, start) - 1
ind_e = bisect(self.coord_points, end) - 1
del self.segments[ind_s:ind_e]
self._coord_sync()
[docs] def invert(self, start, end):
"""
Invert a genomic region.
"""
# start and end in cgenome coordinates
self._split(start)
self._split(end)
ind_s = bisect(self.coord_points, start) - 1
ind_e = bisect(self.coord_points, end) - 1
self.segments[ind_s:ind_e] = self.segments[ind_s:ind_e][::-1]
for i in range(ind_s, ind_e):
self.segments[i] = LGRange(
self.segments[i].len,
GRange(
self.segments[i].ref.chr,
self.segments[i].ref.start,
self.segments[i].ref.end,
"-" if self.segments[i].ref.strand == "+" else "-",
),
)
self._coord_sync()
[docs] def query(self, start, end):
"""
Retrieve the segments in the reference genome that constitute
the specified interval in the mutated genome.
"""
ind_s = bisect(self.coord_points, start) - 1
ind_e = bisect(self.coord_points, end)
ref_coords = [deepcopy(seg.ref) for seg in self.segments[ind_s:ind_e]]
if ref_coords[0]:
if ref_coords[0].strand == "+":
ref_coords[0] = GRange(
ref_coords[0].chr,
ref_coords[0].start + start - self.coord_points[ind_s],
ref_coords[0].end,
ref_coords[0].strand,
)
else:
ref_coords[0] = GRange(
ref_coords[0].chr,
ref_coords[0].start,
ref_coords[0].end - (start - self.coord_points[ind_s]),
ref_coords[0].strand,
)
# when end exceeds length
if ind_e == len(self.coord_points):
if end > self.coord_points[-1]:
print(f"Warning: query end {end} exceed limit {self.coord_points[-1]}!")
else:
if ref_coords[-1]:
if ref_coords[-1].strand == "+":
ref_coords[-1] = GRange(
ref_coords[-1].chr,
ref_coords[-1].start,
ref_coords[-1].end - (self.coord_points[ind_e] - end),
ref_coords[-1].strand,
)
else:
ref_coords[-1] = GRange(
ref_coords[-1].chr,
ref_coords[-1].start + (self.coord_points[ind_e] - end),
ref_coords[-1].end,
ref_coords[-1].strand,
)
return ref_coords
[docs] def query_ref(self, chr_name, start, end):
"""
Retrieve regions that correspond to the specified reference genome interval
in the mutated genome.
"""
current_coords = []
ref_coords = []
for i, (seglen, refseg) in enumerate(self.segments):
if refseg.chr == chr_name:
if start < refseg.end or end >= refseg.start:
ref_coords.append(
[
np.clip(start, refseg.start, refseg.end),
np.clip(end, refseg.start, refseg.end),
]
)
if refseg.strand == "+":
current_coords.append(
[
self.coord_points[i] + np.clip(start - refseg.start, 0, seglen),
self.coord_points[i] + np.clip(end - refseg.start, 0, seglen),
"+",
]
)
else:
current_coords.append(
[
self.coord_points[i + 1] - np.clip(start - refseg.start, 0, seglen),
self.coord_points[i + 1] - np.clip(end - refseg.start, 0, seglen),
"-",
]
)
return ref_coords, current_coords
def __getitem__(self, key):
if isinstance(key, slice):
return self.query(key.start, key.stop)
[docs]def process_anno(anno_scaled, base=0, window_radius=16000000):
"""
Process annotations to the format used by Orca plotting
functions such as `genomeplot` and `genomeplot_256Mb`.
Parameters
----------
anno_scaled : list(list(...))
List of annotations. Each annotation can be a region specified by
`[start: int, end: int, info:str]` or a position specified by
`[pos: int, info:str]`.
Acceptable info strings for region currently include color names for
matplotlib. Acceptable info strings for position are currently
'single' or 'double', which direct whether the annotation is drawn
by single or double lines.
base : int
The starting position of the 32Mb (if window_radius is 16000000)
or 256Mb (if window_radius is 128000000) region analyzed.
window_radius : int
The size of the region analyzed. It must be either 16000000 (32Mb region)
or 128000000 (256Mb region).
Returns
-------
annotation : list
Processed annotations with coordinates transformed to relative coordinate
in the range of 0-1.
"""
annotation = []
for r in anno_scaled:
if len(r) == 3:
annotation.append(
[(r[0] - base) / (window_radius * 2), (r[1] - base) / (window_radius * 2), r[2],]
)
elif len(r) == 2:
annotation.append([(r[0] - base) / (window_radius * 2), r[1]])
else:
raise ValueError
return annotation
[docs]def coord_clip(pos, chrlen, binsize=128000, window_radius=16000000):
"""
Clip the coordinate to make sure that full window
centered at the coordinate to stay within chromosome boundaries.
coord_clip also try to preserve the relative position of the coordinate
to the grid as specified by binsize whenever possible.
Parameters
----------
x : int or numpy.ndarray
Coordinates to round.
gridsize : int
The gridsize to round by
Returns
-------
int
The clipped coordinate
"""
if pos < binsize or pos > chrlen - binsize:
return np.clip(pos, window_radius, chrlen - window_radius)
else:
if (chrlen - window_radius) % binsize > pos % binsize:
endclip = chrlen - window_radius - ((chrlen - window_radius) % binsize - pos % binsize)
else:
endclip = (
chrlen
- window_radius
- binsize
- ((chrlen - window_radius) % binsize - pos % binsize)
)
return np.clip(pos, window_radius + pos % binsize, endclip)
[docs]def coord_round(x, gridsize=4000):
"""
Round coordinate to multiples of gridsize.
Parameters
----------
x : int or numpy.ndarray
Coordinates to round.
gridsize : int
The gridsize to round by
Returns
-------
int
The rounded coordinate
"""
return x - x % gridsize