Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • zavolan_group/tools/transcript-sampler
1 result
Show changes
Commits on Source (5)
Showing
with 190 additions and 2158 deletions
......@@ -29,6 +29,6 @@ lint-test-job: # This job also runs in the test stage.
- pip install -r requirements.txt
- pip install -r requirements_dev.txt
- pip install -e .
# - flake8 --docstring-convention google transcript_sampler/ tests/
# - pylint transcript_sampler/ tests/
# - mypy transcript_sampler/
\ No newline at end of file
- flake8 --docstring-convention google transcript_sampler/ tests/
- pylint transcript_sampler/ tests/
- mypy transcript_sampler/ tests/
\ No newline at end of file
......@@ -47,4 +47,4 @@ options:
Example :
`transcript_sampler --input_gtf="tests/input_files/test.gtf" --input_csv="tests/input_files/expression.csv" --output_gtf="output_files/output.gtf" --output_csv="output_files/output.csv" --n_to_sample=100`
`transcript-sampler --input_gtf="tests/inputs/test.gtf" --input_csv="tests/inputs/expression.csv" --output_gtf="output_files/output.gtf" --output_csv="output_files/output.csv" --n_to_sample=100`
name: scrna-seq-sim
channels:
- defaults
- bioconda
- conda-forge
dependencies:
- argparse
- biopython>=1.78
- black
- coverage
- flake8
- flake8-docstrings
- gtfparse
- polars==0.16.17
- mypy
- numpy>=1.23.3
- pylint
- pytest
- nextflow
- pandas>=1.4.4
- pip>=20.2.3
- python>=3.6, <=3.10
- pip:
- -e .
argparse
biopython
gtfparse
numpy >= 1.23.3
pandas >= 1.4.4
\ No newline at end of file
numpy>=1.23.3
pandas>=1.4.4
\ No newline at end of file
"""Set up project."""
from pathlib import Path
from setuptools import setup, find_packages
from setuptools import setup, find_packages # type: ignore
project_root_dir = Path(__file__).parent.resolve()
with open(project_root_dir / "requirements.txt",
......
"""Initialize tests."""
File moved
"""Tests functions."""
import os
import pandas as pd
import pandas as pd # type: ignore
import numpy as np
# pylint: disable=C0103
def find_path(filename: str) -> str:
"""Find the path to a file.
......@@ -35,7 +36,7 @@ def find_output():
None
"""
absolute_path = os.path.dirname(__file__)
test_file = "ReprTrans_ExpressionLevel.tsv"
test_file = "inputs/test_ref_output.tsv"
full_path = os.path.join(absolute_path, test_file)
return full_path
......
"""Tests for match representative transcript with expression level."""
import pytest
import pandas as pd
import pandas as pd # type: ignore
import numpy as np
from pandas.testing import assert_frame_equal
import tests.test_functions as tFun
from transcript_sampler.match_reptrans_explvl import \
from pandas.testing import assert_frame_equal # type: ignore
from transcript_sampler.match_reptrans_explvl import (
MatchReptransExplvl as match
)
import tests.test_functions as tFun
class TestMatchReptrans:
"""Tests for match_reptrans_explvl.py."""
# def test_gtf_to_df(self):
# TO DO
def test_dict_repr_trans_to_df(self):
"""Test dict_repr_trans_to_df() function.
......@@ -44,7 +42,7 @@ class TestMatchReptrans:
assert tFun.duplicated_rows(data_frame).empty, \
"at least one row is duplicated"
assert tFun.na_value(data_frame) == 0, \
"at least one row contain NA values"
"at least one row contains NA values"
def test_tsv_or_csv_to_df(self):
"""Test tsv_or_csv_to_df() function.
......@@ -65,9 +63,9 @@ class TestMatchReptrans:
assert tFun.column_d_type(df_tsv) == datatype, \
"at least one column has the wrong datatype"
assert tFun.duplicated_rows(df_tsv).empty, \
"at least one row are duplicated "
"at least one row is duplicated"
assert tFun.na_value(df_tsv) == 0, \
"at least one row contain NA values"
"at least one row contains NA values"
assert assert_frame_equal(df_tsv, df_csv) is None, \
"csv and tsv import doesn't match"
......@@ -75,7 +73,7 @@ class TestMatchReptrans:
"""Test expr_level_by_gene() function.
This function test if the function expr_level_by_gene can find
the gene of each transcipt given by the expression level csv/tsv
the gene of each transcript given by the expression level csv/tsv
file and sum their expression level
"""
path_tsv = tFun.find_path(r"test_gene_exprL")
......@@ -104,9 +102,9 @@ class TestMatchReptrans:
assert tFun.column_d_type(df_exp_lvl) == datatype, \
"at least one column has the wrong datatype"
assert tFun.duplicated_rows(df_exp_lvl).empty, \
"at least one row are duplicated "
"at least one row is duplicated"
assert tFun.na_value(df_exp_lvl) == 0, \
"at least one row contain NA values "
"at least one row contains NA values"
assert tFun.duplicated_index(df_exp_lvl).empty, \
"at least one index element is duplicated"
......@@ -151,9 +149,9 @@ class TestMatchReptrans:
assert tFun.column_d_type(df_match) == datatype, \
"at least one column has the wrong datatype"
assert tFun.duplicated_rows(df_match).empty, \
"at least one row are duplicated "
"at least one row is duplicated"
assert tFun.na_value(df_match) == 0, \
"at least one row contain NA values "
"at least one row contains NA values"
assert tFun.duplicated_index(df_match).empty, \
"at least one index element is duplicated"
......@@ -164,108 +162,37 @@ class TestMatchReptrans:
function match_repr_transcript_expression_level().
"""
input_path = tFun.find_path("test_gene_exprL")
intermediate_path = tFun.find_path_intermediate_file()
gtf_file = tFun.find_path("test.gtf")
dict_repr_test = {
'ENSMUSG00000079415': 'ENSMUST00000112933',
"ENSMUSG00000024691": "ENSMUST00000025595",
"ENSMUSG00000063683": "ENSMUST00000119960"}
match.match_repr_transcript_expression_level(
self,
exprTrans=input_path,
dict_reprTrans=dict_repr_test,
gtf_file=intermediate_path
)
# Create an instance of MatchReptransExplvl
match_instance = match()
df_result = match_instance.match_repr_transcript_expression_level(
expr_trans=input_path,
dict_repr_trans=dict_repr_test,
gtf_file=gtf_file
)
ref_path = tFun.find_path("test_ref_output.tsv")
output_path = tFun.find_output()
with open(ref_path, 'r', encoding="utf-8") as t1,\
open(output_path, 'r', encoding="utf-8") as t2,\
open(input_path, 'r', encoding="utf-8") as t3:
fileRef = t1.readlines()
fileOutput = t2.readlines()
fileInput = t3.readlines()
with open(
ref_path, 'r', encoding="utf-8"
) as test_file_1, open(
output_path, 'r', encoding="utf-8"
) as test_file_2:
file_ref = test_file_1.readlines()
file_output = test_file_2.readlines()
assert (
sorted(fileRef) == sorted(fileOutput)
), "the output does't match the expected tsv file"
sorted(file_ref) == sorted(file_output)
), "the output doesn't match the expected tsv file"
assert (
sorted(fileRef) != sorted(fileInput)
), "the output does't match the expected tsv file"
def test_txt_to_dict(self):
"""This function tests if txt is convertod to dict"""
path = tFun.find_path("test_dict_repr_trans.txt")
dico = match.txt_to_dict(path)
dict_test = {'ENSMUSG00000079415': 'ENSMUST00000112933',
"ENSMUSG00000024691": "ENSMUST00000025595",
"ENSMUSG00000063683": "ENSMUST00000119960"}
assert dico == dict_test
def test_transcripts_by_gene_inDf():
"""
This function test if a dataframe generated from
the intermediate file is converted in another
dataframe without the support level column.
"""
path = tFun.find_path_intermediate_file()
df = repr.import_gtfSelection_to_df(path)
df_gene = match.transcripts_by_gene_inDf(df)
datatype = {'Gene': np.dtype('O'), 'Transcript': np.dtype('O')}
assert tFun.column_number(df_gene) == (
2, "number of columns is not equal to 2")
assert tFun.column_d_type(df_gene) == (
datatype, "at least one column has the wrong datatype")
assert tFun.duplicated_rows(df_gene).empty, \
"at least one row are duplicated"
assert tFun.na_value(df_gene) == 0, \
"at least one row contain NA values"
def test_output_tsv():
"""
This function test if a tsv file is generated from a pandas
dataframe in the right format.
"""
dict_repr_test = {
'ENSMUSG00000079415': 'ENSMUST00000112933',
"ENSMUSG00000024691": "ENSMUST00000025595",
"ENSMUSG00000063683": "ENSMUST00000119960"}
df_dict_repr_trans = match.dict_repr_trans_to_df(dict_repr_test)
path_tsv = tFun.find_path(r"test_gene_exprL")
df_tsv_exp_lvl = match.tsv_or_csv_to_df(path_tsv)
path_intermediate = tFun.find_path_intermediate_file()
df_intermediate = repr.import_gtfSelection_to_df(path_intermediate)
df_gene_transcript = match.transcripts_by_gene_inDf(df_intermediate)
df_exp_lvl = match.expr_level_by_gene(
df_tsv_exp_lvl, df_gene_transcript
sorted(file_ref) != sorted(
df_result.to_csv(index=False).splitlines()
)
df_match = match.match_by_gene(df_dict_repr_trans, df_exp_lvl)
match.output_tsv(df_match)
ref_path = tFun.find_path("test_ref_output.tsv")
output_path = tFun.find_output()
with open(ref_path, 'r') as t1, open(output_path, 'r') as t2:
fileRef = t1.readlines()
fileOutput = t2.readlines()
assert (
sorted(fileRef) == sorted(fileOutput)
), "the output does't match the expected tsv file"
# test_dict_repr_trans_to_df()
# test_txt_to_dict()
# test_transcripts_by_gene_inDf()
# test_tsv_or_csv_to_df()
# test_expr_level_by_gene()
# test_match_by_gene()
# test_output_tsv()
# test_match_repr_transcript_expression_level()
# print("test_match is done ! No error was found")
), "the output doesn't match the expected tsv file"
......@@ -52,8 +52,8 @@ def main():
log.info("Started transcript sampler.")
dict_repr_trans = find_rep_trans.get_rep_trans(args.input_gtf)
df_repr = match_reptrs_explvl.match_repr_transcript_expression_level(
dict_reprTrans=dict_repr_trans,
exprTrans=args.input_csv,
dict_repr_trans=dict_repr_trans,
expr_trans=args.input_csv,
gtf_file=args.input_gtf
)
log.info(
......
"""Find representative transcripts."""
import logging
from typing import Union
LOG = logging.getLogger(__name__)
# pylint: disable=R0912,R0915
class FindRepTrans:
"""Find representative transcripts."""
......@@ -12,7 +13,7 @@ class FindRepTrans:
"""Initiate."""
@staticmethod
def attributes_converter(attributes: str) -> list:
def attributes_converter(attributes):
"""Attributes converter function.
This funtion converts the "unstructured" ;-seperated part of
......@@ -23,7 +24,7 @@ class FindRepTrans:
Input:
attributes = str() # the unstructured part of the entry
Output:
attributes = list() # cleaned list with the \
attributes = list() # cleaned list with the
characteristics described above
"""
attributes = (
......@@ -51,10 +52,9 @@ class FindRepTrans:
if look_for in attributes:
index = attributes.index(look_for) + 1
return attributes[index]
else:
LOG.warning('No %s in the entry, the return was set to NA',
look_for)
return "NA"
LOG.warning('No %s in the entry, the return was set to NA',
look_for)
return "NA"
@staticmethod
def reformat_reptrans(rep_trans_dict: dict) -> dict:
......@@ -96,8 +96,11 @@ class FindRepTrans:
ValueError: If an unexpected entry is encountered in the GTF file.
"""
# setting default variables
rep_transcripts = dict()
rep_transcripts: dict = {}
cur_g_id = ""
cur_t_id = ""
pot_best_trans: list = []
cur_best_trans: list = []
# [transcript_id, transcript_support_level, transcript_length]
cur_best_trans = ["", 100, 0]
......@@ -122,11 +125,11 @@ class FindRepTrans:
if cur_g_id != attributes[1]:
LOG.error("Exon from an unexpected gene")
raise ValueError("Exon from an unexpected gene")
elif (
if (
self.find_in_attributes(
attributes, "transcript_id"
) != cur_tID
):
) != cur_t_id
):
LOG.error("Exon from an unexpected transcript")
raise ValueError("Exon from an unexpected transcript")
......@@ -136,7 +139,6 @@ class FindRepTrans:
pot_best_trans[2] += int(entry[4]) - int(entry[3])
if pot_best_trans[2] > cur_best_trans[2]:
cur_best_trans = pot_best_trans
pot_best_trans = False
else:
cur_best_trans[2] += int(entry[4]) - int(entry[3])
......@@ -148,10 +150,10 @@ class FindRepTrans:
raise ValueError("Transcript from an unexpected gene")
# finding the transcript id and the support level
cur_tID = self.find_in_attributes(
cur_t_id = self.find_in_attributes(
attributes, "transcript_id"
)
t_supp_lvl = self.find_in_attributes(
t_supp_lvl: Union[int, str] = self.find_in_attributes(
attributes, "transcript_support_level"
)
......@@ -161,21 +163,22 @@ class FindRepTrans:
if t_supp_lvl == "NA":
t_supp_lvl = 100
else:
if t_supp_lvl.isdigit():
if isinstance(
t_supp_lvl, str
) and t_supp_lvl.isdigit():
t_supp_lvl = int(t_supp_lvl)
else:
t_supp_lvl = 100
# decides if the transcript has potential to become the
# representative transcript
if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "":
cur_best_trans = [cur_tID, t_supp_lvl, 0]
pot_best_trans = False
ignor_trans = False
if (
t_supp_lvl < cur_best_trans[1] or
cur_best_trans[0] == ""
):
cur_best_trans = [cur_t_id, t_supp_lvl, 0]
elif t_supp_lvl == cur_best_trans[1]:
pot_best_trans = [cur_tID, t_supp_lvl, 0]
else:
ignor_trans = True
pot_best_trans = [cur_t_id, t_supp_lvl, 0]
# looking for and processing gene entries
elif entry[2] == "gene":
......@@ -203,7 +206,7 @@ class FindRepTrans:
if cur_g_id in rep_transcripts:
if (rep_transcripts[cur_g_id][1] > cur_best_trans[1] or
(rep_transcripts[cur_g_id][1] == cur_best_trans[1] and
rep_transcripts[cur_g_id][2] < cur_best_trans[2])):
rep_transcripts[cur_g_id][2] < cur_best_trans[2])):
rep_transcripts[cur_g_id] = cur_best_trans
else:
rep_transcripts[cur_g_id] = cur_best_trans
......@@ -220,8 +223,8 @@ class FindRepTrans:
"""
output = []
with open(original_file, "r", encoding="utf-8") as f:
for line in f:
with open(original_file, "r", encoding="utf-8") as file:
for line in file:
if line.startswith("#"):
continue
......@@ -242,51 +245,3 @@ class FindRepTrans:
with open(output_file, "w", encoding="utf-8") as last_file:
last_file.writelines(output)
# def _test():
# """
# This funtion is meant to be run for test
# Output:
# file with the dictionary generated based on the test file
# """
# file_name = "test.gtf"
# rt = get_rep_trans(file_name)
# expected_result = {"ENSG00000160072": "ENST00000472194",
# "ENSG00000234396": "ENST00000442483",
# "ENSG00000225972": "ENST00000416931",
# "ENSG00000224315": "ENST00000428803",
# "ENSG00000198744": "ENST00000416718",
# "ENSG00000279928": "ENST00000624431",
# "ENSG00000228037": "ENST00000424215",
# 'ENSG00000142611': 'ENST00000378391'}
# if rt != expected_result:
# print("The test failed due to not yielding the same results")
# print("The results the program got\n", rt)
# print("The expected results\n", expected_result)
# else:
# print("The test was successful")
# # Execution part #
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(
# description="find_representativ_transcripts",
# formatter_class=argparse.ArgumentDefaultsHelpFormatter
# )
# parser.add_argument("-file_name", required=True,
# help="gtf file with genome annotation")
# parser.add_argument("-t", required=False, default=False,
# help="to run the test input -t True")
# args = parser.parse_args()
# # standadize the file_name inlude .gtf#
# file_name = args.file_name
# i_gtf = file_name.find(".gtf")
# if i_gtf == -1:
# file_name += ".gtf"
# if args.t:
# _test()
# else:
# get_rep_trans(file_name)
"""Match representative transcript with expression level"""
"""Match representative transcript with expression level."""
# Made by Hugo Gillet #
import logging
import pandas as pd
from gtfparse import read_gtf
import pandas as pd # type: ignore
from gtfparse import read_gtf # type: ignore
LOG = logging.getLogger(__name__)
......@@ -40,12 +40,14 @@ class MatchReptransExplvl:
return df_gtf
@staticmethod
def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame:
"""
Convert a dictionary of genes and their representative transcript into a DataFrame.
def dict_repr_trans_to_df(
dict_repr_trans: "dict[str, str]"
) -> pd.DataFrame:
"""Convert a dict of genes and representative transcript into a df.
Args:
dict_reprTrans (dict): {'Gene': ['transcriptA', 'transcriptB'], ...}
dict_repr_trans (dict):
{'Gene': ['transcriptA', 'transcriptB'], ...}
Returns:
Pandas DataFrame with 'Gene' and 'Transcript' as columns.
......@@ -55,27 +57,36 @@ class MatchReptransExplvl:
TypeError: Keys should be strings.
TypeError: Values should be strings.
"""
if not isinstance(dict_reprTrans, dict):
if not isinstance(dict_repr_trans, dict):
LOG.error("Only dictionaries are allowed")
raise TypeError("Only dictionaries are allowed")
if not all(isinstance(key, str) for key in dict_reprTrans.keys()):
if not all(
isinstance(key, str) for key in dict_repr_trans.keys()
):
LOG.error("Keys should be strings")
raise TypeError("Keys should be strings")
if not all(isinstance(value, str) for value in dict_reprTrans.values()):
if not all(
isinstance(value, str) for value in dict_repr_trans.values()
):
LOG.error("Values should be strings")
raise TypeError("Values should be strings")
df_reprTrans = pd.DataFrame.from_dict(dict_reprTrans, orient="index", columns=["reprTranscript"])
df_reprTrans = df_reprTrans.reset_index()
df_reprTrans.columns = ["Gene", "reprTrans"]
df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(r"\.[1-9]", "", regex=True)
df_repr_trans = pd.DataFrame.from_dict(
dict_repr_trans, orient="index", columns=["reprTranscript"]
)
df_repr_trans = df_repr_trans.reset_index()
column_names = ["Gene", "reprTrans"]
df_repr_trans.columns = pd.Index(column_names)
# pylint: disable=E1136,E1137
df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace(
r"\.[1-9]", "", regex=True
)
return df_reprTrans
return df_repr_trans
@staticmethod
def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
"""
Convert a TSV or CSV file into a pandas DataFrame.
"""Convert a TSV or CSV file into a pandas DataFrame.
Args:
input_txt (str): TSV or CSV file containing transcript expression
......@@ -99,251 +110,97 @@ class MatchReptransExplvl:
@staticmethod
def expr_level_by_gene(
df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
df_expr_transcript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
) -> pd.DataFrame:
"""
Find the gene of each transcript given by the expression level CSV/TSV file
and sum the expression level of all transcripts from the same gene.
"""Sum expression levels.
Find the gene of each transcript given by the expression level
CSV/TSV file and sum the expression level of all transcripts
from the same gene.
Args:
df_exprTranscript (pd.DataFrame): Pandas DataFrame containing transcripts and their expression levels,
generated by the "tsv_or_csv_to_df" function.
df_output_gtf_selection (pd.DataFrame): Pandas DataFrame containing genes and transcripts,
generated by the "transcripts_by_gene_inDf" function.
df_expr_transcript (pd.DataFrame):
Pandas DataFrame containing transcripts and their
expression levels, generated by the
"tsv_or_csv_to_df" function.
df_output_gtf_selection (pd.DataFrame):
Pandas DataFrame containing genes and transcripts,
generated by the "transcripts_by_gene_inDf" function.
Returns:
Pandas DataFrame having 'Gene' and sum of its transcript expression levels.
Pandas DataFrame having 'Gene' and sum of its
transcript expression levels.
Raises:
None
"""
df_merged = pd.merge(df_output_gtf_selection, df_exprTranscript, how="inner", on="Transcript")
df_sum = df_merged.groupby("Gene")["Expression_level"].sum().reset_index()
df_merged = pd.merge(
df_output_gtf_selection, df_expr_transcript,
how="inner", on="Transcript")
df_sum = df_merged.groupby("Gene")["Expression_level"].sum(
).reset_index()
return df_sum
@staticmethod
def match_by_gene(
df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame
df_repr_transcript: pd.DataFrame,
df_expression_level_by_gene: pd.DataFrame
) -> pd.DataFrame:
"""
Find matching genes between the two DataFrames.
"""Find matching genes between the two DataFrames.
Args:
df_reprTranscript (pd.DataFrame): Pandas DataFrame containing genes and their representative transcripts,
generated by the "dict_repr_trans_to_df()" function.
df_expressionLevel_byGene (pd.DataFrame): Pandas DataFrame containing genes and their expression levels,
generated by the "transcript_by_gene_inDf()" function.
df_repr_transcript (pd.DataFrame): Pandas DataFrame
containing genes and their representative transcripts,
generated by the "dict_repr_trans_to_df()" function.
df_expression_level_by_gene (pd.DataFrame): Pandas DataFrame
containing genes and their expression levels,
generated by the "transcript_by_gene_inDf()" function.
Returns:
Pandas DataFrame having representative transcripts and their expression levels.
Pandas DataFrame having representative transcripts and
their expression levels.
Raises:
None
"""
df_merged = pd.merge(df_reprTranscript, df_expressionLevel_byGene, how="inner", on="Gene")
df_merged = pd.merge(
df_repr_transcript, df_expression_level_by_gene,
how="inner", on="Gene"
)
df_clean = df_merged.loc[:, ["reprTrans", "Expression_level"]]
return df_clean
def match_repr_transcript_expression_level(
self, exprTrans: str, dict_reprTrans: dict, gtf_file: str,
self, expr_trans: str, dict_repr_trans: dict, gtf_file: str,
):
"""
Combine functions to replace transcripts from an expression level CSV/TSV file with representative transcripts.
"""Replace expression level with representative transcripts.
Combine functions to replace transcripts from an expression level
CSV/TSV file with representative transcripts.
Args:
exprTrans (str): CSV or TSV file containing transcripts and their expression level.
dict_reprTrans (dict): Dictionary of genes and their representative transcripts.
expr_trans (str): CSV or TSV file containing transcripts
and their expression level.
dict_repr_trans (dict): Dictionary of genes
and their representative transcripts.
gtf_file (str): Path to the GTF file.
Returns:
Pandas DataFrame of representative transcripts and their expression level.
Pandas DataFrame of representative transcripts
and their expression level.
Raises:
None
"""
df_gene_transcript = self.gtf_to_df(gtf_file)
df_exprTrans = self.tsv_or_csv_to_df(exprTrans)
df_reprTrans = self.dict_repr_trans_to_df(dict_reprTrans)
df_expr_level_by_gene = self.expr_level_by_gene(df_exprTrans, df_gene_transcript)
df_match = self.match_by_gene(df_reprTrans, df_expr_level_by_gene)
df_match.rename(columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True)
df_expr_trans = self.tsv_or_csv_to_df(expr_trans)
df_repr_trans = self.dict_repr_trans_to_df(dict_repr_trans)
df_expr_level_by_gene = self.expr_level_by_gene(
df_expr_trans, df_gene_transcript
)
df_match = self.match_by_gene(df_repr_trans, df_expr_level_by_gene)
df_match.rename(
columns={"reprTrans": "id", "Expression_level": "level"},
inplace=True
)
return df_match
# def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame:
# """Convert a dictionary of genes and their representative
# transcript into a dataframe
# Args:
# dict_reprTrans (dict): {'Gene':['transcriptA', 'transcriptB'], ...}
# Returns:
# Pandas dataframe having Gene and transcript as columns
# Raises:
# Only dict are allowed
# Key should be strings
# Value should be strings
# """
# pass
# if not type(dict_reprTrans) is dict:
# raise TypeError("Only dict are allowed")
# if type(list(dict_reprTrans.keys())[0]) is not str:
# raise TypeError("Key should be strings")
# if type(list(dict_reprTrans.values())[0]) is not str:
# raise TypeError("Values should be strings")
# df_reprTrans = pd.DataFrame.from_dict(
# dict_reprTrans, orient="index", columns=["reprTranscript"]
# )
# df_reprTrans = df_reprTrans.reset_index(level=0)
# df_reprTrans.columns = ["Gene", "reprTrans"]
# df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(
# r"\.[1-9]", "", regex=True
# )
# return df_reprTrans
# def gene_and_transcript(gtf_file: str) -> pd.DataFrame:
# """
# This function take a .gtf file and convert it into a
# dataframe containing gene_id and their transcripts_id.
# Args:
# gtf_file(str) : path to the .gtf file
# Returns:
# df_gtf(pd.DataFrame): pandas df containing having has columns
# gene_id and their transcripts_id.
# Raises:
# None
# """
# df_gtf = read_gtf(gtf_file)
# df_gtf = df_gtf.loc[df_gtf["feature"] == "transcript"]
# df_gtf = df_gtf[["gene_id", "transcript_id"]]
# df_gtf = df_gtf.rename(columns={"gene_id": "Gene",
# "transcript_id": "Transcript"})
# return df_gtf
# def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
# """Convert tsv or csv file into a pandas dataframe
# Args:
# input_txt (str): csv or tsv file containing transcript exp level
# Returns:
# df_gene (str): Pandas dataframe having transcript and exp level
# as columns
# Raises:
# None
# """
# pass
# df_input = pd.read_csv(
# input_txt,
# sep=r"[\t,]",
# lineterminator="\n",
# names=["Transcript", "Expression_level"],
# engine="python",
# )
# return df_input
# def expr_level_by_gene(
# df_exprTrasncript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
# ) -> pd.DataFrame:
# """find the gene of each transcipt given by the expression level csv/tsv
# file, and summ expression level of all transcipts from the same gene.
# Args:
# df_exprTranscript: pandas df containing transcript and
# their exp level generated by "tsv_or_csv_to_df" function
# df_output_gtf_selection : pandas df containing genes and
# transcripts, generated by "transcripts_by_gene_inDf" function
# Returns:
# Pandas dataframe having gene and sum of its transcript exp level
# Raises:
# None
# """
# pass
# df_merged = pd.merge(
# df_output_gtf_selection, df_exprTrasncript,
# how="inner", on="Transcript"
# )
# df_sum = df_merged.groupby("Gene").sum(
# "Expression_level"
# )
# return df_sum
# def match_by_gene(
# df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame
# ) -> pd.DataFrame:
# """Find matching genes bewteen the 2 args
# Args:
# df_reprTranscript : pandas Dataframe containing genes
# and their representative transcript, generated by
# "dict_repr_trans_to_df()"
# df_expressionLevel_byGene : pandas Dataframe containing
# genes and their expression level generated by
# "transcript_by_gene_inDf()"
# Returns:
# Pandas dataframe having representative trasncripts
# and their expression level
# Raises:
# None
# """
# pass
# df_merged = pd.merge(
# df_reprTranscript, df_expressionLevel_byGene, how="outer", on="Gene"
# )
# df_clean = df_merged.dropna(axis=0)
# df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]]
# return df_clean
# # functions to run this part of the programm
# def match_repr_transcript_expression_level(
# exprTrans: str, dict_reprTrans: dict, gtf_file: str,
# ):
# """Combine functions to replace transcripts from an exp level csv/tsv file
# with representative transcripts
# Args:
# exprTrans (str): csv or tsv file containing transcripts
# and their expression level
# dict_reprTrans (dict) : dict of genes and their
# representative transcipt
# intemediate_file (str) : txt file containing genes, transcript
# and their expression level from the transkript_extractor function
# output_path : path indicating were the tsv file should be written
# Returns:
# tsv file of representative trasncripts and their expression level
# Raises:
# None
# """
# df_gene_transcript = gene_and_transcript(gtf_file)
# df_exprTrans = tsv_or_csv_to_df(exprTrans)
# df_reprTrans = dict_repr_trans_to_df(dict_reprTrans)
# df_expr_level_by_gene = expr_level_by_gene(
# df_exprTrans, df_gene_transcript
# ) # error here
# df_match = match_by_gene(df_reprTrans, df_expr_level_by_gene)
# df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'},
# inplace=True)
# return df_match
# # run the program
# if __name__ == "__main__":
# match_repr_transcript_expression_level()
# Exon length filter #
"""Exon length filter
Version 2.1.0"""
# Called Packages #
import re
import os
import transcript_extractor as te
# Python version this module was developed against (informational only).
python_version = "3.7.13"
# Imported module objects and their names, presumably used elsewhere for a
# dependency/version report — TODO(review): confirm against callers.
module_list = [re, os]
modul_name_list = ["re", "os"]
# Functions #
def exon_length_calculator(entry):
    """Calculate the length of an exon from a single GTF entry line.

    Finds the first two tab-delimited integer fields (the start and end
    coordinates) in ``entry`` and returns their difference.

    Args:
        entry (str): one line of a GTF file.

    Returns:
        int: ``end - start`` when both coordinates are found.
        str: the sentinel ``"NA"`` when fewer than two valid coordinates
        are present (best-effort behaviour kept from the original).
    """
    # pattern of a coordinate field: an integer surrounded by tabs
    find_exon_coordinates = re.compile(r"\t\d{1,15}\t")
    try:
        # first match: the start coordinate
        try_find_start_coordinates = find_exon_coordinates.search(entry)
        start_coordinates = int(
            try_find_start_coordinates[0].replace("\t", "")
        )
        # index of the final digit of the start coordinate; the search for
        # the end coordinate resumes there so that the trailing tab of the
        # first match doubles as the leading tab of the second
        final_index_start_coordinates = (
            entry.find(try_find_start_coordinates[0])
            + len(try_find_start_coordinates[0]) - 1
        )
        sub_entry = entry[final_index_start_coordinates:]
        try_find_end_coordinates = find_exon_coordinates.search(sub_entry)
        end_coordinates = int(
            try_find_end_coordinates[0].replace("\t", "")
        )
        exon_length = end_coordinates - start_coordinates
    except (TypeError, ValueError):
        # TypeError: search() found no match and returned None, which was
        # then subscripted; ValueError: a field was not a clean integer.
        # The original used bare ``except:`` here, which also hid real
        # bugs such as NameError — narrow it to what the body can raise.
        print("\n\nIn the following entry only one or no valid coordinates "
              "could be found:\n", entry, "the value will be set to NA")
        exon_length = "NA"
    return exon_length
def exon_fider(entry):
    """Determine whether a GTF entry line describes an exon.

    Args:
        entry (str): any entry (line) of a GTF file.

    Returns:
        bool: True if the tab-delimited feature field ``exon`` occurs in
        the entry, False otherwise.
    """
    # BUG FIX: the original searched for the *raw* string r"\texon\t",
    # i.e. the literal characters backslash-t, which never occur in a
    # real tab-delimited GTF line — so this function always returned
    # False.  Search for actual tab characters instead.
    return entry.find("\texon\t") != -1
def __longest_transcript_finder(
        current_exon_length,
        longest_transcript,
        longest_transcript_ID,
        old_transcript_ID
):
    """Update the running best-transcript record.

    Compares the exon length accumulated for the transcript that just
    ended against the longest transcript seen so far, keeps whichever
    is longer, and resets the running length for the next transcript.

    Returns:
        tuple: ``(0, best_length, best_transcript_id)`` — the first
        element is always 0 (the reset exon-length accumulator).
    """
    if current_exon_length <= longest_transcript:
        # the previous best still stands; only the accumulator resets
        return 0, longest_transcript, longest_transcript_ID
    # the transcript that just ended becomes the new best candidate
    return 0, current_exon_length, old_transcript_ID
def _representative_transcript_csv(
        representative_transcript,
        file_name="test",
        deposit_pathway_name=os.getcwd()
):
    """Write the representative-transcript mapping to a CSV file.

    Each gene/transcript pair becomes one ``gene,transcript`` line in
    ``<deposit_pathway_name>/<file_name>_representative_transcripts.csv``.

    NOTE(review): the ``deposit_pathway_name`` default is evaluated once
    at import time (the cwd when the module was loaded) — confirm this
    is intended.
    """
    target = os.path.join(
        deposit_pathway_name,
        file_name + "_" + "representative_transcripts" + ".csv",
    )
    rows = [
        str(gene) + "," + representative_transcript[gene] + "\n"
        for gene in representative_transcript
    ]
    with open(target, "w", encoding="utf-8") as out_file:
        out_file.writelines(rows)
def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}):
    """Stream through <file_name>.gtf and, for each gene in gen_dict, keep
    the candidate transcript with the longest total exon length.

    Expected inputs:
        file_name: str ; default = "test"  # gtf file name without extension
        source_pathway_name: str ; default = cwd  # directory of the gtf file
        deposit_pathway_name: str ; default = cwd  # kept for interface symmetry
        gen_dict: dict {gene_ID: [candidate transcript_IDs]}
            NOTE(review): mutable default argument, and entries are deleted
            from it below — repeated default calls share a shrinking dict.
    Returns:
        representative_transcript: dict {gene_ID: transcript_ID}
    """
    bar,start_time = te.bar_builder(length_multiplyer = 3)
    total_genes = len(gen_dict)
    gens_done = 0
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        old_gen = str()
        old_transcript_ID = str()
        representative_transcript = dict()
        representative_trasnscript_not_found = True
        longest_transcript_ID = str()
        current_exon_length = 0
        longest_transcript = 0
        percentage_done = 0
        for entry in f:
            try:
                corrent_gen = te.gene_ID_finder(entry)
            except:
                # entry carries no gene ID; stay on the current gene
                corrent_gen = old_gen
            # The block above tests if there is a gene name in the entry
            if corrent_gen != old_gen:
                representative_trasnscript_not_found = True
            # The block above determines if the gene name is new and resets
            # representative_trasnscript_not_found, which is used to skip
            # work for genes with a single candidate transcript
            if representative_trasnscript_not_found and corrent_gen != str():
                # The condition prevents searches if a representative
                # transcript has already been chosen for this gene
                if corrent_gen != old_gen:
                    current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
                    representative_transcript[old_gen] = longest_transcript_ID
                    try:
                        del gen_dict[old_gen]
                        old_gen = corrent_gen
                        gens_done += 1
                        corrent_percentage_done = (gens_done/total_genes)*100
                        if corrent_percentage_done > percentage_done+10:
                            bar,start_time = te.bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar)
                            percentage_done = int(corrent_percentage_done)
                    except:
                        # old_gen was not (or no longer) in gen_dict
                        old_gen = corrent_gen
                    longest_transcript = 0
                # The block above stores the transcript with the longest
                # exons of the finished gene in the representative dict
                try:
                    # This try/except tests if the gene is in the input dict
                    transcript_IDs = gen_dict[corrent_gen]
                    if len(gen_dict[corrent_gen]) == 1:
                        # Shortcut for genes that already have a single
                        # representative transcript.
                        # NOTE(review): indexes corrent_gen[0] (first char of
                        # the gene ID) rather than the candidate list — this
                        # shortcut path looks broken; confirm against callers.
                        representative_transcript=gen_dict[corrent_gen[0]]
                        representative_trasnscript_not_found = False
                        continue
                except:
                    continue
                try:
                    current_transcript_ID = te.transcript_ID_finder(entry)
                except:
                    continue
                # The block above searches for a transcript ID in the entry
                if current_transcript_ID in transcript_IDs:
                    # This condition tests if the transcript is one of the
                    # candidates for representative transcript
                    if current_transcript_ID != old_transcript_ID:
                        # Triggered when the entry no longer belongs to the
                        # previous transcript
                        current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
                        try:
                            transcript_IDs.remove(old_transcript_ID)
                            old_transcript_ID = current_transcript_ID
                        except:
                            old_transcript_ID = current_transcript_ID
                    if exon_fider(entry):
                        exon_length = exon_length_calculator(entry)
                        current_exon_length += exon_length
                    else:
                        continue
        current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
        representative_transcript[old_gen] = longest_transcript_ID
        # drop the placeholder entry created before the first real gene
        del representative_transcript[str()]
    te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
    return(representative_transcript)
def exon_length_filter(file_name="test", source_pathway_name=os.getcwd(),
                       deposit_pathway_name=os.getcwd(), gen_dict=None):
    """Filter a {gene_ID: [transcript_IDs]} dictionary by exon length and
    return the longest transcript per gene as {gene_ID: transcript_ID}.

    Expected inputs:
        file_name: str ; default = "test"  # name of the gtf file to look at
        source_pathway_name: str ; default = cwd  # path of the gtf file
        deposit_pathway_name: str ; default = cwd  # path for output files
        gen_dict: dict {gene ID: [transcript IDs of that gene]} or None;
            None selects the built-in demo dictionary.
    Returns:
        dict {gene_ID: transcript_ID}
    """
    if gen_dict is None:
        # The demo mapping used to be a mutable default argument; because
        # _exon_length_filter deletes entries from the dict it receives, a
        # second default call would have seen an emptied, shared dictionary.
        gen_dict = {
            "ENSG00000160072": ["ENST00000673477", "ENST00000472194",
                                "ENST00000378736", "ENST00000308647",
                                "ENST00000442483"],
            "ENSG00000225972": ["ENST00000416931"],
            "ENSG00000279928": ["ENST00000624431", "ENST00000424215"],
            "ENSG00000142611": ["ENST00000378391", "ENST00000607632",
                                "ENST00000511072"],
        }
    print("Representative trascipts are filterd based on exon length please wait...")
    source_pathway_name, deposit_pathway_name = te.__do_pathways_exist__(
        source_pathway_name, deposit_pathway_name)
    representative_transcript = _exon_length_filter(
        file_name, source_pathway_name, deposit_pathway_name, gen_dict)
    print("\nRepresentative transcripts collected")
    return representative_transcript
if __name__ == "__main__":
    # te.version_control(module_list,modul_name_list,python_version)
    exon_length_filter()
    # Allows this module to be executed directly as a stand-alone script.
import pytest
import representative as repr
import numpy as np
import test_Functions as tFun
def test_import_gtfSelection_to_df():
    """Test that gencode.vM31.annotation_intermediat_file.txt is imported
    in the correct pandas DataFrame format.

    Args:
        None
    Returns:
        None
    Raises:
        AssertionError: if any structural check on the DataFrame fails.
    """
    path = tFun.find_path_intermediateFile()
    df = repr.import_gtfSelection_to_df(path)
    datatype = {'Gene': np.dtype('O'), 'Transcript': np.dtype('O'),
                'Support_level': np.dtype('float64')}
    # The previous version compared the result against tuples such as
    # (3, "message"), so the assertion never checked the intended value;
    # the failure message belongs after the comma.
    assert tFun.column_number(df) == 3, \
        "number of columns is not equal to 3"
    assert tFun.column_dType(df) == datatype, \
        "at least one column has the wrong datatype"
    assert tFun.duplicated_rows(df).empty, "at least one row is duplicated"
    assert tFun.NA_value(df) == 0, "at least one row contains NA values"
    with pytest.raises(TypeError, match=r"Only str path is allowed"):
        repr.import_gtfSelection_to_df(123)
def test_representative_transcript_inDict():
    """Check that the DataFrame produced by import_gtfSelection_to_df()
    is reduced to the expected {gene: [transcripts]} dictionary and that
    every non-DataFrame input is rejected with a TypeError.

    Args:
        None
    Returns:
        None
    Raises:
        AssertionError / pytest failure on mismatch
    """
    path = tFun.find_path_intermediateFile()
    df = repr.import_gtfSelection_to_df(path)
    dict_expected = {
        'ENSMUSG00000024691': ['ENSMUST00000025595.5'],
        'ENSMUSG00000063683': ['ENSMUST00000044976.12',
                               'ENSMUST00000119960.2'],
        'ENSMUSG00000079415': ['ENSMUST00000112933.2']}
    assert repr.representative_transcripts_inDict(df) == dict_expected
    # each of these must be refused with the same TypeError message
    bad_inputs = [
        123,
        "hello",
        ["hello", "world", 123],
        {"hello": "world", "bonjour": ["le monde", 123]},
    ]
    for bad_input in bad_inputs:
        with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"):
            repr.representative_transcripts_inDict(bad_input)
def test_find_repr_by_SupportLevel():
    """Verify the end-to-end dictionary built from
    gencode.vM31.annotation_intermediat_file.txt.

    Args:
        None
    Returns:
        None
    Raises:
        AssertionError on mismatch
    """
    path = tFun.find_path_intermediateFile()
    dict_expected = {
        'ENSMUSG00000024691': ['ENSMUST00000025595.5'],
        'ENSMUSG00000063683': ['ENSMUST00000044976.12',
                               'ENSMUST00000119960.2'],
        'ENSMUSG00000079415': ['ENSMUST00000112933.2']}
    assert repr.find_repr_by_SupportLevel(path) == dict_expected
if __name__ == "__main__":
    # Run the checks only when executed directly. Previously these calls
    # ran at import time, so pytest executed every test twice (once on
    # collection import, once as collected tests).
    test_representative_transcript_inDict()
    test_find_repr_by_SupportLevel()
    test_import_gtfSelection_to_df()
    print("test_representative is done ! No error was found")
'''
This part of the code takes a modified GTF file as input
and returns a dictionary holding, for each gene of the input,
the transcripts with the best support level.
'''
import pandas as pd
# import os
def import_gtf_selection_to_df(gtf_modified_file: str) -> pd.DataFrame:
    """Import the intermediate file derived from a gtf and create a df.

    Args:
        gtf_modified_file (str): path to the intermediate file

    Returns:
        Pandas dataframe having Gene, Transcript
        and Support_level as columns

    Raises:
        TypeError: Only str path is allowed
    """
    if not isinstance(gtf_modified_file, str):
        raise TypeError("Only str path is allowed")
    df_input = pd.read_csv(
        gtf_modified_file, sep='\t', lineterminator='\n',
        names=["Gene_mixed", "Transcript", "Support_level", "Na1", "Na2"]
    )
    df_input["Support_level"] = df_input["Support_level"].replace(" ", "")
    # gene rows carry ">ENS..." in the first field; transcript rows carry a
    # leading digit instead, so the extraction below yields NaN for them
    df_input["Gene"] = df_input["Gene_mixed"].str.extract(
        r'([A-Z]\w{0,})', expand=True
    )
    df_input["Transcript_number"] = df_input["Gene_mixed"].str.extract(
        r'(^\d)', expand=True
    )
    # explicit copy so the assignment below cannot trigger pandas'
    # SettingWithCopyWarning on a view
    df_clean = df_input.loc[:, ["Gene", "Transcript", "Support_level"]].copy()
    # .ffill() replaces fillna(method='ffill'), which is deprecated and
    # removed in pandas 3.0; it propagates each gene ID down to its
    # transcript rows
    df_clean["Gene"] = df_clean["Gene"].ffill()
    df_clean = df_clean.dropna(axis=0)
    return df_clean
def representative_transcripts_in_dict(
        df_gtf_selection: pd.DataFrame) -> dict:
    """Return a dict mapping each gene to the transcripts with the
    highest confidence (lowest Support_level value).

    Args:
        df_gtf_selection (pd.DataFrame): Pandas dataframe having Gene,
            Transcript and Support_level as columns

    Returns:
        Dict {'Gene': ['transcriptA', 'transcriptB'], ...}
        (the previous return annotation claimed pd.DataFrame, but a dict
        is what is produced and what callers consume)

    Raises:
        TypeError: Only pandas DataFrame is allowed
    """
    if not isinstance(df_gtf_selection, pd.DataFrame):
        raise TypeError("Only pandas DataFrame is allowed")
    # keep rows whose support level equals the per-gene minimum; ties keep
    # every tied transcript
    df_min = df_gtf_selection[
        df_gtf_selection["Support_level"] ==
        df_gtf_selection.groupby("Gene")["Support_level"].transform("min")
    ]
    df_final = df_min.drop(columns=["Support_level"])
    dict_representative_transcripts = df_final.groupby("Gene")[
        "Transcript"].apply(list).to_dict()
    return dict_representative_transcripts
def find_repr_by_support_level(intermediate_file: str) -> dict[str, list]:
    """Combine import_gtf_selection_to_df() and
    representative_transcripts_in_dict().

    Args:
        intermediate_file (str): path to the intermediate file

    Returns:
        Dict {'Gene': ['transcriptA', 'transcriptB'], ...}
        (annotation corrected: values are lists of transcript IDs, not
        single strings)

    Raises:
        TypeError: propagated from the helpers on invalid input
    """
    df_gtf = import_gtf_selection_to_df(intermediate_file)
    dict_repr_trans = representative_transcripts_in_dict(df_gtf)
    return dict_repr_trans
# if __name__ == "__main__":
# find_repr_by_support_level()
#### Transcript extractor #####
"""Transcript extractor
Version 1.2.0"""
### Called Packages ###
import re
import os
import time
# Interpreter version this script was developed against; used as a
# fallback when a module exposes no __version__ (see version_control).
python_version = "3.7.13"
# module_list and modul_name_list are index-aligned:
# module object <-> its printable name.
module_list =[re,os,time]
modul_name_list = ["re","os","time"]
### Functions ###
def version_control(module_list, modul_name_list, python_version):
    """Append each module's version to a ``required.txt`` file in the
    current working directory, one "name<TAB>version" line per module.

    Expected inputs:
        module_list: list of imported module objects
        modul_name_list: list of str, index-aligned with module_list
        python_version: str  # fallback for modules without __version__
    """
    with open("required.txt", "a", encoding="utf-8") as req:
        for module, name in zip(module_list, modul_name_list):
            # modules without __version__ (e.g. os, time) fall back to the
            # interpreter version, as the original bare-except path did
            version = getattr(module, "__version__", python_version)
            req.write(name + "\t" + str(version) + "\n")
def __parameter_editor(file_name,source_pathway_name,deposit_pathway_name):
    """Interactively confirm or edit the run parameters.

    Loops until the user accepts with [continue/c]; [edit] opens a
    sub-menu to change the file name, the source path or the deposit
    path (paths are checked with os.path.exists before acceptance).

    Expected inputs:
        file_name: str  # name of the gtf file to process
        source_pathway_name: str  # directory containing the gtf file
        deposit_pathway_name: str  # directory for the output files
    Returns:
        (file_name, source_pathway_name, deposit_pathway_name)
    """
    while True:
        print("The program will run with the following parameters:\nFile name:\t\t",file_name,"\nSource pathway:\t",source_pathway_name,"\nDeposit pathway:\t",deposit_pathway_name,"\n")
        parameter_conformation = input("To continue with these parameters input [continue or c] to change them input [edit]\n>")
        if parameter_conformation == "continue"or parameter_conformation =="c":
            break
        elif parameter_conformation == "edit":
            # edit the parameters
            while True:
                change_question = input("select the parameter you want to change [nfile/spath/dpath] or input [b] to go back\n>")
                if change_question == "nfile":
                    # This condition allows the user to change the file name
                    file_name = input("Please input the new file name\n>")
                    break
                elif change_question == "spath":
                    # This condition allows the user to change the source path
                    source_pathway_name = input("Please input the new source path\n>")
                    does_source_pathway_exist = os.path.exists(source_pathway_name)
                    if does_source_pathway_exist:
                        break
                    else:
                        print("The new source pathway:",source_pathway_name,"does not exist\nThe source pathway was returned to default:",os.getcwd())
                        source_pathway_name = os.getcwd()
                elif change_question == "dpath":
                    # This condition allows the user to change output file location
                    deposit_pathway_name = input("Please input the new output file path name\n>")
                    does_deposit_pathway_exist = os.path.exists(deposit_pathway_name)
                    if does_deposit_pathway_exist:
                        break
                    else:
                        print("The new deposit pathway:",deposit_pathway_name,"does not existe\nThe deposit pathway was returnt to default:",source_pathway_name)
                        deposit_pathway_name = source_pathway_name
                    # The block above tests if the new deposit pathway is valid
                elif change_question == "b":
                    # This condition allows the user to return to the main loop
                    break
                else:
                    # This condition covers all non-valid inputs to the inner loop
                    print("The input",change_question,"is not valid. Please use one of the specified commands")
        else:
            # This condition covers all non-valid input for the main loop
            print("The input",parameter_conformation,"is not valide please use one of the specified comands\n")
    return(file_name,source_pathway_name,deposit_pathway_name)
def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()):
    """Check whether the result file already exists in the deposit directory.

    When a file of the same name is found the user chooses between
    reusing it ([n] -> no new file) and regenerating it ([y]).

    Expected inputs:
        file_name: str  # full output file name to look for
        deposit_pathway_name: str ; default = current working directory
    Returns:
        File_of_same_name_found: bool  # True when the existing file is reused
    """
    File_of_same_name_found = False
    generat_new_file = False
    directory_content = os.listdir(deposit_pathway_name)
    for file in directory_content:
        if file == file_name:
            while True:
                File_found_input = input (file_name+" has allready been generated\nDo you want to generate a new one [y/n] \n>")
                if File_found_input == "n":
                    File_of_same_name_found = True
                    break
                elif File_found_input == "y":
                    generat_new_file = True
                    break
                else:
                    print("Invalid input\nPlease press [y] if you want to generate a new file or [n] if you want to use the preexisting file")
            # leave the directory scan once the matching file was handled
            break
        else:
            continue
    if File_of_same_name_found:
        print("No new file will be generated, the program can continue")
    elif generat_new_file:
        print("A new file will be generated please wait...\n")
    else:
        print("No pre-existing file of the relevant type has been found.\nA new file will be generated please wait...\n")
    return(File_of_same_name_found)
def bar_builder(percentage=0, length_multiplyer=2, start_time=None, bar=str()):
    """Render a text loading bar that advances in 10% increments from 0%
    to 100%.

    Expected inputs:
        percentage: int between 0 and 100 in steps of 10; default = 0
            # current loading increment
        length_multiplyer: int > 0 ; default = 2
            # number of symbols per loading increment
        start_time: float or None ; default = None
            # time.time() of the 0% call; None means "now".  (Previously
            # the default was evaluated once at import time, so reported
            # durations were measured from module load, not from the call.)
        bar: str ; default = str()
            # current bar text; may be left empty for the 0% increment
    Returns:
        (bar, start_time) to be fed back into the next call.
    """
    if start_time is None:
        start_time = time.time()
    if percentage == 100:
        # fill every remaining slot and print the elapsed seconds
        bar = bar.replace("-","#")
        print("\r"+bar+"\t"+"100%\t\t"+str(int(time.time()-start_time)))
    elif percentage > 0:
        # fill one increment's worth of slots in place
        bar = bar.replace("-","#",length_multiplyer)
        print("\r"+bar+"\t"+str(percentage)+"%", end='',flush=True)
    elif percentage == 0:
        bar = "["+"-"*length_multiplyer*10+"]"
        print(bar+"\t", end='',flush=True)
    return(bar,start_time)
def __test_file_name(file_name,source_pathway_name = os.getcwd()):
    """Validate that the source file exists at the source path and
    normalize the file name (the returned name has ".gtf" stripped).

    Expected inputs:
        file_name: str  # with or without the .gtf extension
        source_pathway_name: str ; default = cwd
    Returns:
        (valide_source_file, file_name): (bool, str)
    """
    directory_content = os.listdir(source_pathway_name)
    index_of_the_dot = file_name.rfind(".")
    valide_source_file = False
    validate_source_file = True
    if index_of_the_dot ==-1:
        file_name += ".gtf"
    else:
        source_file_typ = file_name[index_of_the_dot:]
        # NOTE(review): not a raw string; ".\d{1,13}" matches a character
        # followed by digits, i.e. version-like suffixes such as ".2",
        # which are then treated as part of the name rather than a type
        not_a_file_type = re.compile(".\d{1,13}")
        try_not_a_file_type = not_a_file_type.search(source_file_typ)
        if source_file_typ == ".gtf":
            file_name = file_name
        elif try_not_a_file_type:
            file_name += ".gtf"
        else:
            print("This program can not handle",source_file_typ,"files. \nplease use a .gtf file" )
            validate_source_file = False
    # The block above tests if file_name includes the file type; when none
    # is found ".gtf" is appended, and a non-".gtf" type is rejected
    if validate_source_file:
        for file in directory_content:
            if file == file_name:
                valide_source_file = True
                break
    # The block above tests if a file of the given name exists in the directory
    if valide_source_file:
        print("The file:",file_name,"has been found.\n")
    else:
        print("No .gtf file of the name",file_name,"has been found in this pathway")
    # The block above reports the outcome of the file test
    file_name = file_name.replace(".gtf","")
    # This line normalizes the file name
    return(valide_source_file,file_name)
def __do_pathways_exist__(source_pathway_name,deposit_pathway_name):
    """Validate both directories, substituting defaults for missing ones.

    A missing source path falls back to the current working directory; a
    missing deposit path falls back to the (possibly corrected) source
    path. The user is informed about each substitution.

    Expected inputs:
        source_pathway_name: str
        deposit_pathway_name: str
    Returns:
        (source_pathway_name, deposit_pathway_name) after validation
    """
    if not os.path.exists(source_pathway_name):
        print("The source pathway:",source_pathway_name,"has not been found\nThe source pathway was set to the default")
        source_pathway_name = os.getcwd()
    if not os.path.exists(deposit_pathway_name):
        print("The deposit pathway:",deposit_pathway_name,"has not been found\nThe deposit pathway was set to the default")
        deposit_pathway_name = source_pathway_name
    return(source_pathway_name,deposit_pathway_name)
def gene_ID_finder(entry):
    """Extract the gene ID from a GTF line known to contain one.

    Expected inputs:
        entry: str  # a gtf line containing a gene_id attribute
    Returns:
        str: the gene ID without its surrounding quotes.
    Note: raises TypeError when no quoted ID follows "gene_id" — callers
    rely on that exception to detect lines without a gene ID.
    """
    sub_entry = entry[entry.find("gene_id"):]
    match = re.compile("\"\S{1,25}\"").search(sub_entry)
    return match[0].replace("\"", "")
def transcript_ID_finder(entry):
    """Extract the transcript ID from a GTF transcript line.

    Expected inputs:
        entry: str  # a gtf line that may contain a transcript_id attribute
    Returns:
        str: the transcript ID without quotes, or "" when none is found.
    """
    sub_entry = entry[entry.find("transcript_id"):]
    match = re.search(r"\"\S{1,25}\"", sub_entry)
    # match is None when the line carries no quoted ID; the original code
    # reached the same result through a bare except around the subscript,
    # which also hid unrelated errors
    if match is None:
        return ""
    return match[0].replace("\"", "")
def transcript_support_level_finder(entry):
    """Extract the transcript support level from a GTF transcript line.

    Expected input:
        entry: str  # a gtf line belonging to a transcript
    Returns:
        str: the support level as written in the file (e.g. "1"), or the
        int 100 when the level is missing, unparsable, or "NA" — 100 is
        used throughout this module as the worst possible level.
    """
    sub_entry = entry[entry.find("transcript_support_level"):]
    match = re.search(r"\W\w{1,16}\W{2}", sub_entry)
    # no parsable level present (the original bare except covered the
    # TypeError raised by subscripting a non-match)
    if match is None:
        return 100
    level = match[0].replace("\"", "").replace("(", "").replace(";", "")
    if "NA" in level:
        return 100
    return level
def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name):
    """Extract per-gene transcript data from <file_name>.gtf and save it
    as <file_name>_intermediate_file.txt.

    For each gene a ">gene_ID" header line is written, followed by one
    tab-separated line per transcript: number, transcript ID and
    transcript support level.

    Expected input:
        file_name: str  # name of the gtf file without the .gtf part
        source_pathway_name: str  # path of the gtf file
        deposit_pathway_name: str  # path for saving the intermediate file
    """
    # first pass only counts lines so the progress bar has a total
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        total_entrys =len(f.readlines())
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        current_entry = 0
        percentage_done = 0
        bar,start_time = bar_builder(length_multiplyer = 3)
        Old_gen_ID = str()
        # stand-in, as the first couple of entries are not genes
        with open(os.path.join(deposit_pathway_name,file_name+"_"+"intermediate_file"+".txt"),"w") as IMF:
            transcript_number = 0
            for entry in f:
                current_entry += 1
                current_percentage_done = 100* current_entry/total_entrys
                if current_percentage_done > percentage_done +10:
                    # advance the bar one 10% increment at a time
                    bar,start_time = bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar)
                    percentage_done = int(current_percentage_done)
                if "gene_id" in entry:
                    Gen_ID = gene_ID_finder(entry)
                else:
                    Gen_ID = Old_gen_ID
                if Gen_ID != Old_gen_ID:
                    # a new gene starts: write its header and reset the
                    # per-gene transcript counter
                    Gen_entry = ">"+ Gen_ID +"\n"
                    IMF.write(Gen_entry)
                    transcript_number = 0
                    Old_gen_ID = Gen_ID
                if "\ttranscript\t" in entry:
                    transcript_number += 1
                    Transcript_ID = transcript_ID_finder(entry)
                    # the function that determines the transcript ID is called
                    transcript_support_level = transcript_support_level_finder(entry)
                    # the function that determines the support level is called
                    New_entry = str(transcript_number)+"\t"+str(Transcript_ID)+"\t"+str(transcript_support_level)+"\t"+"\t\n"
                    IMF.write(New_entry)
    bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
    print("The transcripts have been collected")
def extract_transcript(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = False,Input_free = False):
    """Overall executable function running the transcript extraction
    process for a given file, with all checks.

    Expected input:
        file_name: str ; default = "test"  # name of the gtf file
        source_pathway_name: str ; default = cwd  # path of the gtf file
        deposit_pathway_name: str | False ; default = False
            # path for the intermediate file; False means "same as source"
        Input_free: bool ; default = False
            # True skips every interactive prompt (batch mode)
    Outputs:
        (file_name, source_pathway_name, deposit_pathway_name)
    """
    if deposit_pathway_name == False:
        deposit_pathway_name = source_pathway_name
    if Input_free:
        # batch mode: validate the name and extract unconditionally
        validated_file_name = __test_file_name(file_name,source_pathway_name)
        file_name = validated_file_name[1]
        _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name)
    else:
        # interactive mode: confirm parameters, validate paths and name,
        # and optionally reuse a pre-existing intermediate file
        file_name,source_pathway_name,deposit_pathway_name = __parameter_editor(file_name,source_pathway_name,deposit_pathway_name)
        source_pathway_name,deposit_pathway_name =__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
        validated_file_name = __test_file_name(file_name,source_pathway_name)
        file_name = validated_file_name[1]
        if validated_file_name[0]:
            if __searche_for_preexisting_files(file_name+"_intermediate_file.txt",deposit_pathway_name):
                print("The transcripts has been collected\n")
            else:
                _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name)
    return(file_name,source_pathway_name,deposit_pathway_name)
#### Dev part ####
if __name__ == "__main__":
    #version_control(module_list,modul_name_list,python_version)
    extract_transcript()
    # Allows this module to be executed directly as a stand-alone script.
"""Sample transcripts by Poisson-sampling."""
import pandas as pd
import pandas as pd # type: ignore
import numpy as np
......@@ -30,47 +30,3 @@ class SampleTranscript:
"id": df_repr["id"], "count": levels
})
transcript_numbers.to_csv(output_csv, index=False, header=False)
# python_version = "3.7.13"
# module_list = [pd, np, argparse]
# modul_name_list = ["pd", "np", "argparse"]
# def transcript_sampling(total_transcript_number, df_repr, output_csv):
# # df = pd.read_csv(
# # csv_file, sep="\t", lineterminator="\n", names=["id", "level"])
# # the function match_reprTranscript_expressionLevel() now outputs a df
# df = df_repr
# levels = []
# sums = df['level'].tolist()
# total = sum(sums)
# # I added this because writting a number in the terminal inputed a string
# total_transcript_number = int(total_transcript_number)
# normalized = total_transcript_number/total
# for expression_level in df['level']:
# poisson_sampled = np.random.poisson(expression_level*normalized)
# levels.append(poisson_sampled)
# transcript_numbers = pd.DataFrame({'id': df['id'], 'count': levels})
# pd.DataFrame.to_csv(transcript_numbers, output_csv)
# if __name__ == '__main__':
# # te.version_control(module_list,modul_name_list,python_version)
# parser = argparse.ArgumentParser(
# description="Transcript Poisson sampler, csv output",
# formatter_class=argparse.ArgumentDefaultsHelpFormatter
# )
# parser.add_argument("--expression_level", required=True,
# help="csv file with expression level")
# parser.add_argument("--output_csv", required=True,
# help="output csv file")
# parser.add_argument("--input_csv", required=True,
# help="input csv file")
# parser.add_argument("--transcript_number", required=True,
# help="total number of transcripts to sample")
# args = parser.parse_args()
# transcript_sampling(args.transcript_number, args.input_csv,
# args.output_csv, args.transcript_number)
This diff is collapsed.
import pandas as pd
import numpy as np
import logging
from gtfparse import read_gtf
LOG = logging.getLogger(__name__)
def attributes_converter(attributes: str) -> list:
    """Split the ;-separated GTF attribute string into a flat token list.

    The resulting list alternates identifiers and values, so the value of
    e.g. "transcript_id" sits directly after the identifier itself (index
    of the identifier + 1).

    Input:
        attributes = str()  # the unstructured part of the entry
    Output:
        attributes = list()  # cleaned alternating key/value tokens
    """
    cleaned = (
        attributes
        .replace('"', "")
        .replace(";", "")
        .replace("\\n", "")
    )
    return cleaned.split(" ")
def find_in_attributes(attributes: list, look_for: str) -> str:
    """Return the value following *look_for* in an attributes token list.

    Works because keys and values alternate in the list produced by
    attributes_converter (key at index i, value at index i + 1).

    Inputs:
        attributes = list()  # alternating key/value tokens
        look_for = str()  # name of the key to look for
    Output:
        the value string, or "NA" (with a warning logged) when the key is
        absent from the attributes.
    """
    if look_for not in attributes:
        # lazy %-style arguments keep formatting cost off the happy path
        # (the f-string variant formats even when the warning is filtered)
        LOG.warning("No %s in the entry, the return was set to NA", look_for)
        return "NA"
    return attributes[attributes.index(look_for) + 1]
def _re_format(rep_trans_dict: dict) -> dict:
    """Collapse the representative-transcript bookkeeping dict to a plain
    one-value-per-gene mapping.

    Input:
        rep_trans_dict = {gene_id : [
            transcript_id, transcript_support_level, transcript_length]}
    Output:
        rep_transcripts = {gene_id : transcript_id}
    """
    return {gene_id: data[0] for gene_id, data in rep_trans_dict.items()}
def get_rep_trans(file_name: str = "test.gtf") -> dict:
    """Select one representative transcript per gene from a GTF file.

    Two criteria are applied in order: the lowest transcript_support_level
    wins; among transcripts of equal level, the one with the longest
    summed exon length wins.

    Args:
        file_name (str): Name of the annotation file (.gtf).

    Returns:
        rep_transcripts (dict): {gene_id: transcript_id} of the selected
            representative transcripts.

    Raises:
        ValueError: If an unexpected entry is encountered in the GTF file.
    """
    rep_transcripts = {}
    cur_gID = ""
    # All loop state is initialised up front so that a file starting with
    # an exon/transcript entry fails with the intended ValueError instead
    # of a NameError on an unbound variable.
    cur_tID = ""
    pot_best_trans = False  # equal-level challenger, or False
    ignor_trans = False     # True while skipping a hopeless transcript
    # [transcript_id, transcript_support_level, transcript_length]
    cur_best_trans = ["", 100, 0]

    with open(file_name, "r", encoding="utf-8") as f:
        for line in f:
            entry = line.split("\t")

            # removes expected but unneeded entries
            if len(entry) == 1 or entry[2] in [
                "CDS",
                "stop_codon",
                "five_prime_utr",
                "three_prime_utr",
                "start_codon",
                "Selenocysteine"
            ]:
                continue

            # turns the less organized part of the entry into a readable list
            attributes = attributes_converter(entry[8])

            # looking for and processing exon entries
            if entry[2] == "exon":
                if ignor_trans:
                    continue
                if cur_gID != attributes[1]:
                    # the original called LOG.error() without a message,
                    # which itself raised TypeError and masked the
                    # intended ValueError below
                    LOG.error("Exon from an unexpected gene")
                    raise ValueError("Exon from an unexpected gene")
                if find_in_attributes(attributes, "transcript_id") != cur_tID:
                    LOG.error("Exon from an unexpected transcript")
                    raise ValueError("Exon from an unexpected transcript")

                # adding the exon length to the appropriate record and
                # promoting the challenger as soon as it overtakes
                if pot_best_trans:
                    pot_best_trans[2] += int(entry[4]) - int(entry[3])
                    if pot_best_trans[2] > cur_best_trans[2]:
                        cur_best_trans = pot_best_trans
                        pot_best_trans = False
                else:
                    cur_best_trans[2] += int(entry[4]) - int(entry[3])

            # looking for and processing transcript entries
            elif entry[2] == "transcript":
                # verify that the gene is correct
                if cur_gID != attributes[1]:
                    LOG.error("Transcript from an unexpected gene")
                    raise ValueError("Transcript from an unexpected gene")

                cur_tID = find_in_attributes(attributes, "transcript_id")
                t_supp_lvl = find_in_attributes(
                    attributes, "transcript_support_level")

                # missing, "NA" or non-numeric support levels are treated
                # as the worst level (100)
                if t_supp_lvl == "NA" or not t_supp_lvl.isdigit():
                    t_supp_lvl = 100
                else:
                    t_supp_lvl = int(t_supp_lvl)

                # decide whether this transcript can become representative
                if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "":
                    cur_best_trans = [cur_tID, t_supp_lvl, 0]
                    pot_best_trans = False
                    ignor_trans = False
                elif t_supp_lvl == cur_best_trans[1]:
                    pot_best_trans = [cur_tID, t_supp_lvl, 0]
                else:
                    ignor_trans = True

            # looking for and processing gene entries
            elif entry[2] == "gene":
                # flush the previous gene's best transcript
                if cur_gID in rep_transcripts:
                    old = rep_transcripts[cur_gID]
                    if old[1] > cur_best_trans[1] or (
                            old[1] == cur_best_trans[1]
                            and old[2] < cur_best_trans[2]):
                        rep_transcripts[cur_gID] = cur_best_trans
                else:
                    rep_transcripts[cur_gID] = cur_best_trans
                # updating cur_gID and resetting cur_best_trans
                cur_gID = attributes[1]
                cur_best_trans = ["", 100, 0]

            # raises an error for unidentifiable entries
            else:
                LOG.error("This entry could not be identified")
                raise ValueError("This entry could not be identified")

    # adding the final gene to the dictionary
    if cur_gID in rep_transcripts:
        old = rep_transcripts[cur_gID]
        if old[1] > cur_best_trans[1] or (
                old[1] == cur_best_trans[1]
                and old[2] < cur_best_trans[2]):
            rep_transcripts[cur_gID] = cur_best_trans
    else:
        rep_transcripts[cur_gID] = cur_best_trans

    # drop the sentinel entry created before the first real gene;
    # pop() keeps an empty/comment-only file from raising KeyError
    rep_transcripts.pop("", None)
    return _re_format(rep_transcripts)
def _test():
    """Run get_rep_trans on the bundled test file and report whether the
    result matches the expected dictionary.

    Output:
        console report comparing the generated dictionary with the
        expectation for test.gtf
    """
    expected_result = {
        "ENSG00000160072": "ENST00000472194",
        "ENSG00000234396": "ENST00000442483",
        "ENSG00000225972": "ENST00000416931",
        "ENSG00000224315": "ENST00000428803",
        "ENSG00000198744": "ENST00000416718",
        "ENSG00000279928": "ENST00000624431",
        "ENSG00000228037": "ENST00000424215",
        "ENSG00000142611": "ENST00000378391",
    }
    rt = get_rep_trans("test.gtf")
    if rt == expected_result:
        print("The test was succsesfull")
    else:
        print("The test fail due to not yieding the same results")
        print("The results the program got\n", rt)
        print("The expected results\n", expected_result)
def gtf_file_writer(original_file: str, rep_transcript_dict: dict, output_file: str):
    """Write a filtered GTF file keeping every gene line plus the lines of
    each gene's representative transcript.

    Args:
        original_file (str): path of the input .gtf file
        rep_transcript_dict (dict): {gene_id: transcript_id}
        output_file (str): path of the .gtf file to write
    """
    output = []
    # initialised so a transcript/exon line appearing before any gene line
    # cannot raise NameError; such lines are simply not selected
    gene_id = ""
    with open(original_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            entry = line.split("\t")
            attributes = attributes_converter(entry[8])
            feature_type = entry[2]
            if feature_type == "gene":
                gene_id = find_in_attributes(attributes, "gene_id")
                output.append(line)
            else:
                # keep the line only when it belongs to the representative
                # transcript of the current gene
                transcript_id = find_in_attributes(attributes, "transcript_id")
                if gene_id in rep_transcript_dict and rep_transcript_dict[gene_id] == transcript_id:
                    output.append(line)
    with open(output_file, "w", encoding="utf-8") as last_file:
        last_file.writelines(output)
def gtf_to_df(gtf_file: str) -> pd.DataFrame:
    """Load a .gtf file into a DataFrame of gene/transcript ID pairs.

    Args:
        gtf_file (str): Path to the .gtf file.

    Returns:
        df_gtf (pd.DataFrame): DataFrame with columns 'Gene' and
            'Transcript', one row per transcript feature.

    Raises:
        None
    """
    annotation = read_gtf(gtf_file).to_pandas()
    transcripts = annotation.loc[
        annotation["feature"] == "transcript", ["gene_id", "transcript_id"]
    ]
    return transcripts.rename(
        columns={"gene_id": "Gene", "transcript_id": "Transcript"}
    )
def dict_reprTrans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame:
    """Convert a gene -> representative-transcript dict into a DataFrame.

    Args:
        dict_reprTrans (dict): Mapping of gene IDs (str) to representative
            transcript IDs (str).

    Returns:
        pd.DataFrame: DataFrame with columns 'Gene' and 'reprTrans'.
        Version suffixes such as '.2' are stripped from transcript IDs.

    Raises:
        TypeError: If the input is not a dict, or if any key or value
            is not a string.
    """
    # Bug fix: LOG.error() was called without a message, which itself
    # raises "TypeError: error() missing 1 required positional argument"
    # instead of logging and raising the intended TypeError.
    if not isinstance(dict_reprTrans, dict):
        LOG.error("Only dictionaries are allowed")
        raise TypeError("Only dictionaries are allowed")
    if not all(isinstance(key, str) for key in dict_reprTrans.keys()):
        LOG.error("Keys should be strings")
        raise TypeError("Keys should be strings")
    if not all(isinstance(value, str) for value in dict_reprTrans.values()):
        LOG.error("Values should be strings")
        raise TypeError("Values should be strings")
    df_reprTrans = pd.DataFrame.from_dict(
        dict_reprTrans, orient="index", columns=["reprTranscript"]
    )
    df_reprTrans = df_reprTrans.reset_index()
    df_reprTrans.columns = ["Gene", "reprTrans"]
    # Drop transcript version suffixes (e.g. "ENST0001.2" -> "ENST0001").
    df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(
        r"\.[1-9]", "", regex=True
    )
    return df_reprTrans
def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
    """Load a TSV or CSV expression file into a pandas DataFrame.

    Args:
        input_txt (str): Path to a tab- or comma-separated file listing
            transcripts and their expression levels (no header row).

    Returns:
        pd.DataFrame: DataFrame with columns 'Transcript' and
        'Expression_level'.

    Raises:
        None
    """
    column_names = ["Transcript", "Expression_level"]
    # The regex separator accepts either tabs or commas, hence the
    # pure-python parsing engine.
    df_expression = pd.read_csv(
        input_txt,
        sep=r"[\t,]",
        lineterminator="\n",
        names=column_names,
        engine="python",
    )
    return df_expression
def exprLevel_byGene(
    df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
) -> pd.DataFrame:
    """Sum transcript expression levels per gene.

    Joins each transcript's expression level onto its gene and
    aggregates the levels of all transcripts belonging to the same gene.

    Args:
        df_exprTranscript (pd.DataFrame): Transcripts and their
            expression levels, as produced by "tsv_or_csv_to_df".
        df_output_gtf_selection (pd.DataFrame): Gene/transcript pairs,
            as produced by the GTF parsing step.

    Returns:
        pd.DataFrame: One row per gene with the summed expression level
        of its transcripts.

    Raises:
        None
    """
    joined = df_output_gtf_selection.merge(
        df_exprTranscript, how="inner", on="Transcript"
    )
    per_gene = joined.groupby("Gene")["Expression_level"].sum()
    return per_gene.reset_index()
def match_byGene(
    df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame
) -> pd.DataFrame:
    """Pair representative transcripts with their gene's expression level.

    Keeps only genes present in both inputs (inner join on 'Gene').

    Args:
        df_reprTranscript (pd.DataFrame): Genes and their representative
            transcripts, as produced by "dict_reprTrans_to_df()".
        df_expressionLevel_byGene (pd.DataFrame): Genes and their summed
            expression levels, as produced by "exprLevel_byGene()".

    Returns:
        pd.DataFrame: Columns 'reprTrans' and 'Expression_level'.

    Raises:
        None
    """
    merged = df_reprTranscript.merge(
        df_expressionLevel_byGene, how="inner", on="Gene"
    )
    return merged[["reprTrans", "Expression_level"]]
# Top-level entry point combining the helper functions above.
def match_reprTranscript_expressionLevel(
    exprTrans: str, dict_reprTrans: dict, gtf_file: str,
):
    """Map expression levels onto representative transcripts.

    Replaces the transcripts of an expression-level CSV/TSV file with
    each gene's representative transcript, carrying over the gene's
    summed expression level.

    Args:
        exprTrans (str): CSV or TSV file with transcripts and their
            expression levels.
        dict_reprTrans (dict): Genes mapped to their representative
            transcripts.
        gtf_file (str): Path to the GTF file.

    Returns:
        pd.DataFrame: Columns 'id' (representative transcript) and
        'level' (expression level).

    Raises:
        None
    """
    gene_transcripts = gtf_to_df(gtf_file)
    expression = tsv_or_csv_to_df(exprTrans)
    representatives = dict_reprTrans_to_df(dict_reprTrans)
    gene_levels = exprLevel_byGene(expression, gene_transcripts)
    matched = match_byGene(representatives, gene_levels)
    matched.rename(
        columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True
    )
    return matched
def transcript_sampling(total_transcript_number, df_repr, output_csv):
    """Sample per-transcript counts with a Poisson model and write a CSV.

    Scales each transcript's expression level so the levels sum to the
    requested total, draws a Poisson count per transcript, and writes
    the result as a CSV with columns 'id' and 'count'.

    Args:
        total_transcript_number: Desired total number of transcripts
            (coerced to int).
        df_repr: DataFrame with columns 'id' and 'level'.
        output_csv: Path of the CSV file to write.
    """
    expression_total = df_repr["level"].sum()
    target = int(total_transcript_number)
    scale = target / expression_total
    sampled_counts = np.random.poisson(df_repr["level"] * scale)
    result = pd.DataFrame({"id": df_repr["id"], "count": sampled_counts})
    result.to_csv(output_csv, index=False)