Skip to content
Snippets Groups Projects
Commit 5a912247 authored by Mate Balajti's avatar Mate Balajti
Browse files

refactor: update main scripts

parent b700c8a1
No related branches found
No related tags found
1 merge request: !7 feat: add tests
......@@ -52,8 +52,8 @@ def main():
log.info("Started transcript sampler.")
dict_repr_trans = find_rep_trans.get_rep_trans(args.input_gtf)
df_repr = match_reptrs_explvl.match_repr_transcript_expression_level(
dict_reprTrans=dict_repr_trans,
exprTrans=args.input_csv,
dict_repr_trans=dict_repr_trans,
expr_trans=args.input_csv,
gtf_file=args.input_gtf
)
log.info(
......
......@@ -52,10 +52,9 @@ class FindRepTrans:
if look_for in attributes:
index = attributes.index(look_for) + 1
return attributes[index]
else:
LOG.warning('No %s in the entry, the return was set to NA',
look_for)
return "NA"
LOG.warning('No %s in the entry, the return was set to NA',
look_for)
return "NA"
@staticmethod
def reformat_reptrans(rep_trans_dict: dict) -> dict:
......@@ -99,6 +98,9 @@ class FindRepTrans:
# setting default variables
rep_transcripts: dict = {}
cur_g_id = ""
cur_t_id = ""
pot_best_trans: list = []
cur_best_trans: list = []
# [transcript_id, transcript_support_level, transcript_length]
cur_best_trans = ["", 100, 0]
......@@ -126,7 +128,7 @@ class FindRepTrans:
if (
self.find_in_attributes(
attributes, "transcript_id"
) != cur_t_ID
) != cur_t_id
):
LOG.error("Exon from an unexpected transcript")
raise ValueError("Exon from an unexpected transcript")
......@@ -137,7 +139,6 @@ class FindRepTrans:
pot_best_trans[2] += int(entry[4]) - int(entry[3])
if pot_best_trans[2] > cur_best_trans[2]:
cur_best_trans = pot_best_trans
pot_best_trans = False
else:
cur_best_trans[2] += int(entry[4]) - int(entry[3])
......@@ -149,7 +150,7 @@ class FindRepTrans:
raise ValueError("Transcript from an unexpected gene")
# finding the transcript id and the support level
cur_t_ID = self.find_in_attributes(
cur_t_id = self.find_in_attributes(
attributes, "transcript_id"
)
t_supp_lvl: Union[int, str] = self.find_in_attributes(
......@@ -162,21 +163,22 @@ class FindRepTrans:
if t_supp_lvl == "NA":
t_supp_lvl = 100
else:
if isinstance(t_supp_lvl, str) and t_supp_lvl.isdigit():
if isinstance(
t_supp_lvl, str
) and t_supp_lvl.isdigit():
t_supp_lvl = int(t_supp_lvl)
else:
t_supp_lvl = 100
# decides if the transcript has potential to become the
# representative transcript
if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "":
cur_best_trans = [cur_t_ID, t_supp_lvl, 0]
pot_best_trans = False
ignor_trans = False
if (
t_supp_lvl < cur_best_trans[1] or
cur_best_trans[0] == ""
):
cur_best_trans = [cur_t_id, t_supp_lvl, 0]
elif t_supp_lvl == cur_best_trans[1]:
pot_best_trans = [cur_t_ID, t_supp_lvl, 0]
else:
ignor_trans = True
pot_best_trans = [cur_t_id, t_supp_lvl, 0]
# looking for and processing gene entries
elif entry[2] == "gene":
......@@ -221,8 +223,8 @@ class FindRepTrans:
"""
output = []
with open(original_file, "r", encoding="utf-8") as f:
for line in f:
with open(original_file, "r", encoding="utf-8") as file:
for line in file:
if line.startswith("#"):
continue
......@@ -243,51 +245,3 @@ class FindRepTrans:
with open(output_file, "w", encoding="utf-8") as last_file:
last_file.writelines(output)
# def _test():
# """
# This function is meant to be run as a test
# Output:
# file with the dictionary generated based on the test file
# """
# file_name = "test.gtf"
# rt = get_rep_trans(file_name)
# expected_result = {"ENSG00000160072": "ENST00000472194",
# "ENSG00000234396": "ENST00000442483",
# "ENSG00000225972": "ENST00000416931",
# "ENSG00000224315": "ENST00000428803",
# "ENSG00000198744": "ENST00000416718",
# "ENSG00000279928": "ENST00000624431",
# "ENSG00000228037": "ENST00000424215",
# 'ENSG00000142611': 'ENST00000378391'}
# if rt != expected_result:
# print("The test failed due to not yielding the same results")
# print("The results the program got\n", rt)
# print("The expected results\n", expected_result)
# else:
# print("The test was successful")
# # Execution part #
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(
# description="find_representativ_transcripts",
# formatter_class=argparse.ArgumentDefaultsHelpFormatter
# )
# parser.add_argument("-file_name", required=True,
# help="gtf file with genome annotation")
# parser.add_argument("-t", required=False, default=False,
# help="to run the test input -t True")
# args = parser.parse_args()
# # standardize the file_name to include .gtf #
# file_name = args.file_name
# i_gtf = file_name.find(".gtf")
# if i_gtf == -1:
# file_name += ".gtf"
# if args.t:
# _test()
# else:
# get_rep_trans(file_name)
......@@ -2,8 +2,8 @@
# Made by Hugo Gillet #
import logging
import pandas as pd
from gtfparse import read_gtf
import pandas as pd # type: ignore
from gtfparse import read_gtf # type: ignore
LOG = logging.getLogger(__name__)
......@@ -43,8 +43,7 @@ class MatchReptransExplvl:
def dict_repr_trans_to_df(
dict_repr_trans: "dict[str, str]"
) -> pd.DataFrame:
"""
Convert a dict of genes and their representative transcript into a df.
"""Convert a dict of genes and representative transcript into a df.
Args:
dict_repr_trans (dict):
......@@ -87,8 +86,7 @@ class MatchReptransExplvl:
@staticmethod
def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
"""
Convert a TSV or CSV file into a pandas DataFrame.
"""Convert a TSV or CSV file into a pandas DataFrame.
Args:
input_txt (str): TSV or CSV file containing transcript expression
......@@ -148,8 +146,7 @@ class MatchReptransExplvl:
df_repr_transcript: pd.DataFrame,
df_expression_level_by_gene: pd.DataFrame
) -> pd.DataFrame:
"""
Find matching genes between the two DataFrames.
"""Find matching genes between the two DataFrames.
Args:
df_repr_transcript (pd.DataFrame): Pandas DataFrame
......@@ -207,184 +204,3 @@ class MatchReptransExplvl:
inplace=True
)
return df_match
# def dict_repr_trans_to_df(dict_repr_trans: "dict[str, str]") -> pd.DataFrame:
# """Convert a dictionary of genes and their representative
# transcript into a dataframe
# Args:
# dict_repr_trans (dict):
# {'Gene':['transcriptA', 'transcriptB'], ...}
# Returns:
# Pandas dataframe having Gene and transcript as columns
# Raises:
# Only dict are allowed
# Key should be strings
# Value should be strings
# """
# pass
# if not type(dict_repr_trans) is dict:
# raise TypeError("Only dict are allowed")
# if type(list(dict_repr_trans.keys())[0]) is not str:
# raise TypeError("Key should be strings")
# if type(list(dict_repr_trans.values())[0]) is not str:
# raise TypeError("Values should be strings")
# df_repr_trans = pd.DataFrame.from_dict(
# dict_repr_trans, orient="index", columns=["reprTranscript"]
# )
# df_repr_trans = df_repr_trans.reset_index(level=0)
# df_repr_trans.columns = ["Gene", "reprTrans"]
# df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace(
# r"\.[1-9]", "", regex=True
# )
# return df_repr_trans
# def gene_and_transcript(gtf_file: str) -> pd.DataFrame:
# """
# This function take a .gtf file and convert it into a
# dataframe containing gene_id and their transcripts_id.
# Args:
# gtf_file(str) : path to the .gtf file
# Returns:
# df_gtf(pd.DataFrame): pandas df containing having has columns
# gene_id and their transcripts_id.
# Raises:
# None
# """
# df_gtf = read_gtf(gtf_file)
# df_gtf = df_gtf.loc[df_gtf["feature"] == "transcript"]
# df_gtf = df_gtf[["gene_id", "transcript_id"]]
# df_gtf = df_gtf.rename(columns={"gene_id": "Gene",
# "transcript_id": "Transcript"})
# return df_gtf
# def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
# """Convert tsv or csv file into a pandas dataframe
# Args:
# input_txt (str): csv or tsv file containing transcript exp level
# Returns:
# df_gene (str): Pandas dataframe having transcript and exp level
# as columns
# Raises:
# None
# """
# pass
# df_input = pd.read_csv(
# input_txt,
# sep=r"[\t,]",
# lineterminator="\n",
# names=["Transcript", "Expression_level"],
# engine="python",
# )
# return df_input
# def expr_level_by_gene(
# df_exprTrasncript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
# ) -> pd.DataFrame:
# """Find the gene of each transcript given by the expression level csv/tsv
# file, and sum the expression levels of all transcripts from the same gene.
# Args:
# df_expr_transcript: pandas df containing transcript and
# their exp level generated by "tsv_or_csv_to_df" function
# df_output_gtf_selection : pandas df containing genes and
# transcripts, generated by "transcripts_by_gene_inDf" function
# Returns:
# Pandas dataframe having gene and sum of its transcript exp level
# Raises:
# None
# """
# pass
# df_merged = pd.merge(
# df_output_gtf_selection, df_exprTrasncript,
# how="inner", on="Transcript"
# )
# df_sum = df_merged.groupby("Gene").sum(
# "Expression_level"
# )
# return df_sum
# def match_by_gene(
# df_repr_transcript: pd.DataFrame,
# df_expression_level_by_gene: pd.DataFrame
# ) -> pd.DataFrame:
# """Find matching genes between the 2 args
# Args:
# df_repr_transcript : pandas Dataframe containing genes
# and their representative transcript, generated by
# "dict_repr_trans_to_df()"
# df_expression_level_by_gene : pandas Dataframe containing
# genes and their expression level generated by
# "transcript_by_gene_inDf()"
# Returns:
# Pandas dataframe having representative transcripts
# and their expression level
# Raises:
# None
# """
# pass
# df_merged = pd.merge(
# df_repr_transcript, df_expression_level_by_gene,
# how="outer", on="Gene"
# )
# df_clean = df_merged.dropna(axis=0)
# df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]]
# return df_clean
# # functions to run this part of the program
# def match_repr_transcript_expression_level(
# expr_trans: str, dict_repr_trans: dict, gtf_file: str,
# ):
# """Combine functions to replace transcripts from exp level csv/tsv file
# with representative transcripts
# Args:
# expr_trans (str): csv or tsv file containing transcripts
# and their expression level
# dict_repr_trans (dict) : dict of genes and their
# representative transcript
# intemediate_file (str) : txt file containing genes, transcript
# and their expression level from the transkript_extractor function
# output_path : path indicating where the tsv file should be written
# Returns:
# tsv file of representative transcripts and their expression level
# Raises:
# None
# """
# df_gene_transcript = gene_and_transcript(gtf_file)
# df_expr_trans = tsv_or_csv_to_df(expr_trans)
# df_repr_trans = dict_repr_trans_to_df(dict_repr_trans)
# df_expr_level_by_gene = expr_level_by_gene(
# df_expr_trans, df_gene_transcript
# ) # error here
# df_match = match_by_gene(df_repr_trans, df_expr_level_by_gene)
# df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'},
# inplace=True)
# return df_match
# # run the program
# if __name__ == "__main__":
# match_repr_transcript_expression_level()
"""Sample transcripts by Poisson-sampling."""
import pandas as pd
import pandas as pd # type: ignore
import numpy as np
......@@ -30,47 +30,3 @@ class SampleTranscript:
"id": df_repr["id"], "count": levels
})
transcript_numbers.to_csv(output_csv, index=False, header=False)
# python_version = "3.7.13"
# module_list = [pd, np, argparse]
# modul_name_list = ["pd", "np", "argparse"]
# def transcript_sampling(total_transcript_number, df_repr, output_csv):
# # df = pd.read_csv(
# # csv_file, sep="\t", lineterminator="\n", names=["id", "level"])
# # the function match_reprTranscript_expressionLevel() now outputs a df
# df = df_repr
# levels = []
# sums = df['level'].tolist()
# total = sum(sums)
# # I added this because a number typed in the terminal is read in as a string
# total_transcript_number = int(total_transcript_number)
# normalized = total_transcript_number/total
# for expression_level in df['level']:
# poisson_sampled = np.random.poisson(expression_level*normalized)
# levels.append(poisson_sampled)
# transcript_numbers = pd.DataFrame({'id': df['id'], 'count': levels})
# pd.DataFrame.to_csv(transcript_numbers, output_csv)
# if __name__ == '__main__':
# # te.version_control(module_list,modul_name_list,python_version)
# parser = argparse.ArgumentParser(
# description="Transcript Poisson sampler, csv output",
# formatter_class=argparse.ArgumentDefaultsHelpFormatter
# )
# parser.add_argument("--expression_level", required=True,
# help="csv file with expression level")
# parser.add_argument("--output_csv", required=True,
# help="output csv file")
# parser.add_argument("--input_csv", required=True,
# help="input csv file")
# parser.add_argument("--transcript_number", required=True,
# help="total number of transcripts to sample")
# args = parser.parse_args()
# transcript_sampling(args.transcript_number, args.input_csv,
# args.output_csv, args.transcript_number)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment