Skip to content
Snippets Groups Projects
Commit f97ac4ef authored by Mate Balajti
Browse files

refactor: main scripts

parent 60430a41
No related branches found
No related tags found
1 merge request: !7 feat: add tests
...@@ -4,6 +4,7 @@ import pandas as pd ...@@ -4,6 +4,7 @@ import pandas as pd
import numpy as np import numpy as np
# pylint: disable=C0103
def find_path(filename: str) -> str: def find_path(filename: str) -> str:
"""Find the path to a file. """Find the path to a file.
......
...@@ -223,11 +223,7 @@ class TestMatchReptrans: ...@@ -223,11 +223,7 @@ class TestMatchReptrans:
"at least one row contain NA values" "at least one row contain NA values"
def test_output_tsv(): def test_output_tsv():
""" """Test if a tsv file is generated from a df in the right format."""
This function test if a tsv file is generated from a pandas
dataframe in the right format.
"""
dict_repr_test = { dict_repr_test = {
'ENSMUSG00000079415': 'ENSMUST00000112933', 'ENSMUSG00000079415': 'ENSMUST00000112933',
"ENSMUSG00000024691": "ENSMUST00000025595", "ENSMUSG00000024691": "ENSMUST00000025595",
...@@ -251,7 +247,7 @@ class TestMatchReptrans: ...@@ -251,7 +247,7 @@ class TestMatchReptrans:
ref_path = tFun.find_path("test_ref_output.tsv") ref_path = tFun.find_path("test_ref_output.tsv")
output_path = tFun.find_output() output_path = tFun.find_output()
with open(ref_path, 'r') as t1, open(output_path, 'r') as t2: with open(ref_path, 'r', encoding="utf-8") as t1, open(output_path, 'r') as t2:
fileRef = t1.readlines() fileRef = t1.readlines()
fileOutput = t2.readlines() fileOutput = t2.readlines()
......
"""Find representative transcripts.""" """Find representative transcripts."""
import logging import logging
from typing import Union
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# pylint: disable=R0912,R0915
class FindRepTrans: class FindRepTrans:
"""Find representative transcripts.""" """Find representative transcripts."""
...@@ -12,7 +13,7 @@ class FindRepTrans: ...@@ -12,7 +13,7 @@ class FindRepTrans:
"""Initiate.""" """Initiate."""
@staticmethod @staticmethod
def attributes_converter(attributes: str) -> list: def attributes_converter(attributes):
"""Attributes converter function. """Attributes converter function.
This function converts the "unstructured" ;-separated part of This function converts the "unstructured" ;-separated part of
...@@ -23,7 +24,7 @@ class FindRepTrans: ...@@ -23,7 +24,7 @@ class FindRepTrans:
Input: Input:
attributes = str() # the unstructured part of the entry attributes = str() # the unstructured part of the entry
Output: Output:
attributes = list() # cleaned list with the \ attributes = list() # cleaned list with the
characteristics described above characteristics described above
""" """
attributes = ( attributes = (
...@@ -96,7 +97,7 @@ class FindRepTrans: ...@@ -96,7 +97,7 @@ class FindRepTrans:
ValueError: If an unexpected entry is encountered in the GTF file. ValueError: If an unexpected entry is encountered in the GTF file.
""" """
# setting default variables # setting default variables
rep_transcripts = dict() rep_transcripts: dict = {}
cur_g_id = "" cur_g_id = ""
# [transcript_id, transcript_support_level, transcript_length] # [transcript_id, transcript_support_level, transcript_length]
cur_best_trans = ["", 100, 0] cur_best_trans = ["", 100, 0]
...@@ -122,11 +123,11 @@ class FindRepTrans: ...@@ -122,11 +123,11 @@ class FindRepTrans:
if cur_g_id != attributes[1]: if cur_g_id != attributes[1]:
LOG.error("Exon from an unexpected gene") LOG.error("Exon from an unexpected gene")
raise ValueError("Exon from an unexpected gene") raise ValueError("Exon from an unexpected gene")
elif ( if (
self.find_in_attributes( self.find_in_attributes(
attributes, "transcript_id" attributes, "transcript_id"
) != cur_tID ) != cur_t_ID
): ):
LOG.error("Exon from an unexpected transcript") LOG.error("Exon from an unexpected transcript")
raise ValueError("Exon from an unexpected transcript") raise ValueError("Exon from an unexpected transcript")
...@@ -148,10 +149,10 @@ class FindRepTrans: ...@@ -148,10 +149,10 @@ class FindRepTrans:
raise ValueError("Transcript from an unexpected gene") raise ValueError("Transcript from an unexpected gene")
# finding the transcript id and the support level # finding the transcript id and the support level
cur_tID = self.find_in_attributes( cur_t_ID = self.find_in_attributes(
attributes, "transcript_id" attributes, "transcript_id"
) )
t_supp_lvl = self.find_in_attributes( t_supp_lvl: Union[int, str] = self.find_in_attributes(
attributes, "transcript_support_level" attributes, "transcript_support_level"
) )
...@@ -161,7 +162,7 @@ class FindRepTrans: ...@@ -161,7 +162,7 @@ class FindRepTrans:
if t_supp_lvl == "NA": if t_supp_lvl == "NA":
t_supp_lvl = 100 t_supp_lvl = 100
else: else:
if t_supp_lvl.isdigit(): if isinstance(t_supp_lvl, str) and t_supp_lvl.isdigit():
t_supp_lvl = int(t_supp_lvl) t_supp_lvl = int(t_supp_lvl)
else: else:
t_supp_lvl = 100 t_supp_lvl = 100
...@@ -169,11 +170,11 @@ class FindRepTrans: ...@@ -169,11 +170,11 @@ class FindRepTrans:
# decides if the transcript has potential to become the # decides if the transcript has potential to become the
# representative transcript # representative transcript
if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "": if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "":
cur_best_trans = [cur_tID, t_supp_lvl, 0] cur_best_trans = [cur_t_ID, t_supp_lvl, 0]
pot_best_trans = False pot_best_trans = False
ignor_trans = False ignor_trans = False
elif t_supp_lvl == cur_best_trans[1]: elif t_supp_lvl == cur_best_trans[1]:
pot_best_trans = [cur_tID, t_supp_lvl, 0] pot_best_trans = [cur_t_ID, t_supp_lvl, 0]
else: else:
ignor_trans = True ignor_trans = True
...@@ -203,7 +204,7 @@ class FindRepTrans: ...@@ -203,7 +204,7 @@ class FindRepTrans:
if cur_g_id in rep_transcripts: if cur_g_id in rep_transcripts:
if (rep_transcripts[cur_g_id][1] > cur_best_trans[1] or if (rep_transcripts[cur_g_id][1] > cur_best_trans[1] or
(rep_transcripts[cur_g_id][1] == cur_best_trans[1] and (rep_transcripts[cur_g_id][1] == cur_best_trans[1] and
rep_transcripts[cur_g_id][2] < cur_best_trans[2])): rep_transcripts[cur_g_id][2] < cur_best_trans[2])):
rep_transcripts[cur_g_id] = cur_best_trans rep_transcripts[cur_g_id] = cur_best_trans
else: else:
rep_transcripts[cur_g_id] = cur_best_trans rep_transcripts[cur_g_id] = cur_best_trans
......
"""Match representative transcript with expression level""" """Match representative transcript with expression level."""
# Made by Hugo Gillet # # Made by Hugo Gillet #
import logging import logging
...@@ -40,12 +40,15 @@ class MatchReptransExplvl: ...@@ -40,12 +40,15 @@ class MatchReptransExplvl:
return df_gtf return df_gtf
@staticmethod @staticmethod
def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame: def dict_repr_trans_to_df(
dict_repr_trans: "dict[str, str]"
) -> pd.DataFrame:
""" """
Convert a dictionary of genes and their representative transcript into a DataFrame. Convert a dict of genes and their representative transcript into a df.
Args: Args:
dict_reprTrans (dict): {'Gene': ['transcriptA', 'transcriptB'], ...} dict_repr_trans (dict):
{'Gene': ['transcriptA', 'transcriptB'], ...}
Returns: Returns:
Pandas DataFrame with 'Gene' and 'Transcript' as columns. Pandas DataFrame with 'Gene' and 'Transcript' as columns.
...@@ -55,22 +58,32 @@ class MatchReptransExplvl: ...@@ -55,22 +58,32 @@ class MatchReptransExplvl:
TypeError: Keys should be strings. TypeError: Keys should be strings.
TypeError: Values should be strings. TypeError: Values should be strings.
""" """
if not isinstance(dict_reprTrans, dict): if not isinstance(dict_repr_trans, dict):
LOG.error("Only dictionaries are allowed") LOG.error("Only dictionaries are allowed")
raise TypeError("Only dictionaries are allowed") raise TypeError("Only dictionaries are allowed")
if not all(isinstance(key, str) for key in dict_reprTrans.keys()): if not all(
isinstance(key, str) for key in dict_repr_trans.keys()
):
LOG.error("Keys should be strings") LOG.error("Keys should be strings")
raise TypeError("Keys should be strings") raise TypeError("Keys should be strings")
if not all(isinstance(value, str) for value in dict_reprTrans.values()): if not all(
isinstance(value, str) for value in dict_repr_trans.values()
):
LOG.error("Values should be strings") LOG.error("Values should be strings")
raise TypeError("Values should be strings") raise TypeError("Values should be strings")
df_reprTrans = pd.DataFrame.from_dict(dict_reprTrans, orient="index", columns=["reprTranscript"]) df_repr_trans = pd.DataFrame.from_dict(
df_reprTrans = df_reprTrans.reset_index() dict_repr_trans, orient="index", columns=["reprTranscript"]
df_reprTrans.columns = ["Gene", "reprTrans"] )
df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(r"\.[1-9]", "", regex=True) df_repr_trans = df_repr_trans.reset_index()
column_names = ["Gene", "reprTrans"]
df_repr_trans.columns = pd.Index(column_names)
# pylint: disable=E1136,E1137
df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace(
r"\.[1-9]", "", regex=True
)
return df_reprTrans return df_repr_trans
@staticmethod @staticmethod
def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
...@@ -99,85 +112,111 @@ class MatchReptransExplvl: ...@@ -99,85 +112,111 @@ class MatchReptransExplvl:
@staticmethod @staticmethod
def expr_level_by_gene( def expr_level_by_gene(
df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame df_expr_transcript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
) -> pd.DataFrame: ) -> pd.DataFrame:
""" """Sum expression levels.
Find the gene of each transcript given by the expression level CSV/TSV file
and sum the expression level of all transcripts from the same gene. Find the gene of each transcript given by the expression level
CSV/TSV file and sum the expression level of all transcripts
from the same gene.
Args: Args:
df_exprTranscript (pd.DataFrame): Pandas DataFrame containing transcripts and their expression levels, df_expr_transcript (pd.DataFrame):
generated by the "tsv_or_csv_to_df" function. Pandas DataFrame containing transcripts and their
df_output_gtf_selection (pd.DataFrame): Pandas DataFrame containing genes and transcripts, expression levels, generated by the
generated by the "transcripts_by_gene_inDf" function. "tsv_or_csv_to_df" function.
df_output_gtf_selection (pd.DataFrame):
Pandas DataFrame containing genes and transcripts,
generated by the "transcripts_by_gene_inDf" function.
Returns: Returns:
Pandas DataFrame having 'Gene' and sum of its transcript expression levels. Pandas DataFrame having 'Gene' and sum of its
transcript expression levels.
Raises: Raises:
None None
""" """
df_merged = pd.merge(df_output_gtf_selection, df_exprTranscript, how="inner", on="Transcript") df_merged = pd.merge(
df_sum = df_merged.groupby("Gene")["Expression_level"].sum().reset_index() df_output_gtf_selection, df_expr_transcript,
how="inner", on="Transcript")
df_sum = df_merged.groupby("Gene")["Expression_level"].sum(
).reset_index()
return df_sum return df_sum
@staticmethod @staticmethod
def match_by_gene( def match_by_gene(
df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame df_repr_transcript: pd.DataFrame,
df_expression_level_by_gene: pd.DataFrame
) -> pd.DataFrame: ) -> pd.DataFrame:
""" """
Find matching genes between the two DataFrames. Find matching genes between the two DataFrames.
Args: Args:
df_reprTranscript (pd.DataFrame): Pandas DataFrame containing genes and their representative transcripts, df_repr_transcript (pd.DataFrame): Pandas DataFrame
generated by the "dict_repr_trans_to_df()" function. containing genes and their representative transcripts,
df_expressionLevel_byGene (pd.DataFrame): Pandas DataFrame containing genes and their expression levels, generated by the "dict_repr_trans_to_df()" function.
generated by the "transcript_by_gene_inDf()" function. df_expression_level_by_gene (pd.DataFrame): Pandas DataFrame
containing genes and their expression levels,
generated by the "transcript_by_gene_inDf()" function.
Returns: Returns:
Pandas DataFrame having representative transcripts and their expression levels. Pandas DataFrame having representative transcripts and
their expression levels.
Raises: Raises:
None None
""" """
df_merged = pd.merge(df_reprTranscript, df_expressionLevel_byGene, how="inner", on="Gene") df_merged = pd.merge(
df_repr_transcript, df_expression_level_by_gene,
how="inner", on="Gene"
)
df_clean = df_merged.loc[:, ["reprTrans", "Expression_level"]] df_clean = df_merged.loc[:, ["reprTrans", "Expression_level"]]
return df_clean return df_clean
def match_repr_transcript_expression_level( def match_repr_transcript_expression_level(
self, exprTrans: str, dict_reprTrans: dict, gtf_file: str, self, expr_trans: str, dict_repr_trans: dict, gtf_file: str,
): ):
""" """Replace expression level with representative transcripts.
Combine functions to replace transcripts from an expression level CSV/TSV file with representative transcripts.
Combine functions to replace transcripts from an expression level
CSV/TSV file with representative transcripts.
Args: Args:
exprTrans (str): CSV or TSV file containing transcripts and their expression level. expr_trans (str): CSV or TSV file containing transcripts
dict_reprTrans (dict): Dictionary of genes and their representative transcripts. and their expression level.
dict_repr_trans (dict): Dictionary of genes
and their representative transcripts.
gtf_file (str): Path to the GTF file. gtf_file (str): Path to the GTF file.
Returns: Returns:
Pandas DataFrame of representative transcripts and their expression level. Pandas DataFrame of representative transcripts
and their expression level.
Raises: Raises:
None None
""" """
df_gene_transcript = self.gtf_to_df(gtf_file) df_gene_transcript = self.gtf_to_df(gtf_file)
df_exprTrans = self.tsv_or_csv_to_df(exprTrans) df_expr_trans = self.tsv_or_csv_to_df(expr_trans)
df_reprTrans = self.dict_repr_trans_to_df(dict_reprTrans) df_repr_trans = self.dict_repr_trans_to_df(dict_repr_trans)
df_expr_level_by_gene = self.expr_level_by_gene(df_exprTrans, df_gene_transcript) df_expr_level_by_gene = self.expr_level_by_gene(
df_match = self.match_by_gene(df_reprTrans, df_expr_level_by_gene) df_expr_trans, df_gene_transcript
df_match.rename(columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True) )
df_match = self.match_by_gene(df_repr_trans, df_expr_level_by_gene)
df_match.rename(
columns={"reprTrans": "id", "Expression_level": "level"},
inplace=True
)
return df_match return df_match
# def dict_repr_trans_to_df(dict_repr_trans: "dict[str, str]") -> pd.DataFrame:
# def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame:
# """Convert a dictionary of genes and their representative # """Convert a dictionary of genes and their representative
# transcript into a dataframe # transcript into a dataframe
# Args: # Args:
# dict_reprTrans (dict): {'Gene':['transcriptA', 'transcriptB'], ...} # dict_repr_trans (dict):
# {'Gene':['transcriptA', 'transcriptB'], ...}
# Returns: # Returns:
# Pandas dataframe having Gene and transcript as columns # Pandas dataframe having Gene and transcript as columns
...@@ -189,22 +228,22 @@ class MatchReptransExplvl: ...@@ -189,22 +228,22 @@ class MatchReptransExplvl:
# """ # """
# pass # pass
# if not type(dict_reprTrans) is dict: # if not type(dict_repr_trans) is dict:
# raise TypeError("Only dict are allowed") # raise TypeError("Only dict are allowed")
# if type(list(dict_reprTrans.keys())[0]) is not str: # if type(list(dict_repr_trans.keys())[0]) is not str:
# raise TypeError("Key should be strings") # raise TypeError("Key should be strings")
# if type(list(dict_reprTrans.values())[0]) is not str: # if type(list(dict_repr_trans.values())[0]) is not str:
# raise TypeError("Values should be strings") # raise TypeError("Values should be strings")
# df_reprTrans = pd.DataFrame.from_dict( # df_repr_trans = pd.DataFrame.from_dict(
# dict_reprTrans, orient="index", columns=["reprTranscript"] # dict_repr_trans, orient="index", columns=["reprTranscript"]
# ) # )
# df_reprTrans = df_reprTrans.reset_index(level=0) # df_repr_trans = df_repr_trans.reset_index(level=0)
# df_reprTrans.columns = ["Gene", "reprTrans"] # df_repr_trans.columns = ["Gene", "reprTrans"]
# df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace( # df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace(
# r"\.[1-9]", "", regex=True # r"\.[1-9]", "", regex=True
# ) # )
# return df_reprTrans # return df_repr_trans
# def gene_and_transcript(gtf_file: str) -> pd.DataFrame: # def gene_and_transcript(gtf_file: str) -> pd.DataFrame:
...@@ -259,7 +298,7 @@ class MatchReptransExplvl: ...@@ -259,7 +298,7 @@ class MatchReptransExplvl:
# file, and sum expression level of all transcripts from the same gene. # file, and sum expression level of all transcripts from the same gene.
# Args: # Args:
# df_exprTranscript: pandas df containing transcript and # df_expr_transcript: pandas df containing transcript and
# their exp level generated by "tsv_or_csv_to_df" function # their exp level generated by "tsv_or_csv_to_df" function
# df_output_gtf_selection : pandas df containing genes and # df_output_gtf_selection : pandas df containing genes and
# transcripts, generated by "transcripts_by_gene_inDf" function # transcripts, generated by "transcripts_by_gene_inDf" function
...@@ -282,15 +321,16 @@ class MatchReptransExplvl: ...@@ -282,15 +321,16 @@ class MatchReptransExplvl:
# def match_by_gene( # def match_by_gene(
# df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame # df_repr_transcript: pd.DataFrame,
# df_expression_level_by_gene: pd.DataFrame
# ) -> pd.DataFrame: # ) -> pd.DataFrame:
# """Find matching genes between the 2 args # """Find matching genes between the 2 args
# Args: # Args:
# df_reprTranscript : pandas Dataframe containing genes # df_repr_transcript : pandas Dataframe containing genes
# and their representative transcript, generated by # and their representative transcript, generated by
# "dict_repr_trans_to_df()" # "dict_repr_trans_to_df()"
# df_expressionLevel_byGene : pandas Dataframe containing # df_expression_level_by_gene : pandas Dataframe containing
# genes and their expression level generated by # genes and their expression level generated by
# "transcript_by_gene_inDf()" # "transcript_by_gene_inDf()"
...@@ -303,7 +343,8 @@ class MatchReptransExplvl: ...@@ -303,7 +343,8 @@ class MatchReptransExplvl:
# """ # """
# pass # pass
# df_merged = pd.merge( # df_merged = pd.merge(
# df_reprTranscript, df_expressionLevel_byGene, how="outer", on="Gene" # df_repr_transcript, df_expression_level_by_gene,
# how="outer", on="Gene"
# ) # )
# df_clean = df_merged.dropna(axis=0) # df_clean = df_merged.dropna(axis=0)
# df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]] # df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]]
...@@ -312,15 +353,15 @@ class MatchReptransExplvl: ...@@ -312,15 +353,15 @@ class MatchReptransExplvl:
# # functions to run this part of the program # # functions to run this part of the program
# def match_repr_transcript_expression_level( # def match_repr_transcript_expression_level(
# exprTrans: str, dict_reprTrans: dict, gtf_file: str, # expr_trans: str, dict_repr_trans: dict, gtf_file: str,
# ): # ):
# """Combine functions to replace transcripts from an exp level csv/tsv file # """Combine functions to replace transcripts from exp level csv/tsv file
# with representative transcripts # with representative transcripts
# Args: # Args:
# exprTrans (str): csv or tsv file containing transcripts # expr_trans (str): csv or tsv file containing transcripts
# and their expression level # and their expression level
# dict_reprTrans (dict) : dict of genes and their # dict_repr_trans (dict) : dict of genes and their
# representative transcript # representative transcript
# intemediate_file (str) : txt file containing genes, transcript # intemediate_file (str) : txt file containing genes, transcript
# and their expression level from the transkript_extractor function # and their expression level from the transkript_extractor function
...@@ -333,12 +374,12 @@ class MatchReptransExplvl: ...@@ -333,12 +374,12 @@ class MatchReptransExplvl:
# None # None
# """ # """
# df_gene_transcript = gene_and_transcript(gtf_file) # df_gene_transcript = gene_and_transcript(gtf_file)
# df_exprTrans = tsv_or_csv_to_df(exprTrans) # df_expr_trans = tsv_or_csv_to_df(expr_trans)
# df_reprTrans = dict_repr_trans_to_df(dict_reprTrans) # df_repr_trans = dict_repr_trans_to_df(dict_repr_trans)
# df_expr_level_by_gene = expr_level_by_gene( # df_expr_level_by_gene = expr_level_by_gene(
# df_exprTrans, df_gene_transcript # df_expr_trans, df_gene_transcript
# ) # error here # ) # error here
# df_match = match_by_gene(df_reprTrans, df_expr_level_by_gene) # df_match = match_by_gene(df_repr_trans, df_expr_level_by_gene)
# df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'}, # df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'},
# inplace=True) # inplace=True)
# return df_match # return df_match
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment