Commit f0286852 authored by Mate Balajti

refactor: update main and tests for CI workflow

parent 9d559d42
1 merge request: !46 refactor: update main and tests for CI workflow
Pipeline #17248 passed
[flake8]
max-line-length = 120
docstring-convention = google
\ No newline at end of file
[FORMAT]
max-line-length=120
[BASIC]
good-names=df, i, fh, id, s, d
\ No newline at end of file
MIT License
Copyright (c) 2021 Zavolan Lab, Biozentrum, University of Basel
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
matplotlib
pandas
pandas-stubs
pip
tqdm
types-tqdm
flake8-docstrings
mypy
flake8
"""Tests for main module"""
"""Tests for main module."""
import numpy as np
import pandas as pd
import pytest
from tsg.main import Gtf, TranscriptGenerator, dict_to_str, str_to_dict
@@ -10,8 +8,10 @@ class TestFreeTextParsing:
"""Test if free text dictionary is correctly parsed."""
def test_str2dict(self):
"""Test for str2dict function."""
res = str_to_dict(
'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; exon_number "1"; exon_id "EXON1";'
'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; \
exon_number "1"; exon_id "EXON1";'
)
assert res == {
@@ -22,6 +22,7 @@ class TestFreeTextParsing:
}
def test_dict2str(self):
"""Test for dict2str function."""
res = dict_to_str(
{
"gene_id": "GENE2",
@@ -31,14 +32,17 @@ class TestFreeTextParsing:
}
)
print(res)
assert (
res
== 'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; exon_number "1"; exon_id "EXON1";'
assert res == (
'gene_id "GENE2"; '
'transcript_id "TRANSCRIPT2"; '
'exon_number "1"; '
'exon_id "EXON1";'
)
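Taken together, these two tests pin down a round trip between the gtf attribute string and its dictionary form. A minimal sketch of that round trip, using the same imports as the test module:

from tsg.main import dict_to_str, str_to_dict

attrs = str_to_dict('gene_id "GENE2"; transcript_id "TRANSCRIPT2";')
# attrs == {"gene_id": "GENE2", "transcript_id": "TRANSCRIPT2"}
assert dict_to_str(attrs) == 'gene_id "GENE2"; transcript_id "TRANSCRIPT2";'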
class TestGtf:
"Test if Gtf class works correctly."
"""Test if Gtf class works correctly."""
cols = [
"seqname",
"source",
@@ -52,19 +56,21 @@ class TestGtf:
]
def test_init(self):
"""Test for init function."""
annotations = Gtf()
annotations.read_file("tests/resources/Annotation1.gtf")
assert annotations.parsed == False
assert annotations.parsed is False
assert annotations.original_columns == self.cols
assert annotations.free_text_columns == []
def test_parsed(self):
"""Test for parsed function."""
annotations = Gtf()
annotations.read_file("tests/resources/Annotation1.gtf")
annotations.parse_key_value()
assert annotations.parsed == True
assert annotations.parsed is True
assert set(annotations.free_text_columns) == set(
[
"gene_id",
@@ -75,11 +81,14 @@ class TestGtf:
]
)
assert set(annotations.original_columns) == set(
["seqname", "source", "feature", "start", "end", "score", "strand", "frame"]
["seqname", "source", "feature", "start",
"end", "score", "strand", "frame"]
)
class TestTranscriptGenerator:
"""Test for TranscriptGenerator class."""
cols = [
"start",
"end",
@@ -98,35 +107,31 @@ class TestTranscriptGenerator:
df2 = pd.DataFrame(columns=["start", "end", "strand"])
def test_init(self):
"""Test for init."""
transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.05)
assert transcripts.strand == "+"
def test_init_2(self):
"""Test init with an empty exon dataframe."""
with pytest.raises(AssertionError):
transcripts = TranscriptGenerator("TRANSCRIPT2", 3, self.df2, 0.05)
def test_init_3(self):
"""Test init with a transcript count of zero."""
with pytest.raises(AssertionError):
transcripts = TranscriptGenerator("TRANSCRIPT1", 0, self.df1, 0.05)
def test_inclusions(self):
"""Test for inclusions."""
transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
res = transcripts._get_inclusions()
res = transcripts.get_inclusions()
assert res.shape == (3, 3)
def test_unique_inclusions(self):
"""Test for unique inclusions."""
transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
res1, res2, res3 = transcripts._get_unique_inclusions()
transcripts.get_unique_inclusions()
def test_get_df(self):
"""Test for get_df function."""
inclusions = [False, True, False]
expected_end = pd.Series([20, 79, 100], name="end")
transcript_id = "TRANSCRIPT1_1"
transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
res = transcripts._get_df(inclusions, transcript_id)
res = transcripts.get_df(inclusions, transcript_id)
assert res["transcript_id"].unique().item() == "TRANSCRIPT1_1"
assert res["strand"].unique().item() == "+"
@@ -6,8 +6,9 @@ from pathlib import Path
from tsg.main import sample_transcripts
def setup_logging(loglevel: str = None) -> None:
"""Set up logging. Loglevel can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"].
def setup_logging(loglevel: str) -> None:
"""Set up logging. Loglevel can be one of \
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"].
Args:
loglevel: Level of log output.
@@ -29,13 +30,14 @@ def setup_logging(loglevel: str = None) -> None:
raise
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s (module "%(module)s")',
format=(
'[%(asctime)s: %(levelname)s] '
'%(message)s (module "%(module)s")'
),
level=numeric_level,
)
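The hunk collapses the step that turns the loglevel string into numeric_level before the raise. A plausible reconstruction of that elided step, using only the standard library (the getattr lookup is an assumption, not necessarily the project's exact code):

import logging

loglevel = "INFO"
# Assumed conversion: look up the numeric constant for the level name.
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError(f"Invalid log level: {loglevel}")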
def build_arg_parser() -> argparse.ArgumentParser:
"""Builds the argument parser.
def build_arg_parser() -> argparse.Namespace:
"""Build the argument parser.
Args:
1) path to the csv-file with the number of transcripts
@@ -71,7 +73,8 @@ def build_arg_parser() -> argparse.ArgumentParser:
"--log",
type=str,
default="INFO",
help='Level of logging. Can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]',
help='Level of logging. Can be one of '
'["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]',
)
args = parser.parse_args()
@@ -110,7 +113,8 @@ def output_filename(filename: str) -> str:
def app():
"""Gets the args, sets up the logging and starts the programm with the provided parameters.
"""Get the args, sets up the logging \
and starts the programm with the provided parameters.
Args:
1) path to the csv-file with the number of transcripts
@@ -30,67 +30,76 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
raise ValueError("File type needs to be either csv or tsv")
def filter_df(df: pd.DataFrame, transcripts: list = None) -> pd.DataFrame:
"""Filter annotations to include only exons with the highest transcript support level, i.e. TSL1.
def filter_df(gtf_df: pd.DataFrame, transcripts: list) -> pd.DataFrame:
"""Filter annotations to include only exons \
with the highest transcript support level, i.e. TSL1.
`feature` column is filtered on value "exon" and
`free_text` column is filtered to include the string denoting the highest transcript support level
`free_text` column is filtered to include the string \
denoting the highest transcript support level
('transcript_support_level "1"').
If a list of transcript IDs is given, `free_text` column is filtered to include one of the IDs.
If a list of transcript IDs is given, `free_text` column \
is filtered to include one of the IDs.
Args:
gtf_df: A pd.DataFrame containing an unparsed gtf-file
transcripts: list of transcript IDs
Returns:
A pd.DataFrame containing only rows with exon annotations of highest transcript support level and,
A pd.DataFrame containing only rows with exon annotations \
of highest transcript support level and,
if provided, belonging to one of the given transcripts
"""
if transcripts is None:
transcripts = []
df_filter = df[
(df["feature"] == "exon")
& (df["free_text"].str.contains('transcript_support_level "1"'))
df_filter = gtf_df[
(gtf_df["feature"] == "exon")
& (gtf_df["free_text"].str.contains('transcript_support_level "1"'))
]
if len(transcripts) > 0:
df_filter = df_filter["free_text"].str.contains(
df_filter = df_filter[df_filter["free_text"].str.contains(
"|".join(transcripts), regex=True
)
)]
return df_filter
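A small usage sketch for the refactored filter_df (the frame below is invented for illustration; it is not one of the repository's test resources):

import pandas as pd
from tsg.main import filter_df

toy_gtf = pd.DataFrame({
    "feature": ["exon", "gene", "exon"],
    "free_text": [
        'transcript_id "T1"; transcript_support_level "1";',
        'transcript_id "T1"; transcript_support_level "1";',
        'transcript_id "T2"; transcript_support_level "5";',
    ],
})
# Keeps exon rows with TSL1 whose free_text matches one of the given IDs.
filtered = filter_df(toy_gtf, transcripts=["T1"])
assert list(filtered.index) == [0]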
def str_to_dict(s: str) -> dict:
def str_to_dict(gene_string: str) -> dict:
"""Split between key/value pairs.
Split string based on delimiter ';' into items, remove empty items and split items on delimiter ' ' into
Split string based on delimiter ';' into items, remove empty items and \
split items on delimiter ' ' into
key/value pairs. Remove quotes from value strings and create a dictionary.
Args:
gene_string: A string of the form 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'
Returns:
A dictionary containing e.g. {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
A dictionary containing e.g. \
{'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
"""
# split into items
# remove empty items
# split items into key/value pairs
item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
item_list: list = [x.split() for x in gene_string.split(";") if len(x) > 0]
# remove quotes for values and return dictionary
return {item[0]: item[1].strip('"') for item in item_list}
def dict_to_str(d: dict) -> str:
def dict_to_str(gene_dict: dict) -> str:
"""Parse dictionary in gtf free_text column format.
Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'} and returns
string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
Key/value pairs are joined by space to form an item and items are joinded by ';' to form a string.
If a value is Not a Number (nan), the key/value pair is omitted from the string.
Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
and returns string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
Key/value pairs are joined by space to form an item and items are \
joined by ';' to form a string.
If a value is Not a Number (nan), the key/value pair is omitted \
from the string.
Args:
d: A dictionary of the form {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
gene_dict: A dictionary of the form {'gene_id': 'GENE1', \
'transcript_id': 'TRANSCRIPT1'}
Returns:
A string, e.g. 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
@@ -99,19 +108,21 @@ def dict_to_str(d: dict) -> str:
# then join items in list by ;
# end on ;
# value == value checks that value is not nan
s: str = (
"; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
+ ";"
)
return s
gene_string: str = "; ".join(
[f'{key} "{value}"' for key, value in gene_dict.items()]
) + ";"
return gene_string
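The value == value guard works because nan != nan, so nan-valued pairs fall out of the comprehension. A short illustration (values invented):

import math
from tsg.main import dict_to_str

attrs = {"gene_id": "GENE1", "exon_number": math.nan}
# nan fails the value == value check, so the pair is omitted.
assert dict_to_str(attrs) == 'gene_id "GENE1";'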
def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
"""Reverse parsing of gtf based pd.DataFrame to include only columns that are well defnined by gtf-file standards.
"""Reverse parsing of gtf based pd.DataFrame to include only columns that \
are well defined by gtf-file standards.
The first 8 defined columns are constant as defined by gtf-file standards.
Further columns are assumed to be parsed free-text columns (see Gtf.parse_free_text()).
The parsed free-text columns are aggregated as a dictionary and the dictionry is parsed as a string in gtf format.
Further columns are assumed to be parsed free-text columns \
(see Gtf.parse_free_text()).
The parsed free-text columns are aggregated as a dictionary and \
the dictionary is parsed as a string in gtf format.
Args:
df_all: A pd.DataFrame containing a parsed gtf-file.
@@ -122,43 +133,46 @@ def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
# Define pd.DataFrame containing only parsed free-text columns
df_free_text = df_all.iloc[:, 8:]
# Define pd.DataFrame containing only non-parsed columns
df = df_all.iloc[:, :8]
# Reverse parsing of free-text columns and add the result as column `free_text` to output pd.DataFrame
df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
return df
df_non_parsed = df_all.iloc[:, :8]
# Reverse parsing of free-text columns and add the result as
# column `free_text` to output pd.DataFrame
df_non_parsed["free_text"] = df_free_text.agg(
pd.Series.to_dict, axis=1
).apply(dict_to_str)
return df_non_parsed
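The agg/apply chain above first turns each row of the parsed free-text columns into a dict, then renders it back to a gtf attribute string. In isolation (toy columns, invented values):

import pandas as pd
from tsg.main import dict_to_str

df_free_text = pd.DataFrame({
    "gene_id": ["GENE1"],
    "transcript_id": ["TRANSCRIPT1"],
})
# One dict per row, then each dict is rendered as a gtf attribute string.
free_text = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
assert free_text[0] == 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'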
def write_gtf(df: pd.DataFrame, filename: str) -> None:
def write_gtf(gtf_df: pd.DataFrame, filename: str) -> None:
"""Save a Gtf object to file in gtf format.
Makes sure data types are correct and saves object in gtf format.
Args:
df: A pd.DataFrame containing a gtf-file.
gtf_df: A pd.DataFrame containing a gtf-file.
filename: File to save to.
"""
# Make sure the data types are correct.
df = df.astype(Gtf.dtypes)
gtf_df = gtf_df.astype(Gtf.dtypes)
df.to_csv(
gtf_df.to_csv(
filename,
sep="\t",
header=False,
index=False,
quoting=None,
quotechar="'",
mode="a",
)
def write_header(annotations_file: str) -> None:
"""Write the header of an annotations file, consisting of the tab delimited column names.
"""Write the header of an annotations file, consisting of the \
tab delimited column names.
Args:
annotations_file: Filename to write header to.
"""
with open(annotations_file, "w", encoding="utf_8") as fh:
fh.write("\t".join(Gtf.dtypes.keys()) + "\n")
with open(annotations_file, "w", encoding="utf_8") as file_header:
file_header.write("\t".join(Gtf.dtypes.keys()) + "\n")
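write_header and write_gtf are designed as a pair: the header is written once in mode "w", and each write_gtf call then appends rows with mode="a". A usage sketch (the output path is a placeholder):

from tsg.main import write_gtf, write_header

out_file = "annotations_sampled.gtf"  # placeholder path
write_header(out_file)  # truncates the file and writes the column names
# ...each subsequent write_gtf(gtf_df, out_file) call appends rows.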
class Gtf:
@@ -168,7 +182,8 @@ class Gtf:
dtypes: A dictionary containing column names and respective data types.
parsed: A boolean indicating if the pd.DataFrame is parsed.
original_columns: A list of columns not touched by parsing.
free_text_columns: A list of columns created during parsing of column `free_text`.
free_text_columns: A list of columns created during parsing \
of column `free_text`.
"""
dtypes = {
@@ -185,7 +200,7 @@
def __init__(self):
"""Initialize Gtf object."""
self.df = None
self.data_frame = None
self.parsed = False
self.original_columns = list(self.dtypes.keys())
self.free_text_columns = []
@@ -193,8 +208,9 @@
def read_file(self, annotations_file: str) -> None:
"""Read gtf-file.
Iterate over chunks of the gtf-file reading 100000 rows at a time. Filter chunks for exon annotations of
the highest transcript support level. Concatenate chunks to get resulting pd.DataFrame.
Iterate over chunks of the gtf-file reading 100000 rows at a time.
Filter chunks for exon annotations of the highest transcript support
level. Concatenate chunks to get resulting pd.DataFrame.
Args:
annotations_file: Filename of annotations.
@@ -209,70 +225,81 @@ class Gtf:
annotations_file,
sep="\t",
comment="#",
names=self.dtypes.keys(),
names=list(self.dtypes.keys()),
dtype=self.dtypes,
chunksize=100000,
iterator=True,
)
self.df = pd.concat([filter_df(chunk) for chunk in reader])
self.data_frame = pd.concat(
[filter_df(chunk, transcripts=[]) for chunk in reader]
)
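End to end, the chunked reader can be exercised roughly like this (the resource path mirrors the one used in the tests above):

from tsg.main import Gtf

annotations = Gtf()
# Streams the gtf in 100000-row chunks, filtering each chunk to TSL1
# exon rows before concatenating into data_frame.
annotations.read_file("tests/resources/Annotation1.gtf")
print(annotations.data_frame.head())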
def from_dataframe(self, df: pd.DataFrame) -> None:
def from_dataframe(self, gtf_df: pd.DataFrame) -> None:
"""Initialize Gtf object from pandas Dataframe.
Part of initialization is:
Set dataframe attribute
Check which columns belong to the free-text part of the gtf-file.
Check if there are no columns called free-text and if so, sets the value of parsed attribute to TRUE.
Check if there is no column called free-text and, if so, set \
the value of the parsed attribute to True.
Args:
df: A pd.DataFrame containing a gtf-file.
gtf_df: A pd.DataFrame containing a gtf-file.
"""
self.free_text_columns = [
col for col in df.columns if col not in self.original_columns
col for col in gtf_df.columns if col not in self.original_columns
]
self.df = df
if "free_text" not in df.columns:
self.data_frame = gtf_df
if "free_text" not in gtf_df.columns:
self.parsed = True
def parse_key_value(self):
"""Parse key/value pairs from `free_text` column into column `key` with row entry `value`.
"""Parse key/value pairs from `free_text` column into column `key` \
with row entry `value`.
Creates a dataframe with columns for keys in the free-text column instead of `free_text` column.
Creates a dataframe with columns for keys in the free-text column \
instead of `free_text` column.
Saves it to Gtf.df attribute.
"""
assert self.parsed is False
# create dataframe with columns for values in free_text column
df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
df_free_text = self.data_frame["free_text"].map(
str_to_dict
).apply(pd.Series)
# remember which columns come from free_text
self.free_text_columns = df_free_text.columns
# join free_text columns to original dataframe and drop the "free_text" column itself
self.df = self.df.drop("free_text", axis=1)
self.original_columns = self.df.columns
self.df = self.df.join(df_free_text, how="inner")
# remember that current dataframe is parsed, i.e. can't be written in gtf format
# join free_text columns to original dataframe and
# drop the "free_text" column itself
self.data_frame = self.data_frame.drop("free_text", axis=1)
self.original_columns = self.data_frame.columns
self.data_frame = self.data_frame.join(df_free_text, how="inner")
# remember that current dataframe is parsed,
# i.e. can't be written in gtf format
self.parsed = True
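The expansion step at the top of parse_key_value can be reproduced on its own: each attribute string becomes a dict, and pd.Series fans the keys out into columns.

import pandas as pd
from tsg.main import str_to_dict

free_text = pd.Series(['gene_id "GENE1"; exon_id "EXON1";'])
expanded = free_text.map(str_to_dict).apply(pd.Series)
# expanded now has one column per key: "gene_id" and "exon_id".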
def reverse_parse_free_text(self):
"""Reverses parsing of `free_text` column.
Creates a data frame that can be written in gtf format to file. Parsed free-text columns are aggregated
Creates a data frame that can be written in gtf format to file.
Parsed free-text columns are aggregated
into `free_text` column according to gtf format specification.
"""
assert self.parsed is True
# create dataframe with only free_text columns
df_free_text = self.df[self.free_text_columns]
# filter current dataframe to only original columns, except "free_text" column
self.df = self.df[self.original_columns]
df_free_text = self.data_frame[self.free_text_columns]
# filter current dataframe to only original columns,
# except "free_text" column
self.data_frame = self.data_frame[self.original_columns]
# undo parsing and save result in "free_text" column
self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(
dict_to_str
)
self.data_frame["free_text"] = df_free_text.agg(
pd.Series.to_dict, axis=1
).apply(dict_to_str)
# remember that current dataframe is not parsed
self.parsed = False
def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
"""Filter annotations to a given transcript ID."""
return self.df.query(f"transcript_id == '{transcript_id}'")
return self.data_frame.query(f"transcript_id == '{transcript_id}'")
class TranscriptGenerator:
@@ -289,13 +316,15 @@ class TranscriptGenerator:
strands = transcript_df["strand"].unique()
if len(transcript_df) == 0:
LOG.warning(
"Transcript %s can't be sampled. Annotation is missing", transcript_id
"Transcript %s can't be sampled. \
Annotation is missing", transcript_id
)
instance = None
elif len(strands) > 1:
LOG.warning(
"Transcript %s can't be sampled. Transcript generator is not implemented for transcripts with"
" exons annotated on different strands",
"Transcript %s can't be sampled. Transcript generator \
is not implemented for transcripts with \
exons annotated on different strands",
transcript_id,
)
instance = None
@@ -312,22 +341,25 @@ class TranscriptGenerator:
prob_inclusion: float,
):
"""Initialize TranscriptGenerator object."""
self.id = transcript_id
self.ts_id = transcript_id
self.count = transcript_count
self.df = transcript_df
self.data_frame = transcript_df
self.no_exons = len(transcript_df)
self.strand = self.df["strand"].unique().item()
self.strand = self.data_frame["strand"].unique().item()
self.prob_inclusion = prob_inclusion
def _get_inclusions(self) -> np.array:
def get_inclusions(self) -> np.ndarray:
"""Generate inclusions array.
Each column corresponds to one sample and the number of columns corresponds to the number of samples.
Each column corresponds to one sample and the number of columns \
corresponds to the number of samples.
Returns:
A boolean np.array, where True means intron inclusion.
"""
inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
inclusion_arr = np.random.rand(
self.no_exons, self.count
) < self.prob_inclusion
if self.strand == "+":
inclusion_arr[-1, :] = False
elif self.strand == "-":
@@ -335,17 +367,20 @@
return inclusion_arr
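The sampling above is a plain Bernoulli draw per exon and per transcript copy, with the terminal exon forced to False depending on strand. The core of it in isolation:

import numpy as np

no_exons, count, prob_inclusion = 3, 5, 0.5
# One draw per (exon, sample); True marks an intron inclusion.
inclusion_arr = np.random.rand(no_exons, count) < prob_inclusion
inclusion_arr[-1, :] = False  # last exon on the "+" strand never includes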
def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
"""Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count.
def get_unique_inclusions(self) -> tuple[list, np.ndarray, np.ndarray]:
"""Inclusion of unique intron inclusion via arrays and counts and \
name generation of each unique count.
Returns:
- List of names for generated exons.
- A boolean np.array where columns correspond to generated transcripts and rows to intron inclusion.
- A np.array containing sample number per generated inclusions, i.e. transcript.
- A boolean np.array where columns correspond to generated \
transcripts and rows to intron inclusion.
- A np.array containing sample number per generated inclusions, \
i.e. transcript.
"""
inclusion_arr = self._get_inclusions()
inclusion_arr = self.get_inclusions()
# Unique intron inclusion arrays and counts
inclusion_arr_unique, counts = np.unique(
inclusion_arr, axis=1, return_counts=True
@@ -354,14 +389,17 @@
names = []
for i in range(inclusion_arr_unique.shape[1]):
if not inclusion_arr_unique[:, i].any():  # all-False: no introns included
names.append(self.id)
names.append(self.ts_id)
else:
names.append(f"{self.id}_{i}")
names.append(f"{self.ts_id}_{i}")
return names, inclusion_arr_unique, counts
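np.unique with axis=1 collapses identical inclusion columns and reports how often each occurred, which is what drives the per-pattern counts here. A minimal demonstration:

import numpy as np

arr = np.array([[True, True, False],
                [False, False, False]])
unique_cols, counts = np.unique(arr, axis=1, return_counts=True)
# Unique columns come back sorted: (False, False) once, (True, False) twice,
# so counts == array([1, 2]).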
def _get_df(self, inclusions: np.array, transcript_id: str) -> pd.DataFrame:
"""Take as input a dataframe filtered to one transcript and a boolean vector denoting intron inclusions.
def get_df(
self, inclusions: np.ndarray, transcript_id: str
) -> pd.DataFrame:
"""Take as input a dataframe filtered to one transcript and \
a boolean vector denoting intron inclusions.
Args:
inclusions: A boolean vector denoting intron inclusion.
@@ -370,7 +408,7 @@
Returns:
The generated transcript as a pd.DataFrame.
"""
df_generated = self.df.copy()
df_generated = self.data_frame.copy()
if self.strand == "+":
original_end = df_generated["end"]
df_generated["end"] = np.where(
@@ -389,7 +427,9 @@
original_id = df_generated["exon_id"]
df_generated["exon_id"] = np.where(
inclusions,
df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
df_generated["exon_id"] + "_" + np.arange(
len(df_generated)
).astype(str),
original_id,
)
@@ -402,10 +442,12 @@
Args:
filename: Output csv filename.
"""
ids, _, counts = self._get_unique_inclusions()
with open(filename, "a", encoding="utf_8") as fh:
ids, _, counts = self.get_unique_inclusions()
with open(filename, "a", encoding="utf_8") as file_handle:
for transcript_id, transcript_count in zip(ids, counts):
fh.write(f"{transcript_id},{self.id},{transcript_count}\n")
file_handle.write(
f"{transcript_id},{self.ts_id},{transcript_count}\n"
)
def write_annotations(self, filename: str) -> None:
"""Generate a annotations in gtf format for sampled transcript.
@@ -416,16 +458,16 @@
Raises:
ValueError: If given transcript ID could not be sampled.
"""
ids, inclusions, _ = self._get_unique_inclusions()
ids, inclusions, _ = self.get_unique_inclusions()
n_unique = len(ids)
df = pd.concat(
[self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
data_frame = pd.concat(
[self.get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
)
df = reverse_parse_free_text(df)
data_frame = reverse_parse_free_text(data_frame)
write_gtf(df, filename)
LOG.debug("Transcript %s sampled", self.id)
write_gtf(data_frame, filename)
LOG.debug("Transcript %s sampled", self.ts_id)
def sample_transcripts(
@@ -435,14 +477,20 @@ def sample_transcripts(
output_transcripts_file: str,
output_annotations_file: str,
):
"""Read input files, iterate over transcript IDs, sample each transcript and save results.
"""Read input files, iterate over transcript IDs, \
sample each transcript and save results.
Args:
input_transcripts_file: Filename of transcript abundances, needs to be csv or tsv.
input_annotations_file: Filename of annotations, needs to be gtf.
prob_inclusion: Probability of intron inclusion, needs to be float in range [0,1].
output_transcripts_file: Filename of file to write sampled transcripts to.
output_annotations_file: Filename of file to write generated annotations to.
input_transcripts_file: Filename of transcript abundances, \
needs to be csv or tsv.
input_annotations_file: Filename of annotations, \
needs to be gtf.
prob_inclusion: Probability of intron inclusion, \
needs to be float in range [0,1].
output_transcripts_file: Filename of file to write \
sampled transcripts to.
output_annotations_file: Filename of file to write \
generated annotations to.
"""
LOG.info("Probability of intron inclusion: %s", str(prob_inclusion))
LOG.info("Parsing transcript abundances...")
@@ -466,15 +514,15 @@
transcript_count = row["count"]
transcript_df = annotations.pick_transcript(transcript_id)
transcripts = TranscriptGenerator(
transcript_generator = TranscriptGenerator(
transcript_id,
transcript_count,
transcript_df,
prob_inclusion=prob_inclusion,
)
try:
transcripts.write_annotations(output_annotations_file)
transcripts.write_sequences(output_transcripts_file)
transcript_generator.write_annotations(output_annotations_file)
transcript_generator.write_sequences(output_transcripts_file)
except AttributeError:
pass
LOG.info("Done.")
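Putting the pieces together, the module's entry point can be invoked roughly as below; the keyword names follow the docstring above, and all file names are placeholders:

from tsg.main import sample_transcripts

sample_transcripts(
    input_transcripts_file="transcripts.csv",    # csv or tsv of id,count
    input_annotations_file="annotations.gtf",
    prob_inclusion=0.05,
    output_transcripts_file="transcripts_sampled.csv",
    output_annotations_file="annotations_sampled.gtf",
)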