import logging

import numpy as np
import pandas as pd
from tqdm import tqdm

LOG = logging.getLogger(__name__)
def read_abundances(transcripts_file: str) -> pd.DataFrame:
"""Read abundance file into dataframe
Args:
transcripts_file (str): Input filename
Returns:
pd.DataFrame: Transcript abundances ("id", "count")
Raises:
ValueError: When the input file is neither csv or tsv
"""
cols: list = ["id", "count"]
if transcripts_file.endswith(".tsv"):
return pd.read_table(transcripts_file, header=None, names=cols)
elif transcripts_file.endswith(".csv"):
return pd.read_csv(transcripts_file, header=None, names=cols)
else:
raise ValueError("File type needs to be either csv or tsv")
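
# Usage sketch (hypothetical "abundances.tsv" with tab-separated lines such
# as "ENST00000456328<TAB>100"):
#
#   >>> df = read_abundances("abundances.tsv")
#   >>> list(df.columns)
#   ['id', 'count']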
def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
"""Filters inclusions of exons and the highest transcript support level (TSL1)
Data is filtered from the pd.DataFrame to include the string entery of "exons", based on the number of transcripts
it will choose the transcript with the highest transcript support level (TSL1-5). It will filter a list of transcript
IDs if that is given as an input.
Args:
df: pd.DataFrame, transcript: list
Returns:
df_filter: filter strings from pd.DataFrame ("exons", "transcript_support_level "1"")
"""
# Filter annotations to exon and highest transcript support level.
# If list of transcript ids is given, filter for that as well.
df_filter = df[
(df["feature"] == "exon")
& (df["free_text"].str.contains('transcript_support_level "1"'))
]
    if len(transcripts) > 0:
        df_filter = df_filter[
            df_filter["free_text"].str.contains("|".join(transcripts), regex=True)
        ]
    return df_filter
def str_to_dict(s: str) -> dict:
"""Split between key/value pairs
Creates a dictionary based on the split between key and value pairs from the item_list
Also removes quotes values, empty list items and then returns the dictionary
# split between key/value pairs
# remove empty list items and split key, value pairs
item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
# remove quotes for values and return dictionary
return {item[0]: item[1].strip('"') for item in item_list}
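
# Behavior sketch for str_to_dict on a typical GTF free-text field
# (illustrative Ensembl-style IDs):
#
#   >>> str_to_dict('gene_id "ENSG00000000001"; transcript_id "ENST00000000001";')
#   {'gene_id': 'ENSG00000000001', 'transcript_id': 'ENST00000000001'}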
def dict_to_str(d: dict) -> str:
"""Joins a key/value pair with a space in a list
Takes key/value pairs from a dictionary and joins them with a space on a list
Joins items from said list that are marked with ; and end with ;
Checks if the value is Not a Number (nan)
Args:
"key", "value" (str)
Returns:
# join key, value pairs from dictionary with a space in a list,
# then join items in list by ;
# end on ;
# check if value is nan
s: str = (
"; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
+ ";"
)
return s
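
# Round-trip sketch with str_to_dict (illustrative values); NaN values
# would be dropped because NaN != NaN:
#
#   >>> dict_to_str({"gene_id": "ENSG00000000001", "exon_number": "1"})
#   'gene_id "ENSG00000000001"; exon_number "1";'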
def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
"""Creates columns that are well defnined by .gtf file standards
The first 8 defined columns are constant as defined by gtf file standards
Further columns are assumed to be free text columns and superflous
Args:
Returns:
DataFrame with 8 columns as defined by gtf file standards
"""
# the first 8 columns should be constant according to gtf file standard
# we assume that further columns are parsed free text columns
    df_free_text = df_all.iloc[:, 8:]
    # copy to avoid assigning the "free_text" column to a view of df_all
    df = df_all.iloc[:, :8].copy()
    df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
return df
def write_gtf(df: pd.DataFrame, filename: str) -> None:
"""Checks all data types in the pd.DataFrame
Goes through the updated pd.DataFrame after formatting to gtf file standards
and checks if the data types have been formatted correctly.
Args:
Types ("filename", "sep", "header", "index", "quoting", "quotechar", "mode")
Filename: str
Returns:
DataFrame defined correctly via gtf.dtypes
"""
# Make sure the data types are correct.
df = df.astype(Gtf.dtypes)
df.to_csv(
filename,
sep="\t",
header=False,
index=False,
quoting=None,
quotechar="'",
mode="a",
)
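
# Note: write_gtf() opens the output file in append mode, while
# write_header() below creates or truncates it, so a typical sequence is
# (hypothetical path):
#
#   >>> write_header("out.gtf")
#   >>> write_gtf(df, "out.gtf")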
def write_header(annotations_file: str) -> None:
"""Opens up an annotation file with the datatypes defined as correct
Args:
Returns:
"""
with open(annotations_file, "w") as fh:
fh.write("\t".join(Gtf.dtypes.keys()) + "\n")
class Gtf:
"""Class to read transcripts annotations file and parse it into a pandas Dataframe.
Args:
annotations_file: Path to gtf file.
Attributes:
annotations_file: File with transcript annotation of the genome
"""
dtypes = {
"seqname": object,
"source": object,
"feature": object,
"start": int,
"end": int,
"score": object,
"strand": object,
"frame": object,
"free_text": object,
}
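
    # A GTF data line maps onto these columns, e.g. (illustrative values):
    #   chr1  ENSEMBL  exon  11869  12227  .  +  .  gene_id "ENSG..."; ...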
def __init__(self):
self.parsed = False
self.original_columns = list(self.dtypes.keys())
self.free_text_columns = []
def read_file(self, annotations_file: str) -> None:
"""Defines a limit for larger input Files. Iterates lines and Filters on bool.
If the chuncksize of the inputed annotation file is larger than 100000 it will
iterate over the lines and filters before saving.
Args:
Returns:
If the file chunk is over a certain size it will reiterate the lines and files.
Raises:
ValueError: The file type is required to be .gtf
"""
# for large annotation files, iterate over lines and filter before saving to dataframe
if not annotations_file.endswith("gtf"):
raise ValueError("File type needs to be gtf")
reader = pd.read_table(
annotations_file,
sep="\t",
comment="#",
names=self.dtypes.keys(),
dtype=self.dtypes,
chunksize=100000,
iterator=True,
)
self.df = pd.concat([filter_df(chunk) for chunk in reader])
def from_dataframe(self, df: pd.DataFrame) -> None:
""" Initializes Gtf object from pandas Dataframe.
Part of initialization is:
Set dataframe attribute
Check which columns belong to the free-text part of the GTF-file.
Check if there are no columns called free-text and if so, sets the value of parsed attribute to TRUE.
Args:
df: DataFrame
Returns:
None
"""
self.free_text_columns = [
col for col in df.columns if col not in self.original_columns
]
self.df = df
if not "free_text" in df.columns:
self.parsed = True
def parse_free_text(self):
"""Creates a self DataFrame with columns for parsed free text
Creates a dataframe with columns for values in the free text column and then joins
the free_text column to orginal dataframe. Drops the free_text column itself.
Args:
Returns:
Parsed DataFrame with free_text column joined with orginal and dropped.
"""
        assert not self.parsed
# create dataframe with columns for values in free_text column
df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
# remember which columns come from free_text
self.free_text_columns = df_free_text.columns
# join free_text columns to original dataframe and drop the "free_text" column itself
self.df = self.df.drop("free_text", axis=1)
self.original_columns = self.df.columns
self.df = self.df.join(df_free_text, how="inner")
# remember that current dataframe is parsed, i.e. can't be written in gtf format
self.parsed = True
def reverse_parse_free_text(self):
"""Creates a reversed self DataFrame with columns for non parsed free text
Creates a data frame with only free_text columns and then filters current dataframe down
to only orginal_columns leaving the free_text column untouched. The parsing is undone and the results
saved in the free_text column and defined as non parsed.
assert self.parsed == True
# create dataframe with only free_text columns
df_free_text = self.df[self.free_text_columns]
# filter current dataframe to only original columns, except "free_text" column
self.df = self.df[self.original_columns]
# undo parsing and save result in "free_text" column
self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(
dict_to_str
)
# remember that current dataframe is not parsed
self.parsed = False
    def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
        """Return the annotation rows belonging to one transcript ID."""
        return self.df.query(f"transcript_id == '{transcript_id}'")
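
# Usage sketch for the Gtf class (hypothetical "annotations.gtf"):
#
#   >>> annotations = Gtf()
#   >>> annotations.read_file("annotations.gtf")
#   >>> annotations.parse_free_text()
#   >>> df = annotations.pick_transcript("ENST00000456328")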
class TranscriptGenerator:
    """Generates sampled transcripts, with and without intron inclusions, for one transcript."""
def __init__(
self,
transcript_id: str,
transcript_count: int,
transcript_df: pd.DataFrame,
prob_inclusion: float,
):
assert len(transcript_df) > 0
assert transcript_count > 0
assert (prob_inclusion >= 0) and (prob_inclusion <= 1)
self.id = transcript_id
self.count = transcript_count
self.df = transcript_df
self.no_exons = len(transcript_df)
self.strand = self.df["strand"].unique().item()
self.prob_inclusion = prob_inclusion
    def _get_inclusions(self) -> np.ndarray:
        """Generate the inclusions array.

        Each row corresponds to one exon and each column to one sample; the
        number of columns equals the transcript count.

        Returns:
            np.ndarray: Inclusions, where True means intron inclusion.
        """
inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
if self.strand == "+":
inclusion_arr[-1, :] = False
elif self.strand == "-":
inclusion_arr[-1, :] = False
return inclusion_arr
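
    # Shape sketch (illustrative): 3 exons sampled 5 times with
    # prob_inclusion=0.5 yields a boolean array of shape (3, 5) whose last
    # row is always False, e.g.:
    #
    #   array([[ True, False,  True, False, False],
    #          [False,  True,  True, False,  True],
    #          [False, False, False, False, False]])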
def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
"""Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count.
Args:
Returns:
Bool: If true include unique intron array and count and create unique name and count.
"""
inclusion_arr = self._get_inclusions()
# Unique intron inclusion arrays and counts
inclusion_arr_unique, counts = np.unique(
inclusion_arr, axis=1, return_counts=True
)
# Name for each generated transcript
names = []
for i in range(inclusion_arr_unique.shape[1]):
            if not inclusion_arr_unique[:, i].any():
names.append(self.id)
else:
names.append(f"{self.id}_{i}")
return names, inclusion_arr_unique, counts
    def _get_df(self, inclusions: np.ndarray, transcript_id: str) -> pd.DataFrame:
        """Derive an annotations dataframe for one generated transcript.

        Takes the dataframe filtered to one original transcript and a
        boolean vector denoting intron inclusions, and adjusts exon
        boundaries, exon IDs, and the transcript ID accordingly.

        Args:
            inclusions (np.ndarray): Boolean vector denoting intron inclusion.
            transcript_id (str): Transcript ID.

        Returns:
            pd.DataFrame: Derived dataframe.
        """
df_generated = self.df.copy()
if self.strand == "+":
origninal_end = df_generated["end"]
df_generated["end"] = np.where(
inclusions,
df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
origninal_end,
)
if self.strand == "-":
origninal_start = df_generated["start"]
df_generated["start"] = np.where(
inclusions,
df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
origninal_start,
)
original_id = df_generated["exon_id"]
df_generated["exon_id"] = np.where(
inclusions,
df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
original_id,
)
df_generated["transcript_id"] = transcript_id
return df_generated
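
    # Boundary sketch (illustrative coordinates, "+" strand): for exons
    # (start=100, end=200) and (start=300, end=400) with inclusions
    # [True, False], the first exon is extended through the intron, i.e.
    # its end becomes 299 (the next exon's start minus 1), while the
    # second exon keeps its original coordinates.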
def generate_transcripts(self, filename: str) -> None:
"""Write transcripts to file.
Args:
filename (str): Output csv filename
"""
        ids, _, counts = self._get_unique_inclusions()
with open(filename, "a") as fh:
for transcript_id, transcript_count in zip(ids, counts):
fh.write(f"{transcript_id},{self.id},{transcript_count}\n")
def generate_annotations(self, filename: str) -> None:
"""Generates a gtf file including IDs, inclusion, and counts from reverse parse free text
Args:
Filename: str
Returns:
Gtf file with filename
Raises:
ValueError: If self.ID could not be sampled (No ID generated for the inclusion transcript)
"""
ids, inclusions, counts = self._get_unique_inclusions()
n_unique = len(ids)
try:
df = pd.concat(
[self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
)
df = reverse_parse_free_text(df)
write_gtf(df, filename)
LOG.debug(f"Transcript {self.id} sampled")
except ValueError:
LOG.error(f"Transcript {self.id} could not be sampled.")
def sample_transcripts(
input_transcripts_file: str,
input_annotations_file: str,
prob_inclusion: float,
output_transcripts_file: str,
output_annotations_file: str,
):
    """Sample transcripts with random intron inclusions and write the results.

    Args:
        input_transcripts_file (str): Input abundance filename (csv or tsv).
        input_annotations_file (str): Input GTF annotations filename.
        prob_inclusion (float): Probability of intron inclusion.
        output_transcripts_file (str): Output abundance filename.
        output_annotations_file (str): Output GTF annotations filename.
    """
    transcripts = read_abundances(input_transcripts_file)
annotations = Gtf()
annotations.read_file(input_annotations_file)
annotations.parse_free_text()
LOG.info("Start sampling transcripts...")
# Set up output file, write header once and append data in loop
write_header(output_annotations_file)
for _, row in tqdm(transcripts.iterrows()):
transcript_id = row["id"]
transcript_count = row["count"]
transcript_df = annotations.pick_transcript(transcript_id)
        transcript_generator = TranscriptGenerator(
            transcript_id,
            transcript_count,
            transcript_df,
            prob_inclusion=prob_inclusion,
        )
        transcript_generator.generate_annotations(output_annotations_file)
        transcript_generator.generate_transcripts(output_transcripts_file)
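
# Invocation sketch; the file names are hypothetical and the real entry
# point of the package may differ:
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_transcripts(
        "abundances.tsv",  # input transcript abundances
        "annotations.gtf",  # input genome annotations
        prob_inclusion=0.05,  # probability of intron inclusion
        output_transcripts_file="sampled_abundances.csv",
        output_annotations_file="sampled_annotations.gtf",
    )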