diff --git a/.gitignore b/.gitignore index 2a680d3b0a4bafbaa25ed4d8d76d524aafe15891..e6b679b7adf391e7d47a390aa7dba96faefe97d0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,67 @@ -# ignore ALL .log files -*.log +# Created by https://www.toptal.com/developers/gitignore/api/macos,visualstudiocode +# Edit at https://www.toptal.com/developers/gitignore?templates=macos,visualstudiocode -# ignore ALL files in ANY directory named temp -temp/ -__pycache__ -output_files +### macOS ### +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +### macOS Patch ### +# iCloud generated files +*.icloud + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide +.vscode + +# End of https://www.toptal.com/developers/gitignore/api/macos,visualstudiocode + +__pycache__/ *_cache *egg-info/ .coverage build/ -*/play.py \ No newline at end of file +*/play.py +*.log +temp/ +output_files diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c4402dd0525a2dbd2516704c8e9c45c42887ff56..e34cd6f9e5329d2cd11cb04cf36917d369201567 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -29,6 +29,6 @@ lint-test-job: # This job also runs in the test stage. - pip install -r requirements.txt - pip install -r requirements_dev.txt - pip install -e . - # - flake8 --docstring-convention google transcript_sampler/ tests/ - # - pylint transcript_sampler/ tests/ - # - mypy transcript_sampler/ \ No newline at end of file + - flake8 --docstring-convention google transcript_sampler/ tests/ + - pylint transcript_sampler/ tests/ + - mypy transcript_sampler/ tests/ \ No newline at end of file diff --git a/README.md b/README.md index 75d56db07b37124880f302961414fd32aedcf86d..bbd644b7e7f69676f36cf60b1883aad206c698c5 100644 --- a/README.md +++ b/README.md @@ -47,4 +47,4 @@ options: Example : -`transcript_sampler --input_gtf="tests/input_files/test.gtf" --input_csv="tests/input_files/expression.csv" --output_gtf="output_files/output.gtf" --output_csv="output_files/output.csv" --n_to_sample=100` +`transcript-sampler --input_gtf="tests/inputs/test.gtf" --input_csv="tests/inputs/expression.csv" --output_gtf="output_files/output.gtf" --output_csv="output_files/output.csv" --n_to_sample=100` diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000000000000000000000000000000000000..bb7989a0686411bbbfd16d711e406e44132357a3 --- /dev/null +++ b/environment.yml @@ -0,0 +1,24 @@ +name: scrna-seq-sim +channels: + - defaults + - bioconda + - conda-forge +dependencies: + - argparse + - biopython>=1.78 + - black + - coverage + - flake8 + - flake8-docstrings + - gtfparse + - polars==0.16.17 + - mypy + - numpy>=1.23.3 + - pylint + - pytest + - nextflow + - pandas>=1.4.4 + - pip>=20.2.3 + - python>=3.6, <=3.10 + - pip: + - -e . diff --git a/requirements.txt b/requirements.txt index 3fd96b317696344e9d5c3dcf7d66d92d45f05421..5dbb14e25cf907c73df8662dee76f3bce08d84d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ argparse biopython +polars==0.16.17 gtfparse -polars == 0.16.17 -numpy >= 1.23.3 -pandas >= 1.4.4 \ No newline at end of file +numpy>=1.23.3 +pandas>=1.4.4 \ No newline at end of file diff --git a/requirements_dev.txt b/requirements_dev.txt index 19f4ed8f0a9479b80aab6e5337d4c2efe1b31ed9..e4f1065860b2254edc650d040193989add7f6179 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -2,5 +2,6 @@ pytest coverage flake8 flake8-docstrings +pyarrow mypy pylint diff --git a/setup.py b/setup.py index 9542e8a5d2e5e83f2e3f1ce44a26a9bfc796731e..721659b0ed83e01955c19efb8e10a6eb03e0e502 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ """Set up project.""" from pathlib import Path -from setuptools import setup, find_packages +from setuptools import setup, find_packages # type: ignore project_root_dir = Path(__file__).parent.resolve() with open(project_root_dir / "requirements.txt", diff --git a/tests/__init__.py b/tests/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..b3d9b62f79cdffdee3c6c65a87102862285a170c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Initialize tests.""" diff --git a/tests/input_files/expression.csv b/tests/input_files/expression.csv deleted file mode 100644 index d6fc944211264f8823240ad1d5e67df68985a1a2..0000000000000000000000000000000000000000 --- a/tests/input_files/expression.csv +++ /dev/null @@ -1,4 +0,0 @@ -ENST00000472194 0.8914783511010855 -ENST00000308647 1.0887715239511602 -ENST00000442483 0.8381441606416928 -ENST00000511072 0.9145581387636652 diff --git a/tests/inputs/expression.csv b/tests/inputs/expression.csv new file mode 100644 index 0000000000000000000000000000000000000000..f4ba134a0eb82106c2fe8bbf7ae6bde6e4b41579 --- /dev/null +++ b/tests/inputs/expression.csv @@ -0,0 +1,4 @@ +ENST00000472194,0.8914783511010855 +ENST00000308647,1.0887715239511602 +ENST00000442483,0.8381441606416928 +ENST00000511072,0.9145581387636652 diff --git a/tests/input_files/test.gtf b/tests/inputs/test.gtf similarity index 100% rename from tests/input_files/test.gtf rename to tests/inputs/test.gtf diff --git a/tests/test_functions.py b/tests/test_functions.py index 943dc255971e67b9871af42fde5b881c3edfba5b..a81b0754d1d7ca809642da2a5477d0afcdf00149 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -1,9 +1,10 @@ """Tests functions.""" import os -import pandas as pd +import pandas as pd # type: ignore import numpy as np +# pylint: disable=C0103 def find_path(filename: str) -> str: """Find the path to a file. @@ -35,7 +36,7 @@ def find_output(): None """ absolute_path = os.path.dirname(__file__) - test_file = "ReprTrans_ExpressionLevel.tsv" + test_file = "inputs/test_ref_output.tsv" full_path = os.path.join(absolute_path, test_file) return full_path diff --git a/tests/test_match_reptrans_explvl.py b/tests/test_match_reptrans_explvl.py index 96b878d0deec2ab6df6d483c4ee623d0edc7c081..495155d25e5fbd9f3e63cb477f2f5ba4218cd31d 100644 --- a/tests/test_match_reptrans_explvl.py +++ b/tests/test_match_reptrans_explvl.py @@ -1,19 +1,17 @@ """Tests for match representative transcript with expression level.""" import pytest -import pandas as pd +import pandas as pd # type: ignore import numpy as np -from pandas.testing import assert_frame_equal -import tests.test_functions as tFun -from transcript_sampler.match_reptrans_explvl import \ +from pandas.testing import assert_frame_equal # type: ignore +from transcript_sampler.match_reptrans_explvl import ( MatchReptransExplvl as match +) +import tests.test_functions as tFun class TestMatchReptrans: """Tests for match_reptrans_explvl.py.""" - # def test_gtf_to_df(self): - # TO DO - def test_dict_repr_trans_to_df(self): """Test dict_repr_trans_to_df() function. @@ -44,7 +42,7 @@ class TestMatchReptrans: assert tFun.duplicated_rows(data_frame).empty, \ "at least one row is duplicated" assert tFun.na_value(data_frame) == 0, \ - "at least one row contain NA values" + "at least one row contains NA values" def test_tsv_or_csv_to_df(self): """Test tsv_or_csv_to_df() function. @@ -65,9 +63,9 @@ class TestMatchReptrans: assert tFun.column_d_type(df_tsv) == datatype, \ "at least one column has the wrong datatype" assert tFun.duplicated_rows(df_tsv).empty, \ - "at least one row are duplicated " + "at least one row is duplicated" assert tFun.na_value(df_tsv) == 0, \ - "at least one row contain NA values" + "at least one row contains NA values" assert assert_frame_equal(df_tsv, df_csv) is None, \ "csv and tsv import doesn't match" @@ -75,7 +73,7 @@ class TestMatchReptrans: """Test expr_level_by_gene() function. This function test if the function expr_level_by_gene can find - the gene of each transcipt given by the expression level csv/tsv + the gene of each transcript given by the expression level csv/tsv file and sum their expression level """ path_tsv = tFun.find_path(r"test_gene_exprL") @@ -104,9 +102,9 @@ class TestMatchReptrans: assert tFun.column_d_type(df_exp_lvl) == datatype, \ "at least one column has the wrong datatype" assert tFun.duplicated_rows(df_exp_lvl).empty, \ - "at least one row are duplicated " + "at least one row is duplicated" assert tFun.na_value(df_exp_lvl) == 0, \ - "at least one row contain NA values " + "at least one row contains NA values" assert tFun.duplicated_index(df_exp_lvl).empty, \ "at least one index element is duplicated" @@ -151,119 +149,50 @@ class TestMatchReptrans: assert tFun.column_d_type(df_match) == datatype, \ "at least one column has the wrong datatype" assert tFun.duplicated_rows(df_match).empty, \ - "at least one row are duplicated " + "at least one row is duplicated" assert tFun.na_value(df_match) == 0, \ - "at least one row contain NA values " + "at least one row contains NA values" assert tFun.duplicated_index(df_match).empty, \ "at least one index element is duplicated" - # def test_match_repr_transcript_expression_level(self): - # """Test match_repr_transcript_expression_level(). - - # This function test that the right output is generated by the - # function match_repr_transcript_expression_level(). - # """ - # input_path = tFun.find_path("test_gene_exprL") - # intermediate_path = tFun.find_path_intermediate_file() - # dict_repr_test = { - # 'ENSMUSG00000079415': 'ENSMUST00000112933', - # "ENSMUSG00000024691": "ENSMUST00000025595", - # "ENSMUSG00000063683": "ENSMUST00000119960"} - - # match.match_repr_transcript_expression_level(self, - # exprTrans=input_path, - # dict_reprTrans=dict_repr_test, - # gtf_file=intermediate_path) - - # ref_path = tFun.find_path("test_ref_output.tsv") - # output_path = tFun.find_output() - - # with open(ref_path, 'r', encoding="utf-8") as t1,\ - # open(output_path, 'r', encoding="utf-8") as t2,\ - # open(input_path, 'r', encoding="utf-8") as t3: - # fileRef = t1.readlines() - # fileOutput = t2.readlines() - # fileInput = t3.readlines() - - # assert ( - # sorted(fileRef) == sorted(fileOutput) - # ), "the output does't match the expected tsv file" - # assert ( - # sorted(fileRef) != sorted(fileInput) - # ), "the output does't match the expected tsv file" - - # def test_txt_to_dict(self): - # """This function tests if txt is convertod to dict""" - # path = tFun.find_path("test_dict_repr_trans.txt") - # dico = match.txt_to_dict(path) - # dict_test = {'ENSMUSG00000079415': 'ENSMUST00000112933', - # "ENSMUSG00000024691": "ENSMUST00000025595", - # "ENSMUSG00000063683": "ENSMUST00000119960"} - # assert dico == dict_test - - # def test_transcripts_by_gene_inDf(): - # """ - # This function test if a dataframe generated from - # the intermediate file is converted in another - # dataframe without the support level column. - # """ - # path = tFun.find_path_intermediate_file() - # df = repr.import_gtfSelection_to_df(path) - # df_gene = match.transcripts_by_gene_inDf(df) - # datatype = {'Gene': np.dtype('O'), 'Transcript': np.dtype('O')} - # assert tFun.column_number(df_gene) == ( - # 2, "number of columns is not equal to 2") - # assert tFun.column_d_type(df_gene) == ( - # datatype, "at least one column has the wrong datatype") - # assert tFun.duplicated_rows(df_gene).empty, \ - # "at least one row are duplicated" - # assert tFun.na_value(df_gene) == 0, \ - # "at least one row contain NA values" - - # def test_output_tsv(): - # """ - # This function test if a tsv file is generated from a pandas - # dataframe in the right format. - # """ + def test_match_repr_transcript_expression_level(self): + """Test match_repr_transcript_expression_level(). - # dict_repr_test = { - # 'ENSMUSG00000079415': 'ENSMUST00000112933', - # "ENSMUSG00000024691": "ENSMUST00000025595", - # "ENSMUSG00000063683": "ENSMUST00000119960"} - # df_dict_repr_trans = match.dict_repr_trans_to_df(dict_repr_test) - - # path_tsv = tFun.find_path(r"test_gene_exprL") - # df_tsv_exp_lvl = match.tsv_or_csv_to_df(path_tsv) - # path_intermediate = tFun.find_path_intermediate_file() - # df_intermediate = repr.import_gtfSelection_to_df(path_intermediate) - # df_gene_transcript = match.transcripts_by_gene_inDf(df_intermediate) - - # df_exp_lvl = match.expr_level_by_gene( - # df_tsv_exp_lvl, df_gene_transcript - # ) - - # df_match = match.match_by_gene(df_dict_repr_trans, df_exp_lvl) - - # match.output_tsv(df_match) - - # ref_path = tFun.find_path("test_ref_output.tsv") - # output_path = tFun.find_output() - - # with open(ref_path, 'r') as t1, open(output_path, 'r') as t2: - # fileRef = t1.readlines() - # fileOutput = t2.readlines() + This function test that the right output is generated by the + function match_repr_transcript_expression_level(). + """ + input_path = tFun.find_path("test_gene_exprL") + gtf_file = tFun.find_path("test.gtf") + dict_repr_test = { + 'ENSMUSG00000079415': 'ENSMUST00000112933', + "ENSMUSG00000024691": "ENSMUST00000025595", + "ENSMUSG00000063683": "ENSMUST00000119960"} - # assert ( - # sorted(fileRef) == sorted(fileOutput) - # ), "the output does't match the expected tsv file" + # Create an instance of MatchReptransExplvl + match_instance = match() -# test_dict_repr_trans_to_df() -# test_txt_to_dict() -# test_transcripts_by_gene_inDf() -# test_tsv_or_csv_to_df() -# test_expr_level_by_gene() -# test_match_by_gene() -# test_output_tsv() -# test_match_repr_transcript_expression_level() + df_result = match_instance.match_repr_transcript_expression_level( + expr_trans=input_path, + dict_repr_trans=dict_repr_test, + gtf_file=gtf_file + ) -# print("test_match is done ! No error was found") + ref_path = tFun.find_path("test_ref_output.tsv") + output_path = tFun.find_output() + + with open( + ref_path, 'r', encoding="utf-8" + ) as test_file_1, open( + output_path, 'r', encoding="utf-8" + ) as test_file_2: + file_ref = test_file_1.readlines() + file_output = test_file_2.readlines() + + assert ( + sorted(file_ref) == sorted(file_output) + ), "the output doesn't match the expected tsv file" + assert ( + sorted(file_ref) != sorted( + df_result.to_csv(index=False).splitlines() + ) + ), "the output doesn't match the expected tsv file" diff --git a/transcript_sampler/cli.py b/transcript_sampler/cli.py index 8b650422e98331948f550776373767617ac2a161..13446a4524f90fb98ce3f1ff16114f8110b19a91 100644 --- a/transcript_sampler/cli.py +++ b/transcript_sampler/cli.py @@ -25,52 +25,52 @@ def main(): formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( - "--input_gtf", required=True, default=None, - help="GTF file with genome annotation" + "-ic", "--input_csv", required=True, default=None, + help="CSV or TSV file with transcripts and their expression level" ) parser.add_argument( - "--input_csv", required=True, default=None, - help="CSV or TSV file with transcripts and their expression level" + "-ig", "--input_gtf", required=True, default=None, + help="GTF file with genome annotation" ) parser.add_argument( - "--output_gtf", required=True, default=None, - help="Output path for the new GTF file of representative transcripts" + "-oc", "--output_csv", required=True, default=None, + help="Output path for the new CSV file of representative transcripts " + "and their sampled number" ) parser.add_argument( - "--output_csv", required=True, default=None, - help="Output path for the new CSV file of representative transcripts \ - and their sampled number" + "-og", "--output_gtf", required=True, default=None, + help="Output path for the new GTF file of representative transcripts" ) parser.add_argument( - "--n_to_sample", required=True, default=None, + "-n", "--n_to_sample", required=True, default=None, help="Total number of transcripts to sample" ) args = parser.parse_args() log = logging.getLogger("main") start = time.time() - log.info("Started transcript sampler...") + log.info("Started transcript sampler.") dict_repr_trans = find_rep_trans.get_rep_trans(args.input_gtf) df_repr = match_reptrs_explvl.match_repr_transcript_expression_level( - dict_reprTrans=dict_repr_trans, - exprTrans=args.input_csv, + dict_repr_trans=dict_repr_trans, + expr_trans=args.input_csv, gtf_file=args.input_gtf ) log.info( - "Finding match between representative transcripts \ - and expression level file" + "Finding match between representative transcripts " + "and expression level file..." ) - log.info("Poisson sampling of transcripts") + log.info("Poisson sampling of transcripts...") poisson_sample.transcript_sampling( args.n_to_sample, df_repr, args.output_csv) - log.info("Output CSV file ready") + log.info("Output CSV file ready.") - log.info("Writing output GTF file") + log.info("Writing output GTF file...") find_rep_trans.gtf_file_writer( args.input_gtf, dict_repr_trans, args.output_gtf) end = time.time() - log.info("Script executed in %s sec", (end - start)) + log.info("Script executed in %s sec.", round(end - start, 2)) if __name__ == "__main__": diff --git a/transcript_sampler/find_reptrans.py b/transcript_sampler/find_reptrans.py index e24746e280bb09c4fb6d100f2b739ebc012161b1..9c2511f5e42c9d32c1adc9ecfd4ad41f535c4cce 100644 --- a/transcript_sampler/find_reptrans.py +++ b/transcript_sampler/find_reptrans.py @@ -1,10 +1,11 @@ """Find representative transcripts.""" - import logging +from typing import Union LOG = logging.getLogger(__name__) +# pylint: disable=R0912,R0915 class FindRepTrans: """Find representative transcripts.""" @@ -12,7 +13,7 @@ class FindRepTrans: """Initiate.""" @staticmethod - def attributes_converter(attributes: str) -> list: + def attributes_converter(attributes): """Attributes converter function. This funtion converts the "unstructured" ;-seperated part of @@ -23,7 +24,7 @@ class FindRepTrans: Input: attributes = str() # the unstructured part of the entry Output: - attributes = list() # cleaned list with the \ + attributes = list() # cleaned list with the characteristics described above """ attributes = ( @@ -51,10 +52,9 @@ class FindRepTrans: if look_for in attributes: index = attributes.index(look_for) + 1 return attributes[index] - else: - LOG.warning('No %s in the entry, the return was set to NA', - look_for) - return "NA" + LOG.warning('No %s in the entry, the return was set to NA', + look_for) + return "NA" @staticmethod def reformat_reptrans(rep_trans_dict: dict) -> dict: @@ -96,8 +96,11 @@ class FindRepTrans: ValueError: If an unexpected entry is encountered in the GTF file. """ # setting default variables - rep_transcripts = dict() + rep_transcripts: dict = {} cur_g_id = "" + cur_t_id = "" + pot_best_trans: list = [] + cur_best_trans: list = [] # [transcript_id, transcript_support_level, transcript_length] cur_best_trans = ["", 100, 0] @@ -122,11 +125,11 @@ class FindRepTrans: if cur_g_id != attributes[1]: LOG.error("Exon from an unexpected gene") raise ValueError("Exon from an unexpected gene") - elif ( + if ( self.find_in_attributes( attributes, "transcript_id" - ) != cur_tID - ): + ) != cur_t_id + ): LOG.error("Exon from an unexpected transcript") raise ValueError("Exon from an unexpected transcript") @@ -136,7 +139,6 @@ class FindRepTrans: pot_best_trans[2] += int(entry[4]) - int(entry[3]) if pot_best_trans[2] > cur_best_trans[2]: cur_best_trans = pot_best_trans - pot_best_trans = False else: cur_best_trans[2] += int(entry[4]) - int(entry[3]) @@ -148,10 +150,10 @@ class FindRepTrans: raise ValueError("Transcript from an unexpected gene") # finding the transcript id and the support level - cur_tID = self.find_in_attributes( + cur_t_id = self.find_in_attributes( attributes, "transcript_id" ) - t_supp_lvl = self.find_in_attributes( + t_supp_lvl: Union[int, str] = self.find_in_attributes( attributes, "transcript_support_level" ) @@ -161,21 +163,22 @@ class FindRepTrans: if t_supp_lvl == "NA": t_supp_lvl = 100 else: - if t_supp_lvl.isdigit(): + if isinstance( + t_supp_lvl, str + ) and t_supp_lvl.isdigit(): t_supp_lvl = int(t_supp_lvl) else: t_supp_lvl = 100 # decides if the transcript has potential to become the # representative transcript - if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "": - cur_best_trans = [cur_tID, t_supp_lvl, 0] - pot_best_trans = False - ignor_trans = False + if ( + t_supp_lvl < cur_best_trans[1] or + cur_best_trans[0] == "" + ): + cur_best_trans = [cur_t_id, t_supp_lvl, 0] elif t_supp_lvl == cur_best_trans[1]: - pot_best_trans = [cur_tID, t_supp_lvl, 0] - else: - ignor_trans = True + pot_best_trans = [cur_t_id, t_supp_lvl, 0] # looking for and processing gene entries elif entry[2] == "gene": @@ -203,7 +206,7 @@ class FindRepTrans: if cur_g_id in rep_transcripts: if (rep_transcripts[cur_g_id][1] > cur_best_trans[1] or (rep_transcripts[cur_g_id][1] == cur_best_trans[1] and - rep_transcripts[cur_g_id][2] < cur_best_trans[2])): + rep_transcripts[cur_g_id][2] < cur_best_trans[2])): rep_transcripts[cur_g_id] = cur_best_trans else: rep_transcripts[cur_g_id] = cur_best_trans @@ -220,8 +223,8 @@ class FindRepTrans: """ output = [] - with open(original_file, "r", encoding="utf-8") as f: - for line in f: + with open(original_file, "r", encoding="utf-8") as file: + for line in file: if line.startswith("#"): continue @@ -242,51 +245,3 @@ class FindRepTrans: with open(output_file, "w", encoding="utf-8") as last_file: last_file.writelines(output) - - -# def _test(): -# """ -# This funtion is meant to be run for test -# Output: -# file with the dictionary generated based on the test file -# """ -# file_name = "test.gtf" -# rt = get_rep_trans(file_name) -# expected_result = {"ENSG00000160072": "ENST00000472194", -# "ENSG00000234396": "ENST00000442483", -# "ENSG00000225972": "ENST00000416931", -# "ENSG00000224315": "ENST00000428803", -# "ENSG00000198744": "ENST00000416718", -# "ENSG00000279928": "ENST00000624431", -# "ENSG00000228037": "ENST00000424215", -# 'ENSG00000142611': 'ENST00000378391'} -# if rt != expected_result: -# print("The test failed due to not yielding the same results") -# print("The results the program got\n", rt) -# print("The expected results\n", expected_result) -# else: -# print("The test was successful") - - -# # Execution part # -# if __name__ == "__main__": -# parser = argparse.ArgumentParser( -# description="find_representativ_transcripts", -# formatter_class=argparse.ArgumentDefaultsHelpFormatter -# ) -# parser.add_argument("-file_name", required=True, -# help="gtf file with genome annotation") -# parser.add_argument("-t", required=False, default=False, -# help="to run the test input -t True") -# args = parser.parse_args() - -# # standadize the file_name inlude .gtf# -# file_name = args.file_name -# i_gtf = file_name.find(".gtf") -# if i_gtf == -1: -# file_name += ".gtf" - -# if args.t: -# _test() -# else: -# get_rep_trans(file_name) diff --git a/transcript_sampler/images/.gitkeep b/transcript_sampler/images/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/transcript_sampler/images/screenshot_git_tutorial_1_hGillet.png b/transcript_sampler/images/screenshot_git_tutorial_1_hGillet.png deleted file mode 100644 index 68151e8b0b837c03b7fcac317aa6a989333244f5..0000000000000000000000000000000000000000 Binary files a/transcript_sampler/images/screenshot_git_tutorial_1_hGillet.png and /dev/null differ diff --git a/transcript_sampler/images/screenshot_git_tutorial_2_hGillet.png b/transcript_sampler/images/screenshot_git_tutorial_2_hGillet.png deleted file mode 100644 index ec1d38848ce1a364475038fb0a74b49e4f6cce07..0000000000000000000000000000000000000000 Binary files a/transcript_sampler/images/screenshot_git_tutorial_2_hGillet.png and /dev/null differ diff --git a/transcript_sampler/images/screenshot_markdown_tutorial_hGillet.png b/transcript_sampler/images/screenshot_markdown_tutorial_hGillet.png deleted file mode 100644 index a3ea90d1c9fa47190015f028d4b7fb09a0e0031b..0000000000000000000000000000000000000000 Binary files a/transcript_sampler/images/screenshot_markdown_tutorial_hGillet.png and /dev/null differ diff --git a/transcript_sampler/match_reptrans_explvl.py b/transcript_sampler/match_reptrans_explvl.py index 5bc73833a9a9d32fb5e1b646ace9c185af6e426d..a914d8ca744c19e8c59b43ce7b526f1d9233cd72 100644 --- a/transcript_sampler/match_reptrans_explvl.py +++ b/transcript_sampler/match_reptrans_explvl.py @@ -1,9 +1,9 @@ -"""Match representative transcript with expression level""" +"""Match representative transcript with expression level.""" # Made by Hugo Gillet # import logging -import pandas as pd -from gtfparse import read_gtf +import pandas as pd # type: ignore +from gtfparse import read_gtf # type: ignore LOG = logging.getLogger(__name__) @@ -40,12 +40,14 @@ class MatchReptransExplvl: return df_gtf @staticmethod - def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame: - """ - Convert a dictionary of genes and their representative transcript into a DataFrame. + def dict_repr_trans_to_df( + dict_repr_trans: "dict[str, str]" + ) -> pd.DataFrame: + """Convert a dict of genes and representative transcript into a df. Args: - dict_reprTrans (dict): {'Gene': ['transcriptA', 'transcriptB'], ...} + dict_repr_trans (dict): + {'Gene': ['transcriptA', 'transcriptB'], ...} Returns: Pandas DataFrame with 'Gene' and 'Transcript' as columns. @@ -55,27 +57,36 @@ class MatchReptransExplvl: TypeError: Keys should be strings. TypeError: Values should be strings. """ - if not isinstance(dict_reprTrans, dict): + if not isinstance(dict_repr_trans, dict): LOG.error("Only dictionaries are allowed") raise TypeError("Only dictionaries are allowed") - if not all(isinstance(key, str) for key in dict_reprTrans.keys()): + if not all( + isinstance(key, str) for key in dict_repr_trans.keys() + ): LOG.error("Keys should be strings") raise TypeError("Keys should be strings") - if not all(isinstance(value, str) for value in dict_reprTrans.values()): + if not all( + isinstance(value, str) for value in dict_repr_trans.values() + ): LOG.error("Values should be strings") raise TypeError("Values should be strings") - df_reprTrans = pd.DataFrame.from_dict(dict_reprTrans, orient="index", columns=["reprTranscript"]) - df_reprTrans = df_reprTrans.reset_index() - df_reprTrans.columns = ["Gene", "reprTrans"] - df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(r"\.[1-9]", "", regex=True) + df_repr_trans = pd.DataFrame.from_dict( + dict_repr_trans, orient="index", columns=["reprTranscript"] + ) + df_repr_trans = df_repr_trans.reset_index() + column_names = ["Gene", "reprTrans"] + df_repr_trans.columns = pd.Index(column_names) + # pylint: disable=E1136,E1137 + df_repr_trans["reprTrans"] = df_repr_trans["reprTrans"].str.replace( + r"\.[1-9]", "", regex=True + ) - return df_reprTrans + return df_repr_trans @staticmethod def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: - """ - Convert a TSV or CSV file into a pandas DataFrame. + """Convert a TSV or CSV file into a pandas DataFrame. Args: input_txt (str): TSV or CSV file containing transcript expression @@ -99,251 +110,97 @@ class MatchReptransExplvl: @staticmethod def expr_level_by_gene( - df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame + df_expr_transcript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame ) -> pd.DataFrame: - """ - Find the gene of each transcript given by the expression level CSV/TSV file - and sum the expression level of all transcripts from the same gene. - + """Sum expression levels. + + Find the gene of each transcript given by the expression level + CSV/TSV file and sum the expression level of all transcripts + from the same gene. + Args: - df_exprTranscript (pd.DataFrame): Pandas DataFrame containing transcripts and their expression levels, - generated by the "tsv_or_csv_to_df" function. - df_output_gtf_selection (pd.DataFrame): Pandas DataFrame containing genes and transcripts, - generated by the "transcripts_by_gene_inDf" function. - + df_expr_transcript (pd.DataFrame): + Pandas DataFrame containing transcripts and their + expression levels, generated by the + "tsv_or_csv_to_df" function. + df_output_gtf_selection (pd.DataFrame): + Pandas DataFrame containing genes and transcripts, + generated by the "transcripts_by_gene_inDf" function. + Returns: - Pandas DataFrame having 'Gene' and sum of its transcript expression levels. - + Pandas DataFrame having 'Gene' and sum of its + transcript expression levels. + Raises: None """ - df_merged = pd.merge(df_output_gtf_selection, df_exprTranscript, how="inner", on="Transcript") - df_sum = df_merged.groupby("Gene")["Expression_level"].sum().reset_index() + df_merged = pd.merge( + df_output_gtf_selection, df_expr_transcript, + how="inner", on="Transcript") + df_sum = df_merged.groupby("Gene")["Expression_level"].sum( + ).reset_index() return df_sum @staticmethod def match_by_gene( - df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame + df_repr_transcript: pd.DataFrame, + df_expression_level_by_gene: pd.DataFrame ) -> pd.DataFrame: - """ - Find matching genes between the two DataFrames. - + """Find matching genes between the two DataFrames. + Args: - df_reprTranscript (pd.DataFrame): Pandas DataFrame containing genes and their representative transcripts, - generated by the "dict_repr_trans_to_df()" function. - df_expressionLevel_byGene (pd.DataFrame): Pandas DataFrame containing genes and their expression levels, - generated by the "transcript_by_gene_inDf()" function. - + df_repr_transcript (pd.DataFrame): Pandas DataFrame + containing genes and their representative transcripts, + generated by the "dict_repr_trans_to_df()" function. + df_expression_level_by_gene (pd.DataFrame): Pandas DataFrame + containing genes and their expression levels, + generated by the "transcript_by_gene_inDf()" function. + Returns: - Pandas DataFrame having representative transcripts and their expression levels. - + Pandas DataFrame having representative transcripts and + their expression levels. + Raises: None """ - df_merged = pd.merge(df_reprTranscript, df_expressionLevel_byGene, how="inner", on="Gene") + df_merged = pd.merge( + df_repr_transcript, df_expression_level_by_gene, + how="inner", on="Gene" + ) df_clean = df_merged.loc[:, ["reprTrans", "Expression_level"]] return df_clean def match_repr_transcript_expression_level( - self, exprTrans: str, dict_reprTrans: dict, gtf_file: str, + self, expr_trans: str, dict_repr_trans: dict, gtf_file: str, ): - """ - Combine functions to replace transcripts from an expression level CSV/TSV file with representative transcripts. + """Replace expression level with representative transcripts. + + Combine functions to replace transcripts from an expression level + CSV/TSV file with representative transcripts. Args: - exprTrans (str): CSV or TSV file containing transcripts and their expression level. - dict_reprTrans (dict): Dictionary of genes and their representative transcripts. + expr_trans (str): CSV or TSV file containing transcripts + and their expression level. + dict_repr_trans (dict): Dictionary of genes + and their representative transcripts. gtf_file (str): Path to the GTF file. Returns: - Pandas DataFrame of representative transcripts and their expression level. + Pandas DataFrame of representative transcripts + and their expression level. Raises: None """ df_gene_transcript = self.gtf_to_df(gtf_file) - df_exprTrans = self.tsv_or_csv_to_df(exprTrans) - df_reprTrans = self.dict_repr_trans_to_df(dict_reprTrans) - df_expr_level_by_gene = self.expr_level_by_gene(df_exprTrans, df_gene_transcript) - df_match = self.match_by_gene(df_reprTrans, df_expr_level_by_gene) - df_match.rename(columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True) + df_expr_trans = self.tsv_or_csv_to_df(expr_trans) + df_repr_trans = self.dict_repr_trans_to_df(dict_repr_trans) + df_expr_level_by_gene = self.expr_level_by_gene( + df_expr_trans, df_gene_transcript + ) + df_match = self.match_by_gene(df_repr_trans, df_expr_level_by_gene) + df_match.rename( + columns={"reprTrans": "id", "Expression_level": "level"}, + inplace=True + ) return df_match - - - -# def dict_repr_trans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame: - -# """Convert a dictionary of genes and their representative -# transcript into a dataframe - -# Args: -# dict_reprTrans (dict): {'Gene':['transcriptA', 'transcriptB'], ...} - -# Returns: -# Pandas dataframe having Gene and transcript as columns - -# Raises: -# Only dict are allowed -# Key should be strings -# Value should be strings - -# """ -# pass -# if not type(dict_reprTrans) is dict: -# raise TypeError("Only dict are allowed") -# if type(list(dict_reprTrans.keys())[0]) is not str: -# raise TypeError("Key should be strings") -# if type(list(dict_reprTrans.values())[0]) is not str: -# raise TypeError("Values should be strings") - -# df_reprTrans = pd.DataFrame.from_dict( -# dict_reprTrans, orient="index", columns=["reprTranscript"] -# ) -# df_reprTrans = df_reprTrans.reset_index(level=0) -# df_reprTrans.columns = ["Gene", "reprTrans"] -# df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace( -# r"\.[1-9]", "", regex=True -# ) -# return df_reprTrans - - -# def gene_and_transcript(gtf_file: str) -> pd.DataFrame: -# """ -# This function take a .gtf file and convert it into a -# dataframe containing gene_id and their transcripts_id. -# Args: -# gtf_file(str) : path to the .gtf file - -# Returns: -# df_gtf(pd.DataFrame): pandas df containing having has columns -# gene_id and their transcripts_id. -# Raises: -# None -# """ -# df_gtf = read_gtf(gtf_file) -# df_gtf = df_gtf.loc[df_gtf["feature"] == "transcript"] -# df_gtf = df_gtf[["gene_id", "transcript_id"]] -# df_gtf = df_gtf.rename(columns={"gene_id": "Gene", -# "transcript_id": "Transcript"}) -# return df_gtf - - -# def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: -# """Convert tsv or csv file into a pandas dataframe - -# Args: -# input_txt (str): csv or tsv file containing transcript exp level - -# Returns: -# df_gene (str): Pandas dataframe having transcript and exp level -# as columns - -# Raises: -# None -# """ -# pass -# df_input = pd.read_csv( -# input_txt, -# sep=r"[\t,]", -# lineterminator="\n", -# names=["Transcript", "Expression_level"], -# engine="python", -# ) -# return df_input - - -# def expr_level_by_gene( -# df_exprTrasncript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame -# ) -> pd.DataFrame: -# """find the gene of each transcipt given by the expression level csv/tsv -# file, and summ expression level of all transcipts from the same gene. - -# Args: -# df_exprTranscript: pandas df containing transcript and -# their exp level generated by "tsv_or_csv_to_df" function -# df_output_gtf_selection : pandas df containing genes and -# transcripts, generated by "transcripts_by_gene_inDf" function - -# Returns: -# Pandas dataframe having gene and sum of its transcript exp level - -# Raises: -# None -# """ -# pass -# df_merged = pd.merge( -# df_output_gtf_selection, df_exprTrasncript, -# how="inner", on="Transcript" -# ) -# df_sum = df_merged.groupby("Gene").sum( -# "Expression_level" -# ) -# return df_sum - - -# def match_by_gene( -# df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame -# ) -> pd.DataFrame: -# """Find matching genes bewteen the 2 args - -# Args: -# df_reprTranscript : pandas Dataframe containing genes -# and their representative transcript, generated by -# "dict_repr_trans_to_df()" -# df_expressionLevel_byGene : pandas Dataframe containing -# genes and their expression level generated by -# "transcript_by_gene_inDf()" - -# Returns: -# Pandas dataframe having representative trasncripts -# and their expression level - -# Raises: -# None -# """ -# pass -# df_merged = pd.merge( -# df_reprTranscript, df_expressionLevel_byGene, how="outer", on="Gene" -# ) -# df_clean = df_merged.dropna(axis=0) -# df_clean = df_clean.loc[:, ["reprTrans", "Expression_level"]] -# return df_clean - - -# # functions to run this part of the programm -# def match_repr_transcript_expression_level( -# exprTrans: str, dict_reprTrans: dict, gtf_file: str, -# ): -# """Combine functions to replace transcripts from an exp level csv/tsv file -# with representative transcripts - -# Args: -# exprTrans (str): csv or tsv file containing transcripts -# and their expression level -# dict_reprTrans (dict) : dict of genes and their -# representative transcipt -# intemediate_file (str) : txt file containing genes, transcript -# and their expression level from the transkript_extractor function -# output_path : path indicating were the tsv file should be written - -# Returns: -# tsv file of representative trasncripts and their expression level - -# Raises: -# None -# """ -# df_gene_transcript = gene_and_transcript(gtf_file) -# df_exprTrans = tsv_or_csv_to_df(exprTrans) -# df_reprTrans = dict_repr_trans_to_df(dict_reprTrans) -# df_expr_level_by_gene = expr_level_by_gene( -# df_exprTrans, df_gene_transcript -# ) # error here -# df_match = match_by_gene(df_reprTrans, df_expr_level_by_gene) -# df_match.rename(columns={'reprTrans': 'id', 'Expression_level': 'level'}, -# inplace=True) -# return df_match - - -# # run the program -# if __name__ == "__main__": -# match_repr_transcript_expression_level() diff --git a/transcript_sampler/obsolete_scripts/exon_length_filter.py b/transcript_sampler/obsolete_scripts/exon_length_filter.py deleted file mode 100644 index 2aeb302af55566a5292f7283012bf21ce064d5e7..0000000000000000000000000000000000000000 --- a/transcript_sampler/obsolete_scripts/exon_length_filter.py +++ /dev/null @@ -1,201 +0,0 @@ -# Exon length filter # -"""Exon length filter -Version 2.1.0""" -# Called Packages # -import re -import os -import transcript_extractor as te - -python_version = "3.7.13" -module_list = [re, os] -modul_name_list = ["re", "os"] - -# Functions # - - -def exon_length_calculator(entry): - """This function finds the start and end cordinates of the - exon and uses them to calculate its length""" - try: - find_exon_coordinates = re.compile(r"\t\d{1,15}\t") - # this difines the pattern of the coordinates - try_find_start_coordinates = find_exon_coordinates.search(entry) - # this line findes the start coordinares based on the pattern - start_coordinates = int(try_find_start_coordinates[0].replace("\t", "")) - # this line removes the \t at the end and the start of the pattern and - # turn the string of the coordinates into intergers - final_index_start_coordinates = entry.find(try_find_start_coordinates[0])+len(try_find_start_coordinates[0])-1 - # this line determines the indes of the final digit - # of the start coordinates - sub_entry = entry[final_index_start_coordinates:] - # this lineused the index determin above a starting point - # for a new sub entry - try_find_end_coordinates = find_exon_coordinates.search(sub_entry) - end_coordinates = int(try_find_end_coordinates[0].replace("\t", "")) - # these two lines find the end coordinates and turn tham int an int - exon_length = end_coordinates-start_coordinates - # this line claculates the transcript length - except: - print("\n\nIn the following enty only one or no valid coordinates \ - could be found:\n",entry,"the value will be set to NA") - exon_length = "NA" - return exon_length - - -def exon_fider(entry): - """This funtion determines if a given entry belongs to an exon - Expected inputs: - entry: str #any enty of a gtf file""" - exon_test = entry.find(r"\texon\t") - # This line look for the entry exon in the file - if exon_test == -1: - try_exon_test = False - else: - try_exon_test = True - # The block above evaluates the results of the search for the wort exon - return try_exon_test - -def __longest_transcript_finder( - current_exon_length, - longest_transcript, - longest_transcript_ID, - old_transcript_ID - ): - """This funtion encapsulates an operation that has to be carried out - at several points in the exon_length_filter function and serves to - make that function more modular""" - if current_exon_length > longest_transcript: - # This condition updates the most promesing for - # beeing the representative transcript - longest_transcript = current_exon_length - longest_transcript_ID = old_transcript_ID - current_exon_length = 0 - return current_exon_length, longest_transcript, longest_transcript_ID - - -def _representative_transcript_csv( - representative_transcript, file_name = "test", deposit_pathway_name =os.getcwd() - ): - with open(os.path.join( - deposit_pathway_name, file_name+"_"+"representative_transcripts"+".csv" - ), "w", encoding="utf-8") as rt: - for i in representative_transcript: - transcript = representative_transcript[i] - new_entry = str(i)+","+transcript+"\n" - rt.write(new_entry) - - -def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}): - """This funtion selects only the transcripts for a dictionary that have the longest total mRNA""" - bar,start_time = te.bar_builder(length_multiplyer = 3) - total_genes = len(gen_dict) - gens_done = 0 - - with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f: - - old_gen = str() - old_transcript_ID = str() - representative_transcript = dict() - representative_trasnscript_not_found = True - longest_transcript_ID = str() - current_exon_length = 0 - longest_transcript = 0 - percentage_done = 0 - - for entry in f: - - try: - corrent_gen = te.gene_ID_finder(entry) - except: - corrent_gen = old_gen - #The block above test if there is a gen name in the entry - if corrent_gen != old_gen: - representative_trasnscript_not_found = True - - #The block above determines if the Gen name is new and set the test - #representative_trasnscript_not_found back to true which is used to - #make the program faster if there is just one transcript for a given - #gen in the dict - if representative_trasnscript_not_found and corrent_gen != str(): - #print(corrent_gen) - #The conditon prvents serges if a representative transcript has - #all ready been chosen - if corrent_gen != old_gen: - current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) - representative_transcript[old_gen] = longest_transcript_ID - try: - del gen_dict[old_gen] - old_gen = corrent_gen - gens_done += 1 - corrent_percentage_done = (gens_done/total_genes)*100 - if corrent_percentage_done > percentage_done+10: - bar,start_time = te.bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar) - percentage_done = int(corrent_percentage_done) - - - except: - old_gen = corrent_gen - longest_transcript = 0 - #The block above adds the transcript of the last gen that - #had the longest exons into the representative transcripts dict - try: - #This try / except block test if the gen is in the input dictionary - transcript_IDs = gen_dict[corrent_gen] - if len(gen_dict[corrent_gen]) == 1: - #This conditions is a short cut for Genes that - #allready have a representative transcript - representative_transcript=gen_dict[corrent_gen[0]] - representative_trasnscript_not_found = False - continue - except: - continue - - try: - current_transcript_ID = te.transcript_ID_finder(entry) - except: - continue - #The block above searches for a transcript ID in the current entry - - if current_transcript_ID in transcript_IDs: - #This condition test if the Transcript is one of the - #candidates for representative transcripts - if current_transcript_ID != old_transcript_ID: - #This condition if the enty still belongs to the - #previous transcript and is triggers if that is not the case - current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) - try: - transcript_IDs.remove(old_transcript_ID) - old_transcript_ID = current_transcript_ID - except: - old_transcript_ID = current_transcript_ID - if exon_fider(entry): - exon_length = exon_length_calculator(entry) - current_exon_length += exon_length - else: - continue - current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) - representative_transcript[old_gen] = longest_transcript_ID - del representative_transcript[str()] - te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar) - return(representative_transcript) - -def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}): - """This function filters a dictionary of genes and there transcripts by the length of there exons an selects the longes transcript for each gene and returns an dictionary {gene_ID : transcript_ID}. - Expected inputs: - file_name: str ; default = test #the name of the gft file you want to look at - source_pathway_name: str ; default = current work directory #path of the gtf file - deposit_pathway_name: str ; default = current work directory #path for files - gen_dict:dict{key == gene ID:[transcript IDs that belong to that gene]}""" - - print("Representative trascipts are filterd based on exon length please wait...") - source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name) - representative_transcript = _exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict) - print("\nRepresentative transcripts collected") - return representative_transcript - - -if __name__ == "__main__": - # te.version_control(module_list,modul_name_list,python_version) - exon_length_filter() - -# This line allows the file to be executed on its own also from diff --git a/transcript_sampler/obsolete_scripts/org_test_representative.py b/transcript_sampler/obsolete_scripts/org_test_representative.py deleted file mode 100644 index 4ee677838beb99ddfd671f21f7b8452102965c49..0000000000000000000000000000000000000000 --- a/transcript_sampler/obsolete_scripts/org_test_representative.py +++ /dev/null @@ -1,94 +0,0 @@ -import pytest -import representative as repr -import numpy as np -import test_Functions as tFun - - -def test_import_gtfSelection_to_df(): - """ - Test if gencode.vM31.annotation_intermediat_file.txt - is imported in the correct pandas df format - Args: - None - - Returns: - Assert results - - Raises: - None - """ - path = tFun.find_path_intermediateFile() - df = repr.import_gtfSelection_to_df(path) - datatype = {'Gene': np.dtype('O'), 'Transcript': np.dtype('O'), - 'Support_level': np.dtype('float64')} - assert tFun.column_number(df) == ( - 3, "number of columns is not equal to 3") - assert tFun.column_dType(df) == ( - datatype, "at lease one column has the wrong datatype") - assert tFun.duplicated_rows(df).empty, "at lease one row are duplicated " - assert tFun.NA_value(df) == 0, "at lease one row contain NA values " - with pytest.raises(TypeError, match=r"Only str path is allowed"): - repr.import_gtfSelection_to_df(123) - - -def test_representative_transcript_inDict(): - """ - Test if df generated by "import_gtfSelection_to_df()" output - a dict in the right format - Args: - Pandas dataframe with [Gene, Transcript, Support_level] - as columns, validated with test_import_gtfSelection_to_df() - - Returns: - Assert results - - Raises: - None - """ - path = tFun.find_path_intermediateFile() - df = repr.import_gtfSelection_to_df(path) - dict_to_test = repr.representative_transcripts_inDict(df) - dict_expected = { - 'ENSMUSG00000024691': ['ENSMUST00000025595.5'], - 'ENSMUSG00000063683': ['ENSMUST00000044976.12', - 'ENSMUST00000119960.2'], - 'ENSMUSG00000079415': ['ENSMUST00000112933.2']} - assert dict_to_test == dict_expected - with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"): - repr.representative_transcripts_inDict(123) - with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"): - repr.representative_transcripts_inDict("hello") - with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"): - repr.representative_transcripts_inDict(["hello", "world", 123]) - with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"): - repr.representative_transcripts_inDict({"hello": "world", - "bonjour": ["le monde", 123]}) - - -def test_find_repr_by_SupportLevel(): - """ - Test if the correct dict is generated from - gencode.vM31.annotation_intermediat_file.txt - Args: - None - - Returns: - Assert results - - Raises: - None - """ - path = tFun.find_path_intermediateFile() - dict_to_test = repr.find_repr_by_SupportLevel(path) - dict_expected = { - 'ENSMUSG00000024691': ['ENSMUST00000025595.5'], - 'ENSMUSG00000063683': ['ENSMUST00000044976.12', - 'ENSMUST00000119960.2'], - 'ENSMUSG00000079415': ['ENSMUST00000112933.2']} - assert dict_to_test == dict_expected - - -test_representative_transcript_inDict() -test_find_repr_by_SupportLevel() -test_import_gtfSelection_to_df() -print("test_representative is done ! No error was found") diff --git a/transcript_sampler/obsolete_scripts/representative.py b/transcript_sampler/obsolete_scripts/representative.py deleted file mode 100644 index 589f4b4c473ea4787efbd589a1c932bc0bfb87da..0000000000000000000000000000000000000000 --- a/transcript_sampler/obsolete_scripts/representative.py +++ /dev/null @@ -1,91 +0,0 @@ -''' -This part of the code take as input a gtf modified file -and return a dictionary of transcripts with best -support level for each gene of the input -''' -import pandas as pd -# import os - - -def import_gtf_selection_to_df(gtf_modified_file: str) -> pd.DataFrame: - """Import intermediate file from gtf and create a df - - Args: - gtf_modified_file (str) : path to the intermediate file - - Returns: - Pandas dataframe having Gene, transcript - and support level as columns - - Raises: - TypeError : Only str path is allowed - - """ - if not isinstance(gtf_modified_file, str): - raise TypeError("Only str path is allowed") - df_input = pd.read_csv( - gtf_modified_file, sep='\t', lineterminator='\n', - names=["Gene_mixed", "Transcript", "Support_level", "Na1", "Na2"] - ) - df_input["Support_level"] = df_input["Support_level"].replace(" ", "") - df_input["Gene"] = df_input["Gene_mixed"].str.extract( - r'([A-Z]\w{0,})', expand=True # noqa: W605 - ) - df_input["Transcript_number"] = df_input["Gene_mixed"].str.extract( - r'(^\d)', expand=True # noqa: W605 - ) - df_clean = df_input.loc[:, ["Gene", "Transcript", "Support_level"]] - df_clean["Gene"] = df_clean["Gene"].fillna(method='ffill') - df_clean = df_clean.dropna(axis=0) - return df_clean - - -def representative_transcripts_in_dict( - df_gtf_selection: pd.DataFrame) -> pd.DataFrame: - """Return a dict containing for each gene transcripts - with highest confidence level - - Args: - df_gtf_selection (str): Pandas dataframe having Gene, - transcript and support level as columns - - Returns: - Dict {'Gene':['transcriptA', 'transcriptB'], ...} - - Raises: - TypeError : Only pandas DataFrame is allowed - """ - if not isinstance(df_gtf_selection, pd.DataFrame): - raise TypeError("Only pandas DataFrame is allowed") - df_min = df_gtf_selection[ - df_gtf_selection["Support_level"] == - df_gtf_selection.groupby("Gene")["Support_level"].transform(min) - ] - df_final = df_min.drop(columns=["Support_level"]) - dict_representative_transcripts = df_final.groupby("Gene")[ - "Transcript"].apply(list).to_dict() - return dict_representative_transcripts - - -def find_repr_by_support_level(intermediate_file: str) -> dict[str, str]: - """Combine functions import_gtf_selection_to_df() - and representative_transcripts_in_dict() - - Args: - intermediate_file : path to the intermediate file - - Returns: - Dict {'Gene':['transcriptA', 'transcriptB'], ...} - - Raises: - None - - - """ - df_gtf = import_gtf_selection_to_df(intermediate_file) - dict_repr_trans = representative_transcripts_in_dict(df_gtf) - return dict_repr_trans - - -# if __name__ == "__main__": -# find_repr_by_support_level() diff --git a/transcript_sampler/obsolete_scripts/transcript_extractor.py b/transcript_sampler/obsolete_scripts/transcript_extractor.py deleted file mode 100644 index 5c81bcd838d4dff57c44989ea07edc0a4ee2e3a7..0000000000000000000000000000000000000000 --- a/transcript_sampler/obsolete_scripts/transcript_extractor.py +++ /dev/null @@ -1,329 +0,0 @@ -#### Transcript extractor ##### -"""Transcript extractor -Version 1.2.0""" -### Called Packages ### -import re -import os -import time - -python_version = "3.7.13" -module_list =[re,os,time] -modul_name_list = ["re","os","time"] - -### Functions ### -def version_control(module_list,modul_name_list,python_version): - with open("required.txt","a") as req: - - for i in range(len(module_list)): - - try: - version = module_list[i].__version__ - entry = modul_name_list[i]+"\t"+str(version)+"\n" - req.write(entry) - except: - version = python_version - entry = modul_name_list[i]+"\t"+str(version)+"\n" - req.write(entry) - -def __parameter_editor(file_name,source_pathway_name,deposit_pathway_name): - """This function allows for changing the parameters after running the program""" - while True: - print("The program will run with the following parameters:\nFile name:\t\t",file_name,"\nSource pathway:\t",source_pathway_name,"\nDeposit pathway:\t",deposit_pathway_name,"\n") - parameter_conformation = input("To continue with these parameters input [continue or c] to change them input [edit]\n>") - if parameter_conformation == "continue"or parameter_conformation =="c": - break - elif parameter_conformation == "edit": - #edit the parameters - while True: - change_question = input("select the parameter you want to change [nfile/spath/dpath] or input [b] to go back\n>") - if change_question == "nfile": - #This condition allows the user to chenge the file name - file_name = input("Please input the new file name\n>") - break - elif change_question == "spath": - #This condition allows the user to change the source path - source_pathway_name = input("Please input the new source path\n>") - - does_source_pathway_exist = os.path.exists(source_pathway_name) - if does_source_pathway_exist: - break - else: - print("The new source pathway:",source_pathway_name,"does not exist\nThe source pathway was returned to default:",os.getcwd()) - source_pathway_name = os.getcwd() - elif change_question == "dpath": - #This condition allows the user to change output file location - deposit_pathway_name = input("Please input the new output file path name\n>") - does_deposit_pathway_exist = os.path.exists(deposit_pathway_name) - if does_deposit_pathway_exist: - break - else: - print("The new deposit pathway:",deposit_pathway_name,"does not existe\nThe deposit pathway was returnt to default:",source_pathway_name) - deposit_pathway_name = source_pathway_name - #The block above test if the new deposit pathway is valid - elif change_question == "b": - # This condition allows the user to return to the main loop - break - else: - #This condition covers all non valid inputs into the secund loop - print("The input",change_question,"is not valid. Please use one of the specified commands") - - else: - #This condition covers all non valid input for the main loop - print("The input",parameter_conformation,"is not valide please use one of the specified comands\n") - return(file_name,source_pathway_name,deposit_pathway_name) - - - - - - - -def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()): - """This function searches for preexisting files of the same name as the results file of the current program. It allows the user to choose to move on with the pre-existing file """ - File_of_same_name_found = False - generat_new_file = False - directory_content = os.listdir(deposit_pathway_name) - for file in directory_content: - if file == file_name: - while True: - File_found_input = input (file_name+" has allready been generated\nDo you want to generate a new one [y/n] \n>") - if File_found_input == "n": - File_of_same_name_found = True - break - elif File_found_input == "y": - generat_new_file = True - break - else: - print("Invalid input\nPlease press [y] if you want to generate a new file or [n] if you want to use the preexisting file") - break - else: - continue - if File_of_same_name_found: - print("No new file will be generated, the program can continue") - elif generat_new_file: - print("A new file will be generated please wait...\n") - else: - print("No pre-existing file of the relevant type has been found.\nA new file will be generated please wait...\n") - return(File_of_same_name_found) - -def bar_builder(percentage = 0,length_multiplyer = 2,start_time = time.time(),bar = str()): - """This function creates a loading bar that can load in 10% increments starting a 0% and ending at 100% - Expected inputs: - percentage: int between 0 and 100 in steps of 10; default = 0 #defines the current loading increment - length_multiplyer: int > 0 ; default = 2 #determiens the amount of symbols per loading increment - start_time: any int ; default= time.time() #for determening loading time - bar: str ; default = str()#input of the current bar status does not need to be defined if for the 0% increment - """ - if percentage == 100: - bar = bar.replace("-","#") - print("\r"+bar+"\t"+"100%\t\t"+str(int(time.time()-start_time))) - elif percentage > 0: - bar = bar.replace("-","#",length_multiplyer) - print("\r"+bar+"\t"+str(percentage)+"%", end='',flush=True) - elif percentage == 0: - bar = "["+"-"*length_multiplyer*10+"]" - print(bar+"\t", end='',flush=True) - return(bar,start_time) - -def __test_file_name(file_name,source_pathway_name = os.getcwd()): - """This function validates that the source file exists at the source path. It turns the file name input in a standardized format that can be used in the next steps""" - - directory_content = os.listdir(source_pathway_name) - - index_of_the_dot = file_name.rfind(".") - valide_source_file = False - validate_source_file = True - if index_of_the_dot ==-1: - file_name += ".gtf" - else: - source_file_typ = file_name[index_of_the_dot:] - not_a_file_type = re.compile(".\d{1,13}") - try_not_a_file_type = not_a_file_type.search(source_file_typ) - if source_file_typ == ".gtf": - file_name = file_name - elif try_not_a_file_type: - file_name += ".gtf" - else: - print("This program can not handle",source_file_typ,"files. \nplease use a .gtf file" ) - validate_source_file = False - #The block above tests if the file_name includes the file type and if no - #file type is found adds ".gtf" und if a non ".gtf" file is found gives an error - - if validate_source_file: - for file in directory_content: - if file == file_name: - valide_source_file = True - break - #The block above tests if a file on the given name is in the given directora - - if valide_source_file: - print("The file:",file_name,"has been found.\n") - else: - print("No .gtf file of the name",file_name,"has been found in this pathway") - #The bock above gives feed back regarding the results of the file test - - file_name = file_name.replace(".gtf","") - #This line normalizes the file name - return(valide_source_file,file_name) - -def __do_pathways_exist__(source_pathway_name,deposit_pathway_name): - """This funtion tests that the entered pathways actualy exist""" - does_source_pathway_exist = os.path.exists(source_pathway_name) - does_deposit_pathway_exist = os.path.exists(deposit_pathway_name) - #The Block above does the actual testing - if does_source_pathway_exist: - source_pathway_name = source_pathway_name - else: - print("The source pathway:",source_pathway_name,"has not been found\nThe source pathway was set to the default") - source_pathway_name = os.getcwd() - #The block above detail the possible reactions for the source pathe existing or not existing - if does_deposit_pathway_exist: - deposit_pathway_name = deposit_pathway_name - else: - print("The deposit pathway:",deposit_pathway_name,"has not been found\nThe deposit pathway was set to the default") - deposit_pathway_name = source_pathway_name - #The block above details the possible reactions for the deposit pathway existing or not existing - return(source_pathway_name,deposit_pathway_name) - -def gene_ID_finder(entry): - """This function is supposed to find the gene ID of a known gene entry - Expected inputs: - entry: str #a line from a gtf file that contains a gene ID""" - index_gene_id = entry.find("gene_id") - find_gene_id_name = re.compile("\"\S{1,25}\"") - sub_entry = entry[index_gene_id:] - try_find_gene_id_name = find_gene_id_name.search(sub_entry) - gene_ID = try_find_gene_id_name[0].replace("\"","") - return (gene_ID) - -def transcript_ID_finder (entry): - """This function is supposed to finde the transcript ID in a known transcript entry - Expected inputs: - entry: str #a line from a gtf file that contains a transcript ID""" - index_transcript_id = entry.find("transcript_id") - find_transcript_id_name = re.compile("\"\S{1,25}\"") - sub_entry = entry[index_transcript_id:] - try_find_transcript_id_name = find_transcript_id_name.search(sub_entry) - - try: - transcript_ID = try_find_transcript_id_name[0].replace("\"","") - except: - transcript_ID = "" - return (transcript_ID) - -def transcript_support_level_finder(entry): - """This function is supposed to find the transcript support level in a known transcript entry - Expected input: - entry: str #a line from a gtf file that be blongs to a transcript""" - transcript_support_level_start_ID = entry.find("transcript_support_level") - sub_entry = entry[transcript_support_level_start_ID:] - - try: - score_finder = re.compile("\W\w{1,16}\W{2}") - try_score_finder = score_finder.search(sub_entry) - Pre_score_1 = try_score_finder[0] - Pre_score_2 = Pre_score_1.replace("\"","") - Pre_score_2 = Pre_score_2.replace("(","") - transcript_support_level = Pre_score_2.replace(";","") - if "NA" in transcript_support_level: - transcript_support_level = 100 - #I changed This tell laura - - - except: - transcript_support_level = 100 - return (transcript_support_level) - - - - -def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name): - """This function extracts the transcript number ,transcript ID, the transcript support level, the transcrip length and the line index from a gtf file of a given name and saves tham as a new file name given_name_intermediat_file.txt. - Expected input: - file_name: str #the name of the gft file you want to look at without the .gtf part - source_pathway_name: str #path of the gtf file - deposit_pathway_name: str #path for saving the intermediat file""" - - with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f: - total_entrys =len(f.readlines()) - with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f: - current_entry = 0 - percentage_done = 0 - bar,start_time = bar_builder(length_multiplyer = 3) - - - Old_gen_ID = str() - #stand-in as the first couple entrys are not genes - with open(os.path.join(deposit_pathway_name,file_name+"_"+"intermediate_file"+".txt"),"w") as IMF: - transcript_number = 0 - for entry in f: - - - current_entry += 1 - current_percentage_done = 100* current_entry/total_entrys - if current_percentage_done > percentage_done +10: - bar,start_time = bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar) - percentage_done = int(current_percentage_done) - - if "gene_id" in entry: - Gen_ID = gene_ID_finder(entry) - else: - Gen_ID = Old_gen_ID - - if Gen_ID != Old_gen_ID: - Gen_entry = ">"+ Gen_ID +"\n" - IMF.write(Gen_entry) - transcript_number = 0 - Old_gen_ID = Gen_ID - - if "\ttranscript\t" in entry: - transcript_number += 1 - Transcript_ID = transcript_ID_finder(entry) - #the function that determins the transcript ID is called - transcript_support_level = transcript_support_level_finder(entry) - #the function that determins the transcript support level is called - New_entry = str(transcript_number)+"\t"+str(Transcript_ID)+"\t"+str(transcript_support_level)+"\t"+"\t\n" - IMF.write(New_entry) - bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar) - print("The transcripts have been collected") - - -def extract_transcript(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = False,Input_free = False): - """ This it the overall exetutable funtion that will execute the transcript extraction process for a given file with all checks. - Expected input: - file_name: str ; default = test #the name of the gft file you want to look at - source_pathway_name: str ; default = current work directory #path of the gtf file - deposit_pathway_name: str ; default = source_pathway_name #path for saving the intermediat file - Outputs: - file_name: str - source_pathway_name: str - deposit_pathway_name: str - """ - - - if deposit_pathway_name == False: - deposit_pathway_name = source_pathway_name - if Input_free: - validated_file_name = __test_file_name(file_name,source_pathway_name) - file_name = validated_file_name[1] - _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name) - else: - file_name,source_pathway_name,deposit_pathway_name = __parameter_editor(file_name,source_pathway_name,deposit_pathway_name) - source_pathway_name,deposit_pathway_name =__do_pathways_exist__(source_pathway_name,deposit_pathway_name) - validated_file_name = __test_file_name(file_name,source_pathway_name) - file_name = validated_file_name[1] - if validated_file_name[0]: - if __searche_for_preexisting_files(file_name+"_intermediate_file.txt",deposit_pathway_name): - print("The transcripts has been collected\n") - else: - _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name) - return(file_name,source_pathway_name,deposit_pathway_name) - -#### Dev part #### - -if __name__ == "__main__": - #version_control(module_list,modul_name_list,python_version) - extract_transcript() -#This line allows the file to be executed on its own also from - - diff --git a/transcript_sampler/poisson_sampling.py b/transcript_sampler/poisson_sampling.py index 6c586aca38c5cc56664d7d8d0344a7cb5b3d6048..0df129c1fbc688507858970bc8ca2657e8552aca 100644 --- a/transcript_sampler/poisson_sampling.py +++ b/transcript_sampler/poisson_sampling.py @@ -1,12 +1,12 @@ -"""Sample transcripts by Poisson-sampling""" +"""Sample transcripts by Poisson-sampling.""" -import pandas as pd +import pandas as pd # type: ignore import numpy as np +# pylint: disable=R0903 class SampleTranscript: - ''' - Sample transcript + """Sample transcript. This part of the code does Poisson sampling proportionally to gene expression levels for each gene. @@ -17,10 +17,11 @@ class SampleTranscript: output: csv file with gene id and count gtf file with transcript samples - ''' + """ + @staticmethod def transcript_sampling(total_transcript_number, df_repr, output_csv): - """Samples transcript based on Poisson-sampling""" + """Sample transcript based on Poisson-sampling.""" total = df_repr["level"].sum() total_transcript_number = int(total_transcript_number) normalized = total_transcript_number / total @@ -28,48 +29,4 @@ class SampleTranscript: transcript_numbers = pd.DataFrame({ "id": df_repr["id"], "count": levels }) - transcript_numbers.to_csv(output_csv, index=False) - - -# python_version = "3.7.13" -# module_list = [pd, np, argparse] -# modul_name_list = ["pd", "np", "argparse"] - -# def transcript_sampling(total_transcript_number, df_repr, output_csv): -# # df = pd.read_csv( -# # csv_file, sep="\t", lineterminator="\n", names=["id", "level"]) -# # the function match_reprTranscript_expressionLevel() now outputs a df -# df = df_repr -# levels = [] -# sums = df['level'].tolist() -# total = sum(sums) -# # I added this because writting a number in the terminal inputed a string -# total_transcript_number = int(total_transcript_number) -# normalized = total_transcript_number/total -# for expression_level in df['level']: -# poisson_sampled = np.random.poisson(expression_level*normalized) -# levels.append(poisson_sampled) - -# transcript_numbers = pd.DataFrame({'id': df['id'], 'count': levels}) -# pd.DataFrame.to_csv(transcript_numbers, output_csv) - - -# if __name__ == '__main__': -# # te.version_control(module_list,modul_name_list,python_version) -# parser = argparse.ArgumentParser( -# description="Transcript Poisson sampler, csv output", -# formatter_class=argparse.ArgumentDefaultsHelpFormatter -# ) - -# parser.add_argument("--expression_level", required=True, -# help="csv file with expression level") -# parser.add_argument("--output_csv", required=True, -# help="output csv file") -# parser.add_argument("--input_csv", required=True, -# help="input csv file") -# parser.add_argument("--transcript_number", required=True, -# help="total number of transcripts to sample") -# args = parser.parse_args() - -# transcript_sampling(args.transcript_number, args.input_csv, -# args.output_csv, args.transcript_number) + transcript_numbers.to_csv(output_csv, index=False, header=False) diff --git a/transcript_sampler/transcript_sampler_org.ipynb b/transcript_sampler/transcript_sampler_org.ipynb deleted file mode 100644 index c3346592594e6bcacce01a74c4844370c8d9686a..0000000000000000000000000000000000000000 --- a/transcript_sampler/transcript_sampler_org.ipynb +++ /dev/null @@ -1,574 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 85, - "metadata": {}, - "outputs": [], - "source": [ - "import argparse\n", - "import pandas as pd\n", - "import numpy as np\n", - "from gtfparse import read_gtf" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "find representative transcripts" - ] - }, - { - "cell_type": "code", - "execution_count": 86, - "metadata": {}, - "outputs": [], - "source": [ - "def attributs_converter(attributs):\n", - " \"\"\"\n", - " This funtion converts the \"unstrucktured\" ;-seperated part of he line into a list of identifyers and coresponding data the struckture of\n", - " which can be used ot find the data easyly e.g the index of the identifier transcrip_id + 1 will give the trasncript id of the current gene\n", - " Input: \n", - " attributs = str() #the unstrucktured part of the entry\n", - " Output:\n", - " attributs = list() # cleand list with the characterritsics discribed above\n", - " \"\"\"\n", - " attributs = attributs.replace(\"\\\"\",\"\")\n", - " attributs = attributs.replace(\";\",\"\")\n", - " attributs = attributs.replace(\"\\\\n\",\"\")\n", - " attributs =attributs.split(\" \")\n", - " \n", - " return(attributs)\n", - "\n", - "def find_in_attributs (attributs,look_for):\n", - " \"\"\"\n", - " This function finds a key word and used that to lokat the value of that key word e.g key = gene_id, value = 'ENSMUSG00002074970',\n", - " this works as they are next to each other in the attributs list. \n", - " Inputs:\n", - " sub_enty = list() \n", - " look_fore = str() #string of with the name of the key to look for\n", - " Output: \n", - " attributs[index] or NA = str() #NA is returned if the key was not found in the attributs\n", - " \"\"\"\n", - " try:\n", - " index = attributs.index(look_for)+1\n", - " return attributs[index]\n", - " except: \n", - " #print(\"No\",look_for,\"in the entry the return was set to NA\\n\",attributs)\n", - " return \"NA\"\n", - "\n", - "def _re_format(rep_trans_dict):\n", - " \"\"\"\n", - " This function is ment to reformat dictionary of the representatice transcripts into an dictionary with only one entry per key\n", - " Input:\n", - " rep_trans_dict = {gene_id : [transcript_id , transcript_support_level , transcript_length]}\n", - " Output: \n", - " rep_transcripts = {gene_id : transcript_id}\n", - " \"\"\"\n", - " rep_transcripts = dict()\n", - " for gene_id in rep_trans_dict: \n", - " rep_transcripts[gene_id] = rep_trans_dict[gene_id][0]\n", - " \n", - " return rep_transcripts\n", - " \n", - " \n", - "\n", - "def get_rep_trans(file_name = \"test\"):\n", - " \"\"\" \n", - " This is the main function of this script it selects one representative transcrip per gene based on a gtf annotation file. \n", - " It does so be two criteria: first the transcript support level and it there are several transcript \n", - " of one gene that have the same trasncript_support_level it chooses the one that corresponds to the longest mRNA.\n", - " Input: \n", - " file_name = str() # name of the annotation file with or without the .gtf part\n", - " Output: \n", - " rep_transcripts = {gene_id : transcript_id}\n", - " \"\"\"\n", - " \n", - " #setting defoult variables\n", - " rep_trans = dict()\n", - " cur_gID = str()\n", - " cur_best_trans = [str(),100,0] # [transcript_id , transcript_support_level , transcript_length]\n", - " pot_best_trans = False\n", - " cur_tID = str()\n", - " ignor_trans = False\n", - " \n", - " with open (file_name,\"r\") as f: \n", - " for line in f: \n", - " entry = line.split(\"\\t\")\n", - " \n", - " #removes expected but unneeded entrys\n", - " exp_unneed = [\"CDS\",\"stop_codon\",\"five_prime_utr\",\"three_prime_utr\",\"start_codon\",'Selenocysteine']\n", - " if len(entry) == 1 or entry[2] in exp_unneed:\n", - " continue\n", - " \n", - " #this function turns the less organized part of the entry into a reable list\n", - " attributs = attributs_converter(entry[8])\n", - " #looking for and processing exons entrys\n", - " if entry[2] == \"exon\": \n", - " \n", - " #dicide if to contiune or not\n", - " if ignor_trans: \n", - " continue\n", - " elif cur_gID != attributs[1]:\n", - " raise ValueError(\"ERROR exon from an unexpected Gen\")\n", - " continue\n", - " elif find_in_attributs (attributs,\"transcript_id\") != cur_tID:\n", - " raise ValueError(\"exon from an unexpected transcript\")\n", - " continue\n", - " \n", - " #adding the length of the exon to the appropriat list and chacking for changes in best transcript\n", - " if pot_best_trans: \n", - " pot_best_trans[2]+= int(entry[4])-int(entry[3])\n", - " if pot_best_trans[2] > cur_best_trans[2]: \n", - " cur_best_trans = pot_best_trans\n", - " pot_best_trans = False\n", - " else:\n", - " cur_best_trans[2]+= int(entry[4])-int(entry[3])\n", - "\n", - " \n", - " \n", - " #looking for and processing transcript entrys\n", - " elif entry[2] == \"transcript\":\n", - " \n", - " #varryfi that the gen is correct\n", - " if cur_gID != attributs[1]:\n", - " raise ValueError(\"ERROR transcript from an unexpected Gen\")\n", - " continue\n", - " \n", - " #finding the transcript id and the support level\n", - " cur_tID = find_in_attributs (attributs,\"transcript_id\") \n", - " t_supp_lvl = find_in_attributs (attributs,\"transcript_support_level\") \n", - " \n", - " #If there is no transcript support level or the level is given as NA it is nomed as 100. else the transcript support level is tunrn into int\n", - " if t_supp_lvl == \"NA\": \n", - " t_supp_lvl = 100\n", - " else:\n", - " try:\n", - " t_supp_lvl = int(t_supp_lvl)\n", - " except: \n", - " t_supp_lvl = 100\n", - " \n", - " \n", - " #decides if the transcript has potential to become the representative transcript\n", - " if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == \"\":\n", - " cur_best_trans = [cur_tID,t_supp_lvl,0]\n", - " pot_best_trans = False\n", - " ignor_trans = False\n", - " \n", - " elif t_supp_lvl == cur_best_trans[1]:\n", - " pot_best_trans = [cur_tID,t_supp_lvl,0] \n", - " else:\n", - " ignor_trans = True\n", - " \n", - " \n", - " #looking for and processing gene entrys\n", - " elif entry[2] == \"gene\":\n", - " \n", - " #updating rep_trans dict\n", - " if cur_gID not in rep_trans: \n", - " rep_trans[cur_gID] = cur_best_trans\n", - " else: \n", - " if rep_trans[cur_gID][1] > cur_best_trans[1]: \n", - " rep_trans[cur_gID] = cur_best_trans\n", - " elif rep_trans[cur_gID][1] == cur_best_trans[1] and rep_trans[cur_gID][2] < cur_best_trans[2]: \n", - " rep_trans[cur_gID] = cur_best_trans\n", - " \n", - " #updating cur_gID and resetting cur_best_trans\n", - " cur_gID = attributs[1]\n", - " cur_best_trans = [str(),100,0]\n", - " \n", - " #raises an error for unidentifyable entrys\n", - " else: \n", - " raise ValueError(\"This entry could not be identified\\n\",entry)\n", - " \n", - " #addding the final gene to the dictionary\n", - " if cur_gID not in rep_trans: \n", - " rep_trans[cur_gID] = cur_best_trans\n", - " else: \n", - " if rep_trans[cur_gID][1] > cur_best_trans[1]: \n", - " rep_trans[cur_gID] = cur_best_trans\n", - " elif rep_trans[cur_gID][1] == cur_best_trans[1] and rep_trans[cur_gID][2] < cur_best_trans[2]: \n", - " rep_trans[cur_gID] = cur_best_trans \n", - " \n", - " del rep_trans[\"\"]\n", - " rep_transcripts = _re_format(rep_trans)\n", - " return(rep_transcripts )\n", - "\n", - "\n", - "def _test(): \n", - " \"\"\"\n", - " This funtion is ment to be run for test\n", - " Output: \n", - " file with the dictionary generated based on the test file \n", - " \"\"\"\n", - " file_name = \"test.gtf\"\n", - " rt = get_rep_trans(file_name)\n", - " expected_result = {\"ENSG00000160072\":\"ENST00000472194\",\"ENSG00000234396\":\"ENST00000442483\",\n", - " \"ENSG00000225972\":\"ENST00000416931\",\"ENSG00000224315\":\"ENST00000428803\",\n", - " \"ENSG00000198744\":\"ENST00000416718\",\"ENSG00000279928\":\"ENST00000624431\",\n", - " \"ENSG00000228037\":\"ENST00000424215\",'ENSG00000142611':'ENST00000378391'}\n", - " if rt != expected_result: \n", - " print(\"The test fail due to not yieding the same results\")\n", - " print(\"The results the program got\\n\",rt)\n", - " print(\"The expected results\\n\",expected_result)\n", - " else: \n", - " print(\"The test was succses full\")" - ] - }, - { - "cell_type": "code", - "execution_count": 87, - "metadata": {}, - "outputs": [], - "source": [ - "def gtf_file_writer (original_file, rep_transcript_dict, output_file): \n", - " \"\"\"\n", - " this function writes the output GTF file\n", - " \"\"\"\n", - " output = [] \n", - "\n", - " with open(original_file, 'r') as f:\n", - " for line in f: \n", - " entry = line.split(\"\\t\")\n", - " if line[0] != '#':\n", - " attributes = attributs_converter(entry[8])\n", - " type_ = entry[2]\n", - " else:\n", - " continue\n", - " if type_ == 'gene':\n", - " gene_id = find_in_attributs(attributes, 'gene_id')\n", - " output.append(line)\n", - " else:\n", - " transcript_id = find_in_attributs(attributes, 'transcript_id')\n", - " if rep_transcript_dict[gene_id] == transcript_id:\n", - " output.append(line)\n", - "\n", - " with open(output_file, 'w') as last_file:\n", - " for item in output : \n", - " last_file.write(item)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Match representative transcript and expression level file " - ] - }, - { - "cell_type": "code", - "execution_count": 88, - "metadata": {}, - "outputs": [], - "source": [ - "def gtf_to_df(gtf_file:str)-> pd.DataFrame: \n", - " \"\"\"\n", - " This function take a .gtf file and convert it into a \n", - " dataframe containing gene_id and their transcripts_id.\n", - " Args:\n", - " gtf_file (str) : path to the .gtf file\n", - "\n", - " Returns:\n", - " df_gtf (pd.DataFrame) : pandas dataframe containing columns\n", - " gene_id and their transcripts_id.\n", - " Raises : \n", - " None \n", - " \n", - " \"\"\"\n", - " df_gtf = read_gtf(gtf_file)\n", - " df_gtf = df_gtf.loc[df_gtf[\"feature\"]==\"transcript\"]\n", - " df_gtf = df_gtf[[\"gene_id\",\"transcript_id\"]]\n", - " df_gtf = df_gtf.rename(columns={\"gene_id\":\"Gene\",\"transcript_id\":\"Transcript\"})\n", - " return df_gtf" - ] - }, - { - "cell_type": "code", - "execution_count": 89, - "metadata": {}, - "outputs": [], - "source": [ - "def dict_reprTrans_to_df(dict_reprTrans: dict[str, str]) -> pd.DataFrame:\n", - "\n", - " \"\"\"Convert a dictionary of genes and their representative transcript into a dataframe \n", - "\n", - " Args:\n", - " dict_reprTrans (dict) : {'Gene':['transcriptA', 'transcriptB'], ...}\n", - "\n", - " Returns:\n", - " Pandas dataframe having Gene and transcript as columns\n", - " \n", - " Raises:\n", - " Only dict are allowed\n", - " Key should be strings\n", - " Value should be strings\n", - " \n", - " \"\"\"\n", - " pass\n", - " if not type(dict_reprTrans) is dict:\n", - " raise TypeError(\"Only dict are allowed\")\n", - " if type(list(dict_reprTrans.keys())[0]) is not str:\n", - " raise TypeError(\"Key should be strings\")\n", - " if type(list(dict_reprTrans.values())[0]) is not str:\n", - " raise TypeError(\"Values should be strings\")\n", - "\n", - " df_reprTrans = pd.DataFrame.from_dict(\n", - " dict_reprTrans, orient=\"index\", columns=[\"reprTranscript\"]\n", - " )\n", - " df_reprTrans = df_reprTrans.reset_index(level=0)\n", - " df_reprTrans.columns = [\"Gene\", \"reprTrans\"]\n", - " df_reprTrans[\"reprTrans\"] = df_reprTrans[\"reprTrans\"].str.replace(\n", - " r\"\\.[1-9]\", \"\", regex=True\n", - " )\n", - " return df_reprTrans\n", - "\n", - "\n", - "\n", - "def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:\n", - " \"\"\"Convert tsv or csv file into a pandas dataframe\n", - "\n", - " Args:\n", - " input_txt (str): csv or tsv file containing transcript expression level\n", - "\n", - " Returns:\n", - " df_gene (str): Pandas dataframe having transcript and expression level\n", - " as columns \n", - " \n", - " Raises:\n", - " None \n", - " \"\"\"\n", - " pass\n", - " df_input = pd.read_csv(\n", - " input_txt,\n", - " sep=r\"[\\t,]\",\n", - " lineterminator=\"\\n\",\n", - " names=[\"Transcript\", \"Expression_level\"],\n", - " engine=\"python\",\n", - " )\n", - " return df_input\n", - "\n", - "\n", - "def exprLevel_byGene(\n", - " df_exprTrasncript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame\n", - ") -> pd.DataFrame:\n", - " \"\"\"find the gene of each transcipt given by the expression level csv/tsv file,\n", - " and summ expression level of all transcipts from the same gene. \n", - "\n", - " Args:\n", - " df_exprTranscript : pandas Dataframe containing transcript and their expression level,\n", - " generated by \"tsv_or_csv_to_df\" function\n", - " df_output_gtf_selection : pandas Dataframe containing genes and transcripts,\n", - " generated by \"transcripts_by_gene_inDf\" function \n", - "\n", - " Returns:\n", - " Pandas dataframe having gene and sum of its transcript expression level\n", - " \n", - " Raises:\n", - " None \n", - " \"\"\"\n", - " pass\n", - " df_merged = pd.merge(\n", - " df_output_gtf_selection, df_exprTrasncript, how=\"inner\", on=\"Transcript\"\n", - " )\n", - " df_sum = df_merged.groupby(\"Gene\").sum(\n", - " \"Expression_level\"\n", - " ) \n", - " return df_sum\n", - "\n", - "\n", - "def match_byGene(\n", - " df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame\n", - ") -> pd.DataFrame:\n", - " \"\"\"Find matching genes bewteen the 2 args \n", - "\n", - " Args:\n", - " df_reprTranscript : pandas Dataframe containing genes \n", - " and their representative transcript, generated by\n", - " \"dict_reprTrans_to_df()\" \n", - " df_expressionLevel_byGene : pandas Dataframe containing \n", - " genes and their expression level generated by \n", - " \"transcript_by_gene_inDf()\"\n", - "\n", - " Returns:\n", - " Pandas dataframe having representative trasncripts \n", - " and their expression level\n", - " \n", - " Raises:\n", - " None \n", - " \"\"\"\n", - " pass\n", - " df_merged = pd.merge(\n", - " df_reprTranscript, df_expressionLevel_byGene, how=\"outer\", on=\"Gene\"\n", - " )\n", - " df_clean = df_merged.dropna(axis=0)\n", - " df_clean = df_clean.loc[:, [\"reprTrans\", \"Expression_level\"]]\n", - " return df_clean\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "### functions to run this part of the programm\n", - "\n", - "\n", - "def match_reprTranscript_expressionLevel(\n", - " exprTrans: str, dict_reprTrans: dict, gtf_file: str,\n", - "):\n", - " \"\"\"Combine functions to replace transcripts from an expression level csv/tsv file \n", - " with representative transcripts \n", - "\n", - " Args:\n", - " exprTrans (str): csv or tsv file containing transcripts\n", - " and their expression level \n", - " dict_reprTrans (dict) : dict of genes and their \n", - " representative transcipt\n", - " intemediate_file (str) : txt file containing genes, transcript \n", - " and their expression level from the transkript_extractor function\n", - " output_path : path indicating were the tsv file should be written\n", - "\n", - " Returns:\n", - " tsv file of representative trasncripts and their expression level\n", - " \n", - " Raises:\n", - " None \n", - " \"\"\"\n", - " df_gene_transcript = gtf_to_df(gtf_file)\n", - " df_exprTrans = tsv_or_csv_to_df(exprTrans)\n", - " df_reprTrans = dict_reprTrans_to_df(dict_reprTrans)\n", - " df_exprLevel_byGene = exprLevel_byGene(df_exprTrans, df_gene_transcript)\n", - " df_match = match_byGene(df_reprTrans, df_exprLevel_byGene)\n", - " df_match.rename(columns = {'reprTrans':'id', 'Expression_level':'level'}, inplace = True)\n", - " return df_match\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "poisson sampling" - ] - }, - { - "cell_type": "code", - "execution_count": 90, - "metadata": {}, - "outputs": [], - "source": [ - "def transcript_sampling(total_transcript_number, df_repr, output_csv):\n", - " df = df_repr \n", - " levels = []\n", - " sums = df['level'].tolist()\n", - " total = sum(sums)\n", - " total_transcript_number=int(total_transcript_number)\n", - " normalized = total_transcript_number/total\n", - " for expression_level in df['level']:\n", - " poisson_sampled = np.random.poisson(expression_level*normalized)\n", - " levels.append(poisson_sampled)\n", - "\n", - " transcript_numbers = pd.DataFrame({'id': df['id'],'count': levels})\n", - " pd.DataFrame.to_csv(transcript_numbers, output_csv)\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "execution" - ] - }, - { - "cell_type": "code", - "execution_count": 91, - "metadata": {}, - "outputs": [], - "source": [ - "input_gtf = r\"C:\\Users\\input_files\\test.gtf\"\n", - "input_csv = r\"C:\\Users\\input_files\\expression.csv\"\n", - "number_to_sample = 100\n", - "output_gtf = r\"C:\\Users\\output_gtf.gtf\"\n", - "output_csv = r\"C:\\Users\\output_csv.csv\"" - ] - }, - { - "cell_type": "code", - "execution_count": 92, - "metadata": {}, - "outputs": [], - "source": [ - "def exe(input_gtf, input_csv, transcript_nr, output_gtf, output_csv, input_free = True):\n", - " dict_repr_trans = get_rep_trans(input_gtf)\n", - " df_repr = match_reprTranscript_expressionLevel(dict_reprTrans=dict_repr_trans, exprTrans=input_csv, gtf_file=input_gtf)\n", - " print(df_repr)\n", - " print(\"Finiding match between representative transcripts and expression level file\") \n", - " print(\"Poisson sampling of transcripts\")\n", - " transcript_sampling(transcript_nr, df_repr, output_csv)\n", - " print(\"output csv file ready\")\n", - " print(\"writing output gtf file\")\n", - " gtf_file_writer(input_gtf, dict_repr_trans, output_gtf)" - ] - }, - { - "cell_type": "code", - "execution_count": 93, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:root:Extracted GTF attributes: ['gene_id', 'gene_version', 'gene_name', 'gene_source', 'gene_biotype', 'transcript_id', 'transcript_version', 'transcript_name', 'transcript_source', 'transcript_biotype', 'tag', 'ccds_id', 'exon_number', 'exon_id', 'exon_version', 'protein_id', 'protein_version', 'transcript_support_level']\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - " id level\n", - "0 ENST00000472194 1.980250\n", - "1 ENST00000442483 0.838144\n", - "7 ENST00000378391 0.914558\n", - "Finiding match between representative transcripts and expression level file\n", - "Poisson sampling of transcripts\n", - "output csv file ready\n", - "writing output gtf file\n" - ] - } - ], - "source": [ - "exe(input_gtf=input_gtf, input_csv=input_csv, transcript_nr=number_to_sample, output_gtf=output_gtf, output_csv=output_csv)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "base", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.12" - }, - "orig_nbformat": 4, - "vscode": { - "interpreter": { - "hash": "1e2fb702d9886c77776a2bcd731cac0877c51b973772daa49239be006cec7b9a" - } - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/transcript_sampler/transcript_sampler_org.py b/transcript_sampler/transcript_sampler_org.py deleted file mode 100644 index 923cbf81367972b0c39fd7631243aac39a9322b3..0000000000000000000000000000000000000000 --- a/transcript_sampler/transcript_sampler_org.py +++ /dev/null @@ -1,400 +0,0 @@ -import pandas as pd -import numpy as np -import logging -from gtfparse import read_gtf - -LOG = logging.getLogger(__name__) - -def attributes_converter(attributes: str) -> list: - """ - This funtion converts the "unstructured" ;-seperated part of he line into - a list of identifiers and corresponding data, the structure of - which can be used ot find the data easily e.g the index of the identifier - transcript_id + 1 will give the transcript id of the current gene - Input: - attributes = str() # the unstructured part of the entry - Output: - attributes = list() # cleaned list with the characteristics described - """ - attributes = ( - attributes.replace('"', "") - .replace(";", "") - .replace("\\n", "") - .split(" ") - ) - return attributes - - -def find_in_attributes(attributes: list, look_for: str) -> str: - """ - This function finds a keyword and used that to locate the value of that - keyword e.g key = gene_id, value = 'ENSMUSG00002074970', - this works as they are next to each other in the attributes list. - Inputs: - attributes = list() - look_for = str() # string of the name of the key to look for - Output: - attributes[index] or NA = str() # NA is returned if the key - was not found in the attributes - """ - if look_for in attributes: - index = attributes.index(look_for) + 1 - return attributes[index] - else: - LOG.warning(f'No {look_for} in the entry, the return was set to NA') - return "NA" - - -def _re_format(rep_trans_dict: dict) -> dict: - """ - This function is meant to reformat dictionary of the representative - transcripts into an dictionary with only one entry per key - Input: - rep_trans_dict = {gene_id : [ - transcript_id, transcript_support_level, transcript_length]} - Output: - rep_transcripts = {gene_id : transcript_id} - """ - rep_transcripts = dict() - for gene_id in rep_trans_dict: - rep_transcripts[gene_id] = rep_trans_dict[gene_id][0] - - return rep_transcripts - - -def get_rep_trans(file_name: str = "test.gtf") -> dict: - """ - This is the main function of this script. It selects one representative transcript per gene based on a GTF annotation file. - It does so by two criteria: the transcript support level and if there are several transcripts of one gene that have the same transcript_support_level, it chooses the one that corresponds to the longest mRNA. - - Args: - file_name (str): Name of the annotation file with or without the .gtf extension. - - Returns: - rep_transcripts (dict): Dictionary of gene_id to transcript_id representing the selected representative transcripts. - - Raises: - ValueError: If an unexpected entry is encountered in the GTF file. - """ - - # setting default variables - rep_transcripts = {} - cur_gID = "" - cur_best_trans = ["", 100, 0] # [transcript_id, transcript_support_level, transcript_length] - - with open(file_name, "r") as f: - for line in f: - entry = line.split("\t") - - # removes expected but unneeded entries - if len(entry) == 1 or entry[2] in [ - "CDS", - "stop_codon", - "five_prime_utr", - "three_prime_utr", - "start_codon", - "Selenocysteine" - ]: - continue - - # this function turns the less organized part of the entry - # into a readable list - attributes = attributes_converter(entry[8]) - - # looking for and processing exons entries - if entry[2] == "exon": - if ignor_trans: - continue - elif cur_gID != attributes[1]: - LOG.error() - raise ValueError("Exon from an unexpected gene") - elif find_in_attributes(attributes, "transcript_id") != cur_tID: - LOG.error() - raise ValueError("Exon from an unexpected transcript") - - # adding the length of the exon to the appropriate list and - # checking for changes in best transcript - if pot_best_trans: - pot_best_trans[2] += int(entry[4]) - int(entry[3]) - if pot_best_trans[2] > cur_best_trans[2]: - cur_best_trans = pot_best_trans - pot_best_trans = False - else: - cur_best_trans[2] += int(entry[4]) - int(entry[3]) - - # looking for and processing transcript entries - elif entry[2] == "transcript": - # verify that the gen is correct - if cur_gID != attributes[1]: - LOG.error() - raise ValueError("Transcript from an unexpected gene") - - # finding the transcript id and the support level - cur_tID = find_in_attributes(attributes, "transcript_id") - t_supp_lvl = find_in_attributes(attributes, "transcript_support_level") - - # If there is no transcript support level or the level is - # given as NA it is nomed as 100. else the transcript - # support level is turned into int - if t_supp_lvl == "NA": - t_supp_lvl = 100 - else: - if t_supp_lvl.isdigit(): - t_supp_lvl = int(t_supp_lvl) - else: - t_supp_lvl = 100 - - # decides if the transcript has potential to become the - # representative transcript - if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "": - cur_best_trans = [cur_tID, t_supp_lvl, 0] - pot_best_trans = False - ignor_trans = False - elif t_supp_lvl == cur_best_trans[1]: - pot_best_trans = [cur_tID, t_supp_lvl, 0] - else: - ignor_trans = True - - # looking for and processing gene entries - elif entry[2] == "gene": - # updating rep_transcripts dict - if cur_gID in rep_transcripts: - if rep_transcripts[cur_gID][1] > cur_best_trans[1] or (rep_transcripts[cur_gID][1] == cur_best_trans[1] and rep_transcripts[cur_gID][2] < cur_best_trans[2]): - rep_transcripts[cur_gID] = cur_best_trans - else: - rep_transcripts[cur_gID] = cur_best_trans - - # updating cur_gID and resetting cur_best_trans - cur_gID = attributes[1] - cur_best_trans = ["", 100, 0] - - # raises an error for unidentifiable entries - else: - LOG.error() - raise ValueError("This entry could not be identified") - - # adding the final gene to the dictionary - if cur_gID in rep_transcripts: - if rep_transcripts[cur_gID][1] > cur_best_trans[1] or (rep_transcripts[cur_gID][1] == cur_best_trans[1] and rep_transcripts[cur_gID][2] < cur_best_trans[2]): - rep_transcripts[cur_gID] = cur_best_trans - else: - rep_transcripts[cur_gID] = cur_best_trans - - del rep_transcripts[""] - rep_transcripts = _re_format(rep_transcripts) - return rep_transcripts - - -def _test(): - """ - This funtion is meant to be run for test - Output: - file with the dictionary generated based on the test file - """ - file_name = "test.gtf" - rt = get_rep_trans(file_name) - expected_result = { - "ENSG00000160072": "ENST00000472194", - "ENSG00000234396": "ENST00000442483", - "ENSG00000225972": "ENST00000416931", - "ENSG00000224315": "ENST00000428803", - "ENSG00000198744": "ENST00000416718", - "ENSG00000279928": "ENST00000624431", - "ENSG00000228037": "ENST00000424215", - "ENSG00000142611": "ENST00000378391", - } - if rt != expected_result: - print("The test fail due to not yieding the same results") - print("The results the program got\n", rt) - print("The expected results\n", expected_result) - else: - print("The test was succsesfull") - - -def gtf_file_writer(original_file: str, rep_transcript_dict: dict, output_file: str): - """ - This function writes the output GTF file. - """ - output = [] - - with open(original_file, "r") as f: - for line in f: - if line.startswith("#"): - continue - - entry = line.split("\t") - attributes = attributes_converter(entry[8]) - feature_type = entry[2] - - if feature_type == "gene": - gene_id = find_in_attributes(attributes, "gene_id") - output.append(line) - else: - transcript_id = find_in_attributes(attributes, "transcript_id") - if gene_id in rep_transcript_dict and rep_transcript_dict[gene_id] == transcript_id: - output.append(line) - - with open(output_file, "w") as last_file: - last_file.writelines(output) - - -def gtf_to_df(gtf_file: str) -> pd.DataFrame: - """ - This function takes a .gtf file and converts it into a pandas DataFrame - containing gene_id and their transcript_id. - - Args: - gtf_file (str): Path to the .gtf file. - - Returns: - df_gtf (pd.DataFrame): Pandas DataFrame containing columns 'Gene' and 'Transcript'. - - Raises: - None - """ - df_gtf = read_gtf(gtf_file,).to_pandas() - df_gtf = df_gtf[df_gtf["feature"] == "transcript"] - df_gtf = df_gtf[["gene_id", "transcript_id"]] - df_gtf = df_gtf.rename(columns={"gene_id": "Gene", "transcript_id": "Transcript"}) - return df_gtf - - -def dict_reprTrans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame: - """ - Convert a dictionary of genes and their representative transcript into a DataFrame. - - Args: - dict_reprTrans (dict): {'Gene': ['transcriptA', 'transcriptB'], ...} - - Returns: - Pandas DataFrame with 'Gene' and 'Transcript' as columns. - - Raises: - TypeError: Only dictionaries are allowed. - TypeError: Keys should be strings. - TypeError: Values should be strings. - """ - if not isinstance(dict_reprTrans, dict): - LOG.error() - raise TypeError("Only dictionaries are allowed") - if not all(isinstance(key, str) for key in dict_reprTrans.keys()): - LOG.error() - raise TypeError("Keys should be strings") - if not all(isinstance(value, str) for value in dict_reprTrans.values()): - LOG.error() - raise TypeError("Values should be strings") - - df_reprTrans = pd.DataFrame.from_dict(dict_reprTrans, orient="index", columns=["reprTranscript"]) - df_reprTrans = df_reprTrans.reset_index() - df_reprTrans.columns = ["Gene", "reprTrans"] - df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(r"\.[1-9]", "", regex=True) - - return df_reprTrans - - -def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame: - """ - Convert a TSV or CSV file into a pandas DataFrame. - - Args: - input_txt (str): TSV or CSV file containing transcript expression levels. - - Returns: - df_gene (pd.DataFrame): Pandas DataFrame with 'Transcript' and 'Expression_level' as columns. - - Raises: - None - """ - df_input = pd.read_csv( - input_txt, - sep=r"[\t,]", - lineterminator="\n", - names=["Transcript", "Expression_level"], - engine="python", - ) - return df_input - - -def exprLevel_byGene( - df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame -) -> pd.DataFrame: - """ - Find the gene of each transcript given by the expression level CSV/TSV file - and sum the expression level of all transcripts from the same gene. - - Args: - df_exprTranscript (pd.DataFrame): Pandas DataFrame containing transcripts and their expression levels, - generated by the "tsv_or_csv_to_df" function. - df_output_gtf_selection (pd.DataFrame): Pandas DataFrame containing genes and transcripts, - generated by the "transcripts_by_gene_inDf" function. - - Returns: - Pandas DataFrame having 'Gene' and sum of its transcript expression levels. - - Raises: - None - """ - df_merged = pd.merge(df_output_gtf_selection, df_exprTranscript, how="inner", on="Transcript") - df_sum = df_merged.groupby("Gene")["Expression_level"].sum().reset_index() - return df_sum - - -def match_byGene( - df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame -) -> pd.DataFrame: - """ - Find matching genes between the two DataFrames. - - Args: - df_reprTranscript (pd.DataFrame): Pandas DataFrame containing genes and their representative transcripts, - generated by the "dict_reprTrans_to_df()" function. - df_expressionLevel_byGene (pd.DataFrame): Pandas DataFrame containing genes and their expression levels, - generated by the "transcript_by_gene_inDf()" function. - - Returns: - Pandas DataFrame having representative transcripts and their expression levels. - - Raises: - None - """ - df_merged = pd.merge(df_reprTranscript, df_expressionLevel_byGene, how="inner", on="Gene") - df_clean = df_merged.loc[:, ["reprTrans", "Expression_level"]] - return df_clean - - -# functions to run this part of the program - - -def match_reprTranscript_expressionLevel( - exprTrans: str, dict_reprTrans: dict, gtf_file: str, -): - """ - Combine functions to replace transcripts from an expression level CSV/TSV file with representative transcripts. - - Args: - exprTrans (str): CSV or TSV file containing transcripts and their expression level. - dict_reprTrans (dict): Dictionary of genes and their representative transcripts. - gtf_file (str): Path to the GTF file. - - Returns: - Pandas DataFrame of representative transcripts and their expression level. - - Raises: - None - """ - df_gene_transcript = gtf_to_df(gtf_file) - df_exprTrans = tsv_or_csv_to_df(exprTrans) - df_reprTrans = dict_reprTrans_to_df(dict_reprTrans) - df_exprLevel_byGene = exprLevel_byGene(df_exprTrans, df_gene_transcript) - df_match = match_byGene(df_reprTrans, df_exprLevel_byGene) - df_match.rename(columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True) - return df_match - - -def transcript_sampling(total_transcript_number, df_repr, output_csv): - total = df_repr["level"].sum() - total_transcript_number = int(total_transcript_number) - normalized = total_transcript_number / total - levels = np.random.poisson(df_repr["level"] * normalized) - transcript_numbers = pd.DataFrame({"id": df_repr["id"], "count": levels}) - transcript_numbers.to_csv(output_csv, index=False)