Commit 77ae9776 authored by Larissa Glass, committed by Andri Fränkl

Milestone Review

parent a3b6532f
Merge request !21: Milestone Review
[flake8]
max-line-length = 120
docstring-convention = google
\ No newline at end of file
[FORMAT]
max-line-length=120
[BASIC]
good-names=df, i, fh, id, s, d
\ No newline at end of file
-FROM python:3-slim
+FROM python:3.9-slim-buster
 WORKDIR /usr/src/app
-COPY README.md requirements.txt pyproject.toml ./
+COPY README.md pyproject.toml ./
 ADD tsg ./tsg
 ADD data ./data
 RUN pip install .
-CMD [ "transcript-generator", "--transcripts", "data/transcripts_short.csv", "--annotation", "data/Homo_sapiens.GRCh38.103.chr.gtf", "--prob_inclusion", "0.05" ]
+ENTRYPOINT [ "transcript-generator" ]
\ No newline at end of file
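With `ENTRYPOINT` replacing `CMD`, arguments passed to `docker run` after the image name are appended to `transcript-generator` instead of replacing the whole command. A sketch of the resulting invocation, reusing the data paths from the old `CMD`:

```
docker run transcript-app --prob-inclusion 0.05 data/transcripts_short.csv data/Homo_sapiens.GRCh38.103.chr.gtf
```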
@@ -39,7 +39,7 @@ To generate the sampled transcripts, open a new shell, activate your environment
 ```
 conda activate transcript-structure-generator
-transcript-generator --transcripts <transcripts_file> --annotation <annotations_file> --prob_inclusion=<probability_inclusion> [--log "INFO"]
+transcript-generator --prob-inclusion <probability_inclusion> [--log "INFO"] <transcripts_file> <annotations_file>
 ```
 where the transcripts file should be csv-formatted, the annotation file gtf-formatted, and the intron inclusion probability a float in the range [0,1]. The log parameter is optional and can be one of `["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]`. The default is `INFO`.
@@ -53,6 +53,18 @@ To perform all tests, make sure your environment corresponds to the `environment
 pytest tests
 ```
+To build the Docker image, run
+```
+docker build -t transcript-app .
+```
+Afterwards, you can use the Docker image like so:
+```
+docker run transcript-app --prob-inclusion <probability_inclusion> [--log "INFO"] <transcripts_file> <annotations_file>
+```
# License
 MIT license, Copyright (c) 2021 Zavolan Lab, Biozentrum, University of Basel
...
@@ -12,4 +12,5 @@ dependencies:
 - mypy
 - flake8
 - pytest
+- pylint
 - coverage
 [project]
 name = "tsg"
-version = "0.1.0"
+version = "0.2.0"
 authors = [
     { name="Michael Zimmermann", email="michael.zimmermann@unibas.ch" },
     { name="Andri Fraenkl", email="andri.fraenkl@unibas.ch" },
...
@@ -62,7 +62,7 @@ class TestGtf:
     def test_parsed(self):
         annotations = Gtf()
         annotations.read_file("tests/resources/Annotation1.gtf")
-        annotations.parse_free_text()
+        annotations.parse_key_value()
         assert annotations.parsed == True
         assert set(annotations.free_text_columns) == set(
...
"""Transcript structure generator package.""" """Transcript structure generator package."""
import importlib.metadata
from tsg.main import Gtf, TranscriptGenerator, sample_transcripts
__version__ = '0.0.0' __version__ = importlib.metadata.version("tsg")
__all__ = ["Gtf", "TranscriptGenerator", "sample_transcripts"]
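`importlib.metadata.version` looks up the version of the installed distribution, so `__version__` now tracks `pyproject.toml` (0.2.0) instead of a hard-coded string, but only once the package has been installed with `pip install .`. A minimal sketch of the behaviour; the fallback is illustrative, not part of this commit:

```python
import importlib.metadata

try:
    version = importlib.metadata.version("tsg")  # "0.2.0" after `pip install .`
except importlib.metadata.PackageNotFoundError:
    version = "unknown"  # running from an uninstalled source tree
```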
"""Main module."""
from tsg.cli import app from tsg.cli import app
if __name__ == "__main__": if __name__ == "__main__":
...
"""Command line interface."""
import argparse import argparse
import logging import logging
from pathlib import Path from pathlib import Path
@@ -9,7 +10,7 @@ def setup_logging(loglevel: str = None) -> None:
     """Set up logging. Loglevel can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"].

     Args:
-        loglevel (str, optional): Level of log output. Defaults to None.
+        loglevel: Level of log output.

     Raises:
         ValueError: If string that is not a log level is passed, raise error.
@@ -17,12 +18,15 @@ def setup_logging(loglevel: str = None) -> None:
     Returns:
         None
     """
+    # set default log level
+    numeric_level = getattr(logging, "INFO")
     if loglevel:
-        numeric_level = getattr(logging, loglevel.upper())
-        if not isinstance(numeric_level, int):
-            raise ValueError("Invalid log level: %s" % loglevel)
-    else:
-        numeric_level = getattr(logging, "INFO")
+        try:
+            numeric_level = getattr(logging, loglevel.upper())
+        except AttributeError as err:
+            print(f"Unexpected {err=}, {type(err)=}")
+            raise

     logging.basicConfig(
         format='[%(asctime)s: %(levelname)s] %(message)s (module "%(module)s")',
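The rewritten error handling relies on log-level names being integer constants on the `logging` module, so an unknown name fails at `getattr` with `AttributeError` rather than silently returning a non-integer. A quick sketch:

```python
import logging

print(getattr(logging, "INFO"))   # 20
print(getattr(logging, "DEBUG"))  # 10
getattr(logging, "VERBOSE")       # AttributeError, re-raised by setup_logging
```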
@@ -34,41 +38,47 @@ def build_arg_parser() -> argparse.ArgumentParser:
     """Builds the argument parser.

     Args:
-        1) path to the csv-file with the number of transcripts (str)
-        2) path to the gtf-file with the annotations for each transcript (str)
-        3) a value for the probability of intron inclusion (float)
-        4) a log message (str)
+        1) path to the csv-file with the number of transcripts
+        2) path to the gtf-file with the annotations for each transcript
+        3) a value for the probability of intron inclusion
+        4) a log message

     Raises:
         None

     Returns:
-        parser
+        arguments for parser
     """
     parser = argparse.ArgumentParser()
-    parser.add_argument("--transcripts", type=str)
-    parser.add_argument("--annotation", type=str)
-    parser.add_argument("--prob_inclusion", type=float)
-    parser.add_argument("--log", type=str)
-
-    return parser
-
-
-def get_args() -> argparse.Namespace:
-    """Builds a parser and returns its arguments.
-
-    Args:
-        None
-
-    Raises:
-        None
-
-    Returns:
-        arguments for parser
-    """
-    parser = build_arg_parser()
+    parser.add_argument(
+        "transcripts",
+        type=str,
+        help="Path to csv file with number of transcripts (ID,Count).",
+    )
+    parser.add_argument(
+        "annotation",
+        type=str,
+        help="Path to gtf-file with exon annotation."
+    )
+    parser.add_argument(
+        "-p",
+        "--prob-inclusion",
+        type=float,
+        default=0.05,
+        help="Probability of intron inclusion.",
+    )
+    parser.add_argument(
+        "--log",
+        type=str,
+        default="INFO",
+        help='Level of logging. Can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]',
+    )
     args = parser.parse_args()
+    assert args.prob_inclusion >= 0
+    assert args.prob_inclusion <= 1
     return args
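With `transcripts` and `annotation` now positional and `--prob-inclusion` carrying a default, an invocation might look like this (file names are placeholders):

```
transcript-generator --prob-inclusion 0.05 --log DEBUG transcripts.csv annotation.gtf
```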
@@ -76,7 +86,7 @@ def output_filename(filename: str) -> str:
     """Generate output filename for given input filename.

     Args:
-        filename (str): Input filename
+        filename: Input filename

     Raises:
         NotImplementedError: Only accept filetypes .csv, .tsv and .gtf.
@@ -86,7 +96,7 @@ def output_filename(filename: str) -> str:
         str: Output filename
     """
     filepath = Path(filename)
-    if filepath.suffix == ".csv" or filepath.suffix == ".tsv":
+    if filepath.suffix in (".csv", ".tsv"):
         outfile = "generated_" + filepath.stem + ".csv"
     elif filepath.suffix == ".gtf":
         outfile = "generated_" + filepath.name
@@ -103,10 +113,10 @@ def app():
     """Gets the args, sets up the logging and starts the program with the provided parameters.

     Args:
-        1) path to the csv-file with the number of transcripts (str)
-        2) path to the gtf-file with the annotations for each transcript (str)
-        3) a value for the probability of intron inclusion (float)
-        4) a log message (str)
+        1) path to the csv-file with the number of transcripts
+        2) path to the gtf-file with the annotations for each transcript
+        3) a value for the probability of intron inclusion
+        4) a log message

     Raises:
         None
@@ -114,7 +124,7 @@ def app():
     Returns:
         None
     """
-    args = get_args()
+    args = build_arg_parser()
     setup_logging(args.log)
     sample_transcripts(
...
@@ -14,10 +14,10 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
     """Read transcript-abundance file into dataframe.

     Args:
-        transcripts_file (str): Input filename
+        transcripts_file: Input filename

     Returns:
-        pd.DataFrame: Transcript abundances ("id", "count")
+        A pd.DataFrame with the transcript abundances ("id", "count").

     Raises:
         ValueError: When the input file is neither csv or tsv
@@ -25,13 +25,12 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
     cols: list = ["id", "count"]
     if transcripts_file.endswith(".tsv"):
         return pd.read_table(transcripts_file, header=None, names=cols)
-    elif transcripts_file.endswith(".csv"):
+    if transcripts_file.endswith(".csv"):
         return pd.read_csv(transcripts_file, header=None, names=cols)
-    else:
-        raise ValueError("File type needs to be either csv or tsv")
+    raise ValueError("File type needs to be either csv or tsv")
-def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
+def filter_df(df: pd.DataFrame, transcripts: list = None) -> pd.DataFrame:
     """Filter annotations to include only exons with the highest transcript support level, i.e. TSL1.

     `feature` column is filtered on value "exon" and
@@ -48,6 +47,8 @@ def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
         A pd.DataFrame containing only rows with exon annotations of highest transcript support level and,
         if provided, belonging to one of the given transcripts
     """
+    if transcripts is None:
+        transcripts = []
     df_filter = df[
         (df["feature"] == "exon")
         & (df["free_text"].str.contains('transcript_support_level "1"'))
@@ -63,8 +64,8 @@ def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
 def str_to_dict(s: str) -> dict:
     """Split between key/value pairs.

-    Split string based on delimiter ';' into items, remove empty items and split items on delimiter ' ' into key/value pairs.
-    Remove quotes from value strings and create a dictionary.
+    Split string based on delimiter ';' into items, remove empty items and split items on delimiter ' ' into
+    key/value pairs. Remove quotes from value strings and create a dictionary.

     Args:
         s: A string of the form 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'
@@ -97,7 +98,7 @@ def dict_to_str(d: dict) -> str:
     # join key, value pairs from dictionary with a space in a list,
     # then join items in list by ;
     # end on ;
-    # check if value is nan
+    # value == value checks that value is not nan
     s: str = (
         "; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
         + ";"
@@ -106,17 +107,17 @@ def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
 def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
-    """Reverse parsing of gtf based pd.DataFrame to include only columns that are well defined by gtf file standards.
+    """Reverse parsing of gtf based pd.DataFrame to include only columns that are well defined by gtf-file standards.

-    The first 8 defined columns are constant as defined by gtf file standards.
+    The first 8 defined columns are constant as defined by gtf-file standards.
     Further columns are assumed to be parsed free-text columns (see Gtf.parse_free_text()).
     The parsed free-text columns are aggregated as a dictionary and the dictionary is parsed as a string in gtf format.

     Args:
-        df_all: A pd.DataFrame containing a parsed gtf file.
+        df_all: A pd.DataFrame containing a parsed gtf-file.

     Returns:
-        A DataFrame with the columns as defined by gtf file standards.
+        A pd.DataFrame with the columns as defined by gtf-file standards.
     """
     # Define pd.DataFrame containing only parsed free-text columns
     df_free_text = df_all.iloc[:, 8:]
@@ -133,8 +134,8 @@ def write_gtf(df: pd.DataFrame, filename: str) -> None:
     Makes sure data types are correct and saves object in gtf format.

     Args:
-        df: A pd.DataFrame containing a gtf file.
-        Filename: File to save to.
+        df: A pd.DataFrame containing a gtf-file.
+        filename: File to save to.
     """
     # Make sure the data types are correct.
     df = df.astype(Gtf.dtypes)
@@ -156,7 +157,7 @@ def write_header(annotations_file: str) -> None:
     Args:
         annotations_file: Filename to write header to.
     """
-    with open(annotations_file, "w") as fh:
+    with open(annotations_file, "w", encoding="utf_8") as fh:
         fh.write("\t".join(Gtf.dtypes.keys()) + "\n")
@@ -184,14 +185,15 @@ class Gtf:
     def __init__(self):
         """Initialize Gtf object."""
+        self.df = None
         self.parsed = False
         self.original_columns = list(self.dtypes.keys())
         self.free_text_columns = []

     def read_file(self, annotations_file: str) -> None:
-        """Read gtf file.
+        """Read gtf-file.

-        Iterate over chunks of the gtf file reading 100000 rows at a time. Filter chunks for exon annotations of
+        Iterate over chunks of the gtf-file reading 100000 rows at a time. Filter chunks for exon annotations of
         the highest transcript support level. Concatenate chunks to get resulting pd.DataFrame.

         Args:
@@ -219,11 +221,11 @@ class Gtf:
         Part of initialization is:
         Set dataframe attribute
-        Check which columns belong to the free-text part of the GTF-file.
+        Check which columns belong to the free-text part of the gtf-file.
         Check if there are no columns called free-text and if so, sets the value of parsed attribute to TRUE.

         Args:
-            df: pd.DataFrame
+            df: A pd.DataFrame containing a gtf-file.
         """
         self.free_text_columns = [
             col for col in df.columns if col not in self.original_columns
@@ -232,13 +234,13 @@ class Gtf:
         if "free_text" not in df.columns:
             self.parsed = True
-    def parse_free_text(self):
+    def parse_key_value(self):
         """Parse key/value pairs from `free_text` column into column `key` with row entry `value`.

         Creates a dataframe with columns for keys in the free-text column instead of `free_text` column.
         Saves it to Gtf.df attribute.
         """
-        assert self.parsed == False
+        assert self.parsed is False
         # create dataframe with columns for values in free_text column
         df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
         # remember which columns come from free_text
@@ -256,7 +258,7 @@ class Gtf:
         Creates a data frame that can be written in gtf format to file. Parsed free-text columns are aggregated
         into `free_text` column according to gtf format specification.
         """
-        assert self.parsed == True
+        assert self.parsed is True
         # create dataframe with only free_text columns
         df_free_text = self.df[self.free_text_columns]
         # filter current dataframe to only original columns, except "free_text" column
@@ -276,6 +278,32 @@ class Gtf:
 class TranscriptGenerator:
     """Class to sample a transcript."""

+    def __new__(
+        cls,
+        transcript_id: str,
+        transcript_count: int,
+        transcript_df: pd.DataFrame,
+        prob_inclusion: float,
+    ):
+        """Initialize TranscriptGenerator object."""
+        strands = transcript_df["strand"].unique()
+        if len(transcript_df) == 0:
+            LOG.warning(
+                "Transcript %s can't be sampled. Annotation is missing", transcript_id
+            )
+            instance = None
+        elif len(strands) > 1:
+            LOG.warning(
+                "Transcript %s can't be sampled. Transcript generator is not implemented for transcripts with"
+                " exons annotated on different strands",
+                transcript_id,
+            )
+            instance = None
+        else:
+            instance = super().__new__(cls)
+
+        return instance
+
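The `__new__` guard works because returning anything that is not an instance of the class makes Python skip `__init__` entirely; the caller simply gets `None` back. A toy sketch of the mechanism (not repo code):

```python
class Guarded:
    def __new__(cls, valid: bool):
        if not valid:
            return None              # __init__ below is never called
        return super().__new__(cls)

    def __init__(self, valid: bool):
        self.valid = valid

print(Guarded(False))        # None
print(Guarded(True).valid)   # True
```

This is also why `sample_transcripts` below wraps the `write_*` calls in `try/except AttributeError`: method calls on the `None` returned for unsampleable transcripts raise `AttributeError`.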
     def __init__(
         self,
         transcript_id: str,
@@ -284,10 +312,6 @@ class TranscriptGenerator:
         prob_inclusion: float,
     ):
         """Initialize TranscriptGenerator object."""
-        assert len(transcript_df) > 0
-        assert transcript_count > 0
-        assert (prob_inclusion >= 0) and (prob_inclusion <= 1)
-
         self.id = transcript_id
         self.count = transcript_count
         self.df = transcript_df
@@ -301,7 +325,7 @@ class TranscriptGenerator:
         Each column corresponds to one sample and the number of columns corresponds to the number of samples.

         Returns:
-            inclusion_arr: A boolean np.array, where True means intron inclusion.
+            A boolean np.array, where True means intron inclusion.
         """
         inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
         if self.strand == "+":
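The sampling line above draws one Bernoulli trial per exon slot and per transcript copy by thresholding a uniform `(no_exons, count)` array. A seeded sketch (the repo code uses the global `np.random` state unseeded):

```python
import numpy as np

np.random.seed(0)  # seeded here only to make the sketch reproducible
no_exons, count, prob_inclusion = 4, 3, 0.05
inclusion_arr = np.random.rand(no_exons, count) < prob_inclusion
print(inclusion_arr.shape)   # (4, 3): rows are exons, columns are sampled copies
print(inclusion_arr.mean())  # fraction of True entries, ~prob_inclusion on average
```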
@@ -317,10 +341,9 @@ class TranscriptGenerator:
         Args:

         Returns:
-            names: List of names for generated exons.
-            inclusion_arr_unique: A boolean np.array where columns correspond to generated transcripts and rows to
-                intron inclusion.
-            counts: A np.array containing sample number per generated inclusions, i.e. transcript.
+            - List of names for generated exons.
+            - A boolean np.array where columns correspond to generated transcripts and rows to intron inclusion.
+            - A np.array containing sample number per generated inclusions, i.e. transcript.
         """
         inclusion_arr = self._get_inclusions()
         # Unique intron inclusion arrays and counts
@@ -330,7 +353,7 @@ class TranscriptGenerator:
         # Name for each generated transcript
         names = []
         for i in range(inclusion_arr_unique.shape[1]):
-            if np.all(inclusion_arr_unique[:, i] == False, axis=0):
+            if not np.any(inclusion_arr_unique[:, i]):
                 names.append(self.id)
             else:
                 names.append(f"{self.id}_{i}")
@@ -341,26 +364,26 @@ class TranscriptGenerator:
         """Take as input a dataframe filtered to one transcript and a boolean vector denoting intron inclusions.

         Args:
-            inclusions (np.array): A boolean vector denoting intron inclusion.
-            transcript_id (str): The transcript id.
+            inclusions: A boolean vector denoting intron inclusion.
+            transcript_id: The transcript id.

         Returns:
             The generated transcript as a pd.DataFrame.
         """
         df_generated = self.df.copy()
         if self.strand == "+":
-            origninal_end = df_generated["end"]
+            original_end = df_generated["end"]
             df_generated["end"] = np.where(
                 inclusions,
                 df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
-                origninal_end,
+                original_end,
             )
         if self.strand == "-":
-            origninal_start = df_generated["start"]
+            original_start = df_generated["start"]
             df_generated["start"] = np.where(
                 inclusions,
                 df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
-                origninal_start,
+                original_start,
             )

         original_id = df_generated["exon_id"]
@@ -373,39 +396,36 @@ class TranscriptGenerator:
         df_generated["transcript_id"] = transcript_id
         return df_generated
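The `np.where`/`shift` pairing above implements intron retention in coordinates: where `inclusions` is True, an exon's `end` is moved to one base before the next exon's `start`, so the intervening intron becomes part of the exon. A worked sketch with made-up plus-strand coordinates:

```python
import numpy as np
import pandas as pd

start = pd.Series([100, 200, 300])
end = pd.Series([150, 250, 350])
inclusions = np.array([True, False, False])  # retain only the first intron

new_end = np.where(inclusions, start.shift(periods=-1, fill_value=-1) - 1, end)
print(new_end)  # [199 250 350]: exon 1 now runs up to the base before exon 2
```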
-    def generate_transcripts(self, filename: str) -> None:
+    def write_sequences(self, filename: str) -> None:
         """Write transcripts to file.

         Args:
-            filename (str): Output csv filename.
+            filename: Output csv filename.
         """
-        ids, inclusions, counts = self._get_unique_inclusions()
-        with open(filename, "a") as fh:
+        ids, _, counts = self._get_unique_inclusions()
+        with open(filename, "a", encoding="utf_8") as fh:
             for transcript_id, transcript_count in zip(ids, counts):
                 fh.write(f"{transcript_id},{self.id},{transcript_count}\n")

-    def generate_annotations(self, filename: str) -> None:
+    def write_annotations(self, filename: str) -> None:
         """Generate annotations in gtf format for sampled transcript.

         Args:
-            Filename: Output gtf filename.
+            filename: Output gtf-filename.

         Raises:
             ValueError: If given transcript ID could not be sampled.
         """
-        ids, inclusions, counts = self._get_unique_inclusions()
+        ids, inclusions, _ = self._get_unique_inclusions()
         n_unique = len(ids)

-        try:
-            df = pd.concat(
-                [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
-            )
-            df = reverse_parse_free_text(df)
-            write_gtf(df, filename)
-            LOG.debug(f"Transcript {self.id} sampled")
-        except ValueError:
-            LOG.error(f"Transcript {self.id} could not be sampled.")
+        df = pd.concat(
+            [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
+        )
+        df = reverse_parse_free_text(df)
+        write_gtf(df, filename)
+        LOG.debug("Transcript %s sampled", self.id)
 def sample_transcripts(
@@ -418,18 +438,23 @@ def sample_transcripts(
     """Read input files, iterate over transcript IDs, sample each transcript and save results.

     Args:
-        input_transcripts_file (str): Filename of transcript abundances, needs to be csv or tsv.
-        input_annotations_file (str): Filename of annotations, needs to be gtf.
-        prob_inclusion (float): Probability of intron inclusion, needs to be float in range [0,1].
-        output_transcripts_file (str): Filename of file to write sampled transcripts to.
-        output_annotations_file (str): Filename of file to write generated annotations to.
+        input_transcripts_file: Filename of transcript abundances, needs to be csv or tsv.
+        input_annotations_file: Filename of annotations, needs to be gtf.
+        prob_inclusion: Probability of intron inclusion, needs to be float in range [0,1].
+        output_transcripts_file: Filename of file to write sampled transcripts to.
+        output_annotations_file: Filename of file to write generated annotations to.
     """
LOG.info("Probability of intron inclusion: %s", str(prob_inclusion))
LOG.info("Parsing transcript abundances...")
transcripts = read_abundances(input_transcripts_file) transcripts = read_abundances(input_transcripts_file)
# Make sure that abundance is positive
transcripts = transcripts[transcripts["count"] > 0]
LOG.info("Done parsing...")
LOG.info("Parsing annotations...") LOG.info("Parsing annotations...")
annotations = Gtf() annotations = Gtf()
annotations.read_file(input_annotations_file) annotations.read_file(input_annotations_file)
annotations.parse_free_text() annotations.parse_key_value()
LOG.info("Done parsing...") LOG.info("Done parsing...")
LOG.info("Start sampling transcripts...") LOG.info("Start sampling transcripts...")
...@@ -447,6 +472,9 @@ def sample_transcripts( ...@@ -447,6 +472,9 @@ def sample_transcripts(
transcript_df, transcript_df,
prob_inclusion=prob_inclusion, prob_inclusion=prob_inclusion,
) )
transcripts.generate_annotations(output_annotations_file) try:
transcripts.generate_transcripts(output_transcripts_file) transcripts.write_annotations(output_annotations_file)
transcripts.write_sequences(output_transcripts_file)
except AttributeError:
pass
LOG.info("Done.") LOG.info("Done.")