Skip to content
Snippets Groups Projects
Commit 3d8c725a authored by Laura Urbanska's avatar Laura Urbanska
Browse files

added representative transcript and exon length filter

parent 17a98ceb
No related branches found
No related tags found
No related merge requests found
#### Exon length filter #####
### Called Packages ###
import re
import os
import transkript_extractor as te
### Functions ###
def exon_length_calculator(entry):
"""This funtion finds the start and end cordinates of the exon and uses them to calculate its lenght"""
try:
find_exon_coordinates = re.compile("\t\d{1,15}\t")
#this difines the pattern of the coordinates
try_find_start_coordinates = find_exon_coordinates.search(entry)
#this line findes the start coordinares based on the pattern
start_coordinates = int(try_find_start_coordinates[0].replace("\t",""))
#this line removes the \t at the end and the start of the pattern and
#turn the string of the coordinates into intergers
final_index_start_coordinates = entry.find(try_find_start_coordinates[0])+len(try_find_start_coordinates[0])-1
#this line determines the indes of the final digit of the start coordinates
sub_entry = entry[final_index_start_coordinates:]
#this lineused the index determin above a starting point for a new sub entry
try_find_end_coordinates = find_exon_coordinates.search(sub_entry)
end_coordinates = int(try_find_end_coordinates[0].replace("\t",""))
#these two lines find the end coordinates and turn tham int an int
exon_lenght = end_coordinates-start_coordinates
#this line claculates the transcript length
except:
print("\n\nIn the following enty only one or no valid coordinates could be found:\n",entry,"the value will be set to NA")
exon_lenght = "NA"
return(exon_lenght)
def exon_fider(entry):
"""This funtion determines if a given entry belongs to an exon"""
exon_test = entry.find("\texon\t")
#This line look for the entry exon in the file
if exon_test == -1:
try_exon_test = False
else:
try_exon_test = True
#The block above evaluates the results of the search for the wort exon
return(try_exon_test)
def __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID):
"""This funtion encapsulates an opperation that has to be carried out at several point ind the exon_length_filter funktion and servers to make that funktion more modular"""
if current_exon_length > longest_transcript:
#This condition updates the most promesing for
#beeing the representative transcript
longest_transcript = current_exon_length
longest_transcript_ID = old_transcript_ID
current_exon_length = 0
return(current_exon_length,longest_transcript,longest_transcript_ID)
def exon_length_filter(file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name =os.getcwd(),gen_dict = {"ENSG00000160072":["ENST00000673477","ENST00000472194","ENST00000378736","ENST00000308647","ENST00000442483"],"ENSG00000225972":["ENST00000416931"],"ENSG00000279928":["ENST00000624431","ENST00000424215"],"ENSG00000142611":["ENST00000378391","ENST00000607632","ENST00000511072"]}):
"""This funtion selects only the transcripts for a dictionar that have the longest total mRNA"""
print("Representative trascripts are filterd based on exon length please wait...")
source_pathway_name,deposit_pathway_name = te.__do_pathways_exist__(source_pathway_name,deposit_pathway_name)
with open(source_pathway_name+"\\"+file_name+".gtf", 'r') as f:
old_gen = str()
old_transcript_ID = str()
representative_transcript = dict()
representative_trasnscript_not_found = True
longest_transcript_ID = str()
current_exon_length = 0
longest_transcript = 0
for entry in f:
try:
corrent_gen = te.gene_ID_finder(entry)
except:
corrent_gen = old_gen
#The block above test if there is a gen name in the entry
if corrent_gen != old_gen:
representative_trasnscript_not_found = True
#The block above determines if the Gen name is new and set the test
#representative_trasnscript_not_found back to true which is used to
#make the program faster if there is just one transcript for a given
#gen in the dict
if representative_trasnscript_not_found and corrent_gen in gen_dict:
#print(corrent_gen)
#The conditon prvents serges if a representative transcript has
#all ready been chosen
if corrent_gen != old_gen:
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
representative_transcript[old_gen] = longest_transcript_ID
try:
del gen_dict[old_gen]
old_gen = corrent_gen
except:
old_gen = corrent_gen
longest_transcript = 0
#The block above adds the transcript of the last gen that
#had the longest exons into the representative transcripts dict
try:
#This try / except block test if the gen is in the input dictionary
transcript_IDs = gen_dict[corrent_gen]
if len(gen_dict[corrent_gen]) == 1:
#This conditions is a short cut for Genes that
#allready have a representative transcript
representative_transcript=gen_dict[corrent_gen[0]]
representative_trasnscript_not_found = False
continue
except:
continue
try:
current_transcript_ID = te.transcript_ID_finder(entry)
except:
continue
#The block above searches for a trnascript ID in the current enty
if current_transcript_ID in transcript_IDs:
#This condition test if the Transcript is one of the
#candidates for representative transcripts
if current_transcript_ID != old_transcript_ID:
#This condition if the enty still belongs to the
#previous transcript and is triggers if that is not the case
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
try:
transcript_IDs.remove(old_transcript_ID)
old_transcript_ID = current_transcript_ID
except:
old_transcript_ID = current_transcript_ID
if exon_fider(entry):
exon_length = exon_length_calculator(entry)
current_exon_length += exon_length
else:
continue
current_exon_length,longest_transcript,longest_transcript_ID = __longest_transcript_finder(current_exon_length,longest_transcript,longest_transcript_ID,old_transcript_ID)
representative_transcript[old_gen] = longest_transcript_ID
del representative_transcript[str()]
print("Representative transcripts collected\n")
return(representative_transcript)
if __name__ == "__main__":
exon_length_filter()
#This line allows the file to be executed on its own also from
\ No newline at end of file
import pandas as pd
import re
'''
This part of the code find a representative transcript for each ID given in the csv input file.
It works with the output of the function "transcript selector".
It return a csv file containing the names of the representative transcript and their levels.
'''
#import csv file [ID, Expression_level] and create a df
def import_csv_to_df(csv_file) :
df_csv = pd.read_csv(csv_file, names=["ID", "Expression_level"])
return df_csv
#import modified gtf file and create a df
def import_gtfSelection_to_df(gtf_modified_file):
#create a df from the tab separated file input
df_input =pd.read_csv(gtf_modified_file, sep='\t', lineterminator='\n',
names =["Gene_mixed", "Transcript", "Support_level", "Length", "Line_index", "Na"] )
#Create a new column with only gene name from Gene_mixed column
df_input["Gene"] = df_input["Gene_mixed"].str.extract('([A-Z]\w{0,})', expand=True)
#Create a new column with only transcript number from Gene_mixed column
df_input["Transcript_number"] = df_input["Gene_mixed"].str.extract('(^\d)', expand=True)
#Create a new df with relevant column and without NA
df_clean = df_input.loc[:, ["Gene","Transcript_number","Transcript","Support_level", "Length", "Line_index"]]
df_clean["Gene"] = df_clean["Gene"].fillna(method='ffill')
df_clean = df_clean.dropna(axis=0)
return df_clean
# Return a df containing representative transcripts and their expression level from genes mentioned in the csv file
def representative_from_gene(df_gtfSelection, df_csv):
#check wich genes are shared by df_gtfSelection and df_csv and create a reduced df
df_shared = df_gtfSelection[(df_gtfSelection["Gene"].isin(df_csv["ID"]))]# /!\ return an empty list if I do the same with "Transcript" column.
#pick only the transcripts with the highest support level (best is = 1 )
df_filtered = df_shared[df_shared["Support_level"]==df_shared["Support_level"].min()]
#pick the transcript with the greatest length if multiple transcript have the same support level
df_filtered2 = df_filtered.sort_index().groupby(df_filtered["Gene"]).max()
#combine expression level with transcript with highest support level and greatest length
df_filtered2["Expression_level"] = df_filtered2.Gene.map(df_csv.set_index("ID")['Expression_level'])
#create a reduced df with only the representative transcript and its support level
df_final = df_filtered2[["Transcript","Expression_level"]]
print("This is your representative transcripts : \n \n {}".format(df_final))
return df_final
#Missing conditions :
# If multiple transcript form the same gene, add their expression
# If given transcript name, return the most representative transcript
# Output csv file containing representative transcript levels and id
def write_csv_Transcript_Expression_level(df_representative):
df_final = df_representative[["Transcript","Expression_level"]]
with open("transcripts.csv", "w") as fileout :
fileout.write(df_representative.to_csv(columns=["Transcript","Expression_level"], index=False))
### put your inputs here ! ###
ID_levels_csv = "scRNA\input.csv" # put your csv file here
gtf_file = "scRNA/test_inter_mediat_file.txt" # put your gtf selected file here
df_gtf = import_gtfSelection_to_df(gtf_file)
df_csv_input = import_csv_to_df(ID_levels_csv)
representative_transcript = representative_from_gene(df_gtf, df_csv_input)
write_csv_Transcript_Expression_level(representative_transcript) # create a csv file named "transcript.csv"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment