diff --git a/transcript_sampler/cli.py b/transcript_sampler/cli.py index f4d278805d2f8277dc9d2e71813aeaf85cf9c01a..13446a4524f90fb98ce3f1ff16114f8110b19a91 100644 --- a/transcript_sampler/cli.py +++ b/transcript_sampler/cli.py @@ -52,8 +52,8 @@ def main(): log.info("Started transcript sampler.") dict_repr_trans = find_rep_trans.get_rep_trans(args.input_gtf) df_repr = match_reptrs_explvl.match_repr_transcript_expression_level( - dict_reprTrans=dict_repr_trans, - exprTrans=args.input_csv, + dict_repr_trans=dict_repr_trans, + expr_trans=args.input_csv, gtf_file=args.input_gtf ) log.info( diff --git a/transcript_sampler/find_reptrans.py b/transcript_sampler/find_reptrans.py index 46af0ba6d35fde24fea74d108e80c18dfd66f85c..9c2511f5e42c9d32c1adc9ecfd4ad41f535c4cce 100644 --- a/transcript_sampler/find_reptrans.py +++ b/transcript_sampler/find_reptrans.py @@ -52,10 +52,9 @@ class FindRepTrans: if look_for in attributes: index = attributes.index(look_for) + 1 return attributes[index] - else: - LOG.warning('No %s in the entry, the return was set to NA', - look_for) - return "NA" + LOG.warning('No %s in the entry, the return was set to NA', + look_for) + return "NA" @staticmethod def reformat_reptrans(rep_trans_dict: dict) -> dict: @@ -99,6 +98,9 @@ class FindRepTrans: # setting default variables rep_transcripts: dict = {} cur_g_id = "" + cur_t_id = "" + pot_best_trans: list = [] + cur_best_trans: list = [] # [transcript_id, transcript_support_level, transcript_length] cur_best_trans = ["", 100, 0] @@ -126,7 +128,7 @@ class FindRepTrans: if ( self.find_in_attributes( attributes, "transcript_id" - ) != cur_t_ID + ) != cur_t_id ): LOG.error("Exon from an unexpected transcript") raise ValueError("Exon from an unexpected transcript") @@ -137,7 +139,6 @@ class FindRepTrans: pot_best_trans[2] += int(entry[4]) - int(entry[3]) if pot_best_trans[2] > cur_best_trans[2]: cur_best_trans = pot_best_trans - pot_best_trans = False else: cur_best_trans[2] += int(entry[4]) - int(entry[3]) @@ -149,7 +150,7 @@ class FindRepTrans: raise ValueError("Transcript from an unexpected gene") # finding the transcript id and the support level - cur_t_ID = self.find_in_attributes( + cur_t_id = self.find_in_attributes( attributes, "transcript_id" ) t_supp_lvl: Union[int, str] = self.find_in_attributes( @@ -162,21 +163,22 @@ class FindRepTrans: if t_supp_lvl == "NA": t_supp_lvl = 100 else: - if isinstance(t_supp_lvl, str) and t_supp_lvl.isdigit(): + if isinstance( + t_supp_lvl, str + ) and t_supp_lvl.isdigit(): t_supp_lvl = int(t_supp_lvl) else: t_supp_lvl = 100 # decides if the transcript has potential to become the # representative transcript - if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "": - cur_best_trans = [cur_t_ID, t_supp_lvl, 0] - pot_best_trans = False - ignor_trans = False + if ( + t_supp_lvl < cur_best_trans[1] or + cur_best_trans[0] == "" + ): + cur_best_trans = [cur_t_id, t_supp_lvl, 0] elif t_supp_lvl == cur_best_trans[1]: - pot_best_trans = [cur_t_ID, t_supp_lvl, 0] - else: - ignor_trans = True + pot_best_trans = [cur_t_id, t_supp_lvl, 0] # looking for and processing gene entries elif entry[2] == "gene": @@ -221,8 +223,8 @@ class FindRepTrans: """ output = [] - with open(original_file, "r", encoding="utf-8") as f: - for line in f: + with open(original_file, "r", encoding="utf-8") as file: + for line in file: if line.startswith("#"): continue @@ -243,51 +245,3 @@ class FindRepTrans: with open(output_file, "w", encoding="utf-8") as last_file: last_file.writelines(output) - - -# def _test(): -# """ -# This funtion is meant to be run for test -# Output: -# file with the dictionary generated based on the test file -# """ -# file_name = "test.gtf" -# rt = get_rep_trans(file_name) -# expected_result = {"ENSG00000160072": "ENST00000472194", -# "ENSG00000234396": "ENST00000442483", -# "ENSG00000225972": "ENST00000416931", -# "ENSG00000224315": "ENST00000428803", -# "ENSG00000198744": "ENST00000416718", -# "ENSG00000279928": "ENST00000624431", -# "ENSG00000228037": "ENST00000424215", -# 'ENSG00000142611': 'ENST00000378391'} -# if rt != expected_result: -# print("The test failed due to not yielding the same results") -# print("The results the program got\n", rt) -# print("The expected results\n", expected_result) -# else: -# print("The test was successful") - - -# # Execution part # -# if __name__ == "__main__": -# parser = argparse.ArgumentParser( -# description="find_representativ_transcripts", -# formatter_class=argparse.ArgumentDefaultsHelpFormatter -# ) -# parser.add_argument("-file_name", required=True, -# help="gtf file with genome annotation") -# parser.add_argument("-t", required=False, default=False, -# help="to run the test input -t True") -# args = parser.parse_args() - -# # standadize the file_name inlude .gtf# -# file_name = args.file_name -# i_gtf = file_name.find(".gtf") -# if i_gtf == -1: -# file_name += ".gtf" - -# if args.t: -# _test() -# else: -# get_rep_trans(file_name) diff --git a/transcript_sampler/match_reptrans_explvl.py b/transcript_sampler/match_reptrans_explvl.py index 654f8dc95c04ac41822508b82e04bbe85277e114..a914d8ca744c19e8c59b43ce7b526f1d9233cd72 100644 --- a/transcript_sampler/match_reptrans_explvl.py +++ b/transcript_sampler/match_reptrans_explvl.py @@ -2,8 +2,8 @@ # Made by Hugo Gillet # import logging -import pandas as pd -from gtfparse import read_gtf +import pandas as pd # type: ignore +from gtfparse import read_gtf # type: ignore LOG = logging.getLogger(__name__) @@ -43,8 +43,7 @@ class MatchReptransExplvl: def dict_repr_trans_to_df( dict_repr_trans: "dict[str, str]" ) -> pd.DataFrame: - """ - Convert a dict of genes and their representative transcript into a df. + """Convert a dict of genes and representative transcript into a df. Args: dict_repr_trans (dict): @@ -87,8 +86,7 @@ class MatchReptransExplvl: @staticmethod def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: - """ - Convert a TSV or CSV file into a pandas DataFrame. + """Convert a TSV or CSV file into a pandas DataFrame. Args: input_txt (str): TSV or CSV file containing transcript expression @@ -148,8 +146,7 @@ class MatchReptransExplvl: df_repr_transcript: pd.DataFrame, df_expression_level_by_gene: pd.DataFrame ) -> pd.DataFrame: - """ - Find matching genes between the two DataFrames. + """Find matching genes between the two DataFrames. Args: df_repr_transcript (pd.DataFrame): Pandas DataFrame @@ -207,184 +204,3 @@ class MatchReptransExplvl: inplace=True ) return df_match - - -# def dict_repr_trans_to_df(dict_repr_trans: "dict[str, str]") -> pd.DataFrame: - -# """Convert a dictionary of genes and their representative -# transcript into a dataframe - -# Args: -# dict_repr_trans (dict): -# {'Gene':['transcriptA', 'transcriptB'], ...} - -# Returns: -# Pandas dataframe having Gene and transcript as columns - -# Raises: -# Only dict are allowed -# Key should be strings -# Value should be strings - -# """ -# pass -# if not type(dict_repr_trans) is dict: -# raise TypeError("Only dict are allowed") -# if type(list(dict_repr_trans.keys())[0]) is not str: -# raise TypeError("Key should be strings") -# if type(list(dict_repr_trans.values())[0]) is not str: -# raise TypeError("Values should be strings") - -# df_repr_trans = pd.DataFrame.from_dict( -# dict_repr_trans, orient="index", columns=["reprTranscript"] -# ) -# df_repr_trans = df_repr_trans.reset_index(level=0) -# df_repr_trans.columns = ["Gene", "reprTrans"] -# df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace( -# r"\.[1-9]", "", regex=True -# ) -# return df_repr_trans - - -# def gene_and_transcript(gtf_file: str) -> pd.DataFrame: -# """ -# This function take a .gtf file and convert it into a -# dataframe containing gene_id and their transcripts_id. -# Args: -# gtf_file(str) : path to the .gtf file - -# Returns: -# df_gtf(pd.DataFrame): pandas df containing having has columns -# gene_id and their transcripts_id. -# Raises: -# None -# """ -# df_gtf = read_gtf(gtf_file) -# df_gtf = df_gtf.loc[df_gtf["feature"] == "transcript"] -# df_gtf = df_gtf[["gene_id", "transcript_id"]] -# df_gtf = df_gtf.rename(columns={"gene_id": "Gene", -# "transcript_id": "Transcript"}) -# return df_gtf - - -# def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: -# """Convert tsv or csv file into a pandas dataframe - -# Args: -# input_txt (str): csv or tsv file containing transcript exp level - -# Returns: -# df_gene (str): Pandas dataframe having transcript and exp level -# as columns - -# Raises: -# None -# """ -# pass -# df_input = pd.read_csv( -# input_txt, -# sep=r"[\t,]", -# lineterminator="\n", -# names=["Transcript", "Expression_level"], -# engine="python", -# ) -# return df_input - - -# def expr_level_by_gene( -# df_exprTrasncript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame -# ) -> pd.DataFrame: -# """find the gene of each transcipt given by the expression level csv/tsv -# file, and summ expression level of all transcipts from the same gene. - -# Args: -# df_expr_transcript: pandas df containing transcript and -# their exp level generated by "tsv_or_csv_to_df" function -# df_output_gtf_selection : pandas df containing genes and -# transcripts, generated by "transcripts_by_gene_inDf" function - -# Returns: -# Pandas dataframe having gene and sum of its transcript exp level - -# Raises: -# None -# """ -# pass -# df_merged = pd.merge( -# df_output_gtf_selection, df_exprTrasncript, -# how="inner", on="Transcript" -# ) -# df_sum = df_merged.groupby("Gene").sum( -# "Expression_level" -# ) -# return df_sum - - -# def match_by_gene( -# df_repr_transcript: pd.DataFrame, -# df_expression_level_by_gene: pd.DataFrame -# ) -> pd.DataFrame: -# """Find matching genes bewteen the 2 args - -# Args: -# df_repr_transcript : pandas Dataframe containing genes -# and their representative transcript, generated by -# "dict_repr_trans_to_df()" -# df_expression_level_by_gene : pandas Dataframe containing -# genes and their expression level generated by -# "transcript_by_gene_inDf()" - -# Returns: -# Pandas dataframe having representative trasncripts -# and their expression level - -# Raises: -# None -# """ -# pass -# df_merged = pd.merge( -# df_repr_transcript, df_expression_level_by_gene, -# how="outer", on="Gene" -# ) -# df_clean = df_merged.dropna(axis=0) -# df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]] -# return df_clean - - -# # functions to run this part of the programm -# def match_repr_transcript_expression_level( -# expr_trans: str, dict_repr_trans: dict, gtf_file: str, -# ): -# """Combine functions to replace transcripts from exp level csv/tsv file -# with representative transcripts - -# Args: -# expr_trans (str): csv or tsv file containing transcripts -# and their expression level -# dict_repr_trans (dict) : dict of genes and their -# representative transcipt -# intemediate_file (str) : txt file containing genes, transcript -# and their expression level from the transkript_extractor function -# output_path : path indicating were the tsv file should be written - -# Returns: -# tsv file of representative trasncripts and their expression level - -# Raises: -# None -# """ -# df_gene_transcript = gene_and_transcript(gtf_file) -# df_expr_trans = tsv_or_csv_to_df(expr_trans) -# df_repr_trans = dict_repr_trans_to_df(dict_repr_trans) -# df_expr_level_by_gene = expr_level_by_gene( -# df_expr_trans, df_gene_transcript -# ) # error here -# df_match = match_by_gene(df_repr_trans, df_expr_level_by_gene) -# df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'}, -# inplace=True) -# return df_match - - -# # run the program -# if __name__ == "__main__": -# match_repr_transcript_expression_level() diff --git a/transcript_sampler/poisson_sampling.py b/transcript_sampler/poisson_sampling.py index f86e2bb22cf0c290c61d4b28a4340f195d7f51a1..0df129c1fbc688507858970bc8ca2657e8552aca 100644 --- a/transcript_sampler/poisson_sampling.py +++ b/transcript_sampler/poisson_sampling.py @@ -1,6 +1,6 @@ """Sample transcripts by Poisson-sampling.""" -import pandas as pd +import pandas as pd # type: ignore import numpy as np @@ -30,47 +30,3 @@ class SampleTranscript: "id": df_repr["id"], "count": levels }) transcript_numbers.to_csv(output_csv, index=False, header=False) - - -# python_version = "3.7.13" -# module_list = [pd, np, argparse] -# modul_name_list = ["pd", "np", "argparse"] - -# def transcript_sampling(total_transcript_number, df_repr, output_csv): -# # df = pd.read_csv( -# # csv_file, sep="\t", lineterminator="\n", names=["id", "level"]) -# # the function match_reprTranscript_expressionLevel() now outputs a df -# df = df_repr -# levels = [] -# sums = df['level'].tolist() -# total = sum(sums) -# # I added this because writting a number in the terminal inputed a string -# total_transcript_number = int(total_transcript_number) -# normalized = total_transcript_number/total -# for expression_level in df['level']: -# poisson_sampled = np.random.poisson(expression_level*normalized) -# levels.append(poisson_sampled) - -# transcript_numbers = pd.DataFrame({'id': df['id'], 'count': levels}) -# pd.DataFrame.to_csv(transcript_numbers, output_csv) - - -# if __name__ == '__main__': -# # te.version_control(module_list,modul_name_list,python_version) -# parser = argparse.ArgumentParser( -# description="Transcript Poisson sampler, csv output", -# formatter_class=argparse.ArgumentDefaultsHelpFormatter -# ) - -# parser.add_argument("--expression_level", required=True, -# help="csv file with expression level") -# parser.add_argument("--output_csv", required=True, -# help="output csv file") -# parser.add_argument("--input_csv", required=True, -# help="input csv file") -# parser.add_argument("--transcript_number", required=True, -# help="total number of transcripts to sample") -# args = parser.parse_args() - -# transcript_sampling(args.transcript_number, args.input_csv, -# args.output_csv, args.transcript_number)