Skip to content
Snippets Groups Projects
Commit 7d389a97 authored by Jakob Rien's avatar Jakob Rien
Browse files

Replace exon_length_filter.py

parent 13c2c54f
Branches
No related tags found
No related merge requests found
#### Exon length filter ##### #### Exon length filter #####
"""Exon length filter """Exon length filter
Version 1.1.0""" Version 1.1.0"""
### Called Packages ### ### Called Packages ###
import re import re
import os import os
import transcript_extractor as te import transcript_extractor as te
### Functions ### ### Functions ###
def exon_length_calculator(entry): def exon_length_calculator(entry):
"""This function finds the start and end cordinates of the exon and uses them to calculate its length""" """This function finds the start and end cordinates of the exon and uses them to calculate its length"""
try: try:
find_exon_coordinates = re.compile("\t\d{1,15}\t") find_exon_coordinates = re.compile("\t\d{1,15}\t")
#this difines the pattern of the coordinates #this difines the pattern of the coordinates
try_find_start_coordinates = find_exon_coordinates.search(entry) try_find_start_coordinates = find_exon_coordinates.search(entry)
#this line findes the start coordinares based on the pattern #this line findes the start coordinares based on the pattern
start_coordinates = int(try_find_start_coordinates[0].replace("\t","")) start_coordinates = int(try_find_start_coordinates[0].replace("\t",""))
#this line removes the \t at the end and the start of the pattern and #this line removes the \t at the end and the start of the pattern and
#turn the string of the coordinates into intergers #turn the string of the coordinates into intergers
final_index_start_coordinates = entry.find(try_find_start_coordinates[0])+len(try_find_start_coordinates[0])-1 final_index_start_coordinates = entry.find(try_find_start_coordinates[0])+len(try_find_start_coordinates[0])-1
#this line determines the indes of the final digit of the start coordinates #this line determines the indes of the final digit of the start coordinates
sub_entry = entry[final_index_start_coordinates:] sub_entry = entry[final_index_start_coordinates:]
#this lineused the index determin above a starting point for a new sub entry #this lineused the index determin above a starting point for a new sub entry
try_find_end_coordinates = find_exon_coordinates.search(sub_entry) try_find_end_coordinates = find_exon_coordinates.search(sub_entry)
end_coordinates = int(try_find_end_coordinates[0].replace("\t","")) end_coordinates = int(try_find_end_coordinates[0].replace("\t",""))
#these two lines find the end coordinates and turn tham int an int #these two lines find the end coordinates and turn tham int an int
exon_length = end_coordinates-start_coordinates exon_length = end_coordinates-start_coordinates
#this line claculates the transcript length #this line claculates the transcript length
except: except:
print("\n\nIn the following enty only one or no valid coordinates could be found:\n",entry,"the value will be set to NA") print("\n\nIn the following enty only one or no valid coordinates could be found:\n",entry,"the value will be set to NA")
exon_length = "NA" exon_length = "NA"
return(exon_length) return(exon_length)
def exon_fider(entry): def exon_fider(entry):
"""This funtion determines if a given entry belongs to an exon """This funtion determines if a given entry belongs to an exon
Expected inputs: Expected inputs:
entry: str #any enty of a gtf file""" entry: str #any enty of a gtf file"""
exon_test = entry.find("\texon\t") exon_test = entry.find("\texon\t")
#This line look for the entry exon in the file #This line look for the entry exon in the file
if exon_test == -1: if exon_test == -1:
try_exon_test = False try_exon_test = False
else: else:
try_exon_test = True try_exon_test = True
#The block above evaluates the results of the search for the wort exon #The block above evaluates the results of the search for the wort exon
return(try_exon_test) return(try_exon_test)
def __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID): def __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID):
"""This funtion encapsulates an operation that has to be carried out at several points in the exon_length_filter function and serves to make that function more modular""" """This funtion encapsulates an operation that has to be carried out at several points in the exon_length_filter function and serves to make that function more modular"""
if current_exon_length > longest_transcript: if current_exon_length > longest_transcript:
#This condition updates the most promesing for #This condition updates the most promesing for
#beeing the representative transcript #beeing the representative transcript
longest_transcript = current_exon_length longest_transcript = current_exon_length
longest_transcript_ID = old_transcript_ID longest_transcript_ID = old_transcript_ID
current_exon_length = 0 current_exon_length = 0
return(current_exon_length,longest_transcript,longest_transcript_ID) return(current_exon_length,longest_transcript,longest_transcript_ID)
def _representative_transcript_csv (representative_transcript,file_name = "test",deposit_pathway_name =os.getcwd()): def _representative_transcript_csv (representative_transcript,file_name = "test",deposit_pathway_name =os.getcwd()):
with open(os.path.join(deposit_pathway_name,file_name+"_"+"representative_transcripts"+".csv"),"w") as rt: with open(os.path.join(deposit_pathway_name,file_name+"_"+"representative_transcripts"+".csv"),"w") as rt:
for i in representative_transcript: for i in representative_transcript:
transcript = representative_transcript[i] transcript = representative_transcript[i]
new_entry = str(i)+","+transcript+"\n" new_entry = str(i)+","+transcript+"\n"
rt.write(new_entry) rt.write(new_entry)
def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}): def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}):
"""This funtion selects only the transcripts for a dictionary that have the longest total mRNA""" """This funtion selects only the transcripts for a dictionary that have the longest total mRNA"""
bar,start_time = te.bar_builder(length_multiplyer = 3) bar,start_time = te.bar_builder(length_multiplyer = 3)
total_genes = len(gen_dict) total_genes = len(gen_dict)
gens_done = 0 gens_done = 0
with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f: with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
old_gen = str() old_gen = str()
old_transcript_ID = str() old_transcript_ID = str()
representative_transcript = dict() representative_transcript = dict()
representative_trasnscript_not_found = True representative_trasnscript_not_found = True
longest_transcript_ID = str() longest_transcript_ID = str()
current_exon_length = 0 current_exon_length = 0
longest_transcript = 0 longest_transcript = 0
percentage_done = 0 percentage_done = 0
for entry in f: for entry in f:
try: try:
corrent_gen = te.gene_ID_finder(entry) corrent_gen = te.gene_ID_finder(entry)
except: except:
corrent_gen = old_gen corrent_gen = old_gen
#The block above test if there is a gen name in the entry #The block above test if there is a gen name in the entry
if corrent_gen != old_gen: if corrent_gen != old_gen:
representative_trasnscript_not_found = True representative_trasnscript_not_found = True
#The block above determines if the Gen name is new and set the test #The block above determines if the Gen name is new and set the test
#representative_trasnscript_not_found back to true which is used to #representative_trasnscript_not_found back to true which is used to
#make the program faster if there is just one transcript for a given #make the program faster if there is just one transcript for a given
#gen in the dict #gen in the dict
if representative_trasnscript_not_found and corrent_gen != str(): if representative_trasnscript_not_found and corrent_gen != str():
#print(corrent_gen) #print(corrent_gen)
#The conditon prvents serges if a representative transcript has #The conditon prvents serges if a representative transcript has
#all ready been chosen #all ready been chosen
if corrent_gen != old_gen: if corrent_gen != old_gen:
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
representative_transcript[old_gen] = longest_transcript_ID representative_transcript[old_gen] = longest_transcript_ID
try: try:
del gen_dict[old_gen] del gen_dict[old_gen]
old_gen = corrent_gen old_gen = corrent_gen
gens_done += 1 gens_done += 1
corrent_percentage_done = (gens_done/total_genes)*100 corrent_percentage_done = (gens_done/total_genes)*100
if corrent_percentage_done > percentage_done+10: if corrent_percentage_done > percentage_done+10:
bar,start_time = te.bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar) bar,start_time = te.bar_builder(percentage=percentage_done+10,length_multiplyer = 3,start_time=start_time,bar =bar)
percentage_done = int(corrent_percentage_done) percentage_done = int(corrent_percentage_done)
except: except:
old_gen = corrent_gen old_gen = corrent_gen
longest_transcript = 0 longest_transcript = 0
#The block above adds the transcript of the last gen that #The block above adds the transcript of the last gen that
#had the longest exons into the representative transcripts dict #had the longest exons into the representative transcripts dict
try: try:
#This try / except block test if the gen is in the input dictionary #This try / except block test if the gen is in the input dictionary
transcript_IDs = gen_dict[corrent_gen] transcript_IDs = gen_dict[corrent_gen]
if len(gen_dict[corrent_gen]) == 1: if len(gen_dict[corrent_gen]) == 1:
#This conditions is a short cut for Genes that #This conditions is a short cut for Genes that
#allready have a representative transcript #allready have a representative transcript
representative_transcript=gen_dict[corrent_gen[0]] representative_transcript=gen_dict[corrent_gen[0]]
representative_trasnscript_not_found = False representative_trasnscript_not_found = False
continue continue
except: except:
continue continue
try: try:
current_transcript_ID = te.transcript_ID_finder(entry) current_transcript_ID = te.transcript_ID_finder(entry)
except: except:
continue continue
#The block above searches for a transcript ID in the current entry #The block above searches for a transcript ID in the current entry
if current_transcript_ID in transcript_IDs: if current_transcript_ID in transcript_IDs:
#This condition test if the Transcript is one of the #This condition test if the Transcript is one of the
#candidates for representative transcripts #candidates for representative transcripts
if current_transcript_ID != old_transcript_ID: if current_transcript_ID != old_transcript_ID:
#This condition if the enty still belongs to the #This condition if the enty still belongs to the
#previous transcript and is triggers if that is not the case #previous transcript and is triggers if that is not the case
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
try: try:
transcript_IDs.remove(old_transcript_ID) transcript_IDs.remove(old_transcript_ID)
old_transcript_ID = current_transcript_ID old_transcript_ID = current_transcript_ID
except: except:
old_transcript_ID = current_transcript_ID old_transcript_ID = current_transcript_ID
if exon_fider(entry): if exon_fider(entry):
exon_length = exon_length_calculator(entry) exon_length = exon_length_calculator(entry)
current_exon_length += exon_length current_exon_length += exon_length
else: else:
continue continue
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID) current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
representative_transcript[old_gen] = longest_transcript_ID representative_transcript[old_gen] = longest_transcript_ID
del representative_transcript[str()] del representative_transcript[str()]
te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar) te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
return(representative_transcript) return(representative_transcript)
def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]},Input_free = False): def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]},Input_free = False):
"""This function filters a dictionary of genes and there transcripts by the length of there exons an selects the longes transcript for each gene ans saves tham in a "," seperated csv file. """This function filters a dictionary of genes and there transcripts by the length of there exons an selects the longes transcript for each gene ans saves tham in a "," seperated csv file.
Expected inputs: Expected inputs:
file_name: str ; default = test #the name of the gft file you want to look at file_name: str ; default = test #the name of the gft file you want to look at
source_pathway_name: str ; default = current work directory #path of the gtf file source_pathway_name: str ; default = current work directory #path of the gtf file
deposit_pathway_name: str ; default = current work directory #path for saving the csv file deposit_pathway_name: str ; default = current work directory #path for saving the csv file
gen_dict:dict{key == gene ID:[transcript IDs that belong to that gene]} gen_dict:dict{key == gene ID:[transcript IDs that belong to that gene]}
Input_free: tuple ; default = False # this input should be set to True for automation""" Input_free: tuple ; default = False # this input should be set to True for automation"""
print("Representative trascipts are filterd based on exon length please wait...") print("Representative trascipts are filterd based on exon length please wait...")
source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name) source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
if Input_free: if Input_free:
pre_existing_file = False pre_existing_file = False
else: else:
search_profile = file_name+"_"+"representative_transcripts"+".csv" search_profile = file_name+"_"+"representative_transcripts"+".csv"
pre_existing_file = te.__searche_for_preexisting_files(search_profile,deposit_pathway_name) pre_existing_file = te.__searche_for_preexisting_files(search_profile,deposit_pathway_name)
if pre_existing_file == False: if pre_existing_file == False:
representative_transcript = _exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict) representative_transcript = _exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict)
_representative_transcript_csv(representative_transcript,file_name,deposit_pathway_name) _representative_transcript_csv(representative_transcript,file_name,deposit_pathway_name)
print("\nRepresentative transcripts collected") print("\nRepresentative transcripts collected")
if __name__ == "__main__": if __name__ == "__main__":
help(exon_length_filter) help(exon_length_filter)
exon_length_filter() exon_length_filter()
#This line allows the file to be executed on its own also from #This line allows the file to be executed on its own also from
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment