Skip to content
Snippets Groups Projects
Commit b700c8a1 authored by Mate Balajti's avatar Mate Balajti
Browse files

remove obsolete scripts

parent f97ac4ef
No related branches found
No related tags found
1 merge request!7feat: add tests
# Exon length filter #
"""Exon length filter
Version 2.1.0"""
# Called Packages #
import re
import os
import transcript_extractor as te
# Interpreter / module versions consumed by te.version_control() (see the
# commented-out call in the __main__ guard below).
python_version = "3.7.13"
module_list = [re, os]
modul_name_list = ["re", "os"]
# Functions #
def exon_length_calculator(entry):
    """Calculate the length of an exon from a tab-separated GTF line.

    Expected inputs:
        entry: str # one entry (line) of a gtf file
    Returns:
        int: end - start coordinate of the exon, or the string "NA" when
        fewer than two tab-delimited numeric coordinate fields are found.
    """
    try:
        # a coordinate field is a run of digits enclosed by tabs
        find_exon_coordinates = re.compile(r"\t\d{1,15}\t")
        try_find_start_coordinates = find_exon_coordinates.search(entry)
        # strip the enclosing tabs and convert the start coordinate to int
        start_coordinates = int(try_find_start_coordinates[0].replace("\t", ""))
        # index of the tab closing the start coordinate; searching from here
        # guarantees the next match is the end coordinate
        final_index_start_coordinates = entry.find(
            try_find_start_coordinates[0]
        ) + len(try_find_start_coordinates[0]) - 1
        sub_entry = entry[final_index_start_coordinates:]
        try_find_end_coordinates = find_exon_coordinates.search(sub_entry)
        end_coordinates = int(try_find_end_coordinates[0].replace("\t", ""))
        exon_length = end_coordinates - start_coordinates
    except TypeError:
        # search() returned None for one of the coordinates, so indexing it
        # raised TypeError: the entry lacks two valid coordinate fields
        print("\n\nIn the following entry only one or no valid coordinates \
could be found:\n", entry, "the value will be set to NA")
        exon_length = "NA"
    return exon_length
def exon_fider(entry):
    """Determine whether a given GTF entry describes an exon.

    Expected inputs:
        entry: str # any entry of a gtf file
    Returns:
        bool: True if the tab-delimited feature field "exon" is present.
    """
    # FIX: the original searched for the RAW string r"\texon\t" (literal
    # backslash-t characters), which can never occur in a real
    # tab-separated GTF line; the pattern must contain actual tabs, as the
    # sibling functions in this file use (e.g. "\ttranscript\t").
    exon_test = entry.find("\texon\t")
    # find() returns -1 when the feature field is absent
    return exon_test != -1
def __longest_transcript_finder(
        current_exon_length,
        longest_transcript,
        longest_transcript_ID,
        old_transcript_ID
        ):
    """Score the transcript that just finished against the best so far.

    If the accumulated exon length of the previous transcript beats the
    current best, it becomes the new representative candidate.  The
    running exon length is always reset to 0 for the next transcript.
    """
    if current_exon_length > longest_transcript:
        # the finished transcript becomes the new best candidate
        longest_transcript = current_exon_length
        longest_transcript_ID = old_transcript_ID
    return 0, longest_transcript, longest_transcript_ID
def _representative_transcript_csv(
    representative_transcript, file_name = "test", deposit_pathway_name =os.getcwd()
):
    """Write the representative transcripts to a CSV file.

    One "gene_ID,transcript_ID" line is written per dictionary entry to
    <deposit_pathway_name>/<file_name>_representative_transcripts.csv.
    """
    csv_path = os.path.join(
        deposit_pathway_name,
        file_name + "_" + "representative_transcripts" + ".csv",
    )
    with open(csv_path, "w", encoding="utf-8") as rt:
        for gene_id, transcript in representative_transcript.items():
            rt.write(str(gene_id) + "," + transcript + "\n")
def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}):
    """Select, for every gene in gen_dict, the candidate transcript with
    the largest summed exon length by streaming through the gtf file.

    Expected inputs:
        file_name: str # name of the gtf file, without the ".gtf" suffix
        source_pathway_name: str # directory containing the gtf file
        deposit_pathway_name: str # unused here; kept for interface symmetry
        gen_dict: dict {gene_ID: [candidate transcript IDs]}
    Returns:
        dict {gene_ID: transcript_ID}

    NOTE(review): the mutable default gen_dict is mutated below
    (del gen_dict[old_gen]), so repeated no-argument calls see an
    already-emptied default dict — confirm and consider copying.
    """
    bar,start_time = te.bar_builder(length_multiplyer = 3)
    total_genes = len(gen_dict)
    gens_done = 0
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        # per-run state: gene/transcript currently being scanned and the
        # best candidate seen so far
        old_gen = str()
        old_transcript_ID = str()
        representative_transcript = dict()
        representative_trasnscript_not_found = True
        longest_transcript_ID = str()
        current_exon_length = 0
        longest_transcript = 0
        percentage_done = 0
        for entry in f:
            try:
                corrent_gen = te.gene_ID_finder(entry)
            except:
                # entry without a recognisable gene ID: stay on the old gene
                corrent_gen = old_gen
            if corrent_gen != old_gen:
                # a new gene started: a representative must be searched again
                representative_trasnscript_not_found = True
            if representative_trasnscript_not_found and corrent_gen != str():
                # skipped entirely once a representative for the current
                # gene has been fixed (single-candidate shortcut below)
                if corrent_gen != old_gen:
                    # commit the best transcript of the gene just finished
                    current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
                    representative_transcript[old_gen] = longest_transcript_ID
                    # NOTE(review): longest_transcript is reset to 0 only in
                    # the except branch below — verify it should not also be
                    # reset on the success path when a gene is completed.
                    try:
                        del gen_dict[old_gen]
                        old_gen = corrent_gen
                        gens_done += 1
                        corrent_percentage_done = (gens_done/total_genes)*100
                        if corrent_percentage_done > percentage_done+10:
                            bar,start_time = te.bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar)
                            percentage_done = int(corrent_percentage_done)
                    except:
                        # first gene of the file: old_gen == "" is not in
                        # gen_dict, so the del above raises KeyError
                        old_gen = corrent_gen
                        longest_transcript = 0
                try:
                    # is this gene one we have candidate transcripts for?
                    transcript_IDs = gen_dict[corrent_gen]
                    if len(gen_dict[corrent_gen]) == 1:
                        # single candidate: no comparison needed
                        # NOTE(review): corrent_gen[0] indexes the FIRST
                        # CHARACTER of the gene ID, and this assignment
                        # rebinds the whole result dict; it looks like it
                        # was meant to be
                        # representative_transcript[corrent_gen] = gen_dict[corrent_gen][0]
                        # — confirm before relying on this shortcut.
                        representative_transcript=gen_dict[corrent_gen[0]]
                        representative_trasnscript_not_found = False
                        continue
                except:
                    continue
                try:
                    current_transcript_ID = te.transcript_ID_finder(entry)
                except:
                    continue
                if current_transcript_ID in transcript_IDs:
                    # the entry belongs to one of the candidate transcripts
                    if current_transcript_ID != old_transcript_ID:
                        # a new transcript begins: score the finished one
                        current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
                        try:
                            transcript_IDs.remove(old_transcript_ID)
                            old_transcript_ID = current_transcript_ID
                        except:
                            # previous transcript was not among the candidates
                            old_transcript_ID = current_transcript_ID
                    if exon_fider(entry):
                        exon_length = exon_length_calculator(entry)
                        current_exon_length += exon_length
                    else:
                        continue
    # commit the best transcript of the very last gene and drop the
    # placeholder entry created for the initial empty gene name
    current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
    representative_transcript[old_gen] = longest_transcript_ID
    del representative_transcript[str()]
    te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
    return(representative_transcript)
def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = None):
    """Filter a {gene: [transcripts]} dict down to the transcript with the
    longest summed exon length per gene.

    Expected inputs:
        file_name: str ; default = "test" # name of the gtf file
        source_pathway_name: str ; default = cwd # path of the gtf file
        deposit_pathway_name: str ; default = cwd # path for output files
        gen_dict: dict {gene_ID: [transcript IDs]} or None # None selects
            the built-in demo dictionary
    Returns:
        dict {gene_ID: representative transcript_ID}
    """
    if gen_dict is None:
        # FIX: this dictionary used to be a mutable default argument; the
        # filtering step deletes entries from it, so the shared default was
        # emptied after the first call. Build a fresh copy per call instead.
        gen_dict = {
            "ENSG00000160072": ["ENST00000673477", "ENST00000472194",
                                "ENST00000378736", "ENST00000308647",
                                "ENST00000442483"],
            "ENSG00000225972": ["ENST00000416931"],
            "ENSG00000279928": ["ENST00000624431", "ENST00000424215"],
            "ENSG00000142611": ["ENST00000378391", "ENST00000607632",
                                "ENST00000511072"],
        }
    print("Representative transcripts are filtered based on exon length please wait...")
    source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
    representative_transcript = _exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict)
    print("\nRepresentative transcripts collected")
    return representative_transcript
# Script entry point: run the filter with its default demo parameters.
if __name__ == "__main__":
    # te.version_control(module_list,modul_name_list,python_version)
    exon_length_filter()
    # This line allows the file to be executed on its own also from
    # the command line.
import pytest
import representative as repr
import numpy as np
import test_Functions as tFun
def test_import_gtfSelection_to_df():
    """
    Test if gencode.vM31.annotation_intermediat_file.txt
    is imported in the correct pandas df format
    Args:
        None
    Returns:
        Assert results
    Raises:
        None
    """
    path = tFun.find_path_intermediateFile()
    df = repr.import_gtfSelection_to_df(path)
    datatype = {'Gene': np.dtype('O'), 'Transcript': np.dtype('O'),
                'Support_level': np.dtype('float64')}
    # FIX: the original asserts compared the result against a
    # (value, "message") tuple — e.g. `== (3, "msg")` — which can never be
    # equal; the message belongs after the comma of the assert statement.
    assert tFun.column_number(df) == 3, \
        "number of columns is not equal to 3"
    assert tFun.column_dType(df) == datatype, \
        "at least one column has the wrong datatype"
    assert tFun.duplicated_rows(df).empty, "at least one row is duplicated"
    assert tFun.NA_value(df) == 0, "at least one row contains NA values"
    with pytest.raises(TypeError, match=r"Only str path is allowed"):
        repr.import_gtfSelection_to_df(123)
def test_representative_transcript_inDict():
    """
    Check that the df produced by import_gtfSelection_to_df() is turned
    into a dict of the expected shape, and that non-DataFrame inputs are
    rejected with a TypeError.
    Args:
        None
    Returns:
        Assert results
    Raises:
        None
    """
    path = tFun.find_path_intermediateFile()
    df = repr.import_gtfSelection_to_df(path)
    dict_expected = {
        'ENSMUSG00000024691': ['ENSMUST00000025595.5'],
        'ENSMUSG00000063683': ['ENSMUST00000044976.12',
                               'ENSMUST00000119960.2'],
        'ENSMUSG00000079415': ['ENSMUST00000112933.2']}
    assert repr.representative_transcripts_inDict(df) == dict_expected
    invalid_inputs = [
        123,
        "hello",
        ["hello", "world", 123],
        {"hello": "world", "bonjour": ["le monde", 123]},
    ]
    for bad_input in invalid_inputs:
        with pytest.raises(TypeError, match=r"Only pandas DataFrame is allowed"):
            repr.representative_transcripts_inDict(bad_input)
def test_find_repr_by_SupportLevel():
    """
    Check that the full pipeline (import + dict construction) yields the
    expected dict for gencode.vM31.annotation_intermediat_file.txt.
    Args:
        None
    Returns:
        Assert results
    Raises:
        None
    """
    path = tFun.find_path_intermediateFile()
    expected = {
        'ENSMUSG00000024691': ['ENSMUST00000025595.5'],
        'ENSMUSG00000063683': ['ENSMUST00000044976.12',
                               'ENSMUST00000119960.2'],
        'ENSMUSG00000079415': ['ENSMUST00000112933.2']}
    result = repr.find_repr_by_SupportLevel(path)
    assert result == expected
# Ad-hoc driver: runs the three tests directly when the module is executed
# (pytest would normally collect and run them instead).
test_representative_transcript_inDict()
test_find_repr_by_SupportLevel()
test_import_gtfSelection_to_df()
print("test_representative is done ! No error was found")
'''
This part of the code take as input a gtf modified file
and return a dictionary of transcripts with best
support level for each gene of the input
'''
import pandas as pd
# import os
def import_gtf_selection_to_df(gtf_modified_file: str) -> pd.DataFrame:
    """Import intermediate file from gtf and create a df

    Args:
        gtf_modified_file (str) : path to the intermediate file

    Returns:
        Pandas dataframe having Gene, Transcript
        and Support_level as columns

    Raises:
        TypeError : Only str path is allowed
    """
    if not isinstance(gtf_modified_file, str):
        raise TypeError("Only str path is allowed")
    df_input = pd.read_csv(
        gtf_modified_file, sep='\t', lineterminator='\n',
        names=["Gene_mixed", "Transcript", "Support_level", "Na1", "Na2"]
    )
    # NOTE(review): this replaces whole cell values equal to " " (not
    # substrings); on a numeric column it is a no-op — confirm the intent.
    df_input["Support_level"] = df_input["Support_level"].replace(" ", "")
    # gene header rows look like ">GENEID": pull out the ID itself
    df_input["Gene"] = df_input["Gene_mixed"].str.extract(
        r'([A-Z]\w{0,})', expand=True  # noqa: W605
    )
    df_input["Transcript_number"] = df_input["Gene_mixed"].str.extract(
        r'(^\d)', expand=True  # noqa: W605
    )
    # .copy() so the assignment below operates on an independent frame
    # rather than a view of df_input (avoids SettingWithCopyWarning)
    df_clean = df_input.loc[:, ["Gene", "Transcript", "Support_level"]].copy()
    # forward-fill each gene ID onto its transcript rows;
    # FIX: DataFrame.fillna(method='ffill') is deprecated — use .ffill()
    df_clean["Gene"] = df_clean["Gene"].ffill()
    # drop the gene header rows (they have no Transcript/Support_level)
    df_clean = df_clean.dropna(axis=0)
    return df_clean
def representative_transcripts_in_dict(
        df_gtf_selection: pd.DataFrame) -> pd.DataFrame:
    """Map each gene to the transcripts sharing its best support level.

    Args:
        df_gtf_selection: Pandas dataframe with Gene, Transcript and
            Support_level columns

    Returns:
        Dict {'Gene':['transcriptA', 'transcriptB'], ...}

    Raises:
        TypeError : Only pandas DataFrame is allowed
    """
    if not isinstance(df_gtf_selection, pd.DataFrame):
        raise TypeError("Only pandas DataFrame is allowed")
    # per-gene minimum support level, broadcast back onto every row
    best_levels = df_gtf_selection.groupby("Gene")["Support_level"].transform(min)
    best_rows = df_gtf_selection[df_gtf_selection["Support_level"] == best_levels]
    best_rows = best_rows.drop(columns=["Support_level"])
    # collect the surviving transcripts of each gene into a list
    return best_rows.groupby("Gene")["Transcript"].apply(list).to_dict()
def find_repr_by_support_level(intermediate_file: str) -> dict[str, str]:
    """Chain import_gtf_selection_to_df() and
    representative_transcripts_in_dict().

    Args:
        intermediate_file : path to the intermediate file

    Returns:
        Dict {'Gene':['transcriptA', 'transcriptB'], ...}

    Raises:
        None
    """
    selection_df = import_gtf_selection_to_df(intermediate_file)
    return representative_transcripts_in_dict(selection_df)
# if __name__ == "__main__":
# find_repr_by_support_level()
#### Transcript extractor #####
"""Transcript extractor
Version 1.2.0"""
### Called Packages ###
import re
import os
import time
# Interpreter / module versions consumed by version_control() below.
python_version = "3.7.13"
module_list =[re,os,time]
modul_name_list = ["re","os","time"]
### Functions ###
def version_control(module_list,modul_name_list,python_version):
    """Append the version of every given module to "required.txt".

    Modules that do not expose __version__ (e.g. os, re, time) are
    recorded with the interpreter version instead.

    Expected inputs:
        module_list: list # imported module objects
        modul_name_list: list # their names, parallel to module_list
        python_version: str # interpreter version used as the fallback
    """
    with open("required.txt", "a") as req:
        for module, name in zip(module_list, modul_name_list):
            try:
                version = module.__version__
            except AttributeError:
                # stdlib modules often lack __version__: fall back to the
                # interpreter version (was a bare except that also hid
                # unrelated errors)
                version = python_version
            req.write(name + "\t" + str(version) + "\n")
def __parameter_editor(file_name,source_pathway_name,deposit_pathway_name):
    """Interactively confirm or edit the run parameters via stdin.

    Expected inputs:
        file_name: str # gtf file name
        source_pathway_name: str # directory of the input file
        deposit_pathway_name: str # directory for the output files
    Returns:
        (file_name, source_pathway_name, deposit_pathway_name) as
        confirmed or modified by the user.
    """
    while True:
        print("The program will run with the following parameters:\nFile name:\t\t",file_name,"\nSource pathway:\t",source_pathway_name,"\nDeposit pathway:\t",deposit_pathway_name,"\n")
        parameter_conformation = input("To continue with these parameters input [continue or c] to change them input [edit]\n>")
        if parameter_conformation == "continue"or parameter_conformation =="c":
            break
        elif parameter_conformation == "edit":
            # inner loop: edit one parameter, then show the summary again
            while True:
                change_question = input("select the parameter you want to change [nfile/spath/dpath] or input [b] to go back\n>")
                if change_question == "nfile":
                    # change the file name
                    file_name = input("Please input the new file name\n>")
                    break
                elif change_question == "spath":
                    # change the source path; fall back to cwd when invalid
                    source_pathway_name = input("Please input the new source path\n>")
                    does_source_pathway_exist = os.path.exists(source_pathway_name)
                    if does_source_pathway_exist:
                        break
                    else:
                        print("The new source pathway:",source_pathway_name,"does not exist\nThe source pathway was returned to default:",os.getcwd())
                        source_pathway_name = os.getcwd()
                elif change_question == "dpath":
                    # change the output path; fall back to the source path
                    # when the entered directory does not exist
                    deposit_pathway_name = input("Please input the new output file path name\n>")
                    does_deposit_pathway_exist = os.path.exists(deposit_pathway_name)
                    if does_deposit_pathway_exist:
                        break
                    else:
                        print("The new deposit pathway:",deposit_pathway_name,"does not existe\nThe deposit pathway was returnt to default:",source_pathway_name)
                        deposit_pathway_name = source_pathway_name
                elif change_question == "b":
                    # return to the confirmation prompt
                    break
                else:
                    # unrecognised input: re-ask within the edit loop
                    print("The input",change_question,"is not valid. Please use one of the specified commands")
        else:
            # unrecognised input: re-ask the confirmation prompt
            print("The input",parameter_conformation,"is not valide please use one of the specified comands\n")
    return(file_name,source_pathway_name,deposit_pathway_name)
def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()):
    """Check for an existing result file and ask the user whether to
    reuse it or generate a new one.

    Expected inputs:
        file_name: str # full name of the result file to look for
        deposit_pathway_name: str # directory searched for the file
    Returns:
        bool: True when the pre-existing file should be reused.
    """
    File_of_same_name_found = False
    generat_new_file = False
    directory_content = os.listdir(deposit_pathway_name)
    for file in directory_content:
        if file == file_name:
            # a matching file exists: ask the user what to do
            while True:
                File_found_input = input (file_name+" has allready been generated\nDo you want to generate a new one [y/n] \n>")
                if File_found_input == "n":
                    File_of_same_name_found = True
                    break
                elif File_found_input == "y":
                    generat_new_file = True
                    break
                else:
                    print("Invalid input\nPlease press [y] if you want to generate a new file or [n] if you want to use the preexisting file")
            break
        else:
            continue
    # report the decision back to the user
    if File_of_same_name_found:
        print("No new file will be generated, the program can continue")
    elif generat_new_file:
        print("A new file will be generated please wait...\n")
    else:
        print("No pre-existing file of the relevant type has been found.\nA new file will be generated please wait...\n")
    return(File_of_same_name_found)
def bar_builder(percentage = 0,length_multiplyer = 2,start_time = None,bar = str()):
    """Create/advance a text loading bar in 10% increments (0 to 100%).

    Expected inputs:
        percentage: int between 0 and 100 in steps of 10 # current increment
        length_multiplyer: int > 0 # amount of symbols per 10% increment
        start_time: float or None ; default = None # epoch seconds; None
            means "now". FIX: the default used to be `time.time()`, which
            is evaluated once at import time, not per call.
        bar: str ; default = str() # current bar; only needed after the
            0% call
    Returns:
        (bar, start_time) so the caller can thread them through updates.
    """
    if start_time is None:
        start_time = time.time()
    if percentage == 100:
        # finish: fill the whole bar and report the elapsed seconds
        bar = bar.replace("-","#")
        print("\r"+bar+"\t"+"100%\t\t"+str(int(time.time()-start_time)))
    elif percentage > 0:
        # advance: fill one increment worth of symbols
        bar = bar.replace("-","#",length_multiplyer)
        print("\r"+bar+"\t"+str(percentage)+"%", end='',flush=True)
    elif percentage == 0:
        # initialise an empty bar
        bar = "["+"-"*length_multiplyer*10+"]"
        print(bar+"\t", end='',flush=True)
    return(bar,start_time)
def __test_file_name(file_name,source_pathway_name = os.getcwd()):
    """Validate that the source file exists and normalise its name.

    Expected inputs:
        file_name: str # gtf file name, with or without the ".gtf" suffix
        source_pathway_name: str # directory expected to contain the file
    Returns:
        (valide_source_file, file_name): bool existence flag and the file
        name with any ".gtf" suffix stripped.
    """
    directory_content = os.listdir(source_pathway_name)
    index_of_the_dot = file_name.rfind(".")
    valide_source_file = False
    validate_source_file = True
    if index_of_the_dot == -1:
        # no extension given: assume .gtf
        file_name += ".gtf"
    else:
        source_file_typ = file_name[index_of_the_dot:]
        # FIX: raw string — "\d" in a normal literal is an invalid escape
        # sequence. The pattern detects numeric "extensions" (e.g. ".2")
        # that are really part of the name, not a file type.
        not_a_file_type = re.compile(r".\d{1,13}")
        try_not_a_file_type = not_a_file_type.search(source_file_typ)
        if source_file_typ == ".gtf":
            pass  # already the expected type
        elif try_not_a_file_type:
            # the "extension" is numeric: keep it and append .gtf
            file_name += ".gtf"
        else:
            print("This program can not handle",source_file_typ,"files. \nplease use a .gtf file" )
            validate_source_file = False
    # look for a file of the normalised name in the source directory
    if validate_source_file:
        for file in directory_content:
            if file == file_name:
                valide_source_file = True
                break
    # report the outcome of the search
    if valide_source_file:
        print("The file:",file_name,"has been found.\n")
    else:
        print("No .gtf file of the name",file_name,"has been found in this pathway")
    # return the name without the .gtf suffix for downstream use
    file_name = file_name.replace(".gtf","")
    return(valide_source_file,file_name)
def __do_pathways_exist__(source_pathway_name,deposit_pathway_name):
    """Verify both paths exist, substituting defaults for missing ones.

    The source path falls back to the current working directory; the
    deposit path falls back to the (possibly substituted) source path.
    """
    if not os.path.exists(source_pathway_name):
        print("The source pathway:",source_pathway_name,"has not been found\nThe source pathway was set to the default")
        source_pathway_name = os.getcwd()
    if not os.path.exists(deposit_pathway_name):
        print("The deposit pathway:",deposit_pathway_name,"has not been found\nThe deposit pathway was set to the default")
        deposit_pathway_name = source_pathway_name
    return (source_pathway_name, deposit_pathway_name)
def gene_ID_finder(entry):
    """Extract the gene ID from a GTF line known to contain "gene_id".

    Expected inputs:
        entry: str # a line from a gtf file that contains a gene ID
    Returns:
        str: the gene ID without its surrounding quotes.
    Raises:
        TypeError: if no quoted ID follows "gene_id" in the entry.
    """
    index_gene_id = entry.find("gene_id")
    # FIX: raw string — "\S" in a normal literal is an invalid escape
    # sequence (DeprecationWarning on modern Python)
    find_gene_id_name = re.compile(r"\"\S{1,25}\"")
    sub_entry = entry[index_gene_id:]
    try_find_gene_id_name = find_gene_id_name.search(sub_entry)
    gene_ID = try_find_gene_id_name[0].replace("\"","")
    return (gene_ID)
def transcript_ID_finder (entry):
    """Extract the transcript ID from a GTF transcript entry.

    Expected inputs:
        entry: str # a line from a gtf file that contains a transcript ID
    Returns:
        str: the transcript ID without quotes, or "" when no quoted ID
        could be located after "transcript_id".
    """
    index_transcript_id = entry.find("transcript_id")
    # FIX: raw string — "\S" in a normal literal is an invalid escape
    find_transcript_id_name = re.compile(r"\"\S{1,25}\"")
    sub_entry = entry[index_transcript_id:]
    try_find_transcript_id_name = find_transcript_id_name.search(sub_entry)
    try:
        transcript_ID = try_find_transcript_id_name[0].replace("\"","")
    except TypeError:
        # search() found nothing, so indexing None raised TypeError
        transcript_ID = ""
    return (transcript_ID)
def transcript_support_level_finder(entry):
    """Extract the transcript support level from a GTF transcript entry.

    Expected input:
        entry: str # a line from a gtf file that belongs to a transcript
    Returns:
        The level as a string (e.g. "1"); entries with "NA" or no
        parsable level yield the int 100 (worst support).
        NOTE(review): the return type is therefore str OR int — confirm
        downstream comparisons expect this mix.
    """
    transcript_support_level_start_ID = entry.find("transcript_support_level")
    sub_entry = entry[transcript_support_level_start_ID:]
    try:
        # FIX: raw string — "\W"/"\w" in a normal literal are invalid
        # escapes. The pattern captures e.g. '"1";' after the key.
        score_finder = re.compile(r"\W\w{1,16}\W{2}")
        try_score_finder = score_finder.search(sub_entry)
        Pre_score_1 = try_score_finder[0]
        # strip the quoting / punctuation around the level value
        Pre_score_2 = Pre_score_1.replace("\"","")
        Pre_score_2 = Pre_score_2.replace("(","")
        transcript_support_level = Pre_score_2.replace(";","")
        if "NA" in transcript_support_level:
            transcript_support_level = 100
    except TypeError:
        # no match found, so indexing None raised TypeError
        transcript_support_level = 100
    return (transcript_support_level)
def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name):
    """Extract transcript number, transcript ID and transcript support
    level from a gtf file and write them to
    <file_name>_intermediate_file.txt, one ">gene_ID" header line per
    gene followed by one tab-separated line per transcript.

    Expected input:
        file_name: str # name of the gtf file without the .gtf suffix
        source_pathway_name: str # path of the gtf file
        deposit_pathway_name: str # path for saving the intermediate file
    """
    # first pass: count the lines so the progress bar can be scaled
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        total_entrys =len(f.readlines())
    # second pass: stream the entries and write the intermediate file
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        current_entry = 0
        percentage_done = 0
        bar,start_time = bar_builder(length_multiplyer = 3)
        Old_gen_ID = str()
        # stand-in gene ID: the first couple of entries are not genes
        with open(os.path.join(deposit_pathway_name,file_name+"_"+"intermediate_file"+".txt"),"w") as IMF:
            transcript_number = 0
            for entry in f:
                current_entry += 1
                current_percentage_done = 100* current_entry/total_entrys
                # advance the progress bar in 10% steps
                if current_percentage_done > percentage_done +10:
                    bar,start_time = bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar)
                    percentage_done = int(current_percentage_done)
                if "gene_id" in entry:
                    Gen_ID = gene_ID_finder(entry)
                else:
                    Gen_ID = Old_gen_ID
                if Gen_ID != Old_gen_ID:
                    # new gene: write its header line and restart the
                    # per-gene transcript counter
                    Gen_entry = ">"+ Gen_ID +"\n"
                    IMF.write(Gen_entry)
                    transcript_number = 0
                    Old_gen_ID = Gen_ID
                if "\ttranscript\t" in entry:
                    transcript_number += 1
                    # extract the transcript ID of this entry
                    Transcript_ID = transcript_ID_finder(entry)
                    # extract the transcript support level of this entry
                    transcript_support_level = transcript_support_level_finder(entry)
                    New_entry = str(transcript_number)+"\t"+str(Transcript_ID)+"\t"+str(transcript_support_level)+"\t"+"\t\n"
                    IMF.write(New_entry)
    bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
    print("The transcripts have been collected")
def extract_transcript(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = False,Input_free = False):
    """Top-level driver for the transcript extraction workflow.

    Expected input:
        file_name: str ; default = "test" # name of the gtf file
        source_pathway_name: str ; default = cwd # path of the gtf file
        deposit_pathway_name: str or False ; default = False # output
            path; False means "same as the source path"
        Input_free: bool ; default = False # True skips every interactive
            prompt
    Outputs:
        (file_name, source_pathway_name, deposit_pathway_name)
    """
    if deposit_pathway_name == False:
        deposit_pathway_name = source_pathway_name
    if Input_free:
        # non-interactive: validate the name and extract immediately
        _, file_name = __test_file_name(file_name, source_pathway_name)
        _transcript_extractor(file_name, source_pathway_name, deposit_pathway_name)
    else:
        # interactive: confirm parameters, check paths, then extract
        # unless a reusable result file already exists
        file_name, source_pathway_name, deposit_pathway_name = __parameter_editor(
            file_name, source_pathway_name, deposit_pathway_name)
        source_pathway_name, deposit_pathway_name = __do_pathways_exist__(
            source_pathway_name, deposit_pathway_name)
        file_is_valid, file_name = __test_file_name(file_name, source_pathway_name)
        if file_is_valid:
            if __searche_for_preexisting_files(
                    file_name + "_intermediate_file.txt", deposit_pathway_name):
                print("The transcripts has been collected\n")
            else:
                _transcript_extractor(file_name, source_pathway_name, deposit_pathway_name)
    return (file_name, source_pathway_name, deposit_pathway_name)
#### Dev part ####
# Script entry point: run the interactive extraction workflow.
if __name__ == "__main__":
    #version_control(module_list,modul_name_list,python_version)
    extract_transcript()
    #This line allows the file to be executed on its own also from
    # the command line.
This diff is collapsed.
import pandas as pd
import numpy as np
import logging
from gtfparse import read_gtf
LOG = logging.getLogger(__name__)
def attributes_converter(attributes: str) -> list:
    """Flatten the ;-separated attribute column of a GTF line into a
    token list, so the value for a key sits at index(key) + 1
    (e.g. index of "transcript_id" + 1 yields the transcript ID).

    Input:
        attributes = str() # the unstructured part of the entry
    Output:
        list # cleaned tokens, alternating keys and values
    """
    cleaned = attributes.replace('"', "")
    cleaned = cleaned.replace(";", "")
    cleaned = cleaned.replace("\\n", "")
    return cleaned.split(" ")
def find_in_attributes(attributes: list, look_for: str) -> str:
    """Return the value that follows *look_for* in the token list
    produced by attributes_converter(); keys and values are adjacent, so
    the value lives at index(key) + 1.

    Inputs:
        attributes = list()
        look_for = str() # name of the key to look for
    Output:
        the value token, or "NA" (with a logged warning) when the key is
        absent from the attributes
    """
    try:
        key_index = attributes.index(look_for)
    except ValueError:
        LOG.warning(f'No {look_for} in the entry, the return was set to NA')
        return "NA"
    return attributes[key_index + 1]
def _re_format(rep_trans_dict: dict) -> dict:
    """Reduce the rich representative-transcript mapping to one value
    per gene.

    Input:
        rep_trans_dict = {gene_id : [
            transcript_id, transcript_support_level, transcript_length]}
    Output:
        rep_transcripts = {gene_id : transcript_id}
    """
    return {gene_id: info[0] for gene_id, info in rep_trans_dict.items()}
def get_rep_trans(file_name: str = "test.gtf") -> dict:
    """Select one representative transcript per gene from a GTF file.

    Selection criteria: lowest transcript_support_level wins; among
    transcripts tied on support level, the one with the longest summed
    exon length wins.

    Args:
        file_name (str): Name of the annotation file.
    Returns:
        rep_transcripts (dict): {gene_id: transcript_id} of the selected
            representative transcripts.
    Raises:
        ValueError: If an entry cannot be attributed to the expected
            gene/transcript or cannot be identified at all.
    """
    def _commit(rep, gene_id, best):
        # Store `best` for gene_id unless the already-stored candidate has
        # a strictly better support level, or the same level and a length
        # that is at least as large. (Shared by the in-loop and final
        # commits, which previously duplicated this logic.)
        if gene_id in rep:
            stored = rep[gene_id]
            if stored[1] > best[1] or (stored[1] == best[1] and stored[2] < best[2]):
                rep[gene_id] = best
        else:
            rep[gene_id] = best

    rep_transcripts = {}
    cur_gID = ""
    # FIX: these three were unbound until the first transcript entry, so a
    # malformed file starting with an exon raised NameError instead of the
    # intended ValueError.
    cur_tID = ""
    pot_best_trans = False
    ignor_trans = False
    cur_best_trans = ["", 100, 0]  # [transcript_id, support_level, length]

    with open(file_name, "r") as f:
        for line in f:
            entry = line.split("\t")
            # skip header-ish lines and feature types we do not need
            if len(entry) == 1 or entry[2] in [
                "CDS",
                "stop_codon",
                "five_prime_utr",
                "three_prime_utr",
                "start_codon",
                "Selenocysteine"
            ]:
                continue
            # turn the unstructured attribute column into a token list
            attributes = attributes_converter(entry[8])
            # looking for and processing exon entries
            if entry[2] == "exon":
                if ignor_trans:
                    continue
                elif cur_gID != attributes[1]:
                    # FIX: LOG.error() without a message raised TypeError
                    # at exactly the moment an error should be reported
                    LOG.error("Exon from an unexpected gene")
                    raise ValueError("Exon from an unexpected gene")
                elif find_in_attributes(attributes, "transcript_id") != cur_tID:
                    LOG.error("Exon from an unexpected transcript")
                    raise ValueError("Exon from an unexpected transcript")
                # accumulate this exon's length on the potential candidate
                # (tie-breaker) or on the current best transcript
                if pot_best_trans:
                    pot_best_trans[2] += int(entry[4]) - int(entry[3])
                    if pot_best_trans[2] > cur_best_trans[2]:
                        cur_best_trans = pot_best_trans
                        pot_best_trans = False
                else:
                    cur_best_trans[2] += int(entry[4]) - int(entry[3])
            # looking for and processing transcript entries
            elif entry[2] == "transcript":
                # verify the transcript belongs to the current gene
                if cur_gID != attributes[1]:
                    LOG.error("Transcript from an unexpected gene")
                    raise ValueError("Transcript from an unexpected gene")
                cur_tID = find_in_attributes(attributes, "transcript_id")
                t_supp_lvl = find_in_attributes(attributes, "transcript_support_level")
                # missing / "NA" / non-numeric support levels count as 100
                # (worst); numeric levels are converted to int
                if t_supp_lvl == "NA":
                    t_supp_lvl = 100
                elif t_supp_lvl.isdigit():
                    t_supp_lvl = int(t_supp_lvl)
                else:
                    t_supp_lvl = 100
                # decide whether this transcript can become representative
                if t_supp_lvl < cur_best_trans[1] or cur_best_trans[0] == "":
                    cur_best_trans = [cur_tID, t_supp_lvl, 0]
                    pot_best_trans = False
                    ignor_trans = False
                elif t_supp_lvl == cur_best_trans[1]:
                    # tie on support level: resolved later by exon length
                    pot_best_trans = [cur_tID, t_supp_lvl, 0]
                else:
                    ignor_trans = True
            # looking for and processing gene entries
            elif entry[2] == "gene":
                # commit the previous gene's best and start a new gene
                _commit(rep_transcripts, cur_gID, cur_best_trans)
                cur_gID = attributes[1]
                cur_best_trans = ["", 100, 0]
            # raise an error for unidentifiable entries
            else:
                LOG.error("This entry could not be identified")
                raise ValueError("This entry could not be identified")
    # commit the final gene and drop the placeholder "" entry created
    # before the first gene line was seen
    _commit(rep_transcripts, cur_gID, cur_best_trans)
    rep_transcripts.pop("", None)
    return _re_format(rep_transcripts)
def _test():
    """Run a self-test of get_rep_trans on a bundled example file.

    Reads "test.gtf", computes the representative transcript for each
    gene and compares the result against a hard-coded expected mapping.

    Output:
        Prints whether the computed dictionary matches the expected one;
        on a mismatch, both dictionaries are printed for inspection.
    """
    file_name = "test.gtf"
    rt = get_rep_trans(file_name)
    # Expected gene -> representative transcript mapping for test.gtf
    expected_result = {
        "ENSG00000160072": "ENST00000472194",
        "ENSG00000234396": "ENST00000442483",
        "ENSG00000225972": "ENST00000416931",
        "ENSG00000224315": "ENST00000428803",
        "ENSG00000198744": "ENST00000416718",
        "ENSG00000279928": "ENST00000624431",
        "ENSG00000228037": "ENST00000424215",
        "ENSG00000142611": "ENST00000378391",
    }
    if rt != expected_result:
        # fix: corrected misspelled user-facing messages
        # ("yieding" -> "yielding", "succsesfull" -> "successful")
        print("The test failed due to not yielding the same results")
        print("The results the program got\n", rt)
        print("The expected results\n", expected_result)
    else:
        print("The test was successful")
def gtf_file_writer(original_file: str, rep_transcript_dict: dict, output_file: str):
    """Write a filtered GTF file that keeps gene entries and, for each
    gene, only the lines belonging to its representative transcript.

    Args:
        original_file: path to the input GTF file.
        rep_transcript_dict: {gene_id: representative transcript_id}.
        output_file: path of the filtered GTF file to write.
    """
    output = []
    # fix: initialise gene_id so a malformed file whose first non-comment
    # entry is not a gene cannot raise NameError below
    gene_id = None
    with open(original_file, "r") as f:
        for line in f:
            # skip header/comment lines
            if line.startswith("#"):
                continue
            entry = line.split("\t")
            attributes = attributes_converter(entry[8])
            feature_type = entry[2]
            if feature_type == "gene":
                gene_id = find_in_attributes(attributes, "gene_id")
                output.append(line)
            else:
                # keep the line only if it belongs to the representative
                # transcript of the current gene
                transcript_id = find_in_attributes(attributes, "transcript_id")
                if gene_id in rep_transcript_dict and rep_transcript_dict[gene_id] == transcript_id:
                    output.append(line)
    with open(output_file, "w") as last_file:
        last_file.writelines(output)
def gtf_to_df(gtf_file: str) -> pd.DataFrame:
    """
    Parse a .gtf file into a pandas DataFrame of genes and transcripts.

    Args:
        gtf_file (str): Path to the .gtf file.
    Returns:
        pd.DataFrame: DataFrame with the columns 'Gene' and 'Transcript'.
    Raises:
        None
    """
    raw = read_gtf(gtf_file).to_pandas()
    # keep only transcript entries and the two ID columns
    transcripts = raw.loc[raw["feature"] == "transcript", ["gene_id", "transcript_id"]]
    return transcripts.rename(columns={"gene_id": "Gene", "transcript_id": "Transcript"})
def dict_reprTrans_to_df(dict_reprTrans: "dict[str, str]") -> pd.DataFrame:
    """
    Convert a {gene: representative transcript} dict into a DataFrame.

    Args:
        dict_reprTrans (dict): {'Gene': 'transcriptID.version', ...}
    Returns:
        pd.DataFrame with columns 'Gene' and 'reprTrans'; the transcript
        version suffix (e.g. ".12") is stripped from the IDs.
    Raises:
        TypeError: Only dictionaries are allowed.
        TypeError: Keys should be strings.
        TypeError: Values should be strings.
    """
    if not isinstance(dict_reprTrans, dict):
        # fix: LOG.error() without a message raises TypeError itself
        LOG.error("Input is not a dictionary")
        raise TypeError("Only dictionaries are allowed")
    if not all(isinstance(key, str) for key in dict_reprTrans.keys()):
        LOG.error("Dictionary keys are not strings")
        raise TypeError("Keys should be strings")
    if not all(isinstance(value, str) for value in dict_reprTrans.values()):
        LOG.error("Dictionary values are not strings")
        raise TypeError("Values should be strings")
    df_reprTrans = pd.DataFrame.from_dict(
        dict_reprTrans, orient="index", columns=["reprTranscript"]
    )
    df_reprTrans = df_reprTrans.reset_index()
    df_reprTrans.columns = ["Gene", "reprTrans"]
    # fix: the old pattern r"\.[1-9]" was unanchored and missed the digit 0,
    # so a version like ".10" left a stray "0" behind; strip the full
    # trailing ".<digits>" version suffix instead
    df_reprTrans["reprTrans"] = df_reprTrans["reprTrans"].str.replace(
        r"\.\d+$", "", regex=True
    )
    return df_reprTrans
def tsv_or_csv_to_df(input_txt: str) -> pd.DataFrame:
    """
    Load transcript expression levels from a TSV or CSV file.

    Args:
        input_txt (str): Path to a TSV or CSV file with one transcript and
            its expression level per line.
    Returns:
        pd.DataFrame with the columns 'Transcript' and 'Expression_level'.
    Raises:
        None
    """
    # A regex separator accepts both tab- and comma-delimited input;
    # this requires the python parsing engine.
    return pd.read_csv(
        input_txt,
        sep=r"[\t,]",
        lineterminator="\n",
        names=["Transcript", "Expression_level"],
        engine="python",
    )
def exprLevel_byGene(
    df_exprTranscript: pd.DataFrame, df_output_gtf_selection: pd.DataFrame
) -> pd.DataFrame:
    """
    Sum transcript expression levels per gene.

    Each transcript from the expression table is mapped to its gene by an
    inner merge on 'Transcript'; expression is then aggregated per gene.

    Args:
        df_exprTranscript (pd.DataFrame): transcripts and expression levels,
            as produced by "tsv_or_csv_to_df".
        df_output_gtf_selection (pd.DataFrame): genes and transcripts.
    Returns:
        pd.DataFrame with 'Gene' and the summed 'Expression_level'.
    Raises:
        None
    """
    merged = df_output_gtf_selection.merge(
        df_exprTranscript, how="inner", on="Transcript"
    )
    return merged.groupby("Gene")["Expression_level"].sum().reset_index()
def match_byGene(
    df_reprTranscript: pd.DataFrame, df_expressionLevel_byGene: pd.DataFrame
) -> pd.DataFrame:
    """
    Attach per-gene expression levels to the representative transcripts.

    Performs an inner merge on 'Gene', then keeps only the representative
    transcript and its expression level.

    Args:
        df_reprTranscript (pd.DataFrame): genes and representative
            transcripts, as produced by "dict_reprTrans_to_df()".
        df_expressionLevel_byGene (pd.DataFrame): genes and their
            summed expression levels.
    Returns:
        pd.DataFrame with 'reprTrans' and 'Expression_level'.
    Raises:
        None
    """
    merged = df_reprTranscript.merge(
        df_expressionLevel_byGene, how="inner", on="Gene"
    )
    return merged.loc[:, ["reprTrans", "Expression_level"]]
# functions to run this part of the program
def match_reprTranscript_expressionLevel(
    exprTrans: str, dict_reprTrans: dict, gtf_file: str,
):
    """
    Map transcript-level expression onto representative transcripts.

    Pipeline: parse the GTF into a gene/transcript table, load the
    expression file, turn the representative-transcript dict into a
    DataFrame, sum expression per gene, and join the totals back onto
    the representative transcripts.

    Args:
        exprTrans (str): CSV or TSV file with transcripts and their
            expression level.
        dict_reprTrans (dict): {gene: representative transcript}.
        gtf_file (str): Path to the GTF file.
    Returns:
        pd.DataFrame with columns 'id' (representative transcript) and
        'level' (summed expression level).
    Raises:
        None
    """
    gene_transcripts = gtf_to_df(gtf_file)
    expression = tsv_or_csv_to_df(exprTrans)
    representatives = dict_reprTrans_to_df(dict_reprTrans)
    per_gene_level = exprLevel_byGene(expression, gene_transcripts)
    matched = match_byGene(representatives, per_gene_level)
    matched.rename(
        columns={"reprTrans": "id", "Expression_level": "level"}, inplace=True
    )
    return matched
def transcript_sampling(total_transcript_number, df_repr, output_csv):
    """
    Draw per-transcript counts and write them to a CSV file.

    Expression levels are scaled so that they sum to
    *total_transcript_number*, then used as Poisson means to sample a
    count for every representative transcript.

    Args:
        total_transcript_number: desired total transcript count
            (coerced to int).
        df_repr: DataFrame with columns 'id' and 'level'.
        output_csv: path of the CSV file to write ('id', 'count').
    """
    scale = int(total_transcript_number) / df_repr["level"].sum()
    sampled_counts = np.random.poisson(df_repr["level"] * scale)
    result = pd.DataFrame({"id": df_repr["id"], "count": sampled_counts})
    result.to_csv(output_csv, index=False)