Commit 83f6b6f3 authored by Laura Urbanska
updated filter and transcript extractor files

parent 4b4e11b3
### Imports ###
import os
import transkript_extractor as te
import Exon_length_filter as elf
import representative_v4 as rtcl

### Script ###
def exe(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = os.getcwd(),Input_free = True):
    #Step 1: extract the transcript information from the gtf file into an intermediate file
    file_name,source_pathway_name_2,deposit_pathway_name_2 = te.extract_transkript(file_name,source_pathway_name,deposit_pathway_name,Input_free = Input_free)
    inter_mediate_file_directory = os.path.join(deposit_pathway_name,file_name+"_intermediate_file.txt")
    #Step 2: pre-filter the transcripts by transcript support level
    print("Transcripts are filtered based on transcript score, please wait...")
    pre_filter_representative_transcripts_dict = rtcl.find_repr_by_SupportLevel(inter_mediate_file_directory)
    print("Transcripts filtered\n")
    #Step 3: keep the transcript with the longest total exon length per gene
    elf.exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict= pre_filter_representative_transcripts_dict,Input_free = Input_free)
    return(file_name,source_pathway_name,deposit_pathway_name)

### From console ###
##D:\\Uni\\Sem 9\\Programing in the Life sciences\\Projekt\\Intermediat Files
if __name__ == "__main__":
    exe()
\ No newline at end of file
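# --- Usage sketch (illustrative, not part of the committed files) ---
# A minimal example of driving the wrapper above, assuming test.gtf sits in the
# current working directory and that transkript_extractor, Exon_length_filter and
# representative_v4 are importable. The call runs in the same module that defines
# exe(), since the wrapper's own module name is not shown in this diff.
import os

file_name, source_path, deposit_path = exe(
    file_name="test",
    source_pathway_name=os.getcwd(),
    deposit_pathway_name=os.getcwd(),
    Input_free=True,  # True skips the interactive prompts in the helper modules
)
# Expected side effects: test_intermediate_file.txt (written by transkript_extractor)
# and test_representative_transcripts.csv (written by Exon_length_filter) appear in
# the deposit directory.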
#### Exon length filter #####
"""Exon length filter
Version 1.1.0"""
### Called Packages ###
import re
import os
import transkript_extractor as te
### Functions ###
def exon_length_calculator(entry):
    """This function finds the start and end coordinates of the exon and uses them to calculate its length"""
    try:
        find_exon_coordinates = re.compile("\t\d{1,15}\t")
        #this defines the pattern of the coordinates
        try_find_start_coordinates = find_exon_coordinates.search(entry)
        #this line finds the start coordinates based on the pattern
        start_coordinates = int(try_find_start_coordinates[0].replace("\t",""))
        #this line removes the \t at the start and the end of the match and
        #turns the coordinate string into an integer
        final_index_start_coordinates = entry.find(try_find_start_coordinates[0])+len(try_find_start_coordinates[0])-1
        #this line determines the index of the final digit of the start coordinates
        sub_entry = entry[final_index_start_coordinates:]
        #this line uses the index determined above as the starting point of a new sub entry
        try_find_end_coordinates = find_exon_coordinates.search(sub_entry)
        end_coordinates = int(try_find_end_coordinates[0].replace("\t",""))
        #these two lines find the end coordinates and turn them into an int
        exon_length = end_coordinates-start_coordinates
        #this line calculates the exon length
    except:
        print("\n\nIn the following entry only one or no valid coordinates could be found:\n",entry,"the value will be set to NA")
        exon_length = "NA"
    return(exon_length)
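# --- Worked sketch (illustrative, not part of the module) ---
# exon_length_calculator expects a tab separated gtf exon line whose 4th and 5th
# fields are the start and end coordinates. The sample line below uses the first
# exon of ENST00000456328 purely for illustration; the regex picks out 11869,
# then 12227, and the reported length is their difference.
sample_entry = ("1\thavana\texon\t11869\t12227\t.\t+\t.\t"
                "gene_id \"ENSG00000223972\"; transcript_id \"ENST00000456328\";")
print(exon_length_calculator(sample_entry))  # 12227 - 11869 = 358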
def exon_fider(entry):
    """This function determines if a given entry belongs to an exon
    Expected inputs:
        entry: str #any entry of a gtf file"""
    exon_test = entry.find("\texon\t")
    #This line looks for the word exon in the entry
    if exon_test == -1:
        try_exon_test = False
    else:
        try_exon_test = True
    #The block above evaluates the result of the search for the word exon
    return(try_exon_test)
def __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID):
    """This function encapsulates an operation that has to be carried out at several points in the exon_length_filter function and serves to make that function more modular"""
    if current_exon_length > longest_transcript:
        #This condition updates the most promising candidate for
        #being the representative transcript
        longest_transcript = current_exon_length
        longest_transcript_ID = old_transcript_ID
    current_exon_length = 0
    return(current_exon_length,longest_transcript,longest_transcript_ID)
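# --- Worked sketch (illustrative, not part of the module) ---
# The helper is called whenever a transcript has been fully read: the accumulated
# exon length of the finished transcript is compared against the best one so far
# and the accumulator is reset for the next transcript. Made-up totals below:
# transcript "A" totals 350 bases, "B" totals 125, so "A" should be kept.
current_exon_length, longest_transcript, longest_transcript_ID = 0, 0, str()
for finished_ID, finished_total in [("A", 350), ("B", 125)]:
    current_exon_length = finished_total
    current_exon_length, longest_transcript, longest_transcript_ID = __longest_transcript_finder(
        current_exon_length, longest_transcript, longest_transcript_ID, finished_ID)
print(longest_transcript_ID)  # "A"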
def _representative_transcript_csv (representative_transcript,file_name = "test",deposit_pathway_name =os.getcwd()):
    """This function saves the representative transcripts dictionary as a "," separated csv file with one gene per line"""
    with open(os.path.join(deposit_pathway_name,file_name+"_"+"representative_transcripts"+".csv"),"w") as rt:
        for i in representative_transcript:
            transcript = representative_transcript[i]
            new_entry = str(i)+","+transcript+"\n"
            rt.write(new_entry)
def _exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}):
    """This function selects, from a dictionary of genes and their transcripts, the transcript with the longest total mRNA for each gene"""
    bar,start_time = te.bar_builder(length_multiplyer = 3)
    source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
    total_genes = len(gen_dict)
    gens_done = 0
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        old_gen = str()
        old_transcript_ID = str()
@@ -114,7 +133,7 @@ def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),depo
                current_transcript_ID = te.transcript_ID_finder(entry)
            except:
                continue
            #The block above searches for a transcript ID in the current entry
            if current_transcript_ID in transcript_IDs:
                #This condition tests if the transcript is one of the
@@ -139,6 +158,31 @@ def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),depo
    te.bar_builder(100,length_multiplyer = 3,start_time=start_time,bar =bar)
    return(representative_transcript)
def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]},Input_free = False):
    """This function filters a dictionary of genes and their transcripts by the length of their exons, selects the longest transcript for each gene and saves them in a "," separated csv file.
    Expected inputs:
        file_name: str ; default = test #the name of the gtf file you want to look at
        source_pathway_name: str ; default = current work directory #path of the gtf file
        deposit_pathway_name: str ; default = current work directory #path for saving the csv file
        gen_dict: dict{key == gene ID:[transcript IDs that belong to that gene]}
        Input_free: bool ; default = False #this input should be set to True for automation"""
    print("Representative transcripts are filtered based on exon length, please wait...")
    source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
    if Input_free:
        pre_existing_file = False
    else:
        search_profile = file_name+"_"+"representative_transcripts"+".csv"
        pre_existing_file = te.__searche_for_preexisting_files(search_profile,deposit_pathway_name)
    if pre_existing_file == False:
        representative_transcript = _exon_length_filter(file_name,source_pathway_name,deposit_pathway_name,gen_dict)
        _representative_transcript_csv(representative_transcript,file_name,deposit_pathway_name)
    print("\nRepresentative transcripts collected")
if __name__ == "__main__":
help(exon_length_filter)
exon_length_filter()
#This line allows the file to be executed on its own also from
\ No newline at end of file
#### Transcript extractor #####
"""Transcript extractor
Version 1.1.0"""
### Called Packages ###
import re
import os
@@ -68,10 +69,9 @@ def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()
    generat_new_file = False
    directory_content = os.listdir(deposit_pathway_name)
    for file in directory_content:
        if file == file_name:
            while True:
                File_found_input = input (file_name+" has already been generated\nDo you want to generate a new one [y/n] \n>")
                if File_found_input == "n":
                    File_of_same_name_found = True
                    break
@@ -88,10 +88,17 @@ def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()
    elif generat_new_file:
        print("A new file will be generated, please wait...\n")
    else:
        print("No pre-existing file of the relevant type has been found.\nA new file will be generated, please wait...\n")
    return(File_of_same_name_found)
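# --- Call pattern sketch (illustrative, not part of the module) ---
# After this change the helper compares directory entries against whatever full
# file name the caller passes in, so both call sites in this commit build the
# complete name first. A True return means a file of that name exists and the
# user chose to keep it.
search_profile = "test" + "_" + "representative_transcripts" + ".csv"
keep_existing = __searche_for_preexisting_files(search_profile, os.getcwd())
if not keep_existing:
    print("a new " + search_profile + " would be generated here")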
def bar_builder(percentage = 0,length_multiplyer = 2,start_time = None,bar = str()):
    """This function creates a loading bar that loads in 10% increments, starting at 0% and ending at 100%
    Expected inputs:
        percentage: int between 0 and 100 in steps of 10; default = 0 #defines the current loading increment
        length_multiplyer: int > 0 ; default = 2 #determines the amount of symbols per loading increment
        start_time: float ; default = time.time() at call time #for determining the loading time
        bar: str ; default = str() #the current bar status; does not need to be given for the 0% increment
    """
    if start_time is None:
        #evaluated at call time; a time.time() default in the signature would be
        #frozen at import time and report a misleading loading time
        start_time = time.time()
    if percentage == 100:
        bar = bar.replace("-","#")
        print("\r"+bar+"\t"+"100%\t\t"+str(int(time.time()-start_time)))
@@ -164,7 +171,9 @@ def __do_pathways_exist__(source_pathway_name,deposit_pathway_name):
    return(source_pathway_name,deposit_pathway_name)
def gene_ID_finder(entry):
    """This function is supposed to find the gene ID of a known gene entry
    Expected inputs:
        entry: str #a line from a gtf file that contains a gene ID"""
    index_gene_id = entry.find("gene_id")
    find_gene_id_name = re.compile("\"\S{1,25}\"")
    sub_entry = entry[index_gene_id:]
@@ -173,7 +182,9 @@ def gene_ID_finder(entry):
    return (gene_ID)
def transcript_ID_finder (entry):
    """This function is supposed to find the transcript ID in a known transcript entry
    Expected inputs:
        entry: str #a line from a gtf file that contains a transcript ID"""
    index_transcript_id = entry.find("transcript_id")
    find_transcript_id_name = re.compile("\"\S{1,25}\"")
    sub_entry = entry[index_transcript_id:]
@@ -186,7 +197,9 @@ def transcript_ID_finder (entry):
    return (transcript_ID)
def transcript_support_level_finder(entry):
    """This function is supposed to find the transcript support level in a known transcript entry
    Expected input:
        entry: str #a line from a gtf file that belongs to a transcript"""
    transcript_support_level_start_ID = entry.find("transcript_support_level")
    sub_entry = entry[transcript_support_level_start_ID:]
@@ -210,10 +223,15 @@ def transcript_support_level_finder(entry):
def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name):
    """This function extracts the transcript number, transcript ID, the transcript support level, the transcript length and the line index from a gtf file of a given name and saves them in a new file named given_name_intermediate_file.txt.
    Expected input:
        file_name: str #the name of the gtf file you want to look at without the .gtf part
        source_pathway_name: str #path of the gtf file
        deposit_pathway_name: str #path for saving the intermediate file"""
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        total_entrys =len(f.readlines())
        #the first pass only counts the entries so the loading bar can be scaled
    with open(os.path.join(source_pathway_name,file_name+".gtf"), 'r') as f:
        current_entry = 0
        percentage_done = 0
        bar,start_time = bar_builder(length_multiplyer = 3)
@@ -221,7 +239,7 @@ def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name):
        Old_gen_ID = str()
        #stand-in, as the first couple of entries are not genes
        with open(os.path.join(deposit_pathway_name,file_name+"_"+"intermediate_file"+".txt"),"w") as IMF:
            transcript_number = 0
            for entry in f:
@@ -255,20 +273,33 @@ def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name):
    print("The transcripts have been collected")
def extract_transkript (file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = False,Input_free = False):
    """This is the overall executable function that will execute the transcript extraction process for a given file with all checks.
    The default file name is "test". This function will also return the file name, the source pathway and the deposit pathway that have been used to generate the intermediate file.
    Expected input:
        file_name: str ; default = test #the name of the gtf file you want to look at
        source_pathway_name: str ; default = current work directory #path of the gtf file
        deposit_pathway_name: str ; default = source_pathway_name #path for saving the intermediate file
        Input_free: bool ; default = False #set to True to skip all interactive prompts
    Outputs:
        file_name: str
        source_pathway_name: str
        deposit_pathway_name: str"""
    if deposit_pathway_name == False:
        deposit_pathway_name = source_pathway_name
    if Input_free:
        validated_file_name = __test_file_name(file_name,source_pathway_name)
        file_name = validated_file_name[1]
        _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name)
    else:
        file_name,source_pathway_name,deposit_pathway_name = __parameter_editor(file_name,source_pathway_name,deposit_pathway_name)
        source_pathway_name,deposit_pathway_name =__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
        validated_file_name = __test_file_name(file_name,source_pathway_name)
        file_name = validated_file_name[1]
        if validated_file_name[0]:
            if __searche_for_preexisting_files(file_name+"_intermediate_file.txt",deposit_pathway_name):
                print("The transcripts have been collected\n")
            else:
                _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name)
    return(file_name,source_pathway_name,deposit_pathway_name)
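# --- Usage sketch (illustrative, not part of the module) ---
# The new Input_free path, assuming test.gtf is in the current working directory.
# Leaving deposit_pathway_name at False makes the intermediate file land next to
# the gtf file, and Input_free=True skips every interactive prompt.
file_name, source_path, deposit_path = extract_transkript(
    file_name="test",
    source_pathway_name=os.getcwd(),
    deposit_pathway_name=False,
    Input_free=True,
)
# Writes test_intermediate_file.txt to deposit_path and returns the validated file
# name together with both directories.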
#### Dev part ####