import logging

import numpy as np  # type: ignore
import pandas as pd  # type: ignore
from tqdm import tqdm  # type: ignore

LOG = logging.getLogger(__name__)
def read_abundances(transcripts_file: str) -> pd.DataFrame:
    """Read transcript-abundance file into dataframe.

    Args:
        transcripts_file: Input filename, needs to be csv or tsv.

    Returns:
        A pd.DataFrame with the transcript abundances ("id", "count").

    Raises:
        ValueError: When the file type is neither csv nor tsv.
    """
    cols: list = ["id", "count"]
    if transcripts_file.endswith(".tsv"):
        return pd.read_table(transcripts_file, header=None, names=cols)
    if transcripts_file.endswith(".csv"):
        return pd.read_csv(transcripts_file, header=None, names=cols)
    raise ValueError("File type needs to be either csv or tsv")
def filter_df(gtf_df: pd.DataFrame, transcripts: list) -> pd.DataFrame:
    """Filter annotations to include only exons with the highest \
transcript support level, i.e. TSL1.

    The `free_text` column is filtered to include the string denoting \
the highest transcript support level.
    If a list of transcript IDs is given, the `free_text` column is \
additionally filtered to include one of the IDs.

    Args:
        gtf_df: A pd.DataFrame containing an unparsed gtf-file.
        transcripts: A list of transcript IDs; an empty list keeps all.

    Returns:
        A pd.DataFrame containing only rows with exon annotations \
of the highest transcript support level and, if provided, \
belonging to one of the given transcripts.
    """
    df_filter = gtf_df[
        (gtf_df["feature"] == "exon")
        & (gtf_df["free_text"].str.contains('transcript_support_level "1'))
    ]
    if len(transcripts) > 0:
        df_filter = df_filter[df_filter["free_text"].str.contains(
            "|".join(transcripts)
        )]
    return df_filter
def str_to_dict(gene_string: str) -> dict:
    """Split a gtf attribute string into a dictionary.

    Split string based on delimiter ';' into items, remove empty items \
and split items on delimiter ' ' into key/value pairs. Remove quotes \
from value strings and create a dictionary.

    Args:
        gene_string: A string of the form \
'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.

    Returns:
        A dictionary containing e.g. \
{'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}.
    """
    # split into items, remove empty items and
    # split items into key/value pairs
    item_list: list = [x.split() for x in gene_string.split(";") if len(x) > 0]
    # remove quotes for values and return dictionary
    return {item[0]: item[1].strip('"') for item in item_list}
def dict_to_str(gene_dict: dict) -> str:
    """Parse a dictionary into a gtf attribute string.

    Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
    and returns string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
    Key/value pairs are joined by space to form an item and items are \
joined by ';' to form a string.
    If a value is Not a Number (nan), the key/value pair is omitted \
from the string.

    Args:
        gene_dict: A dictionary of the form {'gene_id': 'GENE1', \
'transcript_id': 'TRANSCRIPT1'}.

    Returns:
        A string, e.g. 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
    """
    # join key/value pairs from the dictionary with a space,
    # join the items by ';' and end on ';'
    # (value == value is False for nan, so such pairs are skipped)
    gene_string: str = "; ".join(
        [f'{key} "{value}"' for key, value in gene_dict.items()
         if value == value]
    ) + ";"
    return gene_string
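# Round-trip sketch for the two helpers above (doctest-style, illustrative):
# >>> attrs = str_to_dict('gene_id "GENE1"; transcript_id "TRANSCRIPT1";')
# >>> attrs
# {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
# >>> dict_to_str(attrs)
# 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'
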
def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
"""Reverse parsing of gtf based pd.DataFrame to include only columns that \
are well defnined by gtf-file standards.
The first 8 defined columns are constant as defined by gtf-file standards.
Further columns are assumed to be parsed free-text columns \
(see Gtf.parse_free_text()).
The parsed free-text columns are aggregated as a dictionary and \
the dictionry is parsed as a string in gtf format.
A pd.DataFrame with the columns as defined by gtf-file standards.
# Define pd.DataFrame containing only parsed free-text columns
# Define pd.DataFrame containing only non-parsed columns
df_non_parsed = df_all.iloc[:, :8]
# Reverse parsing of free-text columns and add the result as column \
# `free_text` to output pd.DataFrame
df_non_parsed["free_text"] = df_free_text.agg(
pd.Series.to_dict, axis=1
).apply(dict_to_str)
return df_non_parsed
def write_gtf(gtf_df: pd.DataFrame, filename: str) -> None:
    """Save a gtf dataframe to file.

    Makes sure data types are correct and saves object in gtf format.

    Args:
        gtf_df: A pd.DataFrame containing a gtf-file.
        filename: File to append to.
    """
    # Make sure the data types are correct.
    gtf_df = gtf_df.astype(Gtf.dtypes)
    gtf_df.to_csv(
        filename,
        sep="\t",
        header=False,
        index=False,
        quotechar="'",
        mode="a",
    )
def write_header(annotations_file: str) -> None:
    """Write the header of an annotations file, consisting of the \
tab delimited column names.

    Args:
        annotations_file: Filename of the annotations file.
    """
    with open(annotations_file, "w", encoding="utf_8") as file_header:
        file_header.write("\t".join(Gtf.dtypes.keys()) + "\n")
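# The two helpers above implement a write-header-once, append-data pattern
# ("out.gtf" is a placeholder filename, not part of this module):
#   write_header("out.gtf")       # truncates file and writes column names
#   write_gtf(gtf_df, "out.gtf")  # appends rows in gtf format
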
"""Class to read transcripts annotations file into a Gtf object.
dtypes: A dictionary containing column names and respective data types.
parsed: A boolean indicating if the pd.DataFrame is parsed.
original_columns: A list of columns not touched by parsing.
free_text_columns: A list of columns created during parsing \
of column `free_text`.
"""
    dtypes = {
        "seqname": object,
        "source": object,
        "feature": object,
        "start": int,
        "end": int,
        "score": object,
        "strand": object,
        "frame": object,
        "free_text": object,
    }
    def __init__(self):
        """Initialize Gtf object."""
        self.parsed = False
        self.original_columns = list(self.dtypes.keys())
        self.free_text_columns = []
    def read_file(self, annotations_file: str) -> None:
        """Read gtf-file into a pd.DataFrame.

        Iterate over chunks of the gtf-file reading 100000 rows at a time.
        Filter chunks for exon annotations of the highest transcript support
        level. Concatenate chunks to get resulting pd.DataFrame.

        Args:
            annotations_file: Filename of annotations, needs to be gtf.

        Raises:
            ValueError: When the file type is not gtf.
        """
        if not annotations_file.endswith("gtf"):
            raise ValueError("File type needs to be gtf")
        reader = pd.read_table(
            annotations_file,
            sep="\t",
            comment="#",
            names=list(self.dtypes.keys()),
            dtype=self.dtypes,
            chunksize=100000,
            iterator=True,
        )
        self.data_frame = pd.concat(
            [filter_df(chunk, transcripts=[]) for chunk in reader]
        )
    def from_dataframe(self, gtf_df: pd.DataFrame) -> None:
        """Initialize Gtf object from pandas Dataframe.

        Part of initialization is:
        Set the dataframe attribute.
        Check which columns belong to the free-text part of the gtf-file.
        Check if there is no column called `free_text` and if so, set \
the value of the parsed attribute to True.

        Args:
            gtf_df: A pd.DataFrame containing a gtf-file.
        """
        self.free_text_columns = [
            col for col in gtf_df.columns if col not in self.original_columns
        ]
        self.data_frame = gtf_df
        if "free_text" not in gtf_df.columns:
            self.parsed = True
"""Parse key/value pairs from `free_text` column into column `key` \
with row entry `value`.
Creates a dataframe with columns for keys in the free-text column \
instead of `free_text` column.
# create dataframe with columns for values in free_text column
df_free_text = self.data_frame["free_text"].map(
str_to_dict
).apply(pd.Series)
# remember which columns come from free_text
self.free_text_columns = df_free_text.columns
# join free_text columns to original dataframe and \
# drop the "free_text" column itself
self.data_frame = self.data_frame.drop("free_text", axis=1)
self.original_columns = self.data_frame.columns
self.data_frame = self.data_frame.join(df_free_text, how="inner")
# remember that current dataframe is parsed, \
# i.e. can't be written in gtf format
self.parsed = True
    def reverse_parse_free_text(self):
        """Reverse parsing of all free-text columns.

        Creates a data frame that can be written in gtf format to file.
        Parsed free-text columns are aggregated into the `free_text` \
column according to the gtf format specification.
        """
        # create dataframe with only free_text columns
        df_free_text = self.data_frame[self.free_text_columns]
        # filter current dataframe to only original columns,
        # except the "free_text" column
        self.data_frame = self.data_frame[self.original_columns]
        # undo parsing and save result in "free_text" column
        self.data_frame["free_text"] = df_free_text.agg(
            pd.Series.to_dict, axis=1
        ).apply(dict_to_str)
        # remember that current dataframe is not parsed
        self.parsed = False
    def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
        """Filter annotations to rows belonging to the given transcript ID."""
        return self.data_frame.query(f"transcript_id == '{transcript_id}'")
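# Minimal usage sketch for the Gtf class (the filename is a placeholder,
# not part of this module):
#   annotations = Gtf()
#   annotations.read_file("annotations.gtf")
#   annotations.parse_free_text()   # exposes e.g. a `transcript_id` column
#   exons = annotations.pick_transcript("TRANSCRIPT1")
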
class TranscriptGenerator:
    """Class to sample new transcripts for a given transcript ID."""

    def __new__(
        cls,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        """Create a TranscriptGenerator, or None if sampling is impossible."""
        strands = transcript_df["strand"].unique()
        if len(transcript_df) == 0:
            LOG.warning(
                "Transcript %s can't be sampled. "
                "Annotation is missing or TSL is not 1.", transcript_id
            )
            instance = None
        elif len(strands) > 1:
            LOG.warning(
                "Transcript %s can't be sampled. Transcript generator is "
                "not implemented for transcripts with exons annotated on "
                "different strands.", transcript_id,
            )
            instance = None
        else:
            instance = super().__new__(cls)
        return instance
    def __init__(
        self,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        """Initialize TranscriptGenerator object."""
        self.ts_id = transcript_id
        self.count = transcript_count
        self.data_frame = transcript_df
        self.no_exons = len(transcript_df)
        self.strand = self.data_frame["strand"].unique().item()
        self.prob_inclusion = prob_inclusion
    def get_inclusions(self) -> np.ndarray:
        """Generate a boolean np.ndarray of intron inclusions.

        Each column corresponds to one sample and the number of columns \
corresponds to the number of samples.

        Returns:
            A boolean np.ndarray where True denotes intron inclusion.
        """
        inclusion_arr = np.random.rand(
            self.no_exons, self.count
        ) < self.prob_inclusion
        # No intron follows the terminal exon: on the "+" strand this is
        # the last annotated exon, on the "-" strand the first.
        if self.strand == "+":
            inclusion_arr[-1, :] = False
        elif self.strand == "-":
            inclusion_arr[0, :] = False
        return inclusion_arr
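
    # Illustrative shape (not from the original code): for a "+"-strand
    # transcript with 3 exons and count 4, get_inclusions() could return
    #   [[ True, False, False,  True],
    #    [False,  True, False, False],
    #    [False, False, False, False]]
    # where column j marks which introns are retained in sampled
    # transcript j; the last row stays False since no intron follows the
    # terminal exon.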
    def get_unique_inclusions(self) -> tuple[list, np.ndarray, np.ndarray]:
        """Inclusion of unique intron inclusion via arrays and counts and \
name generation for each unique transcript.

        Returns:
            - A list of names for the generated transcripts.
            - A boolean np.array where columns correspond to generated \
transcripts and rows to intron inclusion.
            - A np.array containing the sample number per generated \
inclusion, i.e. transcript.
        """
        inclusion_arr = self.get_inclusions()
        # Unique intron inclusion arrays and counts
        inclusion_arr_unique, counts = np.unique(
            inclusion_arr, axis=1, return_counts=True
        )
        # Name for each generated transcript: keep the original ID if no
        # intron is included, otherwise append a unique suffix
        names = []
        for i in range(inclusion_arr_unique.shape[1]):
            if not inclusion_arr_unique[:, i].any():
                names.append(self.ts_id)
            else:
                names.append(f"{self.ts_id}_{i}")
        return names, inclusion_arr_unique, counts
    def get_df(
        self, inclusions: np.ndarray, transcript_id: str
    ) -> pd.DataFrame:
        """Take as input a dataframe filtered to one transcript and \
a boolean vector denoting intron inclusions.

        Args:
            inclusions: A boolean vector denoting intron inclusion.
            transcript_id: The transcript id.

        Returns:
            A pd.DataFrame where exons with an included intron are \
extended up to the following exon.
        """
        df_generated = self.data_frame.copy()
        # On the "+" strand an included intron extends the exon end up to
        # the next exon's start; on the "-" strand it extends the exon
        # start down to the next exon's end.
        if self.strand == "+":
            original_end = df_generated["end"]
            df_generated["end"] = np.where(
                inclusions,
                df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
                original_end,
            )
        elif self.strand == "-":
            original_start = df_generated["start"]
            df_generated["start"] = np.where(
                inclusions,
                df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
                original_start,
            )
        # Give exons with included introns a new, unique exon ID
        original_id = df_generated["exon_id"]
        df_generated["exon_id"] = np.where(
            inclusions,
            df_generated["exon_id"] + "_" + np.arange(
                len(df_generated)
            ).astype(str),
            original_id,
        )
        df_generated["transcript_id"] = transcript_id
        return df_generated
"""Write transcripts to file.
Args:
ids, _, counts = self.get_unique_inclusions()
with open(filename, "a", encoding="utf_8") as file_handle:
for transcript_id, transcript_count in zip(ids, counts):
file_handle.write(
f"{transcript_id},{self.ts_id},{transcript_count}\n"
)
"""Generate a annotations in gtf format for sampled transcript.
ValueError: If given transcript ID could not be sampled.
ids, inclusions, _ = self.get_unique_inclusions()
data_frame = pd.concat(
[self.get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
data_frame = reverse_parse_free_text(data_frame)
write_gtf(data_frame, filename)
LOG.debug("Transcript %s sampled", self.ts_id)
def sample_transcripts(
    input_transcripts_file: str,
    input_annotations_file: str,
    prob_inclusion: float,
    output_transcripts_file: str,
    output_annotations_file: str,
):
    """Read input files, iterate over transcript IDs, \
sample each transcript and save results.

    Args:
        input_transcripts_file: Filename of transcript abundances, \
needs to be csv or tsv.
        input_annotations_file: Filename of annotations, needs to be gtf.
        prob_inclusion: Probability of intron inclusion, \
needs to be a float in range [0, 1].
        output_transcripts_file: Filename of file to write \
sampled transcripts to.
        output_annotations_file: Filename of file to write \
generated annotations to.
    """
LOG.info("Probability of intron inclusion: %s", str(prob_inclusion))
LOG.info("Parsing transcript abundances...")
transcripts = read_abundances(input_transcripts_file)
# Make sure that abundance is positive
transcripts = transcripts[transcripts["count"] > 0]
LOG.info("Done parsing...")
annotations = Gtf()
annotations.read_file(input_annotations_file)
LOG.info("Start sampling transcripts...")
    # Set up output file, write header once and append data in loop
    write_header(output_annotations_file)
    for _, row in tqdm(transcripts.iterrows()):
        transcript_id = row["id"]
        transcript_count = row["count"]
        transcript_df = annotations.pick_transcript(transcript_id)
        transcript_generator = TranscriptGenerator(
            transcript_id,
            transcript_count,
            transcript_df,
            prob_inclusion=prob_inclusion,
        )
        # __new__ returns None for transcripts that can't be sampled
        if transcript_generator is None:
            continue
        transcript_generator.write_annotations(output_annotations_file)
        transcript_generator.write_sequences(output_transcripts_file)
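
# Hedged end-to-end sketch; the filenames below are placeholders, not part
# of the original module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_transcripts(
        input_transcripts_file="abundances.csv",
        input_annotations_file="annotations.gtf",
        prob_inclusion=0.05,
        output_transcripts_file="sampled_transcripts.csv",
        output_annotations_file="sampled_annotations.gtf",
    )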